This is an automated email from the ASF dual-hosted git repository.

lizhimin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/rocketmq-clients.git


The following commit(s) were added to refs/heads/master by this push:
     new 2843ef56 [ISSUE #984] [C++] Support fifo parallel consume in C++ PushConsumer (#1009)
2843ef56 is described below

commit 2843ef56ce6b6d87afb76c27e886ba0c4b644b0c
Author: lizhimins <[email protected]>
AuthorDate: Wed Jun 4 10:16:30 2025 +0800

    [ISSUE #984] [C++] Support fifo parallel consume in C++ PushConsumer (#1009)
---
 README-CN.md                                      | 17 +++++-----
 README.md                                         |  3 +-
 cpp/include/rocketmq/PushConsumer.h               | 23 ++++++++------
 cpp/source/client/ClientManagerImpl.cpp           |  7 ++++-
 cpp/source/client/include/ClientConfig.h          |  1 +
 cpp/source/rocketmq/ConsumeMessageServiceImpl.cpp | 38 ++++++++++++++++++++---
 cpp/source/rocketmq/PushConsumer.cpp              |  1 +
 cpp/source/rocketmq/PushConsumerImpl.cpp          |  2 +-
 cpp/source/rocketmq/include/ClientImpl.h          |  4 +++
 cpp/source/rocketmq/include/SimpleConsumerImpl.h  |  4 ---
 10 files changed, 72 insertions(+), 28 deletions(-)

diff --git a/README-CN.md b/README-CN.md
index a0145403..6d7da093 100644
--- a/README-CN.md
+++ b/README-CN.md
@@ -19,14 +19,15 @@
 
 | 特性                                             | Java  | C/C++  |  C#   | Golang | Rust | Python | Node.js |  PHP  |
 |------------------------------------------------| :---: |:------:|:-----:|:------:|:----:|:------:|:-------:| :---: |
-| Producer with standard messages                |   ✅   |   ✅    |   ✅   |   ✅    |  ✅   |   ✅    |    ✅    |   🚧   |
-| Producer with FIFO messages                    |   ✅   |   ✅    |   ✅   |   ✅    |  ✅   |   ✅    |    ✅    |   🚧   |
-| Producer with timed/delay messages             |   ✅   |   ✅    |   ✅   |   ✅    |  ✅   |   ✅    |    ✅    |   🚧   |
-| Producer with transactional messages           |   ✅   |   ✅    |   ✅   |   ✅    |  ✅   |   ✅    |    ✅    |   🚧   |
-| Producer with recalling timed/delay messages   |   ✅   |   🚧   |   🚧    |   🚧   |   🚧   |    🚧    |   🚧    |   🚧   |
-| Simple consumer                                |   ✅   |   ✅    |   ✅   |   ✅    |  ✅   |   ✅    |    ✅    |   🚧   |
-| Push consumer with concurrent message listener |   ✅   |   ✅    |  ✅   |   🚧   |  ✅   |   🚧   |   🚧    |   🚧   |
-| Push consumer with FIFO message listener       |   ✅   |   ✅    |  ✅   |   🚧   |  ✅   |   🚧   |   🚧    |   🚧   |
+| Producer with standard messages                |   ✅   |   ✅   |   ✅   |   ✅    |   ✅   |   ✅    |    ✅    |   🚧   |
+| Producer with FIFO messages                    |   ✅   |   ✅   |   ✅   |   ✅    |   ✅   |   ✅    |    ✅    |   🚧   |
+| Producer with timed/delay messages             |   ✅   |   ✅   |   ✅   |   ✅    |   ✅   |   ✅    |    ✅    |   🚧   |
+| Producer with transactional messages           |   ✅   |   ✅   |   ✅   |   ✅    |   ✅   |   ✅    |    ✅    |   🚧   |
+| Producer with recalling timed/delay messages   |   ✅   |   ✅   |   🚧   |   🚧    |   🚧   |   🚧    |    🚧    |   🚧   |
+| Simple consumer                                |   ✅   |   ✅   |   ✅   |   ✅    |   ✅   |   ✅    |    ✅    |   🚧   |
+| Push consumer with concurrent message listener |   ✅   |   ✅   |   ✅   |   🚧    |   ✅   |   🚧    |    🚧    |   🚧   |
+| Push consumer with FIFO message listener       |   ✅   |   ✅   |   ✅   |   🚧    |   ✅   |   🚧    |    🚧    |   🚧   |
+| Push consumer with FIFO consume accelerator    |   ✅   |   ✅   |   🚧   |   🚧    |   🚧   |   🚧    |    🚧    |   🚧   |
 
 ## 先决条件和构建
 
diff --git a/README.md b/README.md
index aaecf6cf..f50deaa5 100644
--- a/README.md
+++ b/README.md
@@ -23,10 +23,11 @@ Provide cloud-native and robust solutions for Java, C++, C#, Golang, Rust and al
 | Producer with FIFO messages                    |   ✅   |   ✅   |   ✅   |   ✅    |   ✅   |   ✅    |    ✅    |   🚧   |
 | Producer with timed/delay messages             |   ✅   |   ✅   |   ✅   |   ✅    |   ✅   |   ✅    |    ✅    |   🚧   |
 | Producer with transactional messages           |   ✅   |   ✅   |   ✅   |   ✅    |   ✅   |   ✅    |    ✅    |   🚧   |
-| Producer with recalling timed/delay messages   |   ✅   |   🚧   |   🚧    |   🚧   |   🚧   |    🚧    |   🚧    |   🚧   |
+| Producer with recalling timed/delay messages   |   ✅   |   ✅   |   🚧   |   🚧    |   🚧   |   🚧    |    🚧    |   🚧   |
 | Simple consumer                                |   ✅   |   ✅   |   ✅   |   ✅    |   ✅   |   ✅    |    ✅    |   🚧   |
 | Push consumer with concurrent message listener |   ✅   |   ✅   |   ✅   |   🚧    |   ✅   |   🚧    |    🚧    |   🚧   |
 | Push consumer with FIFO message listener       |   ✅   |   ✅   |   ✅   |   🚧    |   ✅   |   🚧    |    🚧    |   🚧   |
+| Push consumer with FIFO consume accelerator    |   ✅   |   ✅   |   🚧   |   🚧    |   🚧   |   🚧    |    🚧    |   🚧   |
 
 ## Prerequisite and Build
 
diff --git a/cpp/include/rocketmq/PushConsumer.h b/cpp/include/rocketmq/PushConsumer.h
index 500c34b0..a989997c 100644
--- a/cpp/include/rocketmq/PushConsumer.h
+++ b/cpp/include/rocketmq/PushConsumer.h
@@ -43,7 +43,7 @@ public:
 private:
   friend class PushConsumerBuilder;
 
-  PushConsumer(std::shared_ptr<PushConsumerImpl> impl)
+  explicit PushConsumer(std::shared_ptr<PushConsumerImpl> impl)
       : impl_(std::move(impl)) {
   } 
 
@@ -54,41 +54,46 @@ class PushConsumerBuilder {
 public:
   PushConsumerBuilder() : configuration_(Configuration::newBuilder().build()) {}
   
-  PushConsumerBuilder &withConfiguration(Configuration configuration) {
+  PushConsumerBuilder& withConfiguration(Configuration configuration) {
     configuration_ = std::move(configuration);
     return *this;
   }
 
-  PushConsumerBuilder &withGroup(std::string group) {
+  PushConsumerBuilder& withGroup(std::string group) {
     group_ = std::move(group);
     return *this;
   }
 
-  PushConsumerBuilder &withConsumeThreads(std::size_t consume_thread) {
+  PushConsumerBuilder& withConsumeThreads(std::size_t consume_thread) {
     consume_thread_ = consume_thread;
     return *this;
   }
 
-  PushConsumerBuilder &withListener(MessageListener listener) {
+  PushConsumerBuilder& withListener(MessageListener listener) {
     listener_ = std::move(listener);
     return *this;
   }
 
-  PushConsumerBuilder &subscribe(std::string topic,
+  PushConsumerBuilder& subscribe(std::string topic,
                                  FilterExpression filter_expression) {
     subscriptions_.insert({topic, filter_expression});
     return *this;
   }
 
+  PushConsumerBuilder& fifoConsumeAccelerator(bool fifo_consume_accelerator) {
+    fifo_consume_accelerator_ = fifo_consume_accelerator;
+    return *this;
+  }
+
   PushConsumer build();
 
 private:
   std::string group_;
   Configuration configuration_;
-  std::size_t consume_thread_;
-  MessageListener listener_;
-
+  std::size_t consume_thread_ = 20;
   std::unordered_map<std::string, FilterExpression> subscriptions_;
+  MessageListener listener_;
+  bool fifo_consume_accelerator_ = false;
 };
 
 ROCKETMQ_NAMESPACE_END
\ No newline at end of file
diff --git a/cpp/source/client/ClientManagerImpl.cpp b/cpp/source/client/ClientManagerImpl.cpp
index 95aaa4f6..7c3498e9 100644
--- a/cpp/source/client/ClientManagerImpl.cpp
+++ b/cpp/source/client/ClientManagerImpl.cpp
@@ -774,7 +774,7 @@ void ClientManagerImpl::receiveMessage(const std::string& target_host,
                                        const ReceiveMessageRequest& request,
                                        std::chrono::milliseconds timeout,
                                        ReceiveMessageCallback cb) {
-  SPDLOG_DEBUG("Prepare to receive message from {} asynchronously. Request: {}", target_host, request.DebugString());
+  SPDLOG_DEBUG("Prepare to receive message from {} asynchronously. Request: {}", target_host, request.ShortDebugString());
   RpcClientSharedPtr client = getRpcClient(target_host);
   auto context = absl::make_unique<ReceiveMessageContext>();
   context->callback = std::move(cb);
@@ -809,6 +809,11 @@ MessageConstSharedPtr ClientManagerImpl::wrapMessage(const rmq::Message& item) {
     builder.withKeys(std::move(keys));
   }
 
+  // Message Group
+  if (system_properties.has_message_group()) {
+    builder.withGroup(system_properties.message_group());
+  }
+
   // Message-Id
   const auto& message_id = system_properties.message_id();
   builder.withId(message_id);
diff --git a/cpp/source/client/include/ClientConfig.h b/cpp/source/client/include/ClientConfig.h
index e0a7fbf6..542cd116 100644
--- a/cpp/source/client/include/ClientConfig.h
+++ b/cpp/source/client/include/ClientConfig.h
@@ -39,6 +39,7 @@ struct SubscriberConfig {
   rmq::Resource group;
   absl::flat_hash_map<std::string, rmq::SubscriptionEntry> subscriptions;
   bool fifo{false};
+  bool fifo_consume_accelerator{false};
   std::uint32_t receive_batch_size{32};
   absl::Duration polling_timeout{absl::Seconds(30)};
 };
diff --git a/cpp/source/rocketmq/ConsumeMessageServiceImpl.cpp b/cpp/source/rocketmq/ConsumeMessageServiceImpl.cpp
index 57268f35..c607760c 100644
--- a/cpp/source/rocketmq/ConsumeMessageServiceImpl.cpp
+++ b/cpp/source/rocketmq/ConsumeMessageServiceImpl.cpp
@@ -21,7 +21,6 @@
 #include "ConsumeStats.h"
 #include "ConsumeTask.h"
 #include "PushConsumerImpl.h"
-#include "Tag.h"
 #include "ThreadPoolImpl.h"
 #include "rocketmq/ErrorCode.h"
 #include "rocketmq/Logger.h"
@@ -67,13 +66,44 @@ void ConsumeMessageServiceImpl::dispatch(std::shared_ptr<ProcessQueue> process_q
   }
 
   if (consumer->config().subscriber.fifo) {
-    auto consume_task = std::make_shared<ConsumeTask>(shared_from_this(), process_queue, std::move(messages));
-    pool_->submit([consume_task]() { consume_task->process(); });
+    if (!consumer->config().subscriber.fifo_consume_accelerator) {
+      auto consume_task = std::make_shared<ConsumeTask>(
+          shared_from_this(), process_queue, std::move(messages));
+      pool_->submit([consume_task]() { consume_task->process(); });
+    } else {
+      std::map<std::string, std::vector<MessageConstSharedPtr>> grouped_messages;
+      std::vector<MessageConstSharedPtr> ungrouped_messages;
+
+      for (const auto& message : messages) {
+        if (!message->group().empty()) {
+          grouped_messages[message->group()].push_back(message);
+        } else {
+          ungrouped_messages.push_back(message);
+        }
+      }
+
+      SPDLOG_INFO("FifoConsumeService accelerator enable, message_count={}, group_count={}",
+                  messages.size(), grouped_messages.size() + (ungrouped_messages.empty() ? 0 : 1));
+
+      // C++17 could use [group, msg_list]
+      for (const auto& pair : grouped_messages) {
+        auto consume_task = std::make_shared<ConsumeTask>(
+            shared_from_this(), process_queue, pair.second);
+        pool_->submit([consume_task]() { consume_task->process(); });
+      }
+
+      if (!ungrouped_messages.empty()) {
+        auto consume_task = std::make_shared<ConsumeTask>(
+            shared_from_this(), process_queue, ungrouped_messages);
+        pool_->submit([consume_task]() { consume_task->process(); });
+      }
+    }
     return;
   }
 
   for (const auto& message : messages) {
-    auto consume_task = std::make_shared<ConsumeTask>(shared_from_this(), process_queue, message);
+    auto consume_task = std::make_shared<ConsumeTask>(
+        shared_from_this(), process_queue, message);
     pool_->submit([consume_task]() { consume_task->process(); });
   }
 }
diff --git a/cpp/source/rocketmq/PushConsumer.cpp b/cpp/source/rocketmq/PushConsumer.cpp
index a0fd0055..a2f22803 100644
--- a/cpp/source/rocketmq/PushConsumer.cpp
+++ b/cpp/source/rocketmq/PushConsumer.cpp
@@ -47,6 +47,7 @@ PushConsumer PushConsumerBuilder::build() {
   impl->withSsl(configuration_.withSsl());
   impl->withCredentialsProvider(configuration_.credentialsProvider());
   impl->withRequestTimeout(configuration_.requestTimeout());
+  impl->withFifoConsumeAccelerator(fifo_consume_accelerator_);
   impl->start();
   return PushConsumer(impl);
 }
diff --git a/cpp/source/rocketmq/PushConsumerImpl.cpp b/cpp/source/rocketmq/PushConsumerImpl.cpp
index acc08177..4ba038b2 100644
--- a/cpp/source/rocketmq/PushConsumerImpl.cpp
+++ b/cpp/source/rocketmq/PushConsumerImpl.cpp
@@ -462,7 +462,7 @@ void PushConsumerImpl::consumeThreadPoolSize(int thread_pool_size) {
 }
 
 void PushConsumerImpl::registerMessageListener(MessageListener message_listener) {
-  message_listener_ = message_listener;
+  message_listener_ = std::move(message_listener);
 }
 
 std::size_t PushConsumerImpl::getProcessQueueTableSize() {
diff --git a/cpp/source/rocketmq/include/ClientImpl.h b/cpp/source/rocketmq/include/ClientImpl.h
index 9b6a6502..96865399 100644
--- a/cpp/source/rocketmq/include/ClientImpl.h
+++ b/cpp/source/rocketmq/include/ClientImpl.h
@@ -102,6 +102,10 @@ public:
     client_config_.withSsl = with_ssl;
   }
 
+  void withFifoConsumeAccelerator(bool fifo_consume_accelerator) {
+    client_config_.subscriber.fifo_consume_accelerator = fifo_consume_accelerator;
+  }
+
   /**
    * Expose for test purpose only.
    */
diff --git a/cpp/source/rocketmq/include/SimpleConsumerImpl.h b/cpp/source/rocketmq/include/SimpleConsumerImpl.h
index 7fc63b6d..39e6523e 100644
--- a/cpp/source/rocketmq/include/SimpleConsumerImpl.h
+++ b/cpp/source/rocketmq/include/SimpleConsumerImpl.h
@@ -61,10 +61,6 @@ public:
     long_polling_duration_ = receive_timeout;
   }
 
-  void withSsl(bool enable) {
-    client_config_.withSsl = enable;
-  }
-
 protected:
   void topicsOfInterest(std::vector<std::string> &topics) override;
 

Reply via email to