This is an automated email from the ASF dual-hosted git repository.
nodece pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 9e1bae353ad [fix][broker] Fix race in pending acks removal in redeliverUnacknowledgedMessages (#25589)
9e1bae353ad is described below
commit 9e1bae353ad3f869aaab99a98548fbe37375364e
Author: Zixuan Liu <[email protected]>
AuthorDate: Tue Apr 28 15:18:55 2026 +0800
[fix][broker] Fix race in pending acks removal in redeliverUnacknowledgedMessages (#25589)
---
.../org/apache/pulsar/broker/service/Consumer.java | 26 +++----
.../pulsar/broker/service/PendingAcksMap.java | 48 ++++++++++++-
.../pulsar/broker/service/PendingAcksMapTest.java | 81 ++++++++++++++++++++++
3 files changed, 136 insertions(+), 19 deletions(-)
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Consumer.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Consumer.java
index 15074f28cc2..3a0bf99eb6d 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Consumer.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Consumer.java
@@ -1155,15 +1155,12 @@ public class Consumer {
}
public void redeliverUnacknowledgedMessages(long consumerEpoch) {
- // cleanup unackedMessage bucket and redeliver those unack-msgs again
- clearUnAckedMsgs();
- blockedConsumerOnUnackedMsgs = false;
log.debug("Consumer received redelivery");
if (pendingAcks != null) {
List<Position> pendingPositions = new ArrayList<>((int)
pendingAcks.size());
MutableInt totalRedeliveryMessages = new MutableInt(0);
- pendingAcks.forEach((ledgerId, entryId, batchSize, stickyKeyHash)
-> {
+ pendingAcks.forEachAndClear((ledgerId, entryId, batchSize,
stickyKeyHash) -> {
int unAckedCount =
(int)
getUnAckedCountForBatchIndexLevelEnabled(PositionFactory.create(ledgerId,
entryId),
batchSize);
@@ -1171,15 +1168,18 @@ public class Consumer {
pendingPositions.add(PositionFactory.create(ledgerId,
entryId));
});
- for (Position p : pendingPositions) {
- pendingAcks.remove(p.getLedgerId(), p.getEntryId());
+ if (totalRedeliveryMessages.intValue() > 0) {
+ addAndGetUnAckedMsgs(this,
-totalRedeliveryMessages.intValue());
}
+ blockedConsumerOnUnackedMsgs = false;
msgRedeliver.recordMultipleEvents(totalRedeliveryMessages.intValue(),
totalRedeliveryMessages.intValue());
msgRedeliverCounter.add(totalRedeliveryMessages.intValue());
subscription.redeliverUnacknowledgedMessages(this,
pendingPositions);
} else {
+ clearUnAckedMsgs();
+ blockedConsumerOnUnackedMsgs = false;
subscription.redeliverUnacknowledgedMessages(this, consumerEpoch);
}
@@ -1191,10 +1191,9 @@ public class Consumer {
List<Position> pendingPositions = new ArrayList<>();
for (MessageIdData msg : messageIds) {
Position position = PositionFactory.create(msg.getLedgerId(),
msg.getEntryId());
- IntIntPair pendingAck = pendingAcks.get(position.getLedgerId(),
position.getEntryId());
+ IntIntPair pendingAck =
pendingAcks.removeAndGet(position.getLedgerId(), position.getEntryId());
if (pendingAck != null) {
int unAckedCount = (int)
getUnAckedCountForBatchIndexLevelEnabled(position, pendingAck.leftInt());
- pendingAcks.remove(position.getLedgerId(),
position.getEntryId());
totalRedeliveryMessages += unAckedCount;
pendingPositions.add(position);
}
@@ -1212,16 +1211,7 @@ public class Consumer {
msgRedeliver.recordMultipleEvents(totalRedeliveryMessages,
totalRedeliveryMessages);
msgRedeliverCounter.add(totalRedeliveryMessages);
- int numberOfBlockedPermits =
PERMITS_RECEIVED_WHILE_CONSUMER_BLOCKED_UPDATER.getAndSet(this, 0);
-
- // if permitsReceivedWhileConsumerBlocked has been accumulated then
pass it to Dispatcher to flow messages
- if (numberOfBlockedPermits > 0) {
- MESSAGE_PERMITS_UPDATER.getAndAdd(this, numberOfBlockedPermits);
- log.debug()
- .attr("numberOfBlockedPermits", numberOfBlockedPermits)
- .log("Added blockedPermits to consumer's messagePermits");
- subscription.consumerFlow(this, numberOfBlockedPermits);
- }
+ flowConsumerBlockedPermits(this);
}
public Subscription getSubscription() {
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/PendingAcksMap.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/PendingAcksMap.java
index 46fb7f6e6c8..8be69aa7879 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/PendingAcksMap.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/PendingAcksMap.java
@@ -202,9 +202,26 @@ public class PendingAcksMap {
* @param processor the processor to handle each pending ack
*/
public void forEachAndClose(PendingAcksConsumer processor) {
+ internalForEachAndClear(processor, true);
+ }
+
+ /**
+ * Iterate over all the pending acks and clear the map.
+ * Unlike {@link #forEachAndClose(PendingAcksConsumer)}, this method does
not close the map,
+ * so new entries can still be added after this method returns.
+ *
+ * @param processor the processor to handle each pending ack
+ */
+ public void forEachAndClear(PendingAcksConsumer processor) {
+ internalForEachAndClear(processor, false);
+ }
+
+ private void internalForEachAndClear(PendingAcksConsumer processor,
boolean close) {
try {
writeLock.lock();
- closed = true;
+ if (close) {
+ closed = true;
+ }
PendingAcksRemoveHandler pendingAcksRemoveHandler =
pendingAcksRemoveHandlerSupplier.get();
if (pendingAcksRemoveHandler != null) {
try {
@@ -323,6 +340,35 @@ public class PendingAcksMap {
}
}
+ /**
+ * Atomically remove and return the pending ack for the given ledger ID
and entry ID.
+ * Unlike {@link #remove(long, long)}, this method returns the removed
entry so the caller
+ * can access the batch size and sticky key hash without a separate get
operation.
+ *
+ * @param ledgerId the ledger ID
+ * @param entryId the entry ID
+ * @return the removed entry as an IntIntPair (batchSize, stickyKeyHash),
or null if not found
+ */
+ public IntIntPair removeAndGet(long ledgerId, long entryId) {
+ try {
+ writeLock.lock();
+ TreeMap<Long, IntIntPair> ledgerMap = pendingAcks.get(ledgerId);
+ if (ledgerMap == null) {
+ return null;
+ }
+ IntIntPair removedEntry = ledgerMap.remove(entryId);
+ if (removedEntry != null) {
+ handleRemovePendingAck(ledgerId, entryId,
removedEntry.rightInt());
+ }
+ if (removedEntry != null && ledgerMap.isEmpty()) {
+ pendingAcks.remove(ledgerId);
+ }
+ return removedEntry;
+ } finally {
+ writeLock.unlock();
+ }
+ }
+
/**
* Remove all pending acks up to the given ledger ID and entry ID,
invoking a callback for each removed entry.
*
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PendingAcksMapTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PendingAcksMapTest.java
index 8db0e3a0f73..02bf098561c 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PendingAcksMapTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PendingAcksMapTest.java
@@ -31,6 +31,7 @@ import static org.testng.Assert.assertFalse;
import static org.testng.Assert.assertTrue;
import java.util.ArrayList;
import java.util.List;
+import org.apache.pulsar.common.util.collections.IntIntPair;
import org.testng.annotations.Test;
public class PendingAcksMapTest {
@@ -218,4 +219,84 @@ public class PendingAcksMapTest {
assertEquals(pendingAcksMap.size(), 3);
}
+
+ @Test
+ public void forEachAndClear_ProcessesAndClearsAllPendingAcks() {
+ Consumer consumer = createMockConsumer("consumer1");
+ PendingAcksMap pendingAcksMap = new PendingAcksMap(consumer, () ->
null, () -> null);
+ pendingAcksMap.addPendingAckIfAllowed(1L, 1L, 1, 123);
+ pendingAcksMap.addPendingAckIfAllowed(1L, 2L, 1, 124);
+
+ List<Long> processedEntries = new ArrayList<>();
+ pendingAcksMap.forEachAndClear((ledgerId, entryId, batchSize,
stickyKeyHash) -> processedEntries.add(entryId));
+
+ assertEquals(processedEntries, List.of(1L, 2L));
+ assertEquals(pendingAcksMap.size(), 0);
+ }
+
+ @Test
+ public void forEachAndClear_AllowsAddingAfterClear() {
+ Consumer consumer = createMockConsumer("consumer1");
+ PendingAcksMap pendingAcksMap = new PendingAcksMap(consumer, () ->
null, () -> null);
+ pendingAcksMap.addPendingAckIfAllowed(1L, 1L, 1, 123);
+
+ pendingAcksMap.forEachAndClear((ledgerId, entryId, batchSize,
stickyKeyHash) -> {});
+
+ // Unlike forEachAndClose, forEachAndClear should allow new additions
+ boolean result = pendingAcksMap.addPendingAckIfAllowed(1L, 2L, 1, 124);
+ assertTrue(result);
+ assertTrue(pendingAcksMap.contains(1L, 2L));
+ }
+
+ @Test
+ public void forEachAndClear_InvokesRemoveHandler() {
+ Consumer consumer = createMockConsumer("consumer1");
+ PendingAcksMap.PendingAcksRemoveHandler removeHandler =
mock(PendingAcksMap.PendingAcksRemoveHandler.class);
+ PendingAcksMap pendingAcksMap = new PendingAcksMap(consumer, () ->
null, () -> removeHandler);
+ pendingAcksMap.addPendingAckIfAllowed(1L, 1L, 1, 123);
+ pendingAcksMap.addPendingAckIfAllowed(1L, 2L, 1, 124);
+
+ pendingAcksMap.forEachAndClear((ledgerId, entryId, batchSize,
stickyKeyHash) -> {});
+
+ verify(removeHandler).startBatch();
+ verify(removeHandler).handleRemoving(consumer, 1L, 1L, 123, false);
+ verify(removeHandler).handleRemoving(consumer, 1L, 2L, 124, false);
+ verify(removeHandler).endBatch();
+ }
+
+ @Test
+ public void removeAndGet_RemovesAndReturnsEntry() {
+ Consumer consumer = createMockConsumer("consumer1");
+ PendingAcksMap pendingAcksMap = new PendingAcksMap(consumer, () ->
null, () -> null);
+ pendingAcksMap.addPendingAckIfAllowed(1L, 1L, 5, 123);
+
+ IntIntPair result = pendingAcksMap.removeAndGet(1L, 1L);
+
+ assertTrue(result != null);
+ assertEquals(result.leftInt(), 5);
+ assertEquals(result.rightInt(), 123);
+ assertFalse(pendingAcksMap.contains(1L, 1L));
+ }
+
+ @Test
+ public void removeAndGet_ReturnsNullWhenNotFound() {
+ Consumer consumer = createMockConsumer("consumer1");
+ PendingAcksMap pendingAcksMap = new PendingAcksMap(consumer, () ->
null, () -> null);
+
+ IntIntPair result = pendingAcksMap.removeAndGet(1L, 1L);
+
+ assertTrue(result == null);
+ }
+
+ @Test
+ public void removeAndGet_InvokesRemoveHandler() {
+ Consumer consumer = createMockConsumer("consumer1");
+ PendingAcksMap.PendingAcksRemoveHandler removeHandler =
mock(PendingAcksMap.PendingAcksRemoveHandler.class);
+ PendingAcksMap pendingAcksMap = new PendingAcksMap(consumer, () ->
null, () -> removeHandler);
+ pendingAcksMap.addPendingAckIfAllowed(1L, 1L, 5, 123);
+
+ pendingAcksMap.removeAndGet(1L, 1L);
+
+ verify(removeHandler).handleRemoving(consumer, 1L, 1L, 123, false);
+ }
}
\ No newline at end of file