nodece commented on code in PR #25571:
URL: https://github.com/apache/pulsar/pull/25571#discussion_r3130112332
##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java:
##########
@@ -339,6 +339,24 @@ public void readMoreEntriesAsync() {
}
}
+ @Override
+ public void markDeletePositionMoveForward() {
+ // When the mark-delete position advances (due to ack or TTL expiry),
remove stale entries that are
+ // now below the new mark-delete position from the redelivery tracker
and each consumer's pending acks.
+ synchronized (PersistentDispatcherMultipleConsumers.this) {
+ Position mdp = cursor.getMarkDeletedPosition();
+ if (mdp != null) {
+ redeliveryMessages.removeAllUpTo(mdp.getLedgerId(),
mdp.getEntryId());
+ for (Consumer consumer : consumerList) {
+ PendingAcksMap pendingAcks = consumer.getPendingAcks();
+ if (pendingAcks != null) {
+ pendingAcks.removeAllUpTo(mdp.getLedgerId(),
mdp.getEntryId());
+ }
+ }
Review Comment:
There's likely a performance impact due to the higher invocation frequency.
Thanks for pointing that out. I'll revisit the change and adjust it to avoid
unnecessary executions!
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]