lianetm commented on code in PR #20363:
URL: https://github.com/apache/kafka/pull/20363#discussion_r2291474222
##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java:
##########
@@ -859,16 +865,19 @@ public ConsumerRecords<K, V> poll(final Duration timeout)
{
throw new IllegalStateException("Consumer is not subscribed to
any topics or assigned any partitions");
}
- do {
- PollEvent event = new PollEvent(timer.currentTimeMs());
- // Make sure to let the background thread know that we are
still polling.
- // This will trigger async auto-commits of consumed positions
when hitting
- // the interval time or reconciling new assignments
- applicationEventHandler.add(event);
+ PollEvent event = new PollEvent(timer.currentTimeMs());
+ // Make sure to let the background thread know that we are still
polling.
+ // This will trigger async auto-commits of consumed positions when
hitting
+ // the interval time or reconciling new assignments
+ applicationEventHandler.add(event);
+
+ if (reconciliationInProgress.get() ||
autoCommitState.shouldAutoCommit()) {
Review Comment:
Agreed, and trying to push it a bit more, even with auto-commit enabled,
maybe we can still optimise if we check the timer in the app thread, and then
use an atomic var "timerExpired" that the background can reuse (instead of
checking the timer again).
So we would optimse if auto-commit disabled, or if it's enabled but the
timer hasn't expired.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]