lucasbru commented on code in PR #15511:
URL: https://github.com/apache/kafka/pull/15511#discussion_r1525940513
##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/MembershipManagerImpl.java:
##########
@@ -911,9 +911,13 @@ void maybeReconcile() {
SortedSet<TopicIdPartition> assignedTopicIdPartitions = findResolvableAssignmentAndTriggerMetadataUpdate();
final LocalAssignmentImpl resolvedAssignment = new LocalAssignmentImpl(currentTargetAssignment.localEpoch, assignedTopicIdPartitions);
- if (resolvedAssignment.equals(currentAssignment)) {
- log.debug("Ignoring reconciliation attempt. Target assignment ready to reconcile {} " +
- "is equal to the member current assignment.", resolvedAssignment);
+ if (currentAssignment != LocalAssignmentImpl.NONE &&
+ resolvedAssignment.localEpoch <= currentAssignment.localEpoch + 1 &&
+ resolvedAssignment.partitions.equals(currentAssignment.partitions)) {
+ log.debug("Ignoring reconciliation attempt. The resolvable fragment of the target assignment {} " +
Review Comment:
I didn't want to mention the local epoch here, since it's more of an
implementation detail of how intermediate assignments are detected. But then I
should log the local epoch, I suppose. Updated accordingly.
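
For readers following the thread, the new skip condition from the hunk above can be sketched in isolation. This is a minimal sketch, not the Kafka class itself: `LocalAssignment` is a simplified, hypothetical stand-in for `LocalAssignmentImpl`, partitions are plain strings rather than `TopicIdPartition`, and the method name `shouldSkipReconciliation` is made up for illustration. Only the three-part condition is taken from the diff.

```java
import java.util.Collections;
import java.util.Set;

public class ReconcileSkipSketch {
    // Simplified, hypothetical stand-in for LocalAssignmentImpl.
    static final class LocalAssignment {
        // Sentinel compared by reference, as in the diff (the epoch value here is an assumption).
        static final LocalAssignment NONE = new LocalAssignment(-1, Collections.emptySet());
        final long localEpoch;
        final Set<String> partitions;

        LocalAssignment(long localEpoch, Set<String> partitions) {
            this.localEpoch = localEpoch;
            this.partitions = partitions;
        }
    }

    // Same shape as the condition in the diff: skip only when there is a current
    // assignment, the resolved epoch is at most one ahead of it, and the
    // partitions are identical.
    static boolean shouldSkipReconciliation(LocalAssignment current, LocalAssignment resolved) {
        return current != LocalAssignment.NONE
            && resolved.localEpoch <= current.localEpoch + 1
            && resolved.partitions.equals(current.partitions);
    }
}
```

Compared with the old `resolvedAssignment.equals(currentAssignment)` check, the epoch and the partitions are compared separately here, so an assignment with the same partitions but a newer epoch can still be recognized and skipped.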
##########
clients/src/test/java/org/apache/kafka/clients/consumer/internals/MembershipManagerImplTest.java:
##########
@@ -1028,9 +1028,9 @@ public void testNewEmptyAssignmentReplacesPreviousOneWaitingOnMetadata() {
verifyReconciliationNotTriggered(membershipManager);
membershipManager.poll(time.milliseconds());
+ membershipManager.onHeartbeatRequestSent();
Review Comment:
Done
##########
clients/src/test/java/org/apache/kafka/clients/consumer/internals/MembershipManagerImplTest.java:
##########
@@ -2279,22 +2277,23 @@ private MembershipManagerImpl mockJoinAndReceiveAssignment(boolean expectSubscri
return mockJoinAndReceiveAssignment(expectSubscriptionUpdated, createAssignment(expectSubscriptionUpdated));
}
- private MembershipManagerImpl mockJoinAndReceiveAssignment(boolean expectSubscriptionUpdated,
+ private MembershipManagerImpl mockJoinAndReceiveAssignment(boolean triggerReconciliation,
Review Comment:
Done
##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/MembershipManagerImpl.java:
##########
@@ -976,6 +974,10 @@ void maybeReconcile() {
}
revokeAndAssign(resolvedAssignment, assignedTopicIdPartitions, revokedPartitions, addedPartitions);
+ }).whenComplete((__, error) -> {
+ if (error != null) {
+ log.error("Reconciliation failed.", error);
Review Comment:
Nope, the exception handling inside `revokeAndAssign` is only triggered if
the revoke-and-assign future fails. If `revokeAndAssign` fails outside the
future (in particular, `revokePartitions` contains logic that may fail), the
exception falls through to here. Before this change it was silently swallowed,
which cost me an hour of debugging to find.
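
The failure mode described above can be reproduced with plain `CompletableFuture`, independent of Kafka. The following is a minimal sketch under stated assumptions: the class, the `LOG` list (standing in for a logger), and the simplified `revokeAndAssign` and `reconcile` methods are all hypothetical and only mimic the shape of the chain in `maybeReconcile()`. It shows that an exception thrown synchronously inside a `whenComplete` callback is stored in the returned future and stays invisible unless another stage inspects it.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;

public class SwallowedFailureSketch {
    // Hypothetical stand-in for a logger; collects messages so they can be inspected.
    static final List<String> LOG = new ArrayList<>();

    // Hypothetical stand-in for revokeAndAssign: it handles failures of the future
    // it builds, but it can also throw synchronously before that future exists.
    static void revokeAndAssign(boolean failEarly) {
        if (failEarly) {
            throw new IllegalStateException("failed outside the future");
        }
        CompletableFuture.<Void>completedFuture(null).whenComplete((ignored, error) -> {
            if (error != null) {
                LOG.add("handled inside revokeAndAssign: " + error);
            }
        });
    }

    // Hypothetical stand-in for the reconciliation chain.
    static CompletableFuture<Void> reconcile(boolean failEarly, boolean logOuterFailure) {
        CompletableFuture<Void> commitResult = CompletableFuture.completedFuture(null);
        CompletableFuture<Void> chained =
            commitResult.whenComplete((ignored, commitError) -> revokeAndAssign(failEarly));
        if (!logOuterFailure) {
            // The exception is stored in the returned future and nobody looks at it.
            return chained;
        }
        // Extra stage, analogous to the one added in the diff, that surfaces the failure.
        return chained.whenComplete((ignored, error) -> {
            if (error != null) {
                LOG.add("Reconciliation failed: " + error);
            }
        });
    }

    public static void main(String[] args) {
        reconcile(true, false);
        System.out.println("without outer handler: " + LOG);
        reconcile(true, true);
        System.out.println("with outer handler: " + LOG);
    }
}
```

In the first call nothing is logged even though the chain failed; in the second call the trailing `whenComplete` records the failure, which is the behaviour the added `log.error("Reconciliation failed.", error)` aims for.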