This is an automated email from the ASF dual-hosted git repository.

dajac pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 56be35588a8 KAFKA-20442: Update share group assignment epoch when last member leaves (#22046)
56be35588a8 is described below

commit 56be35588a8943482c578a657e0c5104e9e2fed2
Author: David Jacot <[email protected]>
AuthorDate: Wed Apr 15 09:40:09 2026 +0200

    KAFKA-20442: Update share group assignment epoch when last member leaves (#22046)
    
    When the last member of a share group is fenced, the group epoch is
    bumped but the assignment epoch is not updated. This leaves the empty
    group with `groupEpoch > assignmentEpoch`, violating the invariant that
    an empty group should have matching epochs. This patch writes a
    `ShareGroupTargetAssignmentMetadataRecord` when the fence operation
    results in an empty group so that the assignment epoch catches up to the
    group epoch.
    
    Reviewers: Sushant Mahajan <[email protected]>, Sean Quah
     <[email protected]>, Andrew Schofield <[email protected]>
---
 .../apache/kafka/coordinator/group/GroupMetadataManager.java  | 11 +++++++++++
 .../kafka/coordinator/group/GroupMetadataManagerTest.java     |  6 ++++--
 2 files changed, 15 insertions(+), 2 deletions(-)

diff --git a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java
index dc210c74724..630080c2063 100644
--- a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java
+++ b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java
@@ -225,6 +225,7 @@ import static 
org.apache.kafka.coordinator.group.GroupCoordinatorRecordHelpers.n
 import static 
org.apache.kafka.coordinator.group.GroupCoordinatorRecordHelpers.newShareGroupMemberSubscriptionRecord;
 import static 
org.apache.kafka.coordinator.group.GroupCoordinatorRecordHelpers.newShareGroupMemberSubscriptionTombstoneRecord;
 import static 
org.apache.kafka.coordinator.group.GroupCoordinatorRecordHelpers.newShareGroupStatePartitionMetadataRecord;
+import static 
org.apache.kafka.coordinator.group.GroupCoordinatorRecordHelpers.newShareGroupTargetAssignmentMetadataRecord;
 import static 
org.apache.kafka.coordinator.group.GroupCoordinatorRecordHelpers.newShareGroupTargetAssignmentTombstoneRecord;
 import static org.apache.kafka.coordinator.group.Utils.assignmentToString;
 import static 
org.apache.kafka.coordinator.group.Utils.assignmentWithEpochsToString;
@@ -4432,6 +4433,16 @@ public class GroupMetadataManager {
         int groupEpoch = group.groupEpoch() + 1;
         records.add(newShareGroupEpochRecord(group.groupId(), groupEpoch, groupMetadataHash));
 
+        // If this is the last member, the group becomes empty so we must
+        // also update the assignment epoch to match the group epoch. We
+        // use a timestamp of zero to mimic the behavior of a new group
+        // so that the assignment interval does not delay the next
+        // assignment computation.
+        if (group.members().size() == 1) {
+            records.add(newShareGroupTargetAssignmentMetadataRecord(
+                group.groupId(), groupEpoch, 0L));
+        }
+
         cancelGroupSessionTimeout(group.groupId(), member.memberId());
 
         return new CoordinatorResult<>(records, response);
diff --git a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java
index 73937c714fc..8f69a687b51 100644
--- a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java
+++ b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java
@@ -4692,7 +4692,8 @@ public class GroupMetadataManagerTest {
                         
GroupCoordinatorRecordHelpers.newShareGroupCurrentAssignmentTombstoneRecord(groupId,
 memberId),
                         
GroupCoordinatorRecordHelpers.newShareGroupTargetAssignmentTombstoneRecord(groupId,
 memberId),
                         
GroupCoordinatorRecordHelpers.newShareGroupMemberSubscriptionTombstoneRecord(groupId,
 memberId),
-                        
GroupCoordinatorRecordHelpers.newShareGroupEpochRecord(groupId, 3, 0)
+                        
GroupCoordinatorRecordHelpers.newShareGroupEpochRecord(groupId, 3, 0),
+                        
GroupCoordinatorRecordHelpers.newShareGroupTargetAssignmentMetadataRecord(groupId,
 3, 0L)
                     )
                 )
             )),
@@ -4755,7 +4756,8 @@ public class GroupMetadataManagerTest {
                             
GroupCoordinatorRecordHelpers.newShareGroupCurrentAssignmentTombstoneRecord(groupId,
 memberId),
                             
GroupCoordinatorRecordHelpers.newShareGroupTargetAssignmentTombstoneRecord(groupId,
 memberId),
                             
GroupCoordinatorRecordHelpers.newShareGroupMemberSubscriptionTombstoneRecord(groupId,
 memberId),
-                            
GroupCoordinatorRecordHelpers.newShareGroupEpochRecord(groupId, 11, 0)
+                            
GroupCoordinatorRecordHelpers.newShareGroupEpochRecord(groupId, 11, 0),
+                            
GroupCoordinatorRecordHelpers.newShareGroupTargetAssignmentMetadataRecord(groupId,
 11, 0L)
                         )
                     )
                 )

Reply via email to