lianetm commented on code in PR #16569:
URL: https://github.com/apache/kafka/pull/16569#discussion_r1681354714
##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/MembershipManagerImpl.java:
##########
@@ -459,8 +464,30 @@ public void onHeartbeatSuccess(ConsumerGroupHeartbeatResponseData response) {
}
@Override
- public void onHeartbeatFailure() {
- metricsManager.maybeRecordRebalanceFailed();
+ public void onHeartbeatFailure(boolean retriable) {
+ if (!retriable) {
+ metricsManager.maybeRecordRebalanceFailed();
+ }
+ // The leave group request is sent out once (not retried), so we should complete the leave
+ // operation once the request completes, regardless of the response.
+ if (state == MemberState.UNSUBSCRIBED && maybeCompleteLeaveInProgress()) {
+ log.warn("Member {} with epoch {} received a failed response to the heartbeat to " +
+ "leave the group and completed the leave operation. ", memberId, memberEpoch);
+ }
+ }
+
+ /**
+ * Complete the leave in progress (if any), if the member is UNSUBSCRIBED. This is expected to
Review Comment:
The "maybe" in the name does not refer to the `MemberState`; it refers to
`leaveInProgress`, which may or may not be present. Although this is most often
needed when the state is `UNSUBSCRIBED`, we also need to call
`maybeCompleteLeaveInProgress` when the state is `FATAL`, which is why I chose
not to include the state check at this level.
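
To illustrate the point, here is a minimal sketch, not the actual
`MembershipManagerImpl` code. The class `LeaveSketch`, the helper `startLeave`,
and the simplified `onHeartbeatFailure` are hypothetical names used only for
this example; the only assumption carried over from the PR is that
`leaveInProgress` is an optional future. The helper completes whatever leave is
pending without looking at the state, and each caller decides which states
apply.

```java
import java.util.Optional;
import java.util.concurrent.CompletableFuture;

// Hypothetical stand-in for the membership manager; not the real Kafka class.
class LeaveSketch {
    enum MemberState { STABLE, UNSUBSCRIBED, FATAL }

    MemberState state = MemberState.STABLE;

    // Present only while a leave operation is waiting to be completed.
    private Optional<CompletableFuture<Void>> leaveInProgress = Optional.empty();

    // Hypothetical helper: register a pending leave and move to the given state.
    CompletableFuture<Void> startLeave(MemberState next) {
        CompletableFuture<Void> future = new CompletableFuture<>();
        leaveInProgress = Optional.of(future);
        state = next;
        return future;
    }

    // "Maybe" refers to leaveInProgress being present or not; no state check here.
    boolean maybeCompleteLeaveInProgress() {
        if (leaveInProgress.isPresent()) {
            leaveInProgress.get().complete(null);
            leaveInProgress = Optional.empty();
            return true;
        }
        return false;
    }

    // Caller-level state check, mirroring the heartbeat failure path in the diff.
    boolean onHeartbeatFailure() {
        return state == MemberState.UNSUBSCRIBED && maybeCompleteLeaveInProgress();
    }
}
```

With this shape, a caller handling the `FATAL` transition can invoke
`maybeCompleteLeaveInProgress()` directly, while the heartbeat failure path
keeps its own `UNSUBSCRIBED` check.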