This is an automated email from the ASF dual-hosted git repository.

lhotari pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 39fa6d5abab [fix][broker] Fix race condition in ServerCnx producer/consumer async callbacks (#25352)
39fa6d5abab is described below

commit 39fa6d5ababecea74b6183c822b006e441af5de5
Author: Matteo Merli <[email protected]>
AuthorDate: Thu Mar 19 01:19:39 2026 -0700

    [fix][broker] Fix race condition in ServerCnx producer/consumer async callbacks (#25352)
---
 .../apache/pulsar/broker/service/ServerCnx.java    | 62 +++++++++++-----------
 .../pulsar/broker/service/ServerCnxTest.java       |  7 ++-
 2 files changed, 37 insertions(+), 32 deletions(-)

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
index 4b63dd05c2d..f84f1c080ed 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
@@ -1358,7 +1358,7 @@ public class ServerCnx extends PulsarHandler implements 
TransportCnx {
         CompletableFuture<Consumer> consumerFuture = new CompletableFuture<>();
         CompletableFuture<Consumer> existingConsumerFuture =
                 consumers.putIfAbsent(consumerId, consumerFuture);
-        isAuthorizedFuture.thenApply(isAuthorized -> {
+        isAuthorizedFuture.thenApplyAsync(isAuthorized -> {
             if (isAuthorized) {
                 if (log.isDebugEnabled()) {
                     log.debug("[{}] Client is authorized to subscribe with 
role {}",
@@ -1490,7 +1490,7 @@ public class ServerCnx extends PulsarHandler implements 
TransportCnx {
                                         }
                                     });
                         })
-                        .thenAccept(consumer -> {
+                        .thenAcceptAsync(consumer -> {
                             if (consumer.checkAndApplyTopicMigration()) {
                                 log.info("[{}] Disconnecting consumer {} on 
migrated subscription on topic {} / {}",
                                         remoteAddress, consumerId, 
subscriptionName, topicName);
@@ -1524,8 +1524,8 @@ public class ServerCnx extends PulsarHandler implements 
TransportCnx {
                                 consumers.remove(consumerId, consumerFuture);
                             }
 
-                        })
-                        .exceptionally(exception -> {
+                        }, ctx.executor())
+                        .exceptionallyAsync(exception -> {
                             if (exception.getCause() instanceof 
ConsumerBusyException) {
                                 if (log.isDebugEnabled()) {
                                     log.debug(
@@ -1573,7 +1573,7 @@ public class ServerCnx extends PulsarHandler implements 
TransportCnx {
 
                             return null;
 
-                        });
+                        }, ctx.executor());
             } else {
                 String msg = "Client is not authorized to subscribe";
                 log.warn("[{}] {} with role {}", remoteAddress, msg, 
getPrincipal());
@@ -1581,12 +1581,12 @@ public class ServerCnx extends PulsarHandler implements 
TransportCnx {
                 writeAndFlush(Commands.newError(requestId, 
ServerError.AuthorizationError, msg));
             }
             return null;
-        }).exceptionally(ex -> {
+        }, ctx.executor()).exceptionallyAsync(ex -> {
             logAuthException(remoteAddress, "subscribe", getPrincipal(), 
Optional.of(topicName), ex);
             consumers.remove(consumerId, consumerFuture);
             commandSender.sendErrorResponse(requestId, 
ServerError.AuthorizationError, ex.getMessage());
             return null;
-        });
+        }, ctx.executor());
     }
 
     private SchemaData getSchema(Schema protocolSchema) {
@@ -1642,7 +1642,7 @@ public class ServerCnx extends PulsarHandler implements 
TransportCnx {
                             (canProduce, canSubscribe) -> canProduce && 
canSubscribe);
         }
 
-        isAuthorizedFuture.thenApply(isAuthorized -> {
+        isAuthorizedFuture.thenApplyAsync(isAuthorized -> {
             if (!isAuthorized) {
                 String msg = "Client is not authorized to Produce";
                 log.warn("[{}] {} with role {}", remoteAddress, msg, 
getPrincipal());
@@ -1689,7 +1689,7 @@ public class ServerCnx extends PulsarHandler implements 
TransportCnx {
                         topicName, producerId, producerName, schema == null ? 
"absent" : "present");
             }
 
-            service.getOrCreateTopic(topicName.toString()).thenCompose((Topic 
topic) -> {
+            
service.getOrCreateTopic(topicName.toString()).thenComposeAsync((Topic topic) 
-> {
                 // Check max producer limitation to avoid unnecessary ops 
wasting resources. For example: the new
                 // producer reached max producer limitation, but pulsar did 
schema check first, it would waste CPU
                 if (((AbstractTopic) topic).isProducersExceeded(producerName)) 
{
@@ -1705,7 +1705,7 @@ public class ServerCnx extends PulsarHandler implements 
TransportCnx {
                         topic.checkBacklogQuotaExceeded(producerName, 
BacklogQuotaType.destination_storage),
                         topic.checkBacklogQuotaExceeded(producerName, 
BacklogQuotaType.message_age));
 
-                backlogQuotaCheckFuture.thenRun(() -> {
+                backlogQuotaCheckFuture.thenRunAsync(() -> {
                     // Check whether the producer will publish encrypted 
messages or not
                     if ((topic.isEncryptionRequired() || 
encryptionRequireOnProducer)
                             && !isEncrypted
@@ -1723,7 +1723,7 @@ public class ServerCnx extends PulsarHandler implements 
TransportCnx {
 
                     CompletableFuture<SchemaVersion> schemaVersionFuture = 
tryAddSchema(topic, schema);
 
-                    schemaVersionFuture.exceptionally(exception -> {
+                    schemaVersionFuture.exceptionallyAsync(exception -> {
                         if (producerFuture.completeExceptionally(exception)) {
                             String message = exception.getMessage();
                             if (exception.getCause() != null) {
@@ -1747,9 +1747,9 @@ public class ServerCnx extends PulsarHandler implements 
TransportCnx {
                         }
                         producers.remove(producerId, producerFuture);
                         return null;
-                    });
+                    }, ctx.executor());
 
-                    schemaVersionFuture.thenAccept(schemaVersion -> {
+                    schemaVersionFuture.thenAcceptAsync(schemaVersion -> {
                         CompletionStage<Subscription> createInitSubFuture;
                         if (!Strings.isNullOrEmpty(initialSubscriptionName)
                                 && topic.isPersistent()
@@ -1769,7 +1769,7 @@ public class ServerCnx extends PulsarHandler implements 
TransportCnx {
                             createInitSubFuture = 
CompletableFuture.completedFuture(null);
                         }
 
-                        createInitSubFuture.whenComplete((sub, ex) -> {
+                        createInitSubFuture.whenCompleteAsync((sub, ex) -> {
                             if (ex != null) {
                                 final Throwable rc = 
FutureUtil.unwrapCompletionException(ex);
                                 if (rc instanceof 
BrokerServiceException.NotAllowedException) {
@@ -1797,11 +1797,11 @@ public class ServerCnx extends PulsarHandler implements 
TransportCnx {
                             buildProducerAndAddTopic(topic, producerId, 
producerName, requestId, isEncrypted,
                                     metadata, schemaVersion, epoch, 
userProvidedProducerName, topicName,
                                     producerAccessMode, topicEpoch, 
supportsPartialProducer, producerFuture);
-                        });
-                    });
-                });
+                        }, ctx.executor());
+                    }, ctx.executor());
+                }, ctx.executor());
                 return backlogQuotaCheckFuture;
-            }).exceptionally(exception -> {
+            }, ctx.executor()).exceptionallyAsync(exception -> {
                 Throwable cause = exception.getCause();
                 if (cause instanceof 
BrokerServiceException.TopicBacklogQuotaExceededException) {
                     BrokerServiceException.TopicBacklogQuotaExceededException 
tbqe =
@@ -1860,13 +1860,13 @@ public class ServerCnx extends PulsarHandler implements 
TransportCnx {
                 }
                 producers.remove(producerId, producerFuture);
                 return null;
-            });
+            }, ctx.executor());
             return null;
-        }).exceptionally(ex -> {
+        }, ctx.executor()).exceptionallyAsync(ex -> {
             logAuthException(remoteAddress, "producer", getPrincipal(), 
Optional.of(topicName), ex);
             commandSender.sendErrorResponse(requestId, 
ServerError.AuthorizationError, ex.getMessage());
             return null;
-        });
+        }, ctx.executor());
     }
 
     private void buildProducerAndAddTopic(Topic topic, long producerId, String 
producerName, long requestId,
@@ -1880,7 +1880,7 @@ public class ServerCnx extends PulsarHandler implements 
TransportCnx {
                 getPrincipal(), isEncrypted, metadata, schemaVersion, epoch,
                 userProvidedProducerName, producerAccessMode, topicEpoch, 
supportsPartialProducer);
 
-        topic.addProducer(producer, 
producerQueuedFuture).thenAccept(newTopicEpoch -> {
+        topic.addProducer(producer, 
producerQueuedFuture).thenAcceptAsync(newTopicEpoch -> {
             if (isActive()) {
                 if (producerFuture.complete(producer)) {
                     log.info("[{}] Created new producer: {}, role: {}", 
remoteAddress, producer, getPrincipal());
@@ -1913,7 +1913,7 @@ public class ServerCnx extends PulsarHandler implements 
TransportCnx {
             }
 
             producers.remove(producerId, producerFuture);
-        }).exceptionallyAsync(ex -> {
+        }, ctx.executor()).exceptionallyAsync(ex -> {
             if (ex.getCause() instanceof 
BrokerServiceException.TopicMigratedException) {
                 Optional<ClusterUrl> clusterURL = 
getMigratedClusterUrl(service.getPulsar(), topic.getName());
                 if (clusterURL.isPresent()) {
@@ -1956,7 +1956,7 @@ public class ServerCnx extends PulsarHandler implements 
TransportCnx {
             return null;
         }, ctx.executor());
 
-        producerQueuedFuture.thenRun(() -> {
+        producerQueuedFuture.thenRunAsync(() -> {
             // If the producer is queued waiting, we will get an immediate 
notification
             // that we need to pass to client
             if (isActive()) {
@@ -1969,7 +1969,7 @@ public class ServerCnx extends PulsarHandler implements 
TransportCnx {
                             producerCreated(this, producer, metadata);
                 }
             }
-        });
+        }, ctx.executor());
     }
     @Override
     protected void handleSend(CommandSend send, ByteBuf headersAndPayload) {
@@ -2299,7 +2299,7 @@ public class ServerCnx extends PulsarHandler implements 
TransportCnx {
         log.info("[{}][{}] Closing producer on cnx {}. producerId={}",
                  producer.getTopic(), producer.getProducerName(), 
remoteAddress, producerId);
 
-        producer.close(true).thenAccept(v -> {
+        producer.close(true).thenAcceptAsync(v -> {
             log.info("[{}][{}] Closed producer on cnx {}. producerId={}",
                      producer.getTopic(), producer.getProducerName(),
                      remoteAddress, producerId);
@@ -2308,7 +2308,7 @@ public class ServerCnx extends PulsarHandler implements 
TransportCnx {
             if (brokerInterceptor != null) {
                 brokerInterceptor.producerClosed(this, producer, 
producer.getMetadata());
             }
-        });
+        }, ctx.executor());
     }
 
     @Override
@@ -3439,11 +3439,11 @@ public class ServerCnx extends PulsarHandler implements 
TransportCnx {
         }
         CompletableFuture<Producer> future = producers.get(producerId);
         if (future != null) {
-            future.whenComplete((producer2, exception) -> {
+            future.whenCompleteAsync((producer2, exception) -> {
                     if (exception != null || producer2 == producer) {
                         producers.remove(producerId, future);
                     }
-                });
+                }, ctx.executor());
         }
     }
 
@@ -3454,11 +3454,11 @@ public class ServerCnx extends PulsarHandler implements 
TransportCnx {
         }
         CompletableFuture<Consumer> future = consumers.get(consumerId);
         if (future != null) {
-            future.whenComplete((consumer2, exception) -> {
+            future.whenCompleteAsync((consumer2, exception) -> {
                     if (exception != null || consumer2 == consumer) {
                         consumers.remove(consumerId, future);
                     }
-                });
+                }, ctx.executor());
         }
     }
 
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ServerCnxTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ServerCnxTest.java
index f0907031882..2151eacd5d6 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ServerCnxTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ServerCnxTest.java
@@ -2381,7 +2381,10 @@ public class ServerCnxTest {
             assertEquals(((CommandError) response).getRequestId(), 5);
 
             // We should receive response for 1st producer, since it was not 
cancelled by the close
-            Awaitility.await().untilAsserted(() -> 
assertFalse(channel.outboundMessages().isEmpty()));
+            Awaitility.await().untilAsserted(() -> {
+                channel.runPendingTasks();
+                assertFalse(channel.outboundMessages().isEmpty());
+            });
 
             assertTrue(channel.isActive());
             response = getResponse();
@@ -2889,6 +2892,8 @@ public class ServerCnxTest {
         final long sleepTimeMs = 10;
         final long iterations = TimeUnit.SECONDS.toMillis(10) / sleepTimeMs;
         for (int i = 0; i < iterations; i++) {
+            // Execute tasks submitted to ctx.executor() via 
thenAcceptAsync/thenRunAsync etc.
+            channel.runPendingTasks();
             if (!channel.outboundMessages().isEmpty()) {
                 Object outObject = channel.outboundMessages().remove();
                 Object cmd = clientChannelHelper.getCommand(outObject);

Reply via email to