dongnuo123 commented on code in PR #15798:
URL: https://github.com/apache/kafka/pull/15798#discussion_r1580210319
##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##########
@@ -1336,6 +1422,233 @@ private
CoordinatorResult<ConsumerGroupHeartbeatResponseData, Record> consumerGr
return new CoordinatorResult<>(records, response);
}
+ /**
+ * Handle a JoinGroupRequest to a ConsumerGroup.
+ *
+ * @param group The group to join.
+ * @param context The request context.
+ * @param request The actual JoinGroup request.
+ * @param responseFuture The join group response future.
+ *
+ * @return The result that contains records to append if the join group
phase completes.
+ */
+ private CoordinatorResult<Void, Record> consumerGroupJoin(
+ ConsumerGroup group,
+ RequestContext context,
+ JoinGroupRequestData request,
+ CompletableFuture<JoinGroupResponseData> responseFuture
+ ) throws ApiException {
+ final long currentTimeMs = time.milliseconds();
+ final List<Record> records = new ArrayList<>();
+ final String groupId = request.groupId();
+ String memberId = request.memberId();
+ final String instanceId = request.groupInstanceId();
+ final JoinGroupRequestProtocolCollection protocols =
request.protocols();
+
+ final boolean isUnknownMember = memberId.equals(UNKNOWN_MEMBER_ID);
+ final ConsumerPartitionAssignor.Subscription subscription =
deserializeSubscription(protocols);
+ List<ConsumerGroupHeartbeatRequestData.TopicPartitions>
ownedTopicPartitions = null;
+
+ if (!validateClassicGroupSessionTimeout(memberId,
request.sessionTimeoutMs(), responseFuture)) {
+ return EMPTY_RESULT;
+ }
+ throwIfConsumerGroupIsFull(group, memberId);
+ // TODO: need to throw an exception if group is dead?
+
+ // Get or create the member.
+ if (isUnknownMember) memberId = Uuid.randomUuid().toString();
+ ConsumerGroupMember member;
+ ConsumerGroupMember.Builder updatedMemberBuilder;
+ boolean staticMemberReplaced = false;
+ boolean newMemberCreated = false;
+ if (instanceId == null) {
+ if (isUnknownMember &&
JoinGroupRequest.requiresKnownMemberId(context.apiVersion())) {
+ // If member id required, send back a response to call for
another join group request with allocated member id.
+ log.info("Dynamic member with unknown member id joins group
{}. " +
+ "Created a new member id {} and requesting the member
to rejoin with this id.",
+ group.groupId(), memberId);
+
+ responseFuture.complete(new JoinGroupResponseData()
+ .setMemberId(memberId)
+ .setErrorCode(Errors.MEMBER_ID_REQUIRED.code())
+ );
+ return EMPTY_RESULT;
+ } else {
+ // A dynamic member (re-)joins.
+ throwIfClassicProtocolIsNotSupported(group, memberId,
request.protocolType(), protocols);
+ newMemberCreated = !group.hasMember(memberId);
+
+ member = group.getOrMaybeCreateMember(memberId, true);
+ if (!newMemberCreated) ownedTopicPartitions =
validateGenerationIdAndGetOwnedPartition(member, subscription);
+ log.info("[GroupId {}] Member {} joins the consumer group.",
groupId, memberId);
+ updatedMemberBuilder = new ConsumerGroupMember.Builder(member);
+ }
+ } else {
+ throwIfClassicProtocolIsNotSupported(group, memberId,
request.protocolType(), protocols);
+ member = group.staticMember(instanceId);
+ // A new static member joins or the existing static member rejoins.
+ if (isUnknownMember) {
+ newMemberCreated = true;
+ if (member == null) {
+ // New static member.
+ member = group.getOrMaybeCreateMember(memberId, true);
+ updatedMemberBuilder = new
ConsumerGroupMember.Builder(member);
+ log.info("[GroupId {}] Static member {} with instance id
{} joins the consumer group.", groupId, memberId, instanceId);
+ } else {
+ // Replace the current static member.
+ staticMemberReplaced = true;
+ updatedMemberBuilder = new
ConsumerGroupMember.Builder(memberId)
+ .setAssignedPartitions(member.assignedPartitions());
+ ownedTopicPartitions =
validateGenerationIdAndGetOwnedPartition(member, subscription);
+ removeMemberAndCancelTimers(records, group.groupId(),
member.memberId());
+ log.info("[GroupId {}] Static member {} with instance id
{} re-joins the consumer group.", groupId, memberId, instanceId);
+ }
+ } else {
+ // Rejoining static member. Fence the static group with
unmatched member id.
+ throwIfStaticMemberIsUnknown(member, instanceId);
+ throwIfInstanceIdIsFenced(member, groupId, memberId,
instanceId);
+
+ ownedTopicPartitions =
validateGenerationIdAndGetOwnedPartition(member, subscription);
+ updatedMemberBuilder = new ConsumerGroupMember.Builder(member);
+ log.info("[GroupId {}] Static member {} with instance id {}
re-joins the consumer group.", groupId, memberId, instanceId);
+ }
+ }
+
+ int groupEpoch = group.groupEpoch();
+ Map<String, TopicMetadata> subscriptionMetadata =
group.subscriptionMetadata();
+
+ // 1. Create or update the member. If the member is new or has
changed, a ConsumerGroupMemberMetadataValue
+ // record is written to the __consumer_offsets partition to persist
the change. If the subscriptions have
+ // changed, the subscription metadata is updated and persisted by
writing a ConsumerGroupPartitionMetadataValue
+ // record to the __consumer_offsets partition. Finally, the group
epoch is bumped if the subscriptions have
+ // changed, and persisted by writing a ConsumerGroupMetadataValue
record to the partition.
+ ConsumerGroupMember updatedMember = updatedMemberBuilder
+ .maybeUpdateInstanceId(Optional.ofNullable(instanceId))
+ .maybeUpdateRackId(subscription.rackId())
+
.maybeUpdateRebalanceTimeoutMs(ofSentinel(request.rebalanceTimeoutMs()))
+ .maybeUpdateServerAssignorName(Optional.empty())
+
.maybeUpdateSubscribedTopicNames(Optional.ofNullable(subscription.topics()))
+ .setClientId(context.clientId())
+ .setClientHost(context.clientAddress.toString())
+ .setSupportedClassicProtocols(protocols)
+ .build();
+
+ boolean bumpGroupEpoch = false;
+ if (!updatedMember.equals(member)) {
+ records.add(newMemberSubscriptionRecord(groupId, updatedMember));
+
+ if
(!updatedMember.subscribedTopicNames().equals(member.subscribedTopicNames())) {
+ log.info("[GroupId {}] Member {} updated its subscribed topics
to: {}.",
+ groupId, memberId, updatedMember.subscribedTopicNames());
+ bumpGroupEpoch = true;
+ }
+
+ if
(!updatedMember.subscribedTopicRegex().equals(member.subscribedTopicRegex())) {
+ log.info("[GroupId {}] Member {} updated its subscribed regex
to: {}.",
+ groupId, memberId, updatedMember.subscribedTopicRegex());
+ bumpGroupEpoch = true;
+ }
+ }
+
+ if (bumpGroupEpoch || group.hasMetadataExpired(currentTimeMs)) {
+ // The subscription metadata is updated in two cases:
+ // 1) The member has updated its subscriptions;
+ // 2) The refresh deadline has been reached.
+ subscriptionMetadata = group.computeSubscriptionMetadata(
+ member,
+ updatedMember,
+ metadataImage.topics(),
+ metadataImage.cluster()
+ );
+
+ if (!subscriptionMetadata.equals(group.subscriptionMetadata())) {
+ log.info("[GroupId {}] Computed new subscription metadata:
{}.",
+ groupId, subscriptionMetadata);
+ bumpGroupEpoch = true;
+ records.add(newGroupSubscriptionMetadataRecord(groupId,
subscriptionMetadata));
+ }
+
+ if (bumpGroupEpoch) {
+ groupEpoch += 1;
+ records.add(newGroupEpochRecord(groupId, groupEpoch));
+ log.info("[GroupId {}] Bumped group epoch to {}.", groupId,
groupEpoch);
+ metrics.record(CONSUMER_GROUP_REBALANCES_SENSOR_NAME);
+ }
+
+ group.setMetadataRefreshDeadline(currentTimeMs +
consumerGroupMetadataRefreshIntervalMs, groupEpoch);
+ }
+
+ // 2. Update the target assignment if the group epoch is larger than
the target assignment epoch or a static member
+ // replaces an existing static member. The delta between the existing
and the new target assignment is persisted to the partition.
+ int targetAssignmentEpoch = group.assignmentEpoch();
+ Assignment targetAssignment = group.targetAssignment(memberId);
+ if (groupEpoch > targetAssignmentEpoch || staticMemberReplaced) {
+ String preferredServerAssignor =
group.computePreferredServerAssignor(
+ member,
+ updatedMember
+ ).orElse(defaultAssignor.name());
+ try {
+ TargetAssignmentBuilder assignmentResultBuilder =
+ new TargetAssignmentBuilder(groupId, groupEpoch,
assignors.get(preferredServerAssignor))
+ .withMembers(group.members())
+ .withStaticMembers(group.staticMembers())
+ .withSubscriptionMetadata(subscriptionMetadata)
+ .withTargetAssignment(group.targetAssignment())
+ .addOrUpdateMember(memberId, updatedMember);
+ TargetAssignmentBuilder.TargetAssignmentResult
assignmentResult;
+ // A new static member is replacing an older one with the same
subscriptions.
+ // We just need to remove the older member and add the newer
one. The new member should
+ // reuse the target assignment of the older member.
+ if (staticMemberReplaced) {
+ assignmentResult = assignmentResultBuilder
+ .removeMember(member.memberId())
+ .build();
+ } else {
+ assignmentResult = assignmentResultBuilder
+ .build();
+ }
+
+ log.info("[GroupId {}] Computed a new target assignment for
epoch {} with '{}' assignor: {}.",
+ groupId, groupEpoch, preferredServerAssignor,
assignmentResult.targetAssignment());
+
+ records.addAll(assignmentResult.records());
+ targetAssignment =
assignmentResult.targetAssignment().get(memberId);
+ targetAssignmentEpoch = groupEpoch;
+ } catch (PartitionAssignorException ex) {
+ String msg = String.format("Failed to compute a new target
assignment for epoch %d: %s",
+ groupEpoch, ex.getMessage());
+ log.error("[GroupId {}] {}.", groupId, msg);
+ throw new UnknownServerException(msg, ex);
+ }
+ }
+
+ // 3. Reconcile the member's assignment with the target assignment if
the member is not
+ // fully reconciled yet.
+ updatedMember = maybeReconcile(
+ groupId,
+ updatedMember,
+ group::currentPartitionEpoch,
+ targetAssignmentEpoch,
+ targetAssignment,
+ ownedTopicPartitions,
+ records
+ );
+
+ if (newMemberCreated) {
+ scheduleConsumerGroupSessionTimeout(groupId, memberId);
+ }
+
+ // TODO: what will happen to the client if the generation id in
response is 0?
+ responseFuture.complete(new JoinGroupResponseData()
+ .setMemberId(updatedMember.memberId())
+ .setGenerationId(updatedMember.memberEpoch())
+ .setProtocolType(ConsumerProtocol.PROTOCOL_TYPE)
+ .setProtocolName(protocols.stream().findAny().get().name())
+ );
Review Comment:
The responseFuture is set like this because we need to comply with
`GroupCoordinatorService#joinGroup` where `responseFuture` is returned instead
of the result of `scheduleWriteOperation`. To resolve `send the response before
the records are persisted/committed`, we should either 1) know the group type
in GroupCoordinatorService to decide whether to directly return the result of
`scheduleWriteOperation` or complete the future only when the records have
been persisted
For the errors, since we need to set the member id in the response for some
errors but not for others, the handling can be a little messy. Maybe we can
unify it so that all error cases are handled by throwing exceptions.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]