dongnuo123 commented on code in PR #15798:
URL: https://github.com/apache/kafka/pull/15798#discussion_r1587735026
##########
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java:
##########
@@ -10921,6 +10823,1158 @@ public void
testLastConsumerProtocolMemberRebalanceTimeoutInConsumerGroup() {
assertTrue(classicGroup.isInState(PREPARING_REBALANCE));
}
+ @Test
+ public void testJoiningConsumerGroupThrowsExceptionIfGroupOverMaxSize() {
+ String groupId = "group-id";
+ String memberId = Uuid.randomUuid().toString();
+ GroupMetadataManagerTestContext context = new
GroupMetadataManagerTestContext.Builder()
+ .withConsumerGroup(new ConsumerGroupBuilder(groupId, 10)
+ .withMember(new ConsumerGroupMember.Builder(memberId)
+ .setState(MemberState.STABLE)
+ .setMemberEpoch(10)
+ .setPreviousMemberEpoch(10)
+ .build()))
+ .withConsumerGroupMaxSize(1)
+ .build();
+
+ JoinGroupRequestData request = new
GroupMetadataManagerTestContext.JoinGroupRequestBuilder()
+ .withGroupId(groupId)
+ .withMemberId(UNKNOWN_MEMBER_ID)
+ .withDefaultProtocolTypeAndProtocols()
+ .build();
+
+ Exception ex = assertThrows(GroupMaxSizeReachedException.class, () ->
context.sendClassicGroupJoin(request));
+ assertEquals("The consumer group has reached its maximum capacity of 1
members.", ex.getMessage());
+ }
+
+ @Test
+ public void
testJoiningConsumerGroupThrowsExceptionIfProtocolIsNotSupported() {
+ String groupId = "group-id";
+ String memberId = Uuid.randomUuid().toString();
+ GroupMetadataManagerTestContext context = new
GroupMetadataManagerTestContext.Builder()
+ .withConsumerGroup(new ConsumerGroupBuilder(groupId, 10)
+ .withMember(new ConsumerGroupMember.Builder(memberId)
+ .setState(MemberState.STABLE)
+ .setMemberEpoch(10)
+ .setPreviousMemberEpoch(10)
+
.setSupportedClassicProtocols(GroupMetadataManagerTestContext.toProtocols("roundrobin"))
+ .build()))
+ .build();
+
+ JoinGroupRequestData requestWithEmptyProtocols = new
GroupMetadataManagerTestContext.JoinGroupRequestBuilder()
+ .withGroupId(groupId)
+ .withMemberId(UNKNOWN_MEMBER_ID)
+ .withProtocolType(ConsumerProtocol.PROTOCOL_TYPE)
+ .withDefaultProtocolTypeAndProtocols()
+ .build();
+ assertThrows(InconsistentGroupProtocolException.class, () ->
context.sendClassicGroupJoin(requestWithEmptyProtocols));
+
+ JoinGroupRequestData requestWithInvalidProtocolType = new
GroupMetadataManagerTestContext.JoinGroupRequestBuilder()
+ .withGroupId(groupId)
+ .withMemberId(UNKNOWN_MEMBER_ID)
+ .withProtocolType("connect")
+ .withDefaultProtocolTypeAndProtocols()
+ .build();
+ assertThrows(InconsistentGroupProtocolException.class, () ->
context.sendClassicGroupJoin(requestWithInvalidProtocolType));
+ }
+
+ @ParameterizedTest
+ @ValueSource(booleans = {true, false})
+ public void testJoiningConsumerGroupWithNewDynamicMember(boolean
replaySuccessfully) throws Exception {
+ String groupId = "group-id";
+ Uuid fooTopicId = Uuid.randomUuid();
+ String fooTopicName = "foo";
+ Uuid barTopicId = Uuid.randomUuid();
+ String barTopicName = "bar";
+
+ for (short version =
ConsumerProtocolSubscription.LOWEST_SUPPORTED_VERSION; version <=
ConsumerProtocolSubscription.HIGHEST_SUPPORTED_VERSION; version++) {
+ String memberId = Uuid.randomUuid().toString();
+ MockPartitionAssignor assignor = new
MockPartitionAssignor("range");
+ GroupMetadataManagerTestContext context = new
GroupMetadataManagerTestContext.Builder()
+ .withAssignors(Collections.singletonList(assignor))
+ .withMetadataImage(new MetadataImageBuilder()
+ .addTopic(fooTopicId, fooTopicName, 2)
+ .addTopic(barTopicId, barTopicName, 1)
+ .addRacks()
+ .build())
+ .withConsumerGroup(new ConsumerGroupBuilder(groupId, 10)
+ .withSubscriptionMetadata(new HashMap<String,
TopicMetadata>() {
+ {
+ put(fooTopicName, new TopicMetadata(fooTopicId,
fooTopicName, 2, mkMapOfPartitionRacks(2)));
+ }
+ })
+ .withMember(new ConsumerGroupMember.Builder(memberId)
+ .setState(MemberState.STABLE)
+ .setMemberEpoch(10)
+ .setPreviousMemberEpoch(10)
+ .build())
+ .withAssignment(memberId, mkAssignment(
+ mkTopicAssignment(fooTopicId, 0, 1)))
+ .withAssignmentEpoch(10))
+ .build();
+
+ JoinGroupRequestData request = new
GroupMetadataManagerTestContext.JoinGroupRequestBuilder()
+ .withGroupId(groupId)
+ .withMemberId(UNKNOWN_MEMBER_ID)
+
.withProtocols(GroupMetadataManagerTestContext.toConsumerProtocol(
+ Arrays.asList(fooTopicName, barTopicName),
+ Collections.emptyList(),
+ version))
+ .build();
+
+ // The first round of join request gets the new member id.
+ GroupMetadataManagerTestContext.JoinResult firstJoinResult =
context.sendClassicGroupJoin(
+ request,
+ true
+ );
+ assertTrue(firstJoinResult.records.isEmpty());
+ // Simulate a successful write to the log.
+ firstJoinResult.appendFuture.complete(null);
+
+ assertTrue(firstJoinResult.joinFuture.isDone());
+ assertEquals(Errors.MEMBER_ID_REQUIRED.code(),
firstJoinResult.joinFuture.get().errorCode());
+ String newMemberId = firstJoinResult.joinFuture.get().memberId();
+
+ assignor.prepareGroupAssignment(new GroupAssignment(
+ new HashMap<String, MemberAssignment>() {
+ {
+ put(memberId, new MemberAssignment(mkAssignment(
+ mkTopicAssignment(fooTopicId, 0)
+ )));
+ put(newMemberId, new MemberAssignment(mkAssignment(
+ mkTopicAssignment(barTopicId, 0)
+ )));
+ }
+ }
+ ));
+
+ JoinGroupRequestData secondRequest = new JoinGroupRequestData()
+ .setGroupId(request.groupId())
+ .setMemberId(newMemberId)
+ .setProtocolType(request.protocolType())
+ .setProtocols(request.protocols())
+ .setSessionTimeoutMs(request.sessionTimeoutMs())
+ .setRebalanceTimeoutMs(request.rebalanceTimeoutMs())
+ .setReason(request.reason());
+
+ GroupMetadataManagerTestContext.JoinResult secondJoinResult =
context.sendClassicGroupJoin(
+ secondRequest,
+ true
+ );
+
+ ConsumerGroupMember expectedMember = new
ConsumerGroupMember.Builder(newMemberId)
+ .setMemberEpoch(11)
+ .setPreviousMemberEpoch(0)
+ .setState(MemberState.STABLE)
+ .setClientId("client")
+ .setClientHost("localhost/127.0.0.1")
+ .setSubscribedTopicNames(Arrays.asList(fooTopicName,
barTopicName))
+ .setRebalanceTimeoutMs(500)
+ .setAssignedPartitions(assignor.targetPartitions(newMemberId))
+ .setSupportedClassicProtocols(request.protocols())
+ .build();
+
+ List<Record> expectedRecords = Arrays.asList(
+ RecordHelpers.newMemberSubscriptionRecord(groupId,
expectedMember),
+ RecordHelpers.newGroupSubscriptionMetadataRecord(groupId, new
HashMap<String, TopicMetadata>() {
+ {
+ put(fooTopicName, new TopicMetadata(fooTopicId,
fooTopicName, 2, mkMapOfPartitionRacks(2)));
+ put(barTopicName, new TopicMetadata(barTopicId,
barTopicName, 1, mkMapOfPartitionRacks(1)));
+ }
+ }),
+ RecordHelpers.newGroupEpochRecord(groupId, 11),
+
+ RecordHelpers.newTargetAssignmentRecord(groupId, memberId,
assignor.targetPartitions(memberId)),
+ RecordHelpers.newTargetAssignmentRecord(groupId, newMemberId,
assignor.targetPartitions(newMemberId)),
+
+ RecordHelpers.newTargetAssignmentEpochRecord(groupId, 11),
+ RecordHelpers.newCurrentAssignmentRecord(groupId,
expectedMember)
+ );
+ assertRecordsEquals(expectedRecords.subList(0, 3),
secondJoinResult.records.subList(0, 3));
+ assertUnorderedListEquals(expectedRecords.subList(3, 5),
secondJoinResult.records.subList(3, 5));
+ assertRecordsEquals(expectedRecords.subList(5, 7),
secondJoinResult.records.subList(5, 7));
+
+ if (replaySuccessfully) {
+ secondJoinResult.appendFuture.complete(null);
+ assertTrue(secondJoinResult.joinFuture.isDone());
+ assertEquals(
+ new JoinGroupResponseData()
+ .setMemberId(newMemberId)
+ .setGenerationId(11)
+ .setProtocolType(ConsumerProtocol.PROTOCOL_TYPE)
+ .setProtocolName("range"),
+ secondJoinResult.joinFuture.get()
+ );
+
+ context.assertSessionTimeout(groupId, newMemberId,
request.sessionTimeoutMs());
+ context.assertSyncTimeout(groupId, newMemberId,
request.rebalanceTimeoutMs());
+ } else {
+ secondJoinResult.appendFuture.completeExceptionally(new
NotLeaderOrFollowerException());
+ assertEquals(
+ new JoinGroupResponseData()
+ .setErrorCode(Errors.NOT_COORDINATOR.code()),
+ secondJoinResult.joinFuture.get()
+ );
+ context.assertNoSessionTimeout(groupId, newMemberId);
+ context.assertNoSyncTimeout(groupId, newMemberId);
+ }
+ }
+ }
+
+ @ParameterizedTest
+ @ValueSource(booleans = {true, false})
+ public void testJoiningConsumerGroupWithNewStaticMember(boolean
replaySuccessfully) throws Exception {
+ String groupId = "group-id";
+ Uuid fooTopicId = Uuid.randomUuid();
+ String fooTopicName = "foo";
+ Uuid barTopicId = Uuid.randomUuid();
+ String barTopicName = "bar";
+
+ for (short version =
ConsumerProtocolSubscription.LOWEST_SUPPORTED_VERSION; version <=
ConsumerProtocolSubscription.HIGHEST_SUPPORTED_VERSION; version++) {
+ String memberId = Uuid.randomUuid().toString();
+ String instanceId = "instance-id-" + version;
+ MockPartitionAssignor assignor = new
MockPartitionAssignor("range");
+ assignor.prepareGroupAssignment(new
GroupAssignment(Collections.emptyMap()));
Review Comment:
The assignment is prepared as empty here because the new static member is
given a newly generated member ID, so we don't know what that member ID will
be at the time the assignment is prepared.
We might need another test in which an existing static member joins and a new
target assignment is computed.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]