kirktrue commented on code in PR #20324:
URL: https://github.com/apache/kafka/pull/20324#discussion_r2310690119
##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java:
##########
@@ -1847,11 +1871,21 @@ private Fetch<K, V> collectFetch() {
* defined
*/
private boolean updateFetchPositions(final Timer timer) {
- cachedSubscriptionHasAllFetchPositions = false;
+ // Fetch position validation is in the hot path for poll() and the cost of thread interaction for
+ // event processing is *very* heavy, CPU-wise. In a stable system, the positions are valid; having the
+ // network thread check the validity yields the same answer 99%+ of the time. But calling the
+ // network thread to determine that is very expensive.
+ //
+ // Instead, let the *application thread* determine if any partitions need their positions updated. If not,
+ // the application thread can skip sending an event to the network thread that will simply end up coming
+ // to the same conclusion, albeit much slower.
+ if (sharedConsumerState.canSkipUpdateFetchPositions())
+ return true;
+
try {
CheckAndUpdatePositionsEvent checkAndUpdatePositionsEvent = new CheckAndUpdatePositionsEvent(calculateDeadlineMs(timer));
wakeupTrigger.setActiveTask(checkAndUpdatePositionsEvent.future());
- cachedSubscriptionHasAllFetchPositions = applicationEventHandler.addAndGet(checkAndUpdatePositionsEvent);
+ applicationEventHandler.addAndGet(checkAndUpdatePositionsEvent);
Review Comment:
Good point. I've removed the return value from
`CheckAndUpdatePositionsEvent` because we're no longer caching the result of
`hasAllFetchPositions()` anyway.
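
For readers following the thread, here is a minimal sketch of the fast-path idea described in the diff's code comment: the application thread consults cheap shared state and only sends an event to the network thread when positions may need updating. Everything here is hypothetical and simplified for illustration. `SharedState`, `Consumer`, `markPositionsValid()`, `markPositionsStale()` and the `eventsSent` counter are assumptions; only the method name `canSkipUpdateFetchPositions()` is borrowed from the diff, and its body here is not Kafka's implementation.

```java
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

public class FetchPositionFastPath {

    /** Hypothetical stand-in for state shared by the application and network threads. */
    static final class SharedState {
        // Assumption: a single flag summarizes "every assigned partition has a valid position".
        private final AtomicBoolean allPositionsValid = new AtomicBoolean(false);

        void markPositionsValid() { allPositionsValid.set(true); }

        void markPositionsStale() { allPositionsValid.set(false); }

        // Cheap read that the application thread can perform without a thread hand-off.
        boolean canSkipUpdateFetchPositions() { return allPositionsValid.get(); }
    }

    /** Hypothetical consumer that counts how often the slow path is taken. */
    static final class Consumer {
        final SharedState state = new SharedState();
        final AtomicInteger eventsSent = new AtomicInteger();

        boolean updateFetchPositions() {
            // Fast path: nothing to update, so no event is sent.
            if (state.canSkipUpdateFetchPositions())
                return true;

            // Slow path: stands in for enqueueing an event and blocking on its future.
            eventsSent.incrementAndGet();
            // Assumption: the network thread validates positions and publishes the result.
            state.markPositionsValid();
            return true;
        }
    }

    public static void main(String[] args) {
        Consumer consumer = new Consumer();
        for (int i = 0; i < 1000; i++)
            consumer.updateFetchPositions();
        // Only the first call takes the slow path in this stable scenario.
        System.out.println("events sent: " + consumer.eventsSent.get()); // prints "events sent: 1"
    }
}
```

In this sketch the slow path runs again only after something marks the positions stale, which is meant to mirror the "stable system" argument in the code comment rather than any specific invalidation logic in the PR.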