merlimat opened a new pull request, #25578: URL: https://github.com/apache/pulsar/pull/25578
### Motivation

`ConsumerImpl.updateAutoScaleReceiverQueueHint()` is invoked from `ConsumerBase.enqueueMessageAndCheckBatchReceive()` immediately after `incomingMessages.offer(message)`, under `incomingQueueLock`. The hint is then derived from `getAvailablePermits() + incomingMessages.size()`. But `incomingMessages.take()` (and `poll()`) do not acquire `incomingQueueLock`, so a consumer-thread `take()` that races with the enqueue can drain the just-offered message before the hint is recomputed. The read of `incomingMessages.size()` then observes zero, and the hint is spuriously cleared to `false` even though the pipeline was momentarily full.

This produces flakiness in `ConsumerMemoryLimitTest#testMultiPulsarClientConsumerShareMemoryLimitController`: inside its `receive()` loop the consumer-thread `take()` frequently overlaps with the client IO thread enqueue, so the hint ends up `false` even though the queue was at capacity for every message. The subsequent `receiveAsync()` therefore fails to trigger `expectMoreIncomingMessages()` (the `CAS true→false` fails) and the receiver queue never expands to 2, causing the `Awaitility.await()` to time out.

Example failing CI run: https://github.com/apache/pulsar/actions/runs/24909862708/job/72949410128

I reproduced it locally with `@Test(invocationCount = 100)`: roughly 1 in ~50 runs fails; 100/100 pass after the fix.

### Modifications

`ConsumerImpl.updateAutoScaleReceiverQueueHint()`: clamp `incomingMessages.size()` to at least 1 in the enqueue-time hint calculation. The method is only called from the enqueue path, where a message was just added, so the post-enqueue state of the pipeline is at least 1 regardless of any concurrent drain. In the non-racy case `incomingMessages.size() >= 1` and `Math.max(1, …)` is a no-op.
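
To illustrate the race and the clamp in isolation, here is a minimal, self-contained sketch. It is not the actual `ConsumerImpl` code: the class `AutoScaleHintSketch`, its fields, the helper `hintAfterSimulatedRace`, and the `>=` comparison against the receiver queue size are assumptions made for illustration. The sketch also assumes the available permits are still 0 at the moment the hint is recomputed. The concurrent drain is simulated on a single thread by calling `poll()` between the `offer()` and the hint update.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical stand-in for the consumer state; names are illustrative only.
class AutoScaleHintSketch {
    private final BlockingQueue<String> incomingMessages = new ArrayBlockingQueue<>(16);
    private final AtomicBoolean scaleReceiverQueueHint = new AtomicBoolean(false);
    private final int currentReceiverQueueSize = 1; // assumed starting queue size
    private final int availablePermits = 0;         // assumed: nothing pending at this instant
    private final boolean clamp;

    AutoScaleHintSketch(boolean clamp) {
        this.clamp = clamp;
    }

    // Enqueue-time hint update. With the clamp, the just-offered message is
    // always counted, even if another thread already drained it.
    void updateHint() {
        int queued = incomingMessages.size();
        if (clamp) {
            queued = Math.max(1, queued);
        }
        // Assumed comparison: the pipeline counts as full when permits plus
        // queued messages reach the current receiver queue size.
        scaleReceiverQueueHint.set(availablePermits + queued >= currentReceiverQueueSize);
    }

    // Replays the racy interleaving deterministically on one thread.
    static boolean hintAfterSimulatedRace(boolean clamp) {
        AutoScaleHintSketch consumer = new AutoScaleHintSketch(clamp);
        consumer.incomingMessages.offer("m1"); // IO thread enqueues a message
        consumer.incomingMessages.poll();      // consumer thread drains it first
        consumer.updateHint();                 // hint recomputed after the drain
        return consumer.scaleReceiverQueueHint.get();
    }

    public static void main(String[] args) {
        System.out.println("unclamped=" + hintAfterSimulatedRace(false)
                + " clamped=" + hintAfterSimulatedRace(true));
    }
}
```

Under these assumptions the unclamped variant clears the hint after the simulated drain, while the clamped variant retains it.
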

Adds `ConsumerImplTest#testUpdateAutoScaleReceiverQueueHintRaceWithConcurrentDrain`, a focused unit test that simulates the `offer + concurrent drain` sequence deterministically and asserts the hint is retained. It fails without the clamp and passes with it.

### Verifying this change

- [ ] Make sure that the change passes the CI checks.

This change added tests and can be verified as follows:

- Added `ConsumerImplTest#testUpdateAutoScaleReceiverQueueHintRaceWithConcurrentDrain` covering the race deterministically.
- Locally confirmed that `testMultiPulsarClientConsumerShareMemoryLimitController` passes 100/100 with the fix (and reliably fails within a handful of invocations without it).

### Does this pull request potentially affect one of the following parts:

- [ ] Dependencies (add or upgrade a dependency)
- [ ] The public API
- [ ] The schema
- [ ] The default values of configurations
- [ ] The threading model
- [ ] The binary protocol
- [ ] The REST endpoints
- [ ] The admin CLI options
- [ ] The metrics
- [ ] Anything that affects deployment
