This is an automated email from the ASF dual-hosted git repository.

merlimat pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new f6598d8408f [fix][client] Stabilize scaleReceiverQueueHint against 
concurrent enqueue/take (#25578)
f6598d8408f is described below

commit f6598d8408f51d5f199c2ec8d47b1dc74fb60b04
Author: Matteo Merli <[email protected]>
AuthorDate: Fri Apr 24 16:10:37 2026 -0700

    [fix][client] Stabilize scaleReceiverQueueHint against concurrent 
enqueue/take (#25578)
---
 .../apache/pulsar/client/impl/ConsumerImpl.java    |  7 ++++-
 .../pulsar/client/impl/ConsumerImplTest.java       | 32 ++++++++++++++++++++++
 2 files changed, 38 insertions(+), 1 deletion(-)

diff --git 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
index fc5eabdba25..5862fce64de 100644
--- 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
+++ 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
@@ -2290,8 +2290,13 @@ public class ConsumerImpl<T> extends ConsumerBase<T> 
implements ConnectionHandle
 
     @Override
     protected void updateAutoScaleReceiverQueueHint() {
+        // Called from the enqueue path right after offer(): the message we 
just added may
+        // already have been drained by a concurrent take() (which doesn't hold
+        // incomingQueueLock), so clamp incomingMessages.size() to at least 1 
to reflect the
+        // post-enqueue state and avoid spuriously clearing the hint in that 
race.
         boolean prev = scaleReceiverQueueHint.getAndSet(
-                getAvailablePermits() + incomingMessages.size() >= 
getCurrentReceiverQueueSize());
+                getAvailablePermits() + Math.max(1, incomingMessages.size())
+                        >= getCurrentReceiverQueueSize());
         log.debug().attr("updateautoscalereceiverqueuehint", prev)
                 .attr("get", scaleReceiverQueueHint.get())
                 .log("updateAutoScaleReceiverQueueHint ->");
diff --git 
a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/ConsumerImplTest.java
 
b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/ConsumerImplTest.java
index 27ba9f4f2ed..f53051e2c42 100644
--- 
a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/ConsumerImplTest.java
+++ 
b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/ConsumerImplTest.java
@@ -309,4 +309,36 @@ public class ConsumerImplTest {
         Pattern consumerNamePattern = Pattern.compile("[a-zA-Z0-9]{5}");
         
assertTrue(consumerNamePattern.matcher(consumer.getConsumerName()).matches());
     }
+
+    @Test(invocationTimeOut = 1000)
+    @SuppressWarnings({"rawtypes", "unchecked"})
+    public void testUpdateAutoScaleReceiverQueueHintRaceWithConcurrentDrain() {
+        // Regression test: ConsumerBase.enqueueMessageAndCheckBatchReceive() 
calls
+        // updateAutoScaleReceiverQueueHint() after 
incomingMessages.offer(message) under
+        // incomingQueueLock, but incomingMessages.take()/poll() does NOT 
acquire that lock.
+        // A consumer thread draining the queue in parallel with the client-IO 
thread's
+        // enqueue can therefore remove the just-offered message before the 
hint read of
+        // incomingMessages.size() runs. The hint would then see size() == 0 
and be
+        // spuriously cleared, even though the pipeline was full at enqueue 
time.
+        consumerConf = new ConsumerConfigurationData<>();
+        consumerConf.setAutoScaledReceiverQueueSizeEnabled(true);
+        createConsumer(consumerConf);
+        consumer.setCurrentReceiverQueueSize(1);
+
+        // Simulate the race: enqueue a message and drain it before the hint 
is computed.
+        MessageImpl message = mock(MessageImpl.class);
+        when(message.size()).thenReturn(100);
+        consumer.incomingMessages.offer(message);
+        consumer.incomingMessages.poll();
+
+        Assert.assertEquals(consumer.incomingMessages.size(), 0);
+        Assert.assertEquals(consumer.getAvailablePermits(), 0);
+        Assert.assertEquals(consumer.getCurrentReceiverQueueSize(), 1);
+
+        consumer.updateAutoScaleReceiverQueueHint();
+
+        Assert.assertTrue(consumer.scaleReceiverQueueHint.get(),
+                "Hint must reflect the post-enqueue state (pipeline had >=1 
message); "
+                        + "a concurrent drain of the just-enqueued message 
must not clear it.");
+    }
 }

Reply via email to