This is an automated email from the ASF dual-hosted git repository.
BewareMyPower pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/pulsar-client-cpp.git
The following commit(s) were added to refs/heads/main by this push:
new 0a9e017 [fix][client-cpp] Fail producers immediately when topic is
terminated (#567)
0a9e017 is described below
commit 0a9e01771b197f777c393093159c2911ec3629b0
Author: dwang-qm <[email protected]>
AuthorDate: Tue Apr 21 20:19:15 2026 -0700
[fix][client-cpp] Fail producers immediately when topic is terminated (#567)
---
lib/HandlerBase.cc | 1 +
lib/HandlerBase.h | 2 ++
lib/ProducerImpl.cc | 7 +++++--
lib/ResultUtils.h | 1 +
tests/ProducerTest.cc | 44 ++++++++++++++++++++++++++++++++++++++++++++
tests/PulsarFriend.h | 5 +++++
6 files changed, 58 insertions(+), 2 deletions(-)
diff --git a/lib/HandlerBase.cc b/lib/HandlerBase.cc
index 1a4f573..bc8cd53 100644
--- a/lib/HandlerBase.cc
+++ b/lib/HandlerBase.cc
@@ -171,6 +171,7 @@ void HandlerBase::handleDisconnection(Result result, const
ClientConnectionPtr&
case Closing:
case Closed:
case Producer_Fenced:
+ case Terminated:
case Failed:
LOG_DEBUG(getName() << "Ignoring connection closed event since the
handler is not used anymore");
break;
diff --git a/lib/HandlerBase.h b/lib/HandlerBase.h
index 0e733f0..229404e 100644
--- a/lib/HandlerBase.h
+++ b/lib/HandlerBase.h
@@ -138,6 +138,8 @@ class HandlerBase : public
std::enable_shared_from_this<HandlerBase> {
Failed, // Handler is failed, in Java client:
HandlerState.State.Failed
Producer_Fenced, // The producer has been fenced by the broker
// in Java client: HandlerState.State.ProducerFenced
+ Terminated, // The topic has been terminatedproducer has been
fenced by the broker
+ // in Java client: HandlerState.State.Terminated
};
std::atomic<State> state_;
diff --git a/lib/ProducerImpl.cc b/lib/ProducerImpl.cc
index d8f2dba..7632581 100644
--- a/lib/ProducerImpl.cc
+++ b/lib/ProducerImpl.cc
@@ -273,8 +273,8 @@ Result ProducerImpl::handleCreateProducer(const
ClientConnectionPtr& cnx, Result
}
}
- if (result == ResultProducerFenced) {
- state_ = Producer_Fenced;
+ if (result == ResultProducerFenced || result == ResultTopicTerminated)
{
+ state_ = result == ResultProducerFenced ? Producer_Fenced :
Terminated;
failPendingMessages(result, false);
auto client = client_.lock();
if (client) {
@@ -450,6 +450,9 @@ bool ProducerImpl::isValidProducerState(const SendCallback&
callback) const {
case HandlerBase::Producer_Fenced:
callback(ResultProducerFenced, {});
return false;
+ case HandlerBase::Terminated:
+ callback(ResultTopicTerminated, {});
+ return false;
case HandlerBase::NotStarted:
case HandlerBase::Failed:
default:
diff --git a/lib/ResultUtils.h b/lib/ResultUtils.h
index cf4ff1f..a3d2ec4 100644
--- a/lib/ResultUtils.h
+++ b/lib/ResultUtils.h
@@ -39,6 +39,7 @@ inline bool isResultRetryable(Result result) {
ResultInvalidConfiguration,
ResultIncompatibleSchema,
ResultTopicNotFound,
+ ResultTopicTerminated,
ResultOperationNotSupported,
ResultNotAllowedError,
ResultChecksumError,
diff --git a/tests/ProducerTest.cc b/tests/ProducerTest.cc
index 4220a2e..d0552df 100644
--- a/tests/ProducerTest.cc
+++ b/tests/ProducerTest.cc
@@ -215,6 +215,50 @@ TEST(ProducerTest, testBacklogQuotasExceeded) {
client.close();
}
+TEST(ProducerTest, testCreateProducerAfterTopicTermination) {
+ const auto topicName = "testCreateProducerAfterTopicTermination-" +
std::to_string(time(nullptr));
+ const auto topic = "persistent://public/default/" + topicName;
+
+ Client client(serviceUrl,
ClientConfiguration().setOperationTimeoutSeconds(1));
+
+ Producer producer;
+ ASSERT_EQ(ResultOk, client.createProducer(topic, producer));
+ ASSERT_EQ(ResultOk,
producer.send(MessageBuilder().setContent("content").build()));
+ ASSERT_EQ(ResultOk, producer.close());
+
+ const auto httpCode =
+ makePostRequest(adminUrl + "admin/v2/persistent/public/default/" +
topicName + "/terminate", "");
+ ASSERT_EQ(200, httpCode) << "httpCode: " << httpCode;
+
+ Producer terminatedProducer;
+ ASSERT_EQ(ResultTopicTerminated, client.createProducer(topic,
terminatedProducer));
+
+ client.close();
+}
+
+TEST(ProducerTest, testSendAfterTopicTerminationReconnect) {
+ const auto topicName = "testSendAfterTopicTerminationReconnect-" +
std::to_string(time(nullptr));
+ const auto topic = "persistent://public/default/" + topicName;
+
+ Client client(serviceUrl,
ClientConfiguration().setOperationTimeoutSeconds(1));
+
+ Producer producer;
+ ASSERT_EQ(ResultOk, client.createProducer(topic, producer));
+ ASSERT_EQ(ResultOk,
producer.send(MessageBuilder().setContent("before-terminate").build()));
+
+ const auto httpCode =
+ makePostRequest(adminUrl + "admin/v2/persistent/public/default/" +
topicName + "/terminate", "");
+ ASSERT_EQ(200, httpCode) << "httpCode: " << httpCode;
+
+ PulsarFriend::getProducerImpl(producer).disconnectProducer();
+ ASSERT_TRUE(
+ waitUntil(std::chrono::seconds(3), [&producer] { return
PulsarFriend::isTerminated(producer); }));
+
+ ASSERT_EQ(ResultTopicTerminated,
producer.send(MessageBuilder().setContent("after-terminate").build()));
+
+ client.close();
+}
+
class ProducerTest : public ::testing::TestWithParam<bool> {};
TEST_P(ProducerTest, testMaxMessageSize) {
diff --git a/tests/PulsarFriend.h b/tests/PulsarFriend.h
index 3296953..8017e0b 100644
--- a/tests/PulsarFriend.h
+++ b/tests/PulsarFriend.h
@@ -257,6 +257,11 @@ class PulsarFriend {
return waitUntil(std::chrono::seconds(3),
[producerImpl] { return
!producerImpl->getCnx().expired(); });
}
+
+ static bool isTerminated(Producer producer) {
+ auto producerImpl =
std::dynamic_pointer_cast<ProducerImpl>(producer.impl_);
+ return producerImpl && producerImpl->state_ == HandlerBase::Terminated;
+ }
};
} // namespace pulsar