This is an automated email from the ASF dual-hosted git repository.
lizhimin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/rocketmq-clients.git
The following commit(s) were added to refs/heads/master by this push:
new 2843ef56 [ISSUE #984] [C++] Support fifo parallel consume in C++ PushConsumer (#1009)
2843ef56 is described below
commit 2843ef56ce6b6d87afb76c27e886ba0c4b644b0c
Author: lizhimins <[email protected]>
AuthorDate: Wed Jun 4 10:16:30 2025 +0800
[ISSUE #984] [C++] Support fifo parallel consume in C++ PushConsumer (#1009)
---
README-CN.md | 17 +++++-----
README.md | 3 +-
cpp/include/rocketmq/PushConsumer.h | 23 ++++++++------
cpp/source/client/ClientManagerImpl.cpp | 7 ++++-
cpp/source/client/include/ClientConfig.h | 1 +
cpp/source/rocketmq/ConsumeMessageServiceImpl.cpp | 38 ++++++++++++++++++++---
cpp/source/rocketmq/PushConsumer.cpp | 1 +
cpp/source/rocketmq/PushConsumerImpl.cpp | 2 +-
cpp/source/rocketmq/include/ClientImpl.h | 4 +++
cpp/source/rocketmq/include/SimpleConsumerImpl.h | 4 ---
10 files changed, 72 insertions(+), 28 deletions(-)
diff --git a/README-CN.md b/README-CN.md
index a0145403..6d7da093 100644
--- a/README-CN.md
+++ b/README-CN.md
@@ -19,14 +19,15 @@
| 特性 | Java | C/C++ | C# | Golang | Rust | Python | Node.js | PHP |
|------------------------------------------------| :---: |:------:|:-----:|:------:|:----:|:------:|:-------:| :---: |
-| Producer with standard messages | ✅ | ✅ | ✅ | ✅ | ✅ | ✅ | ✅ | 🚧 |
-| Producer with FIFO messages | ✅ | ✅ | ✅ | ✅ | ✅ | ✅ | ✅ | 🚧 |
-| Producer with timed/delay messages | ✅ | ✅ | ✅ | ✅ | ✅ | ✅ | ✅ | 🚧 |
-| Producer with transactional messages | ✅ | ✅ | ✅ | ✅ | ✅ | ✅ | ✅ | 🚧 |
-| Producer with recalling timed/delay messages | ✅ | 🚧 | 🚧 | 🚧 | 🚧 | 🚧 | 🚧 | 🚧 |
-| Simple consumer | ✅ | ✅ | ✅ | ✅ | ✅ | ✅ | ✅ | 🚧 |
-| Push consumer with concurrent message listener | ✅ | ✅ | ✅ | 🚧 | ✅ | 🚧 | 🚧 | 🚧 |
-| Push consumer with FIFO message listener | ✅ | ✅ | ✅ | 🚧 | ✅ | 🚧 | 🚧 | 🚧 |
+| Producer with standard messages | ✅ | ✅ | ✅ | ✅ | ✅ | ✅ | ✅ | 🚧 |
+| Producer with FIFO messages | ✅ | ✅ | ✅ | ✅ | ✅ | ✅ | ✅ | 🚧 |
+| Producer with timed/delay messages | ✅ | ✅ | ✅ | ✅ | ✅ | ✅ | ✅ | 🚧 |
+| Producer with transactional messages | ✅ | ✅ | ✅ | ✅ | ✅ | ✅ | ✅ | 🚧 |
+| Producer with recalling timed/delay messages | ✅ | ✅ | 🚧 | 🚧 | 🚧 | 🚧 | 🚧 | 🚧 |
+| Simple consumer | ✅ | ✅ | ✅ | ✅ | ✅ | ✅ | ✅ | 🚧 |
+| Push consumer with concurrent message listener | ✅ | ✅ | ✅ | 🚧 | ✅ | 🚧 | 🚧 | 🚧 |
+| Push consumer with FIFO message listener | ✅ | ✅ | ✅ | 🚧 | ✅ | 🚧 | 🚧 | 🚧 |
+| Push consumer with FIFO consume accelerator | ✅ | ✅ | 🚧 | 🚧 | 🚧 | 🚧 | 🚧 | 🚧 |
## 先决条件和构建
diff --git a/README.md b/README.md
index aaecf6cf..f50deaa5 100644
--- a/README.md
+++ b/README.md
@@ -23,10 +23,11 @@ Provide cloud-native and robust solutions for Java, C++, C#, Golang, Rust and al
| Producer with FIFO messages | ✅ | ✅ | ✅ | ✅ | ✅ | ✅ | ✅ | 🚧 |
| Producer with timed/delay messages | ✅ | ✅ | ✅ | ✅ | ✅ | ✅ | ✅ | 🚧 |
| Producer with transactional messages | ✅ | ✅ | ✅ | ✅ | ✅ | ✅ | ✅ | 🚧 |
-| Producer with recalling timed/delay messages | ✅ | 🚧 | 🚧 | 🚧 | 🚧 | 🚧 | 🚧 | 🚧 |
+| Producer with recalling timed/delay messages | ✅ | ✅ | 🚧 | 🚧 | 🚧 | 🚧 | 🚧 | 🚧 |
| Simple consumer | ✅ | ✅ | ✅ | ✅ | ✅ | ✅ | ✅ | 🚧 |
| Push consumer with concurrent message listener | ✅ | ✅ | ✅ | 🚧 | ✅ | 🚧 | 🚧 | 🚧 |
| Push consumer with FIFO message listener | ✅ | ✅ | ✅ | 🚧 | ✅ | 🚧 | 🚧 | 🚧 |
+| Push consumer with FIFO consume accelerator | ✅ | ✅ | 🚧 | 🚧 | 🚧 | 🚧 | 🚧 | 🚧 |
## Prerequisite and Build
diff --git a/cpp/include/rocketmq/PushConsumer.h b/cpp/include/rocketmq/PushConsumer.h
index 500c34b0..a989997c 100644
--- a/cpp/include/rocketmq/PushConsumer.h
+++ b/cpp/include/rocketmq/PushConsumer.h
@@ -43,7 +43,7 @@ public:
private:
friend class PushConsumerBuilder;
- PushConsumer(std::shared_ptr<PushConsumerImpl> impl)
+ explicit PushConsumer(std::shared_ptr<PushConsumerImpl> impl)
: impl_(std::move(impl)) {
}
@@ -54,41 +54,46 @@ class PushConsumerBuilder {
public:
PushConsumerBuilder() : configuration_(Configuration::newBuilder().build()) {}
- PushConsumerBuilder &withConfiguration(Configuration configuration) {
+ PushConsumerBuilder& withConfiguration(Configuration configuration) {
configuration_ = std::move(configuration);
return *this;
}
- PushConsumerBuilder &withGroup(std::string group) {
+ PushConsumerBuilder& withGroup(std::string group) {
group_ = std::move(group);
return *this;
}
- PushConsumerBuilder &withConsumeThreads(std::size_t consume_thread) {
+ PushConsumerBuilder& withConsumeThreads(std::size_t consume_thread) {
consume_thread_ = consume_thread;
return *this;
}
- PushConsumerBuilder &withListener(MessageListener listener) {
+ PushConsumerBuilder& withListener(MessageListener listener) {
listener_ = std::move(listener);
return *this;
}
- PushConsumerBuilder &subscribe(std::string topic,
+ PushConsumerBuilder& subscribe(std::string topic,
FilterExpression filter_expression) {
subscriptions_.insert({topic, filter_expression});
return *this;
}
+ PushConsumerBuilder& fifoConsumeAccelerator(bool fifo_consume_accelerator) {
+ fifo_consume_accelerator_ = fifo_consume_accelerator;
+ return *this;
+ }
+
PushConsumer build();
private:
std::string group_;
Configuration configuration_;
- std::size_t consume_thread_;
- MessageListener listener_;
-
+ std::size_t consume_thread_ = 20;
std::unordered_map<std::string, FilterExpression> subscriptions_;
+ MessageListener listener_;
+ bool fifo_consume_accelerator_ = false;
};
ROCKETMQ_NAMESPACE_END
\ No newline at end of file
diff --git a/cpp/source/client/ClientManagerImpl.cpp b/cpp/source/client/ClientManagerImpl.cpp
index 95aaa4f6..7c3498e9 100644
--- a/cpp/source/client/ClientManagerImpl.cpp
+++ b/cpp/source/client/ClientManagerImpl.cpp
@@ -774,7 +774,7 @@ void ClientManagerImpl::receiveMessage(const std::string& target_host,
const ReceiveMessageRequest& request,
std::chrono::milliseconds timeout,
ReceiveMessageCallback cb) {
- SPDLOG_DEBUG("Prepare to receive message from {} asynchronously. Request: {}", target_host, request.DebugString());
+ SPDLOG_DEBUG("Prepare to receive message from {} asynchronously. Request: {}", target_host, request.ShortDebugString());
RpcClientSharedPtr client = getRpcClient(target_host);
auto context = absl::make_unique<ReceiveMessageContext>();
context->callback = std::move(cb);
@@ -809,6 +809,11 @@ MessageConstSharedPtr ClientManagerImpl::wrapMessage(const rmq::Message& item) {
builder.withKeys(std::move(keys));
}
+ // Message Group
+ if (system_properties.has_message_group()) {
+ builder.withGroup(system_properties.message_group());
+ }
+
// Message-Id
const auto& message_id = system_properties.message_id();
builder.withId(message_id);
diff --git a/cpp/source/client/include/ClientConfig.h b/cpp/source/client/include/ClientConfig.h
index e0a7fbf6..542cd116 100644
--- a/cpp/source/client/include/ClientConfig.h
+++ b/cpp/source/client/include/ClientConfig.h
@@ -39,6 +39,7 @@ struct SubscriberConfig {
rmq::Resource group;
absl::flat_hash_map<std::string, rmq::SubscriptionEntry> subscriptions;
bool fifo{false};
+ bool fifo_consume_accelerator{false};
std::uint32_t receive_batch_size{32};
absl::Duration polling_timeout{absl::Seconds(30)};
};
diff --git a/cpp/source/rocketmq/ConsumeMessageServiceImpl.cpp b/cpp/source/rocketmq/ConsumeMessageServiceImpl.cpp
index 57268f35..c607760c 100644
--- a/cpp/source/rocketmq/ConsumeMessageServiceImpl.cpp
+++ b/cpp/source/rocketmq/ConsumeMessageServiceImpl.cpp
@@ -21,7 +21,6 @@
#include "ConsumeStats.h"
#include "ConsumeTask.h"
#include "PushConsumerImpl.h"
-#include "Tag.h"
#include "ThreadPoolImpl.h"
#include "rocketmq/ErrorCode.h"
#include "rocketmq/Logger.h"
@@ -67,13 +66,44 @@ void ConsumeMessageServiceImpl::dispatch(std::shared_ptr<ProcessQueue> process_q
}
if (consumer->config().subscriber.fifo) {
- auto consume_task = std::make_shared<ConsumeTask>(shared_from_this(), process_queue, std::move(messages));
- pool_->submit([consume_task]() { consume_task->process(); });
+ if (!consumer->config().subscriber.fifo_consume_accelerator) {
+ auto consume_task = std::make_shared<ConsumeTask>(
+ shared_from_this(), process_queue, std::move(messages));
+ pool_->submit([consume_task]() { consume_task->process(); });
+ } else {
+ std::map<std::string, std::vector<MessageConstSharedPtr>> grouped_messages;
+ std::vector<MessageConstSharedPtr> ungrouped_messages;
+
+ for (const auto& message : messages) {
+ if (!message->group().empty()) {
+ grouped_messages[message->group()].push_back(message);
+ } else {
+ ungrouped_messages.push_back(message);
+ }
+ }
+
+ SPDLOG_INFO("FifoConsumeService accelerator enable, message_count={}, group_count={}",
+ messages.size(), grouped_messages.size() + (ungrouped_messages.empty() ? 0 : 1));
+
+ // C++17 could use [group, msg_list]
+ for (const auto& pair : grouped_messages) {
+ auto consume_task = std::make_shared<ConsumeTask>(
+ shared_from_this(), process_queue, pair.second);
+ pool_->submit([consume_task]() { consume_task->process(); });
+ }
+
+ if (!ungrouped_messages.empty()) {
+ auto consume_task = std::make_shared<ConsumeTask>(
+ shared_from_this(), process_queue, ungrouped_messages);
+ pool_->submit([consume_task]() { consume_task->process(); });
+ }
+ }
return;
}
for (const auto& message : messages) {
- auto consume_task = std::make_shared<ConsumeTask>(shared_from_this(), process_queue, message);
+ auto consume_task = std::make_shared<ConsumeTask>(
+ shared_from_this(), process_queue, message);
pool_->submit([consume_task]() { consume_task->process(); });
}
}
diff --git a/cpp/source/rocketmq/PushConsumer.cpp b/cpp/source/rocketmq/PushConsumer.cpp
index a0fd0055..a2f22803 100644
--- a/cpp/source/rocketmq/PushConsumer.cpp
+++ b/cpp/source/rocketmq/PushConsumer.cpp
@@ -47,6 +47,7 @@ PushConsumer PushConsumerBuilder::build() {
impl->withSsl(configuration_.withSsl());
impl->withCredentialsProvider(configuration_.credentialsProvider());
impl->withRequestTimeout(configuration_.requestTimeout());
+ impl->withFifoConsumeAccelerator(fifo_consume_accelerator_);
impl->start();
return PushConsumer(impl);
}
diff --git a/cpp/source/rocketmq/PushConsumerImpl.cpp b/cpp/source/rocketmq/PushConsumerImpl.cpp
index acc08177..4ba038b2 100644
--- a/cpp/source/rocketmq/PushConsumerImpl.cpp
+++ b/cpp/source/rocketmq/PushConsumerImpl.cpp
@@ -462,7 +462,7 @@ void PushConsumerImpl::consumeThreadPoolSize(int thread_pool_size) {
}
void PushConsumerImpl::registerMessageListener(MessageListener message_listener) {
- message_listener_ = message_listener;
+ message_listener_ = std::move(message_listener);
}
std::size_t PushConsumerImpl::getProcessQueueTableSize() {
diff --git a/cpp/source/rocketmq/include/ClientImpl.h b/cpp/source/rocketmq/include/ClientImpl.h
index 9b6a6502..96865399 100644
--- a/cpp/source/rocketmq/include/ClientImpl.h
+++ b/cpp/source/rocketmq/include/ClientImpl.h
@@ -102,6 +102,10 @@ public:
client_config_.withSsl = with_ssl;
}
+ void withFifoConsumeAccelerator(bool fifo_consume_accelerator) {
+ client_config_.subscriber.fifo_consume_accelerator = fifo_consume_accelerator;
+ }
+
/**
* Expose for test purpose only.
*/
diff --git a/cpp/source/rocketmq/include/SimpleConsumerImpl.h b/cpp/source/rocketmq/include/SimpleConsumerImpl.h
index 7fc63b6d..39e6523e 100644
--- a/cpp/source/rocketmq/include/SimpleConsumerImpl.h
+++ b/cpp/source/rocketmq/include/SimpleConsumerImpl.h
@@ -61,10 +61,6 @@ public:
long_polling_duration_ = receive_timeout;
}
- void withSsl(bool enable) {
- client_config_.withSsl = enable;
- }
-
protected:
void topicsOfInterest(std::vector<std::string> &topics) override;