This is an automated email from the ASF dual-hosted git repository.

lhotari pushed a commit to branch branch-4.2
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit 4f8ffc8291d7f8bc40e5250e27c7926339445dd1
Author: Zixuan Liu <[email protected]>
AuthorDate: Fri Apr 24 10:53:23 2026 +0800

    [improve][client] Best-effort retry for individual/batch-index acks on send failure when ackReceiptEnabled=false (#25525)
    
    Co-authored-by: Copilot <[email protected]>
    (cherry picked from commit fae3df958f74f2afb0ba398b69d94fd3e6358c4f)
---
 .../org/apache/pulsar/client/impl/ClientCnx.java   | 26 +++---
 .../PersistentAcknowledgmentsGroupingTracker.java  | 75 ++++++++++++++---
 .../impl/AcknowledgementsGroupingTrackerTest.java  | 93 +++++++++++++++++++++-
 .../impl/ClientCnxRequestTimeoutQueueTest.java     |  1 +
 .../pulsar/client/impl/ClientTestFixtures.java     |  5 ++
 5 files changed, 173 insertions(+), 27 deletions(-)

diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientCnx.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientCnx.java
index eb3162e8ffa..a51e3ac8796 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientCnx.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientCnx.java
@@ -1054,27 +1054,23 @@ public class ClientCnx extends PulsarHandler {
     }
 
     private <T> void sendRequestAndHandleTimeout(ByteBuf requestMessage, long 
requestId,
-                                                                 RequestType 
requestType, boolean flush,
-                                                                 
TimedCompletableFuture<T> future) {
+                                                 RequestType requestType, 
boolean flush,
+                                                 TimedCompletableFuture<T> 
future) {
         pendingRequests.put(requestId, future);
-        if (flush) {
-            ctx.writeAndFlush(requestMessage).addListener(writeFuture -> {
-                if (!writeFuture.isSuccess()) {
-                    if (pendingRequests.remove(requestId, future) && 
!future.isDone()) {
-                        log.warn("{} Failed to send {} to broker: {}", 
ctx.channel(),
-                                requestType.getDescription(), 
writeFuture.cause().getMessage());
-                        future.completeExceptionally(writeFuture.cause());
-                    }
+        (flush ? ctx.writeAndFlush(requestMessage) : 
ctx.write(requestMessage)).addListener(writeFuture -> {
+            if (!writeFuture.isSuccess()) {
+                if (pendingRequests.remove(requestId, future) && 
!future.isDone()) {
+                    log.warn("{} Failed to send {} to broker: {}", 
ctx.channel(),
+                            requestType.getDescription(), 
writeFuture.cause().getMessage());
+                    future.completeExceptionally(writeFuture.cause());
                 }
-            });
-        } else {
-            ctx.write(requestMessage, ctx().voidPromise());
-        }
+            }
+        });
         requestTimeoutQueue.add(new RequestTime(requestId, requestType));
     }
 
     private <T> CompletableFuture<T> sendRequestAndHandleTimeout(ByteBuf 
requestMessage, long requestId,
-                                                 RequestType requestType, 
boolean flush) {
+                                                   RequestType requestType, 
boolean flush) {
         TimedCompletableFuture<T> future = new TimedCompletableFuture<>();
         sendRequestAndHandleTimeout(requestMessage, requestId, requestType, 
flush, future);
         return future;
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PersistentAcknowledgmentsGroupingTracker.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PersistentAcknowledgmentsGroupingTracker.java
index 0598dc4fb36..0a366a759d3 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PersistentAcknowledgmentsGroupingTracker.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PersistentAcknowledgmentsGroupingTracker.java
@@ -26,6 +26,7 @@ import io.netty.util.concurrent.FastThreadLocal;
 import java.util.ArrayList;
 import java.util.BitSet;
 import java.util.Collections;
+import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
@@ -279,6 +280,11 @@ public class PersistentAcknowledgmentsGroupingTracker 
implements Acknowledgments
         return CompletableFuture.completedFuture(null);
     }
 
+    @VisibleForTesting
+    int getPendingIndividualAcksSize() {
+        return pendingIndividualAcks.size();
+    }
+
     private CompletableFuture<Void> doIndividualBatchAck(MessageIdAdv 
batchMessageId,
                                                          Map<String, Long> 
properties) {
         if (acknowledgementGroupTimeMicros == 0 || (properties != null && 
!properties.isEmpty())) {
@@ -405,7 +411,7 @@ public class PersistentAcknowledgmentsGroupingTracker 
implements Acknowledgments
         }
 
         CompletableFuture<Void> completableFuture = 
newMessageAckCommandAndWrite(cnx, consumer.consumerId,
-                msgId.getLedgerId(), msgId.getEntryId(), bitSet, ackType, 
properties, true, null, null);
+                msgId.getLedgerId(), msgId.getEntryId(), bitSet, ackType, 
properties, true, null, null, null);
         bitSet.recycle();
         return completableFuture;
     }
@@ -441,13 +447,15 @@ public class PersistentAcknowledgmentsGroupingTracker 
implements Acknowledgments
             newMessageAckCommandAndWrite(cnx, consumer.consumerId, 
messageId.getLedgerId(), messageId.getEntryId(),
                     lastCumulativeAckToFlush.getBitSetRecyclable(), 
AckType.Cumulative,
                     Collections.emptyMap(), false,
-                    (TimedCompletableFuture<Void>) 
this.currentCumulativeAckFuture, null);
+                    (TimedCompletableFuture<Void>) 
this.currentCumulativeAckFuture, null, null);
             this.consumer.unAckedChunkedMessageIdSequenceMap.remove(messageId);
         }
 
         // Flush all individual acks
         List<Triple<Long, Long, ConcurrentBitSet>> entriesToAck =
                 new ArrayList<>(pendingIndividualAcks.size() + 
pendingIndividualBatchIndexAcks.size());
+        List<MessageIdAdv> individualAcksToFlush = new 
ArrayList<>(pendingIndividualAcks.size());
+        Map<MessageIdAdv, MessageIdImpl[]> chunkedMessageIdsToRestore = new 
HashMap<>();
         if (!pendingIndividualAcks.isEmpty()) {
             if 
(Commands.peerSupportsMultiMessageAcknowledgment(cnx.getRemoteEndpointProtocolVersion()))
 {
                 // We can send 1 single protobuf command with all individual 
acks
@@ -456,11 +464,13 @@ public class PersistentAcknowledgmentsGroupingTracker 
implements Acknowledgments
                     if (msgId == null) {
                         break;
                     }
+                    individualAcksToFlush.add(msgId);
 
                     // if messageId is checked then all the chunked related to 
that msg also processed so, ack all of
                     // them
                     MessageIdImpl[] chunkMsgIds = 
this.consumer.unAckedChunkedMessageIdSequenceMap.get(msgId);
                     if (chunkMsgIds != null && chunkMsgIds.length > 1) {
+                        chunkedMessageIdsToRestore.put(msgId, chunkMsgIds);
                         for (MessageIdImpl cMsgId : chunkMsgIds) {
                             if (cMsgId != null) {
                                 
entriesToAck.add(Triple.of(cMsgId.getLedgerId(), cMsgId.getEntryId(), null));
@@ -479,14 +489,16 @@ public class PersistentAcknowledgmentsGroupingTracker 
implements Acknowledgments
                     if (msgId == null) {
                         break;
                     }
+                    individualAcksToFlush.add(msgId);
                     newMessageAckCommandAndWrite(cnx, consumer.consumerId, 
msgId.getLedgerId(), msgId.getEntryId(),
                             null, AckType.Individual, Collections.emptyMap(), 
false,
-                            null, null);
+                            null, null, () -> restoreIndividualAck(msgId, 
null));
                     shouldFlush = true;
                 }
             }
         }
 
+        List<Map.Entry<MessageIdAdv, ConcurrentBitSet>> batchIndexAcksToFlush 
= new ArrayList<>();
         while (true) {
             Map.Entry<MessageIdAdv, ConcurrentBitSet> entry =
                     pendingIndividualBatchIndexAcks.pollFirstEntry();
@@ -494,6 +506,7 @@ public class PersistentAcknowledgmentsGroupingTracker 
implements Acknowledgments
                 // The entry has been removed in a different thread
                 break;
             }
+            batchIndexAcksToFlush.add(entry);
             entriesToAck.add(Triple.of(
                     entry.getKey().getLedgerId(), entry.getKey().getEntryId(), 
entry.getValue()));
         }
