lhotari commented on code in PR #25581:
URL: https://github.com/apache/pulsar/pull/25581#discussion_r3146316588


##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Consumer.java:
##########
@@ -1121,6 +1121,34 @@ public PendingAcksMap getPendingAcks() {
         return pendingAcks;
     }
 
+    /**
+     * Remove all pending acks up to the given mark-delete position and 
decrement the consumer's unacked message
+     * counter by the remaining unacked count for each removed entry.
+     *
+     * <p>This is used when the cursor's mark-delete position advances past 
entries that are still in the consumer's
+     * pending acks. The remaining unacked count accounts for batch index 
level acknowledgments — only the truly
+     * unacked batch indexes are decremented.
+     *
+     * @param markDeleteLedgerId the ledger ID up to which to remove pending 
acks
+     * @param markDeleteEntryId the entry ID up to which to remove pending acks
+     */
+    public void removePendingAcksUpToAndCountUnacked(long markDeleteLedgerId, 
long markDeleteEntryId) {

Review Comment:
   instead of "removePendingAcksUpToAndCountUnacked", this could be called 
"removePendingAcksUpToPositionAndDecrementUnacked" so that the name clearly 
expresses the intent.



##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Consumer.java:
##########
@@ -1121,6 +1121,34 @@ public PendingAcksMap getPendingAcks() {
         return pendingAcks;
     }
 
+    /**
+     * Remove all pending acks up to the given mark-delete position and 
decrement the consumer's unacked message
+     * counter by the remaining unacked count for each removed entry.
+     *
+     * <p>This is used when the cursor's mark-delete position advances past 
entries that are still in the consumer's
+     * pending acks. The remaining unacked count accounts for batch index 
level acknowledgments — only the truly
+     * unacked batch indexes are decremented.
+     *
+     * @param markDeleteLedgerId the ledger ID up to which to remove pending 
acks
+     * @param markDeleteEntryId the entry ID up to which to remove pending acks
+     */
+    public void removePendingAcksUpToAndCountUnacked(long markDeleteLedgerId, 
long markDeleteEntryId) {
+        if (pendingAcks == null) {
+            return;
+        }
+
+        int[] totalUnacked = {0};

Review Comment:
   In other locations, we use MutableInt for such purpose. Using an array is 
slightly confusing.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to