zhaohai666 commented on code in PR #1201:
URL: https://github.com/apache/rocketmq-clients/pull/1201#discussion_r2957915746


##########
nodejs/src/consumer/PushConsumer.ts:
##########
@@ -0,0 +1,455 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import { ClientType } from '../../proto/apache/rocketmq/v2/definition_pb';
+import {
+  AckMessageRequest,
+  ChangeInvisibleDurationRequest,
+  ForwardMessageToDeadLetterQueueRequest,
+  HeartbeatRequest,
+  NotifyClientTerminationRequest,
+  QueryAssignmentRequest,
+  ReceiveMessageRequest,
+} from '../../proto/apache/rocketmq/v2/service_pb';
+import { MessageView } from '../message';
+import { MessageQueue, TopicRouteData } from '../route';
+import { StatusChecker } from '../exception';
+import { RetryPolicy } from '../retry';
+import { createDuration, createResource } from '../util';
+import { Consumer, ConsumerOptions } from './Consumer';
+import { FilterExpression } from './FilterExpression';
+import { PushSubscriptionSettings } from './PushSubscriptionSettings';
+import { ConsumeService } from './ConsumeService';
+import { StandardConsumeService } from './StandardConsumeService';
+import { FifoConsumeService } from './FifoConsumeService';
+import { ProcessQueue } from './ProcessQueue';
+import { Assignment } from './Assignment';
+import { Assignments } from './Assignments';
+import { MessageListener } from './MessageListener';
+
+// Delay handed to setTimeout in startup() before the first assignment scan.
+const ASSIGNMENT_SCAN_SCHEDULE_DELAY = 1000;
+// Period handed to setInterval in startup() between the repeating assignment scans.
+const ASSIGNMENT_SCAN_SCHEDULE_PERIOD = 5000;
+
+/**
+ * Options accepted by the PushConsumer constructor, on top of the base ConsumerOptions.
+ */
+export interface PushConsumerOptions extends ConsumerOptions {
+  // Topic -> filter; a plain string value is wrapped in a FilterExpression by the constructor.
+  subscriptions: Map<string/* topic */, FilterExpression | string>;
+  // Listener passed to the consume service that PushConsumer creates on startup.
+  messageListener: MessageListener;
+  // The PushConsumer constructor falls back to 1024 when this is omitted.
+  maxCacheMessageCount?: number;
+  // The PushConsumer constructor falls back to 64 * 1024 * 1024 when this is omitted.
+  maxCacheMessageSizeInBytes?: number;
+  // NOTE(review): not read by the visible part of PushConsumer - confirm where it is consumed and its unit.
+  longPollingTimeout?: number;
+}
+
+/**
+ * Plain mutable counters, all starting at zero. PushConsumer exposes them through its
+ * get and increment methods, and doStats() logs and resets them.
+ */
+class ConsumeMetrics {
+  receptionTimes = 0;
+  receivedMessagesQuantity = 0;
+  consumptionOkQuantity = 0;
+  consumptionErrorQuantity = 0;
+}
+
+export class PushConsumer extends Consumer {
+  // Settings object built in the constructor from the subscription expressions.
+  readonly #pushSubscriptionSettings: PushSubscriptionSettings;
+  // Topic -> filter expression; iterated by every assignment scan.
+  readonly #subscriptionExpressions = new Map<string, FilterExpression>();
+  // Topic -> assignments last stored by a scan, used to detect changes.
+  readonly #cacheAssignments = new Map<string, Assignments>();
+  readonly #messageListener: MessageListener;
+  // Consumer-wide cache limits; divided per queue by the cacheMessage*ThresholdPerQueue methods.
+  readonly #maxCacheMessageCount: number;
+  readonly #maxCacheMessageSizeInBytes: number;
+  // Each entry pairs a message queue with its process queue.
+  readonly #processQueueTable = new Map<string /* mq key */, { mq: 
MessageQueue; pq: ProcessQueue }>();
+  readonly #metrics = new ConsumeMetrics();
+  // Assigned in startup(), hence the definite-assignment assertion.
+  #consumeService!: ConsumeService;
+  // Handle of the repeating assignment scan; cleared and reset to undefined by shutdown().
+  #scanAssignmentTimer?: NodeJS.Timeout;
+
+  /**
+   * Builds a push consumer from the given options.
+   *
+   * Note that `options.topics` is overwritten on the caller's object with the
+   * subscription topics before the options are handed to the base class.
+   *
+   * @param options subscriptions, listener and optional cache limits
+   */
+  constructor(options: PushConsumerOptions) {
+    options.topics = Array.from(options.subscriptions.keys());
+    super(options);
+
+    // Normalize every subscription value to a FilterExpression instance.
+    for (const [ topic, filter ] of options.subscriptions.entries()) {
+      if (typeof filter === 'string') {
+        this.#subscriptionExpressions.set(topic, new FilterExpression(filter));
+      } else {
+        this.#subscriptionExpressions.set(topic, filter);
+      }
+    }
+
+    this.#messageListener = options.messageListener;
+    // `??` keeps an explicit 0 from the caller; only null/undefined fall back to the defaults.
+    this.#maxCacheMessageCount = options.maxCacheMessageCount ?? 1024;
+    this.#maxCacheMessageSizeInBytes = options.maxCacheMessageSizeInBytes ?? 
64 * 1024 * 1024;
+
+    // The settings receive the same Map instance, so later subscribe()/unsubscribe() calls mutate it too.
+    this.#pushSubscriptionSettings = new PushSubscriptionSettings(
+      options.namespace, this.clientId, this.endpoints,
+      this.consumerGroup, this.requestTimeout, this.#subscriptionExpressions,
+    );
+  }
+
+  /**
+   * Starts the push consumer.
+   *
+   * Runs the base-class startup, creates the consume service, then schedules
+   * assignment scanning: one initial scan after ASSIGNMENT_SCAN_SCHEDULE_DELAY
+   * and a repeating scan every ASSIGNMENT_SCAN_SCHEDULE_PERIOD.
+   *
+   * Rejects with whatever the base-class startup rejects with; in that case
+   * no consume service is created and no scan is scheduled.
+   */
+  async startup() {
+    this.logger.info('Begin to start the rocketmq push consumer, clientId=%s, consumerGroup=%s',
+      this.clientId, this.consumerGroup);
+    await super.startup();
+    this.logger.info('Super startup completed, clientId=%s', this.clientId);
+    this.#consumeService = this.#createConsumeService();
+    // Start scanning assignments periodically
+    this.logger.info('Starting assignment scanning, clientId=%s', this.clientId);
+    const scanTimer = setInterval(() => this.#scanAssignments(), ASSIGNMENT_SCAN_SCHEDULE_PERIOD);
+    this.#scanAssignmentTimer = scanTimer;
+    // shutdown() only clears #scanAssignmentTimer, so the one-shot initial scan must check that the
+    // interval created by this startup is still the active one. Without the check, a shutdown within
+    // the initial delay would still trigger a scan on an already closed consumer.
+    setTimeout(() => {
+      if (this.#scanAssignmentTimer === scanTimer) {
+        this.#scanAssignments();
+      }
+    }, ASSIGNMENT_SCAN_SCHEDULE_DELAY);
+    this.logger.info('Push consumer started successfully, clientId=%s', this.clientId);
+  }
+
+  /**
+   * Shuts the push consumer down.
+   *
+   * Order: stop the repeating assignment scan, drop and abort every process
+   * queue and clear the table, abort the consume service if one was created,
+   * then run the base-class shutdown.
+   */
+  async shutdown() {
+    this.logger.info('Begin to shutdown the rocketmq push consumer, 
clientId=%s', this.clientId);
+    // Stop scanning assignments
+    if (this.#scanAssignmentTimer) {
+      clearInterval(this.#scanAssignmentTimer);
+      this.#scanAssignmentTimer = undefined;
+    }
+    // Drop all process queues
+    this.logger.info('Dropping all process queues, clientId=%s, queueCount=%d',
+      this.clientId, this.#processQueueTable.size);
+    for (const { pq } of this.#processQueueTable.values()) {
+      pq.drop();
+      pq.abort();
+    }
+    this.#processQueueTable.clear();
+    // Shutdown consume service
+    // The service is only assigned in startup(), so it is still undefined if startup never got that far.
+    if (this.#consumeService) {
+      this.logger.info('Shutting down consume service, clientId=%s', 
this.clientId);
+      this.#consumeService.abort();
+    }
+    await super.shutdown();
+    this.logger.info('Push consumer has been shutdown successfully, 
clientId=%s', this.clientId);
+  }
+
+  /** Returns the push subscription settings built in the constructor. */
+  protected getSettings() {
+    return this.#pushSubscriptionSettings;
+  }
+
+  /** Builds a HeartbeatRequest with client type PUSH_CONSUMER and this consumer's group. */
+  protected wrapHeartbeatRequest() {
+    return new HeartbeatRequest()
+      .setClientType(ClientType.PUSH_CONSUMER)
+      .setGroup(createResource(this.consumerGroup));
+  }
+
+  /** Builds a NotifyClientTerminationRequest carrying this consumer's group. */
+  protected wrapNotifyClientTerminationRequest() {
+    return new NotifyClientTerminationRequest()
+      .setGroup(createResource(this.consumerGroup));
+  }
+
+  /**
+   * Route-update hook that intentionally does nothing: both parameters are
+   * unused because this consumer obtains its queues from assignment queries.
+   */
+  // eslint-disable-next-line @typescript-eslint/no-unused-vars
+  protected onTopicRouteDataUpdate(_topic: string, _topicRouteData: 
TopicRouteData) {
+    // No-op for push consumer; assignments are queried separately
+  }
+
+  /**
+   * Creates the consume service for this consumer: a FifoConsumeService when
+   * the push subscription settings report FIFO, a StandardConsumeService
+   * otherwise. Both receive the client id and the message listener.
+   */
+  #createConsumeService(): ConsumeService {
+    if (this.#pushSubscriptionSettings.isFifo()) {
+      return new FifoConsumeService(this.clientId, this.#messageListener);
+    }
+    return new StandardConsumeService(this.clientId, this.#messageListener);
+  }
+
+  /**
+   * Adds or replaces the filter expression for a topic.
+   *
+   * The route data is awaited first, so a failed lookup rejects and leaves
+   * the subscriptions untouched. The new entry is seen by later assignment
+   * scans, which iterate the subscription map.
+   *
+   * @param topic topic to subscribe to
+   * @param filterExpression filter stored for that topic
+   */
+  async subscribe(topic: string, filterExpression: FilterExpression) {
+    await this.getRouteData(topic);
+    this.#subscriptionExpressions.set(topic, filterExpression);
+  }
+
+  /**
+   * Removes the topic from the subscription map, so later assignment scans
+   * skip it. This method itself does not touch the cached assignments or the
+   * process queue table.
+   *
+   * @param topic topic to stop scanning
+   */
+  unsubscribe(topic: string) {
+    this.#subscriptionExpressions.delete(topic);
+  }
+
+  // --- Public methods for ProcessQueue access ---
+
+  /** Public read access to the consumer's `requestTimeout`. */
+  get requestTimeoutValue(): number {
+    return this.requestTimeout;
+  }
+
+  /** Returns the push subscription settings built in the constructor (same object as getSettings()). */
+  getPushConsumerSettings(): PushSubscriptionSettings {
+    return this.#pushSubscriptionSettings;
+  }
+
+  /** Returns this consumer's group name. */
+  getConsumerGroup(): string {
+    return this.consumerGroup;
+  }
+
+  /** Returns the consume service; it is only assigned once startup() has created it. */
+  getConsumeService(): ConsumeService {
+    return this.#consumeService;
+  }
+
+  /** Returns the retry policy held by the push subscription settings, or undefined when there is none. */
+  getRetryPolicy(): RetryPolicy | undefined {
+    return this.#pushSubscriptionSettings.getRetryPolicy();
+  }
+
+  /** Returns this client's id. */
+  getClientId(): string {
+    return this.clientId;
+  }
+
+  // --- Metrics access ---
+
+  /** Current value of the receptionTimes counter (reset to 0 by doStats()). */
+  getReceptionTimes(): number {
+    return this.#metrics.receptionTimes;
+  }
+
+  /** Current value of the receivedMessagesQuantity counter (reset to 0 by doStats()). */
+  getReceivedMessagesQuantity(): number {
+    return this.#metrics.receivedMessagesQuantity;
+  }
+
+  /** Current value of the consumptionOkQuantity counter (reset to 0 by doStats()). */
+  getConsumptionOkQuantity(): number {
+    return this.#metrics.consumptionOkQuantity;
+  }
+
+  /** Current value of the consumptionErrorQuantity counter (reset to 0 by doStats()). */
+  getConsumptionErrorQuantity(): number {
+    return this.#metrics.consumptionErrorQuantity;
+  }
+
+  /** Adds one to the receptionTimes counter. */
+  incrementReceptionTimes() {
+    this.#metrics.receptionTimes++;
+  }
+
+  /**
+   * Adds `count` to the receivedMessagesQuantity counter.
+   *
+   * @param count amount to add; it is not validated here
+   */
+  incrementReceivedMessagesQuantity(count: number) {
+    this.#metrics.receivedMessagesQuantity += count;
+  }
+
+  /** Adds one to the consumptionOkQuantity counter. */
+  incrementConsumptionOkQuantity() {
+    this.#metrics.consumptionOkQuantity++;
+  }
+
+  /** Adds one to the consumptionErrorQuantity counter. */
+  incrementConsumptionErrorQuantity() {
+    this.#metrics.consumptionErrorQuantity++;
+  }
+
+  // --- Statistics ---
+
+  /**
+   * Logs the four metric counters at info level and resets each of them to
+   * zero, so every call reports what accumulated since the previous call.
+   */
+  doStats() {
+    // Each counter is read and then zeroed before the next one is touched.
+    const receptionTimes = this.#metrics.receptionTimes;
+    this.#metrics.receptionTimes = 0;
+    const receivedMessagesQuantity = this.#metrics.receivedMessagesQuantity;
+    this.#metrics.receivedMessagesQuantity = 0;
+    const consumptionOkQuantity = this.#metrics.consumptionOkQuantity;
+    this.#metrics.consumptionOkQuantity = 0;
+    const consumptionErrorQuantity = this.#metrics.consumptionErrorQuantity;
+    this.#metrics.consumptionErrorQuantity = 0;
+
+    this.logger.info('clientId=%s, consumerGroup=%s, receptionTimes=%d, 
receivedMessagesQuantity=%d, '
+      + 'consumptionOkQuantity=%d, consumptionErrorQuantity=%d',
+    this.clientId, this.consumerGroup, receptionTimes, 
receivedMessagesQuantity,
+    consumptionOkQuantity, consumptionErrorQuantity);
+  }
+
+  /**
+   * Builds a ReceiveMessageRequest for one message queue, with auto-renew
+   * always enabled.
+   *
+   * @param batchSize value set as the request's batch size
+   * @param mq queue to receive from
+   * @param filterExpression filter sent with the request
+   * @param longPollingTimeout converted with createDuration and set as the long polling timeout
+   * @param attemptId set on the request only when truthy, so an empty string is skipped
+   * @returns the populated request
+   */
+  wrapPushReceiveMessageRequest(
+    batchSize: number,
+    mq: MessageQueue,
+    filterExpression: FilterExpression,
+    longPollingTimeout: number,
+    attemptId?: string,
+  ) {
+    const request = new ReceiveMessageRequest()
+      .setGroup(createResource(this.consumerGroup))
+      .setMessageQueue(mq.toProtobuf())
+      .setFilterExpression(filterExpression.toProtobuf())
+      .setLongPollingTimeout(createDuration(longPollingTimeout))
+      .setBatchSize(batchSize)
+      .setAutoRenew(true);
+    if (attemptId) {
+      request.setAttemptId(attemptId);
+    }
+    return request;
+  }
+
+  /**
+   * Forwards unchanged to the base-class receiveMessage.
+   * NOTE(review): presumably re-declared only to make the method callable
+   * from outside the class hierarchy - confirm against Consumer.
+   */
+  async receiveMessage(request: ReceiveMessageRequest, mq: MessageQueue, 
awaitDuration: number) {
+    return super.receiveMessage(request, mq, awaitDuration);
+  }
+
+  /**
+   * Builds an AckMessageRequest for a single message: group and topic on the
+   * request, plus one entry carrying the message id and receipt handle.
+   *
+   * @param messageView message to acknowledge
+   */
+  wrapAckMessageRequest(messageView: MessageView) {
+    const request = new AckMessageRequest()
+      .setGroup(createResource(this.consumerGroup))
+      .setTopic(createResource(messageView.topic));
+    request.addEntries()
+      .setMessageId(messageView.messageId)
+      .setReceiptHandle(messageView.receiptHandle);
+    return request;
+  }
+
+  /**
+   * Builds a ChangeInvisibleDurationRequest for one message.
+   *
+   * @param messageView message whose topic, receipt handle and id are copied into the request
+   * @param invisibleDuration converted with createDuration and set as the new invisible duration
+   */
+  wrapChangeInvisibleDurationRequest(messageView: MessageView, 
invisibleDuration: number) {
+    return new ChangeInvisibleDurationRequest()
+      .setGroup(createResource(this.consumerGroup))
+      .setTopic(createResource(messageView.topic))
+      .setReceiptHandle(messageView.receiptHandle)
+      .setInvisibleDuration(createDuration(invisibleDuration))
+      .setMessageId(messageView.messageId);
+  }
+
+  /**
+   * Builds a ForwardMessageToDeadLetterQueueRequest for one message.
+   *
+   * @param messageView message to forward
+   */
+  wrapForwardMessageToDeadLetterQueueRequest(messageView: MessageView) {
+    const retryPolicy = this.getRetryPolicy();
+    return new ForwardMessageToDeadLetterQueueRequest()
+      .setGroup(createResource(this.consumerGroup))
+      .setTopic(createResource(messageView.topic))
+      .setReceiptHandle(messageView.receiptHandle)
+      .setMessageId(messageView.messageId)
+      // A missing delivery attempt is sent as 0.
+      .setDeliveryAttempt(messageView.deliveryAttempt ?? 0)
+      // Without a retry policy the maximum is sent as 1.
+      .setMaxDeliveryAttempts(retryPolicy?.getMaxAttempts() ?? 1);
+  }
+
+  // --- Internal: queue size and cache threshold ---
+
+  /** Number of entries currently in the process queue table. */
+  getQueueSize(): number {
+    return this.#processQueueTable.size;
+  }
+
+  /**
+   * Per-queue share of the configured cache size in bytes.
+   *
+   * @returns 0 when there are no process queues; otherwise the limit divided
+   *   by the queue count, rounded down but never below 1
+   */
+  cacheMessageBytesThresholdPerQueue(): number {
+    const size = this.getQueueSize();
+    if (size <= 0) return 0;
+    return Math.max(1, Math.floor(this.#maxCacheMessageSizeInBytes / size));
+  }
+
+  /**
+   * Per-queue share of the configured cached message count.
+   *
+   * @returns 0 when there are no process queues; otherwise the limit divided
+   *   by the queue count, rounded down but never below 1
+   */
+  cacheMessageCountThresholdPerQueue(): number {
+    const size = this.getQueueSize();
+    if (size <= 0) return 0;
+    return Math.max(1, Math.floor(this.#maxCacheMessageCount / size));
+  }
+
+  // --- Internal: assignment scanning ---
+
+  /**
+   * Queries the current assignments of every subscribed topic and
+   * synchronizes the process queues with the result.
+   *
+   * The queries are started without being awaited, so the method returns
+   * before they settle. A failing topic is logged in its own `.catch` and
+   * does not affect the other topics; nothing is thrown to the caller.
+   */
+  #scanAssignments() {
+    try {
+      this.logger.debug?.('Scanning assignments, clientId=%s, 
subscriptionCount=%d',
+        this.clientId, this.#subscriptionExpressions.size);
+      for (const [ topic, filterExpression ] of this.#subscriptionExpressions) 
{
+        // Snapshot of the cached assignments, taken before the query starts.
+        const existed = this.#cacheAssignments.get(topic);
+        this.#queryAssignment(topic)
+          .then(latest => {
+            this.logger.debug?.('Query assignment result, topic=%s, 
clientId=%s, assignmentCount=%d',
+              topic, this.clientId, latest.getAssignmentList().length);
+            if (latest.getAssignmentList().length === 0) {
+              // Empty before and empty now: nothing to synchronize for this topic.
+              if (!existed || existed.getAssignmentList().length === 0) {
+                this.logger.info('Acquired empty assignments from remote, 
would scan later, topic=%s, '
+                  + 'clientId=%s', topic, this.clientId);
+                return;
+              }
+              // Non-empty before but empty now: warn, then fall through to the comparison below.
+              this.logger.warn('Attention!!! acquired empty assignments from 
remote, but existed assignments'
+                + ' is not empty, topic=%s, clientId=%s', topic, 
this.clientId);
+            }
+
+            if (!latest.equals(existed)) {
+              this.logger.info('Assignments of topic=%s has changed, %j => %j, 
clientId=%s', topic, existed,
+                latest, this.clientId);
+              // Queues are synchronized first; the cache is updated afterwards.
+              this.#syncProcessQueue(topic, latest, filterExpression);
+              this.#cacheAssignments.set(topic, latest);
+              return;
+            }
+            this.logger.debug?.('Assignments of topic=%s remains the same, 
assignments=%j, clientId=%s', topic,
+              existed, this.clientId);
+            // Process queue may be dropped, need to be synchronized anyway.
+            this.#syncProcessQueue(topic, latest, filterExpression);
+          })
+          .catch(err => {
+            this.logger.error('Exception raised while scanning the 
assignments, topic=%s, clientId=%s, error=%s',
+              topic, this.clientId, err);
+          });
+      }
+    } catch (err) {
+      // Only reached by synchronous failures while starting the queries.
+      this.logger.error('Exception raised while scanning the assignments for 
all topics, clientId=%s, error=%s',
+        this.clientId, err);
+    }
+  }
+
+  async #queryAssignment(topic: string): Promise<Assignments> {
+    const topicRouteData = await this.getRouteData(topic);
+    const endpointsList = topicRouteData.getTotalEndpoints();
+    if (endpointsList.length === 0) {
+      throw new Error(`No endpoints available for topic=${topic}`);
+    }
+    const endpoints = endpointsList[0];
+
+    const request = new QueryAssignmentRequest()
+      .setTopic(createResource(topic))
+      .setGroup(createResource(this.consumerGroup))
+      .setEndpoints(this.endpoints.toProtobuf());
+
+    const response = await this.rpcClientManager.queryAssignment(
+      endpoints, request, this.requestTimeout,
+    );
+    StatusChecker.check(response.getStatus()?.toObject());

Review Comment:
   Already processed.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to