@@ -502,7 +515,9 @@ public class PersistentAcknowledgmentsGroupingTracker 
implements Acknowledgments
 
             newMessageAckCommandAndWrite(cnx, consumer.consumerId, 0L, 0L,
                     null, AckType.Individual, null, true,
-                    (TimedCompletableFuture<Void>) currentIndividualAckFuture, 
entriesToAck);
+                    (TimedCompletableFuture<Void>) currentIndividualAckFuture, 
entriesToAck,
+                    () -> 
restoreIndividualAndBatchIndexAcks(individualAcksToFlush, 
chunkedMessageIdsToRestore,
+                            batchIndexAcksToFlush));
             shouldFlush = true;
         }
 
@@ -547,19 +562,19 @@ public class PersistentAcknowledgmentsGroupingTracker 
implements Acknowledgments
                     }
                 }
                 completableFuture = newMessageAckCommandAndWrite(cnx, 
consumer.consumerId, 0L, 0L,
-                        null, ackType, null, true, null, entriesToAck);
+                        null, ackType, null, true, null, entriesToAck, null);
             } else {
                 // if don't support multi message ack, it also support ack 
receipt, so we should not think about the
                 // ack receipt in this logic
                 for (MessageIdImpl cMsgId : chunkMsgIds) {
                     newMessageAckCommandAndWrite(cnx, consumerId, 
cMsgId.getLedgerId(), cMsgId.getEntryId(),
-                            bitSet, ackType, map, true, null, null);
+                            bitSet, ackType, map, true, null, null, null);
                 }
                 completableFuture = CompletableFuture.completedFuture(null);
             }
         } else {
             completableFuture = newMessageAckCommandAndWrite(cnx, consumerId, 
msgId.getLedgerId(), msgId.getEntryId(),
-                    bitSet, ackType, map, true, null, null);
+                    bitSet, ackType, map, true, null, null, null);
         }
         return completableFuture;
     }
