This is an automated email from the ASF dual-hosted git repository.
dajac pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new e08a15091d3 KAFKA-20442: Update streams group assignment epoch when
last member leaves (#22047)
e08a15091d3 is described below
commit e08a15091d30cfd43d14cfe7ca8bc6260b7a1b5f
Author: David Jacot <[email protected]>
AuthorDate: Wed Apr 15 09:40:32 2026 +0200
KAFKA-20442: Update streams group assignment epoch when last member leaves
(#22047)
When the last member of a streams group is fenced, the group epoch is
bumped but the assignment epoch is not updated. This leaves the empty
group with `groupEpoch > assignmentEpoch`, violating the invariant that
an empty group's group epoch and assignment epoch are equal. This patch
writes a `StreamsGroupTargetAssignmentMetadataRecord` when the fence
operation results in an empty group so that the assignment epoch catches
up to the group epoch. The record is written with a timestamp of zero,
mimicking a new group, so that the assignment interval does not delay
the next assignment computation.
Reviewers: Lucas Brutschy <[email protected]>, Sean Quah
<[email protected]>
---
.../apache/kafka/coordinator/group/GroupMetadataManager.java | 11 +++++++++++
.../kafka/coordinator/group/GroupMetadataManagerTest.java | 4 +++-
2 files changed, 14 insertions(+), 1 deletion(-)
diff --git
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java
index 630080c2063..21fab03083f 100644
---
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java
+++
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java
@@ -249,6 +249,7 @@ import static
org.apache.kafka.coordinator.group.streams.StreamsCoordinatorRecor
import static
org.apache.kafka.coordinator.group.streams.StreamsCoordinatorRecordHelpers.newStreamsGroupMemberRecord;
import static
org.apache.kafka.coordinator.group.streams.StreamsCoordinatorRecordHelpers.newStreamsGroupMemberTombstoneRecord;
import static
org.apache.kafka.coordinator.group.streams.StreamsCoordinatorRecordHelpers.newStreamsGroupMetadataRecord;
+import static
org.apache.kafka.coordinator.group.streams.StreamsCoordinatorRecordHelpers.newStreamsGroupTargetAssignmentMetadataRecord;
import static
org.apache.kafka.coordinator.group.streams.StreamsCoordinatorRecordHelpers.newStreamsGroupTargetAssignmentTombstoneRecord;
import static
org.apache.kafka.coordinator.group.streams.StreamsCoordinatorRecordHelpers.newStreamsGroupTopologyRecord;
import static
org.apache.kafka.coordinator.group.streams.StreamsGroupMember.hasAssignedTasksChanged;
@@ -4505,6 +4506,16 @@ public class GroupMetadataManager {
int groupEpoch = group.groupEpoch() + 1;
records.add(newStreamsGroupMetadataRecord(group.groupId(), groupEpoch,
group.metadataHash(), group.validatedTopologyEpoch(),
group.lastAssignmentConfigs()));
+ // If this is the last member, the group becomes empty so we must
+ // also update the assignment epoch to match the group epoch. We
+ // use a timestamp of zero to mimic the behavior of a new group
+ // so that the assignment interval does not delay the next
+ // assignment computation.
+ if (group.members().size() == 1) {
+ records.add(newStreamsGroupTargetAssignmentMetadataRecord(
+ group.groupId(), groupEpoch, 0L));
+ }
+
cancelTimers(group.groupId(), member.memberId());
return new CoordinatorResult<>(records, response);
diff --git
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java
index 8f69a687b51..954b3792830 100644
---
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java
+++
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java
@@ -20168,7 +20168,9 @@ public class GroupMetadataManagerTest {
new TreeMap<>(Map.of(
"num.standby.replicas", "0"
))
- )
+ ),
+
StreamsCoordinatorRecordHelpers.newStreamsGroupTargetAssignmentMetadataRecord(
+ groupId, 3, 0L)
)
)
)),