This is an automated email from the ASF dual-hosted git repository.

dajac pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 52687216c9a KAFKA-20442: Update consumer group assignment epoch when last member leaves (#22045)
52687216c9a is described below

commit 52687216c9adc5cac16a0ba393c85a77d919be73
Author: David Jacot <[email protected]>
AuthorDate: Wed Apr 15 09:39:48 2026 +0200

    KAFKA-20442: Update consumer group assignment epoch when last member leaves (#22045)
    
    When all members of a consumer group are fenced, the group epoch is
    bumped but the assignment epoch is not updated. This leaves the empty
    group with `groupEpoch > assignmentEpoch`, violating the invariant that
    an empty group should have matching epochs. This patch writes a
    `ConsumerGroupTargetAssignmentMetadataRecord` when the fence operation
    results in an empty group so that the assignment epoch catches up to the
    group epoch.
    
    Reviewers: Sean Quah <[email protected]>, Andrew Schofield
     <[email protected]>
---
 .../coordinator/group/GroupMetadataManager.java    | 11 ++++++++++
 .../group/GroupMetadataManagerTest.java            | 24 ++++++++++++++--------
 2 files changed, 27 insertions(+), 8 deletions(-)

diff --git 
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java
 
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java
index a3663f34527..dc210c74724 100644
--- 
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java
+++ 
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java
@@ -217,6 +217,7 @@ import static 
org.apache.kafka.coordinator.group.GroupCoordinatorRecordHelpers.n
 import static 
org.apache.kafka.coordinator.group.GroupCoordinatorRecordHelpers.newConsumerGroupMemberSubscriptionTombstoneRecord;
 import static 
org.apache.kafka.coordinator.group.GroupCoordinatorRecordHelpers.newConsumerGroupRegularExpressionTombstone;
 import static 
org.apache.kafka.coordinator.group.GroupCoordinatorRecordHelpers.newConsumerGroupSubscriptionMetadataTombstoneRecord;
+import static 
org.apache.kafka.coordinator.group.GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentMetadataRecord;
 import static 
org.apache.kafka.coordinator.group.GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentTombstoneRecord;
 import static 
org.apache.kafka.coordinator.group.GroupCoordinatorRecordHelpers.newShareGroupCurrentAssignmentRecord;
 import static 
org.apache.kafka.coordinator.group.GroupCoordinatorRecordHelpers.newShareGroupCurrentAssignmentTombstoneRecord;
@@ -4379,6 +4380,16 @@ public class GroupMetadataManager {
             records.add(newConsumerGroupEpochRecord(group.groupId(), 
groupEpoch, groupMetadataHash));
             log.info("[GroupId {}] Bumped group epoch to {} with metadata hash 
{}.", group.groupId(), groupEpoch, groupMetadataHash);
 
+            // If all members are being fenced, the group becomes empty so
+            // we must also update the assignment epoch to match the group
+            // epoch. We use a timestamp of zero to mimic the behavior of
+            // a new group so that the assignment interval does not delay
+            // the next assignment computation.
+            if (group.members().size() == members.size()) {
+                records.add(newConsumerGroupTargetAssignmentMetadataRecord(
+                    group.groupId(), groupEpoch, 0L));
+            }
+
             for (ConsumerGroupMember member : members) {
                 cancelTimers(group.groupId(), member.memberId());
             }
diff --git 
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java
 
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java
index 8bfcc7dae54..73937c714fc 100644
--- 
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java
+++ 
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java
@@ -1403,7 +1403,8 @@ public class GroupMetadataManagerTest {
             
GroupCoordinatorRecordHelpers.newConsumerGroupCurrentAssignmentTombstoneRecord(groupId,
 memberId),
             
GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentTombstoneRecord(groupId,
 memberId),
             
GroupCoordinatorRecordHelpers.newConsumerGroupMemberSubscriptionTombstoneRecord(groupId,
 memberId),
-            GroupCoordinatorRecordHelpers.newConsumerGroupEpochRecord(groupId, 
3, 0)
+            GroupCoordinatorRecordHelpers.newConsumerGroupEpochRecord(groupId, 
3, 0),
+            
GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentMetadataRecord(groupId,
 3, 0L)
         );
         assertRecordsEquals(expectedRecords, result.records());
         assertEquals(Map.of(), context.groupMetadataManager.topicHashCache());
@@ -4563,7 +4564,8 @@ public class GroupMetadataManagerTest {
                         
GroupCoordinatorRecordHelpers.newConsumerGroupCurrentAssignmentTombstoneRecord(groupId,
 memberId),
                         
GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentTombstoneRecord(groupId,
 memberId),
                         
GroupCoordinatorRecordHelpers.newConsumerGroupMemberSubscriptionTombstoneRecord(groupId,
 memberId),
-                        
GroupCoordinatorRecordHelpers.newConsumerGroupEpochRecord(groupId, 3, 0)
+                        
GroupCoordinatorRecordHelpers.newConsumerGroupEpochRecord(groupId, 3, 0),
+                        
GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentMetadataRecord(groupId,
 3, 0L)
                     )
                 )
             )),
@@ -4628,7 +4630,8 @@ public class GroupMetadataManagerTest {
                             
GroupCoordinatorRecordHelpers.newConsumerGroupCurrentAssignmentTombstoneRecord(groupId,
 memberId),
                             
GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentTombstoneRecord(groupId,
 memberId),
                             
GroupCoordinatorRecordHelpers.newConsumerGroupMemberSubscriptionTombstoneRecord(groupId,
 memberId),
-                            
GroupCoordinatorRecordHelpers.newConsumerGroupEpochRecord(groupId, 11, 0)
+                            
GroupCoordinatorRecordHelpers.newConsumerGroupEpochRecord(groupId, 11, 0),
+                            
GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentMetadataRecord(groupId,
 11, 0L)
                         )
                     )
                 )
