dajac commented on code in PR #15442:
URL: https://github.com/apache/kafka/pull/15442#discussion_r1517690579
##########
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java:
##########
@@ -9607,6 +9607,151 @@ public void
testOnConsumerGroupStateTransitionOnLoading() {
verify(context.metrics,
times(1)).onConsumerGroupStateTransition(ConsumerGroup.ConsumerGroupState.EMPTY,
null);
}
+ @Test
+ public void testMaybeUpgradeEmptyGroup() {
+ String classicGroupId = "classic-group-id";
+ String consumerGroupId = "consumer-group-id";
+ GroupMetadataManagerTestContext context = new
GroupMetadataManagerTestContext.Builder()
+ .build();
+ ClassicGroup classicGroup = new ClassicGroup(
+ new LogContext(),
+ classicGroupId,
+ EMPTY,
+ context.time,
+ context.metrics
+ );
+ context.replay(RecordHelpers.newGroupMetadataRecord(classicGroup,
classicGroup.groupAssignment(), MetadataVersion.latestTesting()));
+ context.replay(RecordHelpers.newGroupEpochRecord(consumerGroupId, 10));
+
+ // A consumer group can't be upgraded.
+ List<Record> records = new ArrayList<>();
+ context.groupMetadataManager.maybeUpgradeEmptyGroup(consumerGroupId,
records);
+ assertEquals(Collections.emptyList(), records);
+
+ // A non-empty classic group can't be upgraded.
+
context.groupMetadataManager.getOrMaybeCreateClassicGroup(classicGroupId,
false).transitionTo(PREPARING_REBALANCE);
+ context.groupMetadataManager.maybeUpgradeEmptyGroup(classicGroupId,
records);
+ assertEquals(Collections.emptyList(), records);
+
+ // An empty classic group can be upgraded.
+
context.groupMetadataManager.getOrMaybeCreateClassicGroup(classicGroupId,
false).transitionTo(EMPTY);
+ context.groupMetadataManager.maybeUpgradeEmptyGroup(classicGroupId,
records);
+
assertEquals(Arrays.asList(RecordHelpers.newGroupMetadataTombstoneRecord(classicGroupId)),
records);
+ }
+
+ @Test
+ public void testMaybeDowngradeEmptyGroup() {
+ String classicGroupId = "classic-group-id";
+ String consumerGroupId = "consumer-group-id";
+ GroupMetadataManagerTestContext context = new
GroupMetadataManagerTestContext.Builder()
+ .build();
+ ClassicGroup classicGroup = new ClassicGroup(
+ new LogContext(),
+ classicGroupId,
+ EMPTY,
+ context.time,
+ context.metrics
+ );
+ context.replay(RecordHelpers.newGroupMetadataRecord(classicGroup,
classicGroup.groupAssignment(), MetadataVersion.latestTesting()));
+ context.replay(RecordHelpers.newGroupEpochRecord(consumerGroupId, 10));
+
+ List<Record> records = new ArrayList<>();
+ context.groupMetadataManager.maybeDowngradeEmptyGroup(classicGroupId,
records);
+ assertEquals(Collections.emptyList(), records);
+
+ // A classic group can't be downgraded.
+ context.groupMetadataManager.maybeDowngradeEmptyGroup(classicGroupId,
records);
+ assertEquals(Collections.emptyList(), records);
+
+ // An empty consumer group can be upgraded.
+ context.groupMetadataManager.maybeDowngradeEmptyGroup(consumerGroupId,
records);
+ assertEquals(Arrays.asList(
+
RecordHelpers.newTargetAssignmentEpochTombstoneRecord(consumerGroupId),
+
RecordHelpers.newGroupSubscriptionMetadataTombstoneRecord(consumerGroupId),
+ RecordHelpers.newGroupEpochTombstoneRecord(consumerGroupId)),
records);
+ records.clear();
+
+ // A non-empty consumer group can't be downgraded.
+ ConsumerGroupMember.Builder memberBuilder = new
ConsumerGroupMember.Builder(Uuid.randomUuid().toString());
+
context.replay(RecordHelpers.newMemberSubscriptionRecord(consumerGroupId,
memberBuilder.build()));
+ context.groupMetadataManager.maybeDowngradeEmptyGroup(classicGroupId,
records);
+ assertEquals(Collections.emptyList(), records);
+ }
+
+ @Test
+ public void testConsumerGroupHeartbeatWithEmptyClassicGroup() {
+ String classicGroupId = "classic-group-id";
+ String memberId = Uuid.randomUuid().toString();
+ MockPartitionAssignor assignor = new MockPartitionAssignor("range");
+ assignor.prepareGroupAssignment(new
GroupAssignment(Collections.emptyMap()));
+ GroupMetadataManagerTestContext context = new
GroupMetadataManagerTestContext.Builder()
+ .withAssignors(Collections.singletonList(assignor))
+ .build();
+ ClassicGroup classicGroup = new ClassicGroup(
+ new LogContext(),
+ classicGroupId,
+ EMPTY,
+ context.time,
+ context.metrics
+ );
+ context.replay(RecordHelpers.newGroupMetadataRecord(classicGroup,
classicGroup.groupAssignment(), MetadataVersion.latestTesting()));
+
+
context.groupMetadataManager.getOrMaybeCreateClassicGroup(classicGroupId,
false).transitionTo(PREPARING_REBALANCE);
+ assertThrows(GroupIdNotFoundException.class, () ->
+ context.consumerGroupHeartbeat(
+ new ConsumerGroupHeartbeatRequestData()
+ .setGroupId(classicGroupId)
+ .setMemberId(memberId)
+ .setMemberEpoch(0)
+ .setServerAssignor("range")
+ .setRebalanceTimeoutMs(5000)
+ .setSubscribedTopicNames(Arrays.asList("foo", "bar"))
+ .setTopicPartitions(Collections.emptyList())));
+
+
context.groupMetadataManager.getOrMaybeCreateClassicGroup(classicGroupId,
false).transitionTo(EMPTY);
+ CoordinatorResult<ConsumerGroupHeartbeatResponseData, Record> result =
context.consumerGroupHeartbeat(
Review Comment:
I think that the weirdness comes from the fact that
`getOrMaybeCreateConsumerGroup` actually put the new groups into the map.
Ideally, the entire consumerGroupHeartbeat method should not alter the timeline
data structure and only rely on the replay to update it. I wonder if we should
fix this. If we do, they the replay in this case would be just fine. If we do
this, it also means that we don't have to remove the classic group from the map
too. We only need to generate the tombstone.
We don't have this issue on the classic group case because we don't replay
records.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]