kirktrue commented on code in PR #20363:
URL: https://github.com/apache/kafka/pull/20363#discussion_r2302368414
##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java:
##########
@@ -859,16 +865,19 @@ public ConsumerRecords<K, V> poll(final Duration timeout)
{
throw new IllegalStateException("Consumer is not subscribed to
any topics or assigned any partitions");
}
- do {
- PollEvent event = new PollEvent(timer.currentTimeMs());
- // Make sure to let the background thread know that we are
still polling.
- // This will trigger async auto-commits of consumed positions
when hitting
- // the interval time or reconciling new assignments
- applicationEventHandler.add(event);
+ PollEvent event = new PollEvent(timer.currentTimeMs());
+ // Make sure to let the background thread know that we are still
polling.
+ // This will trigger async auto-commits of consumed positions when
hitting
+ // the interval time or reconciling new assignments
+ applicationEventHandler.add(event);
+
+ if (reconciliationInProgress.get() ||
autoCommitState.shouldAutoCommit()) {
Review Comment:
> Couldn't we end up with a race condition here if the app thread sees
`autoCommitState.shouldAutoCommit()` false at this point (because interval
hasn't expired just yet), but by the time the background checks the same when
processing the poll event the interval expired?
No, simply because nothing updates the timer's sense of the current time
between when the application thread submits the `PollEvent` and when it's
processed by the network thread.
The application thread determines the timestamp used by the auto-commit
timer via `PollEvent` (and `AssignmentChangeEvent`). And it's only checked when
`CommitRequestManager.updateTimerAndMaybeCommit()` is invoked by the
`ApplicationEventProcessor`. So when we update the timer in the application
thread via `AutoCommitState.updateTimer()` and then call
`AutoCommitState.shouldAutoCommit()`, we should get the same result in either
thread, at least until the auto-commit is actually kicked off.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]