kirktrue commented on code in PR #15647:
URL: https://github.com/apache/kafka/pull/15647#discussion_r1580103763
##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/FetchCollector.java:
##########
@@ -327,21 +327,27 @@ private void handleInitializeErrors(final CompletedFetch completedFetch, final E
final long fetchOffset = completedFetch.nextFetchOffset();
if (error == Errors.NOT_LEADER_OR_FOLLOWER ||
- error == Errors.REPLICA_NOT_AVAILABLE ||
+ error == Errors.FENCED_LEADER_EPOCH) {
+ log.debug("Error in fetch for partition {}: {}", tp, error.exceptionName());
+ requestMetadataUpdate(metadata, subscriptions, tp);
+ } else if (error == Errors.REPLICA_NOT_AVAILABLE ||
error == Errors.KAFKA_STORAGE_ERROR ||
- error == Errors.FENCED_LEADER_EPOCH ||
error == Errors.OFFSET_NOT_AVAILABLE) {
log.debug("Error in fetch for partition {}: {}", tp, error.exceptionName());
requestMetadataUpdate(metadata, subscriptions, tp);
+ subscriptions.awaitUpdate(tp);
Review Comment:
With this change, if the replica is not available, we will flag the
partition as awaiting a metadata update. Is this a key part of this change? Why
don't we want the first `if` block to also await an update?
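To make the question concrete, here is a minimal, self-contained model of the two branches in the new `handleInitializeErrors()` code. `FetchError`, `RecoveryAction`, and `InitializeErrorRouting` are hypothetical stand-ins written for this sketch, not Kafka's `Errors` or `FetchCollector` API; the sketch only encodes which error takes which branch in the diff above.

```java
import java.util.EnumSet;
import java.util.Set;

// Hypothetical stand-ins for the error codes in the diff (not Kafka's Errors class).
enum FetchError {
    NOT_LEADER_OR_FOLLOWER,
    FENCED_LEADER_EPOCH,
    REPLICA_NOT_AVAILABLE,
    KAFKA_STORAGE_ERROR,
    OFFSET_NOT_AVAILABLE,
    OTHER // placeholder for every error these two branches do not handle
}

// Hypothetical labels for what each branch does to the partition.
enum RecoveryAction {
    REQUEST_METADATA_UPDATE,           // first branch: refresh metadata only
    REQUEST_METADATA_UPDATE_AND_AWAIT, // second branch: refresh and call awaitUpdate(tp)
    HANDLED_ELSEWHERE                  // outside the scope of this sketch
}

final class InitializeErrorRouting {
    private static final Set<FetchError> LEADER_ERRORS =
            EnumSet.of(FetchError.NOT_LEADER_OR_FOLLOWER, FetchError.FENCED_LEADER_EPOCH);
    private static final Set<FetchError> REPLICA_ERRORS =
            EnumSet.of(FetchError.REPLICA_NOT_AVAILABLE,
                       FetchError.KAFKA_STORAGE_ERROR,
                       FetchError.OFFSET_NOT_AVAILABLE);

    private InitializeErrorRouting() {}

    // Encodes only the branch structure of the revised error handling.
    static RecoveryAction route(FetchError error) {
        if (LEADER_ERRORS.contains(error)) {
            return RecoveryAction.REQUEST_METADATA_UPDATE;
        } else if (REPLICA_ERRORS.contains(error)) {
            return RecoveryAction.REQUEST_METADATA_UPDATE_AND_AWAIT;
        }
        return RecoveryAction.HANDLED_ELSEWHERE;
    }

    public static void main(String[] args) {
        for (FetchError error : FetchError.values()) {
            System.out.println(error + " -> " + route(error));
        }
    }
}
```

Written this way, the question reduces to whether `NOT_LEADER_OR_FOLLOWER` and `FENCED_LEADER_EPOCH` should map to the same action as the replica/storage/offset errors.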
##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java:
##########
@@ -967,7 +984,8 @@ private boolean maybeValidatePosition(Metadata.LeaderAndEpoch currentLeaderAndEp
return false;
}
- if (position != null && !position.currentLeader.equals(currentLeaderAndEpoch)) {
+ if (position != null &&
+ (!position.currentLeader.equals(currentLeaderAndEpoch) || this.fetchState.equals(FetchStates.AWAIT_UPDATE))) {
Review Comment:
Not sure if using the helper method shortens the line length enough to avoid
wrapping 🤷♂️
```suggestion
if (position != null &&
(!position.currentLeader.equals(currentLeaderAndEpoch) || awaitingUpdate())) {
```
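For reference, a rough sketch of what the suggested condition checks, assuming `awaitingUpdate()` simply tests whether the fetch state is `AWAIT_UPDATE`. `PartitionFetchState`, `LeaderAndEpoch`, and `PositionValidationSketch` are hypothetical simplifications, not Kafka's `SubscriptionState` types; it uses a record, so it needs Java 16+.

```java
import java.util.Optional;

// Hypothetical, simplified fetch states (not Kafka's FetchStates).
enum PartitionFetchState { FETCHING, AWAIT_VALIDATION, AWAIT_UPDATE }

// Stand-in for Metadata.LeaderAndEpoch; records compare all components in equals().
record LeaderAndEpoch(Optional<Integer> leader, Optional<Integer> epoch) {}

final class PositionValidationSketch {
    private final PartitionFetchState fetchState;
    private final LeaderAndEpoch positionLeader; // leader recorded with the position; null if no position

    PositionValidationSketch(PartitionFetchState fetchState, LeaderAndEpoch positionLeader) {
        this.fetchState = fetchState;
        this.positionLeader = positionLeader;
    }

    // Assumed body of the suggested helper; == is safe for enum constants.
    boolean awaitingUpdate() {
        return fetchState == PartitionFetchState.AWAIT_UPDATE;
    }

    // Revised condition: validate when the leader changed OR a metadata update is pending.
    boolean needsValidation(LeaderAndEpoch currentLeaderAndEpoch) {
        return positionLeader != null
                && (!positionLeader.equals(currentLeaderAndEpoch) || awaitingUpdate());
    }

    public static void main(String[] args) {
        LeaderAndEpoch leader = new LeaderAndEpoch(Optional.of(1), Optional.of(5));
        LeaderAndEpoch newLeader = new LeaderAndEpoch(Optional.of(2), Optional.of(6));

        // Same leader, normal fetching: nothing to validate.
        System.out.println(new PositionValidationSketch(PartitionFetchState.FETCHING, leader)
                .needsValidation(leader)); // false
        // Same leader, but the partition is flagged as awaiting a metadata update.
        System.out.println(new PositionValidationSketch(PartitionFetchState.AWAIT_UPDATE, leader)
                .needsValidation(leader)); // true
        // Leader changed: validate regardless of the flag.
        System.out.println(new PositionValidationSketch(PartitionFetchState.FETCHING, leader)
                .needsValidation(newLeader)); // true
    }
}
```

Pulling the state check into a named helper keeps the compound condition short and makes the new second trigger for validation explicit.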
##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractFetch.java:
##########
@@ -200,6 +200,9 @@ protected void handleFetchSuccess(final Node fetchTarget,
if (partitionData.currentLeader().leaderId() != -1 &&
partitionData.currentLeader().leaderEpoch() != -1) {
partitionsWithUpdatedLeaderInfo.put(partition, new Metadata.LeaderIdAndEpoch(
Optional.of(partitionData.currentLeader().leaderId()),
Optional.of(partitionData.currentLeader().leaderEpoch())));
+ } else {
+ requestMetadataUpdate(metadata, subscriptions, partition);
+ subscriptions.awaitUpdate(partition);
Review Comment:
With this change, we first request a metadata update, then flag our
partition as awaiting the metadata update whenever we encounter a
`NOT_LEADER_OR_FOLLOWER` or `FENCED_LEADER_EPOCH` error and the response
carries no current-leader information. However, in
`FetchCollector.handleInitializeErrors()`, we only request the metadata
update for those same errors, but _don't_ flag the partition. Is that
apparent inconsistency intentional?
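A small model of the two code paths side by side may make the difference easier to see. `LeaderErrorHandlingSketch`, `LeaderIdAndEpoch`, and the string partition names are hypothetical simplifications; the sets stand in for calling `requestMetadataUpdate(...)` and `subscriptions.awaitUpdate(...)`, and `-1` means "unknown", as in the diff. It uses a record, so it needs Java 16+.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Optional;
import java.util.Set;

// Stand-in for Metadata.LeaderIdAndEpoch.
record LeaderIdAndEpoch(Optional<Integer> leaderId, Optional<Integer> epoch) {}

final class LeaderErrorHandlingSketch {
    final Map<String, LeaderIdAndEpoch> partitionsWithUpdatedLeaderInfo = new HashMap<>();
    final Set<String> metadataUpdateRequested = new HashSet<>(); // models requestMetadataUpdate(...)
    final Set<String> awaitingUpdate = new HashSet<>();          // models subscriptions.awaitUpdate(...)

    // Models the AbstractFetch.handleFetchSuccess() path: use the broker's leader
    // hint when present, otherwise refresh metadata AND park the partition.
    void onLeaderError(String partition, int hintedLeaderId, int hintedEpoch) {
        if (hintedLeaderId != -1 && hintedEpoch != -1) {
            partitionsWithUpdatedLeaderInfo.put(partition,
                    new LeaderIdAndEpoch(Optional.of(hintedLeaderId), Optional.of(hintedEpoch)));
        } else {
            metadataUpdateRequested.add(partition);
            awaitingUpdate.add(partition);
        }
    }

    // Models the first branch of FetchCollector.handleInitializeErrors():
    // the same errors only refresh metadata; the partition is NOT parked.
    void onInitializeLeaderError(String partition) {
        metadataUpdateRequested.add(partition);
    }

    public static void main(String[] args) {
        LeaderErrorHandlingSketch s = new LeaderErrorHandlingSketch();
        s.onLeaderError("topic-0", 2, 7);   // broker supplied a new leader
        s.onLeaderError("topic-1", -1, -1); // no leader hint
        s.onInitializeLeaderError("topic-2");

        System.out.println(s.partitionsWithUpdatedLeaderInfo.keySet()); // [topic-0]
        System.out.println(s.awaitingUpdate);                          // [topic-1]
        System.out.println(s.metadataUpdateRequested.contains("topic-2")); // true
        System.out.println(s.awaitingUpdate.contains("topic-2"));          // false
    }
}
```

In this model, `topic-1` and `topic-2` hit the same class of error but end up in different states, which is the asymmetry being asked about.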
--
This is an automated message from the Apache Git Service.