This is an automated email from the ASF dual-hosted git repository.

dajac pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new e08a15091d3 KAFKA-20442: Update streams group assignment epoch when last member leaves (#22047)
e08a15091d3 is described below

commit e08a15091d30cfd43d14cfe7ca8bc6260b7a1b5f
Author: David Jacot <[email protected]>
AuthorDate: Wed Apr 15 09:40:32 2026 +0200

    KAFKA-20442: Update streams group assignment epoch when last member leaves (#22047)
    
    When the last member of a streams group is fenced, the group epoch is
    bumped but the assignment epoch is not updated. This leaves the empty
    group with `groupEpoch > assignmentEpoch`, violating the invariant that
    an empty group should have matching epochs. This patch writes a
    `StreamsGroupTargetAssignmentMetadataRecord` when the fence operation
    results in an empty group so that the assignment epoch catches up to the
    group epoch.
    
    Reviewers: Lucas Brutschy <[email protected]>, Sean Quah
     <[email protected]>
---
 .../apache/kafka/coordinator/group/GroupMetadataManager.java  | 11 +++++++++++
 .../kafka/coordinator/group/GroupMetadataManagerTest.java     |  4 +++-
 2 files changed, 14 insertions(+), 1 deletion(-)

diff --git a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java
index 630080c2063..21fab03083f 100644
--- a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java
+++ b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java
@@ -249,6 +249,7 @@ import static org.apache.kafka.coordinator.group.streams.StreamsCoordinatorRecor
 import static org.apache.kafka.coordinator.group.streams.StreamsCoordinatorRecordHelpers.newStreamsGroupMemberRecord;
 import static org.apache.kafka.coordinator.group.streams.StreamsCoordinatorRecordHelpers.newStreamsGroupMemberTombstoneRecord;
 import static org.apache.kafka.coordinator.group.streams.StreamsCoordinatorRecordHelpers.newStreamsGroupMetadataRecord;
+import static org.apache.kafka.coordinator.group.streams.StreamsCoordinatorRecordHelpers.newStreamsGroupTargetAssignmentMetadataRecord;
 import static org.apache.kafka.coordinator.group.streams.StreamsCoordinatorRecordHelpers.newStreamsGroupTargetAssignmentTombstoneRecord;
 import static org.apache.kafka.coordinator.group.streams.StreamsCoordinatorRecordHelpers.newStreamsGroupTopologyRecord;
 import static org.apache.kafka.coordinator.group.streams.StreamsGroupMember.hasAssignedTasksChanged;
@@ -4505,6 +4506,16 @@ public class GroupMetadataManager {
         int groupEpoch = group.groupEpoch() + 1;
         records.add(newStreamsGroupMetadataRecord(group.groupId(), groupEpoch, group.metadataHash(), group.validatedTopologyEpoch(), group.lastAssignmentConfigs()));
 
+        // If this is the last member, the group becomes empty so we must
+        // also update the assignment epoch to match the group epoch. We
+        // use a timestamp of zero to mimic the behavior of a new group
+        // so that the assignment interval does not delay the next
+        // assignment computation.
+        if (group.members().size() == 1) {
+            records.add(newStreamsGroupTargetAssignmentMetadataRecord(
+                group.groupId(), groupEpoch, 0L));
+        }
+
         cancelTimers(group.groupId(), member.memberId());
 
         return new CoordinatorResult<>(records, response);
diff --git a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java
index 8f69a687b51..954b3792830 100644
--- a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java
+++ b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java
@@ -20168,7 +20168,9 @@ public class GroupMetadataManagerTest {
                             new TreeMap<>(Map.of(
                                 "num.standby.replicas", "0"
                             ))
-                        )
+                        ),
+                        StreamsCoordinatorRecordHelpers.newStreamsGroupTargetAssignmentMetadataRecord(
+                            groupId, 3, 0L)
                     )
                 )
             )),

Reply via email to