This is an automated email from the ASF dual-hosted git repository.

lhotari pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 759a5f520f8 [fix][broker] Fix stuck chunks in SharedConsumerAssignor 
permit tracking (#25620)
759a5f520f8 is described below

commit 759a5f520f81ba45caef9bd94a79997855a695fd
Author: Matteo Merli <[email protected]>
AuthorDate: Thu Apr 30 06:18:54 2026 -0700

    [fix][broker] Fix stuck chunks in SharedConsumerAssignor permit tracking 
(#25620)
---
 .../apache/pulsar/broker/service/SharedConsumerAssignor.java   | 10 +++++-----
 1 file changed, 5 insertions(+), 5 deletions(-)

diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/SharedConsumerAssignor.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/SharedConsumerAssignor.java
index 847a8385d45..4814d0e2215 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/SharedConsumerAssignor.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/SharedConsumerAssignor.java
@@ -92,7 +92,7 @@ public class SharedConsumerAssignor {
             if (metadata == null || !metadata.hasUuid() || 
!metadata.hasChunkId() || !metadata.hasNumChunksFromMsg()) {
                 consumerToEntries.computeIfAbsent(consumer, __ -> new 
ArrayList<>()).add(entryAndMetadata);
             } else {
-                final Consumer consumerForUuid = getConsumerForUuid(metadata, 
consumer, availablePermits);
+                final Consumer consumerForUuid = getConsumerForUuid(metadata, 
consumer);
                 if (consumerForUuid == null) {
                     unassignedMessageProcessor.accept(entryAndMetadata);
                     continue;
@@ -123,9 +123,7 @@ public class SharedConsumerAssignor {
         return null;
     }
 
-    private Consumer getConsumerForUuid(final MessageMetadata metadata,
-                                        final Consumer defaultConsumer,
-                                        final int currentAvailablePermits) {
+    private Consumer getConsumerForUuid(final MessageMetadata metadata, final 
Consumer defaultConsumer) {
         final String uuid = metadata.getUuid();
         Consumer consumer = uuidToConsumer.get(uuid);
         if (consumer == null) {
@@ -144,7 +142,9 @@ public class SharedConsumerAssignor {
             // The last chunk is received, we should remove the cache
             uuidToConsumer.remove(uuid);
         }
-        consumerToPermits.put(consumer, currentAvailablePermits - 1);
+        // Decrement target consumer's permits, not the loop's local 
availablePermits — on a cache
+        // redirect those track different consumers.
+        consumerToPermits.put(consumer, permits - 1);
         return consumer;
     }
 }

Reply via email to