This is an automated email from the ASF dual-hosted git repository. lhotari pushed a commit to branch branch-4.0 in repository https://gitbox.apache.org/repos/asf/pulsar.git
commit bb0b7d5a7762dadacb312408be3fc5c5c534b1ac Author: Zixuan Liu <[email protected]> AuthorDate: Fri Apr 24 10:53:23 2026 +0800 [improve][client] Best-effort retry for individual/batch-index acks on send failure when ackReceiptEnabled=false (#25525) Co-authored-by: Copilot <[email protected]> (cherry picked from commit fae3df958f74f2afb0ba398b69d94fd3e6358c4f) --- .../org/apache/pulsar/client/impl/ClientCnx.java | 26 +++--- .../PersistentAcknowledgmentsGroupingTracker.java | 75 ++++++++++++++--- .../impl/AcknowledgementsGroupingTrackerTest.java | 94 +++++++++++++++++++++- .../impl/ClientCnxRequestTimeoutQueueTest.java | 1 + .../pulsar/client/impl/ClientTestFixtures.java | 5 ++ 5 files changed, 174 insertions(+), 27 deletions(-) diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientCnx.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientCnx.java index 35c49fb46a8..39822263906 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientCnx.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientCnx.java @@ -1053,27 +1053,23 @@ public class ClientCnx extends PulsarHandler { } private <T> void sendRequestAndHandleTimeout(ByteBuf requestMessage, long requestId, - RequestType requestType, boolean flush, - TimedCompletableFuture<T> future) { + RequestType requestType, boolean flush, + TimedCompletableFuture<T> future) { pendingRequests.put(requestId, future); - if (flush) { - ctx.writeAndFlush(requestMessage).addListener(writeFuture -> { - if (!writeFuture.isSuccess()) { - if (pendingRequests.remove(requestId, future) && !future.isDone()) { - log.warn("{} Failed to send {} to broker: {}", ctx.channel(), - requestType.getDescription(), writeFuture.cause().getMessage()); - future.completeExceptionally(writeFuture.cause()); - } + (flush ? ctx.writeAndFlush(requestMessage) : ctx.write(requestMessage)).addListener(writeFuture -> { + if (!writeFuture.isSuccess()) { + if (pendingRequests.remove(requestId, future) && !future.isDone()) { + log.warn("{} Failed to send {} to broker: {}", ctx.channel(), + requestType.getDescription(), writeFuture.cause().getMessage()); + future.completeExceptionally(writeFuture.cause()); } - }); - } else { - ctx.write(requestMessage, ctx().voidPromise()); - } + } + }); requestTimeoutQueue.add(new RequestTime(requestId, requestType)); } private <T> CompletableFuture<T> sendRequestAndHandleTimeout(ByteBuf requestMessage, long requestId, - RequestType requestType, boolean flush) { + RequestType requestType, boolean flush) { TimedCompletableFuture<T> future = new TimedCompletableFuture<>(); sendRequestAndHandleTimeout(requestMessage, requestId, requestType, flush, future); return future; diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PersistentAcknowledgmentsGroupingTracker.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PersistentAcknowledgmentsGroupingTracker.java index 0598dc4fb36..0a366a759d3 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PersistentAcknowledgmentsGroupingTracker.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PersistentAcknowledgmentsGroupingTracker.java @@ -26,6 +26,7 @@ import io.netty.util.concurrent.FastThreadLocal; import java.util.ArrayList; import java.util.BitSet; import java.util.Collections; +import java.util.HashMap; import java.util.HashSet; import java.util.List; import java.util.Map; @@ -279,6 +280,11 @@ public class PersistentAcknowledgmentsGroupingTracker implements Acknowledgments return CompletableFuture.completedFuture(null); } + @VisibleForTesting + int getPendingIndividualAcksSize() { + return pendingIndividualAcks.size(); + } + private CompletableFuture<Void> doIndividualBatchAck(MessageIdAdv batchMessageId, Map<String, Long> properties) { if (acknowledgementGroupTimeMicros == 0 || (properties != null && !properties.isEmpty())) { @@ -405,7 +411,7 @@ public class PersistentAcknowledgmentsGroupingTracker implements Acknowledgments } CompletableFuture<Void> completableFuture = newMessageAckCommandAndWrite(cnx, consumer.consumerId, - msgId.getLedgerId(), msgId.getEntryId(), bitSet, ackType, properties, true, null, null); + msgId.getLedgerId(), msgId.getEntryId(), bitSet, ackType, properties, true, null, null, null); bitSet.recycle(); return completableFuture; } @@ -441,13 +447,15 @@ public class PersistentAcknowledgmentsGroupingTracker implements Acknowledgments newMessageAckCommandAndWrite(cnx, consumer.consumerId, messageId.getLedgerId(), messageId.getEntryId(), lastCumulativeAckToFlush.getBitSetRecyclable(), AckType.Cumulative, Collections.emptyMap(), false, - (TimedCompletableFuture<Void>) this.currentCumulativeAckFuture, null); + (TimedCompletableFuture<Void>) this.currentCumulativeAckFuture, null, null); this.consumer.unAckedChunkedMessageIdSequenceMap.remove(messageId); } // Flush all individual acks List<Triple<Long, Long, ConcurrentBitSet>> entriesToAck = new ArrayList<>(pendingIndividualAcks.size() + pendingIndividualBatchIndexAcks.size()); + List<MessageIdAdv> individualAcksToFlush = new ArrayList<>(pendingIndividualAcks.size()); + Map<MessageIdAdv, MessageIdImpl[]> chunkedMessageIdsToRestore = new HashMap<>(); if (!pendingIndividualAcks.isEmpty()) { if (Commands.peerSupportsMultiMessageAcknowledgment(cnx.getRemoteEndpointProtocolVersion())) { // We can send 1 single protobuf command with all individual acks @@ -456,11 +464,13 @@ public class PersistentAcknowledgmentsGroupingTracker implements Acknowledgments if (msgId == null) { break; } + individualAcksToFlush.add(msgId); // if messageId is checked then all the chunked related to that msg also processed so, ack all of // them MessageIdImpl[] chunkMsgIds = this.consumer.unAckedChunkedMessageIdSequenceMap.get(msgId); if (chunkMsgIds != null && chunkMsgIds.length > 1) { + chunkedMessageIdsToRestore.put(msgId, chunkMsgIds); for (MessageIdImpl cMsgId : chunkMsgIds) { if (cMsgId != null) { entriesToAck.add(Triple.of(cMsgId.getLedgerId(), cMsgId.getEntryId(), null)); @@ -479,14 +489,16 @@ public class PersistentAcknowledgmentsGroupingTracker implements Acknowledgments if (msgId == null) { break; } + individualAcksToFlush.add(msgId); newMessageAckCommandAndWrite(cnx, consumer.consumerId, msgId.getLedgerId(), msgId.getEntryId(), null, AckType.Individual, Collections.emptyMap(), false, - null, null); + null, null, () -> restoreIndividualAck(msgId, null)); shouldFlush = true; } } } + List<Map.Entry<MessageIdAdv, ConcurrentBitSet>> batchIndexAcksToFlush = new ArrayList<>(); while (true) { Map.Entry<MessageIdAdv, ConcurrentBitSet> entry = pendingIndividualBatchIndexAcks.pollFirstEntry(); @@ -494,6 +506,7 @@ public class PersistentAcknowledgmentsGroupingTracker implements Acknowledgments // The entry has been removed in a different thread break; } + batchIndexAcksToFlush.add(entry); entriesToAck.add(Triple.of( entry.getKey().getLedgerId(), entry.getKey().getEntryId(), entry.getValue())); } @@ -502,7 +515,9 @@ public class PersistentAcknowledgmentsGroupingTracker implements Acknowledgments newMessageAckCommandAndWrite(cnx, consumer.consumerId, 0L, 0L, null, AckType.Individual, null, true, - (TimedCompletableFuture<Void>) currentIndividualAckFuture, entriesToAck); + (TimedCompletableFuture<Void>) currentIndividualAckFuture, entriesToAck, + () -> restoreIndividualAndBatchIndexAcks(individualAcksToFlush, chunkedMessageIdsToRestore, + batchIndexAcksToFlush)); shouldFlush = true; } @@ -547,19 +562,19 @@ public class PersistentAcknowledgmentsGroupingTracker implements Acknowledgments } } completableFuture = newMessageAckCommandAndWrite(cnx, consumer.consumerId, 0L, 0L, - null, ackType, null, true, null, entriesToAck); + null, ackType, null, true, null, entriesToAck, null); } else { // if don't support multi message ack, it also support ack receipt, so we should not think about the // ack receipt in this logic for (MessageIdImpl cMsgId : chunkMsgIds) { newMessageAckCommandAndWrite(cnx, consumerId, cMsgId.getLedgerId(), cMsgId.getEntryId(), - bitSet, ackType, map, true, null, null); + bitSet, ackType, map, true, null, null, null); } completableFuture = CompletableFuture.completedFuture(null); } } else { completableFuture = newMessageAckCommandAndWrite(cnx, consumerId, msgId.getLedgerId(), msgId.getEntryId(), - bitSet, ackType, map, true, null, null); + bitSet, ackType, map, true, null, null, null); } return completableFuture; } @@ -569,7 +584,8 @@ public class PersistentAcknowledgmentsGroupingTracker implements Acknowledgments long entryId, BitSetRecyclable ackSet, AckType ackType, Map<String, Long> properties, boolean flush, TimedCompletableFuture<Void> timedCompletableFuture, - List<Triple<Long, Long, ConcurrentBitSet>> entriesToAck) { + List<Triple<Long, Long, ConcurrentBitSet>> entriesToAck, + Runnable writeFailureCallback) { if (consumer.isAckReceiptEnabled()) { final long requestId = consumer.getClient().newRequestId(); final ByteBuf cmd; @@ -611,14 +627,53 @@ public class PersistentAcknowledgmentsGroupingTracker implements Acknowledgments cmd = Commands.newMultiMessageAck(consumerId, entriesToAck, -1); } if (flush) { - cnx.ctx().writeAndFlush(cmd, cnx.ctx().voidPromise()); + if (writeFailureCallback == null) { + cnx.ctx().writeAndFlush(cmd, cnx.ctx().voidPromise()); + } else { + cnx.ctx().writeAndFlush(cmd).addListener(writeFuture -> { + if (!writeFuture.isSuccess()) { + writeFailureCallback.run(); + } + }); + } } else { - cnx.ctx().write(cmd, cnx.ctx().voidPromise()); + if (writeFailureCallback == null) { + cnx.ctx().write(cmd, cnx.ctx().voidPromise()); + } else { + cnx.ctx().write(cmd).addListener(writeFuture -> { + if (!writeFuture.isSuccess()) { + writeFailureCallback.run(); + } + }); + } } return CompletableFuture.completedFuture(null); } } + private void restoreIndividualAck(MessageIdAdv messageId, @Nullable MessageIdImpl[] chunkMsgIds) { + pendingIndividualAcks.add(messageId); + restoreChunkedMessageIds(messageId, chunkMsgIds); + } + + private void restoreIndividualAndBatchIndexAcks(List<MessageIdAdv> messageIds, + Map<MessageIdAdv, MessageIdImpl[]> chunkMsgIds, + List<Map.Entry<MessageIdAdv, ConcurrentBitSet>> batchIndexAcks) { + pendingIndividualAcks.addAll(messageIds); + chunkMsgIds.forEach(this::restoreChunkedMessageIds); + batchIndexAcks.forEach(entry -> pendingIndividualBatchIndexAcks.merge(entry.getKey(), entry.getValue(), + (currentValue, valueToRestore) -> { + currentValue.and(valueToRestore); + return currentValue; + })); + } + + private void restoreChunkedMessageIds(MessageIdAdv messageId, @Nullable MessageIdImpl[] chunkMsgIds) { + if (chunkMsgIds != null) { + consumer.unAckedChunkedMessageIdSequenceMap.putIfAbsent(messageId, chunkMsgIds); + } + } + public Optional<Lock> acquireReadLock() { Optional<Lock> optionalLock = Optional.ofNullable(consumer.isAckReceiptEnabled() ? lock.readLock() : null); optionalLock.ifPresent(Lock::lock); diff --git a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/AcknowledgementsGroupingTrackerTest.java b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/AcknowledgementsGroupingTrackerTest.java index 76f2f45a030..410819ef732 100644 --- a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/AcknowledgementsGroupingTrackerTest.java +++ b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/AcknowledgementsGroupingTrackerTest.java @@ -18,6 +18,7 @@ */ package org.apache.pulsar.client.impl; +import static org.mockito.ArgumentMatchers.any; import static org.mockito.Mockito.doAnswer; import static org.mockito.Mockito.doReturn; import static org.mockito.Mockito.mock; @@ -28,9 +29,13 @@ import static org.testng.Assert.assertFalse; import static org.testng.Assert.assertNotNull; import static org.testng.Assert.assertTrue; import io.netty.buffer.ByteBuf; +import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.EventLoopGroup; import io.netty.channel.nio.NioEventLoopGroup; +import io.netty.util.ReferenceCountUtil; +import io.netty.util.concurrent.Future; +import io.netty.util.concurrent.GenericFutureListener; import java.util.ArrayList; import java.util.BitSet; import java.util.Collections; @@ -46,6 +51,7 @@ import org.apache.pulsar.client.impl.metrics.InstrumentProvider; import org.apache.pulsar.client.util.TimedCompletableFuture; import org.apache.pulsar.common.api.proto.CommandAck.AckType; import org.apache.pulsar.common.api.proto.ProtocolVersion; +import org.apache.pulsar.common.util.FutureUtil; import org.testng.annotations.AfterClass; import org.testng.annotations.BeforeClass; import org.testng.annotations.DataProvider; @@ -57,6 +63,8 @@ public class AcknowledgementsGroupingTrackerTest { private ConsumerImpl<?> consumer; private EventLoopGroup eventLoopGroup; private AtomicBoolean returnCnx = new AtomicBoolean(true); + private ChannelHandlerContext successCtx; + private AtomicBoolean failAckCommandSend = new AtomicBoolean(false); @BeforeClass public void setup() throws NoSuchFieldException, IllegalAccessException { @@ -71,9 +79,9 @@ public class AcknowledgementsGroupingTrackerTest { doReturn(new ConsumerStatsRecorderImpl()).when(consumer).getStats(); doReturn(UnAckedMessageTracker.UNACKED_MESSAGE_TRACKER_DISABLED) .when(consumer).getUnAckedMessageTracker(); - ChannelHandlerContext ctx = ClientTestFixtures.mockChannelHandlerContext(); + successCtx = ClientTestFixtures.mockChannelHandlerContext(); doAnswer(invocation -> returnCnx.get() ? cnx : null).when(consumer).getClientCnx(); - doReturn(ctx).when(cnx).ctx(); + doReturn(successCtx).when(cnx).ctx(); } @DataProvider(name = "isNeedReceipt") @@ -324,6 +332,60 @@ public class AcknowledgementsGroupingTrackerTest { tracker.close(); } + @Test + public void testFlushRetainsPendingIndividualAckOnSendFailureWithoutAckReceipt() throws Exception { + ConsumerConfigurationData<?> conf = new ConsumerConfigurationData<>(); + conf.setAcknowledgementsGroupTimeMicros(TimeUnit.SECONDS.toMicros(10)); + conf.setAckReceiptEnabled(false); + doReturn(false).when(consumer).isAckReceiptEnabled(); + PersistentAcknowledgmentsGroupingTracker tracker = + new PersistentAcknowledgmentsGroupingTracker(consumer, conf, eventLoopGroup); + + MessageIdImpl msg1 = new MessageIdImpl(5, 1, 0); + tracker.addAcknowledgment(msg1, AckType.Individual, Collections.emptyMap()); + assertEquals(tracker.getPendingIndividualAcksSize(), 1); + + doReturn(createFailedChannelHandlerContext()).when(cnx).ctx(); + + tracker.flush(); + + assertTrue(tracker.isDuplicate(msg1)); + assertEquals(tracker.getPendingIndividualAcksSize(), 1); + + doReturn(successCtx).when(cnx).ctx(); + + tracker.flush(); + + assertFalse(tracker.isDuplicate(msg1)); + assertEquals(tracker.getPendingIndividualAcksSize(), 0); + tracker.close(); + } + + @Test + public void testFlushFailsAckFutureOnSendFailureWithAckReceipt() throws Exception { + ConsumerConfigurationData<?> conf = new ConsumerConfigurationData<>(); + conf.setAcknowledgementsGroupTimeMicros(TimeUnit.SECONDS.toMicros(10)); + conf.setAckReceiptEnabled(true); + doReturn(true).when(consumer).isAckReceiptEnabled(); + PersistentAcknowledgmentsGroupingTracker tracker = + new PersistentAcknowledgmentsGroupingTracker(consumer, conf, eventLoopGroup); + + MessageIdImpl msg1 = new MessageIdImpl(5, 1, 0); + CompletableFuture<Void> ackFuture = + tracker.addAcknowledgment(msg1, AckType.Individual, Collections.emptyMap()); + assertEquals(tracker.getPendingIndividualAcksSize(), 1); + + failAckCommandSend.set(true); + tracker.flush(); + + assertTrue(ackFuture.isCompletedExceptionally()); + assertFalse(tracker.isDuplicate(msg1)); + assertEquals(tracker.getPendingIndividualAcksSize(), 0); + + failAckCommandSend.set(false); + tracker.close(); + } + @Test(dataProvider = "isNeedReceipt") public void testBatchAckTrackerMultiAck(boolean isNeedReceipt) throws Exception { ConsumerConfigurationData<?> conf = new ConsumerConfigurationData<>(); @@ -464,12 +526,40 @@ public class AcknowledgementsGroupingTrackerTest { @Override public CompletableFuture<Void> newAckForReceipt(ByteBuf request, long requestId) { + if (failAckCommandSend.get()) { + return FutureUtil.failedFuture(new RuntimeException("ack send failed")); + } return CompletableFuture.completedFuture(null); } @Override public void newAckForReceiptWithFuture(ByteBuf request, long requestId, TimedCompletableFuture<Void> future) { + if (failAckCommandSend.get()) { + future.completeExceptionally(new RuntimeException("ack send failed")); + } } } + + private ChannelHandlerContext createFailedChannelHandlerContext() { + ChannelHandlerContext ctx = mock(ChannelHandlerContext.class); + ChannelFuture listenerFuture = mock(ChannelFuture.class); + ChannelFuture failedFuture = mock(ChannelFuture.class); + when(failedFuture.isSuccess()).thenReturn(false); + when(failedFuture.cause()).thenReturn(new RuntimeException("ack send failed")); + doAnswer(invocation -> { + GenericFutureListener<Future<Void>> listener = invocation.getArgument(0); + listener.operationComplete(failedFuture); + return listenerFuture; + }).when(listenerFuture).addListener(any()); + doAnswer(invocation -> { + ReferenceCountUtil.release(invocation.getArgument(0)); + return listenerFuture; + }).when(ctx).write(any()); + doAnswer(invocation -> { + ReferenceCountUtil.release(invocation.getArgument(0)); + return listenerFuture; + }).when(ctx).writeAndFlush(any()); + return ctx; + } } diff --git a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/ClientCnxRequestTimeoutQueueTest.java b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/ClientCnxRequestTimeoutQueueTest.java index c4dab1ad351..7e5d98b136a 100644 --- a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/ClientCnxRequestTimeoutQueueTest.java +++ b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/ClientCnxRequestTimeoutQueueTest.java @@ -64,6 +64,7 @@ public class ClientCnxRequestTimeoutQueueTest { ChannelHandlerContext ctx = mock(ChannelHandlerContext.class); Channel channel = mock(Channel.class); when(ctx.writeAndFlush(any())).thenAnswer(args -> mock(ChannelFuture.class)); + when(ctx.write(any())).thenAnswer(args -> mock(ChannelFuture.class)); when(ctx.channel()).thenReturn(channel); when(channel.remoteAddress()).thenReturn(new InetSocketAddress(1234)); cnx.channelActive(ctx); diff --git a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/ClientTestFixtures.java b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/ClientTestFixtures.java index ae0797fa493..494dea365d9 100644 --- a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/ClientTestFixtures.java +++ b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/ClientTestFixtures.java @@ -147,6 +147,11 @@ class ClientTestFixtures { }).when(listenerFuture).addListener(any()); // handle write and writeAndFlush methods so that the input message is released + doAnswer(invocation -> { + Object msg = invocation.getArgument(0); + ReferenceCountUtil.release(msg); + return listenerFuture; + }).when(ctx).write(any()); doAnswer(invocation -> { Object msg = invocation.getArgument(0); ReferenceCountUtil.release(msg);
