merlimat opened a new pull request, #25577:
URL: https://github.com/apache/pulsar/pull/25577
### Motivation
`ConsumerMemoryLimitTest#testMultiPulsarClientConsumerShareMemoryLimitController`
intermittently fails at the final `Awaitility.await().until(() ->
topic2Consumer.getCurrentReceiverQueueSize() == 2)` assertion:
```
org.awaitility.core.ConditionTimeoutException: Condition ... was not fulfilled within 10 seconds.
    at ConsumerMemoryLimitTest.testMultiPulsarClientConsumerShareMemoryLimitController(ConsumerMemoryLimitTest.java:193)
```
Example flaky run:
https://scans.gradle.com/s/bw6broyxzeuve/tests/task/:pulsar-broker:test/details/org.apache.pulsar.client.impl.ConsumerMemoryLimitTest/testMultiPulsarClientConsumerShareMemoryLimitController/2/output
**Root cause.** Earlier in the test, `msgCount + 1` (= 4) messages are sent
to `topic2`, but the `for` loop drains only `msgCount` (= 3) of them. With
`receiverQueueSize = 1`, each `receive()` returns a permit to the broker, which
then pushes the next message, so after the loop exits the 4th message ends
up buffered in the consumer's `incomingMessages` queue.
The subsequent `topic2Consumer.receiveAsync()` then takes the fast path in
`ConsumerImpl.internalReceiveAsync()`:
```java
Message<T> message = incomingMessages.poll();
if (message == null) {
    expectMoreIncomingMessages(); // only path that expands the queue
    pendingReceives.add(result);
} else {
    messageProcessed(message);
    result.complete(beforeConsume(message));
}
```
Because `poll()` returns the buffered 4th message immediately,
`expectMoreIncomingMessages()` is never invoked and the receiver queue stays at
1, causing the Awaitility call to time out. The test normally passes because
the 4th message usually arrives after `receiveAsync()` has already gone down
the `null` branch, so the outcome depends on a timing race.
### Modifications
Drain the trailing `(msgCount+1)`-th message before the `receiveAsync()`
call, so the buffer is empty and the queue-expansion path is actually exercised.
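
The effect of draining first can be illustrated with a small standalone model of the fast path. This is only a sketch, not the real `ConsumerImpl`: `ToyConsumer`, `deliver`, `pollBuffered`, and `sizeAfterReceiveAsync` are hypothetical names, messages are plain strings, and queue expansion is reduced to setting the size from 1 to 2 (the value the test waits for) rather than reproducing the client's actual growth policy.

```java
import java.util.ArrayDeque;
import java.util.Queue;
import java.util.concurrent.CompletableFuture;

// Toy model of the receiveAsync() fast path; all names here are hypothetical.
class ReceiverQueueFastPathDemo {

    static final class ToyConsumer {
        private final Queue<String> incomingMessages = new ArrayDeque<>();
        private final Queue<CompletableFuture<String>> pendingReceives = new ArrayDeque<>();
        private int currentReceiverQueueSize = 1;

        // Simulates the broker pushing one message to the consumer.
        void deliver(String msg) {
            CompletableFuture<String> waiter = pendingReceives.poll();
            if (waiter != null) {
                waiter.complete(msg);
            } else {
                incomingMessages.add(msg);
            }
        }

        // Non-blocking stand-in for a synchronous receive() that drains the buffer.
        String pollBuffered() {
            return incomingMessages.poll();
        }

        CompletableFuture<String> receiveAsync() {
            CompletableFuture<String> result = new CompletableFuture<>();
            String message = incomingMessages.poll();
            if (message == null) {
                // Stand-in for expectMoreIncomingMessages(): assumed to grow 1 -> 2.
                currentReceiverQueueSize = 2;
                pendingReceives.add(result);
            } else {
                // Fast path: a buffered message completes the future, no expansion.
                result.complete(message);
            }
            return result;
        }

        int getCurrentReceiverQueueSize() {
            return currentReceiverQueueSize;
        }
    }

    // Returns the queue size observed after one receiveAsync() call.
    static int sizeAfterReceiveAsync(boolean drainFirst) {
        ToyConsumer consumer = new ToyConsumer();
        consumer.deliver("msg-4"); // the trailing 4th message is already buffered
        if (drainFirst) {
            consumer.pollBuffered(); // the fix: empty the buffer before receiveAsync()
        }
        consumer.receiveAsync();
        return consumer.getCurrentReceiverQueueSize();
    }

    public static void main(String[] args) {
        System.out.println("buffered 4th message: queue size = " + sizeAfterReceiveAsync(false));
        System.out.println("drained first:        queue size = " + sizeAfterReceiveAsync(true));
    }
}
```

In this model the buffered case leaves the size at 1 and the drained case reaches 2, which mirrors the flaky and fixed orderings described above.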
### Verifying this change
- [ ] Make sure that the change passes the CI checks.
This change is already covered by existing tests:
`testMultiPulsarClientConsumerShareMemoryLimitController` itself verifies the
queue expansion behavior, and the fix stabilizes the test without changing its
intent.
### Does this pull request potentially affect one of the following parts:
- [ ] Dependencies (add or upgrade a dependency)
- [ ] The public API
- [ ] The schema
- [ ] The default values of configurations
- [ ] The threading model
- [ ] The binary protocol
- [ ] The REST endpoints
- [ ] The admin CLI options
- [ ] The metrics
- [ ] Anything that affects deployment