This is an automated email from the ASF dual-hosted git repository.
nodece pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new fae3df958f7 [improve][client] Best-effort retry for
individual/batch-index acks on send failure when ackReceiptEnabled=false
(#25525)
fae3df958f7 is described below
commit fae3df958f74f2afb0ba398b69d94fd3e6358c4f
Author: Zixuan Liu <[email protected]>
AuthorDate: Fri Apr 24 10:53:23 2026 +0800
[improve][client] Best-effort retry for individual/batch-index acks on send
failure when ackReceiptEnabled=false (#25525)
Co-authored-by: Copilot <[email protected]>
---
.../org/apache/pulsar/client/impl/ClientCnx.java | 30 +++----
.../PersistentAcknowledgmentsGroupingTracker.java | 75 ++++++++++++++---
.../impl/AcknowledgementsGroupingTrackerTest.java | 93 +++++++++++++++++++++-
.../impl/ClientCnxRequestTimeoutQueueTest.java | 1 +
.../pulsar/client/impl/ClientTestFixtures.java | 5 ++
5 files changed, 175 insertions(+), 29 deletions(-)
diff --git
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientCnx.java
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientCnx.java
index a6996386322..0c8bf66ae25 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientCnx.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientCnx.java
@@ -1080,29 +1080,25 @@ public class ClientCnx extends PulsarHandler {
}
private <T> void sendRequestAndHandleTimeout(ByteBuf requestMessage, long
requestId,
- RequestType
requestType, boolean flush,
-
TimedCompletableFuture<T> future) {
+ RequestType requestType,
boolean flush,
+ TimedCompletableFuture<T>
future) {
pendingRequests.put(requestId, future);
- if (flush) {
- ctx.writeAndFlush(requestMessage).addListener(writeFuture -> {
- if (!writeFuture.isSuccess()) {
- if (pendingRequests.remove(requestId, future) &&
!future.isDone()) {
- log.warn()
- .attr("send", requestType.getDescription())
- .exceptionMessage(writeFuture.cause())
- .log("Failed to send to broker");
- future.completeExceptionally(writeFuture.cause());
- }
+ (flush ? ctx.writeAndFlush(requestMessage) :
ctx.write(requestMessage)).addListener(writeFuture -> {
+ if (!writeFuture.isSuccess()) {
+ if (pendingRequests.remove(requestId, future) &&
!future.isDone()) {
+ log.warn()
+ .attr("send", requestType.getDescription())
+ .exceptionMessage(writeFuture.cause())
+ .log("Failed to send to broker");
+ future.completeExceptionally(writeFuture.cause());
}
- });
- } else {
- ctx.write(requestMessage, ctx().voidPromise());
- }
+ }
+ });
requestTimeoutQueue.add(new RequestTime(requestId, requestType));
}
private <T> CompletableFuture<T> sendRequestAndHandleTimeout(ByteBuf
requestMessage, long requestId,
- RequestType requestType,
boolean flush) {
+ RequestType requestType,
boolean flush) {
TimedCompletableFuture<T> future = new TimedCompletableFuture<>();
sendRequestAndHandleTimeout(requestMessage, requestId, requestType,
flush, future);
return future;
diff --git
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PersistentAcknowledgmentsGroupingTracker.java
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PersistentAcknowledgmentsGroupingTracker.java
index 0cd013f20b2..001137bd912 100644
---
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PersistentAcknowledgmentsGroupingTracker.java
+++
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PersistentAcknowledgmentsGroupingTracker.java
@@ -26,6 +26,7 @@ import io.netty.util.concurrent.FastThreadLocal;
import java.util.ArrayList;
import java.util.BitSet;
import java.util.Collections;
+import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
@@ -279,6 +280,11 @@ public class PersistentAcknowledgmentsGroupingTracker
implements Acknowledgments
return CompletableFuture.completedFuture(null);
}
+ @VisibleForTesting
+ int getPendingIndividualAcksSize() {
+ return pendingIndividualAcks.size();
+ }
+
private CompletableFuture<Void> doIndividualBatchAck(MessageIdAdv
batchMessageId,
Map<String, Long>
properties) {
if (acknowledgementGroupTimeMicros == 0 || (properties != null &&
!properties.isEmpty())) {
@@ -405,7 +411,7 @@ public class PersistentAcknowledgmentsGroupingTracker
implements Acknowledgments
}
CompletableFuture<Void> completableFuture =
newMessageAckCommandAndWrite(cnx, consumer.consumerId,
- msgId.getLedgerId(), msgId.getEntryId(), bitSet, ackType,
properties, true, null, null);
+ msgId.getLedgerId(), msgId.getEntryId(), bitSet, ackType,
properties, true, null, null, null);
bitSet.recycle();
return completableFuture;
}
@@ -440,13 +446,15 @@ public class PersistentAcknowledgmentsGroupingTracker
implements Acknowledgments
newMessageAckCommandAndWrite(cnx, consumer.consumerId,
messageId.getLedgerId(), messageId.getEntryId(),
lastCumulativeAckToFlush.getBitSetRecyclable(),
AckType.Cumulative,
Collections.emptyMap(), false,
- (TimedCompletableFuture<Void>)
this.currentCumulativeAckFuture, null);
+ (TimedCompletableFuture<Void>)
this.currentCumulativeAckFuture, null, null);
this.consumer.unAckedChunkedMessageIdSequenceMap.remove(messageId);
}
// Flush all individual acks
List<Triple<Long, Long, ConcurrentBitSet>> entriesToAck =
new ArrayList<>(pendingIndividualAcks.size() +
pendingIndividualBatchIndexAcks.size());
+ List<MessageIdAdv> individualAcksToFlush = new
ArrayList<>(pendingIndividualAcks.size());
+ Map<MessageIdAdv, MessageIdImpl[]> chunkedMessageIdsToRestore = new
HashMap<>();
if (!pendingIndividualAcks.isEmpty()) {
if
(Commands.peerSupportsMultiMessageAcknowledgment(cnx.getRemoteEndpointProtocolVersion()))
{
// We can send 1 single protobuf command with all individual
acks
@@ -455,11 +463,13 @@ public class PersistentAcknowledgmentsGroupingTracker
implements Acknowledgments
if (msgId == null) {
break;
}
+ individualAcksToFlush.add(msgId);
// if the messageId is checked then all the chunks related to
that msg have also been processed, so ack all of
// them
MessageIdImpl[] chunkMsgIds =
this.consumer.unAckedChunkedMessageIdSequenceMap.get(msgId);
if (chunkMsgIds != null && chunkMsgIds.length > 1) {
+ chunkedMessageIdsToRestore.put(msgId, chunkMsgIds);
for (MessageIdImpl cMsgId : chunkMsgIds) {
if (cMsgId != null) {
entriesToAck.add(Triple.of(cMsgId.getLedgerId(), cMsgId.getEntryId(), null));
@@ -478,14 +488,16 @@ public class PersistentAcknowledgmentsGroupingTracker
implements Acknowledgments
if (msgId == null) {
break;
}
+ individualAcksToFlush.add(msgId);
newMessageAckCommandAndWrite(cnx, consumer.consumerId,
msgId.getLedgerId(), msgId.getEntryId(),
null, AckType.Individual, Collections.emptyMap(),
false,
- null, null);
+ null, null, () -> restoreIndividualAck(msgId,
null));
shouldFlush = true;
}
}
}
+ List<Map.Entry<MessageIdAdv, ConcurrentBitSet>> batchIndexAcksToFlush
= new ArrayList<>();
while (true) {
Map.Entry<MessageIdAdv, ConcurrentBitSet> entry =
pendingIndividualBatchIndexAcks.pollFirstEntry();
@@ -493,6 +505,7 @@ public class PersistentAcknowledgmentsGroupingTracker
implements Acknowledgments
// The entry has been removed in a different thread
break;
}
+ batchIndexAcksToFlush.add(entry);
entriesToAck.add(Triple.of(
entry.getKey().getLedgerId(), entry.getKey().getEntryId(),
entry.getValue()));
}
@@ -501,7 +514,9 @@ public class PersistentAcknowledgmentsGroupingTracker
implements Acknowledgments
newMessageAckCommandAndWrite(cnx, consumer.consumerId, 0L, 0L,
null, AckType.Individual, null, true,
- (TimedCompletableFuture<Void>) currentIndividualAckFuture,
entriesToAck);
+ (TimedCompletableFuture<Void>) currentIndividualAckFuture,
entriesToAck,
+ () ->
restoreIndividualAndBatchIndexAcks(individualAcksToFlush,
chunkedMessageIdsToRestore,
+ batchIndexAcksToFlush));
shouldFlush = true;
}
@@ -546,19 +561,19 @@ public class PersistentAcknowledgmentsGroupingTracker
implements Acknowledgments
}
}
completableFuture = newMessageAckCommandAndWrite(cnx,
consumer.consumerId, 0L, 0L,
- null, ackType, null, true, null, entriesToAck);
+ null, ackType, null, true, null, entriesToAck, null);
} else {
// if the peer doesn't support multi message ack, it also doesn't support ack
receipt, so we do not need to consider the
// ack receipt in this logic
for (MessageIdImpl cMsgId : chunkMsgIds) {
newMessageAckCommandAndWrite(cnx, consumerId,
cMsgId.getLedgerId(), cMsgId.getEntryId(),
- bitSet, ackType, map, true, null, null);
+ bitSet, ackType, map, true, null, null, null);
}
completableFuture = CompletableFuture.completedFuture(null);
}
} else {
completableFuture = newMessageAckCommandAndWrite(cnx, consumerId,
msgId.getLedgerId(), msgId.getEntryId(),
- bitSet, ackType, map, true, null, null);
+ bitSet, ackType, map, true, null, null, null);
}
return completableFuture;
}
@@ -568,7 +583,8 @@ public class PersistentAcknowledgmentsGroupingTracker
implements Acknowledgments
long entryId, BitSetRecyclable ackSet, AckType ackType,
Map<String, Long> properties, boolean flush,
TimedCompletableFuture<Void> timedCompletableFuture,
- List<Triple<Long, Long, ConcurrentBitSet>> entriesToAck) {
+ List<Triple<Long, Long, ConcurrentBitSet>> entriesToAck,
+ Runnable writeFailureCallback) {
if (consumer.isAckReceiptEnabled()) {
final long requestId = consumer.getClient().newRequestId();
final ByteBuf cmd;
@@ -610,14 +626,53 @@ public class PersistentAcknowledgmentsGroupingTracker
implements Acknowledgments
cmd = Commands.newMultiMessageAck(consumerId, entriesToAck,
-1);
}
if (flush) {
- cnx.ctx().writeAndFlush(cmd, cnx.ctx().voidPromise());
+ if (writeFailureCallback == null) {
+ cnx.ctx().writeAndFlush(cmd, cnx.ctx().voidPromise());
+ } else {
+ cnx.ctx().writeAndFlush(cmd).addListener(writeFuture -> {
+ if (!writeFuture.isSuccess()) {
+ writeFailureCallback.run();
+ }
+ });
+ }
} else {
- cnx.ctx().write(cmd, cnx.ctx().voidPromise());
+ if (writeFailureCallback == null) {
+ cnx.ctx().write(cmd, cnx.ctx().voidPromise());
+ } else {
+ cnx.ctx().write(cmd).addListener(writeFuture -> {
+ if (!writeFuture.isSuccess()) {
+ writeFailureCallback.run();
+ }
+ });
+ }
}
return CompletableFuture.completedFuture(null);
}
}
+ private void restoreIndividualAck(MessageIdAdv messageId, @Nullable
MessageIdImpl[] chunkMsgIds) {
+ pendingIndividualAcks.add(messageId);
+ restoreChunkedMessageIds(messageId, chunkMsgIds);
+ }
+
+ private void restoreIndividualAndBatchIndexAcks(List<MessageIdAdv>
messageIds,
+ Map<MessageIdAdv,
MessageIdImpl[]> chunkMsgIds,
+
List<Map.Entry<MessageIdAdv, ConcurrentBitSet>> batchIndexAcks) {
+ pendingIndividualAcks.addAll(messageIds);
+ chunkMsgIds.forEach(this::restoreChunkedMessageIds);
+ batchIndexAcks.forEach(entry ->
pendingIndividualBatchIndexAcks.merge(entry.getKey(), entry.getValue(),
+ (currentValue, valueToRestore) -> {
+ currentValue.and(valueToRestore);
+ return currentValue;
+ }));
+ }
+
+ private void restoreChunkedMessageIds(MessageIdAdv messageId, @Nullable
MessageIdImpl[] chunkMsgIds) {
+ if (chunkMsgIds != null) {
+ consumer.unAckedChunkedMessageIdSequenceMap.putIfAbsent(messageId,
chunkMsgIds);
+ }
+ }
+
public Optional<Lock> acquireReadLock() {
Optional<Lock> optionalLock =
Optional.ofNullable(consumer.isAckReceiptEnabled() ? lock.readLock() : null);
optionalLock.ifPresent(Lock::lock);
diff --git
a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/AcknowledgementsGroupingTrackerTest.java
b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/AcknowledgementsGroupingTrackerTest.java
index 0b91d86064c..8bb85b054a3 100644
---
a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/AcknowledgementsGroupingTrackerTest.java
+++
b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/AcknowledgementsGroupingTrackerTest.java
@@ -18,6 +18,7 @@
*/
package org.apache.pulsar.client.impl;
+import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.mock;
@@ -28,9 +29,13 @@ import static org.testng.Assert.assertFalse;
import static org.testng.Assert.assertNotNull;
import static org.testng.Assert.assertTrue;
import io.netty.buffer.ByteBuf;
+import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
+import io.netty.util.ReferenceCountUtil;
+import io.netty.util.concurrent.Future;
+import io.netty.util.concurrent.GenericFutureListener;
import java.util.ArrayList;
import java.util.BitSet;
import java.util.Collections;
@@ -56,6 +61,8 @@ public class AcknowledgementsGroupingTrackerTest {
private ConsumerImpl<?> consumer;
private EventLoopGroup eventLoopGroup;
private AtomicBoolean returnCnx = new AtomicBoolean(true);
+ private ChannelHandlerContext successCtx;
+ private AtomicBoolean failAckCommandSend = new AtomicBoolean(false);
@BeforeClass
public void setup() throws NoSuchFieldException, IllegalAccessException {
@@ -70,9 +77,9 @@ public class AcknowledgementsGroupingTrackerTest {
doReturn(new ConsumerStatsRecorderImpl()).when(consumer).getStats();
doReturn(UnAckedMessageTracker.UNACKED_MESSAGE_TRACKER_DISABLED)
.when(consumer).getUnAckedMessageTracker();
- ChannelHandlerContext ctx =
ClientTestFixtures.mockChannelHandlerContext();
+ successCtx = ClientTestFixtures.mockChannelHandlerContext();
doAnswer(invocation -> returnCnx.get() ? cnx :
null).when(consumer).getClientCnx();
- doReturn(ctx).when(cnx).ctx();
+ doReturn(successCtx).when(cnx).ctx();
}
@DataProvider(name = "isNeedReceipt")
@@ -323,6 +330,60 @@ public class AcknowledgementsGroupingTrackerTest {
tracker.close();
}
+ @Test
+ public void
testFlushRetainsPendingIndividualAckOnSendFailureWithoutAckReceipt() throws
Exception {
+ ConsumerConfigurationData<?> conf = new ConsumerConfigurationData<>();
+ conf.setAcknowledgementsGroupTimeMicros(TimeUnit.SECONDS.toMicros(10));
+ conf.setAckReceiptEnabled(false);
+ doReturn(false).when(consumer).isAckReceiptEnabled();
+ PersistentAcknowledgmentsGroupingTracker tracker =
+ new PersistentAcknowledgmentsGroupingTracker(consumer, conf,
eventLoopGroup);
+
+ MessageIdImpl msg1 = new MessageIdImpl(5, 1, 0);
+ tracker.addAcknowledgment(msg1, AckType.Individual,
Collections.emptyMap());
+ assertEquals(tracker.getPendingIndividualAcksSize(), 1);
+
+ doReturn(createFailedChannelHandlerContext()).when(cnx).ctx();
+
+ tracker.flush();
+
+ assertTrue(tracker.isDuplicate(msg1));
+ assertEquals(tracker.getPendingIndividualAcksSize(), 1);
+
+ doReturn(successCtx).when(cnx).ctx();
+
+ tracker.flush();
+
+ assertFalse(tracker.isDuplicate(msg1));
+ assertEquals(tracker.getPendingIndividualAcksSize(), 0);
+ tracker.close();
+ }
+
+ @Test
+ public void testFlushFailsAckFutureOnSendFailureWithAckReceipt() throws
Exception {
+ ConsumerConfigurationData<?> conf = new ConsumerConfigurationData<>();
+ conf.setAcknowledgementsGroupTimeMicros(TimeUnit.SECONDS.toMicros(10));
+ conf.setAckReceiptEnabled(true);
+ doReturn(true).when(consumer).isAckReceiptEnabled();
+ PersistentAcknowledgmentsGroupingTracker tracker =
+ new PersistentAcknowledgmentsGroupingTracker(consumer, conf,
eventLoopGroup);
+
+ MessageIdImpl msg1 = new MessageIdImpl(5, 1, 0);
+ CompletableFuture<Void> ackFuture =
+ tracker.addAcknowledgment(msg1, AckType.Individual,
Collections.emptyMap());
+ assertEquals(tracker.getPendingIndividualAcksSize(), 1);
+
+ failAckCommandSend.set(true);
+ tracker.flush();
+
+ assertTrue(ackFuture.isCompletedExceptionally());
+ assertFalse(tracker.isDuplicate(msg1));
+ assertEquals(tracker.getPendingIndividualAcksSize(), 0);
+
+ failAckCommandSend.set(false);
+ tracker.close();
+ }
+
@Test(dataProvider = "isNeedReceipt")
public void testBatchAckTrackerMultiAck(boolean isNeedReceipt) throws
Exception {
ConsumerConfigurationData<?> conf = new ConsumerConfigurationData<>();
@@ -463,12 +524,40 @@ public class AcknowledgementsGroupingTrackerTest {
@Override
public CompletableFuture<Void> newAckForReceipt(ByteBuf request, long
requestId) {
+ if (failAckCommandSend.get()) {
+ return CompletableFuture.failedFuture(new
RuntimeException("ack send failed"));
+ }
return CompletableFuture.completedFuture(null);
}
@Override
public void newAckForReceiptWithFuture(ByteBuf request, long requestId,
TimedCompletableFuture<Void>
future) {
+ if (failAckCommandSend.get()) {
+ future.completeExceptionally(new RuntimeException("ack send
failed"));
+ }
}
}
+
+ private ChannelHandlerContext createFailedChannelHandlerContext() {
+ ChannelHandlerContext ctx = mock(ChannelHandlerContext.class);
+ ChannelFuture listenerFuture = mock(ChannelFuture.class);
+ ChannelFuture failedFuture = mock(ChannelFuture.class);
+ when(failedFuture.isSuccess()).thenReturn(false);
+ when(failedFuture.cause()).thenReturn(new RuntimeException("ack send
failed"));
+ doAnswer(invocation -> {
+ GenericFutureListener<Future<Void>> listener =
invocation.getArgument(0);
+ listener.operationComplete(failedFuture);
+ return listenerFuture;
+ }).when(listenerFuture).addListener(any());
+ doAnswer(invocation -> {
+ ReferenceCountUtil.release(invocation.getArgument(0));
+ return listenerFuture;
+ }).when(ctx).write(any());
+ doAnswer(invocation -> {
+ ReferenceCountUtil.release(invocation.getArgument(0));
+ return listenerFuture;
+ }).when(ctx).writeAndFlush(any());
+ return ctx;
+ }
}
diff --git
a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/ClientCnxRequestTimeoutQueueTest.java
b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/ClientCnxRequestTimeoutQueueTest.java
index c4dab1ad351..7e5d98b136a 100644
---
a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/ClientCnxRequestTimeoutQueueTest.java
+++
b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/ClientCnxRequestTimeoutQueueTest.java
@@ -64,6 +64,7 @@ public class ClientCnxRequestTimeoutQueueTest {
ChannelHandlerContext ctx = mock(ChannelHandlerContext.class);
Channel channel = mock(Channel.class);
when(ctx.writeAndFlush(any())).thenAnswer(args ->
mock(ChannelFuture.class));
+ when(ctx.write(any())).thenAnswer(args -> mock(ChannelFuture.class));
when(ctx.channel()).thenReturn(channel);
when(channel.remoteAddress()).thenReturn(new InetSocketAddress(1234));
cnx.channelActive(ctx);
diff --git
a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/ClientTestFixtures.java
b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/ClientTestFixtures.java
index e4ab5da96f6..c8bcfc65a95 100644
---
a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/ClientTestFixtures.java
+++
b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/ClientTestFixtures.java
@@ -151,6 +151,11 @@ class ClientTestFixtures {
}).when(listenerFuture).addListener(any());
// handle write and writeAndFlush methods so that the input message is
released
+ doAnswer(invocation -> {
+ Object msg = invocation.getArgument(0);
+ ReferenceCountUtil.release(msg);
+ return listenerFuture;
+ }).when(ctx).write(any());
doAnswer(invocation -> {
Object msg = invocation.getArgument(0);
ReferenceCountUtil.release(msg);