This is an automated email from the ASF dual-hosted git repository.

zixuan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new f688ff748fe [improve][broker] Close connection when close consumer 
write fails (#25520)
f688ff748fe is described below

commit f688ff748fe49d06ffa0829ce61d51707584f712
Author: Zixuan Liu <[email protected]>
AuthorDate: Wed Apr 15 10:45:02 2026 +0800

    [improve][broker] Close connection when close consumer write fails (#25520)
---
 .../apache/pulsar/broker/service/ServerCnx.java    | 17 +++++++++--
 .../pulsar/broker/service/ServerCnxTest.java       | 33 ++++++++++++++++++++++
 .../pulsar/common/protocol/PulsarHandler.java      |  3 +-
 3 files changed, 49 insertions(+), 4 deletions(-)

diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
index c81d3e5fb04..43f4abd6156 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
@@ -3405,16 +3405,27 @@ public class ServerCnx extends PulsarHandler implements 
TransportCnx {
         if (getRemoteEndpointProtocolVersion() >= v5.getValue()) {
             assignedBrokerLookupData.ifPresentOrElse(lookup -> {
                         LookupData lookupData = getLookupData(lookup);
-                        writeAndFlush(Commands.newCloseConsumer(consumerId, 
-1L,
+                        
writeCloseConsumerAndCloseConnectionOnFailure(Commands.newCloseConsumer(consumerId,
 -1L,
                                 lookupData.getBrokerUrl(),
-                                lookupData.getBrokerUrlTls()));
+                                lookupData.getBrokerUrlTls()), consumerId);
                     },
-                    () -> writeAndFlush(Commands.newCloseConsumer(consumerId, 
-1L, null, null)));
+                    () -> writeCloseConsumerAndCloseConnectionOnFailure(
+                            Commands.newCloseConsumer(consumerId, -1L, null, 
null), consumerId));
         } else {
             close();
         }
     }
 
+    private void writeCloseConsumerAndCloseConnectionOnFailure(ByteBuf cmd, 
long consumerId) {
+        ctx.writeAndFlush(cmd).addListener(future -> {
+            if (!future.isSuccess()) {
+                log.warn("[{}] Forcing connection to close since cannot send 
close consumer command for consumer {}",
+                        remoteAddress, consumerId, future.cause());
+                close();
+            }
+        });
+    }
+
     /**
      * It closes the connection with client which triggers {@code 
channelInactive()} which clears all producers and
      * consumers from connection-map.
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ServerCnxTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ServerCnxTest.java
index 5cbdaf22bc3..0247fbc637e 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ServerCnxTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ServerCnxTest.java
@@ -45,10 +45,14 @@ import static org.testng.Assert.assertNull;
 import static org.testng.Assert.assertTrue;
 import io.netty.buffer.ByteBuf;
 import io.netty.buffer.Unpooled;
+import io.netty.channel.ChannelFuture;
 import io.netty.channel.ChannelHandler;
+import io.netty.channel.ChannelHandlerContext;
 import io.netty.channel.DefaultChannelId;
 import io.netty.channel.embedded.EmbeddedChannel;
 import io.netty.handler.codec.LengthFieldBasedFrameDecoder;
+import io.netty.util.concurrent.Future;
+import io.netty.util.concurrent.GenericFutureListener;
 import io.vertx.core.impl.ConcurrentHashSet;
 import java.io.Closeable;
 import java.io.IOException;
@@ -2488,6 +2492,35 @@ public class ServerCnxTest {
         channel.finish();
     }
 
+    @Test
+    public void testCloseConsumerClosesConnectionWhenWriteFails() throws 
Exception {
+        resetChannel();
+        setChannelConnected();
+
+        var ctx = mock(ChannelHandlerContext.class);
+        var writeFuture = mock(ChannelFuture.class);
+        var writeFailure = new RuntimeException("close consumer write failed");
+        when(ctx.writeAndFlush(any())).thenReturn(writeFuture);
+        when(writeFuture.isSuccess()).thenReturn(false);
+        when(writeFuture.cause()).thenReturn(writeFailure);
+        when(writeFuture.addListener(any())).thenAnswer(invocation -> {
+            GenericFutureListener<Future<? super Void>>  listener = 
invocation.getArgument(0);
+            listener.operationComplete(writeFuture);
+            return writeFuture;
+        });
+        serverCnx.setCtx(ctx);
+
+        var consumer = mock(Consumer.class);
+        when(consumer.consumerId()).thenReturn(1L);
+
+        
serverCnx.setRemoteEndpointProtocolVersion(ProtocolVersion.v12.getValue());
+
+        serverCnx.closeConsumer(consumer, Optional.empty());
+
+        verify(ctx).writeAndFlush(any());
+        verify(ctx).close();
+    }
+
     @Test(timeOut = 30000)
     public void testSubscribeCommand() throws Exception {
         final String failSubName = "failSub";
diff --git 
a/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/PulsarHandler.java
 
b/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/PulsarHandler.java
index eecff3686a7..17dcd6ac085 100644
--- 
a/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/PulsarHandler.java
+++ 
b/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/PulsarHandler.java
@@ -53,7 +53,8 @@ public abstract class PulsarHandler extends PulsarDecoder {
         return remoteEndpointProtocolVersion;
     }
 
-    protected void setRemoteEndpointProtocolVersion(int 
remoteEndpointProtocolVersion) {
+    @VisibleForTesting
+    public void setRemoteEndpointProtocolVersion(int 
remoteEndpointProtocolVersion) {
         this.remoteEndpointProtocolVersion = remoteEndpointProtocolVersion;
     }
 

Reply via email to