@@ -569,7 +584,8 @@ public class PersistentAcknowledgmentsGroupingTracker 
implements Acknowledgments
             long entryId, BitSetRecyclable ackSet, AckType ackType,
             Map<String, Long> properties, boolean flush,
             TimedCompletableFuture<Void> timedCompletableFuture,
-            List<Triple<Long, Long, ConcurrentBitSet>> entriesToAck) {
+            List<Triple<Long, Long, ConcurrentBitSet>> entriesToAck,
+            Runnable writeFailureCallback) {
         if (consumer.isAckReceiptEnabled()) {
             final long requestId = consumer.getClient().newRequestId();
             final ByteBuf cmd;
@@ -611,14 +627,53 @@ public class PersistentAcknowledgmentsGroupingTracker 
implements Acknowledgments
                 cmd = Commands.newMultiMessageAck(consumerId, entriesToAck, 
-1);
             }
             if (flush) {
-                cnx.ctx().writeAndFlush(cmd, cnx.ctx().voidPromise());
+                if (writeFailureCallback == null) {
+                    cnx.ctx().writeAndFlush(cmd, cnx.ctx().voidPromise());
+                } else {
+                    cnx.ctx().writeAndFlush(cmd).addListener(writeFuture -> {
+                        if (!writeFuture.isSuccess()) {
+                            writeFailureCallback.run();
+                        }
+                    });
+                }
             } else {
-                cnx.ctx().write(cmd, cnx.ctx().voidPromise());
+                if (writeFailureCallback == null) {
+                    cnx.ctx().write(cmd, cnx.ctx().voidPromise());
+                } else {
+                    cnx.ctx().write(cmd).addListener(writeFuture -> {
+                        if (!writeFuture.isSuccess()) {
+                            writeFailureCallback.run();
+                        }
+                    });
+                }
             }
             return CompletableFuture.completedFuture(null);
         }
     }
 
