This is an automated email from the ASF dual-hosted git repository.
technoboy pushed a commit to branch branch-4.1
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/branch-4.1 by this push:
new ff413722b21 [fix][broker] Fix race condition in ServerCnx producer/consumer async callbacks (#25352)
ff413722b21 is described below
commit ff413722b218900693e6130b91580448132dbf67
Author: Matteo Merli <[email protected]>
AuthorDate: Thu Mar 19 01:19:39 2026 -0700
[fix][broker] Fix race condition in ServerCnx producer/consumer async callbacks (#25352)
---
.../apache/pulsar/broker/service/ServerCnx.java | 62 +++++++++++-----------
.../pulsar/broker/service/ServerCnxTest.java | 7 ++-
2 files changed, 37 insertions(+), 32 deletions(-)
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
index 7ee003d7ace..102ac962982 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
@@ -1356,7 +1356,7 @@ public class ServerCnx extends PulsarHandler implements
TransportCnx {
CompletableFuture<Consumer> consumerFuture = new CompletableFuture<>();
CompletableFuture<Consumer> existingConsumerFuture =
consumers.putIfAbsent(consumerId, consumerFuture);
- isAuthorizedFuture.thenApply(isAuthorized -> {
+ isAuthorizedFuture.thenApplyAsync(isAuthorized -> {
if (isAuthorized) {
if (log.isDebugEnabled()) {
log.debug("[{}] Client is authorized to subscribe with
role {}",
@@ -1488,7 +1488,7 @@ public class ServerCnx extends PulsarHandler implements
TransportCnx {
}
});
})
- .thenAccept(consumer -> {
+ .thenAcceptAsync(consumer -> {
if (consumer.checkAndApplyTopicMigration()) {
log.info("[{}] Disconnecting consumer {} on
migrated subscription on topic {} / {}",
remoteAddress, consumerId,
subscriptionName, topicName);
@@ -1522,8 +1522,8 @@ public class ServerCnx extends PulsarHandler implements
TransportCnx {
consumers.remove(consumerId, consumerFuture);
}
- })
- .exceptionally(exception -> {
+ }, ctx.executor())
+ .exceptionallyAsync(exception -> {
if (exception.getCause() instanceof
ConsumerBusyException) {
if (log.isDebugEnabled()) {
log.debug(
@@ -1571,7 +1571,7 @@ public class ServerCnx extends PulsarHandler implements
TransportCnx {
return null;
- });
+ }, ctx.executor());
} else {
String msg = "Client is not authorized to subscribe";
log.warn("[{}] {} with role {}", remoteAddress, msg,
getPrincipal());
@@ -1579,12 +1579,12 @@ public class ServerCnx extends PulsarHandler implements
TransportCnx {
writeAndFlush(Commands.newError(requestId,
ServerError.AuthorizationError, msg));
}
return null;
- }).exceptionally(ex -> {
+ }, ctx.executor()).exceptionallyAsync(ex -> {
logAuthException(remoteAddress, "subscribe", getPrincipal(),
Optional.of(topicName), ex);
consumers.remove(consumerId, consumerFuture);
commandSender.sendErrorResponse(requestId,
ServerError.AuthorizationError, ex.getMessage());
return null;
- });
+ }, ctx.executor());
}
private SchemaData getSchema(Schema protocolSchema) {
@@ -1640,7 +1640,7 @@ public class ServerCnx extends PulsarHandler implements
TransportCnx {
(canProduce, canSubscribe) -> canProduce &&
canSubscribe);
}
- isAuthorizedFuture.thenApply(isAuthorized -> {
+ isAuthorizedFuture.thenApplyAsync(isAuthorized -> {
if (!isAuthorized) {
String msg = "Client is not authorized to Produce";
log.warn("[{}] {} with role {}", remoteAddress, msg,
getPrincipal());
@@ -1687,7 +1687,7 @@ public class ServerCnx extends PulsarHandler implements
TransportCnx {
topicName, producerId, producerName, schema == null ?
"absent" : "present");
}
- service.getOrCreateTopic(topicName.toString()).thenCompose((Topic
topic) -> {
+
service.getOrCreateTopic(topicName.toString()).thenComposeAsync((Topic topic)
-> {
// Check max producer limitation to avoid unnecessary ops
wasting resources. For example: the new
// producer reached max producer limitation, but pulsar did
schema check first, it would waste CPU
if (((AbstractTopic) topic).isProducersExceeded(producerName))
{
@@ -1703,7 +1703,7 @@ public class ServerCnx extends PulsarHandler implements
TransportCnx {
topic.checkBacklogQuotaExceeded(producerName,
BacklogQuotaType.destination_storage),
topic.checkBacklogQuotaExceeded(producerName,
BacklogQuotaType.message_age));
- backlogQuotaCheckFuture.thenRun(() -> {
+ backlogQuotaCheckFuture.thenRunAsync(() -> {
// Check whether the producer will publish encrypted
messages or not
if ((topic.isEncryptionRequired() ||
encryptionRequireOnProducer)
&& !isEncrypted
@@ -1721,7 +1721,7 @@ public class ServerCnx extends PulsarHandler implements
TransportCnx {
CompletableFuture<SchemaVersion> schemaVersionFuture =
tryAddSchema(topic, schema);
- schemaVersionFuture.exceptionally(exception -> {
+ schemaVersionFuture.exceptionallyAsync(exception -> {
if (producerFuture.completeExceptionally(exception)) {
String message = exception.getMessage();
if (exception.getCause() != null) {
@@ -1745,9 +1745,9 @@ public class ServerCnx extends PulsarHandler implements
TransportCnx {
}
producers.remove(producerId, producerFuture);
return null;
- });
+ }, ctx.executor());
- schemaVersionFuture.thenAccept(schemaVersion -> {
+ schemaVersionFuture.thenAcceptAsync(schemaVersion -> {
CompletionStage<Subscription> createInitSubFuture;
if (!Strings.isNullOrEmpty(initialSubscriptionName)
&& topic.isPersistent()
@@ -1767,7 +1767,7 @@ public class ServerCnx extends PulsarHandler implements
TransportCnx {
createInitSubFuture =
CompletableFuture.completedFuture(null);
}
- createInitSubFuture.whenComplete((sub, ex) -> {
+ createInitSubFuture.whenCompleteAsync((sub, ex) -> {
if (ex != null) {
final Throwable rc =
FutureUtil.unwrapCompletionException(ex);
if (rc instanceof
BrokerServiceException.NotAllowedException) {
@@ -1795,11 +1795,11 @@ public class ServerCnx extends PulsarHandler implements
TransportCnx {
buildProducerAndAddTopic(topic, producerId,
producerName, requestId, isEncrypted,
metadata, schemaVersion, epoch,
userProvidedProducerName, topicName,
producerAccessMode, topicEpoch,
supportsPartialProducer, producerFuture);
- });
- });
- });
+ }, ctx.executor());
+ }, ctx.executor());
+ }, ctx.executor());
return backlogQuotaCheckFuture;
- }).exceptionally(exception -> {
+ }, ctx.executor()).exceptionallyAsync(exception -> {
Throwable cause = exception.getCause();
if (cause instanceof
BrokerServiceException.TopicBacklogQuotaExceededException) {
BrokerServiceException.TopicBacklogQuotaExceededException
tbqe =
@@ -1858,13 +1858,13 @@ public class ServerCnx extends PulsarHandler implements
TransportCnx {
}
producers.remove(producerId, producerFuture);
return null;
- });
+ }, ctx.executor());
return null;
- }).exceptionally(ex -> {
+ }, ctx.executor()).exceptionallyAsync(ex -> {
logAuthException(remoteAddress, "producer", getPrincipal(),
Optional.of(topicName), ex);
commandSender.sendErrorResponse(requestId,
ServerError.AuthorizationError, ex.getMessage());
return null;
- });
+ }, ctx.executor());
}
private void buildProducerAndAddTopic(Topic topic, long producerId, String
producerName, long requestId,
@@ -1878,7 +1878,7 @@ public class ServerCnx extends PulsarHandler implements
TransportCnx {
getPrincipal(), isEncrypted, metadata, schemaVersion, epoch,
userProvidedProducerName, producerAccessMode, topicEpoch,
supportsPartialProducer);
- topic.addProducer(producer,
producerQueuedFuture).thenAccept(newTopicEpoch -> {
+ topic.addProducer(producer,
producerQueuedFuture).thenAcceptAsync(newTopicEpoch -> {
if (isActive()) {
if (producerFuture.complete(producer)) {
log.info("[{}] Created new producer: {}, role: {}",
remoteAddress, producer, getPrincipal());
@@ -1911,7 +1911,7 @@ public class ServerCnx extends PulsarHandler implements
TransportCnx {
}
producers.remove(producerId, producerFuture);
- }).exceptionallyAsync(ex -> {
+ }, ctx.executor()).exceptionallyAsync(ex -> {
if (ex.getCause() instanceof
BrokerServiceException.TopicMigratedException) {
Optional<ClusterUrl> clusterURL =
getMigratedClusterUrl(service.getPulsar(), topic.getName());
if (clusterURL.isPresent()) {
@@ -1954,7 +1954,7 @@ public class ServerCnx extends PulsarHandler implements
TransportCnx {
return null;
}, ctx.executor());
- producerQueuedFuture.thenRun(() -> {
+ producerQueuedFuture.thenRunAsync(() -> {
// If the producer is queued waiting, we will get an immediate
notification
// that we need to pass to client
if (isActive()) {
@@ -1967,7 +1967,7 @@ public class ServerCnx extends PulsarHandler implements
TransportCnx {
producerCreated(this, producer, metadata);
}
}
- });
+ }, ctx.executor());
}
@Override
protected void handleSend(CommandSend send, ByteBuf headersAndPayload) {
@@ -2297,7 +2297,7 @@ public class ServerCnx extends PulsarHandler implements
TransportCnx {
log.info("[{}][{}] Closing producer on cnx {}. producerId={}",
producer.getTopic(), producer.getProducerName(),
remoteAddress, producerId);
- producer.close(true).thenAccept(v -> {
+ producer.close(true).thenAcceptAsync(v -> {
log.info("[{}][{}] Closed producer on cnx {}. producerId={}",
producer.getTopic(), producer.getProducerName(),
remoteAddress, producerId);
@@ -2306,7 +2306,7 @@ public class ServerCnx extends PulsarHandler implements
TransportCnx {
if (brokerInterceptor != null) {
brokerInterceptor.producerClosed(this, producer,
producer.getMetadata());
}
- });
+ }, ctx.executor());
}
@Override
@@ -3432,11 +3432,11 @@ public class ServerCnx extends PulsarHandler implements
TransportCnx {
}
CompletableFuture<Producer> future = producers.get(producerId);
if (future != null) {
- future.whenComplete((producer2, exception) -> {
+ future.whenCompleteAsync((producer2, exception) -> {
if (exception != null || producer2 == producer) {
producers.remove(producerId, future);
}
- });
+ }, ctx.executor());
}
}
@@ -3447,11 +3447,11 @@ public class ServerCnx extends PulsarHandler implements
TransportCnx {
}
CompletableFuture<Consumer> future = consumers.get(consumerId);
if (future != null) {
- future.whenComplete((consumer2, exception) -> {
+ future.whenCompleteAsync((consumer2, exception) -> {
if (exception != null || consumer2 == consumer) {
consumers.remove(consumerId, future);
}
- });
+ }, ctx.executor());
}
}
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ServerCnxTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ServerCnxTest.java
index 9044f4a910f..a7986d95dbe 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ServerCnxTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ServerCnxTest.java
@@ -2385,7 +2385,10 @@ public class ServerCnxTest {
assertEquals(((CommandError) response).getRequestId(), 5);
// We should receive response for 1st producer, since it was not
cancelled by the close
- Awaitility.await().untilAsserted(() ->
assertFalse(channel.outboundMessages().isEmpty()));
+ Awaitility.await().untilAsserted(() -> {
+ channel.runPendingTasks();
+ assertFalse(channel.outboundMessages().isEmpty());
+ });
assertTrue(channel.isActive());
response = getResponse();
@@ -2893,6 +2896,8 @@ public class ServerCnxTest {
final long sleepTimeMs = 10;
final long iterations = TimeUnit.SECONDS.toMillis(10) / sleepTimeMs;
for (int i = 0; i < iterations; i++) {
+ // Execute tasks submitted to ctx.executor() via thenAcceptAsync/thenRunAsync etc.
+ channel.runPendingTasks();
if (!channel.outboundMessages().isEmpty()) {
Object outObject = channel.outboundMessages().remove();
Object cmd = clientChannelHelper.getCommand(outObject);