appchemist commented on code in PR #15647:
URL: https://github.com/apache/kafka/pull/15647#discussion_r1580499797
##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractFetch.java:
##########
@@ -200,6 +200,9 @@ protected void handleFetchSuccess(final Node fetchTarget,
         if (partitionData.currentLeader().leaderId() != -1 && partitionData.currentLeader().leaderEpoch() != -1) {
             partitionsWithUpdatedLeaderInfo.put(partition, new Metadata.LeaderIdAndEpoch(
                 Optional.of(partitionData.currentLeader().leaderId()),
                 Optional.of(partitionData.currentLeader().leaderEpoch())));
+        } else {
+            requestMetadataUpdate(metadata, subscriptions, partition);
+            subscriptions.awaitUpdate(partition);
Review Comment:
Per KIP-951, when the fetch state is `FETCHING`, the
`FetchCollector.handleInitializeErrors()` method is called to handle the error.
So I think the state should not be changed to `AWAIT_UPDATE` in this case.
Additionally, once the state is `AWAIT_UPDATE`, the partition is filtered out by
the following check inside `FetchCollector.initialize()` and never reaches
`FetchCollector.handleInitializeErrors()`.
```
if (!subscriptions.hasValidPosition(tp)) {
    // this can happen when a rebalance happened while fetch is still in-flight
    log.debug("Ignoring fetched records for partition {} since it no longer has valid position", tp);
    return null;
}
```
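
To make the concern concrete, here is a minimal, self-contained sketch of the control flow described above. It is not Kafka's actual code: `FetchStateSketch`, its `FetchState` enum, and the string return values are hypothetical stand-ins, and it assumes that `AWAIT_UPDATE` reports no valid position, as the comment above suggests.

```java
public class FetchStateSketch {

    // Hypothetical, simplified stand-in for the consumer's per-partition fetch states.
    // Assumption: only FETCHING counts as having a valid position.
    enum FetchState {
        FETCHING(true),
        AWAIT_UPDATE(false);

        private final boolean validPosition;

        FetchState(boolean validPosition) {
            this.validPosition = validPosition;
        }

        boolean hasValidPosition() {
            return validPosition;
        }
    }

    // Mirrors the shape of the guard quoted above: a partition without a valid
    // position is dropped before any error handling can run.
    static String initialize(FetchState state, boolean fetchHadError) {
        if (!state.hasValidPosition()) {
            return "ignored";
        }
        return fetchHadError ? "handleInitializeErrors" : "records";
    }

    public static void main(String[] args) {
        // A failed fetch while still FETCHING reaches the error handler.
        System.out.println(initialize(FetchState.FETCHING, true));
        // The same failed fetch after moving to AWAIT_UPDATE is silently ignored.
        System.out.println(initialize(FetchState.AWAIT_UPDATE, true));
    }
}
```

In this model, moving the partition to `AWAIT_UPDATE` before the fetch response is collected means the error-handling branch is never taken, which is the behavior in question.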