This is an automated email from the ASF dual-hosted git repository.

nodece pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 9e1bae353ad [fix][broker] Fix race in pending acks removal in redeliverUnacknowledgedMessages (#25589)
9e1bae353ad is described below

commit 9e1bae353ad3f869aaab99a98548fbe37375364e
Author: Zixuan Liu <[email protected]>
AuthorDate: Tue Apr 28 15:18:55 2026 +0800

    [fix][broker] Fix race in pending acks removal in redeliverUnacknowledgedMessages (#25589)
---
 .../org/apache/pulsar/broker/service/Consumer.java | 26 +++----
 .../pulsar/broker/service/PendingAcksMap.java      | 48 ++++++++++++-
 .../pulsar/broker/service/PendingAcksMapTest.java  | 81 ++++++++++++++++++++++
 3 files changed, 136 insertions(+), 19 deletions(-)

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Consumer.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Consumer.java
index 15074f28cc2..3a0bf99eb6d 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Consumer.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Consumer.java
@@ -1155,15 +1155,12 @@ public class Consumer {
     }
 
     public void redeliverUnacknowledgedMessages(long consumerEpoch) {
-        // cleanup unackedMessage bucket and redeliver those unack-msgs again
-        clearUnAckedMsgs();
-        blockedConsumerOnUnackedMsgs = false;
         log.debug("Consumer received redelivery");
 
         if (pendingAcks != null) {
             List<Position> pendingPositions = new ArrayList<>((int) 
pendingAcks.size());
             MutableInt totalRedeliveryMessages = new MutableInt(0);
-            pendingAcks.forEach((ledgerId, entryId, batchSize, stickyKeyHash) 
-> {
+            pendingAcks.forEachAndClear((ledgerId, entryId, batchSize, 
stickyKeyHash) -> {
                 int unAckedCount =
                         (int) 
getUnAckedCountForBatchIndexLevelEnabled(PositionFactory.create(ledgerId, 
entryId),
                                 batchSize);
@@ -1171,15 +1168,18 @@ public class Consumer {
                 pendingPositions.add(PositionFactory.create(ledgerId, 
entryId));
             });
 
-            for (Position p : pendingPositions) {
-                pendingAcks.remove(p.getLedgerId(), p.getEntryId());
+            if (totalRedeliveryMessages.intValue() > 0) {
+                addAndGetUnAckedMsgs(this, 
-totalRedeliveryMessages.intValue());
             }
+            blockedConsumerOnUnackedMsgs = false;
 
             
msgRedeliver.recordMultipleEvents(totalRedeliveryMessages.intValue(), 
totalRedeliveryMessages.intValue());
             msgRedeliverCounter.add(totalRedeliveryMessages.intValue());
 
             subscription.redeliverUnacknowledgedMessages(this, 
pendingPositions);
         } else {
+            clearUnAckedMsgs();
+            blockedConsumerOnUnackedMsgs = false;
             subscription.redeliverUnacknowledgedMessages(this, consumerEpoch);
         }
 
@@ -1191,10 +1191,9 @@ public class Consumer {
         List<Position> pendingPositions = new ArrayList<>();
         for (MessageIdData msg : messageIds) {
             Position position = PositionFactory.create(msg.getLedgerId(), 
msg.getEntryId());
-            IntIntPair pendingAck = pendingAcks.get(position.getLedgerId(), 
position.getEntryId());
+            IntIntPair pendingAck = 
pendingAcks.removeAndGet(position.getLedgerId(), position.getEntryId());
             if (pendingAck != null) {
                 int unAckedCount = (int) 
getUnAckedCountForBatchIndexLevelEnabled(position, pendingAck.leftInt());
-                pendingAcks.remove(position.getLedgerId(), 
position.getEntryId());
                 totalRedeliveryMessages += unAckedCount;
                 pendingPositions.add(position);
             }
@@ -1212,16 +1211,7 @@ public class Consumer {
         msgRedeliver.recordMultipleEvents(totalRedeliveryMessages, 
totalRedeliveryMessages);
         msgRedeliverCounter.add(totalRedeliveryMessages);
 
-        int numberOfBlockedPermits = 
PERMITS_RECEIVED_WHILE_CONSUMER_BLOCKED_UPDATER.getAndSet(this, 0);
-
-        // if permitsReceivedWhileConsumerBlocked has been accumulated then 
pass it to Dispatcher to flow messages
-        if (numberOfBlockedPermits > 0) {
-            MESSAGE_PERMITS_UPDATER.getAndAdd(this, numberOfBlockedPermits);
-            log.debug()
-                    .attr("numberOfBlockedPermits", numberOfBlockedPermits)
-                    .log("Added blockedPermits to consumer's messagePermits");
-            subscription.consumerFlow(this, numberOfBlockedPermits);
-        }
+        flowConsumerBlockedPermits(this);
     }
 
     public Subscription getSubscription() {
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/PendingAcksMap.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/PendingAcksMap.java
index 46fb7f6e6c8..8be69aa7879 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/PendingAcksMap.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/PendingAcksMap.java
@@ -202,9 +202,26 @@ public class PendingAcksMap {
      * @param processor the processor to handle each pending ack
      */
     public void forEachAndClose(PendingAcksConsumer processor) {
+        internalForEachAndClear(processor, true);
+    }
+
+    /**
+     * Iterate over all the pending acks and clear the map.
+     * Unlike {@link #forEachAndClose(PendingAcksConsumer)}, this method does 
not close the map,
+     * so new entries can still be added after this method returns.
+     *
+     * @param processor the processor to handle each pending ack
+     */
+    public void forEachAndClear(PendingAcksConsumer processor) {
+        internalForEachAndClear(processor, false);
+    }
+
+    private void internalForEachAndClear(PendingAcksConsumer processor, 
boolean close) {
         try {
             writeLock.lock();
-            closed = true;
+            if (close) {
+                closed = true;
+            }
             PendingAcksRemoveHandler pendingAcksRemoveHandler = 
pendingAcksRemoveHandlerSupplier.get();
             if (pendingAcksRemoveHandler != null) {
                 try {
@@ -323,6 +340,35 @@ public class PendingAcksMap {
         }
     }
 
+    /**
+     * Atomically remove and return the pending ack for the given ledger ID 
and entry ID.
+     * Unlike {@link #remove(long, long)}, this method returns the removed 
entry so the caller
+     * can access the batch size and sticky key hash without a separate get 
operation.
+     *
+     * @param ledgerId the ledger ID
+     * @param entryId the entry ID
+     * @return the removed entry as an IntIntPair (batchSize, stickyKeyHash), 
or null if not found
+     */
+    public IntIntPair removeAndGet(long ledgerId, long entryId) {
+        try {
+            writeLock.lock();
+            TreeMap<Long, IntIntPair> ledgerMap = pendingAcks.get(ledgerId);
+            if (ledgerMap == null) {
+                return null;
+            }
+            IntIntPair removedEntry = ledgerMap.remove(entryId);
+            if (removedEntry != null) {
+                handleRemovePendingAck(ledgerId, entryId, 
removedEntry.rightInt());
+            }
+            if (removedEntry != null && ledgerMap.isEmpty()) {
+                pendingAcks.remove(ledgerId);
+            }
+            return removedEntry;
+        } finally {
+            writeLock.unlock();
+        }
+    }
+
     /**
      * Remove all pending acks up to the given ledger ID and entry ID, 
invoking a callback for each removed entry.
      *
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PendingAcksMapTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PendingAcksMapTest.java
index 8db0e3a0f73..02bf098561c 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PendingAcksMapTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PendingAcksMapTest.java
@@ -31,6 +31,7 @@ import static org.testng.Assert.assertFalse;
 import static org.testng.Assert.assertTrue;
 import java.util.ArrayList;
 import java.util.List;
+import org.apache.pulsar.common.util.collections.IntIntPair;
 import org.testng.annotations.Test;
 
 public class PendingAcksMapTest {
@@ -218,4 +219,84 @@ public class PendingAcksMapTest {
 
         assertEquals(pendingAcksMap.size(), 3);
     }
+
+    @Test
+    public void forEachAndClear_ProcessesAndClearsAllPendingAcks() {
+        Consumer consumer = createMockConsumer("consumer1");
+        PendingAcksMap pendingAcksMap = new PendingAcksMap(consumer, () -> 
null, () -> null);
+        pendingAcksMap.addPendingAckIfAllowed(1L, 1L, 1, 123);
+        pendingAcksMap.addPendingAckIfAllowed(1L, 2L, 1, 124);
+
+        List<Long> processedEntries = new ArrayList<>();
+        pendingAcksMap.forEachAndClear((ledgerId, entryId, batchSize, 
stickyKeyHash) -> processedEntries.add(entryId));
+
+        assertEquals(processedEntries, List.of(1L, 2L));
+        assertEquals(pendingAcksMap.size(), 0);
+    }
+
+    @Test
+    public void forEachAndClear_AllowsAddingAfterClear() {
+        Consumer consumer = createMockConsumer("consumer1");
+        PendingAcksMap pendingAcksMap = new PendingAcksMap(consumer, () -> 
null, () -> null);
+        pendingAcksMap.addPendingAckIfAllowed(1L, 1L, 1, 123);
+
+        pendingAcksMap.forEachAndClear((ledgerId, entryId, batchSize, 
stickyKeyHash) -> {});
+
+        // Unlike forEachAndClose, forEachAndClear should allow new additions
+        boolean result = pendingAcksMap.addPendingAckIfAllowed(1L, 2L, 1, 124);
+        assertTrue(result);
+        assertTrue(pendingAcksMap.contains(1L, 2L));
+    }
+
+    @Test
+    public void forEachAndClear_InvokesRemoveHandler() {
+        Consumer consumer = createMockConsumer("consumer1");
+        PendingAcksMap.PendingAcksRemoveHandler removeHandler = 
mock(PendingAcksMap.PendingAcksRemoveHandler.class);
+        PendingAcksMap pendingAcksMap = new PendingAcksMap(consumer, () -> 
null, () -> removeHandler);
+        pendingAcksMap.addPendingAckIfAllowed(1L, 1L, 1, 123);
+        pendingAcksMap.addPendingAckIfAllowed(1L, 2L, 1, 124);
+
+        pendingAcksMap.forEachAndClear((ledgerId, entryId, batchSize, 
stickyKeyHash) -> {});
+
+        verify(removeHandler).startBatch();
+        verify(removeHandler).handleRemoving(consumer, 1L, 1L, 123, false);
+        verify(removeHandler).handleRemoving(consumer, 1L, 2L, 124, false);
+        verify(removeHandler).endBatch();
+    }
+
+    @Test
+    public void removeAndGet_RemovesAndReturnsEntry() {
+        Consumer consumer = createMockConsumer("consumer1");
+        PendingAcksMap pendingAcksMap = new PendingAcksMap(consumer, () -> 
null, () -> null);
+        pendingAcksMap.addPendingAckIfAllowed(1L, 1L, 5, 123);
+
+        IntIntPair result = pendingAcksMap.removeAndGet(1L, 1L);
+
+        assertTrue(result != null);
+        assertEquals(result.leftInt(), 5);
+        assertEquals(result.rightInt(), 123);
+        assertFalse(pendingAcksMap.contains(1L, 1L));
+    }
+
+    @Test
+    public void removeAndGet_ReturnsNullWhenNotFound() {
+        Consumer consumer = createMockConsumer("consumer1");
+        PendingAcksMap pendingAcksMap = new PendingAcksMap(consumer, () -> 
null, () -> null);
+
+        IntIntPair result = pendingAcksMap.removeAndGet(1L, 1L);
+
+        assertTrue(result == null);
+    }
+
+    @Test
+    public void removeAndGet_InvokesRemoveHandler() {
+        Consumer consumer = createMockConsumer("consumer1");
+        PendingAcksMap.PendingAcksRemoveHandler removeHandler = 
mock(PendingAcksMap.PendingAcksRemoveHandler.class);
+        PendingAcksMap pendingAcksMap = new PendingAcksMap(consumer, () -> 
null, () -> removeHandler);
+        pendingAcksMap.addPendingAckIfAllowed(1L, 1L, 5, 123);
+
+        pendingAcksMap.removeAndGet(1L, 1L);
+
+        verify(removeHandler).handleRemoving(consumer, 1L, 1L, 123, false);
+    }
 }
\ No newline at end of file

Reply via email to