lianetm commented on code in PR #15275:
URL: https://github.com/apache/kafka/pull/15275#discussion_r1478881882
##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/MembershipManagerImpl.java:
##########
@@ -1392,4 +1402,16 @@ public void registerStateListener(MemberStateListener
listener) {
}
this.stateUpdatesListeners.add(listener);
}
+
+ /**
+ * If either a new target assignment or new metadata is available that we
have not yet attempted
+ * to reconcile, and we are currently in state RECONCILING, trigger
reconciliation.
+ */
+ @Override
+ public PollResult poll(final long currentTimeMs) {
+ if (state == MemberState.RECONCILING && attemptReconciliation) {
+ reconcile();
Review Comment:
Regarding the reconcile on poll, agree with @dajac that we were moving away
from the multiple triggers looking for a more unified approach, that would
reconcile not based on different events, but rather on a regular check of the
target assignment, that could be affected by all those events. So with this, we
have a `reconcile()` as the single place to determine if we should reconcile
([this initial
block](https://github.com/apache/kafka/blob/2158ab7b66e0b57bdaf7872511e48ff076160320/clients/src/main/java/org/apache/kafka/clients/consumer/internals/MembershipManagerImpl.java#L787-L822)),
1. no reconciliation in progress
2. target assignment different from the current assignment
3. resolved target different from the current assignment
In that sense, calling that reconcile regularly on every poll is probably
all we want, and aligns with what @dajac described, as it will run the checks
above and proceed. But the `attemptReconciliation` introduces something extra
that maybe we can simplify a bit.
If I get it right, all we want with that attempt logic is to avoid going
through the unresolved assignment and requesting metadata if we know we were
waiting for it already and it hasn't been received, right? In that sense, it
looks to me as an internal improvement to how we resolve metadata in the
`[findResolvableAssignmentAndTriggerMetadataUpdate](https://github.com/apache/kafka/blob/2158ab7b66e0b57bdaf7872511e48ff076160320/clients/src/main/java/org/apache/kafka/clients/consumer/internals/MembershipManagerImpl.java#L1012)`,
and not as another condition to reconcile. The
`findResolvableAssignmentAndTriggerMetadataUpdate` could consider if we are
awaiting metadata for a target assignment before checking the target and
requesting it again. It would be no-op if we are awaiting metadata for the
target, and the same common reconcile checks (check # 3) would ensure that a
reconciliation is not triggered if resolved target didn't change.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]