lianetm commented on code in PR #15511:
URL: https://github.com/apache/kafka/pull/15511#discussion_r1521933499
##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/MembershipManagerImpl.java:
##########
@@ -889,43 +914,36 @@ private void transitionToStale() {
*/
void maybeReconcile() {
if (targetAssignmentReconciled()) {
- log.debug("Ignoring reconciliation attempt. Target assignment is equal to the " +
+ log.trace("Ignoring reconciliation attempt. Target assignment is equal to the " +
"current assignment.");
return;
}
if (reconciliationInProgress) {
- log.debug("Ignoring reconciliation attempt. Another reconciliation is already in progress. Assignment " +
+ log.trace("Ignoring reconciliation attempt. Another reconciliation is already in progress. Assignment " +
currentTargetAssignment + " will be handled in the next reconciliation loop.");
return;
}
// Find the subset of the target assignment that can be resolved to topic names, and trigger a metadata update
// if some topic IDs are not resolvable.
SortedSet<TopicIdPartition> assignedTopicIdPartitions = findResolvableAssignmentAndTriggerMetadataUpdate();
+ final LocalAssignment resolvedAssignment = new LocalAssignment(currentTargetAssignment.localEpoch, assignedTopicIdPartitions);
- SortedSet<TopicPartition> ownedPartitions = new TreeSet<>(TOPIC_PARTITION_COMPARATOR);
- ownedPartitions.addAll(subscriptions.assignedPartitions());
-
- // Keep copy of assigned TopicPartitions created from the TopicIdPartitions that are
- // being reconciled. Needed for interactions with the centralized subscription state that
- // does not support topic IDs yet, and for the callbacks.
- SortedSet<TopicPartition> assignedTopicPartitions = toTopicPartitionSet(assignedTopicIdPartitions);
-
- // Check same assignment. Based on topic names for now, until topic IDs are properly
- // supported in the centralized subscription state object. Note that this check is
- // required to make sure that reconciliation is not triggered if the assignment ready to
- // be reconciled is the same as the current one (even though the member may remain
- // in RECONCILING state if it has some unresolved assignments).
- boolean sameAssignmentReceived = assignedTopicPartitions.equals(ownedPartitions);
-
- if (sameAssignmentReceived) {
+ if (resolvedAssignment.equals(currentAssignment)) {
Review Comment:
This short-circuits the reconciliation when the same assignment is
received (same epoch, same partitions). But we also need to consider the case
where we get the same partitions assigned in a different epoch. In that
case we should not carry on with the full reconciliation process (there is
truly nothing to reconcile), but we should still send an ack to the broker, so I
would expect a similar short-circuit: `if sameAssignmentInDiffEpoch
=> transitionToAck & return;`.
I'm mainly thinking about this case:
- member owns t1-1 at epoch 3
- receives new assignment [t1-1, t2-1] at epoch 4 and gets stuck reconciling,
e.g. because t2 metadata is missing
- receives new assignment [t1-1] at epoch 5 (e.g. the broker realized t2 has
been deleted)
- member does not need to reconcile t1-1, but should send an ack to the
broker for t1-1, which it received on a newer epoch
Does that make sense? I may be missing something regarding the
expectations.
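To make the three outcomes concrete, here is a minimal standalone sketch of the decision described above. It is not the `MembershipManagerImpl` code: `LocalAssignmentSketch`, `ReconcileAction` and `decide` are hypothetical names, partitions are plain strings rather than `TopicIdPartition`, and it assumes `localEpoch` is the epoch the assignment was received on.

```java
import java.util.Objects;
import java.util.Set;
import java.util.TreeSet;

// Hypothetical helper, not part of the PR: decides what to do with a resolved assignment.
final class ReconcileDecisionSketch {

    static ReconcileAction decide(LocalAssignmentSketch current, LocalAssignmentSketch resolved) {
        // Same epoch and same partitions: nothing to do at all.
        if (resolved.equals(current)) {
            return ReconcileAction.IGNORE;
        }
        // Same partitions on a different epoch: nothing to reconcile,
        // but the broker still needs an ack for the newer epoch.
        if (resolved.partitions.equals(current.partitions)) {
            return ReconcileAction.ACK_ONLY;
        }
        // Partitions differ: run the full reconciliation.
        return ReconcileAction.FULL_RECONCILIATION;
    }

    public static void main(String[] args) {
        // Scenario from the comment: member owns t1-1 at epoch 3,
        // then receives [t1-1] again at epoch 5.
        LocalAssignmentSketch owned = new LocalAssignmentSketch(3, Set.of("t1-1"));
        LocalAssignmentSketch received = new LocalAssignmentSketch(5, Set.of("t1-1"));
        System.out.println(decide(owned, received)); // prints ACK_ONLY
    }
}

// Hypothetical outcome type for this sketch only.
enum ReconcileAction { IGNORE, ACK_ONLY, FULL_RECONCILIATION }

// Simplified stand-in for the PR's LocalAssignment (assumption: epoch plus a set of partitions).
final class LocalAssignmentSketch {
    final long localEpoch;
    final Set<String> partitions;

    LocalAssignmentSketch(long localEpoch, Set<String> partitions) {
        this.localEpoch = localEpoch;
        this.partitions = new TreeSet<>(partitions);
    }

    @Override
    public boolean equals(Object o) {
        if (this == o) return true;
        if (!(o instanceof LocalAssignmentSketch)) return false;
        LocalAssignmentSketch other = (LocalAssignmentSketch) o;
        return localEpoch == other.localEpoch && partitions.equals(other.partitions);
    }

    @Override
    public int hashCode() {
        return Objects.hash(localEpoch, partitions);
    }
}
```

In the real class the `ACK_ONLY` branch would presumably also need to record the new epoch before transitioning to the acknowledging state; the sketch leaves that out.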