This is an automated email from the ASF dual-hosted git repository.
jinrongtong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/rocketmq-clients.git
The following commit(s) were added to refs/heads/master by this push:
new 5ec167b8 Node js push consumer (#1201)
5ec167b8 is described below
commit 5ec167b860da43a20993899e563c55486ce316a8
Author: zhaohai <[email protected]>
AuthorDate: Thu Mar 19 15:28:04 2026 +0800
Node js push consumer (#1201)
* add push consumer
* update check
* add example
* add log
* add setInvisibleDuration
* update proto
* setAutoRenew true
* The fix adheres to best practices for both TypeScript and ESLint
* Better load balancing: Random start index avoids hotspot concentration.
Stronger robustness: Parameter validation prevents illegal input.
Easy to debug: detailed startup and shutdown logs.
No bugs: Fixed infinite recursion issue.
* 详细的启动和关闭日志,清晰的错误堆栈,准确的重试追踪。
* Calling send() before startup throws an error. Calling send() after
startup works normally. Calling send() after shutdown throws an error. Check
the status before the transaction starts.
* The commit log is complete and clear, the rollback log is complete and
clear, and the error handling is comprehensive.
* Fix the issue message for review.
* Fix null pointer exceptions and clear the timer after exception catching.
---
nodejs/examples/PushConsumer.ts | 165 ++++++++
nodejs/examples/PushConsumerQuickStart.ts | 65 +++
nodejs/proto/apache/rocketmq/v2/definition.proto | 6 +
nodejs/proto/apache/rocketmq/v2/service.proto | 32 ++
nodejs/src/client/BaseClient.ts | 14 +
nodejs/src/client/Logger.ts | 1 +
nodejs/src/client/TelemetrySession.ts | 24 +-
.../{client/Logger.ts => consumer/Assignment.ts} | 34 +-
.../{client/Logger.ts => consumer/Assignments.ts} | 41 +-
nodejs/src/consumer/{index.ts => ConsumeResult.ts} | 9 +-
nodejs/src/consumer/ConsumeService.ts | 54 +++
.../{client/Logger.ts => consumer/ConsumeTask.ts} | 35 +-
nodejs/src/consumer/Consumer.ts | 18 +
nodejs/src/consumer/FifoConsumeService.ts | 50 +++
.../src/consumer/{index.ts => MessageListener.ts} | 11 +-
nodejs/src/consumer/ProcessQueue.ts | 326 ++++++++++++++
nodejs/src/consumer/PushConsumer.ts | 470 +++++++++++++++++++++
nodejs/src/consumer/PushSubscriptionSettings.ts | 123 ++++++
nodejs/src/consumer/SimpleConsumer.ts | 27 +-
.../StandardConsumeService.ts} | 39 +-
nodejs/src/consumer/index.ts | 11 +
nodejs/src/producer/Producer.ts | 87 +++-
nodejs/src/producer/Transaction.ts | 16 +
23 files changed, 1552 insertions(+), 106 deletions(-)
diff --git a/nodejs/examples/PushConsumer.ts b/nodejs/examples/PushConsumer.ts
new file mode 100644
index 00000000..ea53a377
--- /dev/null
+++ b/nodejs/examples/PushConsumer.ts
@@ -0,0 +1,165 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * PushConsumer Example
+ *
+ * This example demonstrates how to use PushConsumer to consume messages.
+ * PushConsumer is a push-mode consumer that actively pulls messages from the
server
+ * and pushes them to the listener for processing.
+ */
+
+import { PushConsumer, ConsumeResult } from '../src';
+import type { MessageView } from '../src';
+
+// Get configuration from environment variables
+const ACCESS_KEY = process.env.ROCKETMQ_ACCESS_KEY || 'yourAccessKey';
+const SECRET_KEY = process.env.ROCKETMQ_SECRET_KEY || 'yourSecretKey';
+const ENDPOINT = process.env.ROCKETMQ_ENDPOINT || 'localhost:8080';
+
+async function main() {
+ console.log('========== PushConsumer Example ==========');
+
+ // 1. Define message listener
+ const messageListener = {
+ async consume(messageView: MessageView): Promise<ConsumeResult> {
+ // Process received messages here
+ console.log('Received message:', {
+ messageId: messageView.messageId,
+ topic: messageView.topic,
+ tag: messageView.tag,
+ keys: messageView.keys,
+ body: messageView.body.toString('utf-8'),
+ deliveryAttempt: messageView.deliveryAttempt,
+ });
+
+ // Simulate business processing
+ try {
+ // TODO: Add your business logic here
+ await doBusinessLogic(messageView);
+
+ // Return success to indicate message has been consumed successfully
+ return ConsumeResult.SUCCESS;
+ } catch (error) {
+ console.error('Failed to process message:', error);
+ // Return failure, message will be retried
+ return ConsumeResult.FAILURE;
+ }
+ },
+ };
+
+ // 2. Configure PushConsumer
+ const pushConsumer = new PushConsumer({
+ // Basic configuration
+ namespace: process.env.ROCKETMQ_NAMESPACE || 'yourNamespace', // Namespace
+ endpoints: ENDPOINT, // RocketMQ server endpoint
+
+ // Authentication credentials (optional)
+ sessionCredentials: {
+ accessKey: ACCESS_KEY, // AccessKey for authentication
+ accessSecret: SECRET_KEY, // SecretKey for authentication
+ // securityToken: 'yourSecurityToken', // SecurityToken, optional
+ },
+
+ // Consumer group configuration
+ consumerGroup: 'yourConsumerGroup',
+
+ // Subscription configuration: Map<topic, filterExpression>
+ // filterExpression can be a string (TAG expression) or FilterExpression
object
+ subscriptions: new Map([
+ ['yourTopic1', '*'], // Subscribe to all TAGs
+ ['yourTopic2', 'TagA || TagB'], // Subscribe to specific TAGs
+ // ['yourTopic3', new FilterExpression('yourSqlExpression',
FilterType.SQL92)],
+ ]),
+
+ // Message listener
+ messageListener: messageListener,
+
+ // Cache configuration (optional)
+ maxCacheMessageCount: 1024, // Max cached messages per queue,
default 1024
+ maxCacheMessageSizeInBytes: 67108864, // Max cached bytes per queue
(64MB), default 64MB
+
+ // Long polling timeout configuration (optional)
+ longPollingTimeout: 30000, // Long polling timeout in
milliseconds, default 30000ms
+
+ // Request timeout configuration (optional)
+ requestTimeout: 3000, // Request timeout in milliseconds,
default 3000ms
+
+ // Logger configuration (optional)
+ // logger: yourCustomLogger, // Custom logger
+ });
+
+ try {
+ // 3. Start consumer
+ console.log('Starting PushConsumer...');
+ await pushConsumer.startup();
+ console.log('PushConsumer started successfully!');
+ console.log('Client ID:', pushConsumer.getClientId());
+ console.log('Consumer Group:', pushConsumer.getConsumerGroup());
+
+ // 4. Dynamic subscription (optional)
+ // Can add new subscriptions at runtime
+ // import { FilterExpression } from '../src';
+ // await pushConsumer.subscribe('newTopic', new FilterExpression('TagC'));
+
+ // 5. Unsubscribe (optional)
+ // pushConsumer.unsubscribe('yourTopic1');
+
+ // Keep program running, waiting for messages
+ console.log('\nPress Ctrl+C to exit...');
+
+ // Graceful shutdown handling
+ process.on('SIGINT', async () => {
+ console.log('\nShutting down PushConsumer...');
+ await shutdown(pushConsumer);
+ process.exit(0);
+ });
+
+ // Keep program running
+ await new Promise(() => {});
+ } catch (error) {
+ console.error('Failed to start PushConsumer:', error);
+ await shutdown(pushConsumer);
+ process.exit(1);
+ }
+}
+
+// Business logic processing function example
+async function doBusinessLogic(messageView: MessageView): Promise<void> {
+ // Simulate asynchronous business processing
+ await new Promise(resolve => setTimeout(resolve, 100));
+
+ // Implement your business logic here
+ // For example:
+ // - Parse message content
+ // - Call database
+ // - Call external API
+ // - Send notifications, etc.
+}
+
+// Gracefully shutdown consumer
+async function shutdown(pushConsumer: PushConsumer) {
+ try {
+ await pushConsumer.shutdown();
+ console.log('PushConsumer has been closed');
+ } catch (error) {
+ console.error('Error occurred while closing PushConsumer:', error);
+ }
+}
+
+// Run example
+main().catch(console.error);
diff --git a/nodejs/examples/PushConsumerQuickStart.ts
b/nodejs/examples/PushConsumerQuickStart.ts
new file mode 100644
index 00000000..655bf245
--- /dev/null
+++ b/nodejs/examples/PushConsumerQuickStart.ts
@@ -0,0 +1,65 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * PushConsumer Quick Start Example
+ *
+ * The simplest way to use PushConsumer
+ */
+
+import { PushConsumer, ConsumeResult, type MessageView } from '../src';
+
+async function quickStart() {
+ // Create PushConsumer instance
+ const pushConsumer = new PushConsumer({
+ namespace: process.env.ROCKETMQ_NAMESPACE || '', // Namespace, can be
empty string
+ endpoints: process.env.ROCKETMQ_ENDPOINT || 'localhost:8080',
+ consumerGroup: 'yourConsumerGroup',
+
+ // Subscribe to topic and TAG
+ subscriptions: new Map([
+ ['yourTopic', '*'], // Subscribe to yourTopic, receive all TAGs
+ ]),
+
+ // Message listener - this is the core processing logic
+ messageListener: {
+ async consume(messageView: MessageView): Promise<ConsumeResult> {
+ console.log('Received message:', messageView.body.toString('utf-8'));
+
+ // TODO: Process your business logic here
+
+ return ConsumeResult.SUCCESS;
+ },
+ },
+ });
+
+ try {
+ // Start consumer
+ await pushConsumer.startup();
+ console.log('PushConsumer started, waiting for messages...');
+
+ // Keep running
+ await new Promise(() => {});
+ } catch (error) {
+ console.error('Error:', error);
+ await pushConsumer.shutdown();
+ throw error;
+ }
+}
+
+// Run example
+quickStart().catch(console.error);
diff --git a/nodejs/proto/apache/rocketmq/v2/definition.proto
b/nodejs/proto/apache/rocketmq/v2/definition.proto
index 753bfceb..468c4105 100644
--- a/nodejs/proto/apache/rocketmq/v2/definition.proto
+++ b/nodejs/proto/apache/rocketmq/v2/definition.proto
@@ -346,6 +346,8 @@ enum Code {
CLIENT_ID_REQUIRED = 40017;
// Polling time is illegal.
ILLEGAL_POLLING_TIME = 40018;
+ // Offset is illegal.
+ ILLEGAL_OFFSET = 40019;
// Generic code indicates that the client request lacks valid authentication
// credentials for the requested resource.
@@ -365,6 +367,8 @@ enum Code {
TOPIC_NOT_FOUND = 40402;
// Consumer group resource does not exist.
CONSUMER_GROUP_NOT_FOUND = 40403;
+ // Offset not found from server.
+ OFFSET_NOT_FOUND = 40404;
// Generic code representing client side timeout when connecting to, reading
data from, or write data to server.
REQUEST_TIMEOUT = 40800;
@@ -373,6 +377,8 @@ enum Code {
PAYLOAD_TOO_LARGE = 41300;
// Message body size exceeds the threshold.
MESSAGE_BODY_TOO_LARGE = 41301;
+ // Message body is empty.
+ MESSAGE_BODY_EMPTY = 41302;
// Generic code for use cases where pre-conditions are not met.
// For example, if a producer instance is used to publish messages without
prior start() invocation,
diff --git a/nodejs/proto/apache/rocketmq/v2/service.proto
b/nodejs/proto/apache/rocketmq/v2/service.proto
index f662f769..18db185a 100644
--- a/nodejs/proto/apache/rocketmq/v2/service.proto
+++ b/nodejs/proto/apache/rocketmq/v2/service.proto
@@ -66,6 +66,8 @@ message SendResultEntry {
string message_id = 2;
string transaction_id = 3;
int64 offset = 4;
+ // Unique handle to identify message to recall, support delay message for
now.
+ string recall_handle = 5;
}
message SendMessageResponse {
@@ -97,6 +99,7 @@ message ReceiveMessageRequest {
// For message auto renew and clean
bool auto_renew = 6;
optional google.protobuf.Duration long_polling_timeout = 7;
+ optional string attempt_id = 8;
}
message ReceiveMessageResponse {
@@ -169,6 +172,8 @@ message EndTransactionResponse { Status status = 1; }
message PrintThreadStackTraceCommand { string nonce = 1; }
+message ReconnectEndpointsCommand { string nonce = 1; }
+
message ThreadStackTrace {
string nonce = 1;
optional string thread_stack_trace = 2;
@@ -213,6 +218,9 @@ message TelemetryCommand {
// Request client to verify the consumption of the appointed message.
VerifyMessageCommand verify_message_command = 7;
+
+ // Request client to reconnect server use the latest endpoints.
+ ReconnectEndpointsCommand reconnect_endpoints_command = 8;
}
}
@@ -292,6 +300,17 @@ message QueryOffsetResponse {
int64 offset = 2;
}
+message RecallMessageRequest {
+ Resource topic = 1;
+ // Refer to SendResultEntry.
+ string recall_handle = 2;
+}
+
+message RecallMessageResponse {
+ Status status = 1;
+ string message_id = 2;
+}
+
// For all the RPCs in MessagingService, the following error handling policies
// apply:
//
@@ -377,12 +396,19 @@ service MessagingService {
rpc ForwardMessageToDeadLetterQueue(ForwardMessageToDeadLetterQueueRequest)
returns (ForwardMessageToDeadLetterQueueResponse) {}
+ // PullMessage and ReceiveMessage RPCs serve a similar purpose,
+ // which is to attempt to get messages from the server, but with different
semantics.
rpc PullMessage(PullMessageRequest) returns (stream PullMessageResponse) {}
+ // Update the consumption progress of the designated queue of the
+ // consumer group to the remote.
rpc UpdateOffset(UpdateOffsetRequest) returns (UpdateOffsetResponse) {}
+ // Query the consumption progress of the designated queue of the
+ // consumer group to the remote.
rpc GetOffset(GetOffsetRequest) returns (GetOffsetResponse) {}
+ // Query the offset of the designated queue by the query offset policy.
rpc QueryOffset(QueryOffsetRequest) returns (QueryOffsetResponse) {}
// Commits or rollback one transactional message.
@@ -408,4 +434,10 @@ service MessagingService {
// ChangeInvisibleDuration to lengthen invisible duration.
rpc ChangeInvisibleDuration(ChangeInvisibleDurationRequest) returns
(ChangeInvisibleDurationResponse) {
}
+
+ // Recall a message,
+ // for delay message, should recall before delivery time, like the rollback
operation of transaction message,
+ // for normal message, not supported for now.
+ rpc RecallMessage(RecallMessageRequest) returns (RecallMessageResponse) {
+ }
}
\ No newline at end of file
diff --git a/nodejs/src/client/BaseClient.ts b/nodejs/src/client/BaseClient.ts
index 77f790b9..d1651663 100644
--- a/nodejs/src/client/BaseClient.ts
+++ b/nodejs/src/client/BaseClient.ts
@@ -89,6 +89,7 @@ export abstract class BaseClient {
#startupResolve?: () => void;
#startupReject?: (err: Error) => void;
#timers: NodeJS.Timeout[] = [];
+ #running = false;
constructor(options: BaseClientOptions) {
this.logger = options.logger ?? getDefaultLogger();
@@ -125,8 +126,11 @@ export abstract class BaseClient {
}
async #startup() {
+ this.logger.info('Begin to execute startup flow, clientId=%s, topics=%d',
+ this.clientId, this.topics.size);
// fetch topic route
await this.updateRoutes();
+ this.logger.info('Topic routes updated, clientId=%s', this.clientId);
// update topic route every 30s
this.#timers.push(setInterval(async () => {
this.updateRoutes();
@@ -146,19 +150,27 @@ export abstract class BaseClient {
// doStats()
if (this.topics.size > 0) {
+ this.logger.info('Waiting for first onSettingsCommand, clientId=%s',
this.clientId);
// wait for this first onSettingsCommand call
// eslint-disable-next-line @typescript-eslint/no-unused-vars
await new Promise<void>((resolve, reject) => {
this.#startupReject = reject;
this.#startupResolve = resolve;
});
+ this.logger.info('Received first onSettingsCommand, clientId=%s',
this.clientId);
this.#startupReject = undefined;
this.#startupResolve = undefined;
}
+ this.#running = true;
+ }
+
+ isRunning(): boolean {
+ return this.#running;
}
async shutdown() {
this.logger.info('Begin to shutdown the rocketmq client, clientId=%s',
this.clientId);
+ this.#running = false;
while (this.#timers.length > 0) {
const timer = this.#timers.pop();
clearInterval(timer);
@@ -351,6 +363,8 @@ export abstract class BaseClient {
}
onSettingsCommand(_endpoints: Endpoints, settings: SettingsPB) {
+ this.logger.info('Received settings command, clientId=%s, settings=%j',
+ this.clientId, settings.toObject());
// final Metric metric = new Metric(settings.getMetric());
// clientMeterManager.reset(metric);
this.getSettings().sync(settings);
diff --git a/nodejs/src/client/Logger.ts b/nodejs/src/client/Logger.ts
index 8a5abbeb..269883bd 100644
--- a/nodejs/src/client/Logger.ts
+++ b/nodejs/src/client/Logger.ts
@@ -23,6 +23,7 @@ export interface ILogger {
info(...args: any[]): void;
warn(...args: any[]): void;
error(...args: any[]): void;
+ debug?(...args: any[]): void;
close?(...args: any[]): void;
}
diff --git a/nodejs/src/client/TelemetrySession.ts
b/nodejs/src/client/TelemetrySession.ts
index 3812f928..e36fd19e 100644
--- a/nodejs/src/client/TelemetrySession.ts
+++ b/nodejs/src/client/TelemetrySession.ts
@@ -31,6 +31,8 @@ export class TelemetrySession {
this.#endpoints = endpoints;
this.#baseClient = baseClient;
this.#logger = logger;
+ this.#logger.info('Creating telemetry session, endpoints=%s, clientId=%s',
+ endpoints, baseClient.clientId);
this.#renewStream(true);
}
@@ -51,12 +53,22 @@ export class TelemetrySession {
}
#renewStream(inited: boolean) {
- this.#stream = this.#baseClient.createTelemetryStream(this.#endpoints);
- this.#stream.on('data', this.#onData.bind(this));
- this.#stream.once('error', this.#onError.bind(this));
- this.#stream.once('end', this.#onEnd.bind(this));
- if (!inited) {
- this.syncSettings();
+ try {
+ this.#logger.debug?.('Creating telemetry stream, endpoints=%s,
clientId=%s, inited=%s',
+ this.#endpoints, this.#baseClient.clientId, inited);
+ this.#stream = this.#baseClient.createTelemetryStream(this.#endpoints);
+ this.#stream.on('data', this.#onData.bind(this));
+ this.#stream.once('error', this.#onError.bind(this));
+ this.#stream.once('end', this.#onEnd.bind(this));
+ if (!inited) {
+ this.#logger.info('Syncing settings to new stream, endpoints=%s,
clientId=%s',
+ this.#endpoints, this.#baseClient.clientId);
+ this.syncSettings();
+ }
+ } catch (err) {
+ this.#logger.error('Failed to create telemetry stream, endpoints=%s,
clientId=%s, error=%s',
+ this.#endpoints, this.#baseClient.clientId, err);
+ throw err;
}
}
diff --git a/nodejs/src/client/Logger.ts b/nodejs/src/consumer/Assignment.ts
similarity index 54%
copy from nodejs/src/client/Logger.ts
copy to nodejs/src/consumer/Assignment.ts
index 8a5abbeb..e281b3f6 100644
--- a/nodejs/src/client/Logger.ts
+++ b/nodejs/src/consumer/Assignment.ts
@@ -15,21 +15,25 @@
* limitations under the License.
*/
-import path from 'node:path';
-import { homedir } from 'node:os';
-import { EggLogger } from 'egg-logger';
+import { MessageQueue } from '../route';
-export interface ILogger {
- info(...args: any[]): void;
- warn(...args: any[]): void;
- error(...args: any[]): void;
- close?(...args: any[]): void;
-}
+export class Assignment {
+ readonly messageQueue: MessageQueue;
+
+ constructor(messageQueue: MessageQueue) {
+ this.messageQueue = messageQueue;
+ }
+
+ equals(other: Assignment): boolean {
+ if (this === other) return true;
+ if (!other) return false;
+ return this.messageQueue === other.messageQueue ||
+ (this.messageQueue.queueId === other.messageQueue.queueId &&
+ this.messageQueue.topic.name === other.messageQueue.topic.name &&
+ this.messageQueue.broker.name === other.messageQueue.broker.name);
+ }
-export function getDefaultLogger() {
- const file = path.join(homedir(),
'logs/rocketmq/rocketmq_client_nodejs.log');
- return new EggLogger({
- file,
- level: 'INFO',
- });
+ toString(): string {
+ return `Assignment{messageQueue=${JSON.stringify(this.messageQueue)}}`;
+ }
}
diff --git a/nodejs/src/client/Logger.ts b/nodejs/src/consumer/Assignments.ts
similarity index 50%
copy from nodejs/src/client/Logger.ts
copy to nodejs/src/consumer/Assignments.ts
index 8a5abbeb..a6655114 100644
--- a/nodejs/src/client/Logger.ts
+++ b/nodejs/src/consumer/Assignments.ts
@@ -15,21 +15,32 @@
* limitations under the License.
*/
-import path from 'node:path';
-import { homedir } from 'node:os';
-import { EggLogger } from 'egg-logger';
+import { Assignment } from './Assignment';
-export interface ILogger {
- info(...args: any[]): void;
- warn(...args: any[]): void;
- error(...args: any[]): void;
- close?(...args: any[]): void;
-}
+export class Assignments {
+ readonly #assignmentList: Assignment[];
+
+ constructor(assignmentList: Assignment[]) {
+ this.#assignmentList = assignmentList;
+ }
+
+ getAssignmentList(): Assignment[] {
+ return this.#assignmentList;
+ }
+
+ equals(other?: Assignments): boolean {
+ if (this === other) return true;
+ if (!other) return false;
+ if (this.#assignmentList.length !== other.#assignmentList.length) return
false;
+ for (let i = 0; i < this.#assignmentList.length; i++) {
+ if (!this.#assignmentList[i].equals(other.#assignmentList[i])) {
+ return false;
+ }
+ }
+ return true;
+ }
-export function getDefaultLogger() {
- const file = path.join(homedir(),
'logs/rocketmq/rocketmq_client_nodejs.log');
- return new EggLogger({
- file,
- level: 'INFO',
- });
+ toString(): string {
+ return `Assignments{assignmentList=[${this.#assignmentList.map(a =>
a.toString()).join(', ')}]}`;
+ }
}
diff --git a/nodejs/src/consumer/index.ts b/nodejs/src/consumer/ConsumeResult.ts
similarity index 81%
copy from nodejs/src/consumer/index.ts
copy to nodejs/src/consumer/ConsumeResult.ts
index 73dbd62f..e3d2aafa 100644
--- a/nodejs/src/consumer/index.ts
+++ b/nodejs/src/consumer/ConsumeResult.ts
@@ -15,8 +15,7 @@
* limitations under the License.
*/
-export * from './Consumer';
-export * from './FilterExpression';
-export * from './SimpleConsumer';
-export * from './SimpleSubscriptionSettings';
-export * from './SubscriptionLoadBalancer';
+export enum ConsumeResult {
+ SUCCESS = 'SUCCESS',
+ FAILURE = 'FAILURE',
+}
diff --git a/nodejs/src/consumer/ConsumeService.ts
b/nodejs/src/consumer/ConsumeService.ts
new file mode 100644
index 00000000..a1a64d8b
--- /dev/null
+++ b/nodejs/src/consumer/ConsumeService.ts
@@ -0,0 +1,54 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import { MessageView } from '../message';
+import { ConsumeResult } from './ConsumeResult';
+import { ConsumeTask } from './ConsumeTask';
+import { MessageListener } from './MessageListener';
+import type { ProcessQueue } from './ProcessQueue';
+
+export abstract class ConsumeService {
+ protected readonly clientId: string;
+ readonly #messageListener: MessageListener;
+ #aborted = false;
+
+ constructor(clientId: string, messageListener: MessageListener) {
+ this.clientId = clientId;
+ this.#messageListener = messageListener;
+ }
+
+ abstract consume(pq: ProcessQueue, messageViews: MessageView[]): void;
+
+ async consumeMessage(messageView: MessageView, delay = 0):
Promise<ConsumeResult> {
+ if (this.#aborted) {
+ return ConsumeResult.FAILURE;
+ }
+ const task = new ConsumeTask(this.clientId, this.#messageListener,
messageView);
+ if (delay <= 0) {
+ return task.call();
+ }
+ await new Promise<void>(resolve => setTimeout(resolve, delay));
+ if (this.#aborted) {
+ return ConsumeResult.FAILURE;
+ }
+ return task.call();
+ }
+
+ abort() {
+ this.#aborted = true;
+ }
+}
diff --git a/nodejs/src/client/Logger.ts b/nodejs/src/consumer/ConsumeTask.ts
similarity index 52%
copy from nodejs/src/client/Logger.ts
copy to nodejs/src/consumer/ConsumeTask.ts
index 8a5abbeb..48931264 100644
--- a/nodejs/src/client/Logger.ts
+++ b/nodejs/src/consumer/ConsumeTask.ts
@@ -15,21 +15,26 @@
* limitations under the License.
*/
-import path from 'node:path';
-import { homedir } from 'node:os';
-import { EggLogger } from 'egg-logger';
+import { MessageView } from '../message';
+import { ConsumeResult } from './ConsumeResult';
+import { MessageListener } from './MessageListener';
-export interface ILogger {
- info(...args: any[]): void;
- warn(...args: any[]): void;
- error(...args: any[]): void;
- close?(...args: any[]): void;
-}
+export class ConsumeTask {
+ readonly #messageListener: MessageListener;
+ readonly #messageView: MessageView;
+
+ constructor(_clientId: string, messageListener: MessageListener,
messageView: MessageView) {
+ this.#messageListener = messageListener;
+ this.#messageView = messageView;
+ }
-export function getDefaultLogger() {
- const file = path.join(homedir(),
'logs/rocketmq/rocketmq_client_nodejs.log');
- return new EggLogger({
- file,
- level: 'INFO',
- });
+ async call(): Promise<ConsumeResult> {
+ try {
+ const result = await this.#messageListener.consume(this.#messageView);
+ return result;
+ } catch (e) {
+ // Message listener raised an exception while consuming messages
+ return ConsumeResult.FAILURE;
+ }
+ }
}
diff --git a/nodejs/src/consumer/Consumer.ts b/nodejs/src/consumer/Consumer.ts
index cdc78bf3..19c7f66c 100644
--- a/nodejs/src/consumer/Consumer.ts
+++ b/nodejs/src/consumer/Consumer.ts
@@ -108,4 +108,22 @@ export abstract class Consumer extends BaseClient {
StatusChecker.check(response.status);
return response.receiptHandle;
}
+
+ /**
+ * Expose public methods for ProcessQueue to access RPC operations
+ */
+ async ackMessageViaRpc(endpoints: any, request: AckMessageRequest, timeout:
number) {
+ const res = await this.rpcClientManager.ackMessage(endpoints, request,
timeout);
+ return res;
+ }
+
+ async changeInvisibleDurationViaRpc(endpoints: any, request:
ChangeInvisibleDurationRequest, timeout: number) {
+ const res = await this.rpcClientManager.changeInvisibleDuration(endpoints,
request, timeout);
+ return res;
+ }
+
+ async forwardMessageToDeadLetterQueueViaRpc(endpoints: any, request: any,
timeout: number) {
+ const res = await
this.rpcClientManager.forwardMessageToDeadLetterQueue(endpoints, request,
timeout);
+ return res;
+ }
}
diff --git a/nodejs/src/consumer/FifoConsumeService.ts
b/nodejs/src/consumer/FifoConsumeService.ts
new file mode 100644
index 00000000..58d07e1e
--- /dev/null
+++ b/nodejs/src/consumer/FifoConsumeService.ts
@@ -0,0 +1,50 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import { MessageView } from '../message';
+import { ConsumeService } from './ConsumeService';
+import { MessageListener } from './MessageListener';
+import type { ProcessQueue } from './ProcessQueue';
+
+export class FifoConsumeService extends ConsumeService {
+ constructor(clientId: string, messageListener: MessageListener) {
+ super(clientId, messageListener);
+ }
+
+ consume(pq: ProcessQueue, messageViews: MessageView[]): void {
+ this.#consumeIteratively(pq, messageViews, 0);
+ }
+
+ #consumeIteratively(pq: ProcessQueue, messageViews: MessageView[], index:
number): void {
+ if (index >= messageViews.length) {
+ return;
+ }
+
+ const messageView = messageViews[index];
+
+ if (messageView.corrupted) {
+ pq.discardFifoMessage(messageView);
+ this.#consumeIteratively(pq, messageViews, index + 1);
+ return;
+ }
+
+ this.consumeMessage(messageView)
+ .then(result => pq.eraseFifoMessage(messageView, result))
+ .then(() => this.#consumeIteratively(pq, messageViews, index + 1))
+ .catch(() => this.#consumeIteratively(pq, messageViews, index + 1));
+ }
+}
diff --git a/nodejs/src/consumer/index.ts
b/nodejs/src/consumer/MessageListener.ts
similarity index 79%
copy from nodejs/src/consumer/index.ts
copy to nodejs/src/consumer/MessageListener.ts
index 73dbd62f..a696226f 100644
--- a/nodejs/src/consumer/index.ts
+++ b/nodejs/src/consumer/MessageListener.ts
@@ -15,8 +15,9 @@
* limitations under the License.
*/
-export * from './Consumer';
-export * from './FilterExpression';
-export * from './SimpleConsumer';
-export * from './SimpleSubscriptionSettings';
-export * from './SubscriptionLoadBalancer';
+import { MessageView } from '../message';
+import { ConsumeResult } from './ConsumeResult';
+
+export interface MessageListener {
+ consume(messageView: MessageView): ConsumeResult | Promise<ConsumeResult>;
+}
diff --git a/nodejs/src/consumer/ProcessQueue.ts
b/nodejs/src/consumer/ProcessQueue.ts
new file mode 100644
index 00000000..65eaca7b
--- /dev/null
+++ b/nodejs/src/consumer/ProcessQueue.ts
@@ -0,0 +1,326 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import { randomUUID } from 'node:crypto';
+import { Code } from '../../proto/apache/rocketmq/v2/definition_pb';
+import { MessageView } from '../message';
+import { MessageQueue } from '../route';
+import { TooManyRequestsException } from '../exception';
+import { ConsumeResult } from './ConsumeResult';
+import { FilterExpression } from './FilterExpression';
+import type { PushConsumer } from './PushConsumer';
+
+const ACK_MESSAGE_FAILURE_BACKOFF_DELAY = 1000;
+const CHANGE_INVISIBLE_DURATION_FAILURE_BACKOFF_DELAY = 1000;
+const FORWARD_MESSAGE_TO_DLQ_FAILURE_BACKOFF_DELAY = 1000;
+
+const RECEIVING_FLOW_CONTROL_BACKOFF_DELAY = 20;
+const RECEIVING_FAILURE_BACKOFF_DELAY = 1000;
+const RECEIVING_BACKOFF_DELAY_WHEN_CACHE_IS_FULL = 1000;
+
+export class ProcessQueue {
+ readonly #consumer: PushConsumer;
+ #dropped = false;
+ readonly #mq: MessageQueue;
+ readonly #filterExpression: FilterExpression;
+ readonly #cachedMessages: MessageView[] = [];
+ #cachedMessagesBytes = 0;
+ #activityTime = Date.now();
+ #cacheFullTime = 0;
+ #aborted = false;
+
+ constructor(consumer: PushConsumer, mq: MessageQueue, filterExpression:
FilterExpression) {
+ this.#consumer = consumer;
+ this.#mq = mq;
+ this.#filterExpression = filterExpression;
+ }
+
+ getMessageQueue(): MessageQueue {
+ return this.#mq;
+ }
+
+ drop(): void {
+ this.#dropped = true;
+ }
+
+ expired(): boolean {
+ const longPollingTimeout =
this.#consumer.getPushConsumerSettings().getLongPollingTimeout();
+ const requestTimeout = this.#consumer.requestTimeoutValue;
+ const maxIdleDuration = (longPollingTimeout + requestTimeout) * 3;
+ const idleDuration = Date.now() - this.#activityTime;
+ if (idleDuration < maxIdleDuration) {
+ return false;
+ }
+ const afterCacheFullDuration = Date.now() - this.#cacheFullTime;
+ if (afterCacheFullDuration < maxIdleDuration) {
+ return false;
+ }
+ return true;
+ }
+
+ cacheMessages(messageList: MessageView[]): void {
+ for (const messageView of messageList) {
+ this.#cachedMessages.push(messageView);
+ this.#cachedMessagesBytes += messageView.body.length;
+ }
+ }
+
+ #getReceptionBatchSize(): number {
+ const bufferSize = Math.max(
+ this.#consumer.cacheMessageCountThresholdPerQueue() -
this.cachedMessagesCount(),
+ 1,
+ );
+ return Math.min(bufferSize,
this.#consumer.getPushConsumerSettings().getReceiveBatchSize());
+ }
+
+ fetchMessageImmediately(): void {
+ this.#receiveMessageImmediately();
+ }
+
+ onReceiveMessageException(t: Error, attemptId?: string): void {
+ const delay = t instanceof TooManyRequestsException
+ ? RECEIVING_FLOW_CONTROL_BACKOFF_DELAY
+ : RECEIVING_FAILURE_BACKOFF_DELAY;
+ this.#receiveMessageLater(delay, attemptId);
+ }
+
+ #receiveMessageLater(delay: number, attemptId?: string): void {
+ if (this.#aborted) return;
+ setTimeout(() => {
+ if (this.#aborted) return;
+ this.#receiveMessage(attemptId);
+ }, delay);
+ }
+
+ #generateAttemptId(): string {
+ return randomUUID();
+ }
+
+ receiveMessage(attemptId?: string): void {
+ this.#receiveMessage(attemptId);
+ }
+
+ #receiveMessage(attemptId?: string): void {
+ if (this.#dropped) {
+ return;
+ }
+ if (this.#isCacheFull()) {
+ this.#receiveMessageLater(RECEIVING_BACKOFF_DELAY_WHEN_CACHE_IS_FULL,
attemptId);
+ return;
+ }
+ this.#receiveMessageImmediately(attemptId);
+ }
+
+ #receiveMessageImmediately(attemptId?: string): void {
+ if (this.#aborted) return;
+ attemptId = attemptId ?? this.#generateAttemptId();
+ try {
+ const batchSize = this.#getReceptionBatchSize();
+ const longPollingTimeout =
this.#consumer.getPushConsumerSettings().getLongPollingTimeout();
+ const request = this.#consumer.wrapPushReceiveMessageRequest(
+ batchSize, this.#mq, this.#filterExpression, longPollingTimeout,
attemptId,
+ );
+ this.#activityTime = Date.now();
+
+ this.#consumer.receiveMessage(request, this.#mq, longPollingTimeout)
+ .then(messages => {
+ this.#onReceiveMessageResult(messages);
+ })
+ .catch(err => {
+ this.onReceiveMessageException(err, attemptId);
+ });
+ } catch (err) {
+ this.onReceiveMessageException(err as Error, attemptId);
+ }
+ }
+
+ #onReceiveMessageResult(messages: MessageView[]): void {
+ if (messages.length > 0) {
+ this.cacheMessages(messages);
+ this.#consumer.getConsumeService().consume(this, messages);
+ }
+ this.#receiveMessage();
+ }
+
+ #isCacheFull(): boolean {
+ const cacheMessageCountThreshold =
this.#consumer.cacheMessageCountThresholdPerQueue();
+ const actualCount = this.cachedMessagesCount();
+ if (cacheMessageCountThreshold <= actualCount) {
+ this.#cacheFullTime = Date.now();
+ return true;
+ }
+
+ const cacheMessageBytesThreshold =
this.#consumer.cacheMessageBytesThresholdPerQueue();
+ if (cacheMessageBytesThreshold <= this.#cachedMessagesBytes) {
+ this.#cacheFullTime = Date.now();
+ return true;
+ }
+
+ return false;
+ }
+
+ eraseMessage(messageView: MessageView, consumeResult: ConsumeResult): void {
+ const task = consumeResult === ConsumeResult.SUCCESS
+ ? this.#ackMessage(messageView)
+ : this.#nackMessage(messageView);
+ task.finally(() => {
+ this.#evictCache(messageView);
+ });
+ }
+
+ async #ackMessage(messageView: MessageView, attempt = 1): Promise<void> {
+ try {
+ const endpoints = messageView.endpoints;
+ const response = await this.#consumer.ackMessageViaRpc(
+ endpoints,
+ this.#consumer.wrapAckMessageRequest(messageView),
+ this.#consumer.requestTimeoutValue,
+ );
+ const status = response.getStatus()?.toObject();
+ if (status?.code === Code.INVALID_RECEIPT_HANDLE) {
+ return; // Forgive to retry
+ }
+ if (status?.code !== Code.OK) {
+ await this.#ackMessageLater(messageView, attempt + 1);
+ }
+ } catch {
+ await this.#ackMessageLater(messageView, attempt + 1);
+ }
+ }
+
+ async #ackMessageLater(messageView: MessageView, attempt: number):
Promise<void> {
+ if (this.#aborted) return;
+ await new Promise<void>(resolve => setTimeout(resolve,
ACK_MESSAGE_FAILURE_BACKOFF_DELAY));
+ if (this.#aborted) return;
+ await this.#ackMessage(messageView, attempt);
+ }
+
+ async #nackMessage(messageView: MessageView): Promise<void> {
+ const retryPolicy = this.#consumer.getRetryPolicy();
+ const delay = retryPolicy?.getNextAttemptDelay(messageView.deliveryAttempt
?? 1) ?? 0;
+ await this.#changeInvisibleDuration(messageView, delay);
+ }
+
+ async #changeInvisibleDuration(messageView: MessageView, duration: number,
attempt = 1): Promise<void> {
+ try {
+ const endpoints = messageView.endpoints;
+ const response = await this.#consumer.changeInvisibleDurationViaRpc(
+ endpoints,
+ this.#consumer.wrapChangeInvisibleDurationRequest(messageView,
duration),
+ this.#consumer.requestTimeoutValue,
+ );
+ const status = response.getStatus()?.toObject();
+ if (status?.code === Code.INVALID_RECEIPT_HANDLE) {
+ return; // Forgive to retry
+ }
+ if (status?.code !== Code.OK) {
+ await this.#changeInvisibleDurationLater(messageView, duration,
attempt + 1);
+ }
+ } catch {
+ await this.#changeInvisibleDurationLater(messageView, duration, attempt
+ 1);
+ }
+ }
+
+ async #changeInvisibleDurationLater(messageView: MessageView, duration:
number, attempt: number): Promise<void> {
+ if (this.#aborted) return;
+ await new Promise<void>(resolve => setTimeout(resolve,
CHANGE_INVISIBLE_DURATION_FAILURE_BACKOFF_DELAY));
+ if (this.#aborted) return;
+ await this.#changeInvisibleDuration(messageView, duration, attempt);
+ }
+
+ async eraseFifoMessage(messageView: MessageView, consumeResult:
ConsumeResult): Promise<void> {
+ const retryPolicy = this.#consumer.getRetryPolicy();
+ const maxAttempts = retryPolicy?.getMaxAttempts() ?? 1;
+ let attempt = messageView.deliveryAttempt ?? 1;
+
+ if (consumeResult === ConsumeResult.FAILURE && attempt < maxAttempts) {
+ const nextAttemptDelay = retryPolicy?.getNextAttemptDelay(attempt) ?? 0;
+ // Redeliver the fifo message
+ const result = await
this.#consumer.getConsumeService().consumeMessage(messageView,
nextAttemptDelay);
+ await this.eraseFifoMessage(messageView, result);
+ } else {
+ const task = consumeResult === ConsumeResult.SUCCESS
+ ? this.#ackMessage(messageView)
+ : this.#forwardToDeadLetterQueue(messageView);
+ await task;
+ this.#evictCache(messageView);
+ }
+ }
+
+ async #forwardToDeadLetterQueue(messageView: MessageView, attempt = 1):
Promise<void> {
+ try {
+ const endpoints = messageView.endpoints;
+ const response = await
this.#consumer.forwardMessageToDeadLetterQueueViaRpc(
+ endpoints,
+ this.#consumer.wrapForwardMessageToDeadLetterQueueRequest(messageView),
+ this.#consumer.requestTimeoutValue,
+ );
+ const status = response.getStatus();
+ if (!status) {
+ throw new Error('Missing status in forward to dead letter queue
response');
+ }
+ const statusObj = status.toObject();
+ if (statusObj.code !== Code.OK) {
+ await this.#forwardToDeadLetterQueueLater(messageView, attempt + 1);
+ }
+ } catch (err) {
+ if ((err as Error).message === 'Missing status in forward to dead letter
queue response') {
+ throw err; // Re-throw critical error
+ }
+ await this.#forwardToDeadLetterQueueLater(messageView, attempt + 1);
+ }
+ }
+
+ async #forwardToDeadLetterQueueLater(messageView: MessageView, attempt:
number): Promise<void> {
+ if (this.#aborted) return;
+ await new Promise<void>(resolve => setTimeout(resolve,
FORWARD_MESSAGE_TO_DLQ_FAILURE_BACKOFF_DELAY));
+ if (this.#aborted) return;
+ await this.#forwardToDeadLetterQueue(messageView, attempt);
+ }
+
+ discardMessage(messageView: MessageView): void {
+ this.#nackMessage(messageView).finally(() => {
+ this.#evictCache(messageView);
+ });
+ }
+
+ discardFifoMessage(messageView: MessageView): void {
+ this.#forwardToDeadLetterQueue(messageView).finally(() => {
+ this.#evictCache(messageView);
+ });
+ }
+
+ #evictCache(messageView: MessageView): void {
+ const index = this.#cachedMessages.indexOf(messageView);
+ if (index !== -1) {
+ this.#cachedMessages.splice(index, 1);
+ this.#cachedMessagesBytes -= messageView.body.length;
+ }
+ }
+
+ cachedMessagesCount(): number {
+ return this.#cachedMessages.length;
+ }
+
+ cachedMessageBytes(): number {
+ return this.#cachedMessagesBytes;
+ }
+
+ abort(): void {
+ this.#aborted = true;
+ }
+}
diff --git a/nodejs/src/consumer/PushConsumer.ts
b/nodejs/src/consumer/PushConsumer.ts
new file mode 100644
index 00000000..b6841b8c
--- /dev/null
+++ b/nodejs/src/consumer/PushConsumer.ts
@@ -0,0 +1,470 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import { ClientType } from '../../proto/apache/rocketmq/v2/definition_pb';
+import {
+ AckMessageRequest,
+ ChangeInvisibleDurationRequest,
+ ForwardMessageToDeadLetterQueueRequest,
+ HeartbeatRequest,
+ NotifyClientTerminationRequest,
+ QueryAssignmentRequest,
+ ReceiveMessageRequest,
+} from '../../proto/apache/rocketmq/v2/service_pb';
+import { MessageView } from '../message';
+import { MessageQueue, TopicRouteData } from '../route';
+import { StatusChecker } from '../exception';
+import { RetryPolicy } from '../retry';
+import { createDuration, createResource } from '../util';
+import { Consumer, ConsumerOptions } from './Consumer';
+import { FilterExpression } from './FilterExpression';
+import { PushSubscriptionSettings } from './PushSubscriptionSettings';
+import { ConsumeService } from './ConsumeService';
+import { StandardConsumeService } from './StandardConsumeService';
+import { FifoConsumeService } from './FifoConsumeService';
+import { ProcessQueue } from './ProcessQueue';
+import { Assignment } from './Assignment';
+import { Assignments } from './Assignments';
+import { MessageListener } from './MessageListener';
+
+const ASSIGNMENT_SCAN_SCHEDULE_DELAY = 1000;
+const ASSIGNMENT_SCAN_SCHEDULE_PERIOD = 5000;
+
+export interface PushConsumerOptions extends ConsumerOptions {
+ subscriptions: Map<string/* topic */, FilterExpression | string>;
+ messageListener: MessageListener;
+ maxCacheMessageCount?: number;
+ maxCacheMessageSizeInBytes?: number;
+ longPollingTimeout?: number;
+}
+
+class ConsumeMetrics {
+ receptionTimes = 0;
+ receivedMessagesQuantity = 0;
+ consumptionOkQuantity = 0;
+ consumptionErrorQuantity = 0;
+}
+
+export class PushConsumer extends Consumer {
+ readonly #pushSubscriptionSettings: PushSubscriptionSettings;
+ readonly #subscriptionExpressions = new Map<string, FilterExpression>();
+ readonly #cacheAssignments = new Map<string, Assignments>();
+ readonly #messageListener: MessageListener;
+ readonly #maxCacheMessageCount: number;
+ readonly #maxCacheMessageSizeInBytes: number;
+ readonly #processQueueTable = new Map<string /* mq key */, { mq:
MessageQueue; pq: ProcessQueue }>();
+ readonly #metrics = new ConsumeMetrics();
+ #consumeService!: ConsumeService;
+ #scanAssignmentTimer?: NodeJS.Timeout;
+
+ constructor(options: PushConsumerOptions) {
+ options.topics = Array.from(options.subscriptions.keys());
+ super(options);
+
+ for (const [ topic, filter ] of options.subscriptions.entries()) {
+ if (typeof filter === 'string') {
+ this.#subscriptionExpressions.set(topic, new FilterExpression(filter));
+ } else {
+ this.#subscriptionExpressions.set(topic, filter);
+ }
+ }
+
+ this.#messageListener = options.messageListener;
+ this.#maxCacheMessageCount = options.maxCacheMessageCount ?? 1024;
+ this.#maxCacheMessageSizeInBytes = options.maxCacheMessageSizeInBytes ??
64 * 1024 * 1024;
+
+ this.#pushSubscriptionSettings = new PushSubscriptionSettings(
+ options.namespace, this.clientId, this.endpoints,
+ this.consumerGroup, this.requestTimeout, this.#subscriptionExpressions,
+ );
+ }
+
+ async startup() {
+ this.logger.info('Begin to start the rocketmq push consumer, clientId=%s,
consumerGroup=%s',
+ this.clientId, this.consumerGroup);
+ await super.startup();
+ this.logger.info('Super startup completed, clientId=%s', this.clientId);
+ try {
+ this.#consumeService = this.#createConsumeService();
+ // Start scanning assignments periodically
+ this.logger.info('Starting assignment scanning, clientId=%s',
this.clientId);
+ setTimeout(() => this.#scanAssignments(),
ASSIGNMENT_SCAN_SCHEDULE_DELAY);
+ this.#scanAssignmentTimer = setInterval(() => this.#scanAssignments(),
ASSIGNMENT_SCAN_SCHEDULE_PERIOD);
+ this.logger.info('Push consumer started successfully, clientId=%s',
this.clientId);
+ } catch (err) {
+ this.logger.error('Failed to start push consumer, cleaning up resources,
clientId=%s, error=%s',
+ this.clientId, err);
+ // Clean up timers if initialization fails
+ if (this.#scanAssignmentTimer) {
+ clearInterval(this.#scanAssignmentTimer);
+ this.#scanAssignmentTimer = undefined;
+ }
+ throw err;
+ }
+ }
+
+ async shutdown() {
+ this.logger.info('Begin to shutdown the rocketmq push consumer,
clientId=%s', this.clientId);
+ // Stop scanning assignments
+ if (this.#scanAssignmentTimer) {
+ clearInterval(this.#scanAssignmentTimer);
+ this.#scanAssignmentTimer = undefined;
+ }
+ // Drop all process queues
+ this.logger.info('Dropping all process queues, clientId=%s, queueCount=%d',
+ this.clientId, this.#processQueueTable.size);
+ for (const { pq } of this.#processQueueTable.values()) {
+ pq.drop();
+ pq.abort();
+ }
+ this.#processQueueTable.clear();
+ // Shutdown consume service
+ if (this.#consumeService) {
+ this.logger.info('Shutting down consume service, clientId=%s',
this.clientId);
+ this.#consumeService.abort();
+ }
+ await super.shutdown();
+ this.logger.info('Push consumer has been shutdown successfully,
clientId=%s', this.clientId);
+ }
+
+ protected getSettings() {
+ return this.#pushSubscriptionSettings;
+ }
+
+ protected wrapHeartbeatRequest() {
+ return new HeartbeatRequest()
+ .setClientType(ClientType.PUSH_CONSUMER)
+ .setGroup(createResource(this.consumerGroup));
+ }
+
+ protected wrapNotifyClientTerminationRequest() {
+ return new NotifyClientTerminationRequest()
+ .setGroup(createResource(this.consumerGroup));
+ }
+
+ // eslint-disable-next-line @typescript-eslint/no-unused-vars
+ protected onTopicRouteDataUpdate(_topic: string, _topicRouteData:
TopicRouteData) {
+ // No-op for push consumer; assignments are queried separately
+ }
+
+ #createConsumeService(): ConsumeService {
+ if (this.#pushSubscriptionSettings.isFifo()) {
+ return new FifoConsumeService(this.clientId, this.#messageListener);
+ }
+ return new StandardConsumeService(this.clientId, this.#messageListener);
+ }
+
+ async subscribe(topic: string, filterExpression: FilterExpression) {
+ await this.getRouteData(topic);
+ this.#subscriptionExpressions.set(topic, filterExpression);
+ }
+
+ unsubscribe(topic: string) {
+ this.#subscriptionExpressions.delete(topic);
+ }
+
+ // --- Public methods for ProcessQueue access ---
+
+ get requestTimeoutValue(): number {
+ return this.requestTimeout;
+ }
+
+ getPushConsumerSettings(): PushSubscriptionSettings {
+ return this.#pushSubscriptionSettings;
+ }
+
+ getConsumerGroup(): string {
+ return this.consumerGroup;
+ }
+
+ getConsumeService(): ConsumeService {
+ return this.#consumeService;
+ }
+
+ getRetryPolicy(): RetryPolicy | undefined {
+ return this.#pushSubscriptionSettings.getRetryPolicy();
+ }
+
+ getClientId(): string {
+ return this.clientId;
+ }
+
+ // --- Metrics access ---
+
+ getReceptionTimes(): number {
+ return this.#metrics.receptionTimes;
+ }
+
+ getReceivedMessagesQuantity(): number {
+ return this.#metrics.receivedMessagesQuantity;
+ }
+
+ getConsumptionOkQuantity(): number {
+ return this.#metrics.consumptionOkQuantity;
+ }
+
+ getConsumptionErrorQuantity(): number {
+ return this.#metrics.consumptionErrorQuantity;
+ }
+
+ incrementReceptionTimes() {
+ this.#metrics.receptionTimes++;
+ }
+
+ incrementReceivedMessagesQuantity(count: number) {
+ this.#metrics.receivedMessagesQuantity += count;
+ }
+
+ incrementConsumptionOkQuantity() {
+ this.#metrics.consumptionOkQuantity++;
+ }
+
+ incrementConsumptionErrorQuantity() {
+ this.#metrics.consumptionErrorQuantity++;
+ }
+
+ // --- Statistics ---
+
+ doStats() {
+ const receptionTimes = this.#metrics.receptionTimes;
+ this.#metrics.receptionTimes = 0;
+ const receivedMessagesQuantity = this.#metrics.receivedMessagesQuantity;
+ this.#metrics.receivedMessagesQuantity = 0;
+ const consumptionOkQuantity = this.#metrics.consumptionOkQuantity;
+ this.#metrics.consumptionOkQuantity = 0;
+ const consumptionErrorQuantity = this.#metrics.consumptionErrorQuantity;
+ this.#metrics.consumptionErrorQuantity = 0;
+
+ this.logger.info('clientId=%s, consumerGroup=%s, receptionTimes=%d,
receivedMessagesQuantity=%d, '
+ + 'consumptionOkQuantity=%d, consumptionErrorQuantity=%d',
+ this.clientId, this.consumerGroup, receptionTimes,
receivedMessagesQuantity,
+ consumptionOkQuantity, consumptionErrorQuantity);
+ }
+
+ wrapPushReceiveMessageRequest(
+ batchSize: number,
+ mq: MessageQueue,
+ filterExpression: FilterExpression,
+ longPollingTimeout: number,
+ attemptId?: string,
+ ) {
+ const request = new ReceiveMessageRequest()
+ .setGroup(createResource(this.consumerGroup))
+ .setMessageQueue(mq.toProtobuf())
+ .setFilterExpression(filterExpression.toProtobuf())
+ .setLongPollingTimeout(createDuration(longPollingTimeout))
+ .setBatchSize(batchSize)
+ .setAutoRenew(true);
+ if (attemptId) {
+ request.setAttemptId(attemptId);
+ }
+ return request;
+ }
+
+ async receiveMessage(request: ReceiveMessageRequest, mq: MessageQueue,
awaitDuration: number) {
+ return super.receiveMessage(request, mq, awaitDuration);
+ }
+
+ wrapAckMessageRequest(messageView: MessageView) {
+ const request = new AckMessageRequest()
+ .setGroup(createResource(this.consumerGroup))
+ .setTopic(createResource(messageView.topic));
+ request.addEntries()
+ .setMessageId(messageView.messageId)
+ .setReceiptHandle(messageView.receiptHandle);
+ return request;
+ }
+
+ wrapChangeInvisibleDurationRequest(messageView: MessageView,
invisibleDuration: number) {
+ return new ChangeInvisibleDurationRequest()
+ .setGroup(createResource(this.consumerGroup))
+ .setTopic(createResource(messageView.topic))
+ .setReceiptHandle(messageView.receiptHandle)
+ .setInvisibleDuration(createDuration(invisibleDuration))
+ .setMessageId(messageView.messageId);
+ }
+
+ wrapForwardMessageToDeadLetterQueueRequest(messageView: MessageView) {
+ const retryPolicy = this.getRetryPolicy();
+ return new ForwardMessageToDeadLetterQueueRequest()
+ .setGroup(createResource(this.consumerGroup))
+ .setTopic(createResource(messageView.topic))
+ .setReceiptHandle(messageView.receiptHandle)
+ .setMessageId(messageView.messageId)
+ .setDeliveryAttempt(messageView.deliveryAttempt ?? 0)
+ .setMaxDeliveryAttempts(retryPolicy?.getMaxAttempts() ?? 1);
+ }
+
+ // --- Internal: queue size and cache threshold ---
+
+ getQueueSize(): number {
+ return this.#processQueueTable.size;
+ }
+
+ cacheMessageBytesThresholdPerQueue(): number {
+ const size = this.getQueueSize();
+ if (size <= 0) return 0;
+ return Math.max(1, Math.floor(this.#maxCacheMessageSizeInBytes / size));
+ }
+
+ cacheMessageCountThresholdPerQueue(): number {
+ const size = this.getQueueSize();
+ if (size <= 0) return 0;
+ return Math.max(1, Math.floor(this.#maxCacheMessageCount / size));
+ }
+
+ // --- Internal: assignment scanning ---
+
+ #scanAssignments() {
+ try {
+ this.logger.debug?.('Scanning assignments, clientId=%s,
subscriptionCount=%d',
+ this.clientId, this.#subscriptionExpressions.size);
+ for (const [ topic, filterExpression ] of this.#subscriptionExpressions)
{
+ const existed = this.#cacheAssignments.get(topic);
+ this.#queryAssignment(topic)
+ .then(latest => {
+ this.logger.debug?.('Query assignment result, topic=%s,
clientId=%s, assignmentCount=%d',
+ topic, this.clientId, latest.getAssignmentList().length);
+ if (latest.getAssignmentList().length === 0) {
+ if (!existed || existed.getAssignmentList().length === 0) {
+ this.logger.info('Acquired empty assignments from remote,
would scan later, topic=%s, '
+ + 'clientId=%s', topic, this.clientId);
+ return;
+ }
+ this.logger.warn('Attention!!! acquired empty assignments from
remote, but existed assignments'
+ + ' is not empty, topic=%s, clientId=%s', topic,
this.clientId);
+ }
+
+ if (!latest.equals(existed)) {
+ this.logger.info('Assignments of topic=%s has changed, %j => %j,
clientId=%s', topic, existed,
+ latest, this.clientId);
+ this.#syncProcessQueue(topic, latest, filterExpression);
+ this.#cacheAssignments.set(topic, latest);
+ return;
+ }
+ this.logger.debug?.('Assignments of topic=%s remains the same,
assignments=%j, clientId=%s', topic,
+ existed, this.clientId);
+ // Process queue may be dropped, need to be synchronized anyway.
+ this.#syncProcessQueue(topic, latest, filterExpression);
+ })
+ .catch(err => {
+ this.logger.error('Exception raised while scanning the
assignments, topic=%s, clientId=%s, error=%s',
+ topic, this.clientId, err);
+ });
+ }
+ } catch (err) {
+ this.logger.error('Exception raised while scanning the assignments for
all topics, clientId=%s, error=%s',
+ this.clientId, err);
+ }
+ }
+
+ async #queryAssignment(topic: string): Promise<Assignments> {
+ const topicRouteData = await this.getRouteData(topic);
+ const endpointsList = topicRouteData.getTotalEndpoints();
+ if (endpointsList.length === 0) {
+ throw new Error(`No endpoints available for topic=${topic}`);
+ }
+ const endpoints = endpointsList[0];
+
+ const request = new QueryAssignmentRequest()
+ .setTopic(createResource(topic))
+ .setGroup(createResource(this.consumerGroup))
+ .setEndpoints(this.endpoints.toProtobuf());
+
+ const response = await this.rpcClientManager.queryAssignment(
+ endpoints, request, this.requestTimeout,
+ );
+ const status = response.getStatus();
+ if (!status) {
+ throw new Error('Missing status in query assignment response');
+ }
+ StatusChecker.check(status.toObject());
+
+ const assignmentList = response.getAssignmentsList().map(assignment => {
+ const mqPb = assignment.getMessageQueue()!;
+ const mq = new MessageQueue(mqPb);
+ return new Assignment(mq);
+ });
+
+ return new Assignments(assignmentList);
+ }
+
+ #syncProcessQueue(topic: string, assignments: Assignments, filterExpression:
FilterExpression) {
+ const latestMqKeys = new Set<string>();
+ for (const assignment of assignments.getAssignmentList()) {
+ latestMqKeys.add(this.#mqKey(assignment.messageQueue));
+ }
+
+ // Find active message queues
+ const activeMqKeys = new Set<string>();
+ for (const [ mqKey, { mq, pq }] of this.#processQueueTable) {
+ if (mq.topic.name !== topic) {
+ continue;
+ }
+ if (!latestMqKeys.has(mqKey)) {
+ this.logger.info('Drop message queue according to the latest
assignmentList, mq=%s@%s@%s, clientId=%s',
+ mq.topic.name, mq.broker.name, mq.queueId, this.clientId);
+ this.#dropProcessQueue(mqKey);
+ continue;
+ }
+ if (pq.expired()) {
+ this.logger.warn('Drop message queue because it is expired,
mq=%s@%s@%s, clientId=%s',
+ mq.topic.name, mq.broker.name, mq.queueId, this.clientId);
+ this.#dropProcessQueue(mqKey);
+ continue;
+ }
+ activeMqKeys.add(mqKey);
+ }
+
+ // Create new process queues for new assignments
+ for (const assignment of assignments.getAssignmentList()) {
+ const mqKey = this.#mqKey(assignment.messageQueue);
+ if (activeMqKeys.has(mqKey)) {
+ continue;
+ }
+ const processQueue = this.#createProcessQueue(assignment.messageQueue,
filterExpression);
+ if (processQueue) {
+ this.logger.info('Start to fetch message from remote, mq=%s@%s@%s,
clientId=%s',
+ assignment.messageQueue.topic.name,
assignment.messageQueue.broker.name,
+ assignment.messageQueue.queueId, this.clientId);
+ processQueue.fetchMessageImmediately();
+ }
+ }
+ }
+
+ #mqKey(mq: MessageQueue): string {
+ return `${mq.topic.name}@${mq.broker.name}@${mq.queueId}`;
+ }
+
+ #dropProcessQueue(mqKey: string) {
+ const entry = this.#processQueueTable.get(mqKey);
+ if (entry) {
+ entry.pq.drop();
+ this.#processQueueTable.delete(mqKey);
+ }
+ }
+
+ #createProcessQueue(mq: MessageQueue, filterExpression: FilterExpression):
ProcessQueue | null {
+ const mqKey = this.#mqKey(mq);
+ if (this.#processQueueTable.has(mqKey)) {
+ return null;
+ }
+ const processQueue = new ProcessQueue(this, mq, filterExpression);
+ this.#processQueueTable.set(mqKey, { mq, pq: processQueue });
+ return processQueue;
+ }
+}
diff --git a/nodejs/src/consumer/PushSubscriptionSettings.ts
b/nodejs/src/consumer/PushSubscriptionSettings.ts
new file mode 100644
index 00000000..fe97f6b1
--- /dev/null
+++ b/nodejs/src/consumer/PushSubscriptionSettings.ts
@@ -0,0 +1,123 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import {
+ Settings as SettingsPB,
+ ClientType,
+ Subscription,
+ RetryPolicy as RetryPolicyPB,
+} from '../../proto/apache/rocketmq/v2/definition_pb';
+import { Endpoints } from '../route';
+import { Settings, UserAgent } from '../client';
+import { ExponentialBackoffRetryPolicy, RetryPolicy } from '../retry';
+import { createDuration, createResource } from '../util';
+import { FilterExpression } from './FilterExpression';
+
+export class PushSubscriptionSettings extends Settings {
+ readonly #group: string;
+ readonly #subscriptionExpressions: Map<string, FilterExpression>;
+ #fifo = false;
+ #receiveBatchSize = 32;
+ #longPollingTimeout = 30000; // ms
+
+ constructor(
+ namespace: string,
+ clientId: string,
+ accessPoint: Endpoints,
+ consumerGroup: string,
+ requestTimeout: number,
+ subscriptionExpressions: Map<string, FilterExpression>,
+ longPollingTimeout?: number,
+ ) {
+ super(namespace, clientId, ClientType.PUSH_CONSUMER, accessPoint,
requestTimeout);
+ this.#group = consumerGroup;
+ this.#subscriptionExpressions = subscriptionExpressions;
+ if (longPollingTimeout !== undefined) {
+ this.#longPollingTimeout = longPollingTimeout;
+ }
+ }
+
+ isFifo(): boolean {
+ return this.#fifo;
+ }
+
+ getReceiveBatchSize(): number {
+ return this.#receiveBatchSize;
+ }
+
+ getLongPollingTimeout(): number {
+ return this.#longPollingTimeout;
+ }
+
+ getRetryPolicy(): RetryPolicy | undefined {
+ return this.retryPolicy;
+ }
+
+ toProtobuf(): SettingsPB {
+ const subscription = new Subscription()
+ .setGroup(createResource(this.#group));
+
+ for (const [ topic, filterExpression ] of
this.#subscriptionExpressions.entries()) {
+ subscription.addSubscriptions()
+ .setTopic(createResource(topic))
+ .setExpression(filterExpression.toProtobuf());
+ }
+
+ return new SettingsPB()
+ .setClientType(this.clientType)
+ .setAccessPoint(this.accessPoint.toProtobuf())
+ .setRequestTimeout(createDuration(this.requestTimeout))
+ .setSubscription(subscription)
+ .setUserAgent(UserAgent.INSTANCE.toProtobuf());
+ }
+
+ sync(settings: SettingsPB): void {
+ if (settings.getPubSubCase() !== SettingsPB.PubSubCase.SUBSCRIPTION) {
+ return;
+ }
+ const subscription = settings.getSubscription();
+ if (subscription) {
+ this.#fifo = subscription.getFifo() ?? false;
+ this.#receiveBatchSize = subscription.getReceiveBatchSize() ?? 32;
+ const longPollingTimeout = subscription.getLongPollingTimeout();
+ if (longPollingTimeout) {
+ this.#longPollingTimeout = longPollingTimeout.getSeconds() * 1000 +
+ Math.floor(longPollingTimeout.getNanos() / 1000000);
+ }
+ }
+ const backoffPolicy = settings.getBackoffPolicy();
+ if (backoffPolicy) {
+ switch (backoffPolicy.getStrategyCase()) {
+ case RetryPolicyPB.StrategyCase.EXPONENTIAL_BACKOFF: {
+ const exponential =
backoffPolicy.getExponentialBackoff()!.toObject();
+ this.retryPolicy = new ExponentialBackoffRetryPolicy(
+ backoffPolicy.getMaxAttempts(),
+ exponential.initial?.seconds,
+ exponential.max?.seconds,
+ exponential.multiplier,
+ );
+ break;
+ }
+ case RetryPolicyPB.StrategyCase.CUSTOMIZED_BACKOFF:
+ // CustomizedBackoffRetryPolicy not yet implemented in Node.js
+ break;
+ default:
+ break;
+ }
+ }
+ }
+}
diff --git a/nodejs/src/consumer/SimpleConsumer.ts
b/nodejs/src/consumer/SimpleConsumer.ts
index b5b045a6..994ae957 100644
--- a/nodejs/src/consumer/SimpleConsumer.ts
+++ b/nodejs/src/consumer/SimpleConsumer.ts
@@ -25,6 +25,8 @@ import { SimpleSubscriptionSettings } from
'./SimpleSubscriptionSettings';
import { SubscriptionLoadBalancer } from './SubscriptionLoadBalancer';
import { Consumer, ConsumerOptions } from './Consumer';
+const RANDOM_INDEX = Math.floor(Math.random() * Number.MAX_SAFE_INTEGER);
+
export interface SimpleConsumerOptions extends ConsumerOptions {
/**
* support tag string as filter, e.g.:
@@ -46,7 +48,7 @@ export class SimpleConsumer extends Consumer {
readonly #subscriptionExpressions = new Map<string, FilterExpression>();
readonly #subscriptionRouteDataCache = new Map<string,
SubscriptionLoadBalancer>();
readonly #awaitDuration: number;
- #topicIndex = 0;
+ #topicIndex = RANDOM_INDEX;
constructor(options: SimpleConsumerOptions) {
options.topics = Array.from(options.subscriptions.keys());
@@ -113,8 +115,16 @@ export class SimpleConsumer extends Consumer {
}
async receive(maxMessageNum = 10, invisibleDuration = 15000) {
+ if (maxMessageNum <= 0) {
+ throw new Error(`maxMessageNum must be greater than 0, but got
${maxMessageNum}`);
+ }
+
const topic = this.#nextTopic();
- const filterExpression = this.#subscriptionExpressions.get(topic)!;
+ const filterExpression = this.#subscriptionExpressions.get(topic);
+ if (!filterExpression) {
+ throw new Error(`No subscription found for topic=${topic}, please
subscribe first`);
+ }
+
const loadBalancer = await this.#getSubscriptionLoadBalancer(topic);
const mq = loadBalancer.takeMessageQueue();
const request = this.wrapReceiveMessageRequest(maxMessageNum, mq,
filterExpression,
@@ -126,15 +136,18 @@ export class SimpleConsumer extends Consumer {
await this.ackMessage(message);
}
- async changeInvisibleDuration0(message: MessageView, invisibleDuration:
number) {
- await this.changeInvisibleDuration0(message, invisibleDuration);
+ async changeInvisibleDuration(message: MessageView, invisibleDuration:
number) {
+ const response = await this.invisibleDuration(message, invisibleDuration);
+ // Refresh receipt handle manually
+ (message as any).receiptHandle = response;
}
#nextTopic() {
const topics = Array.from(this.#subscriptionExpressions.keys());
- if (this.#topicIndex >= topics.length) {
- this.#topicIndex = 0;
+ if (topics.length === 0) {
+ throw new Error('No subscriptions available to receive messages');
}
- return topics[this.#topicIndex++];
+ const index = Math.abs(this.#topicIndex++ % topics.length);
+ return topics[index];
}
}
diff --git a/nodejs/src/client/Logger.ts
b/nodejs/src/consumer/StandardConsumeService.ts
similarity index 50%
copy from nodejs/src/client/Logger.ts
copy to nodejs/src/consumer/StandardConsumeService.ts
index 8a5abbeb..3121f210 100644
--- a/nodejs/src/client/Logger.ts
+++ b/nodejs/src/consumer/StandardConsumeService.ts
@@ -15,21 +15,30 @@
* limitations under the License.
*/
-import path from 'node:path';
-import { homedir } from 'node:os';
-import { EggLogger } from 'egg-logger';
+import { MessageView } from '../message';
+import { ConsumeService } from './ConsumeService';
+import { MessageListener } from './MessageListener';
+import type { ProcessQueue } from './ProcessQueue';
-export interface ILogger {
- info(...args: any[]): void;
- warn(...args: any[]): void;
- error(...args: any[]): void;
- close?(...args: any[]): void;
-}
+export class StandardConsumeService extends ConsumeService {
+ constructor(clientId: string, messageListener: MessageListener) {
+ super(clientId, messageListener);
+ }
+
+ consume(pq: ProcessQueue, messageViews: MessageView[]): void {
+ for (const messageView of messageViews) {
+ if (messageView.corrupted) {
+ pq.discardMessage(messageView);
+ continue;
+ }
-export function getDefaultLogger() {
- const file = path.join(homedir(),
'logs/rocketmq/rocketmq_client_nodejs.log');
- return new EggLogger({
- file,
- level: 'INFO',
- });
+ this.consumeMessage(messageView)
+ .then(result => {
+ pq.eraseMessage(messageView, result);
+ })
+ .catch(() => {
+ // Should never reach here.
+ });
+ }
+ }
}
diff --git a/nodejs/src/consumer/index.ts b/nodejs/src/consumer/index.ts
index 73dbd62f..8b1cfc1c 100644
--- a/nodejs/src/consumer/index.ts
+++ b/nodejs/src/consumer/index.ts
@@ -20,3 +20,14 @@ export * from './FilterExpression';
export * from './SimpleConsumer';
export * from './SimpleSubscriptionSettings';
export * from './SubscriptionLoadBalancer';
+export * from './ConsumeResult';
+export * from './MessageListener';
+export * from './Assignment';
+export * from './Assignments';
+export * from './PushSubscriptionSettings';
+export * from './ConsumeTask';
+export * from './ConsumeService';
+export * from './StandardConsumeService';
+export * from './FifoConsumeService';
+export * from './ProcessQueue';
+export * from './PushConsumer';
diff --git a/nodejs/src/producer/Producer.ts b/nodejs/src/producer/Producer.ts
index 8484167f..9e281e36 100644
--- a/nodejs/src/producer/Producer.ts
+++ b/nodejs/src/producer/Producer.ts
@@ -21,6 +21,7 @@ import {
ClientType,
MessageType,
TransactionResolution,
+ TransactionSource,
} from '../../proto/apache/rocketmq/v2/definition_pb';
import {
EndTransactionRequest,
@@ -71,24 +72,50 @@ export class Producer extends BaseClient {
this.#checker = options.checker;
}
+ async startup() {
+ this.logger.info('Begin to start the rocketmq producer, clientId=%s',
this.clientId);
+ await super.startup();
+ this.logger.info('The rocketmq producer starts successfully, clientId=%s',
this.clientId);
+ }
+
+ async shutdown() {
+ this.logger.info('Begin to shutdown the rocketmq producer, clientId=%s',
this.clientId);
+ await super.shutdown();
+ this.logger.info('Shutdown the rocketmq producer successfully,
clientId=%s', this.clientId);
+ }
+
get publishingSettings() {
return this.#publishingSettings;
}
beginTransaction() {
assert(this.#checker, 'Transaction checker should not be null');
+ // Check producer status before beginning transaction
+ if (!this.isRunning()) {
+ this.logger.error('Unable to begin a transaction because producer is not
running, clientId=%s', this.clientId);
+ throw new Error('Producer is not running now');
+ }
return new Transaction(this);
}
async endTransaction(endpoints: Endpoints, message: Message, messageId:
string,
- transactionId: string, resolution: TransactionResolution) {
+ transactionId: string, resolution: TransactionResolution, source:
TransactionSource = TransactionSource.SOURCE_CLIENT) {
+ const resolutionStr = resolution === TransactionResolution.COMMIT ?
'COMMIT' : 'ROLLBACK';
+ const sourceStr = TransactionSource[source];
+ this.logger.info('Begin to end transaction, messageId=%s,
transactionId=%s, resolution=%s, source=%s, clientId=%s',
+ messageId, transactionId, resolutionStr, sourceStr, this.clientId);
+
const request = new EndTransactionRequest()
.setMessageId(messageId)
.setTransactionId(transactionId)
.setTopic(createResource(message.topic).setResourceNamespace(this.namespace))
- .setResolution(resolution);
+ .setResolution(resolution)
+ .setSource(source);
const response = await this.rpcClientManager.endTransaction(endpoints,
request, this.requestTimeout);
StatusChecker.check(response.getStatus()?.toObject());
+
+ this.logger.info('End transaction successfully, messageId=%s,
transactionId=%s, resolution=%s, source=%s, clientId=%s',
+ messageId, transactionId, resolutionStr, sourceStr, this.clientId);
}
async onRecoverOrphanedTransactionCommand(endpoints: Endpoints, command:
RecoverOrphanedTransactionCommand) {
@@ -114,7 +141,9 @@ export class Producer extends BaseClient {
if (resolution === null || resolution ===
TransactionResolution.TRANSACTION_RESOLUTION_UNSPECIFIED) {
return;
}
- await this.endTransaction(endpoints, messageView, messageId,
transactionId, resolution);
+ // Use SOURCE_SERVER_CHECK for transaction recovery
+ await this.endTransaction(endpoints, messageView, messageId,
transactionId, resolution,
+ TransactionSource.SOURCE_SERVER_CHECK);
this.logger.info('Recover orphaned transaction message success,
transactionId=%s, resolution=%s, messageId=%s, clientId=%s',
transactionId, resolution, messageId, this.clientId);
} catch (err) {
@@ -143,14 +172,26 @@ export class Producer extends BaseClient {
return sendReceipts[0];
}
- const publishingMessage = transaction.tryAddMessage(message);
- const sendReceipts = await this.#send([ message ], true);
- const sendReceipt = sendReceipts[0];
- transaction.tryAddReceipt(publishingMessage, sendReceipt);
- return sendReceipt;
+ // Send transactional message
+ try {
+ const publishingMessage = transaction.tryAddMessage(message);
+ const sendReceipts = await this.#send([ message ], true);
+ const sendReceipt = sendReceipts[0];
+ transaction.tryAddReceipt(publishingMessage, sendReceipt);
+ return sendReceipt;
+ } catch (err) {
+ this.logger.error('Failed to send transactional message, clientId=%s,
error=%s', this.clientId, err);
+ throw err;
+ }
}
async #send(messages: MessageOptions[], txEnabled: boolean) {
+ // Check producer status before message publishing
+ if (!this.isRunning()) {
+ this.logger.error('Unable to send message because producer is not
running, clientId=%s', this.clientId);
+ throw new Error('Producer is not running now');
+ }
+
const pubMessages: PublishingMessage[] = [];
const topics = new Set<string>();
for (const message of messages) {
@@ -158,21 +199,21 @@ export class Producer extends BaseClient {
topics.add(message.topic);
}
if (topics.size > 1) {
- throw new TypeError(`Messages to send have different
topics=${JSON.stringify(topics)}`);
+ throw new TypeError(`Messages to send have different
topics=${JSON.stringify(Array.from(topics))}`);
}
const topic = pubMessages[0].topic;
const messageType = pubMessages[0].messageType;
const messageGroup = pubMessages[0].messageGroup;
const messageTypes = new Set(pubMessages.map(m => m.messageType));
if (messageTypes.size > 1) {
- throw new TypeError(`Messages to send have different
types=${JSON.stringify(messageTypes)}`);
+ throw new TypeError(`Messages to send have different
types=${JSON.stringify(Array.from(messageTypes))}`);
}
// Message group must be same if message type is FIFO, or no need to
proceed.
if (messageType === MessageType.FIFO) {
const messageGroups = new Set(pubMessages.map(m => m.messageGroup!));
if (messageGroups.size > 1) {
- throw new TypeError(`FIFO messages to send have message groups,
messageGroups=${JSON.stringify(messageGroups)}`);
+ throw new TypeError(`FIFO messages to send have message groups,
messageGroups=${JSON.stringify(Array.from(messageGroups))}`);
}
}
@@ -223,42 +264,42 @@ export class Producer extends BaseClient {
sendReceipts = SendReceipt.processResponseInvocation(mq, response);
} catch (err) {
const messageIds = messages.map(m => m.messageId);
- // Isolate endpoints because of sending failure.
+ // Isolate endpoints because of sending failure
this.#isolate(endpoints);
if (attempt >= maxAttempts) {
- // No need more attempts.
- this.logger.error('Failed to send message(s) finally, run out of
attempt times, maxAttempts=%s, attempt=%s, topic=%s, messageId(s)=%s,
endpoints=%s, clientId=%s, error=%s',
+ // No more attempts
+ this.logger.error('Failed to send message(s) finally, run out of
attempt times, maxAttempts=%d, attempt=%d, topic=%s, messageId(s)=%s,
endpoints=%s, clientId=%s, error=%s',
maxAttempts, attempt, topic, messageIds, endpoints, this.clientId,
err);
throw err;
}
- // No need more attempts for transactional message.
+ // No more attempts for transactional message
if (messageType === MessageType.TRANSACTION) {
- this.logger.error('Failed to send transactional message finally,
maxAttempts=%s, attempt=%s, topic=%s, messageId(s)=%s, endpoints=%s,
clientId=%s, error=%s',
+ this.logger.error('Failed to send transactional message finally,
maxAttempts=%d, attempt=%d, topic=%s, messageId(s)=%s, endpoints=%s,
clientId=%s, error=%s',
maxAttempts, attempt, topic, messageIds, endpoints, this.clientId,
err);
throw err;
}
- // Try to do more attempts.
+ // Try next attempt
const nextAttempt = 1 + attempt;
- // Retry immediately if the request is not throttled.
+ // Retry immediately if the request is not throttled
if (!(err instanceof TooManyRequestsException)) {
- this.logger.warn('Failed to send message, would attempt to resend
right now, maxAttempts=%s, attempt=%s, topic=%s, messageId(s)=%s, endpoints=%s,
clientId=%s, error=%s',
+ this.logger.warn('Failed to send message, would attempt to resend
right now, maxAttempts=%d, attempt=%d, topic=%s, messageId(s)=%s, endpoints=%s,
clientId=%s, error=%s',
maxAttempts, attempt, topic, messageIds, endpoints, this.clientId,
err);
return this.#send0(topic, messageType, candidates, messages,
nextAttempt);
}
const delay = this.#getRetryPolicy().getNextAttemptDelay(nextAttempt);
- this.logger.warn('Failed to send message due to too many requests, would
attempt to resend after %sms, maxAttempts=%s, attempt=%s, topic=%s,
messageId(s)=%s, endpoints=%s, clientId=%s, error=%s',
+ this.logger.warn('Failed to send message due to too many requests, would
attempt to resend after %dms, maxAttempts=%d, attempt=%d, topic=%s,
messageId(s)=%s, endpoints=%s, clientId=%s, error=%s',
delay, maxAttempts, attempt, topic, messageIds, endpoints,
this.clientId, err);
await setTimeout(delay);
return this.#send0(topic, messageType, candidates, messages,
nextAttempt);
}
- // Resend message(s) successfully.
+ // Resend message(s) successfully
if (attempt > 1) {
const messageIds = sendReceipts.map(r => r.messageId);
- this.logger.info('Resend message successfully, topic=%s,
messageId(s)=%j, maxAttempts=%s, attempt=%s, endpoints=%s, clientId=%s',
+ this.logger.info('Resend message successfully, topic=%s,
messageId(s)=%s, maxAttempts=%d, attempt=%d, endpoints=%s, clientId=%s',
topic, messageIds, maxAttempts, attempt, endpoints, this.clientId);
}
- // Send message(s) successfully on first attempt, return directly.
+ // Send message(s) successfully on first attempt, return directly
return sendReceipts;
}
diff --git a/nodejs/src/producer/Transaction.ts
b/nodejs/src/producer/Transaction.ts
index 85d525ea..a172b352 100644
--- a/nodejs/src/producer/Transaction.ts
+++ b/nodejs/src/producer/Transaction.ts
@@ -50,21 +50,37 @@ export class Transaction {
if (this.#messageSendReceiptMap.size === 0) {
throw new TypeError('Transactional message has not been sent yet');
}
+
+ const logger = (this.#producer as any).logger;
+ logger.info('Begin to commit transaction, messageCount=%d, clientId=%s',
+ this.#messageSendReceiptMap.size, (this.#producer as any).clientId);
+
for (const [ messageId, sendReceipt ] of
this.#messageSendReceiptMap.entries()) {
const publishingMessage = this.#messageMap.get(messageId)!;
await this.#producer.endTransaction(sendReceipt.endpoints,
publishingMessage,
sendReceipt.messageId, sendReceipt.transactionId,
TransactionResolution.COMMIT);
}
+
+ logger.info('Commit transaction successfully, messageCount=%d,
clientId=%s',
+ this.#messageSendReceiptMap.size, (this.#producer as any).clientId);
}
async rollback() {
if (this.#messageSendReceiptMap.size === 0) {
throw new TypeError('Transactional message has not been sent yet');
}
+
+ const logger = (this.#producer as any).logger;
+ logger.info('Begin to rollback transaction, messageCount=%d, clientId=%s',
+ this.#messageSendReceiptMap.size, (this.#producer as any).clientId);
+
for (const [ messageId, sendReceipt ] of
this.#messageSendReceiptMap.entries()) {
const publishingMessage = this.#messageMap.get(messageId)!;
await this.#producer.endTransaction(sendReceipt.endpoints,
publishingMessage,
sendReceipt.messageId, sendReceipt.transactionId,
TransactionResolution.ROLLBACK);
}
+
+ logger.info('Rollback transaction successfully, messageCount=%d,
clientId=%s',
+ this.#messageSendReceiptMap.size, (this.#producer as any).clientId);
}
}