This is an automated email from the ASF dual-hosted git repository.

BewareMyPower pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/pulsar-client-cpp.git


The following commit(s) were added to refs/heads/main by this push:
     new 0a9e017  [fix][client-cpp] Fail producers immediately when topic is 
terminated (#567)
0a9e017 is described below

commit 0a9e01771b197f777c393093159c2911ec3629b0
Author: dwang-qm <[email protected]>
AuthorDate: Tue Apr 21 20:19:15 2026 -0700

    [fix][client-cpp] Fail producers immediately when topic is terminated (#567)
---
 lib/HandlerBase.cc    |  1 +
 lib/HandlerBase.h     |  2 ++
 lib/ProducerImpl.cc   |  7 +++++--
 lib/ResultUtils.h     |  1 +
 tests/ProducerTest.cc | 44 ++++++++++++++++++++++++++++++++++++++++++++
 tests/PulsarFriend.h  |  5 +++++
 6 files changed, 58 insertions(+), 2 deletions(-)

diff --git a/lib/HandlerBase.cc b/lib/HandlerBase.cc
index 1a4f573..bc8cd53 100644
--- a/lib/HandlerBase.cc
+++ b/lib/HandlerBase.cc
@@ -171,6 +171,7 @@ void HandlerBase::handleDisconnection(Result result, const 
ClientConnectionPtr&
         case Closing:
         case Closed:
         case Producer_Fenced:
+        case Terminated:
         case Failed:
             LOG_DEBUG(getName() << "Ignoring connection closed event since the 
handler is not used anymore");
             break;
diff --git a/lib/HandlerBase.h b/lib/HandlerBase.h
index 0e733f0..229404e 100644
--- a/lib/HandlerBase.h
+++ b/lib/HandlerBase.h
@@ -138,6 +138,8 @@ class HandlerBase : public 
std::enable_shared_from_this<HandlerBase> {
         Failed,           // Handler is failed, in Java client: 
HandlerState.State.Failed
         Producer_Fenced,  // The producer has been fenced by the broker
                           // in Java client: HandlerState.State.ProducerFenced
+        Terminated,       // The topic has been terminated
+                          // in Java client: HandlerState.State.Terminated
     };
 
     std::atomic<State> state_;
diff --git a/lib/ProducerImpl.cc b/lib/ProducerImpl.cc
index d8f2dba..7632581 100644
--- a/lib/ProducerImpl.cc
+++ b/lib/ProducerImpl.cc
@@ -273,8 +273,8 @@ Result ProducerImpl::handleCreateProducer(const 
ClientConnectionPtr& cnx, Result
             }
         }
 
-        if (result == ResultProducerFenced) {
-            state_ = Producer_Fenced;
+        if (result == ResultProducerFenced || result == ResultTopicTerminated) 
{
+            state_ = result == ResultProducerFenced ? Producer_Fenced : 
Terminated;
             failPendingMessages(result, false);
             auto client = client_.lock();
             if (client) {
@@ -450,6 +450,9 @@ bool ProducerImpl::isValidProducerState(const SendCallback& 
callback) const {
         case HandlerBase::Producer_Fenced:
             callback(ResultProducerFenced, {});
             return false;
+        case HandlerBase::Terminated:
+            callback(ResultTopicTerminated, {});
+            return false;
         case HandlerBase::NotStarted:
         case HandlerBase::Failed:
         default:
diff --git a/lib/ResultUtils.h b/lib/ResultUtils.h
index cf4ff1f..a3d2ec4 100644
--- a/lib/ResultUtils.h
+++ b/lib/ResultUtils.h
@@ -39,6 +39,7 @@ inline bool isResultRetryable(Result result) {
                                                       
ResultInvalidConfiguration,
                                                       ResultIncompatibleSchema,
                                                       ResultTopicNotFound,
+                                                      ResultTopicTerminated,
                                                       
ResultOperationNotSupported,
                                                       ResultNotAllowedError,
                                                       ResultChecksumError,
diff --git a/tests/ProducerTest.cc b/tests/ProducerTest.cc
index 4220a2e..d0552df 100644
--- a/tests/ProducerTest.cc
+++ b/tests/ProducerTest.cc
@@ -215,6 +215,50 @@ TEST(ProducerTest, testBacklogQuotasExceeded) {
     client.close();
 }
 
+TEST(ProducerTest, testCreateProducerAfterTopicTermination) {
+    const auto topicName = "testCreateProducerAfterTopicTermination-" + 
std::to_string(time(nullptr));
+    const auto topic = "persistent://public/default/" + topicName;
+
+    Client client(serviceUrl, 
ClientConfiguration().setOperationTimeoutSeconds(1));
+
+    Producer producer;
+    ASSERT_EQ(ResultOk, client.createProducer(topic, producer));
+    ASSERT_EQ(ResultOk, 
producer.send(MessageBuilder().setContent("content").build()));
+    ASSERT_EQ(ResultOk, producer.close());
+
+    const auto httpCode =
+        makePostRequest(adminUrl + "admin/v2/persistent/public/default/" + 
topicName + "/terminate", "");
+    ASSERT_EQ(200, httpCode) << "httpCode: " << httpCode;
+
+    Producer terminatedProducer;
+    ASSERT_EQ(ResultTopicTerminated, client.createProducer(topic, 
terminatedProducer));
+
+    client.close();
+}
+
+TEST(ProducerTest, testSendAfterTopicTerminationReconnect) {
+    const auto topicName = "testSendAfterTopicTerminationReconnect-" + 
std::to_string(time(nullptr));
+    const auto topic = "persistent://public/default/" + topicName;
+
+    Client client(serviceUrl, 
ClientConfiguration().setOperationTimeoutSeconds(1));
+
+    Producer producer;
+    ASSERT_EQ(ResultOk, client.createProducer(topic, producer));
+    ASSERT_EQ(ResultOk, 
producer.send(MessageBuilder().setContent("before-terminate").build()));
+
+    const auto httpCode =
+        makePostRequest(adminUrl + "admin/v2/persistent/public/default/" + 
topicName + "/terminate", "");
+    ASSERT_EQ(200, httpCode) << "httpCode: " << httpCode;
+
+    PulsarFriend::getProducerImpl(producer).disconnectProducer();
+    ASSERT_TRUE(
+        waitUntil(std::chrono::seconds(3), [&producer] { return 
PulsarFriend::isTerminated(producer); }));
+
+    ASSERT_EQ(ResultTopicTerminated, 
producer.send(MessageBuilder().setContent("after-terminate").build()));
+
+    client.close();
+}
+
 class ProducerTest : public ::testing::TestWithParam<bool> {};
 
 TEST_P(ProducerTest, testMaxMessageSize) {
diff --git a/tests/PulsarFriend.h b/tests/PulsarFriend.h
index 3296953..8017e0b 100644
--- a/tests/PulsarFriend.h
+++ b/tests/PulsarFriend.h
@@ -257,6 +257,11 @@ class PulsarFriend {
         return waitUntil(std::chrono::seconds(3),
                          [producerImpl] { return 
!producerImpl->getCnx().expired(); });
     }
+
+    static bool isTerminated(Producer producer) {
+        auto producerImpl = 
std::dynamic_pointer_cast<ProducerImpl>(producer.impl_);
+        return producerImpl && producerImpl->state_ == HandlerBase::Terminated;
+    }
 };
 }  // namespace pulsar
 

Reply via email to