chia7712 commented on code in PR #16898:
URL: https://github.com/apache/kafka/pull/16898#discussion_r1742205396
##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##########
@@ -3438,15 +3477,30 @@ public void replay(
String groupId = key.groupId();
String memberId = key.memberId();
- ConsumerGroup group = getOrMaybeCreatePersistedConsumerGroup(groupId, false);
- ConsumerGroupMember oldMember = group.getOrMaybeCreateMember(memberId, false);
-
if (value != null) {
+ ConsumerGroup group = getOrMaybeCreatePersistedConsumerGroup(groupId, true);
Review Comment:
Not sure why we decided to create a new group here. If it does create a new
group, the subsequent `getOrMaybeCreateMember` call will throw `UnknownMemberIdException`.
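To illustrate the concern, here is a minimal sketch using simplified stand-ins, not Kafka's real classes. `ReplaySketch`, `Group`, and `lookupThrows` are hypothetical names, and the only behavior assumed is that a member lookup with `createIfNotExists = false` throws when the member is absent. Under that assumption, a freshly created (and therefore empty) group always fails the lookup:

```java
import java.util.HashMap;
import java.util.Map;

// Simplified stand-ins for illustration only; these are NOT Kafka's real classes.
class ReplaySketch {

    // Hypothetical stand-in for UnknownMemberIdException.
    static class UnknownMemberIdException extends RuntimeException {
        UnknownMemberIdException(String message) {
            super(message);
        }
    }

    // Hypothetical stand-in for ConsumerGroup: it only tracks member ids.
    static class Group {
        private final Map<String, String> members = new HashMap<>();

        // Returns the member, creating it only when createIfNotExists is true.
        String getOrMaybeCreateMember(String memberId, boolean createIfNotExists) {
            String member = members.get(memberId);
            if (member == null) {
                if (!createIfNotExists) {
                    throw new UnknownMemberIdException("Member " + memberId + " is not in the group.");
                }
                member = memberId;
                members.put(memberId, member);
            }
            return member;
        }
    }

    // Returns true when looking up memberId without creating it throws.
    static boolean lookupThrows(Group group, String memberId) {
        try {
            group.getOrMaybeCreateMember(memberId, false);
            return false;
        } catch (UnknownMemberIdException e) {
            return true;
        }
    }

    public static void main(String[] args) {
        // A group that was just created has no members yet.
        Group fresh = new Group();
        System.out.println(lookupThrows(fresh, "m1"));

        // Once the member is created, the same lookup succeeds.
        fresh.getOrMaybeCreateMember("m1", true);
        System.out.println(lookupThrows(fresh, "m1"));
    }
}
```

If the new code path also passes `true` to the member lookup, the exception would not occur; the sketch only covers the case where the group is created but the member lookup is not.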
##########
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java:
##########
@@ -14285,6 +14285,77 @@ public void testConsumerGroupDynamicConfigs() {
context.assertNoRebalanceTimeout(groupId, memberId);
}
+ @Test
+ public void testReplayConsumerGroupMemberMetadataTombstone() {
+ GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder()
+ .build();
+
+ // The group still exists but the member is already gone. Replaying the
+ // ConsumerGroupMemberMetadata tombstone should be a no-op.
+ context.replay(GroupCoordinatorRecordHelpers.newConsumerGroupEpochRecord("foo", 10));
+ context.replay(GroupCoordinatorRecordHelpers.newConsumerGroupMemberSubscriptionTombstoneRecord("foo", "m1"));
+
+ // The group may not exist at all. Replaying the ConsumerGroupMemberMetadata tombstone
+ // should be a no-op.
+ context.replay(GroupCoordinatorRecordHelpers.newConsumerGroupMemberSubscriptionTombstoneRecord("bar", "m1"));
+ }
+
+ @Test
+ public void testReplayConsumerGroupMetadataTombstone() {
+ GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder()
+ .build();
+
+ // The group may not exist at all. Replaying the ConsumerGroupMetadata tombstone
+ // should be a no-op.
+ context.replay(GroupCoordinatorRecordHelpers.newConsumerGroupEpochTombstoneRecord("foo"));
+ }
+
+ @Test
+ public void testReplayConsumerGroupPartitionMetadataTombstone() {
+ GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder()
+ .build();
+
+ // The group may not exist at all. Replaying the ConsumerGroupPartitionMetadata tombstone
+ // should be a no-op.
+ context.replay(GroupCoordinatorRecordHelpers.newConsumerGroupSubscriptionMetadataTombstoneRecord("foo"));
+ }
+
+ @Test
+ public void testReplayConsumerGroupTargetAssignmentMemberTombstone() {
+ GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder()
+ .build();
+
+ // The group may not exist at all. Replaying the ConsumerGroupTargetAssignmentMember tombstone
+ // should be a no-op.
+ context.replay(GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentTombstoneRecord("foo", "m1"));
+ }
+
+ @Test
+ public void testReplayConsumerGroupTargetAssignmentMetadataTombstone() {
+ GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder()
+ .build();
+
+ // The group may not exist at all. Replaying the ConsumerGroupTargetAssignmentMetadata tombstone
+ // should be a no-op.
+ context.replay(GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentEpochTombstoneRecord("foo"));
+ }
+
+ @Test
+ public void testReplayConsumerGroupCurrentMemberAssignmentTombstone() {
Review Comment:
Should we add a unit test for `ConsumerGroupCurrentMemberAssignmentKey`?