This is an automated email from the ASF dual-hosted git repository.

jinrongtong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/rocketmq-clients.git


The following commit(s) were added to refs/heads/master by this push:
     new 5ec167b8 Node js push consumer (#1201)
5ec167b8 is described below

commit 5ec167b860da43a20993899e563c55486ce316a8
Author: zhaohai <[email protected]>
AuthorDate: Thu Mar 19 15:28:04 2026 +0800

    Node js push consumer (#1201)
    
    * add push consumer
    
    * update check
    
    * add example
    
    * add log
    
    * add setInvisibleDuration
    
    * update proto
    
    * setAutoRenew true
    
    * The fix adheres to best practices for both TypeScript and ESLint
    
    * Better load balancing: Random start index avoids hotspot concentration.
    Stronger robustness: Parameter validation prevents illegal input.
    Easy to debug: detailed startup and shutdown logs.
    No bugs: Fixed infinite recursion issue.
    
    * 详细的启动和关闭日志,清晰的错误堆栈,准确的重试追踪。 (Detailed startup and shutdown logs, clear error stack traces, accurate retry tracking.)
    
    * Calling send() before startup throws an error. Calling send() after startup works normally. Calling send() after shutdown throws an error. Check the status before the transaction starts.
    
    * The commit log is complete and clear, the rollback log is complete and clear, and the error handling is comprehensive.
    
    * Fix the issue message for review.
    
    * Fix null pointer exceptions and clear the timer after exception catching.
---
 nodejs/examples/PushConsumer.ts                    | 165 ++++++++
 nodejs/examples/PushConsumerQuickStart.ts          |  65 +++
 nodejs/proto/apache/rocketmq/v2/definition.proto   |   6 +
 nodejs/proto/apache/rocketmq/v2/service.proto      |  32 ++
 nodejs/src/client/BaseClient.ts                    |  14 +
 nodejs/src/client/Logger.ts                        |   1 +
 nodejs/src/client/TelemetrySession.ts              |  24 +-
 .../{client/Logger.ts => consumer/Assignment.ts}   |  34 +-
 .../{client/Logger.ts => consumer/Assignments.ts}  |  41 +-
 nodejs/src/consumer/{index.ts => ConsumeResult.ts} |   9 +-
 nodejs/src/consumer/ConsumeService.ts              |  54 +++
 .../{client/Logger.ts => consumer/ConsumeTask.ts}  |  35 +-
 nodejs/src/consumer/Consumer.ts                    |  18 +
 nodejs/src/consumer/FifoConsumeService.ts          |  50 +++
 .../src/consumer/{index.ts => MessageListener.ts}  |  11 +-
 nodejs/src/consumer/ProcessQueue.ts                | 326 ++++++++++++++
 nodejs/src/consumer/PushConsumer.ts                | 470 +++++++++++++++++++++
 nodejs/src/consumer/PushSubscriptionSettings.ts    | 123 ++++++
 nodejs/src/consumer/SimpleConsumer.ts              |  27 +-
 .../StandardConsumeService.ts}                     |  39 +-
 nodejs/src/consumer/index.ts                       |  11 +
 nodejs/src/producer/Producer.ts                    |  87 +++-
 nodejs/src/producer/Transaction.ts                 |  16 +
 23 files changed, 1552 insertions(+), 106 deletions(-)

diff --git a/nodejs/examples/PushConsumer.ts b/nodejs/examples/PushConsumer.ts
new file mode 100644
index 00000000..ea53a377
--- /dev/null
+++ b/nodejs/examples/PushConsumer.ts
@@ -0,0 +1,165 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * PushConsumer Example
+ * 
+ * This example demonstrates how to use PushConsumer to consume messages.
+ * PushConsumer is a push-mode consumer that actively pulls messages from the server
+ * and pushes them to the listener for processing.
+ */
+
+import { PushConsumer, ConsumeResult } from '../src';
+import type { MessageView } from '../src';
+
+// Get configuration from environment variables
+const ACCESS_KEY = process.env.ROCKETMQ_ACCESS_KEY || 'yourAccessKey';
+const SECRET_KEY = process.env.ROCKETMQ_SECRET_KEY || 'yourSecretKey';
+const ENDPOINT = process.env.ROCKETMQ_ENDPOINT || 'localhost:8080';
+
+async function main() {
+  console.log('========== PushConsumer Example ==========');
+
+  // 1. Define message listener
+  const messageListener = {
+    async consume(messageView: MessageView): Promise<ConsumeResult> {
+      // Process received messages here
+      console.log('Received message:', {
+        messageId: messageView.messageId,
+        topic: messageView.topic,
+        tag: messageView.tag,
+        keys: messageView.keys,
+        body: messageView.body.toString('utf-8'),
+        deliveryAttempt: messageView.deliveryAttempt,
+      });
+
+      // Simulate business processing
+      try {
+        // TODO: Add your business logic here
+        await doBusinessLogic(messageView);
+        
+        // Return success to indicate message has been consumed successfully
+        return ConsumeResult.SUCCESS;
+      } catch (error) {
+        console.error('Failed to process message:', error);
+        // Return failure, message will be retried
+        return ConsumeResult.FAILURE;
+      }
+    },
+  };
+
+  // 2. Configure PushConsumer
+  const pushConsumer = new PushConsumer({
+    // Basic configuration
+    namespace: process.env.ROCKETMQ_NAMESPACE || 'yourNamespace', // Namespace
+    endpoints: ENDPOINT,        // RocketMQ server endpoint
+    
+    // Authentication credentials (optional)
+    sessionCredentials: {
+      accessKey: ACCESS_KEY,    // AccessKey for authentication
+      accessSecret: SECRET_KEY, // SecretKey for authentication
+      // securityToken: 'yourSecurityToken', // SecurityToken, optional
+    },
+    
+    // Consumer group configuration
+    consumerGroup: 'yourConsumerGroup',
+    
+    // Subscription configuration: Map<topic, filterExpression>
+    // filterExpression can be a string (TAG expression) or FilterExpression object
+    subscriptions: new Map([
+      ['yourTopic1', '*'],              // Subscribe to all TAGs
+      ['yourTopic2', 'TagA || TagB'],   // Subscribe to specific TAGs
+      // ['yourTopic3', new FilterExpression('yourSqlExpression', FilterType.SQL92)],
+    ]),
+    
+    // Message listener
+    messageListener: messageListener,
+    
+    // Cache configuration (optional)
+    maxCacheMessageCount: 1024,         // Max cached messages per queue, default 1024
+    maxCacheMessageSizeInBytes: 67108864, // Max cached bytes per queue (64MB), default 64MB
+    
+    // Long polling timeout configuration (optional)
+    longPollingTimeout: 30000,          // Long polling timeout in milliseconds, default 30000ms
+    
+    // Request timeout configuration (optional)
+    requestTimeout: 3000,               // Request timeout in milliseconds, default 3000ms
+    
+    // Logger configuration (optional)
+    // logger: yourCustomLogger,        // Custom logger
+  });
+
+  try {
+    // 3. Start consumer
+    console.log('Starting PushConsumer...');
+    await pushConsumer.startup();
+    console.log('PushConsumer started successfully!');
+    console.log('Client ID:', pushConsumer.getClientId());
+    console.log('Consumer Group:', pushConsumer.getConsumerGroup());
+
+    // 4. Dynamic subscription (optional)
+    // Can add new subscriptions at runtime
+    // import { FilterExpression } from '../src';
+    // await pushConsumer.subscribe('newTopic', new FilterExpression('TagC'));
+
+    // 5. Unsubscribe (optional)
+    // pushConsumer.unsubscribe('yourTopic1');
+
+    // Keep program running, waiting for messages
+    console.log('\nPress Ctrl+C to exit...');
+    
+    // Graceful shutdown handling
+    process.on('SIGINT', async () => {
+      console.log('\nShutting down PushConsumer...');
+      await shutdown(pushConsumer);
+      process.exit(0);
+    });
+
+    // Keep program running
+    await new Promise(() => {});
+  } catch (error) {
+    console.error('Failed to start PushConsumer:', error);
+    await shutdown(pushConsumer);
+    process.exit(1);
+  }
+}
+
+// Business logic processing function example
+async function doBusinessLogic(messageView: MessageView): Promise<void> {
+  // Simulate asynchronous business processing
+  await new Promise(resolve => setTimeout(resolve, 100));
+  
+  // Implement your business logic here
+  // For example:
+  // - Parse message content
+  // - Call database
+  // - Call external API
+  // - Send notifications, etc.
+}
+
+// Gracefully shutdown consumer
+async function shutdown(pushConsumer: PushConsumer) {
+  try {
+    await pushConsumer.shutdown();
+    console.log('PushConsumer has been closed');
+  } catch (error) {
+    console.error('Error occurred while closing PushConsumer:', error);
+  }
+}
+
+// Run example
+main().catch(console.error);
diff --git a/nodejs/examples/PushConsumerQuickStart.ts b/nodejs/examples/PushConsumerQuickStart.ts
new file mode 100644
index 00000000..655bf245
--- /dev/null
+++ b/nodejs/examples/PushConsumerQuickStart.ts
@@ -0,0 +1,65 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * PushConsumer Quick Start Example
+ * 
+ * The simplest way to use PushConsumer
+ */
+
+import { PushConsumer, ConsumeResult, type MessageView } from '../src';
+
+async function quickStart() {
+  // Create PushConsumer instance
+  const pushConsumer = new PushConsumer({
+    namespace: process.env.ROCKETMQ_NAMESPACE || '', // Namespace, can be empty string
+    endpoints: process.env.ROCKETMQ_ENDPOINT || 'localhost:8080',
+    consumerGroup: 'yourConsumerGroup',
+    
+    // Subscribe to topic and TAG
+    subscriptions: new Map([
+      ['yourTopic', '*'],  // Subscribe to yourTopic, receive all TAGs
+    ]),
+    
+    // Message listener - this is the core processing logic
+    messageListener: {
+      async consume(messageView: MessageView): Promise<ConsumeResult> {
+        console.log('Received message:', messageView.body.toString('utf-8'));
+        
+        // TODO: Process your business logic here
+        
+        return ConsumeResult.SUCCESS;
+      },
+    },
+  });
+
+  try {
+    // Start consumer
+    await pushConsumer.startup();
+    console.log('PushConsumer started, waiting for messages...');
+    
+    // Keep running
+    await new Promise(() => {});
+  } catch (error) {
+    console.error('Error:', error);
+    await pushConsumer.shutdown();
+    throw error;
+  }
+}
+
+// Run example
+quickStart().catch(console.error);
diff --git a/nodejs/proto/apache/rocketmq/v2/definition.proto b/nodejs/proto/apache/rocketmq/v2/definition.proto
index 753bfceb..468c4105 100644
--- a/nodejs/proto/apache/rocketmq/v2/definition.proto
+++ b/nodejs/proto/apache/rocketmq/v2/definition.proto
@@ -346,6 +346,8 @@ enum Code {
   CLIENT_ID_REQUIRED = 40017;
   // Polling time is illegal.
   ILLEGAL_POLLING_TIME = 40018;
+  // Offset is illegal.
+  ILLEGAL_OFFSET = 40019;
 
   // Generic code indicates that the client request lacks valid authentication
   // credentials for the requested resource.
@@ -365,6 +367,8 @@ enum Code {
   TOPIC_NOT_FOUND = 40402;
   // Consumer group resource does not exist.
   CONSUMER_GROUP_NOT_FOUND = 40403;
+  // Offset not found from server.
+  OFFSET_NOT_FOUND = 40404;
 
   // Generic code representing client side timeout when connecting to, reading data from, or write data to server.
   REQUEST_TIMEOUT = 40800;
@@ -373,6 +377,8 @@ enum Code {
   PAYLOAD_TOO_LARGE = 41300;
   // Message body size exceeds the threshold.
   MESSAGE_BODY_TOO_LARGE = 41301;
+  // Message body is empty.
+  MESSAGE_BODY_EMPTY = 41302;
 
   // Generic code for use cases where pre-conditions are not met.
   // For example, if a producer instance is used to publish messages without prior start() invocation,
diff --git a/nodejs/proto/apache/rocketmq/v2/service.proto b/nodejs/proto/apache/rocketmq/v2/service.proto
index f662f769..18db185a 100644
--- a/nodejs/proto/apache/rocketmq/v2/service.proto
+++ b/nodejs/proto/apache/rocketmq/v2/service.proto
@@ -66,6 +66,8 @@ message SendResultEntry {
   string message_id = 2;
   string transaction_id = 3;
   int64 offset = 4;
+  // Unique handle to identify message to recall, support delay message for now.
+  string recall_handle = 5;
 }
 
 message SendMessageResponse {
@@ -97,6 +99,7 @@ message ReceiveMessageRequest {
   // For message auto renew and clean
   bool auto_renew = 6;
   optional google.protobuf.Duration long_polling_timeout = 7;
+  optional string attempt_id = 8;
 }
 
 message ReceiveMessageResponse {
@@ -169,6 +172,8 @@ message EndTransactionResponse { Status status = 1; }
 
 message PrintThreadStackTraceCommand { string nonce = 1; }
 
+message ReconnectEndpointsCommand { string nonce = 1; }
+
 message ThreadStackTrace {
   string nonce = 1;
   optional string thread_stack_trace = 2;
@@ -213,6 +218,9 @@ message TelemetryCommand {
 
     // Request client to verify the consumption of the appointed message.
     VerifyMessageCommand verify_message_command = 7;
+
+    // Request client to reconnect to the server using the latest endpoints.
+    ReconnectEndpointsCommand reconnect_endpoints_command = 8;
   }
 }
 
@@ -292,6 +300,17 @@ message QueryOffsetResponse {
   int64 offset = 2;
 }
 
+message RecallMessageRequest {
+  Resource topic = 1;
+  // Refer to SendResultEntry.
+  string recall_handle = 2;
+}
+
+message RecallMessageResponse {
+  Status status = 1;
+  string message_id = 2;
+}
+
 // For all the RPCs in MessagingService, the following error handling policies
 // apply:
 //
@@ -377,12 +396,19 @@ service MessagingService {
   rpc ForwardMessageToDeadLetterQueue(ForwardMessageToDeadLetterQueueRequest)
       returns (ForwardMessageToDeadLetterQueueResponse) {}
 
+  // PullMessage and ReceiveMessage RPCs serve a similar purpose,
+  // which is to attempt to get messages from the server, but with different semantics.
   rpc PullMessage(PullMessageRequest) returns (stream PullMessageResponse) {}
 
+  // Update the consumption progress of the designated queue of the
+  // consumer group to the remote.
   rpc UpdateOffset(UpdateOffsetRequest) returns (UpdateOffsetResponse) {}
 
+  // Query the consumption progress of the designated queue of the
+  // consumer group from the remote.
   rpc GetOffset(GetOffsetRequest) returns (GetOffsetResponse) {}
 
+  // Query the offset of the designated queue by the query offset policy.
   rpc QueryOffset(QueryOffsetRequest) returns (QueryOffsetResponse) {}
 
   // Commits or rollback one transactional message.
@@ -408,4 +434,10 @@ service MessagingService {
   // ChangeInvisibleDuration to lengthen invisible duration.
   rpc ChangeInvisibleDuration(ChangeInvisibleDurationRequest) returns (ChangeInvisibleDurationResponse) {
   }
+
+  // Recall a message,
+  // for delay message, should recall before delivery time, like the rollback operation of transaction message,
+  // for normal message, not supported for now.
+  rpc RecallMessage(RecallMessageRequest) returns (RecallMessageResponse) {
+  }
 }
\ No newline at end of file
diff --git a/nodejs/src/client/BaseClient.ts b/nodejs/src/client/BaseClient.ts
index 77f790b9..d1651663 100644
--- a/nodejs/src/client/BaseClient.ts
+++ b/nodejs/src/client/BaseClient.ts
@@ -89,6 +89,7 @@ export abstract class BaseClient {
   #startupResolve?: () => void;
   #startupReject?: (err: Error) => void;
   #timers: NodeJS.Timeout[] = [];
+  #running = false;
 
   constructor(options: BaseClientOptions) {
     this.logger = options.logger ?? getDefaultLogger();
@@ -125,8 +126,11 @@ export abstract class BaseClient {
   }
 
   async #startup() {
+    this.logger.info('Begin to execute startup flow, clientId=%s, topics=%d',
+      this.clientId, this.topics.size);
     // fetch topic route
     await this.updateRoutes();
+    this.logger.info('Topic routes updated, clientId=%s', this.clientId);
     // update topic route every 30s
     this.#timers.push(setInterval(async () => {
       this.updateRoutes();
@@ -146,19 +150,27 @@ export abstract class BaseClient {
     // doStats()
 
     if (this.topics.size > 0) {
+      this.logger.info('Waiting for first onSettingsCommand, clientId=%s', this.clientId);
       // wait for this first onSettingsCommand call
       // eslint-disable-next-line @typescript-eslint/no-unused-vars
       await new Promise<void>((resolve, reject) => {
         this.#startupReject = reject;
         this.#startupResolve = resolve;
       });
+      this.logger.info('Received first onSettingsCommand, clientId=%s', this.clientId);
       this.#startupReject = undefined;
       this.#startupResolve = undefined;
     }
+    this.#running = true;
+  }
+
+  isRunning(): boolean {
+    return this.#running;
   }
 
   async shutdown() {
     this.logger.info('Begin to shutdown the rocketmq client, clientId=%s', this.clientId);
+    this.#running = false;
     while (this.#timers.length > 0) {
       const timer = this.#timers.pop();
       clearInterval(timer);
@@ -351,6 +363,8 @@ export abstract class BaseClient {
   }
 
   onSettingsCommand(_endpoints: Endpoints, settings: SettingsPB) {
+    this.logger.info('Received settings command, clientId=%s, settings=%j',
+      this.clientId, settings.toObject());
     // final Metric metric = new Metric(settings.getMetric());
     // clientMeterManager.reset(metric);
     this.getSettings().sync(settings);
diff --git a/nodejs/src/client/Logger.ts b/nodejs/src/client/Logger.ts
index 8a5abbeb..269883bd 100644
--- a/nodejs/src/client/Logger.ts
+++ b/nodejs/src/client/Logger.ts
@@ -23,6 +23,7 @@ export interface ILogger {
   info(...args: any[]): void;
   warn(...args: any[]): void;
   error(...args: any[]): void;
+  debug?(...args: any[]): void;
   close?(...args: any[]): void;
 }
 
diff --git a/nodejs/src/client/TelemetrySession.ts b/nodejs/src/client/TelemetrySession.ts
index 3812f928..e36fd19e 100644
--- a/nodejs/src/client/TelemetrySession.ts
+++ b/nodejs/src/client/TelemetrySession.ts
@@ -31,6 +31,8 @@ export class TelemetrySession {
     this.#endpoints = endpoints;
     this.#baseClient = baseClient;
     this.#logger = logger;
+    this.#logger.info('Creating telemetry session, endpoints=%s, clientId=%s',
+      endpoints, baseClient.clientId);
     this.#renewStream(true);
   }
 
@@ -51,12 +53,22 @@ export class TelemetrySession {
   }
 
   #renewStream(inited: boolean) {
-    this.#stream = this.#baseClient.createTelemetryStream(this.#endpoints);
-    this.#stream.on('data', this.#onData.bind(this));
-    this.#stream.once('error', this.#onError.bind(this));
-    this.#stream.once('end', this.#onEnd.bind(this));
-    if (!inited) {
-      this.syncSettings();
+    try {
+      this.#logger.debug?.('Creating telemetry stream, endpoints=%s, clientId=%s, inited=%s',
+        this.#endpoints, this.#baseClient.clientId, inited);
+      this.#stream = this.#baseClient.createTelemetryStream(this.#endpoints);
+      this.#stream.on('data', this.#onData.bind(this));
+      this.#stream.once('error', this.#onError.bind(this));
+      this.#stream.once('end', this.#onEnd.bind(this));
+      if (!inited) {
+        this.#logger.info('Syncing settings to new stream, endpoints=%s, clientId=%s',
+          this.#endpoints, this.#baseClient.clientId);
+        this.syncSettings();
+      }
+    } catch (err) {
+      this.#logger.error('Failed to create telemetry stream, endpoints=%s, clientId=%s, error=%s',
+        this.#endpoints, this.#baseClient.clientId, err);
+      throw err;
     }
   }
 
diff --git a/nodejs/src/client/Logger.ts b/nodejs/src/consumer/Assignment.ts
similarity index 54%
copy from nodejs/src/client/Logger.ts
copy to nodejs/src/consumer/Assignment.ts
index 8a5abbeb..e281b3f6 100644
--- a/nodejs/src/client/Logger.ts
+++ b/nodejs/src/consumer/Assignment.ts
@@ -15,21 +15,25 @@
  * limitations under the License.
  */
 
-import path from 'node:path';
-import { homedir } from 'node:os';
-import { EggLogger } from 'egg-logger';
+import { MessageQueue } from '../route';
 
-export interface ILogger {
-  info(...args: any[]): void;
-  warn(...args: any[]): void;
-  error(...args: any[]): void;
-  close?(...args: any[]): void;
-}
+export class Assignment {
+  readonly messageQueue: MessageQueue;
+
+  constructor(messageQueue: MessageQueue) {
+    this.messageQueue = messageQueue;
+  }
+
+  equals(other: Assignment): boolean {
+    if (this === other) return true;
+    if (!other) return false;
+    return this.messageQueue === other.messageQueue ||
+      (this.messageQueue.queueId === other.messageQueue.queueId &&
+       this.messageQueue.topic.name === other.messageQueue.topic.name &&
+       this.messageQueue.broker.name === other.messageQueue.broker.name);
+  }
 
-export function getDefaultLogger() {
-  const file = path.join(homedir(), 'logs/rocketmq/rocketmq_client_nodejs.log');
-  return new EggLogger({
-    file,
-    level: 'INFO',
-  });
+  toString(): string {
+    return `Assignment{messageQueue=${JSON.stringify(this.messageQueue)}}`;
+  }
 }
diff --git a/nodejs/src/client/Logger.ts b/nodejs/src/consumer/Assignments.ts
similarity index 50%
copy from nodejs/src/client/Logger.ts
copy to nodejs/src/consumer/Assignments.ts
index 8a5abbeb..a6655114 100644
--- a/nodejs/src/client/Logger.ts
+++ b/nodejs/src/consumer/Assignments.ts
@@ -15,21 +15,32 @@
  * limitations under the License.
  */
 
-import path from 'node:path';
-import { homedir } from 'node:os';
-import { EggLogger } from 'egg-logger';
+import { Assignment } from './Assignment';
 
-export interface ILogger {
-  info(...args: any[]): void;
-  warn(...args: any[]): void;
-  error(...args: any[]): void;
-  close?(...args: any[]): void;
-}
+export class Assignments {
+  readonly #assignmentList: Assignment[];
+
+  constructor(assignmentList: Assignment[]) {
+    this.#assignmentList = assignmentList;
+  }
+
+  getAssignmentList(): Assignment[] {
+    return this.#assignmentList;
+  }
+
+  equals(other?: Assignments): boolean {
+    if (this === other) return true;
+    if (!other) return false;
+    if (this.#assignmentList.length !== other.#assignmentList.length) return false;
+    for (let i = 0; i < this.#assignmentList.length; i++) {
+      if (!this.#assignmentList[i].equals(other.#assignmentList[i])) {
+        return false;
+      }
+    }
+    return true;
+  }
 
-export function getDefaultLogger() {
-  const file = path.join(homedir(), 'logs/rocketmq/rocketmq_client_nodejs.log');
-  return new EggLogger({
-    file,
-    level: 'INFO',
-  });
+  toString(): string {
+    return `Assignments{assignmentList=[${this.#assignmentList.map(a => a.toString()).join(', ')}]}`;
+  }
 }
diff --git a/nodejs/src/consumer/index.ts b/nodejs/src/consumer/ConsumeResult.ts
similarity index 81%
copy from nodejs/src/consumer/index.ts
copy to nodejs/src/consumer/ConsumeResult.ts
index 73dbd62f..e3d2aafa 100644
--- a/nodejs/src/consumer/index.ts
+++ b/nodejs/src/consumer/ConsumeResult.ts
@@ -15,8 +15,7 @@
  * limitations under the License.
  */
 
-export * from './Consumer';
-export * from './FilterExpression';
-export * from './SimpleConsumer';
-export * from './SimpleSubscriptionSettings';
-export * from './SubscriptionLoadBalancer';
+export enum ConsumeResult {
+  SUCCESS = 'SUCCESS',
+  FAILURE = 'FAILURE',
+}
diff --git a/nodejs/src/consumer/ConsumeService.ts b/nodejs/src/consumer/ConsumeService.ts
new file mode 100644
index 00000000..a1a64d8b
--- /dev/null
+++ b/nodejs/src/consumer/ConsumeService.ts
@@ -0,0 +1,54 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import { MessageView } from '../message';
+import { ConsumeResult } from './ConsumeResult';
+import { ConsumeTask } from './ConsumeTask';
+import { MessageListener } from './MessageListener';
+import type { ProcessQueue } from './ProcessQueue';
+
+export abstract class ConsumeService {
+  protected readonly clientId: string;
+  readonly #messageListener: MessageListener;
+  #aborted = false;
+
+  constructor(clientId: string, messageListener: MessageListener) {
+    this.clientId = clientId;
+    this.#messageListener = messageListener;
+  }
+
+  abstract consume(pq: ProcessQueue, messageViews: MessageView[]): void;
+
+  async consumeMessage(messageView: MessageView, delay = 0): Promise<ConsumeResult> {
+    if (this.#aborted) {
+      return ConsumeResult.FAILURE;
+    }
+    const task = new ConsumeTask(this.clientId, this.#messageListener, messageView);
+    if (delay <= 0) {
+      return task.call();
+    }
+    await new Promise<void>(resolve => setTimeout(resolve, delay));
+    if (this.#aborted) {
+      return ConsumeResult.FAILURE;
+    }
+    return task.call();
+  }
+
+  abort() {
+    this.#aborted = true;
+  }
+}
diff --git a/nodejs/src/client/Logger.ts b/nodejs/src/consumer/ConsumeTask.ts
similarity index 52%
copy from nodejs/src/client/Logger.ts
copy to nodejs/src/consumer/ConsumeTask.ts
index 8a5abbeb..48931264 100644
--- a/nodejs/src/client/Logger.ts
+++ b/nodejs/src/consumer/ConsumeTask.ts
@@ -15,21 +15,26 @@
  * limitations under the License.
  */
 
-import path from 'node:path';
-import { homedir } from 'node:os';
-import { EggLogger } from 'egg-logger';
+import { MessageView } from '../message';
+import { ConsumeResult } from './ConsumeResult';
+import { MessageListener } from './MessageListener';
 
-export interface ILogger {
-  info(...args: any[]): void;
-  warn(...args: any[]): void;
-  error(...args: any[]): void;
-  close?(...args: any[]): void;
-}
+export class ConsumeTask {
+  readonly #messageListener: MessageListener;
+  readonly #messageView: MessageView;
+
+  constructor(_clientId: string, messageListener: MessageListener, messageView: MessageView) {
+    this.#messageListener = messageListener;
+    this.#messageView = messageView;
+  }
 
-export function getDefaultLogger() {
-  const file = path.join(homedir(), 
'logs/rocketmq/rocketmq_client_nodejs.log');
-  return new EggLogger({
-    file,
-    level: 'INFO',
-  });
+  async call(): Promise<ConsumeResult> {
+    try {
+      const result = await this.#messageListener.consume(this.#messageView);
+      return result;
+    } catch (e) {
+      // Message listener raised an exception while consuming messages
+      return ConsumeResult.FAILURE;
+    }
+  }
 }
diff --git a/nodejs/src/consumer/Consumer.ts b/nodejs/src/consumer/Consumer.ts
index cdc78bf3..19c7f66c 100644
--- a/nodejs/src/consumer/Consumer.ts
+++ b/nodejs/src/consumer/Consumer.ts
@@ -108,4 +108,22 @@ export abstract class Consumer extends BaseClient {
     StatusChecker.check(response.status);
     return response.receiptHandle;
   }
+
+  /**
+   * Expose public methods for ProcessQueue to access RPC operations
+   */
+  async ackMessageViaRpc(endpoints: any, request: AckMessageRequest, timeout: number) {
+    const res = await this.rpcClientManager.ackMessage(endpoints, request, timeout);
+    return res;
+  }
+
+  async changeInvisibleDurationViaRpc(endpoints: any, request: ChangeInvisibleDurationRequest, timeout: number) {
+    const res = await this.rpcClientManager.changeInvisibleDuration(endpoints, request, timeout);
+    return res;
+  }
+
+  async forwardMessageToDeadLetterQueueViaRpc(endpoints: any, request: any, timeout: number) {
+    const res = await this.rpcClientManager.forwardMessageToDeadLetterQueue(endpoints, request, timeout);
+    return res;
+  }
 }
diff --git a/nodejs/src/consumer/FifoConsumeService.ts b/nodejs/src/consumer/FifoConsumeService.ts
new file mode 100644
index 00000000..58d07e1e
--- /dev/null
+++ b/nodejs/src/consumer/FifoConsumeService.ts
@@ -0,0 +1,50 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import { MessageView } from '../message';
+import { ConsumeService } from './ConsumeService';
+import { MessageListener } from './MessageListener';
+import type { ProcessQueue } from './ProcessQueue';
+
+export class FifoConsumeService extends ConsumeService {
+  constructor(clientId: string, messageListener: MessageListener) {
+    super(clientId, messageListener);
+  }
+
+  consume(pq: ProcessQueue, messageViews: MessageView[]): void {
+    this.#consumeIteratively(pq, messageViews, 0);
+  }
+
+  #consumeIteratively(pq: ProcessQueue, messageViews: MessageView[], index: number): void {
+    if (index >= messageViews.length) {
+      return;
+    }
+
+    const messageView = messageViews[index];
+
+    if (messageView.corrupted) {
+      pq.discardFifoMessage(messageView);
+      this.#consumeIteratively(pq, messageViews, index + 1);
+      return;
+    }
+
+    this.consumeMessage(messageView)
+      .then(result => pq.eraseFifoMessage(messageView, result))
+      .then(() => this.#consumeIteratively(pq, messageViews, index + 1))
+      .catch(() => this.#consumeIteratively(pq, messageViews, index + 1));
+  }
+}
diff --git a/nodejs/src/consumer/index.ts b/nodejs/src/consumer/MessageListener.ts
similarity index 79%
copy from nodejs/src/consumer/index.ts
copy to nodejs/src/consumer/MessageListener.ts
index 73dbd62f..a696226f 100644
--- a/nodejs/src/consumer/index.ts
+++ b/nodejs/src/consumer/MessageListener.ts
@@ -15,8 +15,9 @@
  * limitations under the License.
  */
 
-export * from './Consumer';
-export * from './FilterExpression';
-export * from './SimpleConsumer';
-export * from './SimpleSubscriptionSettings';
-export * from './SubscriptionLoadBalancer';
+import { MessageView } from '../message';
+import { ConsumeResult } from './ConsumeResult';
+
+/**
+ * Callback contract implemented by the application to handle messages.
+ */
+export interface MessageListener {
+  /**
+   * Handles a single message.
+   *
+   * @param messageView the message to handle
+   * @returns the consumption outcome, either directly or as a promise, so both
+   * synchronous and asynchronous listeners satisfy this interface
+   */
+  consume(messageView: MessageView): ConsumeResult | Promise<ConsumeResult>;
+}
diff --git a/nodejs/src/consumer/ProcessQueue.ts 
b/nodejs/src/consumer/ProcessQueue.ts
new file mode 100644
index 00000000..65eaca7b
--- /dev/null
+++ b/nodejs/src/consumer/ProcessQueue.ts
@@ -0,0 +1,326 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import { randomUUID } from 'node:crypto';
+import { Code } from '../../proto/apache/rocketmq/v2/definition_pb';
+import { MessageView } from '../message';
+import { MessageQueue } from '../route';
+import { TooManyRequestsException } from '../exception';
+import { ConsumeResult } from './ConsumeResult';
+import { FilterExpression } from './FilterExpression';
+import type { PushConsumer } from './PushConsumer';
+
+// Backoff delays, in milliseconds (they are passed to setTimeout), applied
+// before retrying a failed settlement call of the respective kind.
+const ACK_MESSAGE_FAILURE_BACKOFF_DELAY = 1000;
+const CHANGE_INVISIBLE_DURATION_FAILURE_BACKOFF_DELAY = 1000;
+const FORWARD_MESSAGE_TO_DLQ_FAILURE_BACKOFF_DELAY = 1000;
+
+// Delays, in milliseconds, before the next receive attempt: after a
+// TooManyRequestsException, after any other receive failure, and while the
+// local message cache is full.
+const RECEIVING_FLOW_CONTROL_BACKOFF_DELAY = 20;
+const RECEIVING_FAILURE_BACKOFF_DELAY = 1000;
+const RECEIVING_BACKOFF_DELAY_WHEN_CACHE_IS_FULL = 1000;
+
+/**
+ * Client-side state of one assigned message queue. It runs the receive loop
+ * for the queue, tracks received-but-unsettled messages for flow control, and
+ * settles consumed messages (ack, change invisible duration, or forward to the
+ * dead letter queue), retrying failed settlement calls after a fixed backoff.
+ */
+export class ProcessQueue {
+  readonly #consumer: PushConsumer;
+  readonly #mq: MessageQueue;
+  readonly #filterExpression: FilterExpression;
+  /** Messages received from the broker that have not been settled yet. */
+  readonly #cachedMessages: MessageView[] = [];
+  /** Sum of `body.length` over `#cachedMessages`. */
+  #cachedMessagesBytes = 0;
+  /** Timestamp (ms) at which the latest receive request was issued. */
+  #activityTime = Date.now();
+  /** Timestamp (ms) at which the cache was last found to be full. */
+  #cacheFullTime = 0;
+  /** Set by drop(); stops the receive loop. */
+  #dropped = false;
+  /** Set by abort(); stops scheduled receives and settlement retries. */
+  #aborted = false;
+
+  constructor(consumer: PushConsumer, mq: MessageQueue, filterExpression: FilterExpression) {
+    this.#consumer = consumer;
+    this.#mq = mq;
+    this.#filterExpression = filterExpression;
+  }
+
+  /** Returns the message queue this process queue is bound to. */
+  getMessageQueue(): MessageQueue {
+    return this.#mq;
+  }
+
+  /** Marks the queue as dropped so that no further receive request is issued. */
+  drop(): void {
+    this.#dropped = true;
+  }
+
+  /**
+   * Tells whether the queue has been idle for too long.
+   *
+   * @returns true when both the latest receive request and the latest
+   * cache-full observation are at least
+   * `(longPollingTimeout + requestTimeout) * 3` milliseconds old
+   */
+  expired(): boolean {
+    const settings = this.#consumer.getPushConsumerSettings();
+    const maxIdleDuration = (settings.getLongPollingTimeout() + this.#consumer.requestTimeoutValue) * 3;
+    const now = Date.now();
+    return now - this.#activityTime >= maxIdleDuration
+      && now - this.#cacheFullTime >= maxIdleDuration;
+  }
+
+  /** Adds the messages to the local cache and accounts for their body sizes. */
+  cacheMessages(messageList: MessageView[]): void {
+    for (const messageView of messageList) {
+      this.#cachedMessages.push(messageView);
+      this.#cachedMessagesBytes += messageView.body.length;
+    }
+  }
+
+  /**
+   * Batch size for the next receive request: the remaining cache capacity
+   * (at least 1), capped by the configured receive batch size.
+   */
+  #getReceptionBatchSize(): number {
+    const remainingCapacity = this.#consumer.cacheMessageCountThresholdPerQueue() - this.cachedMessagesCount();
+    const bufferSize = Math.max(remainingCapacity, 1);
+    return Math.min(bufferSize, this.#consumer.getPushConsumerSettings().getReceiveBatchSize());
+  }
+
+  /** Issues a receive request right away, without the dropped / cache-full checks. */
+  fetchMessageImmediately(): void {
+    this.#receiveMessageImmediately();
+  }
+
+  /**
+   * Schedules the next receive attempt after a failed one.
+   *
+   * @param t the failure; a TooManyRequestsException uses the shorter
+   * flow-control backoff, anything else the failure backoff
+   * @param attemptId attempt id of the failed request, reused for the retry
+   */
+  onReceiveMessageException(t: Error, attemptId?: string): void {
+    const delay = t instanceof TooManyRequestsException
+      ? RECEIVING_FLOW_CONTROL_BACKOFF_DELAY
+      : RECEIVING_FAILURE_BACKOFF_DELAY;
+    this.#receiveMessageLater(delay, attemptId);
+  }
+
+  /** Re-enters the receive loop after `delay` ms unless the queue is aborted. */
+  #receiveMessageLater(delay: number, attemptId?: string): void {
+    if (this.#aborted) return;
+    setTimeout(() => {
+      // abort() may have been called while the timer was pending.
+      if (this.#aborted) return;
+      this.#receiveMessage(attemptId);
+    }, delay);
+  }
+
+  #generateAttemptId(): string {
+    return randomUUID();
+  }
+
+  /**
+   * Runs one step of the receive loop.
+   *
+   * @param attemptId attempt id to send with the request; generated when omitted
+   */
+  receiveMessage(attemptId?: string): void {
+    this.#receiveMessage(attemptId);
+  }
+
+  /**
+   * One step of the receive loop: does nothing once dropped, backs off while
+   * the cache is full, otherwise issues a receive request.
+   */
+  #receiveMessage(attemptId?: string): void {
+    if (this.#dropped) {
+      return;
+    }
+    if (this.#isCacheFull()) {
+      this.#receiveMessageLater(RECEIVING_BACKOFF_DELAY_WHEN_CACHE_IS_FULL, attemptId);
+      return;
+    }
+    this.#receiveMessageImmediately(attemptId);
+  }
+
+  /** Builds and sends a receive request, then feeds the outcome back into the loop. */
+  #receiveMessageImmediately(attemptId?: string): void {
+    if (this.#aborted) return;
+    attemptId = attemptId ?? this.#generateAttemptId();
+    try {
+      const batchSize = this.#getReceptionBatchSize();
+      const longPollingTimeout = this.#consumer.getPushConsumerSettings().getLongPollingTimeout();
+      const request = this.#consumer.wrapPushReceiveMessageRequest(
+        batchSize, this.#mq, this.#filterExpression, longPollingTimeout, attemptId,
+      );
+      this.#activityTime = Date.now();
+
+      // The trailing catch() also covers failures raised while handling the
+      // result, so the receive loop is rescheduled instead of dying.
+      this.#consumer.receiveMessage(request, this.#mq, longPollingTimeout)
+        .then(messages => {
+          this.#onReceiveMessageResult(messages);
+        })
+        .catch(err => {
+          this.onReceiveMessageException(err, attemptId);
+        });
+    } catch (err) {
+      this.onReceiveMessageException(err as Error, attemptId);
+    }
+  }
+
+  /** Caches and dispatches received messages, then continues the receive loop. */
+  #onReceiveMessageResult(messages: MessageView[]): void {
+    if (messages.length > 0) {
+      this.cacheMessages(messages);
+      this.#consumer.getConsumeService().consume(this, messages);
+    }
+    this.#receiveMessage();
+  }
+
+  /**
+   * Tells whether the cache reached its per-queue count or byte threshold.
+   * Side effect: records the current time in `#cacheFullTime` when it did.
+   */
+  #isCacheFull(): boolean {
+    const countExceeded = this.#consumer.cacheMessageCountThresholdPerQueue() <= this.cachedMessagesCount();
+    if (countExceeded || this.#consumer.cacheMessageBytesThresholdPerQueue() <= this.#cachedMessagesBytes) {
+      this.#cacheFullTime = Date.now();
+      return true;
+    }
+    return false;
+  }
+
+  /**
+   * Settles a message consumed in non-FIFO mode and evicts it from the cache
+   * afterwards: acks it on SUCCESS, otherwise changes its invisible duration
+   * to the retry policy's next attempt delay.
+   */
+  eraseMessage(messageView: MessageView, consumeResult: ConsumeResult): void {
+    const task = consumeResult === ConsumeResult.SUCCESS
+      ? this.#ackMessage(messageView)
+      : this.#nackMessage(messageView);
+    this.#evictWhenSettled(task, messageView);
+  }
+
+  /**
+   * Evicts the message once `task` settles, whether it fulfilled or rejected.
+   * Unlike `task.finally(...)`, this consumes a rejection instead of
+   * re-raising it on a promise nobody observes (an unhandled rejection).
+   */
+  #evictWhenSettled(task: Promise<void>, messageView: MessageView): void {
+    const evict = () => this.#evictCache(messageView);
+    task.then(evict, evict);
+  }
+
+  /**
+   * Repeats `rpc` until the broker answers with `Code.OK`, waiting
+   * `backoffDelay` ms between attempts. A missing status, a non-OK status and
+   * a thrown error are all treated as a failed attempt. Retrying stops once
+   * the queue is aborted; the first attempt is always made.
+   *
+   * @param rpc performs one attempt and resolves to the response status code
+   * @param backoffDelay delay in milliseconds between attempts
+   * @param forgiveInvalidReceiptHandle when true, `INVALID_RECEIPT_HANDLE`
+   * ends the retries as well
+   */
+  async #callUntilAccepted(
+    rpc: () => Promise<number | undefined>,
+    backoffDelay: number,
+    forgiveInvalidReceiptHandle: boolean,
+  ): Promise<void> {
+    for (;;) {
+      try {
+        const code = await rpc();
+        if (code === Code.OK) return;
+        if (forgiveInvalidReceiptHandle && code === Code.INVALID_RECEIPT_HANDLE) {
+          return; // Forgive to retry
+        }
+      } catch {
+        // The attempt failed before a status was obtained; retry after the backoff.
+      }
+      if (this.#aborted) return;
+      await new Promise<void>(resolve => setTimeout(resolve, backoffDelay));
+      if (this.#aborted) return;
+    }
+  }
+
+  /** Acknowledges the message, retrying until accepted or aborted. */
+  async #ackMessage(messageView: MessageView): Promise<void> {
+    await this.#callUntilAccepted(async () => {
+      const response = await this.#consumer.ackMessageViaRpc(
+        messageView.endpoints,
+        this.#consumer.wrapAckMessageRequest(messageView),
+        this.#consumer.requestTimeoutValue,
+      );
+      return response.getStatus()?.toObject().code;
+    }, ACK_MESSAGE_FAILURE_BACKOFF_DELAY, true);
+  }
+
+  /** Changes the invisible duration to the retry policy's next attempt delay (0 without a policy). */
+  async #nackMessage(messageView: MessageView): Promise<void> {
+    const retryPolicy = this.#consumer.getRetryPolicy();
+    const delay = retryPolicy?.getNextAttemptDelay(messageView.deliveryAttempt ?? 1) ?? 0;
+    await this.#changeInvisibleDuration(messageView, delay);
+  }
+
+  /** Changes the message's invisible duration, retrying until accepted or aborted. */
+  async #changeInvisibleDuration(messageView: MessageView, duration: number): Promise<void> {
+    await this.#callUntilAccepted(async () => {
+      const response = await this.#consumer.changeInvisibleDurationViaRpc(
+        messageView.endpoints,
+        this.#consumer.wrapChangeInvisibleDurationRequest(messageView, duration),
+        this.#consumer.requestTimeoutValue,
+      );
+      return response.getStatus()?.toObject().code;
+    }, CHANGE_INVISIBLE_DURATION_FAILURE_BACKOFF_DELAY, true);
+  }
+
+  /**
+   * Settles a message consumed in FIFO mode. While the result is FAILURE and
+   * attempts remain, the message is consumed again locally after the retry
+   * policy's delay. Afterwards it is acked on SUCCESS, otherwise forwarded to
+   * the dead letter queue, and evicted from the cache.
+   *
+   * @param messageView the consumed message
+   * @param consumeResult result of the consumption that just finished
+   */
+  async eraseFifoMessage(messageView: MessageView, consumeResult: ConsumeResult): Promise<void> {
+    try {
+      const retryPolicy = this.#consumer.getRetryPolicy();
+      const maxAttempts = retryPolicy?.getMaxAttempts() ?? 1;
+      // Counted locally and advanced on every redelivery; without this a
+      // listener that keeps failing would be redelivered forever.
+      let attempt = messageView.deliveryAttempt ?? 1;
+      let result = consumeResult;
+
+      while (result === ConsumeResult.FAILURE && attempt < maxAttempts) {
+        const nextAttemptDelay = retryPolicy?.getNextAttemptDelay(attempt) ?? 0;
+        attempt++;
+        // Redeliver the fifo message
+        result = await this.#consumer.getConsumeService().consumeMessage(messageView, nextAttemptDelay);
+      }
+
+      if (result === ConsumeResult.SUCCESS) {
+        await this.#ackMessage(messageView);
+      } else {
+        await this.#forwardToDeadLetterQueue(messageView);
+      }
+    } finally {
+      // This queue is done with the message; never leave it occupying the cache.
+      this.#evictCache(messageView);
+    }
+  }
+
+  /**
+   * Forwards the message to the dead letter queue, retrying until accepted or
+   * aborted. A response without status is retried like any other failure.
+   */
+  async #forwardToDeadLetterQueue(messageView: MessageView): Promise<void> {
+    await this.#callUntilAccepted(async () => {
+      const response = await this.#consumer.forwardMessageToDeadLetterQueueViaRpc(
+        messageView.endpoints,
+        this.#consumer.wrapForwardMessageToDeadLetterQueueRequest(messageView),
+        this.#consumer.requestTimeoutValue,
+      );
+      return response.getStatus()?.toObject().code;
+    }, FORWARD_MESSAGE_TO_DLQ_FAILURE_BACKOFF_DELAY, false);
+  }
+
+  /** Gives up on a message without consuming it: changes its invisible duration, then evicts it. */
+  discardMessage(messageView: MessageView): void {
+    this.#evictWhenSettled(this.#nackMessage(messageView), messageView);
+  }
+
+  /** Gives up on a FIFO message without consuming it: forwards it to the DLQ, then evicts it. */
+  discardFifoMessage(messageView: MessageView): void {
+    this.#evictWhenSettled(this.#forwardToDeadLetterQueue(messageView), messageView);
+  }
+
+  /** Removes the message from the cache, if present, and updates the byte count. */
+  #evictCache(messageView: MessageView): void {
+    const index = this.#cachedMessages.indexOf(messageView);
+    if (index !== -1) {
+      this.#cachedMessages.splice(index, 1);
+      this.#cachedMessagesBytes -= messageView.body.length;
+    }
+  }
+
+  /** Number of messages currently cached. */
+  cachedMessagesCount(): number {
+    return this.#cachedMessages.length;
+  }
+
+  /** Total body size, in bytes, of the messages currently cached. */
+  cachedMessageBytes(): number {
+    return this.#cachedMessagesBytes;
+  }
+
+  /** Stops scheduled receive attempts and settlement retries. */
+  abort(): void {
+    this.#aborted = true;
+  }
+}
diff --git a/nodejs/src/consumer/PushConsumer.ts 
b/nodejs/src/consumer/PushConsumer.ts
new file mode 100644
index 00000000..b6841b8c
--- /dev/null
+++ b/nodejs/src/consumer/PushConsumer.ts
@@ -0,0 +1,470 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import { ClientType } from '../../proto/apache/rocketmq/v2/definition_pb';
+import {
+  AckMessageRequest,
+  ChangeInvisibleDurationRequest,
+  ForwardMessageToDeadLetterQueueRequest,
+  HeartbeatRequest,
+  NotifyClientTerminationRequest,
+  QueryAssignmentRequest,
+  ReceiveMessageRequest,
+} from '../../proto/apache/rocketmq/v2/service_pb';
+import { MessageView } from '../message';
+import { MessageQueue, TopicRouteData } from '../route';
+import { StatusChecker } from '../exception';
+import { RetryPolicy } from '../retry';
+import { createDuration, createResource } from '../util';
+import { Consumer, ConsumerOptions } from './Consumer';
+import { FilterExpression } from './FilterExpression';
+import { PushSubscriptionSettings } from './PushSubscriptionSettings';
+import { ConsumeService } from './ConsumeService';
+import { StandardConsumeService } from './StandardConsumeService';
+import { FifoConsumeService } from './FifoConsumeService';
+import { ProcessQueue } from './ProcessQueue';
+import { Assignment } from './Assignment';
+import { Assignments } from './Assignments';
+import { MessageListener } from './MessageListener';
+
+// Delay before the first assignment scan and period between subsequent
+// scans, both in milliseconds (passed to setTimeout / setInterval).
+const ASSIGNMENT_SCAN_SCHEDULE_DELAY = 1000;
+const ASSIGNMENT_SCAN_SCHEDULE_PERIOD = 5000;
+
+/**
+ * Options accepted by the PushConsumer constructor, in addition to the
+ * common ConsumerOptions.
+ */
+export interface PushConsumerOptions extends ConsumerOptions {
+  // Filter per subscribed topic; a plain string is wrapped in a FilterExpression.
+  subscriptions: Map<string/* topic */, FilterExpression | string>;
+  // Listener that consumes the received messages.
+  messageListener: MessageListener;
+  // Cache limit by message count, shared across process queues; defaults to 1024.
+  maxCacheMessageCount?: number;
+  // Cache limit by body bytes, shared across process queues; defaults to 64 MiB.
+  maxCacheMessageSizeInBytes?: number;
+  // Long-polling timeout for receive requests.
+  // NOTE(review): presumably milliseconds, like the settings' 30000 default - confirm.
+  longPollingTimeout?: number;
+}
+
+/**
+ * Plain counters behind the PushConsumer metric accessors; PushConsumer.doStats()
+ * logs them and resets each one to 0.
+ */
+class ConsumeMetrics {
+  receptionTimes = 0;
+  receivedMessagesQuantity = 0;
+  consumptionOkQuantity = 0;
+  consumptionErrorQuantity = 0;
+}
+
+/**
+ * Push consumer: periodically queries the queue assignments of every
+ * subscribed topic, keeps one ProcessQueue per assigned message queue, and
+ * delivers received messages to the MessageListener through a ConsumeService.
+ */
+export class PushConsumer extends Consumer {
+  readonly #pushSubscriptionSettings: PushSubscriptionSettings;
+  readonly #subscriptionExpressions = new Map<string, FilterExpression>();
+  readonly #cacheAssignments = new Map<string, Assignments>();
+  readonly #messageListener: MessageListener;
+  readonly #maxCacheMessageCount: number;
+  readonly #maxCacheMessageSizeInBytes: number;
+  readonly #processQueueTable = new Map<string /* mq key */, { mq: MessageQueue; pq: ProcessQueue }>();
+  readonly #metrics = new ConsumeMetrics();
+  #consumeService!: ConsumeService;
+  /** One-shot timer of the first assignment scan. */
+  #initialScanTimer?: NodeJS.Timeout;
+  /** Interval timer of the periodic assignment scan. */
+  #scanAssignmentTimer?: NodeJS.Timeout;
+  /** True between startup() and shutdown(); late scan results are ignored otherwise. */
+  #scanningAssignments = false;
+
+  /**
+   * @param options consumer options; `subscriptions` determines the topics
+   * handed to the base consumer. The passed object is not modified.
+   */
+  constructor(options: PushConsumerOptions) {
+    // Hand a copy to the base class rather than mutating the caller's object.
+    super({ ...options, topics: Array.from(options.subscriptions.keys()) });
+
+    for (const [ topic, filter ] of options.subscriptions.entries()) {
+      const expression = typeof filter === 'string' ? new FilterExpression(filter) : filter;
+      this.#subscriptionExpressions.set(topic, expression);
+    }
+
+    this.#messageListener = options.messageListener;
+    this.#maxCacheMessageCount = options.maxCacheMessageCount ?? 1024;
+    this.#maxCacheMessageSizeInBytes = options.maxCacheMessageSizeInBytes ?? 64 * 1024 * 1024;
+
+    // longPollingTimeout must be forwarded, otherwise the option has no effect.
+    this.#pushSubscriptionSettings = new PushSubscriptionSettings(
+      options.namespace, this.clientId, this.endpoints,
+      this.consumerGroup, this.requestTimeout, this.#subscriptionExpressions,
+      options.longPollingTimeout,
+    );
+  }
+
+  /**
+   * Starts the base client, creates the consume service and schedules the
+   * assignment scans (first one after ASSIGNMENT_SCAN_SCHEDULE_DELAY, then
+   * every ASSIGNMENT_SCAN_SCHEDULE_PERIOD).
+   */
+  async startup() {
+    this.logger.info('Begin to start the rocketmq push consumer, clientId=%s, consumerGroup=%s',
+      this.clientId, this.consumerGroup);
+    await super.startup();
+    this.logger.info('Super startup completed, clientId=%s', this.clientId);
+    try {
+      this.#consumeService = this.#createConsumeService();
+      // Start scanning assignments periodically
+      this.logger.info('Starting assignment scanning, clientId=%s', this.clientId);
+      this.#scanningAssignments = true;
+      this.#initialScanTimer = setTimeout(() => {
+        this.#initialScanTimer = undefined;
+        this.#scanAssignments();
+      }, ASSIGNMENT_SCAN_SCHEDULE_DELAY);
+      this.#scanAssignmentTimer = setInterval(() => this.#scanAssignments(), ASSIGNMENT_SCAN_SCHEDULE_PERIOD);
+      this.logger.info('Push consumer started successfully, clientId=%s', this.clientId);
+    } catch (err) {
+      this.logger.error('Failed to start push consumer, cleaning up resources, clientId=%s, error=%s',
+        this.clientId, err);
+      // Clean up timers if initialization fails
+      this.#stopAssignmentScan();
+      throw err;
+    }
+  }
+
+  /**
+   * Stops assignment scanning, drops and aborts every process queue, aborts
+   * the consume service and shuts the base client down.
+   */
+  async shutdown() {
+    this.logger.info('Begin to shutdown the rocketmq push consumer, clientId=%s', this.clientId);
+    // Stop scanning assignments
+    this.#stopAssignmentScan();
+    // Drop all process queues
+    this.logger.info('Dropping all process queues, clientId=%s, queueCount=%d',
+      this.clientId, this.#processQueueTable.size);
+    for (const { pq } of this.#processQueueTable.values()) {
+      pq.drop();
+      pq.abort();
+    }
+    this.#processQueueTable.clear();
+    // Shutdown consume service
+    if (this.#consumeService) {
+      this.logger.info('Shutting down consume service, clientId=%s', this.clientId);
+      this.#consumeService.abort();
+    }
+    await super.shutdown();
+    this.logger.info('Push consumer has been shutdown successfully, clientId=%s', this.clientId);
+  }
+
+  /** Cancels both scan timers and makes in-flight scan results be ignored. */
+  #stopAssignmentScan() {
+    this.#scanningAssignments = false;
+    if (this.#initialScanTimer) {
+      clearTimeout(this.#initialScanTimer);
+      this.#initialScanTimer = undefined;
+    }
+    if (this.#scanAssignmentTimer) {
+      clearInterval(this.#scanAssignmentTimer);
+      this.#scanAssignmentTimer = undefined;
+    }
+  }
+
+  protected getSettings() {
+    return this.#pushSubscriptionSettings;
+  }
+
+  protected wrapHeartbeatRequest() {
+    return new HeartbeatRequest()
+      .setClientType(ClientType.PUSH_CONSUMER)
+      .setGroup(createResource(this.consumerGroup));
+  }
+
+  protected wrapNotifyClientTerminationRequest() {
+    return new NotifyClientTerminationRequest()
+      .setGroup(createResource(this.consumerGroup));
+  }
+
+  // eslint-disable-next-line @typescript-eslint/no-unused-vars
+  protected onTopicRouteDataUpdate(_topic: string, _topicRouteData: TopicRouteData) {
+    // No-op for push consumer; assignments are queried separately
+  }
+
+  /** Picks the FIFO or the standard consume service according to the settings. */
+  #createConsumeService(): ConsumeService {
+    if (this.#pushSubscriptionSettings.isFifo()) {
+      return new FifoConsumeService(this.clientId, this.#messageListener);
+    }
+    return new StandardConsumeService(this.clientId, this.#messageListener);
+  }
+
+  /**
+   * Subscribes to `topic`; the route data is fetched first so that an
+   * unreachable topic fails here instead of in a later scan.
+   */
+  async subscribe(topic: string, filterExpression: FilterExpression) {
+    await this.getRouteData(topic);
+    this.#subscriptionExpressions.set(topic, filterExpression);
+  }
+
+  /**
+   * Unsubscribes from `topic` and drops its process queues. Assignment scans
+   * only visit subscribed topics, so the queues would otherwise keep
+   * receiving messages indefinitely.
+   */
+  unsubscribe(topic: string) {
+    this.#subscriptionExpressions.delete(topic);
+    this.#cacheAssignments.delete(topic);
+    for (const [ mqKey, { mq }] of this.#processQueueTable) {
+      if (mq.topic.name === topic) {
+        this.#dropProcessQueue(mqKey);
+      }
+    }
+  }
+
+  // --- Public methods for ProcessQueue access ---
+
+  get requestTimeoutValue(): number {
+    return this.requestTimeout;
+  }
+
+  getPushConsumerSettings(): PushSubscriptionSettings {
+    return this.#pushSubscriptionSettings;
+  }
+
+  getConsumerGroup(): string {
+    return this.consumerGroup;
+  }
+
+  getConsumeService(): ConsumeService {
+    return this.#consumeService;
+  }
+
+  getRetryPolicy(): RetryPolicy | undefined {
+    return this.#pushSubscriptionSettings.getRetryPolicy();
+  }
+
+  getClientId(): string {
+    return this.clientId;
+  }
+
+  // --- Metrics access ---
+
+  getReceptionTimes(): number {
+    return this.#metrics.receptionTimes;
+  }
+
+  getReceivedMessagesQuantity(): number {
+    return this.#metrics.receivedMessagesQuantity;
+  }
+
+  getConsumptionOkQuantity(): number {
+    return this.#metrics.consumptionOkQuantity;
+  }
+
+  getConsumptionErrorQuantity(): number {
+    return this.#metrics.consumptionErrorQuantity;
+  }
+
+  incrementReceptionTimes() {
+    this.#metrics.receptionTimes++;
+  }
+
+  incrementReceivedMessagesQuantity(count: number) {
+    this.#metrics.receivedMessagesQuantity += count;
+  }
+
+  incrementConsumptionOkQuantity() {
+    this.#metrics.consumptionOkQuantity++;
+  }
+
+  incrementConsumptionErrorQuantity() {
+    this.#metrics.consumptionErrorQuantity++;
+  }
+
+  // --- Statistics ---
+
+  /** Logs the current counters and resets each of them to 0. */
+  doStats() {
+    const metrics = this.#metrics;
+    const receptionTimes = metrics.receptionTimes;
+    const receivedMessagesQuantity = metrics.receivedMessagesQuantity;
+    const consumptionOkQuantity = metrics.consumptionOkQuantity;
+    const consumptionErrorQuantity = metrics.consumptionErrorQuantity;
+    metrics.receptionTimes = 0;
+    metrics.receivedMessagesQuantity = 0;
+    metrics.consumptionOkQuantity = 0;
+    metrics.consumptionErrorQuantity = 0;
+
+    this.logger.info('clientId=%s, consumerGroup=%s, receptionTimes=%d, receivedMessagesQuantity=%d, '
+      + 'consumptionOkQuantity=%d, consumptionErrorQuantity=%d',
+    this.clientId, this.consumerGroup, receptionTimes, receivedMessagesQuantity,
+    consumptionOkQuantity, consumptionErrorQuantity);
+  }
+
+  /**
+   * Builds a receive request for `mq` with auto-renew enabled.
+   *
+   * @param attemptId set on the request only when provided
+   */
+  wrapPushReceiveMessageRequest(
+    batchSize: number,
+    mq: MessageQueue,
+    filterExpression: FilterExpression,
+    longPollingTimeout: number,
+    attemptId?: string,
+  ) {
+    const request = new ReceiveMessageRequest()
+      .setGroup(createResource(this.consumerGroup))
+      .setMessageQueue(mq.toProtobuf())
+      .setFilterExpression(filterExpression.toProtobuf())
+      .setLongPollingTimeout(createDuration(longPollingTimeout))
+      .setBatchSize(batchSize)
+      .setAutoRenew(true);
+    if (attemptId) {
+      request.setAttemptId(attemptId);
+    }
+    return request;
+  }
+
+  /** Public pass-through to the base class's receiveMessage. */
+  async receiveMessage(request: ReceiveMessageRequest, mq: MessageQueue, awaitDuration: number) {
+    return super.receiveMessage(request, mq, awaitDuration);
+  }
+
+  /** Builds an ack request with a single entry for `messageView`. */
+  wrapAckMessageRequest(messageView: MessageView) {
+    const request = new AckMessageRequest()
+      .setGroup(createResource(this.consumerGroup))
+      .setTopic(createResource(messageView.topic));
+    request.addEntries()
+      .setMessageId(messageView.messageId)
+      .setReceiptHandle(messageView.receiptHandle);
+    return request;
+  }
+
+  /** Builds a change-invisible-duration request for `messageView`. */
+  wrapChangeInvisibleDurationRequest(messageView: MessageView, invisibleDuration: number) {
+    return new ChangeInvisibleDurationRequest()
+      .setGroup(createResource(this.consumerGroup))
+      .setTopic(createResource(messageView.topic))
+      .setReceiptHandle(messageView.receiptHandle)
+      .setInvisibleDuration(createDuration(invisibleDuration))
+      .setMessageId(messageView.messageId);
+  }
+
+  /** Builds a forward-to-dead-letter-queue request for `messageView`. */
+  wrapForwardMessageToDeadLetterQueueRequest(messageView: MessageView) {
+    const retryPolicy = this.getRetryPolicy();
+    return new ForwardMessageToDeadLetterQueueRequest()
+      .setGroup(createResource(this.consumerGroup))
+      .setTopic(createResource(messageView.topic))
+      .setReceiptHandle(messageView.receiptHandle)
+      .setMessageId(messageView.messageId)
+      .setDeliveryAttempt(messageView.deliveryAttempt ?? 0)
+      .setMaxDeliveryAttempts(retryPolicy?.getMaxAttempts() ?? 1);
+  }
+
+  // --- Internal: queue size and cache threshold ---
+
+  /** Number of process queues currently held. */
+  getQueueSize(): number {
+    return this.#processQueueTable.size;
+  }
+
+  /** Byte budget of one process queue: the total budget split evenly, at least 1; 0 without queues. */
+  cacheMessageBytesThresholdPerQueue(): number {
+    const size = this.getQueueSize();
+    if (size <= 0) return 0;
+    return Math.max(1, Math.floor(this.#maxCacheMessageSizeInBytes / size));
+  }
+
+  /** Message-count budget of one process queue: the total split evenly, at least 1; 0 without queues. */
+  cacheMessageCountThresholdPerQueue(): number {
+    const size = this.getQueueSize();
+    if (size <= 0) return 0;
+    return Math.max(1, Math.floor(this.#maxCacheMessageCount / size));
+  }
+
+  // --- Internal: assignment scanning ---
+
+  /**
+   * Queries the assignments of every subscribed topic and synchronizes the
+   * process queues with the answers. Failures are logged, never thrown.
+   */
+  #scanAssignments() {
+    if (!this.#scanningAssignments) {
+      return;
+    }
+    try {
+      this.logger.debug?.('Scanning assignments, clientId=%s, subscriptionCount=%d',
+        this.clientId, this.#subscriptionExpressions.size);
+      for (const [ topic, filterExpression ] of this.#subscriptionExpressions) {
+        const existed = this.#cacheAssignments.get(topic);
+        this.#queryAssignment(topic)
+          .then(latest => {
+            // The answer may arrive after shutdown() or unsubscribe(); applying it
+            // then would create process queues that nothing stops again.
+            if (!this.#scanningAssignments || !this.#subscriptionExpressions.has(topic)) {
+              return;
+            }
+            this.logger.debug?.('Query assignment result, topic=%s, clientId=%s, assignmentCount=%d',
+              topic, this.clientId, latest.getAssignmentList().length);
+            if (latest.getAssignmentList().length === 0) {
+              if (!existed || existed.getAssignmentList().length === 0) {
+                this.logger.info('Acquired empty assignments from remote, would scan later, topic=%s, '
+                  + 'clientId=%s', topic, this.clientId);
+                return;
+              }
+              this.logger.warn('Attention!!! acquired empty assignments from remote, but existed assignments'
+                + ' is not empty, topic=%s, clientId=%s', topic, this.clientId);
+            }
+
+            if (!latest.equals(existed)) {
+              this.logger.info('Assignments of topic=%s has changed, %j => %j, clientId=%s', topic, existed,
+                latest, this.clientId);
+              this.#syncProcessQueue(topic, latest, filterExpression);
+              this.#cacheAssignments.set(topic, latest);
+              return;
+            }
+            this.logger.debug?.('Assignments of topic=%s remains the same, assignments=%j, clientId=%s', topic,
+              existed, this.clientId);
+            // Process queue may be dropped, need to be synchronized anyway.
+            this.#syncProcessQueue(topic, latest, filterExpression);
+          })
+          .catch(err => {
+            this.logger.error('Exception raised while scanning the assignments, topic=%s, clientId=%s, error=%s',
+              topic, this.clientId, err);
+          });
+      }
+    } catch (err) {
+      this.logger.error('Exception raised while scanning the assignments for all topics, clientId=%s, error=%s',
+        this.clientId, err);
+    }
+  }
+
+  /**
+   * Queries the assignments of `topic` from the first endpoints of its route.
+   *
+   * @throws Error when the route has no endpoints, the response carries no
+   * status, the status check fails, or an assignment lacks its message queue
+   */
+  async #queryAssignment(topic: string): Promise<Assignments> {
+    const topicRouteData = await this.getRouteData(topic);
+    const endpointsList = topicRouteData.getTotalEndpoints();
+    if (endpointsList.length === 0) {
+      throw new Error(`No endpoints available for topic=${topic}`);
+    }
+    const endpoints = endpointsList[0];
+
+    const request = new QueryAssignmentRequest()
+      .setTopic(createResource(topic))
+      .setGroup(createResource(this.consumerGroup))
+      .setEndpoints(this.endpoints.toProtobuf());
+
+    const response = await this.rpcClientManager.queryAssignment(
+      endpoints, request, this.requestTimeout,
+    );
+    const status = response.getStatus();
+    if (!status) {
+      throw new Error('Missing status in query assignment response');
+    }
+    StatusChecker.check(status.toObject());
+
+    const assignmentList = response.getAssignmentsList().map(assignment => {
+      const mqPb = assignment.getMessageQueue();
+      if (!mqPb) {
+        throw new Error(`Missing message queue in assignment, topic=${topic}`);
+      }
+      return new Assignment(new MessageQueue(mqPb));
+    });
+
+    return new Assignments(assignmentList);
+  }
+
+  /**
+   * Aligns the process queues of `topic` with `assignments`: drops queues that
+   * are no longer assigned or have expired, then creates and starts a queue
+   * for every assignment that has no active one.
+   */
+  #syncProcessQueue(topic: string, assignments: Assignments, filterExpression: FilterExpression) {
+    const assignmentList = assignments.getAssignmentList();
+    const latestMqKeys = new Set(assignmentList.map(assignment => this.#mqKey(assignment.messageQueue)));
+
+    // Find active message queues
+    const activeMqKeys = new Set<string>();
+    for (const [ mqKey, { mq, pq }] of this.#processQueueTable) {
+      if (mq.topic.name !== topic) {
+        continue;
+      }
+      if (!latestMqKeys.has(mqKey)) {
+        this.logger.info('Drop message queue according to the latest assignmentList, mq=%s@%s@%s, clientId=%s',
+          mq.topic.name, mq.broker.name, mq.queueId, this.clientId);
+        this.#dropProcessQueue(mqKey);
+        continue;
+      }
+      if (pq.expired()) {
+        this.logger.warn('Drop message queue because it is expired, mq=%s@%s@%s, clientId=%s',
+          mq.topic.name, mq.broker.name, mq.queueId, this.clientId);
+        this.#dropProcessQueue(mqKey);
+        continue;
+      }
+      activeMqKeys.add(mqKey);
+    }
+
+    // Create new process queues for new assignments
+    for (const { messageQueue } of assignmentList) {
+      if (activeMqKeys.has(this.#mqKey(messageQueue))) {
+        continue;
+      }
+      const processQueue = this.#createProcessQueue(messageQueue, filterExpression);
+      if (processQueue) {
+        this.logger.info('Start to fetch message from remote, mq=%s@%s@%s, clientId=%s',
+          messageQueue.topic.name, messageQueue.broker.name,
+          messageQueue.queueId, this.clientId);
+        processQueue.fetchMessageImmediately();
+      }
+    }
+  }
+
+  /** Key of a message queue in the process queue table: `topic@broker@queueId`. */
+  #mqKey(mq: MessageQueue): string {
+    return `${mq.topic.name}@${mq.broker.name}@${mq.queueId}`;
+  }
+
+  /** Drops the process queue stored under `mqKey`, if any, and forgets it. */
+  #dropProcessQueue(mqKey: string) {
+    const entry = this.#processQueueTable.get(mqKey);
+    if (entry) {
+      entry.pq.drop();
+      this.#processQueueTable.delete(mqKey);
+    }
+  }
+
+  /**
+   * Creates and registers a process queue for `mq`.
+   *
+   * @returns the new queue, or null when one is already registered for `mq`
+   */
+  #createProcessQueue(mq: MessageQueue, filterExpression: FilterExpression): ProcessQueue | null {
+    const mqKey = this.#mqKey(mq);
+    if (this.#processQueueTable.has(mqKey)) {
+      return null;
+    }
+    const processQueue = new ProcessQueue(this, mq, filterExpression);
+    this.#processQueueTable.set(mqKey, { mq, pq: processQueue });
+    return processQueue;
+  }
+}
diff --git a/nodejs/src/consumer/PushSubscriptionSettings.ts 
b/nodejs/src/consumer/PushSubscriptionSettings.ts
new file mode 100644
index 00000000..fe97f6b1
--- /dev/null
+++ b/nodejs/src/consumer/PushSubscriptionSettings.ts
@@ -0,0 +1,123 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import {
+  Settings as SettingsPB,
+  ClientType,
+  Subscription,
+  RetryPolicy as RetryPolicyPB,
+} from '../../proto/apache/rocketmq/v2/definition_pb';
+import { Endpoints } from '../route';
+import { Settings, UserAgent } from '../client';
+import { ExponentialBackoffRetryPolicy, RetryPolicy } from '../retry';
+import { createDuration, createResource } from '../util';
+import { FilterExpression } from './FilterExpression';
+
+/**
+ * Client settings for a push consumer: the consumer group, its per-topic
+ * filter expressions, and the receive parameters (FIFO flag, receive batch
+ * size, long-polling timeout) that `sync()` refreshes from server settings.
+ */
+export class PushSubscriptionSettings extends Settings {
+  readonly #group: string;
+  readonly #subscriptionExpressions: Map<string, FilterExpression>;
+  #fifo = false;
+  #receiveBatchSize = 32;
+  #longPollingTimeout = 30000; // ms
+
+  /**
+   * @param namespace - forwarded to the base `Settings`
+   * @param clientId - forwarded to the base `Settings`
+   * @param accessPoint - forwarded to the base `Settings`
+   * @param consumerGroup - consumer group sent in the subscription
+   * @param requestTimeout - forwarded to the base `Settings`
+   * @param subscriptionExpressions - filter expression per topic; the map is
+   *   kept by reference, not copied
+   * @param longPollingTimeout - optional initial long-polling timeout in
+   *   milliseconds; defaults to 30000 when omitted
+   */
+  constructor(
+    namespace: string,
+    clientId: string,
+    accessPoint: Endpoints,
+    consumerGroup: string,
+    requestTimeout: number,
+    subscriptionExpressions: Map<string, FilterExpression>,
+    longPollingTimeout?: number,
+  ) {
+    super(namespace, clientId, ClientType.PUSH_CONSUMER, accessPoint, requestTimeout);
+    this.#group = consumerGroup;
+    this.#subscriptionExpressions = subscriptionExpressions;
+    if (longPollingTimeout !== undefined) {
+      this.#longPollingTimeout = longPollingTimeout;
+    }
+  }
+
+  /** Whether the last synced settings marked this subscription as FIFO. */
+  isFifo(): boolean {
+    return this.#fifo;
+  }
+
+  /** Current receive batch size (32 until a positive value is synced). */
+  getReceiveBatchSize(): number {
+    return this.#receiveBatchSize;
+  }
+
+  /** Current long-polling timeout in milliseconds. */
+  getLongPollingTimeout(): number {
+    return this.#longPollingTimeout;
+  }
+
+  /** Retry policy set by `sync()`, if any. */
+  getRetryPolicy(): RetryPolicy | undefined {
+    return this.retryPolicy;
+  }
+
+  /**
+   * Serialize these settings: client type, access point, request timeout,
+   * user agent, and a subscription holding the group plus one entry per
+   * subscribed topic.
+   */
+  toProtobuf(): SettingsPB {
+    const subscription = new Subscription()
+      .setGroup(createResource(this.#group));
+
+    for (const [ topic, filterExpression ] of this.#subscriptionExpressions.entries()) {
+      subscription.addSubscriptions()
+        .setTopic(createResource(topic))
+        .setExpression(filterExpression.toProtobuf());
+    }
+
+    return new SettingsPB()
+      .setClientType(this.clientType)
+      .setAccessPoint(this.accessPoint.toProtobuf())
+      .setRequestTimeout(createDuration(this.requestTimeout))
+      .setSubscription(subscription)
+      .setUserAgent(UserAgent.INSTANCE.toProtobuf());
+  }
+
+  /**
+   * Apply settings received from the server. Settings whose pub/sub case is
+   * not SUBSCRIPTION are ignored entirely.
+   */
+  sync(settings: SettingsPB): void {
+    if (settings.getPubSubCase() !== SettingsPB.PubSubCase.SUBSCRIPTION) {
+      return;
+    }
+    const subscription = settings.getSubscription();
+    if (subscription) {
+      this.#fifo = subscription.getFifo() ?? false;
+      // Adopt only a positive batch size and otherwise keep the current one.
+      // NOTE(review): generated protobuf getters typically report an unset
+      // numeric field as 0 rather than undefined, so a `?? 32` fallback would
+      // not trigger and the batch size could become 0 - confirm against the
+      // generated definition_pb code.
+      const receiveBatchSize = subscription.getReceiveBatchSize();
+      if (typeof receiveBatchSize === 'number' && receiveBatchSize > 0) {
+        this.#receiveBatchSize = receiveBatchSize;
+      }
+      const longPollingTimeout = subscription.getLongPollingTimeout();
+      if (longPollingTimeout) {
+        // Duration (seconds + nanos) -> whole milliseconds.
+        this.#longPollingTimeout = longPollingTimeout.getSeconds() * 1000 +
+          Math.floor(longPollingTimeout.getNanos() / 1000000);
+      }
+    }
+    const backoffPolicy = settings.getBackoffPolicy();
+    if (backoffPolicy) {
+      switch (backoffPolicy.getStrategyCase()) {
+        case RetryPolicyPB.StrategyCase.EXPONENTIAL_BACKOFF: {
+          const exponential = backoffPolicy.getExponentialBackoff()!.toObject();
+          this.retryPolicy = new ExponentialBackoffRetryPolicy(
+            backoffPolicy.getMaxAttempts(),
+            exponential.initial?.seconds,
+            exponential.max?.seconds,
+            exponential.multiplier,
+          );
+          break;
+        }
+        case RetryPolicyPB.StrategyCase.CUSTOMIZED_BACKOFF:
+          // CustomizedBackoffRetryPolicy not yet implemented in Node.js
+          break;
+        default:
+          break;
+      }
+    }
+  }
+}
diff --git a/nodejs/src/consumer/SimpleConsumer.ts 
b/nodejs/src/consumer/SimpleConsumer.ts
index b5b045a6..994ae957 100644
--- a/nodejs/src/consumer/SimpleConsumer.ts
+++ b/nodejs/src/consumer/SimpleConsumer.ts
@@ -25,6 +25,8 @@ import { SimpleSubscriptionSettings } from 
'./SimpleSubscriptionSettings';
 import { SubscriptionLoadBalancer } from './SubscriptionLoadBalancer';
 import { Consumer, ConsumerOptions } from './Consumer';
 
+const RANDOM_INDEX = Math.floor(Math.random() * Number.MAX_SAFE_INTEGER);
+
 export interface SimpleConsumerOptions extends ConsumerOptions {
   /**
    * support tag string as filter, e.g.:
@@ -46,7 +48,7 @@ export class SimpleConsumer extends Consumer {
   readonly #subscriptionExpressions = new Map<string, FilterExpression>();
   readonly #subscriptionRouteDataCache = new Map<string, 
SubscriptionLoadBalancer>();
   readonly #awaitDuration: number;
-  #topicIndex = 0;
+  #topicIndex = RANDOM_INDEX;
 
   constructor(options: SimpleConsumerOptions) {
     options.topics = Array.from(options.subscriptions.keys());
@@ -113,8 +115,16 @@ export class SimpleConsumer extends Consumer {
   }
 
   async receive(maxMessageNum = 10, invisibleDuration = 15000) {
+    if (maxMessageNum <= 0) {
+      throw new Error(`maxMessageNum must be greater than 0, but got 
${maxMessageNum}`);
+    }
+
     const topic = this.#nextTopic();
-    const filterExpression = this.#subscriptionExpressions.get(topic)!;
+    const filterExpression = this.#subscriptionExpressions.get(topic);
+    if (!filterExpression) {
+      throw new Error(`No subscription found for topic=${topic}, please 
subscribe first`);
+    }
+
     const loadBalancer = await this.#getSubscriptionLoadBalancer(topic);
     const mq = loadBalancer.takeMessageQueue();
     const request = this.wrapReceiveMessageRequest(maxMessageNum, mq, 
filterExpression,
@@ -126,15 +136,18 @@ export class SimpleConsumer extends Consumer {
     await this.ackMessage(message);
   }
 
-  async changeInvisibleDuration0(message: MessageView, invisibleDuration: 
number) {
-    await this.changeInvisibleDuration0(message, invisibleDuration);
+  async changeInvisibleDuration(message: MessageView, invisibleDuration: number) {
+    // Refresh receipt handle manually: whatever `invisibleDuration()` resolves
+    // to is stored on the message.
+    // NOTE(review): the `any` cast bypasses MessageView's typing; presumably
+    // `invisibleDuration()` resolves to the new receipt handle - confirm.
+    const target = message as any;
+    target.receiptHandle = await this.invisibleDuration(message, invisibleDuration);
   }
 
   #nextTopic() {
     const topics = Array.from(this.#subscriptionExpressions.keys());
-    if (this.#topicIndex >= topics.length) {
-      this.#topicIndex = 0;
+    if (topics.length === 0) {
+      throw new Error('No subscriptions available to receive messages');
     }
-    return topics[this.#topicIndex++];
+    // Round-robin over the subscribed topics. The cursor is reduced modulo the
+    // topic count before use and stored back reduced, so it never grows past
+    // topics.length. An ever-incrementing cursor that starts near
+    // Number.MAX_SAFE_INTEGER would stop changing once it exceeds 2^53 and
+    // the rotation would freeze on one topic.
+    const index = this.#topicIndex % topics.length;
+    this.#topicIndex = index + 1;
+    return topics[index];
   }
 }
diff --git a/nodejs/src/client/Logger.ts 
b/nodejs/src/consumer/StandardConsumeService.ts
similarity index 50%
copy from nodejs/src/client/Logger.ts
copy to nodejs/src/consumer/StandardConsumeService.ts
index 8a5abbeb..3121f210 100644
--- a/nodejs/src/client/Logger.ts
+++ b/nodejs/src/consumer/StandardConsumeService.ts
@@ -15,21 +15,30 @@
  * limitations under the License.
  */
 
-import path from 'node:path';
-import { homedir } from 'node:os';
-import { EggLogger } from 'egg-logger';
+import { MessageView } from '../message';
+import { ConsumeService } from './ConsumeService';
+import { MessageListener } from './MessageListener';
+import type { ProcessQueue } from './ProcessQueue';
 
-export interface ILogger {
-  info(...args: any[]): void;
-  warn(...args: any[]): void;
-  error(...args: any[]): void;
-  close?(...args: any[]): void;
-}
+/**
+ * Consume service that starts consumption of every delivered message right
+ * away, without waiting for earlier messages of the same batch to finish.
+ */
+export class StandardConsumeService extends ConsumeService {
+  constructor(clientId: string, messageListener: MessageListener) {
+    super(clientId, messageListener);
+  }
+
+  /**
+   * Dispatch a batch of messages taken from `pq`.
+   *
+   * Corrupted messages are discarded through `pq.discardMessage()`. Every
+   * other message is passed to `consumeMessage()`, and its result is handed
+   * to `pq.eraseMessage()` once available.
+   */
+  consume(pq: ProcessQueue, messageViews: MessageView[]): void {
+    for (const view of messageViews) {
+      if (view.corrupted) {
+        pq.discardMessage(view);
+        continue;
+      }
 
-export function getDefaultLogger() {
-  const file = path.join(homedir(), 
'logs/rocketmq/rocketmq_client_nodejs.log');
-  return new EggLogger({
-    file,
-    level: 'INFO',
-  });
+      // Deliberately not awaited so the loop moves on to the next message.
+      const pending = this.consumeMessage(view);
+      pending
+        .then(consumeResult => {
+          pq.eraseMessage(view, consumeResult);
+        })
+        .catch(() => {
+          // Should never reach here.
+        });
+    }
+  }
 }
diff --git a/nodejs/src/consumer/index.ts b/nodejs/src/consumer/index.ts
index 73dbd62f..8b1cfc1c 100644
--- a/nodejs/src/consumer/index.ts
+++ b/nodejs/src/consumer/index.ts
@@ -20,3 +20,14 @@ export * from './FilterExpression';
 export * from './SimpleConsumer';
 export * from './SimpleSubscriptionSettings';
 export * from './SubscriptionLoadBalancer';
+export * from './ConsumeResult';
+export * from './MessageListener';
+export * from './Assignment';
+export * from './Assignments';
+export * from './PushSubscriptionSettings';
+export * from './ConsumeTask';
+export * from './ConsumeService';
+export * from './StandardConsumeService';
+export * from './FifoConsumeService';
+export * from './ProcessQueue';
+export * from './PushConsumer';
diff --git a/nodejs/src/producer/Producer.ts b/nodejs/src/producer/Producer.ts
index 8484167f..9e281e36 100644
--- a/nodejs/src/producer/Producer.ts
+++ b/nodejs/src/producer/Producer.ts
@@ -21,6 +21,7 @@ import {
   ClientType,
   MessageType,
   TransactionResolution,
+  TransactionSource,
 } from '../../proto/apache/rocketmq/v2/definition_pb';
 import {
   EndTransactionRequest,
@@ -71,24 +72,50 @@ export class Producer extends BaseClient {
     this.#checker = options.checker;
   }
 
+  /**
+   * Start the producer through the base client's `startup()`, logging before
+   * the call and after it completes.
+   */
+  async startup() {
+    const report = (template: string) => this.logger.info(template, this.clientId);
+    report('Begin to start the rocketmq producer, clientId=%s');
+    await super.startup();
+    report('The rocketmq producer starts successfully, clientId=%s');
+  }
+
+  /**
+   * Shut the producer down through the base client's `shutdown()`, logging
+   * before the call and after it completes.
+   */
+  async shutdown() {
+    const report = (template: string) => this.logger.info(template, this.clientId);
+    report('Begin to shutdown the rocketmq producer, clientId=%s');
+    await super.shutdown();
+    report('Shutdown the rocketmq producer successfully, clientId=%s');
+  }
+
   get publishingSettings() {
     return this.#publishingSettings;
   }
 
   beginTransaction() {
     assert(this.#checker, 'Transaction checker should not be null');
+    // Refuse to open a transaction unless the producer reports itself running.
+    const running = this.isRunning();
+    if (!running) {
+      this.logger.error('Unable to begin a transaction because producer is not running, clientId=%s', this.clientId);
+      throw new Error('Producer is not running now');
+    }
     return new Transaction(this);
   }
 
   async endTransaction(endpoints: Endpoints, message: Message, messageId: 
string,
-    transactionId: string, resolution: TransactionResolution) {
+    transactionId: string, resolution: TransactionResolution, source: TransactionSource = TransactionSource.SOURCE_CLIENT) {
+    // Resolve enum names by value for logging. `Enum[value]` relies on a
+    // TypeScript reverse mapping; generated protobuf enums are presumably
+    // plain name -> number objects at runtime, where that lookup yields
+    // undefined. Searching the entries works for both shapes and falls back
+    // to the raw number for unknown values.
+    const nameOf = (pbEnum: object, value: number) =>
+      Object.entries(pbEnum).find(([ , v ]) => v === value)?.[0] ?? String(value);
+    const resolutionStr = nameOf(TransactionResolution, resolution);
+    const sourceStr = nameOf(TransactionSource, source);
+    this.logger.info('Begin to end transaction, messageId=%s, transactionId=%s, resolution=%s, source=%s, clientId=%s',
+      messageId, transactionId, resolutionStr, sourceStr, this.clientId);
+
     const request = new EndTransactionRequest()
       .setMessageId(messageId)
       .setTransactionId(transactionId)
       
.setTopic(createResource(message.topic).setResourceNamespace(this.namespace))
-      .setResolution(resolution);
+      .setResolution(resolution)
+      .setSource(source);
     const response = await this.rpcClientManager.endTransaction(endpoints, 
request, this.requestTimeout);
     StatusChecker.check(response.getStatus()?.toObject());
+
+    // Only reached when the status check above did not throw.
+    this.logger.info('End transaction successfully, messageId=%s, transactionId=%s, resolution=%s, source=%s, clientId=%s',
+      messageId, transactionId, resolutionStr, sourceStr, this.clientId);
   }
 
   async onRecoverOrphanedTransactionCommand(endpoints: Endpoints, command: 
RecoverOrphanedTransactionCommand) {
@@ -114,7 +141,9 @@ export class Producer extends BaseClient {
       if (resolution === null || resolution === 
TransactionResolution.TRANSACTION_RESOLUTION_UNSPECIFIED) {
         return;
       }
-      await this.endTransaction(endpoints, messageView, messageId, 
transactionId, resolution);
+      // Use SOURCE_SERVER_CHECK for transaction recovery
+      await this.endTransaction(endpoints, messageView, messageId, 
transactionId, resolution,
+        TransactionSource.SOURCE_SERVER_CHECK);
       this.logger.info('Recover orphaned transaction message success, 
transactionId=%s, resolution=%s, messageId=%s, clientId=%s',
         transactionId, resolution, messageId, this.clientId);
     } catch (err) {
@@ -143,14 +172,26 @@ export class Producer extends BaseClient {
       return sendReceipts[0];
     }
 
-    const publishingMessage = transaction.tryAddMessage(message);
-    const sendReceipts = await this.#send([ message ], true);
-    const sendReceipt = sendReceipts[0];
-    transaction.tryAddReceipt(publishingMessage, sendReceipt);
-    return sendReceipt;
+    // Send transactional message
+    try {
+      const publishingMessage = transaction.tryAddMessage(message);
+      const sendReceipts = await this.#send([ message ], true);
+      const sendReceipt = sendReceipts[0];
+      transaction.tryAddReceipt(publishingMessage, sendReceipt);
+      return sendReceipt;
+    } catch (err) {
+      this.logger.error('Failed to send transactional message, clientId=%s, 
error=%s', this.clientId, err);
+      throw err;
+    }
   }
 
   async #send(messages: MessageOptions[], txEnabled: boolean) {
+    // Check producer status before message publishing
+    if (!this.isRunning()) {
+      this.logger.error('Unable to send message because producer is not 
running, clientId=%s', this.clientId);
+      throw new Error('Producer is not running now');
+    }
+
     const pubMessages: PublishingMessage[] = [];
     const topics = new Set<string>();
     for (const message of messages) {
@@ -158,21 +199,21 @@ export class Producer extends BaseClient {
       topics.add(message.topic);
     }
     if (topics.size > 1) {
-      throw new TypeError(`Messages to send have different 
topics=${JSON.stringify(topics)}`);
+      throw new TypeError(`Messages to send have different 
topics=${JSON.stringify(Array.from(topics))}`);
     }
     const topic = pubMessages[0].topic;
     const messageType = pubMessages[0].messageType;
     const messageGroup = pubMessages[0].messageGroup;
     const messageTypes = new Set(pubMessages.map(m => m.messageType));
     if (messageTypes.size > 1) {
-      throw new TypeError(`Messages to send have different 
types=${JSON.stringify(messageTypes)}`);
+      throw new TypeError(`Messages to send have different 
types=${JSON.stringify(Array.from(messageTypes))}`);
     }
 
     // Message group must be same if message type is FIFO, or no need to 
proceed.
     if (messageType === MessageType.FIFO) {
       const messageGroups = new Set(pubMessages.map(m => m.messageGroup!));
       if (messageGroups.size > 1) {
-        throw new TypeError(`FIFO messages to send have message groups, 
messageGroups=${JSON.stringify(messageGroups)}`);
+        throw new TypeError(`FIFO messages to send have message groups, 
messageGroups=${JSON.stringify(Array.from(messageGroups))}`);
       }
     }
 
@@ -223,42 +264,42 @@ export class Producer extends BaseClient {
       sendReceipts = SendReceipt.processResponseInvocation(mq, response);
     } catch (err) {
       const messageIds = messages.map(m => m.messageId);
-      // Isolate endpoints because of sending failure.
+      // Isolate endpoints because of sending failure
       this.#isolate(endpoints);
       if (attempt >= maxAttempts) {
-        // No need more attempts.
-        this.logger.error('Failed to send message(s) finally, run out of 
attempt times, maxAttempts=%s, attempt=%s, topic=%s, messageId(s)=%s, 
endpoints=%s, clientId=%s, error=%s',
+        // No more attempts
+        this.logger.error('Failed to send message(s) finally, run out of 
attempt times, maxAttempts=%d, attempt=%d, topic=%s, messageId(s)=%s, 
endpoints=%s, clientId=%s, error=%s',
           maxAttempts, attempt, topic, messageIds, endpoints, this.clientId, 
err);
         throw err;
       }
-      // No need more attempts for transactional message.
+      // No more attempts for transactional message
       if (messageType === MessageType.TRANSACTION) {
-        this.logger.error('Failed to send transactional message finally, 
maxAttempts=%s, attempt=%s, topic=%s, messageId(s)=%s, endpoints=%s, 
clientId=%s, error=%s',
+        this.logger.error('Failed to send transactional message finally, 
maxAttempts=%d, attempt=%d, topic=%s, messageId(s)=%s, endpoints=%s, 
clientId=%s, error=%s',
           maxAttempts, attempt, topic, messageIds, endpoints, this.clientId, 
err);
         throw err;
       }
-      // Try to do more attempts.
+      // Try next attempt
       const nextAttempt = 1 + attempt;
-      // Retry immediately if the request is not throttled.
+      // Retry immediately if the request is not throttled
       if (!(err instanceof TooManyRequestsException)) {
-        this.logger.warn('Failed to send message, would attempt to resend 
right now, maxAttempts=%s, attempt=%s, topic=%s, messageId(s)=%s, endpoints=%s, 
clientId=%s, error=%s',
+        this.logger.warn('Failed to send message, would attempt to resend 
right now, maxAttempts=%d, attempt=%d, topic=%s, messageId(s)=%s, endpoints=%s, 
clientId=%s, error=%s',
           maxAttempts, attempt, topic, messageIds, endpoints, this.clientId, 
err);
         return this.#send0(topic, messageType, candidates, messages, 
nextAttempt);
       }
       const delay = this.#getRetryPolicy().getNextAttemptDelay(nextAttempt);
-      this.logger.warn('Failed to send message due to too many requests, would 
attempt to resend after %sms, maxAttempts=%s, attempt=%s, topic=%s, 
messageId(s)=%s, endpoints=%s, clientId=%s, error=%s',
+      this.logger.warn('Failed to send message due to too many requests, would 
attempt to resend after %dms, maxAttempts=%d, attempt=%d, topic=%s, 
messageId(s)=%s, endpoints=%s, clientId=%s, error=%s',
         delay, maxAttempts, attempt, topic, messageIds, endpoints, 
this.clientId, err);
       await setTimeout(delay);
       return this.#send0(topic, messageType, candidates, messages, 
nextAttempt);
     }
 
-    // Resend message(s) successfully.
+    // Resend message(s) successfully
     if (attempt > 1) {
       const messageIds = sendReceipts.map(r => r.messageId);
-      this.logger.info('Resend message successfully, topic=%s, 
messageId(s)=%j, maxAttempts=%s, attempt=%s, endpoints=%s, clientId=%s',
+      this.logger.info('Resend message successfully, topic=%s, 
messageId(s)=%s, maxAttempts=%d, attempt=%d, endpoints=%s, clientId=%s',
         topic, messageIds, maxAttempts, attempt, endpoints, this.clientId);
     }
-    // Send message(s) successfully on first attempt, return directly.
+    // Send message(s) successfully on first attempt, return directly
     return sendReceipts;
   }
 
diff --git a/nodejs/src/producer/Transaction.ts 
b/nodejs/src/producer/Transaction.ts
index 85d525ea..a172b352 100644
--- a/nodejs/src/producer/Transaction.ts
+++ b/nodejs/src/producer/Transaction.ts
@@ -50,21 +50,37 @@ export class Transaction {
     if (this.#messageSendReceiptMap.size === 0) {
       throw new TypeError('Transactional message has not been sent yet');
     }
+
+    const logger = (this.#producer as any).logger;
+    logger.info('Begin to commit transaction, messageCount=%d, clientId=%s',
+      this.#messageSendReceiptMap.size, (this.#producer as any).clientId);
+
     for (const [ messageId, sendReceipt ] of 
this.#messageSendReceiptMap.entries()) {
       const publishingMessage = this.#messageMap.get(messageId)!;
       await this.#producer.endTransaction(sendReceipt.endpoints, 
publishingMessage,
         sendReceipt.messageId, sendReceipt.transactionId, 
TransactionResolution.COMMIT);
     }
+
+    logger.info('Commit transaction successfully, messageCount=%d, 
clientId=%s',
+      this.#messageSendReceiptMap.size, (this.#producer as any).clientId);
   }
 
   async rollback() {
     if (this.#messageSendReceiptMap.size === 0) {
       throw new TypeError('Transactional message has not been sent yet');
     }
+
+    // The producer's logger and clientId are read through an `any` cast.
+    // NOTE(review): presumably they are not publicly accessible on Producer - confirm.
+    const producer = this.#producer as any;
+    const log = producer.logger;
+    log.info('Begin to rollback transaction, messageCount=%d, clientId=%s',
+      this.#messageSendReceiptMap.size, producer.clientId);
+
     for (const [ messageId, sendReceipt ] of 
this.#messageSendReceiptMap.entries()) {
       const publishingMessage = this.#messageMap.get(messageId)!;
       await this.#producer.endTransaction(sendReceipt.endpoints, 
publishingMessage,
         sendReceipt.messageId, sendReceipt.transactionId, 
TransactionResolution.ROLLBACK);
     }
+
+    // Reached only when every endTransaction call above resolved.
+    log.info('Rollback transaction successfully, messageCount=%d, clientId=%s',
+      this.#messageSendReceiptMap.size, producer.clientId);
   }
 }

Reply via email to