This is an automated email from the ASF dual-hosted git repository. lhotari pushed a commit to branch branch-4.2 in repository https://gitbox.apache.org/repos/asf/pulsar.git
commit 5c749e2051e2e3bd8848183ecba9ef26c191a65a
Author: Matteo Merli <[email protected]>
AuthorDate: Thu Apr 30 06:18:54 2026 -0700

    [fix][broker] Fix stuck chunks in SharedConsumerAssignor permit tracking (#25620)

    (cherry picked from commit 759a5f520f81ba45caef9bd94a79997855a695fd)
---
 .../apache/pulsar/broker/service/SharedConsumerAssignor.java | 10 +++++-----
 1 file changed, 5 insertions(+), 5 deletions(-)

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/SharedConsumerAssignor.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/SharedConsumerAssignor.java
index bbf8dfd2b10..a317ad7560b 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/SharedConsumerAssignor.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/SharedConsumerAssignor.java
@@ -89,7 +89,7 @@ public class SharedConsumerAssignor {
             if (metadata == null || !metadata.hasUuid() || !metadata.hasChunkId() || !metadata.hasNumChunksFromMsg()) {
                 consumerToEntries.computeIfAbsent(consumer, __ -> new ArrayList<>()).add(entryAndMetadata);
             } else {
-                final Consumer consumerForUuid = getConsumerForUuid(metadata, consumer, availablePermits);
+                final Consumer consumerForUuid = getConsumerForUuid(metadata, consumer);
                 if (consumerForUuid == null) {
                     unassignedMessageProcessor.accept(entryAndMetadata);
                     continue;
@@ -120,9 +120,7 @@ public class SharedConsumerAssignor {
         return null;
     }
 
-    private Consumer getConsumerForUuid(final MessageMetadata metadata,
-                                        final Consumer defaultConsumer,
-                                        final int currentAvailablePermits) {
+    private Consumer getConsumerForUuid(final MessageMetadata metadata, final Consumer defaultConsumer) {
         final String uuid = metadata.getUuid();
         Consumer consumer = uuidToConsumer.get(uuid);
         if (consumer == null) {
@@ -141,7 +139,9 @@ public class SharedConsumerAssignor {
             // The last chunk is received, we should remove the cache
             uuidToConsumer.remove(uuid);
         }
-        consumerToPermits.put(consumer, currentAvailablePermits - 1);
+        // Decrement target consumer's permits, not the loop's local availablePermits — on a cache
+        // redirect those track different consumers.
+        consumerToPermits.put(consumer, permits - 1);
         return consumer;
     }
 }
