This is an automated email from the ASF dual-hosted git repository.
dajac pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 56be35588a8 KAFKA-20442: Update share group assignment epoch when last
member leaves (#22046)
56be35588a8 is described below
commit 56be35588a8943482c578a657e0c5104e9e2fed2
Author: David Jacot <[email protected]>
AuthorDate: Wed Apr 15 09:40:09 2026 +0200
KAFKA-20442: Update share group assignment epoch when last member leaves
(#22046)
When the last member of a share group is fenced, the group epoch is
bumped but the assignment epoch is not updated. This leaves the empty
group with `groupEpoch > assignmentEpoch`, violating the invariant that
an empty group should have matching epochs. This patch writes a
`ShareGroupTargetAssignmentMetadataRecord` when the fence operation
results in an empty group so that the assignment epoch catches up to the
group epoch.
Reviewers: Sushant Mahajan <[email protected]>, Sean Quah
<[email protected]>, Andrew Schofield <[email protected]>
---
.../apache/kafka/coordinator/group/GroupMetadataManager.java | 11 +++++++++++
.../kafka/coordinator/group/GroupMetadataManagerTest.java | 6 ++++--
2 files changed, 15 insertions(+), 2 deletions(-)
diff --git
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java
index dc210c74724..630080c2063 100644
---
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java
+++
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java
@@ -225,6 +225,7 @@ import static
org.apache.kafka.coordinator.group.GroupCoordinatorRecordHelpers.n
import static
org.apache.kafka.coordinator.group.GroupCoordinatorRecordHelpers.newShareGroupMemberSubscriptionRecord;
import static
org.apache.kafka.coordinator.group.GroupCoordinatorRecordHelpers.newShareGroupMemberSubscriptionTombstoneRecord;
import static
org.apache.kafka.coordinator.group.GroupCoordinatorRecordHelpers.newShareGroupStatePartitionMetadataRecord;
+import static
org.apache.kafka.coordinator.group.GroupCoordinatorRecordHelpers.newShareGroupTargetAssignmentMetadataRecord;
import static
org.apache.kafka.coordinator.group.GroupCoordinatorRecordHelpers.newShareGroupTargetAssignmentTombstoneRecord;
import static org.apache.kafka.coordinator.group.Utils.assignmentToString;
import static
org.apache.kafka.coordinator.group.Utils.assignmentWithEpochsToString;
@@ -4432,6 +4433,16 @@ public class GroupMetadataManager {
int groupEpoch = group.groupEpoch() + 1;
records.add(newShareGroupEpochRecord(group.groupId(), groupEpoch,
groupMetadataHash));
+ // If this is the last member, the group becomes empty so we must
+ // also update the assignment epoch to match the group epoch. We
+ // use a timestamp of zero to mimic the behavior of a new group
+ // so that the assignment interval does not delay the next
+ // assignment computation.
+ if (group.members().size() == 1) {
+ records.add(newShareGroupTargetAssignmentMetadataRecord(
+ group.groupId(), groupEpoch, 0L));
+ }
+
cancelGroupSessionTimeout(group.groupId(), member.memberId());
return new CoordinatorResult<>(records, response);
diff --git
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java
index 73937c714fc..8f69a687b51 100644
---
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java
+++
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java
@@ -4692,7 +4692,8 @@ public class GroupMetadataManagerTest {
GroupCoordinatorRecordHelpers.newShareGroupCurrentAssignmentTombstoneRecord(groupId,
memberId),
GroupCoordinatorRecordHelpers.newShareGroupTargetAssignmentTombstoneRecord(groupId,
memberId),
GroupCoordinatorRecordHelpers.newShareGroupMemberSubscriptionTombstoneRecord(groupId,
memberId),
-
GroupCoordinatorRecordHelpers.newShareGroupEpochRecord(groupId, 3, 0)
+
GroupCoordinatorRecordHelpers.newShareGroupEpochRecord(groupId, 3, 0),
+
GroupCoordinatorRecordHelpers.newShareGroupTargetAssignmentMetadataRecord(groupId,
3, 0L)
)
)
)),
@@ -4755,7 +4756,8 @@ public class GroupMetadataManagerTest {
GroupCoordinatorRecordHelpers.newShareGroupCurrentAssignmentTombstoneRecord(groupId,
memberId),
GroupCoordinatorRecordHelpers.newShareGroupTargetAssignmentTombstoneRecord(groupId,
memberId),
GroupCoordinatorRecordHelpers.newShareGroupMemberSubscriptionTombstoneRecord(groupId,
memberId),
-
GroupCoordinatorRecordHelpers.newShareGroupEpochRecord(groupId, 11, 0)
+
GroupCoordinatorRecordHelpers.newShareGroupEpochRecord(groupId, 11, 0),
+
GroupCoordinatorRecordHelpers.newShareGroupTargetAssignmentMetadataRecord(groupId,
11, 0L)
)
)
)