This is an automated email from the ASF dual-hosted git repository.
merlimat pushed a commit to branch branch-4.2
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/branch-4.2 by this push:
new ca27ca98954 [fix][client] Stabilize scaleReceiverQueueHint against concurrent enqueue/take (#25578)
ca27ca98954 is described below
commit ca27ca989543ed61f29065df9c120738cb49cea4
Author: Matteo Merli <[email protected]>
AuthorDate: Fri Apr 24 16:10:37 2026 -0700
[fix][client] Stabilize scaleReceiverQueueHint against concurrent enqueue/take (#25578)
(cherry picked from commit f6598d8408f51d5f199c2ec8d47b1dc74fb60b04)
---
.../apache/pulsar/client/impl/ConsumerImpl.java | 7 ++++-
.../pulsar/client/impl/ConsumerImplTest.java | 32 ++++++++++++++++++++++
2 files changed, 38 insertions(+), 1 deletion(-)
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
index 868c45b277e..1b844d120b3 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
@@ -2292,8 +2292,13 @@ public class ConsumerImpl<T> extends ConsumerBase<T> implements ConnectionHandle
@Override
protected void updateAutoScaleReceiverQueueHint() {
+ // Called from the enqueue path right after offer(): the message we just added may
+ // already have been drained by a concurrent take() (which doesn't hold
+ // incomingQueueLock), so clamp incomingMessages.size() to at least 1 to reflect the
+ // post-enqueue state and avoid spuriously clearing the hint in that race.
boolean prev = scaleReceiverQueueHint.getAndSet(
- getAvailablePermits() + incomingMessages.size() >= getCurrentReceiverQueueSize());
+ getAvailablePermits() + Math.max(1, incomingMessages.size())
+ >= getCurrentReceiverQueueSize());
if (log.isDebugEnabled() && prev != scaleReceiverQueueHint.get()) {
log.debug("updateAutoScaleReceiverQueueHint {} -> {}", prev, scaleReceiverQueueHint.get());
}
diff --git a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/ConsumerImplTest.java b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/ConsumerImplTest.java
index 62d6c0b3f7b..005a4cd5ba6 100644
--- a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/ConsumerImplTest.java
+++ b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/ConsumerImplTest.java
@@ -304,4 +304,36 @@ public class ConsumerImplTest {
Pattern consumerNamePattern = Pattern.compile("[a-zA-Z0-9]{5}");
assertTrue(consumerNamePattern.matcher(consumer.getConsumerName()).matches());
}
+
+ @Test(invocationTimeOut = 1000)
+ @SuppressWarnings({"rawtypes", "unchecked"})
+ public void testUpdateAutoScaleReceiverQueueHintRaceWithConcurrentDrain() {
+ // Regression test: ConsumerBase.enqueueMessageAndCheckBatchReceive() calls
+ // updateAutoScaleReceiverQueueHint() after incomingMessages.offer(message) under
+ // incomingQueueLock, but incomingMessages.take()/poll() does NOT acquire that lock.
+ // A consumer thread draining the queue in parallel with the client-IO thread's
+ // enqueue can therefore remove the just-offered message before the hint read of
+ // incomingMessages.size() runs. The hint would then see size() == 0 and be
+ // spuriously cleared, even though the pipeline was full at enqueue time.
+ consumerConf = new ConsumerConfigurationData<>();
+ consumerConf.setAutoScaledReceiverQueueSizeEnabled(true);
+ createConsumer(consumerConf);
+ consumer.setCurrentReceiverQueueSize(1);
+
+ // Simulate the race: enqueue a message and drain it before the hint is computed.
+ MessageImpl message = mock(MessageImpl.class);
+ when(message.size()).thenReturn(100);
+ consumer.incomingMessages.offer(message);
+ consumer.incomingMessages.poll();
+
+ Assert.assertEquals(consumer.incomingMessages.size(), 0);
+ Assert.assertEquals(consumer.getAvailablePermits(), 0);
+ Assert.assertEquals(consumer.getCurrentReceiverQueueSize(), 1);
+
+ consumer.updateAutoScaleReceiverQueueHint();
+
+ Assert.assertTrue(consumer.scaleReceiverQueueHint.get(),
+ "Hint must reflect the post-enqueue state (pipeline had >=1 message); "
+ + "a concurrent drain of the just-enqueued message must not clear it.");
+ }
}