dongnuo123 commented on code in PR #15988:
URL: https://github.com/apache/kafka/pull/15988#discussion_r1610728790
##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##########
@@ -4274,6 +4343,81 @@ private void validateClassicGroupHeartbeat(
}
}
+ /**
+ * Handle a classic group HeartbeatRequest to a consumer group. A response
with
+ * REBALANCE_IN_PROGRESS is returned if 1) the member epoch is smaller
than the
+ * group epoch, 2) the member is in UNREVOKED_PARTITIONS, or 3) the member
is in
+ * UNRELEASED_PARTITIONS and all its partitions pending assignment are
free.
+ *
+ * @param group The ConsumerGroup.
+ * @param context The request context.
+ * @param request The actual Heartbeat request.
+ *
+ * @return The coordinator result that contains the heartbeat response.
+ */
+ private CoordinatorResult<HeartbeatResponseData, CoordinatorRecord>
classicGroupHeartbeatToConsumerGroup(
+ ConsumerGroup group,
+ RequestContext context,
+ HeartbeatRequestData request
+ ) throws UnknownMemberIdException, FencedInstanceIdException,
IllegalGenerationException {
+ String groupId = request.groupId();
+ String memberId = request.memberId();
+ String instanceId = request.groupInstanceId();
+ ConsumerGroupMember member = validateConsumerGroupMember(group,
memberId, instanceId);
+
+ throwIfMemberDoesNotUseClassicProtocol(member);
+ throwIfGenerationIdUnmatched(memberId, member.memberEpoch(),
request.generationId());
+
+ scheduleConsumerGroupSessionTimeout(groupId, memberId,
member.classicProtocolSessionTimeout().get());
+
+ Errors error = Errors.NONE;
+ // The member should rejoin if any of the following conditions is met.
+ // 1) The group epoch is bumped so the member need to rejoin to catch
up.
+ // 2) The member needs to revoke some partitions and rejoin to
reconcile with the new epoch.
+ // 3) The member's partitions pending assignment are free, so it can
rejoin to get the complete assignment.
+ if (member.memberEpoch() < group.groupEpoch() ||
+ member.state() == MemberState.UNREVOKED_PARTITIONS ||
+ (member.state() == MemberState.UNRELEASED_PARTITIONS &&
!group.waitingOnUnreleasedPartition(member))) {
Review Comment:
> the helper checks that the latest state does in fact have all partitions
released but we want it to rejoin to get the updated assignment
Yes this is correct.
> Will this member be updated to STABLE state in the next
CurrentAssignmentBuilder#computeNextAssignment
Yes it will in the reconciliation part in the
`classicGroupJoinToConsumerGroup`
https://github.com/apache/kafka/blob/27a6c156c49e375edea0e6f33a35c64c615db1b5/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java#L1737-L1745
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]