This is an automated email from the ASF dual-hosted git repository.

lucasbru pushed a commit to branch 4.3
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/4.3 by this push:
     new 56dc6acdcf0 KAFKA-20306: StreamsGroupCommand should display committed 
offsets for repartition topics (#21817)
56dc6acdcf0 is described below

commit 56dc6acdcf0c6d32ad782e01bac7d747f6b7128e
Author: Alieh Saeedi <[email protected]>
AuthorDate: Fri Mar 20 10:09:59 2026 +0100

    KAFKA-20306: StreamsGroupCommand should display committed offsets for 
repartition topics (#21817)
    
    This PR fixes `StreamsGroupCommand` to include repartition source
    topics when retrieving committed offsets for streams groups. Offset
    resets and deletions are still limited exclusively to source topics.
    
    Testing - unit tests:
    - testGetCommittedOffsetsIncludesRepartitionTopics: Verifies that
      repartition topics are included while changelog and output topics are
      excluded
    - testGetCommittedOffsetsWithMultipleSubtopologies: Verifies
      correct behavior across multiple subtopologies
    
    Reviewers: Lucas Brutschy <[email protected]>
---
 .../kafka/tools/streams/StreamsGroupCommand.java   |  30 ++++-
 .../tools/streams/StreamsGroupCommandTest.java     | 130 +++++++++++++++++++++
 2 files changed, 156 insertions(+), 4 deletions(-)

diff --git 
a/tools/src/main/java/org/apache/kafka/tools/streams/StreamsGroupCommand.java 
b/tools/src/main/java/org/apache/kafka/tools/streams/StreamsGroupCommand.java
index 11d52f63dc7..72771d85fda 100644
--- 
a/tools/src/main/java/org/apache/kafka/tools/streams/StreamsGroupCommand.java
+++ 
b/tools/src/main/java/org/apache/kafka/tools/streams/StreamsGroupCommand.java
@@ -463,8 +463,30 @@ public class StreamsGroupCommand {
                 var sourceTopics = adminClient.describeStreamsGroups(
                     List.of(groupId),
                     withTimeoutMs(new DescribeStreamsGroupsOptions())
-                ).all().get().get(groupId)
-                    .subtopologies().stream()
+                ).all().get().get(groupId).subtopologies().stream()
+                    .flatMap(subtopology -> Stream.concat(
+                        subtopology.sourceTopics().stream(),
+                        
subtopology.repartitionSourceTopics().keySet().stream()))
+                    .collect(Collectors.toSet());
+
+                var allTopicPartitions = adminClient.listStreamsGroupOffsets(
+                    Map.of(groupId, new ListStreamsGroupOffsetsSpec()),
+                    withTimeoutMs(new ListStreamsGroupOffsetsOptions())
+                ).partitionsToOffsetAndMetadata(groupId).get();
+
+                allTopicPartitions.keySet().removeIf(tp -> 
!sourceTopics.contains(tp.topic()));
+                return allTopicPartitions;
+            } catch (InterruptedException | ExecutionException e) {
+                throw new RuntimeException(e);
+            }
+        }
+
+        Map<TopicPartition, OffsetAndMetadata> getInputTopicOffsets(String 
groupId) {
+            try {
+                var sourceTopics = adminClient.describeStreamsGroups(
+                    List.of(groupId),
+                    withTimeoutMs(new DescribeStreamsGroupsOptions())
+                ).all().get().get(groupId).subtopologies().stream()
                     .flatMap(subtopology -> 
subtopology.sourceTopics().stream())
                     .collect(Collectors.toSet());
 
@@ -639,7 +661,7 @@ public class StreamsGroupCommand {
             String groupId = opts.options.valueOf(opts.groupOpt);
             Map.Entry<Errors, Map<TopicPartition, Throwable>> res;
             if (opts.options.has(opts.allInputTopicsOpt)) {
-                Set<TopicPartition> partitions = 
getCommittedOffsets(groupId).keySet();
+                Set<TopicPartition> partitions = 
getInputTopicOffsets(groupId).keySet();
                 res = deleteOffsets(groupId, partitions, new HashMap<>());
             } else if (opts.options.has(opts.inputTopicOpt)) {
                 List<String> topics = 
opts.options.valuesOf(opts.inputTopicOpt);
@@ -925,7 +947,7 @@ public class StreamsGroupCommand {
 
         private Collection<TopicPartition> getPartitionsToReset(String 
groupId) throws ExecutionException, InterruptedException {
             if (opts.options.has(opts.allInputTopicsOpt)) {
-                return getCommittedOffsets(groupId).keySet();
+                return getInputTopicOffsets(groupId).keySet();
             } else if (opts.options.has(opts.inputTopicOpt)) {
                 List<String> topics = 
opts.options.valuesOf(opts.inputTopicOpt);
 
diff --git 
a/tools/src/test/java/org/apache/kafka/tools/streams/StreamsGroupCommandTest.java
 
b/tools/src/test/java/org/apache/kafka/tools/streams/StreamsGroupCommandTest.java
index 1bb3a906828..6f087f13f09 100644
--- 
a/tools/src/test/java/org/apache/kafka/tools/streams/StreamsGroupCommandTest.java
+++ 
b/tools/src/test/java/org/apache/kafka/tools/streams/StreamsGroupCommandTest.java
@@ -515,6 +515,136 @@ public class StreamsGroupCommandTest {
         }
     }
 
+    @Test
+    public void testGetCommittedOffsetsIncludesRepartitionTopics() {
+        String groupId = "test-group";
+        String sourceTopic = "source-topic";
+        String repartitionTopic = "repartition-topic";
+        String changelogTopic = "changelog-topic";
+        String outputTopic = "output-topic";
+
+        // Set up describe streams groups to return both source and 
repartition topics
+        DescribeStreamsGroupsResult describeResult = 
mock(DescribeStreamsGroupsResult.class);
+        StreamsGroupDescription groupDescription = new StreamsGroupDescription(
+            groupId,
+            0,
+            0,
+            0,
+            List.of(new StreamsGroupSubtopologyDescription(
+                "subtopology-0",
+                List.of(sourceTopic),
+                List.of(),
+                Map.of(changelogTopic, 
mock(StreamsGroupSubtopologyDescription.TopicInfo.class)),
+                Map.of(repartitionTopic, 
mock(StreamsGroupSubtopologyDescription.TopicInfo.class))
+            )),
+            List.of(),
+            GroupState.STABLE,
+            new Node(0, "localhost", 9092),
+            null
+        );
+        
when(describeResult.all()).thenReturn(KafkaFuture.completedFuture(Map.of(groupId,
 groupDescription)));
+        when(ADMIN_CLIENT.describeStreamsGroups(eq(List.of(groupId)), 
any(DescribeStreamsGroupsOptions.class)))
+            .thenReturn(describeResult);
+
+        // Set up list offsets to return offsets for all topics including 
those that should be filtered
+        ListStreamsGroupOffsetsResult offsetsResult = 
mock(ListStreamsGroupOffsetsResult.class);
+        Map<TopicPartition, OffsetAndMetadata> allOffsets = new HashMap<>();
+        allOffsets.put(new TopicPartition(sourceTopic, 0), new 
OffsetAndMetadata(10, Optional.of(0), ""));
+        allOffsets.put(new TopicPartition(repartitionTopic, 0), new 
OffsetAndMetadata(20, Optional.of(1), ""));
+        allOffsets.put(new TopicPartition(changelogTopic, 0), new 
OffsetAndMetadata(30, Optional.of(2), ""));
+        allOffsets.put(new TopicPartition(outputTopic, 0), new 
OffsetAndMetadata(40, Optional.of(3), ""));
+
+        
when(offsetsResult.partitionsToOffsetAndMetadata(groupId)).thenReturn(KafkaFuture.completedFuture(allOffsets));
+        when(ADMIN_CLIENT.listStreamsGroupOffsets(anyMap(), 
any(ListStreamsGroupOffsetsOptions.class)))
+            .thenReturn(offsetsResult);
+
+        StreamsGroupCommandOptions opts = new StreamsGroupCommandOptions(
+            new String[]{"--bootstrap-server", BOOTSTRAP_SERVERS, "--group", 
groupId, "--describe"});
+        StreamsGroupCommand.StreamsGroupService service = new 
StreamsGroupCommand.StreamsGroupService(opts, ADMIN_CLIENT);
+
+        Map<TopicPartition, OffsetAndMetadata> committedOffsets = 
service.getCommittedOffsets(groupId);
+
+        // Should include source topic and repartition topic, but not 
changelog or output topics
+        assertEquals(2, committedOffsets.size());
+        assertTrue(committedOffsets.containsKey(new 
TopicPartition(sourceTopic, 0)));
+        assertTrue(committedOffsets.containsKey(new 
TopicPartition(repartitionTopic, 0)));
+        assertFalse(committedOffsets.containsKey(new 
TopicPartition(changelogTopic, 0)));
+        assertFalse(committedOffsets.containsKey(new 
TopicPartition(outputTopic, 0)));
+
+        assertEquals(10, committedOffsets.get(new TopicPartition(sourceTopic, 
0)).offset());
+        assertEquals(20, committedOffsets.get(new 
TopicPartition(repartitionTopic, 0)).offset());
+
+        service.close();
+    }
+
+    @Test
+    public void testGetCommittedOffsetsWithMultipleSubtopologies() {
+        String groupId = "multi-subtopology-group";
+        String source1 = "source-1";
+        String source2 = "source-2";
+        String repartition1 = "repartition-1";
+        String repartition2 = "repartition-2";
+
+        // Set up describe streams groups with multiple subtopologies
+        DescribeStreamsGroupsResult describeResult = 
mock(DescribeStreamsGroupsResult.class);
+        StreamsGroupDescription groupDescription = new StreamsGroupDescription(
+            groupId,
+            0,
+            0,
+            0,
+            List.of(
+                new StreamsGroupSubtopologyDescription(
+                    "subtopology-0",
+                    List.of(source1),
+                    List.of(),
+                    Map.of(),
+                    Map.of(repartition1, 
mock(StreamsGroupSubtopologyDescription.TopicInfo.class))
+                ),
+                new StreamsGroupSubtopologyDescription(
+                    "subtopology-1",
+                    List.of(source2),
+                    List.of(),
+                    Map.of(),
+                    Map.of(repartition2, 
mock(StreamsGroupSubtopologyDescription.TopicInfo.class))
+                )
+            ),
+            List.of(),
+            GroupState.STABLE,
+            new Node(0, "localhost", 9092),
+            null
+        );
+        
when(describeResult.all()).thenReturn(KafkaFuture.completedFuture(Map.of(groupId,
 groupDescription)));
+        when(ADMIN_CLIENT.describeStreamsGroups(eq(List.of(groupId)), 
any(DescribeStreamsGroupsOptions.class)))
+            .thenReturn(describeResult);
+
+        // Set up list offsets to return offsets for all source and 
repartition topics
+        ListStreamsGroupOffsetsResult offsetsResult = 
mock(ListStreamsGroupOffsetsResult.class);
+        Map<TopicPartition, OffsetAndMetadata> allOffsets = new HashMap<>();
+        allOffsets.put(new TopicPartition(source1, 0), new 
OffsetAndMetadata(10, Optional.of(0), ""));
+        allOffsets.put(new TopicPartition(source2, 0), new 
OffsetAndMetadata(20, Optional.of(1), ""));
+        allOffsets.put(new TopicPartition(repartition1, 0), new 
OffsetAndMetadata(30, Optional.of(2), ""));
+        allOffsets.put(new TopicPartition(repartition2, 0), new 
OffsetAndMetadata(40, Optional.of(3), ""));
+
+        
when(offsetsResult.partitionsToOffsetAndMetadata(groupId)).thenReturn(KafkaFuture.completedFuture(allOffsets));
+        when(ADMIN_CLIENT.listStreamsGroupOffsets(anyMap(), 
any(ListStreamsGroupOffsetsOptions.class)))
+            .thenReturn(offsetsResult);
+
+        StreamsGroupCommandOptions opts = new StreamsGroupCommandOptions(
+            new String[]{"--bootstrap-server", BOOTSTRAP_SERVERS, "--group", 
groupId, "--describe"});
+        StreamsGroupCommand.StreamsGroupService service = new 
StreamsGroupCommand.StreamsGroupService(opts, ADMIN_CLIENT);
+
+        Map<TopicPartition, OffsetAndMetadata> committedOffsets = 
service.getCommittedOffsets(groupId);
+
+        // Should include all source topics and repartition topics from both 
subtopologies
+        assertEquals(4, committedOffsets.size());
+        assertTrue(committedOffsets.containsKey(new TopicPartition(source1, 
0)));
+        assertTrue(committedOffsets.containsKey(new TopicPartition(source2, 
0)));
+        assertTrue(committedOffsets.containsKey(new 
TopicPartition(repartition1, 0)));
+        assertTrue(committedOffsets.containsKey(new 
TopicPartition(repartition2, 0)));
+
+        service.close();
+    }
+
     private ListGroupsResult listGroupResult(String groupId) {
         ListGroupsResult listGroupsResult = mock(ListGroupsResult.class);
         
when(listGroupsResult.all()).thenReturn(KafkaFuture.completedFuture(List.of(

Reply via email to