This is an automated email from the ASF dual-hosted git repository.
lollipop pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/rocketmq-clients.git
The following commit(s) were added to refs/heads/master by this push:
new bd1e7ab7 [ISSUE #967] C++ client support namespace, empty body check
and recall message (#968)
bd1e7ab7 is described below
commit bd1e7ab73a0c939d8e0d008b0e068bb63543e42b
Author: lizhimins <[email protected]>
AuthorDate: Wed Mar 26 11:18:31 2025 +0800
[ISSUE #967] C++ client support namespace, empty body check and recall
message (#968)
* [ISSUE #967] C++ client support namespace, empty body check and recall
message
---
cpp/examples/ExampleProducerWithTimedMessage.cpp | 23 +++-
cpp/include/rocketmq/Configuration.h | 7 ++
cpp/include/rocketmq/ErrorCode.h | 5 +
cpp/include/rocketmq/Producer.h | 12 +++
.../rocketmq/{SendReceipt.h => RecallReceipt.h} | 9 +-
cpp/include/rocketmq/SendReceipt.h | 2 +
cpp/source/base/Configuration.cpp | 5 +
cpp/source/base/include/Protocol.h | 2 +
cpp/source/client/ClientManagerImpl.cpp | 118 ++++++++++++++++++++-
cpp/source/client/RpcClientImpl.cpp | 7 ++
cpp/source/client/Signature.cpp | 1 +
cpp/source/client/include/ClientManager.h | 11 +-
cpp/source/client/include/ClientManagerImpl.h | 6 ++
cpp/source/client/include/RpcClient.h | 3 +
cpp/source/client/include/RpcClientImpl.h | 3 +
cpp/source/client/include/SendResult.h | 1 +
cpp/source/client/mocks/include/RpcClientMock.h | 3 +
cpp/source/log/LoggerImpl.cpp | 2 +-
cpp/source/rocketmq/Producer.cpp | 7 +-
cpp/source/rocketmq/ProducerImpl.cpp | 87 +++++++++++++--
cpp/source/rocketmq/PushConsumer.cpp | 1 +
cpp/source/rocketmq/PushConsumerImpl.cpp | 45 ++++++--
cpp/source/rocketmq/SendContext.cpp | 1 +
cpp/source/rocketmq/SimpleConsumer.cpp | 1 +
cpp/source/rocketmq/SimpleConsumerImpl.cpp | 20 +++-
cpp/source/rocketmq/TopicPublishInfo.cpp | 7 +-
cpp/source/rocketmq/include/ClientImpl.h | 4 +
cpp/source/rocketmq/include/ProcessQueueImpl.h | 1 +
cpp/source/rocketmq/include/ProducerImpl.h | 6 ++
cpp/source/rocketmq/include/PushConsumerImpl.h | 2 +-
30 files changed, 365 insertions(+), 37 deletions(-)
diff --git a/cpp/examples/ExampleProducerWithTimedMessage.cpp
b/cpp/examples/ExampleProducerWithTimedMessage.cpp
index ba2a45f7..f4624deb 100644
--- a/cpp/examples/ExampleProducerWithTimedMessage.cpp
+++ b/cpp/examples/ExampleProducerWithTimedMessage.cpp
@@ -109,9 +109,30 @@ int main(int argc, char* argv[]) {
std::chrono::system_clock::now() +
std::chrono::seconds(10)) // This message would
be available to consumers after 10 seconds
.build();
+
std::error_code ec;
SendReceipt send_receipt = producer.send(std::move(message), ec);
- std::cout << "Message-ID: " << send_receipt.message_id << std::endl;
+
+ if (ec) {
+ std::cout << "Message-ID: " << send_receipt.message_id << " send
error"<< std::endl;
+ } else {
+ std::cout << "Message-ID: " << send_receipt.message_id << ", "
+ << "Message-Recall-Handle: " << send_receipt.recall_handle
<< std::endl;
+
+ // To attempt to recall a message, server support is required to
perform this operation.
+ if (i % 2) {
+ RecallReceipt recall_receipt = producer.recall(FLAGS_topic,
send_receipt.recall_handle, ec);
+
+ if (ec) {
+ std::cout << "Message-ID: " << send_receipt.message_id
+ << ", Recall ErrorCode: " << ec << std::endl;
+ } else {
+ std::cout << "Message-ID: " << send_receipt.message_id
+ << ", Message-Recall-ID: " << recall_receipt.message_id
<< std::endl;
+ }
+ }
+ }
+
count++;
}
} catch (...) {
diff --git a/cpp/include/rocketmq/Configuration.h
b/cpp/include/rocketmq/Configuration.h
index 6dcd4137..a653c87a 100644
--- a/cpp/include/rocketmq/Configuration.h
+++ b/cpp/include/rocketmq/Configuration.h
@@ -35,6 +35,10 @@ public:
return endpoints_;
}
+ const std::string& resourceNamespace() const {  // Read-only accessor: exposes the stored namespace by const reference (no copy).
+ return resource_namespace_;
+ }
+
CredentialsProviderPtr credentialsProvider() const {
return credentials_provider_;
}
@@ -54,6 +58,7 @@ protected:
private:
std::string endpoints_;
+ std::string resource_namespace_;
CredentialsProviderPtr credentials_provider_;
std::chrono::milliseconds
request_timeout_{ConfigurationDefaults::RequestTimeout};
bool tls_ = true;
@@ -63,6 +68,8 @@ class ConfigurationBuilder {
public:
ConfigurationBuilder& withEndpoints(std::string endpoints);
+ ConfigurationBuilder& withNamespace(std::string resource_namespace);
+
ConfigurationBuilder&
withCredentialsProvider(std::shared_ptr<CredentialsProvider> provider);
ConfigurationBuilder& withRequestTimeout(std::chrono::milliseconds
request_timeout);
diff --git a/cpp/include/rocketmq/ErrorCode.h b/cpp/include/rocketmq/ErrorCode.h
index 51784e16..eb111926 100644
--- a/cpp/include/rocketmq/ErrorCode.h
+++ b/cpp/include/rocketmq/ErrorCode.h
@@ -143,6 +143,11 @@ enum class ErrorCode : int {
*/
MessageBodyTooLarge = 41301,
+ /**
+ * @brief Message body is empty.
+ */
+ MessageBodyEmpty = 41302,
+
/**
* @brief When trying to perform an action whose dependent procedure state is
* not right, this code will be used.
diff --git a/cpp/include/rocketmq/Producer.h b/cpp/include/rocketmq/Producer.h
index d3de06e9..be1026f8 100644
--- a/cpp/include/rocketmq/Producer.h
+++ b/cpp/include/rocketmq/Producer.h
@@ -22,6 +22,7 @@
#include "Configuration.h"
#include "Message.h"
+#include "RecallReceipt.h"
#include "SendCallback.h"
#include "SendReceipt.h"
#include "Transaction.h"
@@ -67,6 +68,17 @@ public:
SendReceipt send(MessageConstPtr message, std::error_code& ec, Transaction&
transaction);
+ /**
+ * @brief Attempts to cancel a scheduled message based on the provided topic
and recall handle.
+ * This operation requires server support to be executed successfully.
+ *
+ * @param topic The topic associated with the scheduled message to be
canceled.
+ * @param recall_handle A unique identifier or handle for the message
cancellation operation.
+ * @param ec An error code that will be set if the operation encounters an
error.
+ * @return RecallReceipt A receipt object indicating the result of the
cancellation operation.
+ */
+ RecallReceipt recall(std::string& topic, std::string& recall_handle,
std::error_code& ec) noexcept;
+
private:
explicit Producer(std::shared_ptr<ProducerImpl> impl) :
impl_(std::move(impl)) {
}
diff --git a/cpp/include/rocketmq/SendReceipt.h
b/cpp/include/rocketmq/RecallReceipt.h
similarity index 85%
copy from cpp/include/rocketmq/SendReceipt.h
copy to cpp/include/rocketmq/RecallReceipt.h
index 7eef6e79..dbbb0ba7 100644
--- a/cpp/include/rocketmq/SendReceipt.h
+++ b/cpp/include/rocketmq/RecallReceipt.h
@@ -19,18 +19,11 @@
#include <string>
#include "RocketMQ.h"
-#include "rocketmq/Message.h"
ROCKETMQ_NAMESPACE_BEGIN
-struct SendReceipt {
- std::string target;
-
+struct RecallReceipt {
std::string message_id;
-
- std::string transaction_id;
-
- MessageConstPtr message;
};
ROCKETMQ_NAMESPACE_END
\ No newline at end of file
diff --git a/cpp/include/rocketmq/SendReceipt.h
b/cpp/include/rocketmq/SendReceipt.h
index 7eef6e79..4e797770 100644
--- a/cpp/include/rocketmq/SendReceipt.h
+++ b/cpp/include/rocketmq/SendReceipt.h
@@ -30,6 +30,8 @@ struct SendReceipt {
std::string transaction_id;
+ std::string recall_handle;
+
MessageConstPtr message;
};
diff --git a/cpp/source/base/Configuration.cpp
b/cpp/source/base/Configuration.cpp
index 66cff2e8..13330261 100644
--- a/cpp/source/base/Configuration.cpp
+++ b/cpp/source/base/Configuration.cpp
@@ -28,6 +28,11 @@ ConfigurationBuilder&
ConfigurationBuilder::withEndpoints(std::string endpoints)
return *this;
}
+ConfigurationBuilder& ConfigurationBuilder::withNamespace(std::string
resource_namespace) {  // Stores the given resource namespace on the configuration being built.
+ configuration_.resource_namespace_ = std::move(resource_namespace);  // The parameter is taken by value, so it is moved rather than copied again.
+ return *this;  // Returns the builder itself so calls can be chained.
+}
+
ConfigurationBuilder&
ConfigurationBuilder::withCredentialsProvider(std::shared_ptr<CredentialsProvider>
provider) {
configuration_.credentials_provider_ = std::move(provider);
return *this;
diff --git a/cpp/source/base/include/Protocol.h
b/cpp/source/base/include/Protocol.h
index 5a8e671f..2c11e70d 100644
--- a/cpp/source/base/include/Protocol.h
+++ b/cpp/source/base/include/Protocol.h
@@ -46,6 +46,8 @@ using HeartbeatRequest = rmq::HeartbeatRequest;
using HeartbeatResponse = rmq::HeartbeatResponse;
using EndTransactionRequest = rmq::EndTransactionRequest;
using EndTransactionResponse = rmq::EndTransactionResponse;
+using RecallMessageRequest = rmq::RecallMessageRequest;
+using RecallMessageResponse = rmq::RecallMessageResponse;
using RecoverOrphanedTransactionCommand =
rmq::RecoverOrphanedTransactionCommand;
using PrintThreadStackTraceCommand = rmq::PrintThreadStackTraceCommand;
using ThreadStackTrace = rmq::ThreadStackTrace;
diff --git a/cpp/source/client/ClientManagerImpl.cpp
b/cpp/source/client/ClientManagerImpl.cpp
index 053f7723..3b22ddd6 100644
--- a/cpp/source/client/ClientManagerImpl.cpp
+++ b/cpp/source/client/ClientManagerImpl.cpp
@@ -325,8 +325,11 @@ bool ClientManagerImpl::send(const std::string&
target_host,
auto first = invocation_context->response.entries().begin();
send_result.message_id = first->message_id();
send_result.transaction_id = first->transaction_id();
+ // unique handle to identify a message to recall,
+ // only delay message is supported for now
+ send_result.recall_handle = first->recall_handle();
} else {
- SPDLOG_ERROR("Unexpected send-message-response: {}",
invocation_context->response.DebugString());
+ SPDLOG_ERROR("Unexpected send-message-response: {}",
invocation_context->response.ShortDebugString());
}
break;
}
@@ -361,6 +364,12 @@ bool ClientManagerImpl::send(const std::string&
target_host,
break;
}
+ case rmq::Code::ILLEGAL_DELIVERY_TIME: {
+ SPDLOG_ERROR("IllegalDeliveryTime: {}. Host={}", status.message(),
invocation_context->remote_address);
+ send_result.ec = ErrorCode::IllegalMessageProperty;
+ break;
+ }
+
case rmq::Code::MESSAGE_PROPERTIES_TOO_LARGE: {
SPDLOG_ERROR("MessagePropertiesTooLarge: {}. Host={}",
status.message(), invocation_context->remote_address);
send_result.ec = ErrorCode::MessagePropertiesTooLarge;
@@ -373,6 +382,12 @@ bool ClientManagerImpl::send(const std::string&
target_host,
break;
}
+ // The server rejected the message because its body is empty. Report the
+ // dedicated MessageBodyEmpty code (the same one the client-side validation
+ // uses) instead of MessageBodyTooLarge, which describes the opposite problem.
+ case rmq::Code::MESSAGE_BODY_EMPTY: {
+ SPDLOG_ERROR("MessageBodyEmpty: {}. Host={}", status.message(), invocation_context->remote_address);
+ send_result.ec = ErrorCode::MessageBodyEmpty;
+ break;
+ }
+
case rmq::Code::TOPIC_NOT_FOUND: {
SPDLOG_WARN("TopicNotFound: {}. Host={}", status.message(),
invocation_context->remote_address);
send_result.ec = ErrorCode::TopicNotFound;
@@ -1288,6 +1303,107 @@ void ClientManagerImpl::endTransaction(
client->asyncEndTransaction(request, invocation_context);
}
+/**
+ * @brief Asynchronously sends a RecallMessage request to target_host.
+ *
+ * If no RPC client can be obtained for target_host, cb is invoked immediately
+ * on the calling thread with ErrorCode::BadRequest and an empty response.
+ * Otherwise the request is issued asynchronously and cb is invoked from the
+ * completion callback with the error code mapped from the response status
+ * (left unset when the server answers rmq::Code::OK).
+ *
+ * @param target_host Address of the server that should handle the request.
+ * @param metadata Key/value pairs copied into the gRPC client context.
+ * @param request Recall request to send.
+ * @param timeout Added to the current time to form the RPC deadline.
+ * @param cb Completion callback receiving the error code and the response.
+ */
+void ClientManagerImpl::recallMessage(const std::string& target_host, const Metadata& metadata,
+                                      const RecallMessageRequest& request, std::chrono::milliseconds timeout,
+                                      const std::function<void(const std::error_code&, const RecallMessageResponse&)>& cb) {
+
+  SPDLOG_DEBUG("RecallMessage Request: {}", request.ShortDebugString());
+
+  RpcClientSharedPtr client = getRpcClient(target_host);
+  if (!client) {
+    // Nothing can be sent; complete the call inline so the caller is not left waiting.
+    SPDLOG_WARN("No RPC client for {}", target_host);
+    RecallMessageResponse response;
+    std::error_code ec = ErrorCode::BadRequest;
+    cb(ec, response);
+    return;
+  }
+
+  auto invocation_context = new InvocationContext<RecallMessageResponse>();
+  invocation_context->task_name = fmt::format("Recall message, topic={}, recall handle={} to {}",
+                                              request.topic().ShortDebugString(), request.recall_handle().data(),
+                                              target_host);
+  invocation_context->remote_address = target_host;
+  for (const auto& item : metadata) {
+    invocation_context->context.AddMetadata(item.first, item.second);
+  }
+  invocation_context->context.set_deadline(std::chrono::system_clock::now() + timeout);
+
+  // cb is captured by value because the lambda outlives this function call.
+  auto callback = [cb](const InvocationContext<RecallMessageResponse>* invocation_context) {
+    std::error_code ec;
+
+    // Transport-level failure: the response carries no meaningful status.
+    if (!invocation_context->status.ok()) {
+      SPDLOG_WARN("Failed to write RecallMessage to wire. gRPC-code: {}, gRPC-message: {}, host={}",
+                  invocation_context->status.error_code(), invocation_context->status.error_message(),
+                  invocation_context->remote_address);
+      ec = ErrorCode::BadRequest;
+      cb(ec, invocation_context->response);
+      return;
+    }
+
+    // Map the application-level status code onto the client's error codes.
+    auto&& status = invocation_context->response.status();
+    auto&& peer_address = invocation_context->remote_address;
+    switch (status.code()) {
+      case rmq::Code::OK: {
+        SPDLOG_DEBUG("Recall message OK. Response: {}, host={}", invocation_context->response.ShortDebugString(),
+                     peer_address);
+        break;
+      }
+
+      case rmq::Code::ILLEGAL_TOPIC: {
+        SPDLOG_WARN("IllegalTopic: {}, host={}", status.message(), peer_address);
+        ec = ErrorCode::IllegalTopic;
+        break;
+      }
+
+      case rmq::Code::CLIENT_ID_REQUIRED: {
+        SPDLOG_WARN("ClientIdRequired: {}, host={}", status.message(), peer_address);
+        ec = ErrorCode::InternalClientError;
+        break;
+      }
+
+      case rmq::Code::TOPIC_NOT_FOUND: {
+        SPDLOG_WARN("TopicNotFound: {}, host={}", status.message(), peer_address);
+        ec = ErrorCode::TopicNotFound;
+        break;
+      }
+
+      case rmq::Code::UNAUTHORIZED: {
+        SPDLOG_WARN("Unauthorized: {}, host={}", status.message(), peer_address);
+        ec = ErrorCode::Unauthorized;
+        break;
+      }
+
+      case rmq::Code::FORBIDDEN: {
+        SPDLOG_WARN("Forbidden: {}, host={}", status.message(), peer_address);
+        ec = ErrorCode::Forbidden;
+        break;
+      }
+
+      case rmq::Code::INTERNAL_SERVER_ERROR: {
+        SPDLOG_WARN("InternalServerError: {}, host={}", status.message(), peer_address);
+        ec = ErrorCode::InternalServerError;
+        break;
+      }
+
+      case rmq::Code::PROXY_TIMEOUT: {
+        SPDLOG_WARN("GatewayTimeout: {}, host={}", status.message(), peer_address);
+        ec = ErrorCode::GatewayTimeout;
+        break;
+      }
+
+      default: {
+        SPDLOG_WARN("NotSupported: please upgrade SDK to latest release. {}, host={}", status.message(), peer_address);
+        ec = ErrorCode::NotSupported;
+        break;
+      }
+    }
+    cb(ec, invocation_context->response);
+  };
+
+  invocation_context->callback = callback;
+  client->asyncRecallMessage(request, invocation_context);
+}
+
void ClientManagerImpl::forwardMessageToDeadLetterQueue(const std::string&
target_host,
const Metadata&
metadata,
const
ForwardMessageToDeadLetterQueueRequest& request,
diff --git a/cpp/source/client/RpcClientImpl.cpp
b/cpp/source/client/RpcClientImpl.cpp
index d9f10212..2e183f2a 100644
--- a/cpp/source/client/RpcClientImpl.cpp
+++ b/cpp/source/client/RpcClientImpl.cpp
@@ -124,6 +124,13 @@ void RpcClientImpl::asyncEndTransaction(const
EndTransactionRequest& request,
stub_->async()->EndTransaction(&invocation_context->context, &request,
&invocation_context->response, callback);
}
+void RpcClientImpl::asyncRecallMessage(const RecallMessageRequest& request,
+
InvocationContext<RecallMessageResponse>* invocation_context) {
+ std::weak_ptr<RpcClient> rpc_client(shared_from_this());  // Only a weak reference is bound into the callback, so a pending call does not own this client.
+ auto callback = std::bind(&RpcClientImpl::asyncCallback, rpc_client,
invocation_context, std::placeholders::_1);  // _1 is whatever the stub passes on completion (presumably the gRPC status - confirm against asyncCallback).
+ stub_->async()->RecallMessage(&invocation_context->context, &request,
&invocation_context->response, callback);  // Issues the RecallMessage RPC; the reply is written into invocation_context->response.
+}
+
bool RpcClientImpl::ok() const {
return channel_ && grpc_connectivity_state::GRPC_CHANNEL_SHUTDOWN !=
channel_->GetState(false);
}
diff --git a/cpp/source/client/Signature.cpp b/cpp/source/client/Signature.cpp
index e25dbb54..4b43f216 100644
--- a/cpp/source/client/Signature.cpp
+++ b/cpp/source/client/Signature.cpp
@@ -28,6 +28,7 @@ void Signature::sign(const ClientConfig& client,
absl::flat_hash_map<std::string
metadata.insert({MetadataConstants::LANGUAGE_KEY, "CPP"});
// Add common headers
metadata.insert({MetadataConstants::CLIENT_ID_KEY, client.client_id});
+ metadata.insert({MetadataConstants::NAMESPACE_KEY,
client.resource_namespace});
metadata.insert({MetadataConstants::CLIENT_VERSION_KEY,
MetadataConstants::CLIENT_VERSION});
metadata.insert({MetadataConstants::PROTOCOL_VERSION_KEY,
protocolVersion()});
diff --git a/cpp/source/client/include/ClientManager.h
b/cpp/source/client/include/ClientManager.h
index 02b232b2..c67c6ea8 100644
--- a/cpp/source/client/include/ClientManager.h
+++ b/cpp/source/client/include/ClientManager.h
@@ -81,12 +81,15 @@ public:
const EndTransactionRequest& request,
std::chrono::milliseconds timeout,
const std::function<void(const std::error_code&,
const EndTransactionResponse&)>& cb) = 0;
+ virtual void recallMessage(const std::string& target_host, const Metadata&
metadata,
+ const RecallMessageRequest& request,
std::chrono::milliseconds timeout,
+ const std::function<void(const std::error_code&,
const RecallMessageResponse&)>& cb) = 0;
+
virtual void addClientObserver(std::weak_ptr<Client> client) = 0;
- virtual void
- queryAssignment(const std::string& target, const Metadata& metadata, const
QueryAssignmentRequest& request,
- std::chrono::milliseconds timeout,
- const std::function<void(const std::error_code&, const
QueryAssignmentResponse&)>& cb) = 0;
+ virtual void queryAssignment(const std::string& target, const Metadata&
metadata,
+ const QueryAssignmentRequest& request,
std::chrono::milliseconds timeout,
+ const std::function<void(const
std::error_code&, const QueryAssignmentResponse&)>& cb) = 0;
virtual void receiveMessage(const std::string& target, const Metadata&
metadata, const ReceiveMessageRequest& request,
std::chrono::milliseconds timeout,
ReceiveMessageCallback callback) = 0;
diff --git a/cpp/source/client/include/ClientManagerImpl.h
b/cpp/source/client/include/ClientManagerImpl.h
index 5f1b27ca..cd862154 100644
--- a/cpp/source/client/include/ClientManagerImpl.h
+++ b/cpp/source/client/include/ClientManagerImpl.h
@@ -176,6 +176,12 @@ public:
std::chrono::milliseconds timeout,
const std::function<void(const std::error_code&, const
EndTransactionResponse&)>& cb) override;
+ void recallMessage(const std::string& target_host,
+ const Metadata& metadata,
+ const RecallMessageRequest& request,
+ std::chrono::milliseconds timeout,
+ const std::function<void(const std::error_code&, const
RecallMessageResponse&)>& cb) override;
+
std::error_code notifyClientTermination(const std::string& target_host,
const Metadata& metadata,
const
NotifyClientTerminationRequest& request,
diff --git a/cpp/source/client/include/RpcClient.h
b/cpp/source/client/include/RpcClient.h
index fbb30171..ff926957 100644
--- a/cpp/source/client/include/RpcClient.h
+++ b/cpp/source/client/include/RpcClient.h
@@ -72,6 +72,9 @@ public:
virtual void asyncEndTransaction(const EndTransactionRequest& request,
InvocationContext<EndTransactionResponse>*
invocation_context) = 0;
+ virtual void asyncRecallMessage(const RecallMessageRequest& request,
+ InvocationContext<RecallMessageResponse>*
invocation_context) = 0;
+
virtual std::shared_ptr<TelemetryBidiReactor>
asyncTelemetry(std::weak_ptr<Client> client) = 0;
virtual void asyncForwardMessageToDeadLetterQueue(
diff --git a/cpp/source/client/include/RpcClientImpl.h
b/cpp/source/client/include/RpcClientImpl.h
index 35316ec6..9d4483e1 100644
--- a/cpp/source/client/include/RpcClientImpl.h
+++ b/cpp/source/client/include/RpcClientImpl.h
@@ -66,6 +66,9 @@ public:
void asyncEndTransaction(const EndTransactionRequest& request,
InvocationContext<EndTransactionResponse>*
invocation_context) override;
+ void asyncRecallMessage(const RecallMessageRequest& request,
+ InvocationContext<RecallMessageResponse>*
invocation_context) override;
+
std::shared_ptr<TelemetryBidiReactor> asyncTelemetry(std::weak_ptr<Client>
client) override;
void asyncForwardMessageToDeadLetterQueue(
diff --git a/cpp/source/client/include/SendResult.h
b/cpp/source/client/include/SendResult.h
index 3596f61f..04999555 100644
--- a/cpp/source/client/include/SendResult.h
+++ b/cpp/source/client/include/SendResult.h
@@ -28,6 +28,7 @@ struct SendResult {
std::string message_id;
std::string transaction_id;
+ std::string recall_handle;
};
ROCKETMQ_NAMESPACE_END
\ No newline at end of file
diff --git a/cpp/source/client/mocks/include/RpcClientMock.h
b/cpp/source/client/mocks/include/RpcClientMock.h
index a8475976..898e8453 100644
--- a/cpp/source/client/mocks/include/RpcClientMock.h
+++ b/cpp/source/client/mocks/include/RpcClientMock.h
@@ -56,6 +56,9 @@ public:
MOCK_METHOD(void, asyncEndTransaction, (const EndTransactionRequest&,
InvocationContext<EndTransactionResponse>*),
(override));
+ MOCK_METHOD(void, asyncRecallMessage, (const RecallMessageRequest&,
InvocationContext<RecallMessageResponse>*),
+ (override));
+
MOCK_METHOD(void, asyncForwardMessageToDeadLetterQueue,
(const ForwardMessageToDeadLetterQueueRequest&,
InvocationContext<ForwardMessageToDeadLetterQueueResponse>*),
diff --git a/cpp/source/log/LoggerImpl.cpp b/cpp/source/log/LoggerImpl.cpp
index cdd2f473..a7047d4a 100644
--- a/cpp/source/log/LoggerImpl.cpp
+++ b/cpp/source/log/LoggerImpl.cpp
@@ -76,7 +76,7 @@ void LoggerImpl::init0() {
abort();
}
}
- std::cout << "RocketMQ log files path: " << log_dir.c_str() << std::endl;
+ // std::cout << "RocketMQ log files path: " << log_dir.c_str() << std::endl;
if (pattern_.empty()) {
pattern_ = DEFAULT_PATTERN;
diff --git a/cpp/source/rocketmq/Producer.cpp b/cpp/source/rocketmq/Producer.cpp
index 4f29383f..916c47a4 100644
--- a/cpp/source/rocketmq/Producer.cpp
+++ b/cpp/source/rocketmq/Producer.cpp
@@ -16,7 +16,6 @@
*/
#include "rocketmq/Producer.h"
-#include <chrono>
#include <memory>
#include <system_error>
#include <utility>
@@ -26,6 +25,7 @@
#include "rocketmq/ErrorCode.h"
#include "rocketmq/SendReceipt.h"
#include "rocketmq/Transaction.h"
+#include "rocketmq/RecallReceipt.h"
ROCKETMQ_NAMESPACE_BEGIN
@@ -68,6 +68,10 @@ SendReceipt Producer::send(MessageConstPtr message,
std::error_code& ec, Transac
return impl_->send(std::move(message), ec, transaction);
}
+RecallReceipt Producer::recall(std::string& topic, std::string& recall_handle,
std::error_code& ec) noexcept {
+ return impl_->recall(topic, recall_handle, ec);  // Pure delegation to the implementation object; all arguments are forwarded unchanged.
+}
+
ProducerBuilder Producer::newBuilder() {
return {};
}
@@ -77,6 +81,7 @@ ProducerBuilder::ProducerBuilder() :
impl_(std::make_shared<ProducerImpl>()){};
ProducerBuilder& ProducerBuilder::withConfiguration(Configuration
configuration) {
auto name_server_resolver =
std::make_shared<StaticNameServerResolver>(configuration.endpoints());
impl_->withNameServerResolver(std::move(name_server_resolver));
+ impl_->withResourceNamespace(configuration.resourceNamespace());
impl_->withCredentialsProvider(configuration.credentialsProvider());
impl_->withRequestTimeout(configuration.requestTimeout());
impl_->withSsl(configuration.withSsl());
diff --git a/cpp/source/rocketmq/ProducerImpl.cpp
b/cpp/source/rocketmq/ProducerImpl.cpp
index 3de3d37d..7ea7d4f6 100644
--- a/cpp/source/rocketmq/ProducerImpl.cpp
+++ b/cpp/source/rocketmq/ProducerImpl.cpp
@@ -17,14 +17,12 @@
#include "ProducerImpl.h"
#include <algorithm>
-#include <atomic>
#include <cassert>
#include <chrono>
#include <memory>
#include <system_error>
#include <utility>
-#include "apache/rocketmq/v2/definition.pb.h"
#include "MixAll.h"
#include "Protocol.h"
#include "PublishInfoCallback.h"
@@ -40,6 +38,7 @@
#include "rocketmq/SendReceipt.h"
#include "rocketmq/Transaction.h"
#include "rocketmq/TransactionChecker.h"
+#include "rocketmq/RecallReceipt.h"
ROCKETMQ_NAMESPACE_BEGIN
@@ -97,8 +96,12 @@ void ProducerImpl::validate(const Message& message,
std::error_code& ec) {
MixAll::validate(message, ec);
if (!ec) {
+ if (message.body().empty()) {
+ SPDLOG_WARN("Body of the message is null, topic={}", message.topic());
+ ec = ErrorCode::MessageBodyEmpty;
+ }
if (message.body().length() > client_config_.publisher.max_body_size) {
- SPDLOG_WARN("Body of the message to send is too large");
+ SPDLOG_WARN("Body of the message to send is too large, topic={}",
message.topic());
ec = ErrorCode::PayloadTooLarge;
}
}
@@ -224,13 +227,15 @@ SendReceipt ProducerImpl::send(MessageConstPtr message,
std::error_code& ec) noe
SendReceipt send_receipt;
// Define callback
- auto callback = [&, mtx, cv](const std::error_code& code, const SendReceipt&
receipt) mutable {
+ auto callback =
+ [&, mtx, cv](const std::error_code& code, const SendReceipt& receipt)
mutable {
ec = code;
- SendReceipt& receipt_mut = const_cast<SendReceipt&>(receipt);
+ auto& receipt_mut = const_cast<SendReceipt&>(receipt);
send_receipt.target = std::move(receipt_mut.target);
send_receipt.message_id = std::move(receipt_mut.message_id);
send_receipt.message = std::move(receipt_mut.message);
send_receipt.transaction_id = std::move(receipt_mut.transaction_id);
+ send_receipt.recall_handle = std::move(receipt_mut.recall_handle);
{
absl::MutexLock lk(mtx.get());
completed = true;
@@ -450,8 +455,9 @@ bool ProducerImpl::endTransaction0(const MiniTransaction&
transaction, Transacti
}
};
- client_manager_->endTransaction(transaction.target, metadata, request,
absl::ToChronoMilliseconds(requestTimeout()),
- cb);
+ client_manager_->endTransaction(
+ transaction.target, metadata, request,
absl::ToChronoMilliseconds(requestTimeout()), cb);
+
{
absl::MutexLock lk(mtx.get());
cv->Wait(mtx.get());
@@ -509,6 +515,73 @@ SendReceipt ProducerImpl::send(MessageConstPtr message,
std::error_code& ec, Tra
return send_receipt;
}
+/**
+ * @brief Synchronously asks the server to recall the message identified by recall_handle.
+ *
+ * Resolves a target address from the publish route of the topic, sends a
+ * RecallMessageRequest through the client manager and blocks the calling
+ * thread until the completion callback has run.
+ *
+ * @param topic Topic name used to look up the route and placed in the request.
+ * @param recall_handle Recall handle placed in the request.
+ * @param ec Receives the outcome: the producer-state check, a route/address
+ *           resolution failure, or the code reported by the completion callback.
+ * @return Receipt whose message_id is taken from the response on success;
+ *         a default-constructed receipt otherwise.
+ */
+RecallReceipt ProducerImpl::recall(const std::string& topic, std::string& recall_handle, std::error_code& ec) noexcept {
+  ensureRunning(ec);
+  if (ec) {
+    SPDLOG_WARN("Producer is not running");
+    return RecallReceipt{};
+  }
+
+  auto topic_publish_info = getPublishInfo(topic);
+  if (!topic_publish_info) {
+    SPDLOG_WARN("Route of topic[{}] is not found", topic);
+    ec = ErrorCode::NotFound;
+    return RecallReceipt{};
+  }
+
+  std::vector<rmq::MessageQueue> message_queue_list;
+  absl::optional<std::string> message_group{};
+
+  if (!topic_publish_info->selectMessageQueues(message_group, message_queue_list)) {
+    SPDLOG_WARN("Failed to select an addressable message queue for timer topic[{}]", topic);
+    ec = ErrorCode::NotFound;
+    return RecallReceipt{};
+  }
+
+  const std::string& target = urlOf(message_queue_list.front());
+  if (target.empty()) {
+    SPDLOG_WARN("Failed to resolve broker address from MessageQueue");
+    ec = ErrorCode::BadGateway;
+    return RecallReceipt{};
+  }
+
+  RecallMessageRequest request;
+  request.mutable_topic()->set_resource_namespace(resourceNamespace());
+  request.mutable_topic()->set_name(topic);
+  request.set_recall_handle(recall_handle);
+
+  Metadata metadata;
+  Signature::sign(client_config_, metadata);
+
+  auto mtx = std::make_shared<absl::Mutex>();
+  auto cv = std::make_shared<absl::CondVar>();
+  // Completion must be tracked explicitly: the callback can run before this
+  // thread reaches Wait() (ClientManagerImpl::recallMessage invokes it inline
+  // when no RPC client is available), in which case a bare signal would be
+  // lost and this call would block forever. The flag also makes the wait
+  // robust against spurious wake-ups.
+  bool completed = false;
+
+  RecallReceipt recall_receipt;
+  // References captured here stay valid because this function does not return
+  // until `completed` is observed; mtx and cv are shared so they outlive both sides.
+  auto callback = [&, mtx, cv](const std::error_code& code, const RecallMessageResponse& response) {
+    ec = code;
+    if (!ec) {
+      recall_receipt.message_id = response.message_id();
+    }
+
+    {
+      absl::MutexLock lk(mtx.get());
+      completed = true;
+      cv->SignalAll();
+    }
+  };
+
+  client_manager_->recallMessage(target, metadata, request, absl::ToChronoMilliseconds(requestTimeout()), callback);
+
+  {
+    absl::MutexLock lk(mtx.get());
+    while (!completed) {
+      cv->Wait(mtx.get());
+    }
+  }
+
+  return recall_receipt;
+}
+
void ProducerImpl::getPublishInfoAsync(const std::string& topic, const
PublishInfoCallback& cb) {
TopicPublishInfoPtr ptr;
{
diff --git a/cpp/source/rocketmq/PushConsumer.cpp
b/cpp/source/rocketmq/PushConsumer.cpp
index 2b1c1566..a0fd0055 100644
--- a/cpp/source/rocketmq/PushConsumer.cpp
+++ b/cpp/source/rocketmq/PushConsumer.cpp
@@ -43,6 +43,7 @@ PushConsumer PushConsumerBuilder::build() {
}
impl->consumeThreadPoolSize(consume_thread_);
impl->withNameServerResolver(std::make_shared<StaticNameServerResolver>(configuration_.endpoints()));
+ impl->withResourceNamespace(configuration_.resourceNamespace());
impl->withSsl(configuration_.withSsl());
impl->withCredentialsProvider(configuration_.credentialsProvider());
impl->withRequestTimeout(configuration_.requestTimeout());
diff --git a/cpp/source/rocketmq/PushConsumerImpl.cpp
b/cpp/source/rocketmq/PushConsumerImpl.cpp
index 505854db..712ac814 100644
--- a/cpp/source/rocketmq/PushConsumerImpl.cpp
+++ b/cpp/source/rocketmq/PushConsumerImpl.cpp
@@ -70,13 +70,14 @@ void PushConsumerImpl::start() {
return;
}
+ client_config_.subscriber.group.set_resource_namespace(resourceNamespace());
client_manager_->addClientObserver(shared_from_this());
fetchRoutes();
- SPDLOG_INFO("start concurrently consume service: {}",
client_config_.subscriber.group.name());
- consume_message_service_ =
- std::make_shared<ConsumeMessageServiceImpl>(shared_from_this(),
consume_thread_pool_size_, message_listener_);
+ SPDLOG_INFO("Start concurrently consume service: {}",
client_config_.subscriber.group.name());
+ consume_message_service_ = std::make_shared<ConsumeMessageServiceImpl>(
+ shared_from_this(), consume_thread_pool_size_, message_listener_);
consume_message_service_->start();
// Heartbeat depends on initialization of consume-message-service
@@ -91,7 +92,8 @@ void PushConsumerImpl::start() {
};
scan_assignment_handle_ = client_manager_->getScheduler()->schedule(
- scan_assignment_functor, SCAN_ASSIGNMENT_TASK_NAME,
std::chrono::milliseconds(100), std::chrono::seconds(5));
+ scan_assignment_functor, SCAN_ASSIGNMENT_TASK_NAME,
+ std::chrono::milliseconds(100), std::chrono::seconds(5));
SPDLOG_INFO("PushConsumer started, groupName={}",
client_config_.subscriber.group.name());
auto collect_stats_functor = [consumer_weak_ptr] {
@@ -101,8 +103,9 @@ void PushConsumerImpl::start() {
}
};
- collect_stats_handle_ =
client_manager_->getScheduler()->schedule(collect_stats_functor,
COLLECT_STATS_TASK_NAME,
-
std::chrono::seconds(3), std::chrono::seconds(3));
+ collect_stats_handle_ = client_manager_->getScheduler()->schedule(
+ collect_stats_functor, COLLECT_STATS_TASK_NAME,
+ std::chrono::seconds(3), std::chrono::seconds(3));
}
const char* PushConsumerImpl::SCAN_ASSIGNMENT_TASK_NAME =
"scan-assignment-task";
@@ -191,10 +194,33 @@ void PushConsumerImpl::scanAssignments() {
}
bool PushConsumerImpl::selectBroker(const TopicRouteDataPtr& topic_route_data,
std::string& broker_host) {
+
+ absl::flat_hash_set<std::string> endpoints;
+ endpointsInUse(endpoints);
+ if (endpoints.empty()) {
+ SPDLOG_WARN("No broker is available");
+ return false;
+ }
+
+ // preference for selecting the access point filled in by the user
if (topic_route_data && !topic_route_data->messageQueues().empty()) {
+ uint32_t queue_count = topic_route_data->messageQueues().size();
uint32_t index = TopicAssignment::getAndIncreaseQueryWhichBroker();
- for (uint32_t i = index; i < index +
topic_route_data->messageQueues().size(); i++) {
- auto message_queue = topic_route_data->messageQueues().at(i %
topic_route_data->messageQueues().size());
+ for (uint32_t i = index; i < index + queue_count; i++) {
+ auto message_queue = topic_route_data->messageQueues().at(i %
queue_count);
+ if (MixAll::MASTER_BROKER_ID != message_queue.broker().id() ||
!readable(message_queue.permission())) {
+ continue;
+ }
+
+ std::string current_host = urlOf(message_queue);
+ if (endpoints.contains(current_host)) {
+ broker_host = current_host;
+ return true;
+ }
+ }
+
+ for (uint32_t i = index; i < index + queue_count; i++) {
+ auto message_queue = topic_route_data->messageQueues().at(i %
queue_count);
if (MixAll::MASTER_BROKER_ID != message_queue.broker().id() ||
!readable(message_queue.permission())) {
continue;
}
@@ -527,8 +553,7 @@ void PushConsumerImpl::buildClientSettings(rmq::Settings&
settings) {
void PushConsumerImpl::prepareHeartbeatData(HeartbeatRequest& request) {
request.set_client_type(rmq::ClientType::PUSH_CONSUMER);
- request.mutable_group()->set_resource_namespace(resourceNamespace());
- request.mutable_group()->set_name(groupName());
+ request.mutable_group()->CopyFrom(client_config_.subscriber.group);
}
void PushConsumerImpl::notifyClientTermination() {
diff --git a/cpp/source/rocketmq/SendContext.cpp
b/cpp/source/rocketmq/SendContext.cpp
index bd97384d..5f6b8fdc 100644
--- a/cpp/source/rocketmq/SendContext.cpp
+++ b/cpp/source/rocketmq/SendContext.cpp
@@ -57,6 +57,7 @@ void SendContext::onSuccess(const SendResult& send_result)
noexcept {
send_receipt.target = send_result.target;
send_receipt.message_id = send_result.message_id;
send_receipt.transaction_id = send_result.transaction_id;
+ send_receipt.recall_handle = send_result.recall_handle;
send_receipt.message = std::move(message_);
callback_(send_result.ec, send_receipt);
}
diff --git a/cpp/source/rocketmq/SimpleConsumer.cpp
b/cpp/source/rocketmq/SimpleConsumer.cpp
index 2b5e79b0..8acf16ac 100644
--- a/cpp/source/rocketmq/SimpleConsumer.cpp
+++ b/cpp/source/rocketmq/SimpleConsumer.cpp
@@ -125,6 +125,7 @@ SimpleConsumer SimpleConsumerBuilder::build() {
simple_consumer.impl_->withRequestTimeout(configuration_.requestTimeout());
simple_consumer.impl_->withNameServerResolver(std::make_shared<StaticNameServerResolver>(configuration_.endpoints()));
+
simple_consumer.impl_->withResourceNamespace(configuration_.resourceNamespace());
simple_consumer.impl_->withCredentialsProvider(configuration_.credentialsProvider());
simple_consumer.impl_->withReceiveMessageTimeout(await_duration_);
simple_consumer.impl_->withSsl(configuration_.withSsl());
diff --git a/cpp/source/rocketmq/SimpleConsumerImpl.cpp
b/cpp/source/rocketmq/SimpleConsumerImpl.cpp
index e0a78eeb..2441bb2d 100644
--- a/cpp/source/rocketmq/SimpleConsumerImpl.cpp
+++ b/cpp/source/rocketmq/SimpleConsumerImpl.cpp
@@ -85,6 +85,7 @@ void SimpleConsumerImpl::start() {
ClientImpl::start();
State expected = State::STARTING;
if (state_.compare_exchange_strong(expected, State::STARTED,
std::memory_order_relaxed)) {
+
client_config_.subscriber.group.set_resource_namespace(resourceNamespace());
refreshAssignments();
std::weak_ptr<SimpleConsumerImpl> consumer(shared_from_this());
@@ -301,8 +302,23 @@ void SimpleConsumerImpl::receive(std::size_t limit,
callback(ec, messages);
return;
}
- std::size_t idx = ++assignment_index_ % assignments_.size();
- assignment.CopyFrom(assignments_[idx]);
+
+ // choose assign allow readable
+ std::size_t start_index = ++assignment_index_ % assignments_.size();
+ for (std::size_t i = 0; i < assignments_.size(); ++i) {
+ const auto& assign = assignments_[(start_index + i) %
assignments_.size()];
+ if (readable(assign.message_queue().permission())) {
+ assignment.CopyFrom(assign);
+ break;
+ }
+ }
+
+ if (!assignment.IsInitialized()) {
+ std::error_code ec = ErrorCode::NotFound;
+ std::vector<MessageConstSharedPtr> messages;
+ callback(ec, messages);
+ return;
+ }
}
const auto& target = urlOf(assignment.message_queue());
diff --git a/cpp/source/rocketmq/TopicPublishInfo.cpp
b/cpp/source/rocketmq/TopicPublishInfo.cpp
index 6eeece4a..a94013a5 100644
--- a/cpp/source/rocketmq/TopicPublishInfo.cpp
+++ b/cpp/source/rocketmq/TopicPublishInfo.cpp
@@ -70,8 +70,13 @@ bool
TopicPublishInfo::selectMessageQueues(absl::optional<std::string> messa
absl::MutexLock lock(&queue_list_mtx_);
for (std::vector<rmq::MessageQueue>::size_type i = 0; i <
queue_list_.size(); ++i) {
const rmq::MessageQueue& message_queue = queue_list_[index++ %
(queue_list_.size())];
+ if (!writable(message_queue.permission())) {
+ continue;
+ }
+
if (!producer->isEndpointIsolated(urlOf(message_queue))) {
- auto search = std::find_if(result.begin(), result.end(), [&](const
rmq::MessageQueue& item) {
+ auto search = std::find_if(
+ result.begin(), result.end(), [&](const rmq::MessageQueue& item) {
return item.broker().name() == message_queue.broker().name();
});
if (std::end(result) == search) {
diff --git a/cpp/source/rocketmq/include/ClientImpl.h
b/cpp/source/rocketmq/include/ClientImpl.h
index 25cef46c..9b6a6502 100644
--- a/cpp/source/rocketmq/include/ClientImpl.h
+++ b/cpp/source/rocketmq/include/ClientImpl.h
@@ -90,6 +90,10 @@ public:
client_config_.credentials_provider = std::move(credentials_provider);
}
+ void withResourceNamespace(std::string resource_namespace) {
+ client_config_.resource_namespace = std::move(resource_namespace);
+ }
+
void withRequestTimeout(std::chrono::milliseconds request_timeout) {
client_config_.request_timeout = absl::FromChrono(request_timeout);
}
diff --git a/cpp/source/rocketmq/include/ProcessQueueImpl.h
b/cpp/source/rocketmq/include/ProcessQueueImpl.h
index 822f7c04..75a5d2d3 100644
--- a/cpp/source/rocketmq/include/ProcessQueueImpl.h
+++ b/cpp/source/rocketmq/include/ProcessQueueImpl.h
@@ -128,6 +128,7 @@ private:
std::atomic<uint64_t> cached_message_memory_;
void popMessage();
+
void wrapPopMessageRequest(absl::flat_hash_map<std::string, std::string>&
metadata,
rmq::ReceiveMessageRequest& request);
diff --git a/cpp/source/rocketmq/include/ProducerImpl.h
b/cpp/source/rocketmq/include/ProducerImpl.h
index 2cb9c44e..237c242e 100644
--- a/cpp/source/rocketmq/include/ProducerImpl.h
+++ b/cpp/source/rocketmq/include/ProducerImpl.h
@@ -30,6 +30,7 @@
#include "absl/container/flat_hash_map.h"
#include "absl/container/flat_hash_set.h"
#include "rocketmq/Message.h"
+#include "rocketmq/RecallReceipt.h"
#include "rocketmq/SendCallback.h"
#include "rocketmq/SendReceipt.h"
#include "rocketmq/TransactionChecker.h"
@@ -79,6 +80,11 @@ public:
*/
SendReceipt send(MessageConstPtr message, std::error_code& ec, Transaction&
transaction);
+ /**
+ * Recall message synchronously, only delay message is supported for now.
+ */
+ RecallReceipt recall(const std::string& topic, std::string& recall_handle,
std::error_code& ec) noexcept;
+
/**
* Check if the RPC client for the target host is isolated or not
* @param endpoint Address of target host.
diff --git a/cpp/source/rocketmq/include/PushConsumerImpl.h
b/cpp/source/rocketmq/include/PushConsumerImpl.h
index 7a4ff1a3..8f360fda 100644
--- a/cpp/source/rocketmq/include/PushConsumerImpl.h
+++ b/cpp/source/rocketmq/include/PushConsumerImpl.h
@@ -72,7 +72,7 @@ public:
void scanAssignments() LOCKS_EXCLUDED(topic_filter_expression_table_mtx_);
- static bool selectBroker(const TopicRouteDataPtr& route, std::string&
broker_host);
+ bool selectBroker(const TopicRouteDataPtr& route, std::string& broker_host);
void wrapQueryAssignmentRequest(const std::string& topic,
const std::string& consumer_group,