This is an automated email from the ASF dual-hosted git repository.
lhotari pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 759a5f520f8 [fix][broker] Fix stuck chunks in SharedConsumerAssignor permit tracking (#25620)
759a5f520f8 is described below
commit 759a5f520f81ba45caef9bd94a79997855a695fd
Author: Matteo Merli <[email protected]>
AuthorDate: Thu Apr 30 06:18:54 2026 -0700
[fix][broker] Fix stuck chunks in SharedConsumerAssignor permit tracking (#25620)
---
.../apache/pulsar/broker/service/SharedConsumerAssignor.java | 10 +++++-----
1 file changed, 5 insertions(+), 5 deletions(-)
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/SharedConsumerAssignor.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/SharedConsumerAssignor.java
index 847a8385d45..4814d0e2215 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/SharedConsumerAssignor.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/SharedConsumerAssignor.java
@@ -92,7 +92,7 @@ public class SharedConsumerAssignor {
if (metadata == null || !metadata.hasUuid() ||
!metadata.hasChunkId() || !metadata.hasNumChunksFromMsg()) {
consumerToEntries.computeIfAbsent(consumer, __ -> new
ArrayList<>()).add(entryAndMetadata);
} else {
- final Consumer consumerForUuid = getConsumerForUuid(metadata,
consumer, availablePermits);
+ final Consumer consumerForUuid = getConsumerForUuid(metadata,
consumer);
if (consumerForUuid == null) {
unassignedMessageProcessor.accept(entryAndMetadata);
continue;
@@ -123,9 +123,7 @@ public class SharedConsumerAssignor {
return null;
}
- private Consumer getConsumerForUuid(final MessageMetadata metadata,
- final Consumer defaultConsumer,
- final int currentAvailablePermits) {
+ private Consumer getConsumerForUuid(final MessageMetadata metadata, final
Consumer defaultConsumer) {
final String uuid = metadata.getUuid();
Consumer consumer = uuidToConsumer.get(uuid);
if (consumer == null) {
@@ -144,7 +142,9 @@ public class SharedConsumerAssignor {
// The last chunk is received, we should remove the cache
uuidToConsumer.remove(uuid);
}
- consumerToPermits.put(consumer, currentAvailablePermits - 1);
+ // Decrement target consumer's permits, not the loop's local availablePermits — on a cache
+ // redirect those track different consumers.
+ consumerToPermits.put(consumer, permits - 1);
return consumer;
}
}