dajac commented on code in PR #16057:
URL: https://github.com/apache/kafka/pull/16057#discussion_r1613089003
##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##########
@@ -2041,6 +2027,36 @@ private <T> CoordinatorResult<T, CoordinatorRecord>
consumerGroupFenceMember(
}
}
+ /**
+ * Remove the member and maybe update the subscription metadata without
the removed member.
+ *
+ * @param group The ConsumerGroup.
+ * @param member The ConsumerGroupMember.
+ * @return The list of CoordinatorRecord.
+ */
+ private List<CoordinatorRecord>
removeMemberAndMaybeUpdateSubscriptionMetadata(
+ ConsumerGroup group,
+ ConsumerGroupMember member
+ ) {
+ List<CoordinatorRecord> records = new ArrayList<>();
+ removeMember(records, group.groupId(), member.memberId());
+
+ // We update the subscription metadata without the leaving member.
+ Map<String, TopicMetadata> subscriptionMetadata =
group.computeSubscriptionMetadata(
+ group.computeSubscribedTopicNames(member, null),
+ metadataImage.topics(),
+ metadataImage.cluster()
+ );
+
+ if (!subscriptionMetadata.equals(group.subscriptionMetadata())) {
+ log.info("[GroupId {}] Computed new subscription metadata: {}.",
+ group.groupId(), subscriptionMetadata);
+ records.add(newGroupSubscriptionMetadataRecord(group.groupId(),
subscriptionMetadata));
+ }
Review Comment:
I think that we should rather do this once, after all the members are
processed. I suppose that we could have a method like
`computeSubscribedTopicNames` but which takes a list of members to remove.
##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##########
@@ -4424,14 +4441,113 @@ private ConsumerGroupMember
validateConsumerGroupMember(
* @param context The request context.
* @param request The actual LeaveGroup request.
*
+ * @return The LeaveGroup response and the records to append.
+ */
+ public CoordinatorResult<LeaveGroupResponseData, CoordinatorRecord>
classicGroupLeave(
+ RequestContext context,
+ LeaveGroupRequestData request
+ ) throws UnknownMemberIdException, GroupIdNotFoundException {
+ Group group = groups.get(request.groupId(), Long.MAX_VALUE);
+
+ if (group == null) {
+ throw new UnknownMemberIdException(String.format("Group %s not
found.", request.groupId()));
+ }
+
+ if (group.type() == CLASSIC) {
+ return classicGroupLeaveToClassicGroup((ClassicGroup) group,
context, request);
+ } else {
+ return classicGroupLeaveToConsumerGroup((ConsumerGroup) group,
context, request);
+ }
+ }
+
+ /**
+ * Handle a classic LeaveGroupRequest to a ConsumerGroup.
+ *
+ * @param group The ConsumerGroup.
+ * @param context The request context.
+ * @param request The actual LeaveGroup request.
+ *
+ * @return The LeaveGroup response and the records to append.
+ */
+ private CoordinatorResult<LeaveGroupResponseData, CoordinatorRecord>
classicGroupLeaveToConsumerGroup(
+ ConsumerGroup group,
+ RequestContext context,
+ LeaveGroupRequestData request
+ ) throws UnknownMemberIdException, GroupIdNotFoundException {
+ List<MemberResponse> memberResponses = new ArrayList<>();
+ List<CoordinatorRecord> records = new ArrayList<>();
+ boolean hasValidLeaveGroupMember = false;
+
+ for (MemberIdentity memberIdentity: request.members()) {
+ String memberId = memberIdentity.memberId();
+ String instanceId = memberIdentity.groupInstanceId();
+ String reason = memberIdentity.reason() != null ?
memberIdentity.reason() : "not provided";
+
+ ConsumerGroupMember member;
+ try {
+ if (instanceId == null) {
+ member = group.getOrMaybeCreateMember(memberId, false);
+ throwIfMemberDoesNotUseClassicProtocol(member);
+
+ log.info("[Group {}] Static Member {} has left group " +
+ "through explicit `LeaveGroup` request; client
reason: {}",
+ group.groupId(), memberId, reason);
+ } else {
+ member = group.staticMember(instanceId);
+ throwIfStaticMemberIsUnknown(member, memberId);
+ // The LeaveGroup API allows administrative removal of
members by GroupInstanceId
+ // in which case we expect the MemberId to be undefined.
+ if (!UNKNOWN_MEMBER_ID.equals(memberId)) {
+ throwIfInstanceIdIsFenced(member, group.groupId(),
memberId, instanceId);
+ }
+ throwIfMemberDoesNotUseClassicProtocol(member);
+
+ log.info("[Group {}] Static Member {} with instance id {}
has left group " +
+ "through explicit `LeaveGroup` request; client
reason: {}",
+ group.groupId(), memberId, instanceId, reason);
+ }
+
+
records.addAll(removeMemberAndMaybeUpdateSubscriptionMetadata(group, member));
Review Comment:
nit: I would be better to pass the list to the method.
##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##########
@@ -4424,14 +4441,113 @@ private ConsumerGroupMember
validateConsumerGroupMember(
* @param context The request context.
* @param request The actual LeaveGroup request.
*
+ * @return The LeaveGroup response and the records to append.
+ */
+ public CoordinatorResult<LeaveGroupResponseData, CoordinatorRecord>
classicGroupLeave(
+ RequestContext context,
+ LeaveGroupRequestData request
+ ) throws UnknownMemberIdException, GroupIdNotFoundException {
+ Group group = groups.get(request.groupId(), Long.MAX_VALUE);
+
+ if (group == null) {
+ throw new UnknownMemberIdException(String.format("Group %s not
found.", request.groupId()));
+ }
+
+ if (group.type() == CLASSIC) {
+ return classicGroupLeaveToClassicGroup((ClassicGroup) group,
context, request);
+ } else {
+ return classicGroupLeaveToConsumerGroup((ConsumerGroup) group,
context, request);
+ }
+ }
+
+ /**
+ * Handle a classic LeaveGroupRequest to a ConsumerGroup.
+ *
+ * @param group The ConsumerGroup.
+ * @param context The request context.
+ * @param request The actual LeaveGroup request.
+ *
+ * @return The LeaveGroup response and the records to append.
+ */
+ private CoordinatorResult<LeaveGroupResponseData, CoordinatorRecord>
classicGroupLeaveToConsumerGroup(
+ ConsumerGroup group,
+ RequestContext context,
+ LeaveGroupRequestData request
+ ) throws UnknownMemberIdException, GroupIdNotFoundException {
+ List<MemberResponse> memberResponses = new ArrayList<>();
+ List<CoordinatorRecord> records = new ArrayList<>();
+ boolean hasValidLeaveGroupMember = false;
+
+ for (MemberIdentity memberIdentity: request.members()) {
+ String memberId = memberIdentity.memberId();
+ String instanceId = memberIdentity.groupInstanceId();
+ String reason = memberIdentity.reason() != null ?
memberIdentity.reason() : "not provided";
+
+ ConsumerGroupMember member;
+ try {
+ if (instanceId == null) {
+ member = group.getOrMaybeCreateMember(memberId, false);
+ throwIfMemberDoesNotUseClassicProtocol(member);
+
+ log.info("[Group {}] Static Member {} has left group " +
+ "through explicit `LeaveGroup` request; client
reason: {}",
+ group.groupId(), memberId, reason);
+ } else {
+ member = group.staticMember(instanceId);
+ throwIfStaticMemberIsUnknown(member, memberId);
+ // The LeaveGroup API allows administrative removal of
members by GroupInstanceId
+ // in which case we expect the MemberId to be undefined.
+ if (!UNKNOWN_MEMBER_ID.equals(memberId)) {
+ throwIfInstanceIdIsFenced(member, group.groupId(),
memberId, instanceId);
+ }
+ throwIfMemberDoesNotUseClassicProtocol(member);
+
+ log.info("[Group {}] Static Member {} with instance id {}
has left group " +
+ "through explicit `LeaveGroup` request; client
reason: {}",
+ group.groupId(), memberId, instanceId, reason);
+ }
+
+
records.addAll(removeMemberAndMaybeUpdateSubscriptionMetadata(group, member));
+ cancelTimers(group.groupId(), member.memberId());
Review Comment:
Yes, I think that we can address it separately.
##########
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java:
##########
@@ -12788,19 +12783,263 @@ public void
testConsumerGroupMemberUsingClassicProtocolFencedWhenJoinTimeout() {
MockCoordinatorTimer.ExpiredTimeout<Void, CoordinatorRecord> timeout =
timeouts.get(0);
assertEquals(consumerGroupJoinKey(groupId, memberId), timeout.key);
assertRecordsEquals(
- Arrays.asList(
- // The member is removed.
-
CoordinatorRecordHelpers.newCurrentAssignmentTombstoneRecord(groupId, memberId),
-
CoordinatorRecordHelpers.newTargetAssignmentTombstoneRecord(groupId, memberId),
-
CoordinatorRecordHelpers.newMemberSubscriptionTombstoneRecord(groupId,
memberId),
-
- // The group epoch is bumped.
- CoordinatorRecordHelpers.newGroupEpochRecord(groupId, 11)
- ),
+ Stream.concat(
+ removeMember(groupId, memberId).stream(),
+
Stream.of(CoordinatorRecordHelpers.newGroupEpochRecord(groupId, 11))
+ ).collect(Collectors.toList()),
timeout.result.records()
);
}
+ @Test
+ public void testConsumerGroupMemberUsingClassicProtocolBatchLeaveGroup() {
+ String groupId = "group-id";
+ String memberId1 = Uuid.randomUuid().toString();
+ String memberId2 = Uuid.randomUuid().toString();
+ String memberId3 = Uuid.randomUuid().toString();
+ String instanceId2 = "instance-id-2";
+ String instanceId3 = "instance-id-3";
+
+ Uuid fooTopicId = Uuid.randomUuid();
+ String fooTopicName = "foo";
+ Uuid barTopicId = Uuid.randomUuid();
+ String barTopicName = "bar";
+
+ List<ConsumerGroupMemberMetadataValue.ClassicProtocol> protocol1 =
Collections.singletonList(
+ new ConsumerGroupMemberMetadataValue.ClassicProtocol()
+ .setName("range")
+
.setMetadata(Utils.toArray(ConsumerProtocol.serializeSubscription(new
ConsumerPartitionAssignor.Subscription(
+ Arrays.asList(fooTopicName, barTopicName),
+ null,
+ Collections.singletonList(new TopicPartition(fooTopicName,
0))
+ ))))
+ );
+ List<ConsumerGroupMemberMetadataValue.ClassicProtocol> protocol2 =
Collections.singletonList(
+ new ConsumerGroupMemberMetadataValue.ClassicProtocol()
+ .setName("range")
+
.setMetadata(Utils.toArray(ConsumerProtocol.serializeSubscription(new
ConsumerPartitionAssignor.Subscription(
+ Arrays.asList(fooTopicName, barTopicName),
+ null,
+ Collections.singletonList(new TopicPartition(fooTopicName,
1))
+ ))))
+ );
+
+ ConsumerGroupMember member1 = new
ConsumerGroupMember.Builder(memberId1)
+ .setState(MemberState.STABLE)
+ .setMemberEpoch(10)
+ .setPreviousMemberEpoch(9)
+ .setClientId("client")
+ .setClientHost("localhost/127.0.0.1")
+ .setSubscribedTopicNames(Arrays.asList("foo", "bar"))
+ .setServerAssignorName("range")
+ .setRebalanceTimeoutMs(45000)
+ .setClassicMemberMetadata(
+ new ConsumerGroupMemberMetadataValue.ClassicMemberMetadata()
+ .setSessionTimeoutMs(5000)
+ .setSupportedProtocols(protocol1)
+ )
+ .setAssignedPartitions(mkAssignment(mkTopicAssignment(fooTopicId,
0)))
+ .build();
+ ConsumerGroupMember member2 = new
ConsumerGroupMember.Builder(memberId2)
+ .setInstanceId(instanceId2)
+ .setState(MemberState.STABLE)
+ .setMemberEpoch(9)
+ .setPreviousMemberEpoch(8)
+ .setClientId("client")
+ .setClientHost("localhost/127.0.0.1")
+ .setSubscribedTopicNames(Arrays.asList("foo", "bar"))
+ .setServerAssignorName("range")
+ .setRebalanceTimeoutMs(45000)
+ .setClassicMemberMetadata(
+ new ConsumerGroupMemberMetadataValue.ClassicMemberMetadata()
+ .setSessionTimeoutMs(5000)
+ .setSupportedProtocols(protocol2)
+ )
+ .setAssignedPartitions(mkAssignment(mkTopicAssignment(fooTopicId,
1)))
+ .build();
+ ConsumerGroupMember member3 = new
ConsumerGroupMember.Builder(memberId3)
+ .setInstanceId(instanceId3)
+ .setState(MemberState.STABLE)
+ .setMemberEpoch(10)
+ .setPreviousMemberEpoch(9)
+ .setClientId("client")
+ .setClientHost("localhost/127.0.0.1")
+ .setSubscribedTopicNames(Arrays.asList("foo", "bar"))
+ .setServerAssignorName("range")
+ .setRebalanceTimeoutMs(45000)
+ .setAssignedPartitions(mkAssignment(mkTopicAssignment(barTopicId,
0)))
+ .build();
+
+ // Consumer group with three members.
+ // Dynamic member 1 uses the classic protocol;
+ // static member 2 uses the classic protocol;
+ // static member 3 uses the consumer protocol.
+ GroupMetadataManagerTestContext context = new
GroupMetadataManagerTestContext.Builder()
+ .withAssignors(Collections.singletonList(new
MockPartitionAssignor("range")))
+ .withMetadataImage(new MetadataImageBuilder()
+ .addTopic(fooTopicId, fooTopicName, 2)
+ .addTopic(barTopicId, barTopicName, 1)
+ .addRacks()
+ .build())
+ .withConsumerGroup(new ConsumerGroupBuilder(groupId, 10)
+ .withMember(member1)
+ .withMember(member2)
+ .withMember(member3)
+ .withAssignment(memberId1,
mkAssignment(mkTopicAssignment(fooTopicId, 0)))
+ .withAssignment(memberId2,
mkAssignment(mkTopicAssignment(fooTopicId, 1)))
+ .withAssignment(memberId3,
mkAssignment(mkTopicAssignment(barTopicId, 0)))
+ .withAssignmentEpoch(10))
+ .build();
+
context.groupMetadataManager.consumerGroup(groupId).setMetadataRefreshDeadline(Long.MAX_VALUE,
10);
+
context.replay(CoordinatorRecordHelpers.newGroupSubscriptionMetadataRecord(groupId,
new HashMap<String, TopicMetadata>() {
+ {
+ put(fooTopicName, new TopicMetadata(fooTopicId, fooTopicName,
2, mkMapOfPartitionRacks(2)));
+ put(barTopicName, new TopicMetadata(barTopicId, barTopicName,
1, mkMapOfPartitionRacks(1)));
+ }
+ }));
+
+ // Member 1 joins to schedule the sync timeout and the heartbeat
timeout.
+ context.sendClassicGroupJoin(
+ new GroupMetadataManagerTestContext.JoinGroupRequestBuilder()
+ .withGroupId(groupId)
+ .withMemberId(memberId1)
+ .withRebalanceTimeoutMs(member1.rebalanceTimeoutMs())
+
.withSessionTimeoutMs(member1.classicMemberMetadata().get().sessionTimeoutMs())
+
.withProtocols(GroupMetadataManagerTestContext.toConsumerProtocol(
+ Arrays.asList(fooTopicName, barTopicName),
+ Collections.singletonList(new TopicPartition(fooTopicName,
0))))
+ .build()
+ ).appendFuture.complete(null);
+ context.assertSyncTimeout(groupId, memberId1,
member1.rebalanceTimeoutMs());
+ context.assertSessionTimeout(groupId, memberId1,
member1.classicMemberMetadata().get().sessionTimeoutMs());
+
+ // Member 2 heartbeats to schedule the join timeout and the heartbeat
timeout.
+ context.sendClassicGroupHeartbeat(
+ new HeartbeatRequestData()
+ .setGroupId(groupId)
+ .setMemberId(memberId2)
+ .setGenerationId(9)
+ );
+ context.assertJoinTimeout(groupId, memberId2,
member2.rebalanceTimeoutMs());
+ context.assertSessionTimeout(groupId, memberId2,
member2.classicMemberMetadata().get().sessionTimeoutMs());
+
+ // Member 1 and member 2 leave the group.
+ CoordinatorResult<LeaveGroupResponseData, CoordinatorRecord>
leaveResult = context.sendClassicGroupLeave(
+ new LeaveGroupRequestData()
+ .setGroupId("group-id")
+ .setMembers(Arrays.asList(
+ // Valid member id.
+ new MemberIdentity()
+ .setMemberId(memberId1),
+ new MemberIdentity()
+ .setGroupInstanceId(instanceId2),
+ // Member that doesn't use the classic protocol.
+ new MemberIdentity()
+ .setMemberId(memberId3)
+ .setGroupInstanceId(instanceId3),
+ // Unknown member ids.
+ new MemberIdentity()
+ .setMemberId("unknown-member-id"),
+ new MemberIdentity()
+ .setGroupInstanceId("unknown-instance-id"),
+ // Fenced instance id.
+ new MemberIdentity()
+ .setMemberId("unknown-member-id")
+ .setGroupInstanceId(instanceId3)
+ ))
+ );
+
+ assertEquals(
+ new LeaveGroupResponseData()
+ .setMembers(Arrays.asList(
+ new LeaveGroupResponseData.MemberResponse()
+ .setGroupInstanceId(null)
+ .setMemberId(memberId1),
+ new LeaveGroupResponseData.MemberResponse()
+ .setGroupInstanceId(instanceId2)
+ .setMemberId(memberId2),
+ new LeaveGroupResponseData.MemberResponse()
+ .setGroupInstanceId(instanceId3)
+ .setMemberId(memberId3)
+ .setErrorCode(Errors.UNKNOWN_MEMBER_ID.code()),
+ new LeaveGroupResponseData.MemberResponse()
+ .setGroupInstanceId(null)
+ .setMemberId("unknown-member-id")
+ .setErrorCode(Errors.UNKNOWN_MEMBER_ID.code()),
+ new LeaveGroupResponseData.MemberResponse()
+ .setGroupInstanceId("unknown-instance-id")
+ .setErrorCode(Errors.UNKNOWN_MEMBER_ID.code()),
+ new LeaveGroupResponseData.MemberResponse()
+ .setGroupInstanceId(instanceId3)
+ .setMemberId("unknown-member-id")
+ .setErrorCode(Errors.FENCED_INSTANCE_ID.code())
+ )),
+ leaveResult.response()
+ );
+
+ List<CoordinatorRecord> expectedRecords = new ArrayList<>();
+ expectedRecords.addAll(removeMember(groupId, memberId1));
+ expectedRecords.addAll(removeMember(groupId, memberId2));
+
expectedRecords.add(CoordinatorRecordHelpers.newGroupEpochRecord(groupId, 11));
Review Comment:
Should we also have a case which re-generates the subscription metadata?
##########
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java:
##########
@@ -12722,15 +12722,10 @@ public void
testConsumerGroupMemberUsingClassicProtocolFencedWhenSessionTimeout(
MockCoordinatorTimer.ExpiredTimeout<Void, CoordinatorRecord> timeout =
timeouts.get(0);
assertEquals(consumerGroupSessionTimeoutKey(groupId, memberId),
timeout.key);
assertRecordsEquals(
- Arrays.asList(
- // The member is removed.
-
CoordinatorRecordHelpers.newCurrentAssignmentTombstoneRecord(groupId, memberId),
-
CoordinatorRecordHelpers.newTargetAssignmentTombstoneRecord(groupId, memberId),
-
CoordinatorRecordHelpers.newMemberSubscriptionTombstoneRecord(groupId,
memberId),
-
- // The group epoch is bumped.
- CoordinatorRecordHelpers.newGroupEpochRecord(groupId, 11)
- ),
+ Stream.concat(
Review Comment:
Personally, I prefer the previous way. It repeats things but it is easier to
read.
##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##########
@@ -4424,14 +4441,113 @@ private ConsumerGroupMember
validateConsumerGroupMember(
* @param context The request context.
* @param request The actual LeaveGroup request.
*
+ * @return The LeaveGroup response and the records to append.
+ */
+ public CoordinatorResult<LeaveGroupResponseData, CoordinatorRecord>
classicGroupLeave(
+ RequestContext context,
+ LeaveGroupRequestData request
+ ) throws UnknownMemberIdException, GroupIdNotFoundException {
+ Group group = groups.get(request.groupId(), Long.MAX_VALUE);
+
+ if (group == null) {
+ throw new UnknownMemberIdException(String.format("Group %s not
found.", request.groupId()));
+ }
+
+ if (group.type() == CLASSIC) {
+ return classicGroupLeaveToClassicGroup((ClassicGroup) group,
context, request);
+ } else {
+ return classicGroupLeaveToConsumerGroup((ConsumerGroup) group,
context, request);
+ }
+ }
+
+ /**
+ * Handle a classic LeaveGroupRequest to a ConsumerGroup.
+ *
+ * @param group The ConsumerGroup.
+ * @param context The request context.
+ * @param request The actual LeaveGroup request.
+ *
+ * @return The LeaveGroup response and the records to append.
+ */
+ private CoordinatorResult<LeaveGroupResponseData, CoordinatorRecord>
classicGroupLeaveToConsumerGroup(
+ ConsumerGroup group,
+ RequestContext context,
+ LeaveGroupRequestData request
+ ) throws UnknownMemberIdException, GroupIdNotFoundException {
+ List<MemberResponse> memberResponses = new ArrayList<>();
+ List<CoordinatorRecord> records = new ArrayList<>();
+ boolean hasValidLeaveGroupMember = false;
+
+ for (MemberIdentity memberIdentity: request.members()) {
+ String memberId = memberIdentity.memberId();
+ String instanceId = memberIdentity.groupInstanceId();
+ String reason = memberIdentity.reason() != null ?
memberIdentity.reason() : "not provided";
+
+ ConsumerGroupMember member;
+ try {
+ if (instanceId == null) {
+ member = group.getOrMaybeCreateMember(memberId, false);
+ throwIfMemberDoesNotUseClassicProtocol(member);
+
+ log.info("[Group {}] Static Member {} has left group " +
+ "through explicit `LeaveGroup` request; client
reason: {}",
+ group.groupId(), memberId, reason);
+ } else {
+ member = group.staticMember(instanceId);
+ throwIfStaticMemberIsUnknown(member, memberId);
+ // The LeaveGroup API allows administrative removal of
members by GroupInstanceId
+ // in which case we expect the MemberId to be undefined.
+ if (!UNKNOWN_MEMBER_ID.equals(memberId)) {
+ throwIfInstanceIdIsFenced(member, group.groupId(),
memberId, instanceId);
+ }
+ throwIfMemberDoesNotUseClassicProtocol(member);
+
+ log.info("[Group {}] Static Member {} with instance id {}
has left group " +
+ "through explicit `LeaveGroup` request; client
reason: {}",
+ group.groupId(), memberId, instanceId, reason);
+ }
+
+
records.addAll(removeMemberAndMaybeUpdateSubscriptionMetadata(group, member));
+ cancelTimers(group.groupId(), member.memberId());
+ memberResponses.add(
+ new MemberResponse()
+ .setMemberId(member.memberId())
+ .setGroupInstanceId(instanceId)
+ );
+ hasValidLeaveGroupMember = true;
+ } catch (KafkaException e) {
+ memberResponses.add(
+ new MemberResponse()
+ .setMemberId(memberId)
+ .setGroupInstanceId(instanceId)
+ .setErrorCode(Errors.forException(e).code())
+ );
+ }
+ }
+
+ if (hasValidLeaveGroupMember) {
Review Comment:
nit: It seems that we could also use `!records.isEmpty()`.
##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##########
@@ -4424,14 +4441,113 @@ private ConsumerGroupMember
validateConsumerGroupMember(
* @param context The request context.
* @param request The actual LeaveGroup request.
*
+ * @return The LeaveGroup response and the records to append.
+ */
+ public CoordinatorResult<LeaveGroupResponseData, CoordinatorRecord>
classicGroupLeave(
+ RequestContext context,
+ LeaveGroupRequestData request
+ ) throws UnknownMemberIdException, GroupIdNotFoundException {
+ Group group = groups.get(request.groupId(), Long.MAX_VALUE);
+
+ if (group == null) {
+ throw new UnknownMemberIdException(String.format("Group %s not
found.", request.groupId()));
+ }
+
+ if (group.type() == CLASSIC) {
+ return classicGroupLeaveToClassicGroup((ClassicGroup) group,
context, request);
+ } else {
+ return classicGroupLeaveToConsumerGroup((ConsumerGroup) group,
context, request);
+ }
+ }
+
+ /**
+ * Handle a classic LeaveGroupRequest to a ConsumerGroup.
+ *
+ * @param group The ConsumerGroup.
+ * @param context The request context.
+ * @param request The actual LeaveGroup request.
+ *
+ * @return The LeaveGroup response and the records to append.
+ */
+ private CoordinatorResult<LeaveGroupResponseData, CoordinatorRecord>
classicGroupLeaveToConsumerGroup(
+ ConsumerGroup group,
+ RequestContext context,
+ LeaveGroupRequestData request
+ ) throws UnknownMemberIdException, GroupIdNotFoundException {
+ List<MemberResponse> memberResponses = new ArrayList<>();
+ List<CoordinatorRecord> records = new ArrayList<>();
+ boolean hasValidLeaveGroupMember = false;
+
+ for (MemberIdentity memberIdentity: request.members()) {
+ String memberId = memberIdentity.memberId();
+ String instanceId = memberIdentity.groupInstanceId();
+ String reason = memberIdentity.reason() != null ?
memberIdentity.reason() : "not provided";
+
+ ConsumerGroupMember member;
+ try {
+ if (instanceId == null) {
+ member = group.getOrMaybeCreateMember(memberId, false);
+ throwIfMemberDoesNotUseClassicProtocol(member);
+
+ log.info("[Group {}] Static Member {} has left group " +
+ "through explicit `LeaveGroup` request; client
reason: {}",
+ group.groupId(), memberId, reason);
+ } else {
+ member = group.staticMember(instanceId);
+ throwIfStaticMemberIsUnknown(member, memberId);
Review Comment:
Should we rather pass `instanceId` instead of `memberId`?
##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##########
@@ -4424,14 +4441,113 @@ private ConsumerGroupMember
validateConsumerGroupMember(
* @param context The request context.
* @param request The actual LeaveGroup request.
*
+ * @return The LeaveGroup response and the records to append.
+ */
+ public CoordinatorResult<LeaveGroupResponseData, CoordinatorRecord>
classicGroupLeave(
+ RequestContext context,
+ LeaveGroupRequestData request
+ ) throws UnknownMemberIdException, GroupIdNotFoundException {
+ Group group = groups.get(request.groupId(), Long.MAX_VALUE);
+
+ if (group == null) {
+ throw new UnknownMemberIdException(String.format("Group %s not
found.", request.groupId()));
+ }
+
+ if (group.type() == CLASSIC) {
+ return classicGroupLeaveToClassicGroup((ClassicGroup) group,
context, request);
+ } else {
+ return classicGroupLeaveToConsumerGroup((ConsumerGroup) group,
context, request);
+ }
+ }
+
+ /**
+ * Handle a classic LeaveGroupRequest to a ConsumerGroup.
+ *
+ * @param group The ConsumerGroup.
+ * @param context The request context.
+ * @param request The actual LeaveGroup request.
+ *
+ * @return The LeaveGroup response and the records to append.
+ */
+ private CoordinatorResult<LeaveGroupResponseData, CoordinatorRecord>
classicGroupLeaveToConsumerGroup(
+ ConsumerGroup group,
+ RequestContext context,
+ LeaveGroupRequestData request
+ ) throws UnknownMemberIdException, GroupIdNotFoundException {
Review Comment:
Could we really throw `GroupIdNotFoundException`?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]