This is an automated email from the ASF dual-hosted git repository.

lhotari pushed a commit to branch branch-4.1
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit 90fb1c0a4ef506447c59b2f3ce44e2c939de3dc1
Author: sinan liu <[email protected]>
AuthorDate: Sat Apr 4 06:11:47 2026 +0800

    [fix][broker] Prevent timed-out producer creation from racing with retry (#25460)
    
    (cherry picked from commit 56442c27907f3e80e4876404ff31063811a92fba)
---
 .../src/main/java/org/apache/pulsar/broker/service/ServerCnx.java  | 7 +++++++
 1 file changed, 7 insertions(+)

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
index 336dfc5c9a0..f316b938c78 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
@@ -1873,6 +1873,13 @@ public class ServerCnx extends PulsarHandler implements TransportCnx {
                              ProducerAccessMode producerAccessMode,
                              Optional<Long> topicEpoch, boolean 
supportsPartialProducer,
                              CompletableFuture<Producer> producerFuture){
+        if (producerFuture.isCompletedExceptionally()) {
+            log.info("[{}] Skipped producer creation after timeout on client 
side. producerId={}, producerName={}",
+                    remoteAddress, producerId, producerName);
+            producers.remove(producerId, producerFuture);
+            return;
+        }
+
         CompletableFuture<Void> producerQueuedFuture = new 
CompletableFuture<>();
         Producer producer = new Producer(topic, ServerCnx.this, producerId, 
producerName,
                 getPrincipal(), isEncrypted, metadata, schemaVersion, epoch,

Reply via email to