junrao commented on code in PR #18795:
URL: https://github.com/apache/kafka/pull/18795#discussion_r2004442106


##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractFetch.java:
##########
@@ -432,7 +439,14 @@ protected Map<Node, FetchSessionHandler.FetchRequestData> 
prepareFetchRequests()
                 // going to be failed anyway before being sent, so skip 
sending the request for now
                 log.trace("Skipping fetch for partition {} because node {} is 
awaiting reconnect backoff", partition, node);
             } else if (nodesWithPendingFetchRequests.contains(node.id())) {
+                // If there's already an inflight request for this node, don't 
issue another request.
                 log.trace("Skipping fetch for partition {} because previous 
request to {} has not been processed", partition, node);
+            } else if (bufferedNodes.contains(node.id())) {

Review Comment:
   > Is it impossible for server A to have a pending fetch request when the 
consumer is still processing records from server A?
   
   Yes. Let's say a consumer is only consuming a single partition. In a poll() 
call, we will return the buffered data for that partition and if the buffered 
data is drained, trigger the next fetch request. So, the processing of the 
returned buffered data and the handling of the next fetch request can run in 
parallel.
   
   When a consumer is consuming multiple partitions, this PR only issues the 
next fetch request when the buffered data for all partitions is drained, in 
order to fix the session eviction issue on the server side. This reduces the 
concurrency a bit, but the concurrency mentioned above still exists.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to