This is an automated email from the ASF dual-hosted git repository.
dajac pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 52687216c9a KAFKA-20442: Update consumer group assignment epoch when
last member leaves (#22045)
52687216c9a is described below
commit 52687216c9adc5cac16a0ba393c85a77d919be73
Author: David Jacot <[email protected]>
AuthorDate: Wed Apr 15 09:39:48 2026 +0200
KAFKA-20442: Update consumer group assignment epoch when last member leaves
(#22045)
When all members of a consumer group are fenced, the group epoch is
bumped but the assignment epoch is not updated. This leaves the empty
group with `groupEpoch > assignmentEpoch`, violating the invariant that
an empty group should have matching epochs. This patch writes a
`ConsumerGroupTargetAssignmentMetadataRecord` when the fence operation
results in an empty group so that the assignment epoch catches up to the
group epoch.
Reviewers: Sean Quah <[email protected]>, Andrew Schofield
<[email protected]>
---
.../coordinator/group/GroupMetadataManager.java | 11 ++++++++++
.../group/GroupMetadataManagerTest.java | 24 ++++++++++++++--------
2 files changed, 27 insertions(+), 8 deletions(-)
diff --git
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java
index a3663f34527..dc210c74724 100644
---
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java
+++
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java
@@ -217,6 +217,7 @@ import static
org.apache.kafka.coordinator.group.GroupCoordinatorRecordHelpers.n
import static
org.apache.kafka.coordinator.group.GroupCoordinatorRecordHelpers.newConsumerGroupMemberSubscriptionTombstoneRecord;
import static
org.apache.kafka.coordinator.group.GroupCoordinatorRecordHelpers.newConsumerGroupRegularExpressionTombstone;
import static
org.apache.kafka.coordinator.group.GroupCoordinatorRecordHelpers.newConsumerGroupSubscriptionMetadataTombstoneRecord;
+import static
org.apache.kafka.coordinator.group.GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentMetadataRecord;
import static
org.apache.kafka.coordinator.group.GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentTombstoneRecord;
import static
org.apache.kafka.coordinator.group.GroupCoordinatorRecordHelpers.newShareGroupCurrentAssignmentRecord;
import static
org.apache.kafka.coordinator.group.GroupCoordinatorRecordHelpers.newShareGroupCurrentAssignmentTombstoneRecord;
@@ -4379,6 +4380,16 @@ public class GroupMetadataManager {
records.add(newConsumerGroupEpochRecord(group.groupId(),
groupEpoch, groupMetadataHash));
log.info("[GroupId {}] Bumped group epoch to {} with metadata hash
{}.", group.groupId(), groupEpoch, groupMetadataHash);
+ // If all members are being fenced, the group becomes empty so
+ // we must also update the assignment epoch to match the group
+ // epoch. We use a timestamp of zero to mimic the behavior of
+ // a new group so that the assignment interval does not delay
+ // the next assignment computation.
+ if (group.members().size() == members.size()) {
+ records.add(newConsumerGroupTargetAssignmentMetadataRecord(
+ group.groupId(), groupEpoch, 0L));
+ }
+
for (ConsumerGroupMember member : members) {
cancelTimers(group.groupId(), member.memberId());
}
diff --git
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java
index 8bfcc7dae54..73937c714fc 100644
---
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java
+++
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java
@@ -1403,7 +1403,8 @@ public class GroupMetadataManagerTest {
GroupCoordinatorRecordHelpers.newConsumerGroupCurrentAssignmentTombstoneRecord(groupId,
memberId),
GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentTombstoneRecord(groupId,
memberId),
GroupCoordinatorRecordHelpers.newConsumerGroupMemberSubscriptionTombstoneRecord(groupId,
memberId),
- GroupCoordinatorRecordHelpers.newConsumerGroupEpochRecord(groupId,
3, 0)
+ GroupCoordinatorRecordHelpers.newConsumerGroupEpochRecord(groupId,
3, 0),
+
GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentMetadataRecord(groupId,
3, 0L)
);
assertRecordsEquals(expectedRecords, result.records());
assertEquals(Map.of(), context.groupMetadataManager.topicHashCache());
@@ -4563,7 +4564,8 @@ public class GroupMetadataManagerTest {
GroupCoordinatorRecordHelpers.newConsumerGroupCurrentAssignmentTombstoneRecord(groupId,
memberId),
GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentTombstoneRecord(groupId,
memberId),
GroupCoordinatorRecordHelpers.newConsumerGroupMemberSubscriptionTombstoneRecord(groupId,
memberId),
-
GroupCoordinatorRecordHelpers.newConsumerGroupEpochRecord(groupId, 3, 0)
+
GroupCoordinatorRecordHelpers.newConsumerGroupEpochRecord(groupId, 3, 0),
+
GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentMetadataRecord(groupId,
3, 0L)
)
)
)),
@@ -4628,7 +4630,8 @@ public class GroupMetadataManagerTest {
GroupCoordinatorRecordHelpers.newConsumerGroupCurrentAssignmentTombstoneRecord(groupId,
memberId),
GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentTombstoneRecord(groupId,
memberId),
GroupCoordinatorRecordHelpers.newConsumerGroupMemberSubscriptionTombstoneRecord(groupId,
memberId),
-
GroupCoordinatorRecordHelpers.newConsumerGroupEpochRecord(groupId, 11, 0)
+
GroupCoordinatorRecordHelpers.newConsumerGroupEpochRecord(groupId, 11, 0),
+
GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentMetadataRecord(groupId,
11, 0L)
)
)
)
@@ -4832,7 +4835,8 @@ public class GroupMetadataManagerTest {
GroupCoordinatorRecordHelpers.newConsumerGroupCurrentAssignmentTombstoneRecord(groupId,
memberId),
GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentTombstoneRecord(groupId,
memberId),
GroupCoordinatorRecordHelpers.newConsumerGroupMemberSubscriptionTombstoneRecord(groupId,
memberId),
-
GroupCoordinatorRecordHelpers.newConsumerGroupEpochRecord(groupId, 3, 0)
+
GroupCoordinatorRecordHelpers.newConsumerGroupEpochRecord(groupId, 3, 0),
+
GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentMetadataRecord(groupId,
3, 0L)
)
)
)),
@@ -16129,7 +16133,8 @@ public class GroupMetadataManagerTest {
GroupCoordinatorRecordHelpers.newConsumerGroupMemberSubscriptionTombstoneRecord(groupId,
memberId),
// The group epoch is bumped.
-
GroupCoordinatorRecordHelpers.newConsumerGroupEpochRecord(groupId, 11, 0)
+
GroupCoordinatorRecordHelpers.newConsumerGroupEpochRecord(groupId, 11, 0),
+
GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentMetadataRecord(groupId,
11, 0L)
),
timeout.result().records()
);
@@ -16195,7 +16200,8 @@ public class GroupMetadataManagerTest {
GroupCoordinatorRecordHelpers.newConsumerGroupMemberSubscriptionTombstoneRecord(groupId,
memberId),
// The group epoch is bumped.
-
GroupCoordinatorRecordHelpers.newConsumerGroupEpochRecord(groupId, 11, 0)
+
GroupCoordinatorRecordHelpers.newConsumerGroupEpochRecord(groupId, 11, 0),
+
GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentMetadataRecord(groupId,
11, 0L)
),
timeout.result().records()
);
@@ -16405,7 +16411,8 @@ public class GroupMetadataManagerTest {
GroupCoordinatorRecordHelpers.newConsumerGroupMemberSubscriptionTombstoneRecord(groupId,
memberId3)
),
// Bump the group epoch.
-
List.of(GroupCoordinatorRecordHelpers.newConsumerGroupEpochRecord(groupId, 11,
0))
+
List.of(GroupCoordinatorRecordHelpers.newConsumerGroupEpochRecord(groupId, 11,
0)),
+
List.of(GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentMetadataRecord(groupId,
11, 0L))
);
assertUnorderedRecordsEquals(expectedRecords, leaveResult.records());
@@ -24822,7 +24829,8 @@ public class GroupMetadataManagerTest {
GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentTombstoneRecord(groupId,
memberId2),
GroupCoordinatorRecordHelpers.newConsumerGroupMemberSubscriptionTombstoneRecord(groupId,
memberId2),
GroupCoordinatorRecordHelpers.newConsumerGroupRegularExpressionTombstone(groupId,
"bar*"),
-
GroupCoordinatorRecordHelpers.newConsumerGroupEpochRecord(groupId, 12, 0)
+
GroupCoordinatorRecordHelpers.newConsumerGroupEpochRecord(groupId, 12, 0),
+
GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentMetadataRecord(groupId,
12, 0L)
)
)
)),