junrao commented on code in PR #18795:
URL: https://github.com/apache/kafka/pull/18795#discussion_r2004442106
##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractFetch.java:
##########
@@ -432,7 +439,14 @@ protected Map<Node, FetchSessionHandler.FetchRequestData> prepareFetchRequests()
                 // going to be failed anyway before being sent, so skip sending the request for now
                 log.trace("Skipping fetch for partition {} because node {} is awaiting reconnect backoff", partition, node);
             } else if (nodesWithPendingFetchRequests.contains(node.id())) {
+                // If there's already an inflight request for this node, don't issue another request.
                 log.trace("Skipping fetch for partition {} because previous request to {} has not been processed", partition, node);
+            } else if (bufferedNodes.contains(node.id())) {
Review Comment:
> Is it impossible for server A to have a pending fetch request when the consumer is still processing records from server A?

Yes, that can happen. Let's say a consumer is only consuming a single partition. In a poll() call, we return the buffered data for that partition and, if the buffered data is drained, trigger the next fetch request. So the processing of the returned buffered data and the handling of the next fetch request can run in parallel.

When a consumer is consuming multiple partitions, this PR only issues the next fetch request once the buffered data for all partitions is drained, in order to fix the session eviction issue on the server side. This reduces the concurrency a bit, but the concurrency described above still exists.
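The overlap described above can be sketched as follows. This is a hypothetical, simplified model (the class, record values, and `sendFetch` helper are illustrative, not the real AbstractFetch code): poll() drains the buffer, and only once the buffer is empty does it issue the next fetch, which then runs while the caller is processing the records poll() returned.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Illustrative sketch only -- not the actual Kafka consumer implementation.
public class FetchPipelineSketch {
    private final Queue<String> buffer = new ArrayDeque<>();
    private final ExecutorService network = Executors.newSingleThreadExecutor();
    private CompletableFuture<List<String>> inflight;

    public FetchPipelineSketch(List<String> initiallyBuffered) {
        buffer.addAll(initiallyBuffered);
    }

    // Stand-in for sending a FetchRequest over the network.
    private CompletableFuture<List<String>> sendFetch() {
        return CompletableFuture.supplyAsync(() -> List.of("r3", "r4"), network);
    }

    public List<String> poll() {
        // Pick up the previous fetch's response, if any. (The real poll()
        // waits with a timeout rather than blocking unconditionally.)
        if (inflight != null) {
            buffer.addAll(inflight.join());
            inflight = null;
        }
        // Return everything buffered; the buffer is now fully drained...
        List<String> records = new ArrayList<>(buffer);
        buffer.clear();
        // ...so issue the next fetch. It proceeds in the background while
        // the caller processes `records` -- the concurrency described above.
        inflight = sendFetch();
        return records;
    }

    public void close() {
        network.shutdown();
    }

    public static void main(String[] args) throws InterruptedException {
        FetchPipelineSketch consumer = new FetchPipelineSketch(List.of("r1", "r2"));
        List<String> first = consumer.poll();   // buffered data; next fetch starts
        Thread.sleep(50);                       // simulate processing the first batch
        List<String> second = consumer.poll();  // response of the overlapping fetch
        consumer.close();
        System.out.println(first + " then " + second); // prints "[r1, r2] then [r3, r4]"
    }
}
```

The key point the sketch captures is that the fetch issued at the end of the first poll() completes during the caller's processing time, so the second poll() can return it without waiting a full network round trip.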