lhotari commented on code in PR #25592:
URL: https://github.com/apache/pulsar/pull/25592#discussion_r3154297464
##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentMessageExpiryMonitor.java:
##########
@@ -211,9 +212,18 @@ public void markDeleteComplete(Object ctx) {
long numMessagesExpired = (long) ctx - cursor.getNumberOfEntriesInBacklog(false);
msgExpired.recordMultipleEvents(numMessagesExpired, 0 /* no value stats */);
totalMsgExpired.add(numMessagesExpired);
- // If the subscription is a Key_Shared subscription, we should to trigger message dispatch.
- if (subscription != null && subscription.getType() == SubType.Key_Shared) {
- subscription.getDispatcher().markDeletePositionMoveForward();
+ if (subscription != null) {
+ Dispatcher dispatcher = subscription.getDispatcher();
+ if (dispatcher != null) {
+ Position mdPos = cursor.getMarkDeletedPosition();
+ if (mdPos != null) {
+ dispatcher.prunePendingAcksUpToPosition(mdPos.getLedgerId(), mdPos.getEntryId());
+ }
+ // If the subscription is a Key_Shared subscription, we should to trigger message dispatch.
+ if (subscription.getType() == SubType.Key_Shared) {
+ dispatcher.markDeletePositionMoveForward();
+ }
Review Comment:
Instead of adding a separate `prunePendingAcksUpToPosition` method, keep the
pruning as an implementation detail of the dispatcher. Change the existing
`markDeletePositionMoveForward` so that it takes the mark-deleted position as
an argument. The Key_Shared dispatcher implementation can then handle the
pruning itself.
##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Dispatcher.java:
##########
@@ -142,6 +142,23 @@ default boolean checkAndUnblockIfStuck() {
return false;
}
+ /**
+ * Prune pending-ack entries up to the specified position and updates the
+ * consumer unacked-message counters accordingly.
+ *
+ * <p>This hook is invoked after the cursor mark-delete operation completes
+ * (for example, during message expiry, skip, or clear backlog). Since the
+ * cursor ack set may no longer be available after mark-delete, the counter
+ * adjustment relies on the remaining unacked count stored in the
+ * {@code PendingAcksMap} entries.
+ *
+ * @param ledgerId the ledger ID of the inclusive upper bound position
+ * @param entryId the entry ID of the inclusive upper bound position
+ */
+ default void prunePendingAcksUpToPosition(long ledgerId, long entryId) {
+ // No-op by default
+ }
Review Comment:
Instead of adding a separate `prunePendingAcksUpToPosition` method, keep the
pruning as an implementation detail of the dispatcher. Change the existing
`markDeletePositionMoveForward` so that it takes the mark-deleted position as
an argument. The Key_Shared dispatcher implementation can then handle the
pruning itself.
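
A minimal, self-contained sketch of one way to read this suggestion. It is not
the actual Pulsar code: `Pos`, `KeySharedDispatcher`, `addPendingAck`,
`onMarkDeleteComplete` and the counters are hypothetical stand-ins, and the
`TreeMap` is only an assumed approximation of `PendingAcksMap`. The point is
that the caller passes the mark-deleted position to a single hook and the
Key_Shared implementation decides to prune before triggering dispatch.

```java
import java.util.NavigableMap;
import java.util.TreeMap;

public class MarkDeleteSketch {

    /** Hypothetical stand-in for a cursor position (ledgerId, entryId). */
    static final class Pos implements Comparable<Pos> {
        final long ledgerId;
        final long entryId;

        Pos(long ledgerId, long entryId) {
            this.ledgerId = ledgerId;
            this.entryId = entryId;
        }

        @Override
        public int compareTo(Pos o) {
            int c = Long.compare(ledgerId, o.ledgerId);
            return c != 0 ? c : Long.compare(entryId, o.entryId);
        }
    }

    /** Simplified dispatcher: one hook that receives the mark-deleted position. */
    interface Dispatcher {
        default void markDeletePositionMoveForward(Pos markDeletedPosition) {
            // No-op by default, as for dispatchers that do not need it.
        }
    }

    /** Hypothetical Key_Shared dispatcher that prunes inside the hook. */
    static class KeySharedDispatcher implements Dispatcher {
        // Assumed model: position -> remaining unacked messages in that entry.
        final NavigableMap<Pos, Integer> pendingAcks = new TreeMap<>();
        int unackedMessages;
        int dispatchTriggers; // stands in for "trigger message dispatch"

        void addPendingAck(Pos pos, int remainingUnacked) {
            pendingAcks.put(pos, remainingUnacked);
            unackedMessages += remainingUnacked;
        }

        @Override
        public void markDeletePositionMoveForward(Pos markDeletedPosition) {
            if (markDeletedPosition != null) {
                // Inclusive upper bound: everything <= mark-deleted position.
                NavigableMap<Pos, Integer> pruned =
                        pendingAcks.headMap(markDeletedPosition, true);
                for (int remaining : pruned.values()) {
                    unackedMessages -= remaining;
                }
                pruned.clear(); // the view is backed by pendingAcks
            }
            dispatchTriggers++;
        }
    }

    /** Hypothetical caller, e.g. after a mark-delete completes. */
    static void onMarkDeleteComplete(Dispatcher dispatcher, Pos markDeletedPosition) {
        if (dispatcher != null && markDeletedPosition != null) {
            dispatcher.markDeletePositionMoveForward(markDeletedPosition);
        }
    }

    public static void main(String[] args) {
        KeySharedDispatcher d = new KeySharedDispatcher();
        d.addPendingAck(new Pos(1, 0), 3);
        d.addPendingAck(new Pos(1, 5), 2);
        d.addPendingAck(new Pos(2, 0), 4);
        onMarkDeleteComplete(d, new Pos(1, 5));
        System.out.println(d.unackedMessages); // prints 4
    }
}
```

With this shape the expiry monitor would not need to know about pending acks
at all. Whether the call site keeps its `SubType.Key_Shared` check or calls
the hook for every dispatcher is left open here.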