This is an automated email from the ASF dual-hosted git repository. lhotari pushed a commit to branch branch-4.1 in repository https://gitbox.apache.org/repos/asf/pulsar.git
commit 90fb1c0a4ef506447c59b2f3ce44e2c939de3dc1
Author: sinan liu <[email protected]>
AuthorDate: Sat Apr 4 06:11:47 2026 +0800

    [fix][broker] Prevent timed-out producer creation from racing with retry (#25460)

    (cherry picked from commit 56442c27907f3e80e4876404ff31063811a92fba)
---
 .../src/main/java/org/apache/pulsar/broker/service/ServerCnx.java | 7 +++++++
 1 file changed, 7 insertions(+)

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
index 336dfc5c9a0..f316b938c78 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
@@ -1873,6 +1873,13 @@ public class ServerCnx extends PulsarHandler implements TransportCnx {
                              ProducerAccessMode producerAccessMode,
                              Optional<Long> topicEpoch, boolean supportsPartialProducer,
                              CompletableFuture<Producer> producerFuture){
+        if (producerFuture.isCompletedExceptionally()) {
+            log.info("[{}] Skipped producer creation after timeout on client side. producerId={}, producerName={}",
+                    remoteAddress, producerId, producerName);
+            producers.remove(producerId, producerFuture);
+            return;
+        }
+
         CompletableFuture<Void> producerQueuedFuture = new CompletableFuture<>();
         Producer producer = new Producer(topic, ServerCnx.this, producerId, producerName,
                 getPrincipal(), isEncrypted, metadata, schemaVersion, epoch,