+    private void restoreIndividualAck(MessageIdAdv messageId, @Nullable 
MessageIdImpl[] chunkMsgIds) {
+        pendingIndividualAcks.add(messageId);
+        restoreChunkedMessageIds(messageId, chunkMsgIds);
+    }
+
+    private void restoreIndividualAndBatchIndexAcks(List<MessageIdAdv> 
messageIds,
+                                                    Map<MessageIdAdv, 
MessageIdImpl[]> chunkMsgIds,
+                                                    
List<Map.Entry<MessageIdAdv, ConcurrentBitSet>> batchIndexAcks) {
+        pendingIndividualAcks.addAll(messageIds);
+        chunkMsgIds.forEach(this::restoreChunkedMessageIds);
+        batchIndexAcks.forEach(entry -> 
pendingIndividualBatchIndexAcks.merge(entry.getKey(), entry.getValue(),
+                (currentValue, valueToRestore) -> {
+                    currentValue.and(valueToRestore);
+                    return currentValue;
+                }));
+    }
+
+    private void restoreChunkedMessageIds(MessageIdAdv messageId, @Nullable 
MessageIdImpl[] chunkMsgIds) {
+        if (chunkMsgIds != null) {
+            consumer.unAckedChunkedMessageIdSequenceMap.putIfAbsent(messageId, 
chunkMsgIds);
+        }
+    }
+
     public Optional<Lock> acquireReadLock() {
         Optional<Lock> optionalLock = 
Optional.ofNullable(consumer.isAckReceiptEnabled() ? lock.readLock() : null);
         optionalLock.ifPresent(Lock::lock);
diff --git a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/AcknowledgementsGroupingTrackerTest.java b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/AcknowledgementsGroupingTrackerTest.java
index 0b91d86064c..8bb85b054a3 100644
--- a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/AcknowledgementsGroupingTrackerTest.java
+++ b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/AcknowledgementsGroupingTrackerTest.java
@@ -18,6 +18,7 @@
  */
 package org.apache.pulsar.client.impl;
 
+import static org.mockito.ArgumentMatchers.any;
 import static org.mockito.Mockito.doAnswer;
 import static org.mockito.Mockito.doReturn;
 import static org.mockito.Mockito.mock;
@@ -28,9 +29,13 @@ import static org.testng.Assert.assertFalse;
 import static org.testng.Assert.assertNotNull;
 import static org.testng.Assert.assertTrue;
 import io.netty.buffer.ByteBuf;
+import io.netty.channel.ChannelFuture;
 import io.netty.channel.ChannelHandlerContext;
 import io.netty.channel.EventLoopGroup;
 import io.netty.channel.nio.NioEventLoopGroup;
+import io.netty.util.ReferenceCountUtil;
+import io.netty.util.concurrent.Future;
+import io.netty.util.concurrent.GenericFutureListener;
 import java.util.ArrayList;
 import java.util.BitSet;
 import java.util.Collections;
@@ -56,6 +61,8 @@ public class AcknowledgementsGroupingTrackerTest {
     private ConsumerImpl<?> consumer;
     private EventLoopGroup eventLoopGroup;
     private AtomicBoolean returnCnx = new AtomicBoolean(true);
+    private ChannelHandlerContext successCtx;
+    private AtomicBoolean failAckCommandSend = new AtomicBoolean(false);
 
     @BeforeClass
     public void setup() throws NoSuchFieldException, IllegalAccessException {
@@ -70,9 +77,9 @@ public class AcknowledgementsGroupingTrackerTest {
         doReturn(new ConsumerStatsRecorderImpl()).when(consumer).getStats();
         doReturn(UnAckedMessageTracker.UNACKED_MESSAGE_TRACKER_DISABLED)
                 .when(consumer).getUnAckedMessageTracker();
-        ChannelHandlerContext ctx = 
ClientTestFixtures.mockChannelHandlerContext();
+        successCtx = ClientTestFixtures.mockChannelHandlerContext();
         doAnswer(invocation -> returnCnx.get() ? cnx : 
null).when(consumer).getClientCnx();
-        doReturn(ctx).when(cnx).ctx();
+        doReturn(successCtx).when(cnx).ctx();
     }
 
     @DataProvider(name = "isNeedReceipt")
@@ -323,6 +330,60 @@ public class AcknowledgementsGroupingTrackerTest {
         tracker.close();
     }
 
+    @Test
+    public void 
testFlushRetainsPendingIndividualAckOnSendFailureWithoutAckReceipt() throws 
Exception {
+        ConsumerConfigurationData<?> conf = new ConsumerConfigurationData<>();
+        conf.setAcknowledgementsGroupTimeMicros(TimeUnit.SECONDS.toMicros(10));
+        conf.setAckReceiptEnabled(false);
+        doReturn(false).when(consumer).isAckReceiptEnabled();
+        PersistentAcknowledgmentsGroupingTracker tracker =
+                new PersistentAcknowledgmentsGroupingTracker(consumer, conf, 
eventLoopGroup);
+
+        MessageIdImpl msg1 = new MessageIdImpl(5, 1, 0);
+        tracker.addAcknowledgment(msg1, AckType.Individual, 
Collections.emptyMap());
+        assertEquals(tracker.getPendingIndividualAcksSize(), 1);
+
+        doReturn(createFailedChannelHandlerContext()).when(cnx).ctx();
+
+        tracker.flush();
+
+        assertTrue(tracker.isDuplicate(msg1));
+        assertEquals(tracker.getPendingIndividualAcksSize(), 1);
+
+        doReturn(successCtx).when(cnx).ctx();
+
+        tracker.flush();
+
+        assertFalse(tracker.isDuplicate(msg1));
+        assertEquals(tracker.getPendingIndividualAcksSize(), 0);
+        tracker.close();
+    }
+
+    @Test
+    public void testFlushFailsAckFutureOnSendFailureWithAckReceipt() throws 
Exception {
+        ConsumerConfigurationData<?> conf = new ConsumerConfigurationData<>();
+        conf.setAcknowledgementsGroupTimeMicros(TimeUnit.SECONDS.toMicros(10));
+        conf.setAckReceiptEnabled(true);
+        doReturn(true).when(consumer).isAckReceiptEnabled();
+        PersistentAcknowledgmentsGroupingTracker tracker =
+                new PersistentAcknowledgmentsGroupingTracker(consumer, conf, 
eventLoopGroup);
+
+        MessageIdImpl msg1 = new MessageIdImpl(5, 1, 0);
+        CompletableFuture<Void> ackFuture =
+                tracker.addAcknowledgment(msg1, AckType.Individual, 
Collections.emptyMap());
+        assertEquals(tracker.getPendingIndividualAcksSize(), 1);
+
+        failAckCommandSend.set(true);
+        tracker.flush();
+
+        assertTrue(ackFuture.isCompletedExceptionally());
+        assertFalse(tracker.isDuplicate(msg1));
+        assertEquals(tracker.getPendingIndividualAcksSize(), 0);
+
+        failAckCommandSend.set(false);
+        tracker.close();
+    }
+
     @Test(dataProvider = "isNeedReceipt")
     public void testBatchAckTrackerMultiAck(boolean isNeedReceipt) throws 
Exception {
         ConsumerConfigurationData<?> conf = new ConsumerConfigurationData<>();
@@ -463,12 +524,40 @@ public class AcknowledgementsGroupingTrackerTest {
 
         @Override
         public CompletableFuture<Void> newAckForReceipt(ByteBuf request, long 
requestId) {
+            if (failAckCommandSend.get()) {
+                return CompletableFuture.failedFuture(new 
RuntimeException("ack send failed"));
+            }
             return CompletableFuture.completedFuture(null);
         }
 
         @Override
         public void newAckForReceiptWithFuture(ByteBuf request, long requestId,
                                                TimedCompletableFuture<Void> 
future) {
+            if (failAckCommandSend.get()) {
+                future.completeExceptionally(new RuntimeException("ack send 
failed"));
+            }
         }
     }
+
+    private ChannelHandlerContext createFailedChannelHandlerContext() {
+        ChannelHandlerContext ctx = mock(ChannelHandlerContext.class);
+        ChannelFuture listenerFuture = mock(ChannelFuture.class);
+        ChannelFuture failedFuture = mock(ChannelFuture.class);
+        when(failedFuture.isSuccess()).thenReturn(false);
+        when(failedFuture.cause()).thenReturn(new RuntimeException("ack send 
failed"));
+        doAnswer(invocation -> {
+            GenericFutureListener<Future<Void>> listener = 
invocation.getArgument(0);
+            listener.operationComplete(failedFuture);
+            return listenerFuture;
+        }).when(listenerFuture).addListener(any());
+        doAnswer(invocation -> {
+            ReferenceCountUtil.release(invocation.getArgument(0));
+            return listenerFuture;
+        }).when(ctx).write(any());
+        doAnswer(invocation -> {
+            ReferenceCountUtil.release(invocation.getArgument(0));
+            return listenerFuture;
+        }).when(ctx).writeAndFlush(any());
+        return ctx;
+    }
 }
diff --git a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/ClientCnxRequestTimeoutQueueTest.java b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/ClientCnxRequestTimeoutQueueTest.java
index c4dab1ad351..7e5d98b136a 100644
--- a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/ClientCnxRequestTimeoutQueueTest.java
+++ b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/ClientCnxRequestTimeoutQueueTest.java
@@ -64,6 +64,7 @@ public class ClientCnxRequestTimeoutQueueTest {
         ChannelHandlerContext ctx = mock(ChannelHandlerContext.class);
         Channel channel = mock(Channel.class);
         when(ctx.writeAndFlush(any())).thenAnswer(args -> 
mock(ChannelFuture.class));
+        when(ctx.write(any())).thenAnswer(args -> mock(ChannelFuture.class));
         when(ctx.channel()).thenReturn(channel);
         when(channel.remoteAddress()).thenReturn(new InetSocketAddress(1234));
         cnx.channelActive(ctx);
diff --git a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/ClientTestFixtures.java b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/ClientTestFixtures.java
index e4ab5da96f6..c8bcfc65a95 100644
--- a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/ClientTestFixtures.java
+++ b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/ClientTestFixtures.java
@@ -151,6 +151,11 @@ class ClientTestFixtures {
         }).when(listenerFuture).addListener(any());
 
         // handle write and writeAndFlush methods so that the input message is 
released
+        doAnswer(invocation -> {
+            Object msg = invocation.getArgument(0);
+            ReferenceCountUtil.release(msg);
+            return listenerFuture;
+        }).when(ctx).write(any());
         doAnswer(invocation -> {
             Object msg = invocation.getArgument(0);
             ReferenceCountUtil.release(msg);

Reply via email to