This is an automated email from the ASF dual-hosted git repository. zhaocong pushed a commit to branch branch-4.0 in repository https://gitbox.apache.org/repos/asf/pulsar.git
commit 11f30ad30393749c32c42d7bcfe724b99077ff36 Author: Matteo Merli <[email protected]> AuthorDate: Mon Mar 16 00:23:17 2026 -0700 [fix][client] Fail messages immediately in ProducerImpl when in terminal state (#25317) --- .../broker/service/TopicTerminationTest.java | 3 ++- .../apache/pulsar/client/impl/ProducerImpl.java | 30 +++++++++++++++++++++- 2 files changed, 31 insertions(+), 2 deletions(-) diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/TopicTerminationTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/TopicTerminationTest.java index a2436e4360e..be4cc80e370 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/TopicTerminationTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/TopicTerminationTest.java @@ -140,6 +140,7 @@ public class TopicTerminationTest extends BrokerTestBase { Producer<byte[]> producer = pulsarClient.newProducer().topic(topicName) .enableBatching(false) .messageRoutingMode(MessageRoutingMode.SinglePartition) + .sendTimeout(5, TimeUnit.SECONDS) .create(); CyclicBarrier barrier = new CyclicBarrier(2); @@ -168,7 +169,7 @@ public class TopicTerminationTest extends BrokerTestBase { boolean alreadyFailed = false; try { - FutureUtil.waitForAll(futures).get(); + FutureUtil.waitForAll(futures).get(10, TimeUnit.SECONDS); } catch (Exception e) { // Ignore for now, check is below } diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java index 457c9a52fa3..aa818790b4c 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java @@ -1042,6 +1042,23 @@ public class ProducerImpl<T> extends ProducerBase<T> implements TimerTask, Conne } } + private PulsarClientException getTerminalException(State state) { + switch (state) { + case Terminated: + return new PulsarClientException.TopicTerminatedException( + format("The topic %s that the producer %s produces to has been terminated", topic, + producerName)); + case Closed: + return new PulsarClientException.AlreadyClosedException( + format("The producer %s of the topic %s was already closed", producerName, topic)); + case ProducerFenced: + return new PulsarClientException.ProducerFencedException( + format("The producer %s of the topic %s was fenced", producerName, topic)); + default: + return new PulsarClientException.NotConnectedException(); + } + } + private boolean isValidProducerState(SendCallback callback, long sequenceId) { switch (getState()) { case Ready: @@ -2452,9 +2469,20 @@ public class ProducerImpl<T> extends ProducerBase<T> implements TimerTask, Conne op.cmd.release(); return; } + final State state = getState(); + if (state == State.Terminated || state == State.Closed || state == State.ProducerFenced) { + // The producer is in a terminal state and will never reconnect. Fail the message immediately + // rather than leaving it stuck in pendingMessages until sendTimeout. + releaseSemaphoreForSendOp(op); + client.getMemoryLimitController().releaseMemory(op.uncompressedSize); + op.sendComplete(getTerminalException(state)); + ReferenceCountUtil.safeRelease(op.cmd); + op.recycle(); + return; + } pendingMessages.add(op); updateLastSeqPushed(op); - if (State.RegisteringSchema.equals(getState())) { + if (State.RegisteringSchema.equals(state)) { // Since there is a in-progress schema registration, do not continuously publish to avoid break publish // ordering. After the schema registration finished, it will trigger a "recoverProcessOpSendMsgFrom" to // publish all messages in "pendingMessages".
