This is an automated email from the ASF dual-hosted git repository.
zixuan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new f688ff748fe [improve][broker] Close connection when close consumer write fails (#25520)
f688ff748fe is described below
commit f688ff748fe49d06ffa0829ce61d51707584f712
Author: Zixuan Liu <[email protected]>
AuthorDate: Wed Apr 15 10:45:02 2026 +0800
[improve][broker] Close connection when close consumer write fails (#25520)
---
.../apache/pulsar/broker/service/ServerCnx.java | 17 +++++++++--
.../pulsar/broker/service/ServerCnxTest.java | 33 ++++++++++++++++++++++
.../pulsar/common/protocol/PulsarHandler.java | 3 +-
3 files changed, 49 insertions(+), 4 deletions(-)
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
index c81d3e5fb04..43f4abd6156 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
@@ -3405,16 +3405,27 @@ public class ServerCnx extends PulsarHandler implements TransportCnx {
if (getRemoteEndpointProtocolVersion() >= v5.getValue()) {
assignedBrokerLookupData.ifPresentOrElse(lookup -> {
LookupData lookupData = getLookupData(lookup);
- writeAndFlush(Commands.newCloseConsumer(consumerId, -1L,
+ writeCloseConsumerAndCloseConnectionOnFailure(Commands.newCloseConsumer(consumerId, -1L,
lookupData.getBrokerUrl(),
- lookupData.getBrokerUrlTls()));
+ lookupData.getBrokerUrlTls()), consumerId);
},
- () -> writeAndFlush(Commands.newCloseConsumer(consumerId, -1L, null, null)));
+ () -> writeCloseConsumerAndCloseConnectionOnFailure(
+ Commands.newCloseConsumer(consumerId, -1L, null, null), consumerId));
} else {
close();
}
}
+ private void writeCloseConsumerAndCloseConnectionOnFailure(ByteBuf cmd, long consumerId) {
+ ctx.writeAndFlush(cmd).addListener(future -> {
+ if (!future.isSuccess()) {
+ log.warn("[{}] Forcing connection to close since cannot send close consumer command for consumer {}",
+ remoteAddress, consumerId, future.cause());
+ close();
+ }
+ });
+ }
+
/**
* It closes the connection with client which triggers {@code channelInactive()} which clears all producers and
* consumers from connection-map.
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ServerCnxTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ServerCnxTest.java
index 5cbdaf22bc3..0247fbc637e 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ServerCnxTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ServerCnxTest.java
@@ -45,10 +45,14 @@ import static org.testng.Assert.assertNull;
import static org.testng.Assert.assertTrue;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
+import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelHandler;
+import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.DefaultChannelId;
import io.netty.channel.embedded.EmbeddedChannel;
import io.netty.handler.codec.LengthFieldBasedFrameDecoder;
+import io.netty.util.concurrent.Future;
+import io.netty.util.concurrent.GenericFutureListener;
import io.vertx.core.impl.ConcurrentHashSet;
import java.io.Closeable;
import java.io.IOException;
@@ -2488,6 +2492,35 @@ public class ServerCnxTest {
channel.finish();
}
+ @Test
+ public void testCloseConsumerClosesConnectionWhenWriteFails() throws Exception {
+ resetChannel();
+ setChannelConnected();
+
+ var ctx = mock(ChannelHandlerContext.class);
+ var writeFuture = mock(ChannelFuture.class);
+ var writeFailure = new RuntimeException("close consumer write failed");
+ when(ctx.writeAndFlush(any())).thenReturn(writeFuture);
+ when(writeFuture.isSuccess()).thenReturn(false);
+ when(writeFuture.cause()).thenReturn(writeFailure);
+ when(writeFuture.addListener(any())).thenAnswer(invocation -> {
+ GenericFutureListener<Future<? super Void>> listener = invocation.getArgument(0);
+ listener.operationComplete(writeFuture);
+ return writeFuture;
+ });
+ serverCnx.setCtx(ctx);
+
+ var consumer = mock(Consumer.class);
+ when(consumer.consumerId()).thenReturn(1L);
+
+ serverCnx.setRemoteEndpointProtocolVersion(ProtocolVersion.v12.getValue());
+
+ serverCnx.closeConsumer(consumer, Optional.empty());
+
+ verify(ctx).writeAndFlush(any());
+ verify(ctx).close();
+ }
+
@Test(timeOut = 30000)
public void testSubscribeCommand() throws Exception {
final String failSubName = "failSub";
diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/PulsarHandler.java b/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/PulsarHandler.java
index eecff3686a7..17dcd6ac085 100644
--- a/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/PulsarHandler.java
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/PulsarHandler.java
@@ -53,7 +53,8 @@ public abstract class PulsarHandler extends PulsarDecoder {
return remoteEndpointProtocolVersion;
}
- protected void setRemoteEndpointProtocolVersion(int remoteEndpointProtocolVersion) {
+ @VisibleForTesting
+ public void setRemoteEndpointProtocolVersion(int remoteEndpointProtocolVersion) {
this.remoteEndpointProtocolVersion = remoteEndpointProtocolVersion;
}