This is an automated email from the ASF dual-hosted git repository.

nodece pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 138595f6256 [fix][broker] Move pending acks cleanup to selected mark-delete callbacks (#25592)
138595f6256 is described below

commit 138595f6256c301956f9d77fde8534699e992536
Author: Zixuan Liu <[email protected]>
AuthorDate: Wed Apr 29 23:18:31 2026 +0800

    [fix][broker] Move pending acks cleanup to selected mark-delete callbacks (#25592)
---
 .../org/apache/pulsar/broker/service/Consumer.java | 130 ++++++++++++---------
 .../apache/pulsar/broker/service/Dispatcher.java   |   9 ++
 .../pulsar/broker/service/PendingAcksMap.java      |  46 ++++++--
 .../PersistentDispatcherMultipleConsumers.java     |  24 ++--
 .../persistent/PersistentMessageExpiryMonitor.java |  10 +-
 ...istentStickyKeyDispatcherMultipleConsumers.java |   1 +
 ...tickyKeyDispatcherMultipleConsumersClassic.java |   4 +-
 .../service/persistent/PersistentSubscription.java |   6 +-
 .../client/impl/TransactionEndToEndTest.java       |   9 +-
 9 files changed, 146 insertions(+), 93 deletions(-)

diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Consumer.java 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Consumer.java
index 3a0bf99eb6d..a198905eed9 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Consumer.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Consumer.java
@@ -385,9 +385,18 @@ public class Consumer {
                     } else {
                         stickyKeyHash = stickyKeyHashes.get(i);
                     }
-                    boolean sendingAllowed =
-                            
pendingAcks.addPendingAckIfAllowed(entry.getLedgerId(), entry.getEntryId(), 
batchSize,
-                                    stickyKeyHash);
+                    boolean sendingAllowed;
+                    long[] ackSet = batchIndexesAcks == null ? null : 
batchIndexesAcks.getAckSet(i);
+                    int remainingUnacked;
+                    if (ackSet != null) {
+                        remainingUnacked = 
BitSet.valueOf(ackSet).cardinality();
+                        unackedMessages -= (batchSize - remainingUnacked);
+                    } else {
+                        remainingUnacked = batchSize;
+                    }
+                    sendingAllowed =
+                            
pendingAcks.addPendingAckIfAllowed(entry.getLedgerId(), entry.getEntryId(),
+                                    remainingUnacked, stickyKeyHash);
                     if (!sendingAllowed) {
                         // sending isn't allowed when pending acks doesn't 
accept adding the entry
                         // this happens when Key_Shared draining hashes 
contains the stickyKeyHash
@@ -401,10 +410,6 @@ public class Consumer {
                                 .attr("batchSize", batchSize)
                                 .log("Skipping sending of entry since adding 
to pending acks failed");
                     } else {
-                        long[] ackSet = batchIndexesAcks == null ? null : 
batchIndexesAcks.getAckSet(i);
-                        if (ackSet != null) {
-                            unackedMessages -= (batchSize - 
BitSet.valueOf(ackSet).cardinality());
-                        }
                         log.debug()
                                 .attr("ledgerId", entry.getLedgerId())
                                 .attr("entryId", entry.getEntryId())
@@ -580,7 +585,7 @@ public class Consumer {
             ObjectIntPair<Consumer> ackOwnerConsumerAndBatchSize =
                     getAckOwnerConsumerAndBatchSize(msgId.getLedgerId(), 
msgId.getEntryId());
             Consumer ackOwnerConsumer = ackOwnerConsumerAndBatchSize.left();
-            long ackedCount;
+            long ackedCount = 0;
             int batchSize = ackOwnerConsumerAndBatchSize.rightInt();
             if (msgId.getAckSetsCount() > 0) {
                 long[] ackSets = new long[msgId.getAckSetsCount()];
@@ -596,11 +601,19 @@ public class Consumer {
                                 
.syncBatchPositionBitSetForPendingAck(position);
                     }
                 }
-                addAndGetUnAckedMsgs(ackOwnerConsumer, -(int) ackedCount);
+                if (ackedCount > 0) {
+                    boolean updated = ackOwnerConsumer.updateRemainingUnacked(
+                            position.getLedgerId(), position.getEntryId(), 
(int) ackedCount);
+                    if (updated) {
+                        addAndGetUnAckedMsgs(ackOwnerConsumer, -(int) 
ackedCount);
+                    }
+                }
             } else {
                 position = PositionFactory.create(msgId.getLedgerId(), 
msgId.getEntryId());
-                ackedCount = getAckedCountForMsgIdNoAckSets(batchSize, 
position, ackOwnerConsumer);
-                if (checkCanRemovePendingAcksAndHandle(ackOwnerConsumer, 
position, msgId)) {
+                IntIntPair removed = ackOwnerConsumer.removePendingAckAndGet(
+                        position.getLedgerId(), position.getEntryId());
+                if (removed != null) {
+                    ackedCount = removed.leftInt();
                     addAndGetUnAckedMsgs(ackOwnerConsumer, -(int) ackedCount);
                     updateBlockedConsumerOnUnackedMsgs(ackOwnerConsumer);
                 }
@@ -679,12 +692,22 @@ public class Consumer {
                 }
                 AckSetStateUtil.getAckSetState(position).setAckSet(ackSets);
                 ackedCount = getAckedCountForTransactionAck(batchSize, 
ackSets);
+                if (ackedCount > 0) {
+                    boolean updated = ackOwnerConsumer.updateRemainingUnacked(
+                            position.getLedgerId(), position.getEntryId(), 
(int) ackedCount);
+                    if (updated) {
+                        addAndGetUnAckedMsgs(ackOwnerConsumer, -(int) 
ackedCount);
+                    }
+                }
+            } else {
+                IntIntPair removed = ackOwnerConsumer.removePendingAckAndGet(
+                        position.getLedgerId(), position.getEntryId());
+                if (removed != null) {
+                    addAndGetUnAckedMsgs(ackOwnerConsumer, -removed.leftInt());
+                    updateBlockedConsumerOnUnackedMsgs(ackOwnerConsumer);
+                }
             }
 
-            addAndGetUnAckedMsgs(ackOwnerConsumer, -(int) ackedCount);
-
-            checkCanRemovePendingAcksAndHandle(ackOwnerConsumer, position, 
msgId);
-
             checkAckValidationError(ack, position);
 
             totalAckCount.add(ackedCount);
@@ -708,16 +731,6 @@ public class Consumer {
         return completableFuture.thenApply(__ -> totalAckCount.sum());
     }
 
-    private long getAckedCountForMsgIdNoAckSets(int batchSize, Position 
position, Consumer consumer) {
-        if (isAcknowledgmentAtBatchIndexLevelEnabled && 
Subscription.isIndividualAckMode(subType)) {
-            long[] cursorAckSet = getCursorAckSet(position);
-            if (cursorAckSet != null) {
-                return getAckedCountForBatchIndexLevelEnabled(position, 
batchSize, EMPTY_ACK_SET, consumer);
-            }
-        }
-        return batchSize;
-    }
-
     private long getAckedCountForBatchIndexLevelEnabled(Position position, int 
batchSize, long[] ackSets,
                                                         Consumer consumer) {
         long ackedCount = 0;
@@ -747,19 +760,6 @@ public class Consumer {
         return ackedCount;
     }
 
-    private long getUnAckedCountForBatchIndexLevelEnabled(Position position, 
int batchSize) {
-        long unAckedCount = batchSize;
-        if (isAcknowledgmentAtBatchIndexLevelEnabled) {
-            long[] cursorAckSet = getCursorAckSet(position);
-            if (cursorAckSet != null) {
-                BitSetRecyclable cursorBitSet = 
BitSetRecyclable.create().resetWords(cursorAckSet);
-                unAckedCount = cursorBitSet.cardinality();
-                cursorBitSet.recycle();
-            }
-        }
-        return unAckedCount;
-    }
-
     private void checkAckValidationError(CommandAck ack, Position position) {
         if (ack.hasValidationError()) {
             log.warn()
@@ -769,14 +769,6 @@ public class Consumer {
         }
     }
 
-    private boolean checkCanRemovePendingAcksAndHandle(Consumer 
ackOwnedConsumer,
-                                                       Position position, 
MessageIdData msgId) {
-        if (Subscription.isIndividualAckMode(subType) && 
msgId.getAckSetsCount() == 0) {
-            return removePendingAcks(ackOwnedConsumer, position);
-        }
-        return false;
-    }
-
     /**
      * Retrieves the acknowledgment owner consumer and batch size for the 
specified ledgerId and entryId.
      *
@@ -1121,6 +1113,37 @@ public class Consumer {
         return pendingAcks;
     }
 
+    /**
+     * Atomically decrement the remaining unacked count for the specified 
position
+     * by the given acknowledged delta.
+     *
+     * <p>No-op if {@code pendingAcks} is not initialized.
+     *
+     * @return {@code true} if the update succeeds or pendingAcks is null;
+     *         {@code false} otherwise
+     */
+    public boolean updateRemainingUnacked(long ledgerId, long entryId, int 
ackedDelta) {
+        if (pendingAcks != null) {
+            return pendingAcks.updateRemainingUnacked(ledgerId, entryId, 
ackedDelta);
+        }
+        return true;
+    }
+
+    /**
+     * Atomically remove the pending ack entry and return its stored values.
+     *
+     * <p>No-op if {@code pendingAcks} is not initialized.
+     *
+     * @return the removed {@link IntIntPair#leftInt() remainingUnacked} and
+     *         {@link IntIntPair#rightInt() stickyKeyHash}, or {@code null} if 
not found
+     */
+    public IntIntPair removePendingAckAndGet(long ledgerId, long entryId) {
+        if (pendingAcks != null) {
+            return pendingAcks.removeAndGet(ledgerId, entryId);
+        }
+        return null;
+    }
+
     /**
      * Remove all pending acks up to the given mark-delete position and 
decrement the consumer's unacked message
      * counter by the remaining unacked count for each removed entry.
@@ -1139,9 +1162,8 @@ public class Consumer {
 
         MutableInt mutableTotalUnacked = new MutableInt(0);
         pendingAcks.removeAllUpTo(markDeleteLedgerId, markDeleteEntryId,
-                (ledgerId, entryId, batchSize, stickyKeyHash) -> {
-                    mutableTotalUnacked.add((int) 
getUnAckedCountForBatchIndexLevelEnabled(
-                            PositionFactory.create(ledgerId, entryId), 
batchSize));
+                (ledgerId, entryId, remainingUnacked, stickyKeyHash) -> {
+                    mutableTotalUnacked.add(remainingUnacked);
                 });
         int totalUnacked = mutableTotalUnacked.intValue();
         if (totalUnacked > 0) {
@@ -1160,11 +1182,8 @@ public class Consumer {
         if (pendingAcks != null) {
             List<Position> pendingPositions = new ArrayList<>((int) 
pendingAcks.size());
             MutableInt totalRedeliveryMessages = new MutableInt(0);
-            pendingAcks.forEachAndClear((ledgerId, entryId, batchSize, 
stickyKeyHash) -> {
-                int unAckedCount =
-                        (int) 
getUnAckedCountForBatchIndexLevelEnabled(PositionFactory.create(ledgerId, 
entryId),
-                                batchSize);
-                totalRedeliveryMessages.add(unAckedCount);
+            pendingAcks.forEachAndClear((ledgerId, entryId, remainingUnacked, 
stickyKeyHash) -> {
+                totalRedeliveryMessages.add(remainingUnacked);
                 pendingPositions.add(PositionFactory.create(ledgerId, 
entryId));
             });
 
@@ -1193,8 +1212,7 @@ public class Consumer {
             Position position = PositionFactory.create(msg.getLedgerId(), 
msg.getEntryId());
             IntIntPair pendingAck = 
pendingAcks.removeAndGet(position.getLedgerId(), position.getEntryId());
             if (pendingAck != null) {
-                int unAckedCount = (int) 
getUnAckedCountForBatchIndexLevelEnabled(position, pendingAck.leftInt());
-                totalRedeliveryMessages += unAckedCount;
+                totalRedeliveryMessages += pendingAck.leftInt();
                 pendingPositions.add(position);
             }
         }
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Dispatcher.java 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Dispatcher.java
index e19deb34e31..9f59d4cd175 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Dispatcher.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Dispatcher.java
@@ -131,6 +131,15 @@ public interface Dispatcher {
         //No-op
     }
 
+    /**
+     * This hook is invoked after cursor mark-delete operations triggered by
+     * message removal flows such as expiry, skip, or clear backlog, but not 
for
+     * regular ack-driven mark-delete operations due to their higher frequency.
+     *
+     * <p>Since the cursor ack set may no longer be available after 
mark-delete,
+     * the cleanup logic relies on the remaining unacked count stored in
+     * {@code PendingAcksMap} entries.
+     */
     default void markDeletePositionMoveForward() {
         // No-op
     }
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/PendingAcksMap.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/PendingAcksMap.java
index 8be69aa7879..70f1a7dd247 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/PendingAcksMap.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/PendingAcksMap.java
@@ -87,12 +87,13 @@ public class PendingAcksMap {
         /**
          * Accept a pending acknowledgment.
          *
-         * @param ledgerId      the ledger ID
-         * @param entryId       the entry ID
-         * @param batchSize     the batch size
-         * @param stickyKeyHash the sticky key hash
+         * @param ledgerId          the ledger ID
+         * @param entryId           the entry ID
+         * @param remainingUnacked  the number of remaining unacked messages 
in this entry
+         *                          (accounts for batch index level 
acknowledgments)
+         * @param stickyKeyHash     the sticky key hash
          */
-        void accept(long ledgerId, long entryId, int batchSize, int 
stickyKeyHash);
+        void accept(long ledgerId, long entryId, int remainingUnacked, int 
stickyKeyHash);
     }
 
     private final Consumer consumer;
@@ -122,11 +123,12 @@ public class PendingAcksMap {
      *
      * @param ledgerId the ledger ID
      * @param entryId the entry ID
-     * @param batchSize the batch size
+     * @param remainingUnacked the number of remaining unacked messages in 
this entry
+     *                         (for batch entries with some indexes already 
acked, this may be less than batchSize)
      * @param stickyKeyHash the sticky key hash
      * @return true if the pending ack was added, and it's allowed to send a 
message, false otherwise
      */
-    public boolean addPendingAckIfAllowed(long ledgerId, long entryId, int 
batchSize, int stickyKeyHash) {
+    public boolean addPendingAckIfAllowed(long ledgerId, long entryId, int 
remainingUnacked, int stickyKeyHash) {
         try {
             writeLock.lock();
             // prevent adding sticky hash to pending acks if the 
PendingAcksMap has already been closed
@@ -143,7 +145,7 @@ public class PendingAcksMap {
             }
             TreeMap<Long, IntIntPair> ledgerPendingAcks =
                     pendingAcks.computeIfAbsent(ledgerId, k -> new 
TreeMap<>());
-            ledgerPendingAcks.put(entryId, IntIntPair.of(batchSize, 
stickyKeyHash));
+            ledgerPendingAcks.put(entryId, IntIntPair.of(remainingUnacked, 
stickyKeyHash));
             return true;
         } finally {
             writeLock.unlock();
@@ -311,6 +313,34 @@ public class PendingAcksMap {
         }
     }
 
+    /**
+     * Atomically update the remaining unacked count for a pending ack entry 
by subtracting the given delta.
+     * Called from the ack handler after computing the number of batch indexes 
acknowledged in a partial ack.
+     *
+     * @param ledgerId the ledger ID
+     * @param entryId the entry ID
+     * @param ackedDelta the number of batch indexes that were just 
acknowledged
+     * @return true if the entry was found and updated, false otherwise
+     */
+    public boolean updateRemainingUnacked(long ledgerId, long entryId, int 
ackedDelta) {
+        try {
+            writeLock.lock();
+            TreeMap<Long, IntIntPair> ledgerMap = pendingAcks.get(ledgerId);
+            if (ledgerMap == null) {
+                return false;
+            }
+            IntIntPair current = ledgerMap.get(entryId);
+            if (current == null) {
+                return false;
+            }
+            int newRemaining = current.leftInt() - ackedDelta;
+            ledgerMap.put(entryId, IntIntPair.of(newRemaining, 
current.rightInt()));
+            return true;
+        } finally {
+            writeLock.unlock();
+        }
+    }
+
     /**
      * Remove the pending ack for the given ledger ID and entry ID.
      *
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java
index 65ed8a5acc3..c569cf5b68c 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java
@@ -145,7 +145,6 @@ public class PersistentDispatcherMultipleConsumers extends 
AbstractPersistentDis
     protected enum ReadType {
         Normal, Replay
     }
-    private Position lastMarkDeletePositionBeforeReadMoreEntries;
     private volatile long readMoreEntriesCallCount;
 
     public PersistentDispatcherMultipleConsumers(PersistentTopic topic, 
ManagedCursor cursor,
@@ -362,17 +361,6 @@ public class PersistentDispatcherMultipleConsumers extends 
AbstractPersistentDis
         // increment the counter for readMoreEntries calls, to track the 
number of times readMoreEntries is called
         readMoreEntriesCallCount++;
 
-        // remove possible expired messages from redelivery tracker and 
pending acks
-        Position markDeletePosition = cursor.getMarkDeletedPosition();
-        if (lastMarkDeletePositionBeforeReadMoreEntries != markDeletePosition) 
{
-            redeliveryMessages.removeAllUpTo(markDeletePosition.getLedgerId(), 
markDeletePosition.getEntryId());
-            for (Consumer consumer : consumerList) {
-                consumer.removePendingAcksUpToPositionAndDecrementUnacked(
-                        markDeletePosition.getLedgerId(), 
markDeletePosition.getEntryId());
-            }
-            lastMarkDeletePositionBeforeReadMoreEntries = markDeletePosition;
-        }
-
         // totalAvailablePermits may be updated by other threads
         int firstAvailableConsumerPermits = getFirstAvailableConsumerPermits();
         int currentTotalAvailablePermits = Math.max(totalAvailablePermits, 
firstAvailableConsumerPermits);
@@ -600,6 +588,18 @@ public class PersistentDispatcherMultipleConsumers extends 
AbstractPersistentDis
         return consumerList;
     }
 
+    @Override
+    public void markDeletePositionMoveForward() {
+        Position markDeletePosition = cursor.getMarkDeletedPosition();
+        if (markDeletePosition != null) {
+            redeliveryMessages.removeAllUpTo(markDeletePosition.getLedgerId(), 
markDeletePosition.getEntryId());
+            for (Consumer consumer : consumerList) {
+                consumer.removePendingAcksUpToPositionAndDecrementUnacked(
+                        markDeletePosition.getLedgerId(), 
markDeletePosition.getEntryId());
+            }
+        }
+    }
+
     @Override
     public synchronized boolean canUnsubscribe(Consumer consumer) {
         return consumerList.size() == 1 && consumerSet.contains(consumer);
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentMessageExpiryMonitor.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentMessageExpiryMonitor.java
index bfc8951be49..a9f7e305104 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentMessageExpiryMonitor.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentMessageExpiryMonitor.java
@@ -38,9 +38,9 @@ import org.apache.bookkeeper.mledger.Position;
 import org.apache.bookkeeper.mledger.PositionFactory;
 import org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl;
 import org.apache.bookkeeper.mledger.proto.ManagedLedgerInfo;
+import org.apache.pulsar.broker.service.Dispatcher;
 import org.apache.pulsar.broker.service.MessageExpirer;
 import org.apache.pulsar.client.impl.MessageImpl;
-import org.apache.pulsar.common.api.proto.CommandSubscribe.SubType;
 import org.apache.pulsar.common.stats.Rate;
 import org.jspecify.annotations.Nullable;
 /**
@@ -211,9 +211,11 @@ public class PersistentMessageExpiryMonitor implements 
FindEntryCallback, Messag
             long numMessagesExpired = (long) ctx - 
cursor.getNumberOfEntriesInBacklog(false);
             msgExpired.recordMultipleEvents(numMessagesExpired, 0 /* no value 
stats */);
             totalMsgExpired.add(numMessagesExpired);
-            // If the subscription is a Key_Shared subscription, we should to 
trigger message dispatch.
-            if (subscription != null && subscription.getType() == 
SubType.Key_Shared) {
-                subscription.getDispatcher().markDeletePositionMoveForward();
+            if (subscription != null) {
+                Dispatcher dispatcher = subscription.getDispatcher();
+                if (dispatcher != null) {
+                    dispatcher.markDeletePositionMoveForward();
+                }
             }
             expirationCheckInProgress = FALSE;
             log.debug()
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumers.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumers.java
index 2549e4a34a9..02055b28f3b 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumers.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumers.java
@@ -642,6 +642,7 @@ public class PersistentStickyKeyDispatcherMultipleConsumers 
extends PersistentDi
 
     @Override
     public void markDeletePositionMoveForward() {
+        super.markDeletePositionMoveForward();
         // reschedule a read with a backoff after moving the mark-delete 
position forward since there might have
         // been consumers that were blocked by hash and couldn't make progress
         reScheduleReadWithKeySharedUnblockingInterval();
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumersClassic.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumersClassic.java
index 4a15e9a6bd4..e5e81af4ca4 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumersClassic.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumersClassic.java
@@ -489,11 +489,13 @@ public class 
PersistentStickyKeyDispatcherMultipleConsumersClassic
     }
 
     @Override
-    public void markDeletePositionMoveForward() {
+    public void afterAckMessages(Throwable exOfDeletion, Object ctxOfDeletion) 
{
+        super.afterAckMessages(exOfDeletion, ctxOfDeletion);
         // Execute the notification in different thread to avoid a mutex chain 
here
         // from the delete operation that was completed
         topic.getBrokerService().getTopicOrderedExecutor().execute(() -> {
             synchronized 
(PersistentStickyKeyDispatcherMultipleConsumersClassic.this) {
+                super.markDeletePositionMoveForward();
                 if (recentlyJoinedConsumers != null && 
!recentlyJoinedConsumers.isEmpty()
                         && removeConsumersFromRecentJoinedConsumers()) {
                     // After we process acks, we need to check whether the 
mark-delete position was advanced and we
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java
index cb60a70a0c3..eb3d024ab9a 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java
@@ -577,10 +577,6 @@ public class PersistentSubscription extends 
AbstractSubscription {
         if (newMD.compareTo(oldPosition) != 0) {
             updateLastMarkDeleteAdvancedTimestamp();
             handleReplicatedSubscriptionsUpdate(newMD);
-
-            if (dispatcher != null) {
-                dispatcher.markDeletePositionMoveForward();
-            }
         }
     }
 
@@ -798,6 +794,7 @@ public class PersistentSubscription extends 
AbstractSubscription {
                             future.complete(null);
                         }
                     });
+                    dispatcher.markDeletePositionMoveForward();
                     dispatcher.afterAckMessages(null, ctx);
                 } else {
                     future.complete(null);
@@ -837,6 +834,7 @@ public class PersistentSubscription extends 
AbstractSubscription {
                                 .log("Skipped messages");
                         future.complete(null);
                         if (dispatcher != null) {
+                            dispatcher.markDeletePositionMoveForward();
                             dispatcher.afterAckMessages(null, ctx);
                         }
                     }
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/TransactionEndToEndTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/TransactionEndToEndTest.java
index 90a11d34eff..16467fabd15 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/TransactionEndToEndTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/TransactionEndToEndTest.java
@@ -1527,14 +1527,7 @@ public class TransactionEndToEndTest extends 
TransactionTestBase {
             messageIds.add(consumer.receive(waitTimeForCanReceiveMsgInSec, 
TimeUnit.SECONDS).getMessageId());
         }
 
-        MessageIdImpl messageId = (MessageIdImpl) messageIds.get(0);
-
-
-        // remove the message from the pendingAcks, in fact redeliver will 
remove the messageId from the pendingAck
-        getPulsarServiceList().get(0).getBrokerService().getTopic(topic, false)
-                
.get().get().getSubscription(subName).getConsumers().get(0).getPendingAcks()
-                .remove(messageId.ledgerId, messageId.entryId);
-
+        // Ack one message in the batch with a transaction.
         Transaction txn = getTxn();
         consumer.acknowledgeAsync(messageIds.get(1), txn).get();
 

Reply via email to