@@ -4832,7 +4835,8 @@ public class GroupMetadataManagerTest {
                         
GroupCoordinatorRecordHelpers.newConsumerGroupCurrentAssignmentTombstoneRecord(groupId,
 memberId),
                         
GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentTombstoneRecord(groupId,
 memberId),
                         
GroupCoordinatorRecordHelpers.newConsumerGroupMemberSubscriptionTombstoneRecord(groupId,
 memberId),
-                        
GroupCoordinatorRecordHelpers.newConsumerGroupEpochRecord(groupId, 3, 0)
+                        
GroupCoordinatorRecordHelpers.newConsumerGroupEpochRecord(groupId, 3, 0),
+                        
GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentMetadataRecord(groupId,
 3, 0L)
                     )
                 )
             )),
@@ -16129,7 +16133,8 @@ public class GroupMetadataManagerTest {
                 
GroupCoordinatorRecordHelpers.newConsumerGroupMemberSubscriptionTombstoneRecord(groupId,
 memberId),
 
                 // The group epoch is bumped.
-                
GroupCoordinatorRecordHelpers.newConsumerGroupEpochRecord(groupId, 11, 0)
+                
GroupCoordinatorRecordHelpers.newConsumerGroupEpochRecord(groupId, 11, 0),
+                
GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentMetadataRecord(groupId,
 11, 0L)
             ),
             timeout.result().records()
         );
@@ -16195,7 +16200,8 @@ public class GroupMetadataManagerTest {
                 
GroupCoordinatorRecordHelpers.newConsumerGroupMemberSubscriptionTombstoneRecord(groupId,
 memberId),
 
                 // The group epoch is bumped.
-                
GroupCoordinatorRecordHelpers.newConsumerGroupEpochRecord(groupId, 11, 0)
+                
GroupCoordinatorRecordHelpers.newConsumerGroupEpochRecord(groupId, 11, 0),
+                
GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentMetadataRecord(groupId,
 11, 0L)
             ),
             timeout.result().records()
         );
@@ -16405,7 +16411,8 @@ public class GroupMetadataManagerTest {
                 
GroupCoordinatorRecordHelpers.newConsumerGroupMemberSubscriptionTombstoneRecord(groupId,
 memberId3)
             ),
             // Bump the group epoch.
-            
List.of(GroupCoordinatorRecordHelpers.newConsumerGroupEpochRecord(groupId, 11, 
0))
+            
List.of(GroupCoordinatorRecordHelpers.newConsumerGroupEpochRecord(groupId, 11, 
0)),
+            
List.of(GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentMetadataRecord(groupId,
 11, 0L))
         );
         assertUnorderedRecordsEquals(expectedRecords, leaveResult.records());
 
@@ -24822,7 +24829,8 @@ public class GroupMetadataManagerTest {
                         
GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentTombstoneRecord(groupId,
 memberId2),
                         
GroupCoordinatorRecordHelpers.newConsumerGroupMemberSubscriptionTombstoneRecord(groupId,
 memberId2),
                         
GroupCoordinatorRecordHelpers.newConsumerGroupRegularExpressionTombstone(groupId,
 "bar*"),
-                        
GroupCoordinatorRecordHelpers.newConsumerGroupEpochRecord(groupId, 12, 0)
+                        
GroupCoordinatorRecordHelpers.newConsumerGroupEpochRecord(groupId, 12, 0),
+                        
GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentMetadataRecord(groupId,
 12, 0L)
                     )
                 )
             )),

Reply via email to