dajac commented on code in PR #15798:
URL: https://github.com/apache/kafka/pull/15798#discussion_r1580929744
##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##########
@@ -1413,6 +1506,243 @@ private
CoordinatorResult<ConsumerGroupHeartbeatResponseData, Record> consumerGr
return new CoordinatorResult<>(records, response);
}
+ /**
+ * Handle a JoinGroupRequest to a ConsumerGroup.
+ *
+ * @param group The group to join.
+ * @param context The request context.
+ * @param request The actual JoinGroup request.
+ * @param responseFuture The join group response future.
+ *
+ * @return The result that contains records to append if the join group
phase completes.
+ */
+ private CoordinatorResult<Void, Record> classicGroupJoinToConsumerGroup(
+ ConsumerGroup group,
+ RequestContext context,
+ JoinGroupRequestData request,
+ CompletableFuture<JoinGroupResponseData> responseFuture
+ ) throws ApiException {
+ final long currentTimeMs = time.milliseconds();
+ final List<Record> records = new ArrayList<>();
+ final String groupId = request.groupId();
+ String memberId = request.memberId();
+ final String instanceId = request.groupInstanceId();
+ final JoinGroupRequestProtocolCollection protocols =
request.protocols();
+ final boolean isUnknownMember = memberId.equals(UNKNOWN_MEMBER_ID);
+
+ throwIfConsumerGroupIsFull(group, memberId);
+ throwIfClassicProtocolIsNotSupported(group, memberId,
request.protocolType(), protocols);
+ // TODO: need to throw an exception if group is dead?
+
+ // Get or create the member.
+ if (isUnknownMember) memberId = Uuid.randomUuid().toString();
+ ConsumerGroupMember member;
+ ConsumerGroupMember.Builder updatedMemberBuilder;
+ boolean staticMemberReplaced = false;
+ boolean newMemberCreated = false;
+ if (instanceId == null) {
+ // A dynamic member (re-)joins.
+ throwIfRequiresKnownMemberId(groupId, memberId, isUnknownMember,
context);
+ newMemberCreated = !group.hasMember(memberId);
+ member = group.getOrMaybeCreateMember(memberId, true);
+ log.info("[GroupId {}] Member {} joins the consumer group.",
groupId, memberId);
+ updatedMemberBuilder = new ConsumerGroupMember.Builder(member);
+ } else {
+ member = group.staticMember(instanceId);
+ // A new static member joins or the existing static member rejoins.
+ if (isUnknownMember) {
+ newMemberCreated = true;
+ if (member == null) {
+ // New static member.
+ member = group.getOrMaybeCreateMember(memberId, true);
+ updatedMemberBuilder = new
ConsumerGroupMember.Builder(member);
+ log.info("[GroupId {}] Static member {} with instance id
{} joins the consumer group.", groupId, memberId, instanceId);
+ } else {
+ // Replace the current static member.
+ staticMemberReplaced = true;
+ updatedMemberBuilder = new
ConsumerGroupMember.Builder(memberId)
+ .setAssignedPartitions(member.assignedPartitions());
+ removeMember(records, groupId, member.memberId());
+ log.info("[GroupId {}] Static member with unknown member
id and instance id {} re-joins the consumer group. " +
+ "Created a new member {} to replace the existing
member {}.", groupId, instanceId, memberId, member.memberId());
+ }
+ } else {
+ // Rejoining static member. Fence the static group with
unmatched member id.
+ throwIfStaticMemberIsUnknown(member, instanceId);
+ throwIfInstanceIdIsFenced(member, groupId, memberId,
instanceId);
+ updatedMemberBuilder = new ConsumerGroupMember.Builder(member);
+ log.info("[GroupId {}] Static member {} with instance id {}
re-joins the consumer group.", groupId, memberId, instanceId);
+ }
+ }
+
+ int groupEpoch = group.groupEpoch();
+ Map<String, TopicMetadata> subscriptionMetadata =
group.subscriptionMetadata();
+ final ConsumerPartitionAssignor.Subscription subscription =
deserializeSubscription(protocols);
+ final List<ConsumerGroupHeartbeatRequestData.TopicPartitions>
ownedTopicPartitions =
+ validateGenerationIdAndGetOwnedPartition(member, subscription);
+
+ // 1. Create or update the member. If the member is new or has
changed, a ConsumerGroupMemberMetadataValue
+ // record is written to the __consumer_offsets partition to persist
the change. If the subscriptions have
+ // changed, the subscription metadata is updated and persisted by
writing a ConsumerGroupPartitionMetadataValue
+ // record to the __consumer_offsets partition. Finally, the group
epoch is bumped if the subscriptions have
+ // changed, and persisted by writing a ConsumerGroupMetadataValue
record to the partition.
+ ConsumerGroupMember updatedMember = updatedMemberBuilder
+ .maybeUpdateInstanceId(Optional.ofNullable(instanceId))
+ .maybeUpdateRackId(subscription.rackId())
+
.maybeUpdateRebalanceTimeoutMs(ofSentinel(request.rebalanceTimeoutMs()))
+ .maybeUpdateServerAssignorName(Optional.empty())
+
.maybeUpdateSubscribedTopicNames(Optional.ofNullable(subscription.topics()))
+ .setClientId(context.clientId())
+ .setClientHost(context.clientAddress.toString())
+ .setSupportedClassicProtocols(protocols)
+ .build();
+
+ boolean bumpGroupEpoch = updateMemberSubscription(groupId, memberId,
member, updatedMember, records);
+ if (bumpGroupEpoch || group.hasMetadataExpired(currentTimeMs)) {
+ // The subscription metadata is updated in two cases:
+ // 1) The member has updated its subscriptions;
+ // 2) The refresh deadline has been reached.
+ subscriptionMetadata = group.computeSubscriptionMetadata(
+ member,
+ updatedMember,
+ metadataImage.topics(),
+ metadataImage.cluster()
+ );
+
+ if (!subscriptionMetadata.equals(group.subscriptionMetadata())) {
+ log.info("[GroupId {}] Computed new subscription metadata:
{}.",
+ groupId, subscriptionMetadata);
+ bumpGroupEpoch = true;
+ records.add(newGroupSubscriptionMetadataRecord(groupId,
subscriptionMetadata));
+ }
+
+ if (bumpGroupEpoch) {
+ groupEpoch += 1;
+ records.add(newGroupEpochRecord(groupId, groupEpoch));
+ log.info("[GroupId {}] Bumped group epoch to {}.", groupId,
groupEpoch);
+ metrics.record(CONSUMER_GROUP_REBALANCES_SENSOR_NAME);
+ }
+
+ group.setMetadataRefreshDeadline(currentTimeMs +
consumerGroupMetadataRefreshIntervalMs, groupEpoch);
+ }
+
+ // 2. Update the target assignment if the group epoch is larger than
the target assignment epoch or a static member
+ // replaces an existing static member. The delta between the existing
and the new target assignment is persisted to the partition.
+ int targetAssignmentEpoch = group.assignmentEpoch();
+ Assignment targetAssignment = group.targetAssignment(memberId);
+ if (groupEpoch > targetAssignmentEpoch || staticMemberReplaced) {
+ String preferredServerAssignor =
group.computePreferredServerAssignor(
+ member,
+ updatedMember
+ ).orElse(defaultAssignor.name());
+ try {
+ TargetAssignmentBuilder assignmentResultBuilder =
+ new TargetAssignmentBuilder(groupId, groupEpoch,
assignors.get(preferredServerAssignor))
+ .withMembers(group.members())
+ .withStaticMembers(group.staticMembers())
+ .withSubscriptionMetadata(subscriptionMetadata)
+ .withTargetAssignment(group.targetAssignment())
+ .addOrUpdateMember(memberId, updatedMember);
+ TargetAssignmentBuilder.TargetAssignmentResult
assignmentResult;
+ // A new static member is replacing an older one with the same
subscriptions.
+ // We just need to remove the older member and add the newer
one. The new member should
+ // reuse the target assignment of the older member.
+ if (staticMemberReplaced) {
+ assignmentResult = assignmentResultBuilder
+ .removeMember(member.memberId())
+ .build();
+ } else {
+ assignmentResult = assignmentResultBuilder
+ .build();
+ }
+
+ log.info("[GroupId {}] Computed a new target assignment for
epoch {} with '{}' assignor: {}.",
+ groupId, groupEpoch, preferredServerAssignor,
assignmentResult.targetAssignment());
+
+ records.addAll(assignmentResult.records());
+ targetAssignment =
assignmentResult.targetAssignment().get(memberId);
+ targetAssignmentEpoch = groupEpoch;
+ } catch (PartitionAssignorException ex) {
+ String msg = String.format("Failed to compute a new target
assignment for epoch %d: %s",
+ groupEpoch, ex.getMessage());
+ log.error("[GroupId {}] {}.", groupId, msg);
+ throw new UnknownServerException(msg, ex);
+ }
+ }
+
+ // 3. Reconcile the member's assignment with the target assignment if
the member is not
+ // fully reconciled yet.
+
+ /**
+ * TODO:
+ * joinGroup - sync timeout
+ * syncGroup - (join timeout)
+ * heartbeat - (join timeout)
+ * => scheduleConsumerGroupRebalanceTimeout is not necessary
+ */
+ updatedMember = maybeReconcile(
+ groupId,
+ updatedMember,
+ group::currentPartitionEpoch,
+ targetAssignmentEpoch,
+ targetAssignment,
+ ownedTopicPartitions,
+ records
+ );
+
+ if (newMemberCreated) {
+ scheduleConsumerGroupSessionTimeout(groupId, memberId);
+ }
+ scheduleConsumerGroupSyncTimeout(groupId, memberId,
request.rebalanceTimeoutMs());
+
+ responseFuture.complete(new JoinGroupResponseData()
+ .setMemberId(updatedMember.memberId())
+ .setGenerationId(updatedMember.memberEpoch())
+ .setProtocolType(ConsumerProtocol.PROTOCOL_TYPE)
+ .setProtocolName(protocols.iterator().next().name())
+ );
+
+ return new CoordinatorResult<>(records);
+ }
+
+ /**
+ * Creates the member subscription record if the updatedMember is
different from
+ * the old member. Returns true if the
subscribedTopicNames/subscribedTopicRegex
+ * has changed.
+ *
+ * @param groupId The group id.
+ * @param memberId The member id.
+ * @param member The old member.
+ * @param updatedMember The updated member.
+ * @param records The list to accumulate any new records.
+ * @return A boolean indicating whether the updatedMember has a different
+ * subscribedTopicNames/subscribedTopicRegex from the old member.
+ */
+ private boolean updateMemberSubscription(
+ String groupId,
+ String memberId,
Review Comment:
nit: Could we get the member id from the updated member?
##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##########
@@ -1413,6 +1506,243 @@ private
CoordinatorResult<ConsumerGroupHeartbeatResponseData, Record> consumerGr
return new CoordinatorResult<>(records, response);
}
+ /**
+ * Handle a JoinGroupRequest to a ConsumerGroup.
+ *
+ * @param group The group to join.
+ * @param context The request context.
+ * @param request The actual JoinGroup request.
+ * @param responseFuture The join group response future.
+ *
+ * @return The result that contains records to append if the join group
phase completes.
+ */
+ private CoordinatorResult<Void, Record> classicGroupJoinToConsumerGroup(
+ ConsumerGroup group,
+ RequestContext context,
+ JoinGroupRequestData request,
+ CompletableFuture<JoinGroupResponseData> responseFuture
+ ) throws ApiException {
+ final long currentTimeMs = time.milliseconds();
+ final List<Record> records = new ArrayList<>();
+ final String groupId = request.groupId();
+ String memberId = request.memberId();
+ final String instanceId = request.groupInstanceId();
+ final JoinGroupRequestProtocolCollection protocols =
request.protocols();
+ final boolean isUnknownMember = memberId.equals(UNKNOWN_MEMBER_ID);
+
+ throwIfConsumerGroupIsFull(group, memberId);
+ throwIfClassicProtocolIsNotSupported(group, memberId,
request.protocolType(), protocols);
+ // TODO: need to throw an exception if group is dead?
+
+ // Get or create the member.
+ if (isUnknownMember) memberId = Uuid.randomUuid().toString();
+ ConsumerGroupMember member;
+ ConsumerGroupMember.Builder updatedMemberBuilder;
+ boolean staticMemberReplaced = false;
+ boolean newMemberCreated = false;
+ if (instanceId == null) {
+ // A dynamic member (re-)joins.
+ throwIfRequiresKnownMemberId(groupId, memberId, isUnknownMember,
context);
+ newMemberCreated = !group.hasMember(memberId);
+ member = group.getOrMaybeCreateMember(memberId, true);
+ log.info("[GroupId {}] Member {} joins the consumer group.",
groupId, memberId);
+ updatedMemberBuilder = new ConsumerGroupMember.Builder(member);
+ } else {
+ member = group.staticMember(instanceId);
+ // A new static member joins or the existing static member rejoins.
+ if (isUnknownMember) {
+ newMemberCreated = true;
+ if (member == null) {
+ // New static member.
+ member = group.getOrMaybeCreateMember(memberId, true);
+ updatedMemberBuilder = new
ConsumerGroupMember.Builder(member);
+ log.info("[GroupId {}] Static member {} with instance id
{} joins the consumer group.", groupId, memberId, instanceId);
+ } else {
+ // Replace the current static member.
+ staticMemberReplaced = true;
+ updatedMemberBuilder = new
ConsumerGroupMember.Builder(memberId)
+ .setAssignedPartitions(member.assignedPartitions());
+ removeMember(records, groupId, member.memberId());
+ log.info("[GroupId {}] Static member with unknown member
id and instance id {} re-joins the consumer group. " +
+ "Created a new member {} to replace the existing
member {}.", groupId, instanceId, memberId, member.memberId());
+ }
+ } else {
+ // Rejoining static member. Fence the static group with
unmatched member id.
+ throwIfStaticMemberIsUnknown(member, instanceId);
+ throwIfInstanceIdIsFenced(member, groupId, memberId,
instanceId);
+ updatedMemberBuilder = new ConsumerGroupMember.Builder(member);
+ log.info("[GroupId {}] Static member {} with instance id {}
re-joins the consumer group.", groupId, memberId, instanceId);
+ }
+ }
+
+ int groupEpoch = group.groupEpoch();
+ Map<String, TopicMetadata> subscriptionMetadata =
group.subscriptionMetadata();
+ final ConsumerPartitionAssignor.Subscription subscription =
deserializeSubscription(protocols);
+ final List<ConsumerGroupHeartbeatRequestData.TopicPartitions>
ownedTopicPartitions =
+ validateGenerationIdAndGetOwnedPartition(member, subscription);
+
+ // 1. Create or update the member. If the member is new or has
changed, a ConsumerGroupMemberMetadataValue
+ // record is written to the __consumer_offsets partition to persist
the change. If the subscriptions have
+ // changed, the subscription metadata is updated and persisted by
writing a ConsumerGroupPartitionMetadataValue
+ // record to the __consumer_offsets partition. Finally, the group
epoch is bumped if the subscriptions have
+ // changed, and persisted by writing a ConsumerGroupMetadataValue
record to the partition.
+ ConsumerGroupMember updatedMember = updatedMemberBuilder
+ .maybeUpdateInstanceId(Optional.ofNullable(instanceId))
+ .maybeUpdateRackId(subscription.rackId())
+
.maybeUpdateRebalanceTimeoutMs(ofSentinel(request.rebalanceTimeoutMs()))
+ .maybeUpdateServerAssignorName(Optional.empty())
+
.maybeUpdateSubscribedTopicNames(Optional.ofNullable(subscription.topics()))
+ .setClientId(context.clientId())
+ .setClientHost(context.clientAddress.toString())
+ .setSupportedClassicProtocols(protocols)
+ .build();
+
+ boolean bumpGroupEpoch = updateMemberSubscription(groupId, memberId,
member, updatedMember, records);
+ if (bumpGroupEpoch || group.hasMetadataExpired(currentTimeMs)) {
+ // The subscription metadata is updated in two cases:
+ // 1) The member has updated its subscriptions;
+ // 2) The refresh deadline has been reached.
+ subscriptionMetadata = group.computeSubscriptionMetadata(
+ member,
+ updatedMember,
+ metadataImage.topics(),
+ metadataImage.cluster()
+ );
+
+ if (!subscriptionMetadata.equals(group.subscriptionMetadata())) {
+ log.info("[GroupId {}] Computed new subscription metadata:
{}.",
+ groupId, subscriptionMetadata);
+ bumpGroupEpoch = true;
+ records.add(newGroupSubscriptionMetadataRecord(groupId,
subscriptionMetadata));
+ }
+
+ if (bumpGroupEpoch) {
+ groupEpoch += 1;
+ records.add(newGroupEpochRecord(groupId, groupEpoch));
+ log.info("[GroupId {}] Bumped group epoch to {}.", groupId,
groupEpoch);
+ metrics.record(CONSUMER_GROUP_REBALANCES_SENSOR_NAME);
+ }
+
+ group.setMetadataRefreshDeadline(currentTimeMs +
consumerGroupMetadataRefreshIntervalMs, groupEpoch);
+ }
+
+ // 2. Update the target assignment if the group epoch is larger than
the target assignment epoch or a static member
+ // replaces an existing static member. The delta between the existing
and the new target assignment is persisted to the partition.
+ int targetAssignmentEpoch = group.assignmentEpoch();
+ Assignment targetAssignment = group.targetAssignment(memberId);
+ if (groupEpoch > targetAssignmentEpoch || staticMemberReplaced) {
+ String preferredServerAssignor =
group.computePreferredServerAssignor(
+ member,
+ updatedMember
+ ).orElse(defaultAssignor.name());
+ try {
+ TargetAssignmentBuilder assignmentResultBuilder =
+ new TargetAssignmentBuilder(groupId, groupEpoch,
assignors.get(preferredServerAssignor))
+ .withMembers(group.members())
+ .withStaticMembers(group.staticMembers())
+ .withSubscriptionMetadata(subscriptionMetadata)
+ .withTargetAssignment(group.targetAssignment())
+ .addOrUpdateMember(memberId, updatedMember);
+ TargetAssignmentBuilder.TargetAssignmentResult
assignmentResult;
+ // A new static member is replacing an older one with the same
subscriptions.
+ // We just need to remove the older member and add the newer
one. The new member should
+ // reuse the target assignment of the older member.
+ if (staticMemberReplaced) {
+ assignmentResult = assignmentResultBuilder
+ .removeMember(member.memberId())
+ .build();
+ } else {
+ assignmentResult = assignmentResultBuilder
+ .build();
+ }
+
+ log.info("[GroupId {}] Computed a new target assignment for
epoch {} with '{}' assignor: {}.",
+ groupId, groupEpoch, preferredServerAssignor,
assignmentResult.targetAssignment());
+
+ records.addAll(assignmentResult.records());
+ targetAssignment =
assignmentResult.targetAssignment().get(memberId);
+ targetAssignmentEpoch = groupEpoch;
+ } catch (PartitionAssignorException ex) {
+ String msg = String.format("Failed to compute a new target
assignment for epoch %d: %s",
+ groupEpoch, ex.getMessage());
+ log.error("[GroupId {}] {}.", groupId, msg);
+ throw new UnknownServerException(msg, ex);
+ }
+ }
+
+ // 3. Reconcile the member's assignment with the target assignment if
the member is not
+ // fully reconciled yet.
+
+ /**
+ * TODO:
+ * joinGroup - sync timeout
+ * syncGroup - (join timeout)
+ * heartbeat - (join timeout)
+ * => scheduleConsumerGroupRebalanceTimeout is not necessary
+ */
+ updatedMember = maybeReconcile(
+ groupId,
+ updatedMember,
+ group::currentPartitionEpoch,
+ targetAssignmentEpoch,
+ targetAssignment,
+ ownedTopicPartitions,
+ records
+ );
+
+ if (newMemberCreated) {
+ scheduleConsumerGroupSessionTimeout(groupId, memberId);
+ }
Review Comment:
To simplify, I wonder if we should just consider the join/sync group as a
heartbeat signal too. What do you think?
##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##########
@@ -1413,6 +1506,243 @@ private
CoordinatorResult<ConsumerGroupHeartbeatResponseData, Record> consumerGr
return new CoordinatorResult<>(records, response);
}
+ /**
+ * Handle a JoinGroupRequest to a ConsumerGroup.
+ *
+ * @param group The group to join.
+ * @param context The request context.
+ * @param request The actual JoinGroup request.
+ * @param responseFuture The join group response future.
+ *
+ * @return The result that contains records to append if the join group
phase completes.
+ */
+ private CoordinatorResult<Void, Record> classicGroupJoinToConsumerGroup(
+ ConsumerGroup group,
+ RequestContext context,
+ JoinGroupRequestData request,
+ CompletableFuture<JoinGroupResponseData> responseFuture
+ ) throws ApiException {
+ final long currentTimeMs = time.milliseconds();
+ final List<Record> records = new ArrayList<>();
+ final String groupId = request.groupId();
+ String memberId = request.memberId();
+ final String instanceId = request.groupInstanceId();
+ final JoinGroupRequestProtocolCollection protocols =
request.protocols();
+ final boolean isUnknownMember = memberId.equals(UNKNOWN_MEMBER_ID);
+
+ throwIfConsumerGroupIsFull(group, memberId);
+ throwIfClassicProtocolIsNotSupported(group, memberId,
request.protocolType(), protocols);
+ // TODO: need to throw an exception if group is dead?
+
+ // Get or create the member.
+ if (isUnknownMember) memberId = Uuid.randomUuid().toString();
+ ConsumerGroupMember member;
+ ConsumerGroupMember.Builder updatedMemberBuilder;
+ boolean staticMemberReplaced = false;
+ boolean newMemberCreated = false;
+ if (instanceId == null) {
+ // A dynamic member (re-)joins.
+ throwIfRequiresKnownMemberId(groupId, memberId, isUnknownMember,
context);
+ newMemberCreated = !group.hasMember(memberId);
+ member = group.getOrMaybeCreateMember(memberId, true);
+ log.info("[GroupId {}] Member {} joins the consumer group.",
groupId, memberId);
+ updatedMemberBuilder = new ConsumerGroupMember.Builder(member);
+ } else {
+ member = group.staticMember(instanceId);
+ // A new static member joins or the existing static member rejoins.
+ if (isUnknownMember) {
+ newMemberCreated = true;
+ if (member == null) {
+ // New static member.
+ member = group.getOrMaybeCreateMember(memberId, true);
+ updatedMemberBuilder = new
ConsumerGroupMember.Builder(member);
+ log.info("[GroupId {}] Static member {} with instance id
{} joins the consumer group.", groupId, memberId, instanceId);
+ } else {
+ // Replace the current static member.
+ staticMemberReplaced = true;
+ updatedMemberBuilder = new
ConsumerGroupMember.Builder(memberId)
+ .setAssignedPartitions(member.assignedPartitions());
+ removeMember(records, groupId, member.memberId());
+ log.info("[GroupId {}] Static member with unknown member
id and instance id {} re-joins the consumer group. " +
+ "Created a new member {} to replace the existing
member {}.", groupId, instanceId, memberId, member.memberId());
+ }
+ } else {
+ // Rejoining static member. Fence the static group with
unmatched member id.
+ throwIfStaticMemberIsUnknown(member, instanceId);
+ throwIfInstanceIdIsFenced(member, groupId, memberId,
instanceId);
+ updatedMemberBuilder = new ConsumerGroupMember.Builder(member);
+ log.info("[GroupId {}] Static member {} with instance id {}
re-joins the consumer group.", groupId, memberId, instanceId);
+ }
+ }
+
+ int groupEpoch = group.groupEpoch();
+ Map<String, TopicMetadata> subscriptionMetadata =
group.subscriptionMetadata();
+ final ConsumerPartitionAssignor.Subscription subscription =
deserializeSubscription(protocols);
+ final List<ConsumerGroupHeartbeatRequestData.TopicPartitions>
ownedTopicPartitions =
+ validateGenerationIdAndGetOwnedPartition(member, subscription);
+
+ // 1. Create or update the member. If the member is new or has
changed, a ConsumerGroupMemberMetadataValue
+ // record is written to the __consumer_offsets partition to persist
the change. If the subscriptions have
+ // changed, the subscription metadata is updated and persisted by
writing a ConsumerGroupPartitionMetadataValue
+ // record to the __consumer_offsets partition. Finally, the group
epoch is bumped if the subscriptions have
+ // changed, and persisted by writing a ConsumerGroupMetadataValue
record to the partition.
+ ConsumerGroupMember updatedMember = updatedMemberBuilder
+ .maybeUpdateInstanceId(Optional.ofNullable(instanceId))
+ .maybeUpdateRackId(subscription.rackId())
+
.maybeUpdateRebalanceTimeoutMs(ofSentinel(request.rebalanceTimeoutMs()))
+ .maybeUpdateServerAssignorName(Optional.empty())
+
.maybeUpdateSubscribedTopicNames(Optional.ofNullable(subscription.topics()))
+ .setClientId(context.clientId())
+ .setClientHost(context.clientAddress.toString())
+ .setSupportedClassicProtocols(protocols)
+ .build();
+
+ boolean bumpGroupEpoch = updateMemberSubscription(groupId, memberId,
member, updatedMember, records);
+ if (bumpGroupEpoch || group.hasMetadataExpired(currentTimeMs)) {
+ // The subscription metadata is updated in two cases:
+ // 1) The member has updated its subscriptions;
+ // 2) The refresh deadline has been reached.
+ subscriptionMetadata = group.computeSubscriptionMetadata(
+ member,
+ updatedMember,
+ metadataImage.topics(),
+ metadataImage.cluster()
+ );
+
+ if (!subscriptionMetadata.equals(group.subscriptionMetadata())) {
+ log.info("[GroupId {}] Computed new subscription metadata:
{}.",
+ groupId, subscriptionMetadata);
+ bumpGroupEpoch = true;
+ records.add(newGroupSubscriptionMetadataRecord(groupId,
subscriptionMetadata));
+ }
+
+ if (bumpGroupEpoch) {
+ groupEpoch += 1;
+ records.add(newGroupEpochRecord(groupId, groupEpoch));
+ log.info("[GroupId {}] Bumped group epoch to {}.", groupId,
groupEpoch);
+ metrics.record(CONSUMER_GROUP_REBALANCES_SENSOR_NAME);
+ }
+
+ group.setMetadataRefreshDeadline(currentTimeMs +
consumerGroupMetadataRefreshIntervalMs, groupEpoch);
+ }
+
+ // 2. Update the target assignment if the group epoch is larger than
the target assignment epoch or a static member
+ // replaces an existing static member. The delta between the existing
and the new target assignment is persisted to the partition.
+ int targetAssignmentEpoch = group.assignmentEpoch();
+ Assignment targetAssignment = group.targetAssignment(memberId);
+ if (groupEpoch > targetAssignmentEpoch || staticMemberReplaced) {
+ String preferredServerAssignor =
group.computePreferredServerAssignor(
+ member,
+ updatedMember
+ ).orElse(defaultAssignor.name());
+ try {
+ TargetAssignmentBuilder assignmentResultBuilder =
+ new TargetAssignmentBuilder(groupId, groupEpoch,
assignors.get(preferredServerAssignor))
+ .withMembers(group.members())
+ .withStaticMembers(group.staticMembers())
+ .withSubscriptionMetadata(subscriptionMetadata)
+ .withTargetAssignment(group.targetAssignment())
+ .addOrUpdateMember(memberId, updatedMember);
+ TargetAssignmentBuilder.TargetAssignmentResult
assignmentResult;
+ // A new static member is replacing an older one with the same
subscriptions.
+ // We just need to remove the older member and add the newer
one. The new member should
+ // reuse the target assignment of the older member.
+ if (staticMemberReplaced) {
+ assignmentResult = assignmentResultBuilder
+ .removeMember(member.memberId())
+ .build();
+ } else {
+ assignmentResult = assignmentResultBuilder
+ .build();
+ }
+
+ log.info("[GroupId {}] Computed a new target assignment for
epoch {} with '{}' assignor: {}.",
+ groupId, groupEpoch, preferredServerAssignor,
assignmentResult.targetAssignment());
+
+ records.addAll(assignmentResult.records());
+ targetAssignment =
assignmentResult.targetAssignment().get(memberId);
+ targetAssignmentEpoch = groupEpoch;
+ } catch (PartitionAssignorException ex) {
+ String msg = String.format("Failed to compute a new target
assignment for epoch %d: %s",
+ groupEpoch, ex.getMessage());
+ log.error("[GroupId {}] {}.", groupId, msg);
+ throw new UnknownServerException(msg, ex);
+ }
+ }
+
+ // 3. Reconcile the member's assignment with the target assignment if
the member is not
+ // fully reconciled yet.
+
+ /**
+ * TODO:
+ * joinGroup - sync timeout
+ * syncGroup - (join timeout)
+ * heartbeat - (join timeout)
+ * => scheduleConsumerGroupRebalanceTimeout is not necessary
+ */
+ updatedMember = maybeReconcile(
+ groupId,
+ updatedMember,
+ group::currentPartitionEpoch,
+ targetAssignmentEpoch,
+ targetAssignment,
+ ownedTopicPartitions,
+ records
+ );
+
+ if (newMemberCreated) {
+ scheduleConsumerGroupSessionTimeout(groupId, memberId);
+ }
+ scheduleConsumerGroupSyncTimeout(groupId, memberId,
request.rebalanceTimeoutMs());
Review Comment:
nit: Should we put a small comment to explain this one?
##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##########
@@ -1169,6 +1175,107 @@ private void
throwIfStaticMemberIsUnknown(ConsumerGroupMember staticMember, Stri
}
}
+ /**
+ * Validates if the received classic member protocols are supported by the
group.
+ *
+ * @param group The ConsumerGroup.
+ * @param memberId The joining member id.
+ * @param protocolType The joining member protocol type.
+ * @param protocols The joining member protocol collection.
+ */
+ private void throwIfClassicProtocolIsNotSupported(
+ ConsumerGroup group,
+ String memberId,
+ String protocolType,
+ JoinGroupRequestProtocolCollection protocols
+ ) {
+ if (!group.supportsClassicProtocols(protocolType,
ClassicGroupMember.plainProtocolSet(protocols))) {
Review Comment:
I am not sure about the validation here. My concern is that the first
classic member could join without any protocols based on the current
validation. It seems that we only require it to be non empty when the group is
not empty too. Should we also validate that `protocolType` is always
`ConsumerProtocol.PROTOCOL_TYPE` and `protocols` is not empty?
The logic in `supportsClassicProtocols` may not be 100% correct too.
##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##########
@@ -1261,8 +1368,9 @@ private
CoordinatorResult<ConsumerGroupHeartbeatResponseData, Record> consumerGr
staticMemberReplaced = true;
updatedMemberBuilder = new
ConsumerGroupMember.Builder(memberId)
.setAssignedPartitions(member.assignedPartitions());
- removeMemberAndCancelTimers(records, group.groupId(),
member.memberId());
- log.info("[GroupId {}] Static member {} with instance id
{} re-joins the consumer group.", groupId, memberId, instanceId);
+ removeMember(records, groupId, member.memberId());
Review Comment:
nit: Could we put a comment here or in `removeMember` explaining why we
don't cancel the timers?
##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##########
@@ -1413,6 +1506,243 @@ private
CoordinatorResult<ConsumerGroupHeartbeatResponseData, Record> consumerGr
return new CoordinatorResult<>(records, response);
}
+ /**
+ * Handle a JoinGroupRequest to a ConsumerGroup.
+ *
+ * @param group The group to join.
+ * @param context The request context.
+ * @param request The actual JoinGroup request.
+ * @param responseFuture The join group response future.
+ *
+ * @return The result that contains records to append if the join group
phase completes.
+ */
+ private CoordinatorResult<Void, Record> classicGroupJoinToConsumerGroup(
+ ConsumerGroup group,
+ RequestContext context,
+ JoinGroupRequestData request,
+ CompletableFuture<JoinGroupResponseData> responseFuture
+ ) throws ApiException {
+ final long currentTimeMs = time.milliseconds();
+ final List<Record> records = new ArrayList<>();
+ final String groupId = request.groupId();
+ String memberId = request.memberId();
+ final String instanceId = request.groupInstanceId();
+ final JoinGroupRequestProtocolCollection protocols =
request.protocols();
+ final boolean isUnknownMember = memberId.equals(UNKNOWN_MEMBER_ID);
+
+ throwIfConsumerGroupIsFull(group, memberId);
+ throwIfClassicProtocolIsNotSupported(group, memberId,
request.protocolType(), protocols);
+ // TODO: need to throw an exception if group is dead?
+
+ // Get or create the member.
+ if (isUnknownMember) memberId = Uuid.randomUuid().toString();
+ ConsumerGroupMember member;
+ ConsumerGroupMember.Builder updatedMemberBuilder;
+ boolean staticMemberReplaced = false;
+ boolean newMemberCreated = false;
+ if (instanceId == null) {
+ // A dynamic member (re-)joins.
+ throwIfRequiresKnownMemberId(groupId, memberId, isUnknownMember,
context);
+ newMemberCreated = !group.hasMember(memberId);
+ member = group.getOrMaybeCreateMember(memberId, true);
+ log.info("[GroupId {}] Member {} joins the consumer group.",
groupId, memberId);
+ updatedMemberBuilder = new ConsumerGroupMember.Builder(member);
+ } else {
+ member = group.staticMember(instanceId);
+ // A new static member joins or the existing static member rejoins.
+ if (isUnknownMember) {
+ newMemberCreated = true;
+ if (member == null) {
+ // New static member.
+ member = group.getOrMaybeCreateMember(memberId, true);
+ updatedMemberBuilder = new
ConsumerGroupMember.Builder(member);
+ log.info("[GroupId {}] Static member {} with instance id
{} joins the consumer group.", groupId, memberId, instanceId);
+ } else {
+ // Replace the current static member.
+ staticMemberReplaced = true;
+ updatedMemberBuilder = new
ConsumerGroupMember.Builder(memberId)
+ .setAssignedPartitions(member.assignedPartitions());
+ removeMember(records, groupId, member.memberId());
+ log.info("[GroupId {}] Static member with unknown member
id and instance id {} re-joins the consumer group. " +
+ "Created a new member {} to replace the existing
member {}.", groupId, instanceId, memberId, member.memberId());
+ }
+ } else {
+ // Rejoining static member. Fence the static group with
unmatched member id.
+ throwIfStaticMemberIsUnknown(member, instanceId);
+ throwIfInstanceIdIsFenced(member, groupId, memberId,
instanceId);
+ updatedMemberBuilder = new ConsumerGroupMember.Builder(member);
+ log.info("[GroupId {}] Static member {} with instance id {}
re-joins the consumer group.", groupId, memberId, instanceId);
+ }
+ }
+
+ int groupEpoch = group.groupEpoch();
+ Map<String, TopicMetadata> subscriptionMetadata =
group.subscriptionMetadata();
+ final ConsumerPartitionAssignor.Subscription subscription =
deserializeSubscription(protocols);
+ final List<ConsumerGroupHeartbeatRequestData.TopicPartitions>
ownedTopicPartitions =
+ validateGenerationIdAndGetOwnedPartition(member, subscription);
+
+ // 1. Create or update the member. If the member is new or has
changed, a ConsumerGroupMemberMetadataValue
+ // record is written to the __consumer_offsets partition to persist
the change. If the subscriptions have
+ // changed, the subscription metadata is updated and persisted by
writing a ConsumerGroupPartitionMetadataValue
+ // record to the __consumer_offsets partition. Finally, the group
epoch is bumped if the subscriptions have
+ // changed, and persisted by writing a ConsumerGroupMetadataValue
record to the partition.
+ ConsumerGroupMember updatedMember = updatedMemberBuilder
+ .maybeUpdateInstanceId(Optional.ofNullable(instanceId))
+ .maybeUpdateRackId(subscription.rackId())
+
.maybeUpdateRebalanceTimeoutMs(ofSentinel(request.rebalanceTimeoutMs()))
+ .maybeUpdateServerAssignorName(Optional.empty())
+
.maybeUpdateSubscribedTopicNames(Optional.ofNullable(subscription.topics()))
+ .setClientId(context.clientId())
+ .setClientHost(context.clientAddress.toString())
+ .setSupportedClassicProtocols(protocols)
+ .build();
+
+ boolean bumpGroupEpoch = updateMemberSubscription(groupId, memberId,
member, updatedMember, records);
+ if (bumpGroupEpoch || group.hasMetadataExpired(currentTimeMs)) {
+ // The subscription metadata is updated in two cases:
+ // 1) The member has updated its subscriptions;
+ // 2) The refresh deadline has been reached.
+ subscriptionMetadata = group.computeSubscriptionMetadata(
+ member,
+ updatedMember,
+ metadataImage.topics(),
+ metadataImage.cluster()
+ );
+
+ if (!subscriptionMetadata.equals(group.subscriptionMetadata())) {
+ log.info("[GroupId {}] Computed new subscription metadata:
{}.",
+ groupId, subscriptionMetadata);
+ bumpGroupEpoch = true;
+ records.add(newGroupSubscriptionMetadataRecord(groupId,
subscriptionMetadata));
+ }
+
+ if (bumpGroupEpoch) {
+ groupEpoch += 1;
+ records.add(newGroupEpochRecord(groupId, groupEpoch));
+ log.info("[GroupId {}] Bumped group epoch to {}.", groupId,
groupEpoch);
+ metrics.record(CONSUMER_GROUP_REBALANCES_SENSOR_NAME);
+ }
+
+ group.setMetadataRefreshDeadline(currentTimeMs +
consumerGroupMetadataRefreshIntervalMs, groupEpoch);
+ }
+
+ // 2. Update the target assignment if the group epoch is larger than
the target assignment epoch or a static member
+ // replaces an existing static member. The delta between the existing
and the new target assignment is persisted to the partition.
+ int targetAssignmentEpoch = group.assignmentEpoch();
+ Assignment targetAssignment = group.targetAssignment(memberId);
+ if (groupEpoch > targetAssignmentEpoch || staticMemberReplaced) {
+ String preferredServerAssignor =
group.computePreferredServerAssignor(
+ member,
+ updatedMember
+ ).orElse(defaultAssignor.name());
+ try {
+ TargetAssignmentBuilder assignmentResultBuilder =
+ new TargetAssignmentBuilder(groupId, groupEpoch,
assignors.get(preferredServerAssignor))
+ .withMembers(group.members())
+ .withStaticMembers(group.staticMembers())
+ .withSubscriptionMetadata(subscriptionMetadata)
+ .withTargetAssignment(group.targetAssignment())
+ .addOrUpdateMember(memberId, updatedMember);
+ TargetAssignmentBuilder.TargetAssignmentResult
assignmentResult;
+ // A new static member is replacing an older one with the same
subscriptions.
+ // We just need to remove the older member and add the newer
one. The new member should
+ // reuse the target assignment of the older member.
+ if (staticMemberReplaced) {
+ assignmentResult = assignmentResultBuilder
+ .removeMember(member.memberId())
+ .build();
+ } else {
+ assignmentResult = assignmentResultBuilder
+ .build();
+ }
+
+ log.info("[GroupId {}] Computed a new target assignment for
epoch {} with '{}' assignor: {}.",
+ groupId, groupEpoch, preferredServerAssignor,
assignmentResult.targetAssignment());
+
+ records.addAll(assignmentResult.records());
+ targetAssignment =
assignmentResult.targetAssignment().get(memberId);
+ targetAssignmentEpoch = groupEpoch;
+ } catch (PartitionAssignorException ex) {
+ String msg = String.format("Failed to compute a new target
assignment for epoch %d: %s",
+ groupEpoch, ex.getMessage());
+ log.error("[GroupId {}] {}.", groupId, msg);
+ throw new UnknownServerException(msg, ex);
+ }
+ }
+
+ // 3. Reconcile the member's assignment with the target assignment if
the member is not
+ // fully reconciled yet.
+
+ /**
+ * TODO:
+ * joinGroup - sync timeout
+ * syncGroup - (join timeout)
+ * heartbeat - (join timeout)
+ * => scheduleConsumerGroupRebalanceTimeout is not necessary
+ */
Review Comment:
My understanding is that we need two timers:
1) One when we trigger the rebalance on the heartbeat to ensure that we
receive a join group;
2) One when we receive the join group to ensure that we receive the sync
group.
Is my understanding correct?
btw, we should remove the TODO from the code before we merge the PR. We
usually don't keep TODOs.
##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##########
@@ -1413,6 +1506,243 @@ private
CoordinatorResult<ConsumerGroupHeartbeatResponseData, Record> consumerGr
return new CoordinatorResult<>(records, response);
}
+ /**
+ * Handle a JoinGroupRequest to a ConsumerGroup.
+ *
+ * @param group The group to join.
+ * @param context The request context.
+ * @param request The actual JoinGroup request.
+ * @param responseFuture The join group response future.
+ *
+ * @return The result that contains records to append if the join group
phase completes.
+ */
+ private CoordinatorResult<Void, Record> classicGroupJoinToConsumerGroup(
+ ConsumerGroup group,
+ RequestContext context,
+ JoinGroupRequestData request,
+ CompletableFuture<JoinGroupResponseData> responseFuture
+ ) throws ApiException {
+ final long currentTimeMs = time.milliseconds();
+ final List<Record> records = new ArrayList<>();
+ final String groupId = request.groupId();
+ String memberId = request.memberId();
Review Comment:
nit: I would put this non final one after the final ones.
##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##########
@@ -2227,81 +2611,95 @@ public CoordinatorResult<Void, Record> classicGroupJoin(
RequestContext context,
JoinGroupRequestData request,
CompletableFuture<JoinGroupResponseData> responseFuture
+ ) {
+ throwIfClassicGroupSessionTimeoutInvalid(request.sessionTimeoutMs());
+
+ Group group = groups.get(request.groupId(), Long.MAX_VALUE);
+ if (group != null && group.type() == CONSUMER && !group.isEmpty()) {
Review Comment:
The `!group.isEmpty()` condition is a bit subtile to grasp at first. My
understanding is that we want empty consumer groups to be converted to classic
groups. Is my understanding correct? If it is, it may be good to add a small
comment to the `if` branch to explain it.
##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##########
@@ -1413,6 +1506,243 @@ private
CoordinatorResult<ConsumerGroupHeartbeatResponseData, Record> consumerGr
return new CoordinatorResult<>(records, response);
}
+ /**
+ * Handle a JoinGroupRequest to a ConsumerGroup.
+ *
+ * @param group The group to join.
+ * @param context The request context.
+ * @param request The actual JoinGroup request.
+ * @param responseFuture The join group response future.
+ *
+ * @return The result that contains records to append if the join group
phase completes.
+ */
+ private CoordinatorResult<Void, Record> classicGroupJoinToConsumerGroup(
+ ConsumerGroup group,
+ RequestContext context,
+ JoinGroupRequestData request,
+ CompletableFuture<JoinGroupResponseData> responseFuture
+ ) throws ApiException {
+ final long currentTimeMs = time.milliseconds();
+ final List<Record> records = new ArrayList<>();
+ final String groupId = request.groupId();
+ String memberId = request.memberId();
+ final String instanceId = request.groupInstanceId();
+ final JoinGroupRequestProtocolCollection protocols =
request.protocols();
+ final boolean isUnknownMember = memberId.equals(UNKNOWN_MEMBER_ID);
+
+ throwIfConsumerGroupIsFull(group, memberId);
+ throwIfClassicProtocolIsNotSupported(group, memberId,
request.protocolType(), protocols);
+ // TODO: need to throw an exception if group is dead?
+
+ // Get or create the member.
+ if (isUnknownMember) memberId = Uuid.randomUuid().toString();
+ ConsumerGroupMember member;
+ ConsumerGroupMember.Builder updatedMemberBuilder;
+ boolean staticMemberReplaced = false;
+ boolean newMemberCreated = false;
+ if (instanceId == null) {
+ // A dynamic member (re-)joins.
+ throwIfRequiresKnownMemberId(groupId, memberId, isUnknownMember,
context);
+ newMemberCreated = !group.hasMember(memberId);
+ member = group.getOrMaybeCreateMember(memberId, true);
+ log.info("[GroupId {}] Member {} joins the consumer group.",
groupId, memberId);
+ updatedMemberBuilder = new ConsumerGroupMember.Builder(member);
+ } else {
+ member = group.staticMember(instanceId);
+ // A new static member joins or the existing static member rejoins.
+ if (isUnknownMember) {
+ newMemberCreated = true;
+ if (member == null) {
+ // New static member.
+ member = group.getOrMaybeCreateMember(memberId, true);
+ updatedMemberBuilder = new
ConsumerGroupMember.Builder(member);
+ log.info("[GroupId {}] Static member {} with instance id
{} joins the consumer group.", groupId, memberId, instanceId);
+ } else {
+ // Replace the current static member.
+ staticMemberReplaced = true;
+ updatedMemberBuilder = new
ConsumerGroupMember.Builder(memberId)
+ .setAssignedPartitions(member.assignedPartitions());
+ removeMember(records, groupId, member.memberId());
+ log.info("[GroupId {}] Static member with unknown member
id and instance id {} re-joins the consumer group. " +
+ "Created a new member {} to replace the existing
member {}.", groupId, instanceId, memberId, member.memberId());
+ }
+ } else {
+ // Rejoining static member. Fence the static group with
unmatched member id.
+ throwIfStaticMemberIsUnknown(member, instanceId);
+ throwIfInstanceIdIsFenced(member, groupId, memberId,
instanceId);
+ updatedMemberBuilder = new ConsumerGroupMember.Builder(member);
+ log.info("[GroupId {}] Static member {} with instance id {}
re-joins the consumer group.", groupId, memberId, instanceId);
+ }
+ }
+
+ int groupEpoch = group.groupEpoch();
+ Map<String, TopicMetadata> subscriptionMetadata =
group.subscriptionMetadata();
+ final ConsumerPartitionAssignor.Subscription subscription =
deserializeSubscription(protocols);
+ final List<ConsumerGroupHeartbeatRequestData.TopicPartitions>
ownedTopicPartitions =
+ validateGenerationIdAndGetOwnedPartition(member, subscription);
+
+ // 1. Create or update the member. If the member is new or has
changed, a ConsumerGroupMemberMetadataValue
+ // record is written to the __consumer_offsets partition to persist
the change. If the subscriptions have
+ // changed, the subscription metadata is updated and persisted by
writing a ConsumerGroupPartitionMetadataValue
+ // record to the __consumer_offsets partition. Finally, the group
epoch is bumped if the subscriptions have
+ // changed, and persisted by writing a ConsumerGroupMetadataValue
record to the partition.
+ ConsumerGroupMember updatedMember = updatedMemberBuilder
+ .maybeUpdateInstanceId(Optional.ofNullable(instanceId))
+ .maybeUpdateRackId(subscription.rackId())
+
.maybeUpdateRebalanceTimeoutMs(ofSentinel(request.rebalanceTimeoutMs()))
+ .maybeUpdateServerAssignorName(Optional.empty())
+
.maybeUpdateSubscribedTopicNames(Optional.ofNullable(subscription.topics()))
+ .setClientId(context.clientId())
+ .setClientHost(context.clientAddress.toString())
+ .setSupportedClassicProtocols(protocols)
+ .build();
+
+ boolean bumpGroupEpoch = updateMemberSubscription(groupId, memberId,
member, updatedMember, records);
+ if (bumpGroupEpoch || group.hasMetadataExpired(currentTimeMs)) {
+ // The subscription metadata is updated in two cases:
+ // 1) The member has updated its subscriptions;
+ // 2) The refresh deadline has been reached.
+ subscriptionMetadata = group.computeSubscriptionMetadata(
+ member,
+ updatedMember,
+ metadataImage.topics(),
+ metadataImage.cluster()
+ );
+
+ if (!subscriptionMetadata.equals(group.subscriptionMetadata())) {
+ log.info("[GroupId {}] Computed new subscription metadata:
{}.",
+ groupId, subscriptionMetadata);
+ bumpGroupEpoch = true;
+ records.add(newGroupSubscriptionMetadataRecord(groupId,
subscriptionMetadata));
+ }
+
+ if (bumpGroupEpoch) {
+ groupEpoch += 1;
+ records.add(newGroupEpochRecord(groupId, groupEpoch));
+ log.info("[GroupId {}] Bumped group epoch to {}.", groupId,
groupEpoch);
+ metrics.record(CONSUMER_GROUP_REBALANCES_SENSOR_NAME);
+ }
+
+ group.setMetadataRefreshDeadline(currentTimeMs +
consumerGroupMetadataRefreshIntervalMs, groupEpoch);
+ }
+
+ // 2. Update the target assignment if the group epoch is larger than
the target assignment epoch or a static member
+ // replaces an existing static member. The delta between the existing
and the new target assignment is persisted to the partition.
+ int targetAssignmentEpoch = group.assignmentEpoch();
Review Comment:
I wonder if we could to the following in order to also reuse this code:
```
int targetAssignmentEpoch = group.assignmentEpoch();
Assignment targetAssignment = group.targetAssignment(memberId);
if (groupEpoch > targetAssignmentEpoch || staticMemberReplaced) {
targetAssignment = updateTargetAssignment(....);
targetAssignmentEpoch = groupEpoch;
}
```
It seems that it should work.
##########
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java:
##########
@@ -10921,6 +10855,544 @@ public void
testLastConsumerProtocolMemberRebalanceTimeoutInConsumerGroup() {
assertTrue(classicGroup.isInState(PREPARING_REBALANCE));
}
+ @Test
+ public void testConsumerGroupJoinThrowsExceptionIfGroupOverMaxSize() {
+ String groupId = "group-id";
+ String memberId = Uuid.randomUuid().toString();
+ GroupMetadataManagerTestContext context = new
GroupMetadataManagerTestContext.Builder()
+ .withConsumerGroup(new ConsumerGroupBuilder(groupId, 10)
+ .withMember(new ConsumerGroupMember.Builder(memberId)
+ .setState(MemberState.STABLE)
+ .setMemberEpoch(10)
+ .setPreviousMemberEpoch(10)
+ .build()))
+ .withConsumerGroupMaxSize(1)
+ .build();
+
+ JoinGroupRequestData request = new
GroupMetadataManagerTestContext.JoinGroupRequestBuilder()
+ .withGroupId(groupId)
+ .withMemberId(UNKNOWN_MEMBER_ID)
+ .withDefaultProtocolTypeAndProtocols()
+ .build();
+
+ Exception ex = assertThrows(GroupMaxSizeReachedException.class, () ->
context.sendClassicGroupJoin(request));
+ assertEquals("The consumer group has reached its maximum capacity of 1
members.", ex.getMessage());
+ }
+
+ @Test
+ public void testConsumerGroupJoinInvalidSessionTimeout() throws Exception {
+ int minSessionTimeout = 50;
+ int maxSessionTimeout = 100;
+ String groupId = "group-id";
+
+ GroupMetadataManagerTestContext context = new
GroupMetadataManagerTestContext.Builder()
+ .withClassicGroupMinSessionTimeoutMs(minSessionTimeout)
+ .withClassicGroupMaxSessionTimeoutMs(maxSessionTimeout)
+ .withConsumerGroup(new ConsumerGroupBuilder(groupId, 10))
+ .build();
+
+ JoinGroupRequestData requestWithSmallSessionTimeout = new
GroupMetadataManagerTestContext.JoinGroupRequestBuilder()
+ .withGroupId(groupId)
+ .withMemberId(UNKNOWN_MEMBER_ID)
+ .withSessionTimeoutMs(minSessionTimeout - 1)
+ .build();
+ assertThrows(InvalidSessionTimeoutException.class, () ->
context.sendClassicGroupJoin(requestWithSmallSessionTimeout));
+
+ JoinGroupRequestData requestWithLargeSessionTimeout = new
GroupMetadataManagerTestContext.JoinGroupRequestBuilder()
+ .withGroupId(groupId)
+ .withMemberId(UNKNOWN_MEMBER_ID)
+ .withSessionTimeoutMs(maxSessionTimeout + 1)
+ .build();
+ assertThrows(InvalidSessionTimeoutException.class, () ->
context.sendClassicGroupJoin(requestWithLargeSessionTimeout));
+ }
+
+ @Test
+ public void testConsumerGroupJoinThrowsExceptionIfProtocolIsNotSupported()
{
+ String groupId = "group-id";
+ String memberId = Uuid.randomUuid().toString();
+ GroupMetadataManagerTestContext context = new
GroupMetadataManagerTestContext.Builder()
+ .withConsumerGroup(new ConsumerGroupBuilder(groupId, 10)
+ .withMember(new ConsumerGroupMember.Builder(memberId)
+ .setState(MemberState.STABLE)
+ .setMemberEpoch(10)
+ .setPreviousMemberEpoch(10)
+
.setSupportedClassicProtocols(GroupMetadataManagerTestContext.toProtocols("roundrobin"))
+ .build()))
+ .build();
+
+ JoinGroupRequestData requestWithEmptyProtocols = new
GroupMetadataManagerTestContext.JoinGroupRequestBuilder()
+ .withGroupId(groupId)
+ .withMemberId(UNKNOWN_MEMBER_ID)
+ .withProtocolType(ConsumerProtocol.PROTOCOL_TYPE)
+ .withDefaultProtocolTypeAndProtocols()
+ .build();
+ assertThrows(InconsistentGroupProtocolException.class, () ->
context.sendClassicGroupJoin(requestWithEmptyProtocols));
+
+ JoinGroupRequestData requestWithInvalidProtocolType = new
GroupMetadataManagerTestContext.JoinGroupRequestBuilder()
+ .withGroupId(groupId)
+ .withMemberId(UNKNOWN_MEMBER_ID)
+ .withProtocolType("connect")
+ .withDefaultProtocolTypeAndProtocols()
+ .build();
+ assertThrows(InconsistentGroupProtocolException.class, () ->
context.sendClassicGroupJoin(requestWithInvalidProtocolType));
+ }
+
+ @Test
+ public void testConsumerGroupJoinWithNewDynamicMember() throws Exception {
+ String groupId = "group-id";
+ String memberId = Uuid.randomUuid().toString();
+ Uuid fooTopicId = Uuid.randomUuid();
+ String fooTopicName = "foo";
+ Uuid barTopicId = Uuid.randomUuid();
+ String barTopicName = "bar";
+
+ MockPartitionAssignor assignor = new MockPartitionAssignor("range");
+ GroupMetadataManagerTestContext context = new
GroupMetadataManagerTestContext.Builder()
+ .withAssignors(Collections.singletonList(assignor))
+ .withMetadataImage(new MetadataImageBuilder()
+ .addTopic(fooTopicId, fooTopicName, 2)
+ .addTopic(barTopicId, barTopicName, 1)
+ .addRacks()
+ .build())
+ .withConsumerGroup(new ConsumerGroupBuilder(groupId, 10)
+ .withSubscriptionMetadata(new HashMap<String, TopicMetadata>()
{
+ {
+ put(fooTopicName, new TopicMetadata(fooTopicId,
fooTopicName, 2, mkMapOfPartitionRacks(2)));
+ }
+ })
+ .withMember(new ConsumerGroupMember.Builder(memberId)
+ .setState(MemberState.STABLE)
+ .setMemberEpoch(10)
+ .setPreviousMemberEpoch(10)
+ .build())
+ .withAssignment(memberId, mkAssignment(
+ mkTopicAssignment(fooTopicId, 0, 1)))
+ .withAssignmentEpoch(10))
+ .build();
+
+ JoinGroupRequestData request = new
GroupMetadataManagerTestContext.JoinGroupRequestBuilder()
+ .withGroupId(groupId)
+ .withMemberId(UNKNOWN_MEMBER_ID)
+ .withProtocols(GroupMetadataManagerTestContext.toRangeProtocol(
+ Arrays.asList(fooTopicName, barTopicName),
+ Collections.emptyList()))
+ .build();
+ assertThrows(MemberIdRequiredException.class, () ->
context.sendClassicGroupJoin(request, true));
+
+ // Simulate getting the new member id from the error response.
+ String newMemberId = Uuid.randomUuid().toString();
+
+ assignor.prepareGroupAssignment(new GroupAssignment(
+ new HashMap<String, MemberAssignment>() {
+ {
+ put(memberId, new MemberAssignment(mkAssignment(
+ mkTopicAssignment(fooTopicId, 0)
+ )));
+ put(newMemberId, new MemberAssignment(mkAssignment(
+ mkTopicAssignment(barTopicId, 0)
+ )));
+ }
+ }
+ ));
+
+ JoinGroupRequestData secondRequest = new JoinGroupRequestData()
+ .setGroupId(request.groupId())
+ .setMemberId(newMemberId)
+ .setProtocolType(request.protocolType())
+ .setProtocols(request.protocols())
+ .setSessionTimeoutMs(request.sessionTimeoutMs())
+ .setRebalanceTimeoutMs(request.rebalanceTimeoutMs())
+ .setReason(request.reason());
+
+ GroupMetadataManagerTestContext.JoinResult secondJoinResult =
context.sendClassicGroupJoin(
+ secondRequest,
+ true
+ );
+
+ ConsumerGroupMember expectedMember = new
ConsumerGroupMember.Builder(newMemberId)
+ .setMemberEpoch(11)
+ .setPreviousMemberEpoch(0)
+ .setState(MemberState.STABLE)
+ .setClientId("client")
+ .setClientHost("localhost/127.0.0.1")
+ .setSubscribedTopicNames(Arrays.asList(fooTopicName, barTopicName))
+ .setRebalanceTimeoutMs(500)
+ .setAssignedPartitions(assignor.targetPartitions(newMemberId))
+ .setSupportedClassicProtocols(request.protocols())
+ .build();
+
+ List<Record> expectedRecords = Arrays.asList(
+ RecordHelpers.newMemberSubscriptionRecord(groupId, expectedMember),
+ RecordHelpers.newGroupSubscriptionMetadataRecord(groupId, new
HashMap<String, TopicMetadata>() {
+ {
+ put(fooTopicName, new TopicMetadata(fooTopicId,
fooTopicName, 2, mkMapOfPartitionRacks(2)));
+ put(barTopicName, new TopicMetadata(barTopicId,
barTopicName, 1, mkMapOfPartitionRacks(1)));
+ }
+ }),
+ RecordHelpers.newGroupEpochRecord(groupId, 11),
+
+ RecordHelpers.newTargetAssignmentRecord(groupId, memberId,
assignor.targetPartitions(memberId)),
+ RecordHelpers.newTargetAssignmentRecord(groupId, newMemberId,
assignor.targetPartitions(newMemberId)),
+
+ RecordHelpers.newTargetAssignmentEpochRecord(groupId, 11),
+ RecordHelpers.newCurrentAssignmentRecord(groupId, expectedMember)
+ );
+ assertRecordsEquals(expectedRecords.subList(0, 3),
secondJoinResult.records.subList(0, 3));
+ assertUnorderedListEquals(expectedRecords.subList(3, 5),
secondJoinResult.records.subList(3, 5));
+ assertRecordsEquals(expectedRecords.subList(5, 7),
secondJoinResult.records.subList(5, 7));
+
+ assertTrue(secondJoinResult.joinFuture.isDone());
+ assertEquals(
+ new JoinGroupResponseData()
+ .setMemberId(newMemberId)
+ .setGenerationId(11)
+ .setProtocolType(ConsumerProtocol.PROTOCOL_TYPE)
+ .setProtocolName("range"),
+ secondJoinResult.joinFuture.get()
+ );
+
+ context.assertSessionTimeout(groupId, newMemberId, 45000);
+ context.assertSyncTimeout(groupId, newMemberId,
request.rebalanceTimeoutMs());
+ }
+
+ @Test
+ public void testConsumerGroupJoinWithNewStaticMember() throws Exception {
+ String groupId = "group-id";
+ String memberId = Uuid.randomUuid().toString();
+ String instanceId = "instance-id";
+ Uuid fooTopicId = Uuid.randomUuid();
+ String fooTopicName = "foo";
+ Uuid barTopicId = Uuid.randomUuid();
+ String barTopicName = "bar";
+
+ MockPartitionAssignor assignor = new MockPartitionAssignor("range");
+ assignor.prepareGroupAssignment(new
GroupAssignment(Collections.emptyMap()));
+ GroupMetadataManagerTestContext context = new
GroupMetadataManagerTestContext.Builder()
+ .withAssignors(Collections.singletonList(assignor))
+ .withMetadataImage(new MetadataImageBuilder()
+ .addTopic(fooTopicId, fooTopicName, 2)
+ .addTopic(barTopicId, barTopicName, 1)
+ .addRacks()
+ .build())
+ .withConsumerGroup(new ConsumerGroupBuilder(groupId, 10)
+ .withSubscriptionMetadata(new HashMap<String, TopicMetadata>()
{
+ {
+ put(fooTopicName, new TopicMetadata(fooTopicId,
fooTopicName, 2, mkMapOfPartitionRacks(2)));
+ }
+ })
+ .withMember(new ConsumerGroupMember.Builder(memberId)
+ .setState(MemberState.STABLE)
+ .setMemberEpoch(10)
+ .setPreviousMemberEpoch(10)
+ .build())
+ .withAssignment(memberId, mkAssignment(
+ mkTopicAssignment(fooTopicId, 0, 1)))
+ .withAssignmentEpoch(10))
+ .build();
+
+ JoinGroupRequestData request = new
GroupMetadataManagerTestContext.JoinGroupRequestBuilder()
+ .withGroupId(groupId)
+ .withMemberId(UNKNOWN_MEMBER_ID)
+ .withGroupInstanceId(instanceId)
+ .withProtocols(GroupMetadataManagerTestContext.toRangeProtocol(
+ Arrays.asList(fooTopicName, barTopicName),
+ Collections.emptyList()))
+ .build();
+
+ GroupMetadataManagerTestContext.JoinResult joinResult =
context.sendClassicGroupJoin(request);
+ String newMemberId = joinResult.joinFuture.get().memberId();
+
+ ConsumerGroupMember expectedMember = new
ConsumerGroupMember.Builder(newMemberId)
+ .setMemberEpoch(11)
+ .setPreviousMemberEpoch(0)
+ .setInstanceId(instanceId)
+ .setState(MemberState.STABLE)
+ .setClientId("client")
+ .setClientHost("localhost/127.0.0.1")
+ .setSubscribedTopicNames(Arrays.asList(fooTopicName, barTopicName))
+ .setRebalanceTimeoutMs(500)
+ .setSupportedClassicProtocols(request.protocols())
+ .build();
+
+ List<Record> expectedRecords = Arrays.asList(
+ RecordHelpers.newMemberSubscriptionRecord(groupId, expectedMember),
+ RecordHelpers.newGroupSubscriptionMetadataRecord(groupId, new
HashMap<String, TopicMetadata>() {
+ {
+ put(fooTopicName, new TopicMetadata(fooTopicId,
fooTopicName, 2, mkMapOfPartitionRacks(2)));
+ put(barTopicName, new TopicMetadata(barTopicId,
barTopicName, 1, mkMapOfPartitionRacks(1)));
+ }
+ }),
+ RecordHelpers.newGroupEpochRecord(groupId, 11),
+
+ RecordHelpers.newTargetAssignmentRecord(groupId, memberId,
Collections.emptyMap()),
+ RecordHelpers.newTargetAssignmentRecord(groupId, newMemberId,
Collections.emptyMap()),
+
+ RecordHelpers.newTargetAssignmentEpochRecord(groupId, 11),
+ RecordHelpers.newCurrentAssignmentRecord(groupId, expectedMember)
+ );
+ assertRecordsEquals(expectedRecords.subList(0, 3),
joinResult.records.subList(0, 3));
+ assertUnorderedListEquals(expectedRecords.subList(3, 5),
joinResult.records.subList(3, 5));
+ assertRecordsEquals(expectedRecords.subList(5, 7),
joinResult.records.subList(5, 7));
+
+ assertTrue(joinResult.joinFuture.isDone());
+ assertEquals(
+ new JoinGroupResponseData()
+ .setMemberId(newMemberId)
+ .setGenerationId(11)
+ .setProtocolType(ConsumerProtocol.PROTOCOL_TYPE)
+ .setProtocolName("range"),
+ joinResult.joinFuture.get()
+ );
+
+ context.assertSessionTimeout(groupId, newMemberId, 45000);
+ context.assertSyncTimeout(groupId, newMemberId,
request.rebalanceTimeoutMs());
+ }
+
+ @Test
+ public void testConsumerGroupJoinReplacingExistingStaticMember() throws
Exception {
+ String groupId = "group-id";
+ String memberId = Uuid.randomUuid().toString();
+ String instanceId = "instance-id";
+ Uuid fooTopicId = Uuid.randomUuid();
+ String fooTopicName = "foo";
+
+ MockPartitionAssignor assignor = new MockPartitionAssignor("range");
+ assignor.prepareGroupAssignment(new
GroupAssignment(Collections.emptyMap()));
+ GroupMetadataManagerTestContext context = new
GroupMetadataManagerTestContext.Builder()
+ .withAssignors(Collections.singletonList(assignor))
+ .withMetadataImage(new MetadataImageBuilder()
+ .addTopic(fooTopicId, fooTopicName, 2)
+ .addRacks()
+ .build())
+ .withConsumerGroup(new ConsumerGroupBuilder(groupId, 10)
+ .withSubscriptionMetadata(new HashMap<String, TopicMetadata>()
{
+ {
+ put(fooTopicName, new TopicMetadata(fooTopicId,
fooTopicName, 2, mkMapOfPartitionRacks(2)));
+ }
+ })
+ .withMember(new ConsumerGroupMember.Builder(memberId)
+ .setInstanceId(instanceId)
+ .setState(MemberState.STABLE)
+ .setMemberEpoch(10)
+ .setPreviousMemberEpoch(10)
+
.setSubscribedTopicNames(Collections.singletonList(fooTopicName))
+ .setAssignedPartitions(mkAssignment(
+ mkTopicAssignment(fooTopicId, 0, 1)))
+ .build())
+ .withAssignment(memberId, mkAssignment(
+ mkTopicAssignment(fooTopicId, 0, 1)))
+ .withAssignmentEpoch(10))
+ .build();
+
context.groupMetadataManager.consumerGroup(groupId).setMetadataRefreshDeadline(Long.MAX_VALUE,
10);
+
+ JoinGroupRequestData request = new
GroupMetadataManagerTestContext.JoinGroupRequestBuilder()
+ .withGroupId(groupId)
+ .withMemberId(UNKNOWN_MEMBER_ID)
+ .withGroupInstanceId(instanceId)
+ .withProtocols(GroupMetadataManagerTestContext.toRangeProtocol(
+ Collections.singletonList(fooTopicName),
+ Collections.emptyList()))
+ .build();
+
+ // The static member joins with UNKNOWN_MEMBER_ID.
+ GroupMetadataManagerTestContext.JoinResult joinResult =
context.sendClassicGroupJoin(
+ request,
+ true
+ );
+ String newMemberId = joinResult.joinFuture.get().memberId();
+
+ ConsumerGroupMember expectedMember = new
ConsumerGroupMember.Builder(newMemberId)
+ .setMemberEpoch(0)
+ .setPreviousMemberEpoch(0)
+ .setInstanceId(instanceId)
+ .setState(MemberState.UNREVOKED_PARTITIONS)
+ .setClientId("client")
+ .setClientHost("localhost/127.0.0.1")
+ .setSubscribedTopicNames(Collections.singletonList(fooTopicName))
+ .setRebalanceTimeoutMs(500)
+ .setSupportedClassicProtocols(request.protocols())
+ .setPartitionsPendingRevocation(mkAssignment(
+ mkTopicAssignment(fooTopicId, 0, 1)))
+ .build();
+
+ List<Record> expectedRecords = Arrays.asList(
+ // Remove the old static member.
+ RecordHelpers.newCurrentAssignmentTombstoneRecord(groupId,
memberId),
+ RecordHelpers.newTargetAssignmentTombstoneRecord(groupId,
memberId),
+ RecordHelpers.newMemberSubscriptionTombstoneRecord(groupId,
memberId),
+
+ // Create the new static member.
+ RecordHelpers.newMemberSubscriptionRecord(groupId, expectedMember),
+ RecordHelpers.newTargetAssignmentRecord(groupId, newMemberId,
Collections.emptyMap()),
+ RecordHelpers.newTargetAssignmentEpochRecord(groupId, 10),
+ RecordHelpers.newCurrentAssignmentRecord(groupId, expectedMember)
+ );
+ assertRecordsEquals(expectedRecords, joinResult.records);
+
+ assertTrue(joinResult.joinFuture.isDone());
+ assertEquals(
+ new JoinGroupResponseData()
+ .setMemberId(newMemberId)
+ .setGenerationId(0)
+ .setProtocolType(ConsumerProtocol.PROTOCOL_TYPE)
+ .setProtocolName("range"),
+ joinResult.joinFuture.get()
+ );
+
+ context.assertSessionTimeout(groupId, newMemberId, 45000);
+ context.assertSyncTimeout(groupId, newMemberId,
request.rebalanceTimeoutMs());
+ }
+
+ @ParameterizedTest
+ @ValueSource(strings = {"", "instance-id"})
+ public void testConsumerGroupRejoinWithExistingMember(String instanceId)
throws Exception {
+ String groupId = "group-id";
+ String memberId1 = Uuid.randomUuid().toString();
+ String memberId2 = Uuid.randomUuid().toString();
+ Uuid fooTopicId = Uuid.randomUuid();
+ String fooTopicName = "foo";
+ Uuid barTopicId = Uuid.randomUuid();
+ String barTopicName = "bar";
+
+ MockPartitionAssignor assignor = new MockPartitionAssignor("range");
+ JoinGroupRequestData.JoinGroupRequestProtocolCollection protocols =
+ GroupMetadataManagerTestContext.toRangeProtocol(
+ Arrays.asList(fooTopicName, barTopicName),
+ Arrays.asList(new TopicPartition(fooTopicName, 0), new
TopicPartition(fooTopicName, 1))
+ );
+ GroupMetadataManagerTestContext context = new
GroupMetadataManagerTestContext.Builder()
+ .withAssignors(Collections.singletonList(assignor))
+ .withMetadataImage(new MetadataImageBuilder()
+ .addTopic(fooTopicId, fooTopicName, 2)
+ .addTopic(barTopicId, barTopicName, 1)
+ .addRacks()
+ .build())
+ .withConsumerGroup(new ConsumerGroupBuilder(groupId, 10)
+ .withSubscriptionMetadata(new HashMap<String, TopicMetadata>()
{
+ {
+ put(fooTopicName, new TopicMetadata(fooTopicId,
fooTopicName, 2, mkMapOfPartitionRacks(2)));
+ put(barTopicName, new TopicMetadata(barTopicId,
barTopicName, 2, mkMapOfPartitionRacks(1)));
+ }
+ })
+ .withMember(new ConsumerGroupMember.Builder(memberId1)
+ .setInstanceId(instanceId)
+ .setState(MemberState.STABLE)
+ .setMemberEpoch(10)
+ .setPreviousMemberEpoch(10)
+ .setRebalanceTimeoutMs(500)
+ .setClientId("client")
+ .setClientHost("localhost/127.0.0.1")
+ .setSupportedClassicProtocols(protocols)
+ .setSubscribedTopicNames(Arrays.asList(fooTopicName,
barTopicName))
+ .build())
+ .withMember(new ConsumerGroupMember.Builder(memberId2)
+ .setState(MemberState.STABLE)
+ .setMemberEpoch(10)
+ .setPreviousMemberEpoch(10)
+ .setRebalanceTimeoutMs(500)
+ .setClientId("client")
+ .setClientHost("localhost/127.0.0.1")
+ .setSubscribedTopicNames(Arrays.asList(fooTopicName,
barTopicName))
+ .build())
+ .withAssignment(memberId1, mkAssignment(
+ mkTopicAssignment(fooTopicId, 0, 1)))
+ .withAssignment(memberId2, mkAssignment(
+ mkTopicAssignment(barTopicId, 0)))
+ .withAssignmentEpoch(10))
+ .build();
+
context.groupMetadataManager.consumerGroup(groupId).setMetadataRefreshDeadline(Long.MAX_VALUE,
10);
+
+ JoinGroupRequestData request = new
GroupMetadataManagerTestContext.JoinGroupRequestBuilder()
+ .withGroupId(groupId)
+ .withMemberId(memberId1)
+ .withGroupInstanceId(instanceId)
+ .withProtocols(protocols)
+ .build();
+
+ // The member rejoins with the same member id and protocols.
+ GroupMetadataManagerTestContext.JoinResult joinResult =
context.sendClassicGroupJoin(request);
+ assertEquals(Collections.emptyList(), joinResult.records);
+ assertEquals(
+ new JoinGroupResponseData()
+ .setMemberId(memberId1)
+ .setGenerationId(10)
+ .setProtocolType(ConsumerProtocol.PROTOCOL_TYPE)
+ .setProtocolName("range"),
+ joinResult.joinFuture.get()
+ );
+ context.assertSyncTimeout(groupId, memberId1,
request.rebalanceTimeoutMs());
+ }
+
+ @Test
+ public void testConsumerGroupJoinStaticMemberWithUnknownInstanceId()
throws Exception {
+ String groupId = "group-id";
+ String instanceId = "instance-id";
+ String memberId1 = Uuid.randomUuid().toString();
+ String memberId2 = Uuid.randomUuid().toString();
+ String fooTopicName = "foo";
+ String barTopicName = "bar";
+
+ JoinGroupRequestData.JoinGroupRequestProtocolCollection protocols =
+ GroupMetadataManagerTestContext.toRangeProtocol(
+ Arrays.asList(fooTopicName, barTopicName),
+ Arrays.asList(new TopicPartition(fooTopicName, 0), new
TopicPartition(fooTopicName, 1))
+ );
+ // Set up a ConsumerGroup with no static member.
+ GroupMetadataManagerTestContext context = new
GroupMetadataManagerTestContext.Builder()
+ .withConsumerGroup(new ConsumerGroupBuilder(groupId, 10)
+ .withMember(new ConsumerGroupMember.Builder(memberId1)
+ .setSupportedClassicProtocols(protocols)
+ .build())
+ .withMember(new ConsumerGroupMember.Builder(memberId2)
+ .build()))
+ .build();
+
+ // The member joins with an instance id.
+ JoinGroupRequestData request = new
GroupMetadataManagerTestContext.JoinGroupRequestBuilder()
+ .withGroupId(groupId)
+ .withMemberId(memberId1)
+ .withGroupInstanceId(instanceId)
+ .withProtocols(protocols)
+ .build();
+
+ assertThrows(UnknownMemberIdException.class, () ->
context.sendClassicGroupJoin(request));
+ }
+
+ @Test
+ public void testConsumerGroupJoinStaticMemberWithUnmatchedMemberId()
throws Exception {
+ String groupId = "group-id";
+ String instanceId = "instance-id";
+ String memberId1 = Uuid.randomUuid().toString();
+ String memberId2 = Uuid.randomUuid().toString();
+ String fooTopicName = "foo";
+ String barTopicName = "bar";
+
+ JoinGroupRequestData.JoinGroupRequestProtocolCollection protocols =
+ GroupMetadataManagerTestContext.toRangeProtocol(
+ Arrays.asList(fooTopicName, barTopicName),
+ Arrays.asList(new TopicPartition(fooTopicName, 0), new
TopicPartition(fooTopicName, 1))
+ );
+ GroupMetadataManagerTestContext context = new
GroupMetadataManagerTestContext.Builder()
+ .withConsumerGroup(new ConsumerGroupBuilder(groupId, 10)
+ .withMember(new ConsumerGroupMember.Builder(memberId1)
+ .setInstanceId(instanceId)
+ .setSupportedClassicProtocols(protocols)
+ .build())
+ .withMember(new ConsumerGroupMember.Builder(memberId2)
+ .build()))
+ .build();
+
+ // The member joins with the same instance id and a different member
id.
+ JoinGroupRequestData request = new
GroupMetadataManagerTestContext.JoinGroupRequestBuilder()
+ .withGroupId(groupId)
+ .withMemberId(Uuid.randomUuid().toString())
+ .withGroupInstanceId(instanceId)
+ .withProtocols(protocols)
+ .build();
+
+ assertThrows(FencedInstanceIdException.class, () ->
context.sendClassicGroupJoin(request));
+ }
+
Review Comment:
I wonder if we should add a few more test cases to also validated the
reconciliation part. Have we planned to do so? I can think of the following
ones:
* Test all the versions of the embedded consumer protocol;
* Test with various owned partitions: empty list, incomplete list, etc.;
* Test with the member is different states: Stable, Unrevoked partitions,
Unreleased partitions, etc.
* Test with updated subscriptions (should compute new assignment)
##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##########
@@ -1413,6 +1506,243 @@ private
CoordinatorResult<ConsumerGroupHeartbeatResponseData, Record> consumerGr
return new CoordinatorResult<>(records, response);
}
+ /**
+ * Handle a JoinGroupRequest to a ConsumerGroup.
+ *
+ * @param group The group to join.
+ * @param context The request context.
+ * @param request The actual JoinGroup request.
+ * @param responseFuture The join group response future.
+ *
+ * @return The result that contains records to append if the join group
phase completes.
+ */
+ private CoordinatorResult<Void, Record> classicGroupJoinToConsumerGroup(
+ ConsumerGroup group,
+ RequestContext context,
+ JoinGroupRequestData request,
+ CompletableFuture<JoinGroupResponseData> responseFuture
+ ) throws ApiException {
+ final long currentTimeMs = time.milliseconds();
+ final List<Record> records = new ArrayList<>();
+ final String groupId = request.groupId();
+ String memberId = request.memberId();
+ final String instanceId = request.groupInstanceId();
+ final JoinGroupRequestProtocolCollection protocols =
request.protocols();
+ final boolean isUnknownMember = memberId.equals(UNKNOWN_MEMBER_ID);
+
+ throwIfConsumerGroupIsFull(group, memberId);
+ throwIfClassicProtocolIsNotSupported(group, memberId,
request.protocolType(), protocols);
+ // TODO: need to throw an exception if group is dead?
+
+ // Get or create the member.
+ if (isUnknownMember) memberId = Uuid.randomUuid().toString();
+ ConsumerGroupMember member;
+ ConsumerGroupMember.Builder updatedMemberBuilder;
+ boolean staticMemberReplaced = false;
+ boolean newMemberCreated = false;
+ if (instanceId == null) {
+ // A dynamic member (re-)joins.
+ throwIfRequiresKnownMemberId(groupId, memberId, isUnknownMember,
context);
+ newMemberCreated = !group.hasMember(memberId);
+ member = group.getOrMaybeCreateMember(memberId, true);
+ log.info("[GroupId {}] Member {} joins the consumer group.",
groupId, memberId);
+ updatedMemberBuilder = new ConsumerGroupMember.Builder(member);
+ } else {
+ member = group.staticMember(instanceId);
+ // A new static member joins or the existing static member rejoins.
+ if (isUnknownMember) {
+ newMemberCreated = true;
+ if (member == null) {
+ // New static member.
+ member = group.getOrMaybeCreateMember(memberId, true);
+ updatedMemberBuilder = new
ConsumerGroupMember.Builder(member);
+ log.info("[GroupId {}] Static member {} with instance id
{} joins the consumer group.", groupId, memberId, instanceId);
+ } else {
+ // Replace the current static member.
+ staticMemberReplaced = true;
+ updatedMemberBuilder = new
ConsumerGroupMember.Builder(memberId)
+ .setAssignedPartitions(member.assignedPartitions());
+ removeMember(records, groupId, member.memberId());
+ log.info("[GroupId {}] Static member with unknown member
id and instance id {} re-joins the consumer group. " +
+ "Created a new member {} to replace the existing
member {}.", groupId, instanceId, memberId, member.memberId());
+ }
+ } else {
+ // Rejoining static member. Fence the static group with
unmatched member id.
+ throwIfStaticMemberIsUnknown(member, instanceId);
+ throwIfInstanceIdIsFenced(member, groupId, memberId,
instanceId);
+ updatedMemberBuilder = new ConsumerGroupMember.Builder(member);
+ log.info("[GroupId {}] Static member {} with instance id {}
re-joins the consumer group.", groupId, memberId, instanceId);
+ }
+ }
+
+ int groupEpoch = group.groupEpoch();
+ Map<String, TopicMetadata> subscriptionMetadata =
group.subscriptionMetadata();
+ final ConsumerPartitionAssignor.Subscription subscription =
deserializeSubscription(protocols);
+ final List<ConsumerGroupHeartbeatRequestData.TopicPartitions>
ownedTopicPartitions =
+ validateGenerationIdAndGetOwnedPartition(member, subscription);
+
+ // 1. Create or update the member. If the member is new or has
changed, a ConsumerGroupMemberMetadataValue
+ // record is written to the __consumer_offsets partition to persist
the change. If the subscriptions have
+ // changed, the subscription metadata is updated and persisted by
writing a ConsumerGroupPartitionMetadataValue
+ // record to the __consumer_offsets partition. Finally, the group
epoch is bumped if the subscriptions have
+ // changed, and persisted by writing a ConsumerGroupMetadataValue
record to the partition.
+ ConsumerGroupMember updatedMember = updatedMemberBuilder
+ .maybeUpdateInstanceId(Optional.ofNullable(instanceId))
+ .maybeUpdateRackId(subscription.rackId())
+
.maybeUpdateRebalanceTimeoutMs(ofSentinel(request.rebalanceTimeoutMs()))
+ .maybeUpdateServerAssignorName(Optional.empty())
+
.maybeUpdateSubscribedTopicNames(Optional.ofNullable(subscription.topics()))
+ .setClientId(context.clientId())
+ .setClientHost(context.clientAddress.toString())
+ .setSupportedClassicProtocols(protocols)
+ .build();
+
+ boolean bumpGroupEpoch = updateMemberSubscription(groupId, memberId,
member, updatedMember, records);
Review Comment:
nit: Could we format it as follow?
```
boolean bumpGroupEpoch = updateMemberSubscription(
groupId,
memberId,
member,
updatedMember,
records
);
```
##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##########
@@ -2227,81 +2611,95 @@ public CoordinatorResult<Void, Record> classicGroupJoin(
RequestContext context,
JoinGroupRequestData request,
CompletableFuture<JoinGroupResponseData> responseFuture
+ ) {
+ throwIfClassicGroupSessionTimeoutInvalid(request.sessionTimeoutMs());
Review Comment:
Seeing this, I wonder if we should move all the static request validations
to the group coordinator service. I does not make sense to occupy the
coordinator threads for this. What do you think?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]