appchemist commented on code in PR #15647:
URL: https://github.com/apache/kafka/pull/15647#discussion_r1580499797
##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractFetch.java:
##########
@@ -200,6 +200,9 @@ protected void handleFetchSuccess(final Node fetchTarget,
if (partitionData.currentLeader().leaderId() != -1 &&
partitionData.currentLeader().leaderEpoch() != -1) {
partitionsWithUpdatedLeaderInfo.put(partition, new Metadata.LeaderIdAndEpoch(
Optional.of(partitionData.currentLeader().leaderId()),
Optional.of(partitionData.currentLeader().leaderEpoch())));
+ } else {
+ requestMetadataUpdate(metadata, subscriptions, partition);
+ subscriptions.awaitUpdate(partition);
Review Comment:
As per KIP-951, if the fetch state is FETCHING, the
FetchCollector.handleInitializeErrors() method is called.
I think that in this case the state should not be changed to AWAIT_UPDATE.
Additionally, if the state is AWAIT_UPDATE, the partition is filtered out by the
following check inside FetchCollector.initialize() and never reaches
FetchCollector.handleInitializeErrors().
```
if (!subscriptions.hasValidPosition(tp)) {
    // this can happen when a rebalance happened while fetch is still in-flight
    log.debug("Ignoring fetched records for partition {} since it no longer has valid position", tp);
    return null;
}
```
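To make the concern concrete, here is a minimal, self-contained sketch of the
control flow described above. It is not the Kafka code: FetchStateSketch, its
FetchState enum, and the string results are hypothetical stand-ins, and the
assumption that AWAIT_UPDATE reports no valid position is taken from the
comment above rather than verified against SubscriptionState.

```java
public class FetchStateSketch {

    // Hypothetical, simplified stand-in for the consumer's per-partition fetch state.
    // Assumption (from the review comment): only FETCHING has a valid position.
    enum FetchState {
        FETCHING(true),
        AWAIT_UPDATE(false);

        private final boolean validPosition;

        FetchState(boolean validPosition) {
            this.validPosition = validPosition;
        }

        boolean hasValidPosition() {
            return validPosition;
        }
    }

    // Models the guard quoted above: a partition without a valid position is
    // dropped before any error handling can run.
    static String initialize(FetchState state, boolean fetchHasError) {
        if (!state.hasValidPosition()) {
            return "ignored";
        }
        return fetchHasError ? "handleInitializeErrors" : "records";
    }

    public static void main(String[] args) {
        // Partition left in FETCHING: the error path is reached.
        System.out.println(initialize(FetchState.FETCHING, true));     // handleInitializeErrors
        // Partition moved to AWAIT_UPDATE first: the error path is skipped.
        System.out.println(initialize(FetchState.AWAIT_UPDATE, true)); // ignored
    }
}
```

Under these assumptions, calling subscriptions.awaitUpdate(partition) in
handleFetchSuccess() would mean the error handling path is never reached for
that fetch response.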