This is an automated email from the ASF dual-hosted git repository.
payang pushed a commit to branch 4.2
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/4.2 by this push:
new 277663463c7 KAFKA-20306: StreamsGroupCommand should display committed
offsets for repartition topics (#21817)
277663463c7 is described below
commit 277663463c7143ceae348b3b87b2f63dc20b397a
Author: Alieh Saeedi <[email protected]>
AuthorDate: Fri Mar 20 10:09:59 2026 +0100
KAFKA-20306: StreamsGroupCommand should display committed offsets for
repartition topics (#21817)
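This PR fixes `StreamsGroupCommand` so that `getCommittedOffsets` also
includes repartition source topics when retrieving committed offsets for
streams groups. Offset resets and deletions for all input topics now go
through the new `getInputTopicOffsets` and therefore remain limited
exclusively to source topics.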
Testing - unit tests:
- testGetCommittedOffsetsIncludesRepartitionTopics: Verifies that
repartition topics are included while changelog and output topics are
excluded
- testGetCommittedOffsetsWithMultipleSubtopologies: Verifies
correct behavior across multiple subtopologies
Reviewers: Lucas Brutschy <[email protected]>
(cherry picked from commit 4ce6c0467f10b26cf18be7ef2a64be2eda0d13a7)
---
.../kafka/tools/streams/StreamsGroupCommand.java | 30 ++++-
.../tools/streams/StreamsGroupCommandTest.java | 130 +++++++++++++++++++++
2 files changed, 156 insertions(+), 4 deletions(-)
diff --git
a/tools/src/main/java/org/apache/kafka/tools/streams/StreamsGroupCommand.java
b/tools/src/main/java/org/apache/kafka/tools/streams/StreamsGroupCommand.java
index 11d52f63dc7..72771d85fda 100644
---
a/tools/src/main/java/org/apache/kafka/tools/streams/StreamsGroupCommand.java
+++
b/tools/src/main/java/org/apache/kafka/tools/streams/StreamsGroupCommand.java
@@ -463,8 +463,30 @@ public class StreamsGroupCommand {
var sourceTopics = adminClient.describeStreamsGroups(
List.of(groupId),
withTimeoutMs(new DescribeStreamsGroupsOptions())
- ).all().get().get(groupId)
- .subtopologies().stream()
+ ).all().get().get(groupId).subtopologies().stream()
+ .flatMap(subtopology -> Stream.concat(
+ subtopology.sourceTopics().stream(),
+
subtopology.repartitionSourceTopics().keySet().stream()))
+ .collect(Collectors.toSet());
+
+ var allTopicPartitions = adminClient.listStreamsGroupOffsets(
+ Map.of(groupId, new ListStreamsGroupOffsetsSpec()),
+ withTimeoutMs(new ListStreamsGroupOffsetsOptions())
+ ).partitionsToOffsetAndMetadata(groupId).get();
+
+ allTopicPartitions.keySet().removeIf(tp ->
!sourceTopics.contains(tp.topic()));
+ return allTopicPartitions;
+ } catch (InterruptedException | ExecutionException e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ Map<TopicPartition, OffsetAndMetadata> getInputTopicOffsets(String
groupId) {
+ try {
+ var sourceTopics = adminClient.describeStreamsGroups(
+ List.of(groupId),
+ withTimeoutMs(new DescribeStreamsGroupsOptions())
+ ).all().get().get(groupId).subtopologies().stream()
.flatMap(subtopology ->
subtopology.sourceTopics().stream())
.collect(Collectors.toSet());
@@ -639,7 +661,7 @@ public class StreamsGroupCommand {
String groupId = opts.options.valueOf(opts.groupOpt);
Map.Entry<Errors, Map<TopicPartition, Throwable>> res;
if (opts.options.has(opts.allInputTopicsOpt)) {
- Set<TopicPartition> partitions =
getCommittedOffsets(groupId).keySet();
+ Set<TopicPartition> partitions =
getInputTopicOffsets(groupId).keySet();
res = deleteOffsets(groupId, partitions, new HashMap<>());
} else if (opts.options.has(opts.inputTopicOpt)) {
List<String> topics =
opts.options.valuesOf(opts.inputTopicOpt);
@@ -925,7 +947,7 @@ public class StreamsGroupCommand {
private Collection<TopicPartition> getPartitionsToReset(String
groupId) throws ExecutionException, InterruptedException {
if (opts.options.has(opts.allInputTopicsOpt)) {
- return getCommittedOffsets(groupId).keySet();
+ return getInputTopicOffsets(groupId).keySet();
} else if (opts.options.has(opts.inputTopicOpt)) {
List<String> topics =
opts.options.valuesOf(opts.inputTopicOpt);
diff --git
a/tools/src/test/java/org/apache/kafka/tools/streams/StreamsGroupCommandTest.java
b/tools/src/test/java/org/apache/kafka/tools/streams/StreamsGroupCommandTest.java
index 47f8605218d..bd9d81d30a9 100644
---
a/tools/src/test/java/org/apache/kafka/tools/streams/StreamsGroupCommandTest.java
+++
b/tools/src/test/java/org/apache/kafka/tools/streams/StreamsGroupCommandTest.java
@@ -515,6 +515,136 @@ public class StreamsGroupCommandTest {
}
}
+ @Test
+ public void testGetCommittedOffsetsIncludesRepartitionTopics() {
+ String groupId = "test-group";
+ String sourceTopic = "source-topic";
+ String repartitionTopic = "repartition-topic";
+ String changelogTopic = "changelog-topic";
+ String outputTopic = "output-topic";
+
+ // Set up describe streams groups to return both source and
repartition topics
+ DescribeStreamsGroupsResult describeResult =
mock(DescribeStreamsGroupsResult.class);
+ StreamsGroupDescription groupDescription = new StreamsGroupDescription(
+ groupId,
+ 0,
+ 0,
+ 0,
+ List.of(new StreamsGroupSubtopologyDescription(
+ "subtopology-0",
+ List.of(sourceTopic),
+ List.of(),
+ Map.of(changelogTopic,
mock(StreamsGroupSubtopologyDescription.TopicInfo.class)),
+ Map.of(repartitionTopic,
mock(StreamsGroupSubtopologyDescription.TopicInfo.class))
+ )),
+ List.of(),
+ GroupState.STABLE,
+ new Node(0, "localhost", 9092),
+ null
+ );
+
when(describeResult.all()).thenReturn(KafkaFuture.completedFuture(Map.of(groupId,
groupDescription)));
+ when(ADMIN_CLIENT.describeStreamsGroups(eq(List.of(groupId)),
any(DescribeStreamsGroupsOptions.class)))
+ .thenReturn(describeResult);
+
+ // Set up list offsets to return offsets for all topics including
those that should be filtered
+ ListStreamsGroupOffsetsResult offsetsResult =
mock(ListStreamsGroupOffsetsResult.class);
+ Map<TopicPartition, OffsetAndMetadata> allOffsets = new HashMap<>();
+ allOffsets.put(new TopicPartition(sourceTopic, 0), new
OffsetAndMetadata(10, Optional.of(0), ""));
+ allOffsets.put(new TopicPartition(repartitionTopic, 0), new
OffsetAndMetadata(20, Optional.of(1), ""));
+ allOffsets.put(new TopicPartition(changelogTopic, 0), new
OffsetAndMetadata(30, Optional.of(2), ""));
+ allOffsets.put(new TopicPartition(outputTopic, 0), new
OffsetAndMetadata(40, Optional.of(3), ""));
+
+
when(offsetsResult.partitionsToOffsetAndMetadata(groupId)).thenReturn(KafkaFuture.completedFuture(allOffsets));
+ when(ADMIN_CLIENT.listStreamsGroupOffsets(anyMap(),
any(ListStreamsGroupOffsetsOptions.class)))
+ .thenReturn(offsetsResult);
+
+ StreamsGroupCommandOptions opts = new StreamsGroupCommandOptions(
+ new String[]{"--bootstrap-server", BOOTSTRAP_SERVERS, "--group",
groupId, "--describe"});
+ StreamsGroupCommand.StreamsGroupService service = new
StreamsGroupCommand.StreamsGroupService(opts, ADMIN_CLIENT);
+
+ Map<TopicPartition, OffsetAndMetadata> committedOffsets =
service.getCommittedOffsets(groupId);
+
+ // Should include source topic and repartition topic, but not
changelog or output topics
+ assertEquals(2, committedOffsets.size());
+ assertTrue(committedOffsets.containsKey(new
TopicPartition(sourceTopic, 0)));
+ assertTrue(committedOffsets.containsKey(new
TopicPartition(repartitionTopic, 0)));
+ assertFalse(committedOffsets.containsKey(new
TopicPartition(changelogTopic, 0)));
+ assertFalse(committedOffsets.containsKey(new
TopicPartition(outputTopic, 0)));
+
+ assertEquals(10, committedOffsets.get(new TopicPartition(sourceTopic,
0)).offset());
+ assertEquals(20, committedOffsets.get(new
TopicPartition(repartitionTopic, 0)).offset());
+
+ service.close();
+ }
+
+ @Test
+ public void testGetCommittedOffsetsWithMultipleSubtopologies() {
+ String groupId = "multi-subtopology-group";
+ String source1 = "source-1";
+ String source2 = "source-2";
+ String repartition1 = "repartition-1";
+ String repartition2 = "repartition-2";
+
+ // Set up describe streams groups with multiple subtopologies
+ DescribeStreamsGroupsResult describeResult =
mock(DescribeStreamsGroupsResult.class);
+ StreamsGroupDescription groupDescription = new StreamsGroupDescription(
+ groupId,
+ 0,
+ 0,
+ 0,
+ List.of(
+ new StreamsGroupSubtopologyDescription(
+ "subtopology-0",
+ List.of(source1),
+ List.of(),
+ Map.of(),
+ Map.of(repartition1,
mock(StreamsGroupSubtopologyDescription.TopicInfo.class))
+ ),
+ new StreamsGroupSubtopologyDescription(
+ "subtopology-1",
+ List.of(source2),
+ List.of(),
+ Map.of(),
+ Map.of(repartition2,
mock(StreamsGroupSubtopologyDescription.TopicInfo.class))
+ )
+ ),
+ List.of(),
+ GroupState.STABLE,
+ new Node(0, "localhost", 9092),
+ null
+ );
+
when(describeResult.all()).thenReturn(KafkaFuture.completedFuture(Map.of(groupId,
groupDescription)));
+ when(ADMIN_CLIENT.describeStreamsGroups(eq(List.of(groupId)),
any(DescribeStreamsGroupsOptions.class)))
+ .thenReturn(describeResult);
+
+ // Set up list offsets to return offsets for all source and
repartition topics
+ ListStreamsGroupOffsetsResult offsetsResult =
mock(ListStreamsGroupOffsetsResult.class);
+ Map<TopicPartition, OffsetAndMetadata> allOffsets = new HashMap<>();
+ allOffsets.put(new TopicPartition(source1, 0), new
OffsetAndMetadata(10, Optional.of(0), ""));
+ allOffsets.put(new TopicPartition(source2, 0), new
OffsetAndMetadata(20, Optional.of(1), ""));
+ allOffsets.put(new TopicPartition(repartition1, 0), new
OffsetAndMetadata(30, Optional.of(2), ""));
+ allOffsets.put(new TopicPartition(repartition2, 0), new
OffsetAndMetadata(40, Optional.of(3), ""));
+
+
when(offsetsResult.partitionsToOffsetAndMetadata(groupId)).thenReturn(KafkaFuture.completedFuture(allOffsets));
+ when(ADMIN_CLIENT.listStreamsGroupOffsets(anyMap(),
any(ListStreamsGroupOffsetsOptions.class)))
+ .thenReturn(offsetsResult);
+
+ StreamsGroupCommandOptions opts = new StreamsGroupCommandOptions(
+ new String[]{"--bootstrap-server", BOOTSTRAP_SERVERS, "--group",
groupId, "--describe"});
+ StreamsGroupCommand.StreamsGroupService service = new
StreamsGroupCommand.StreamsGroupService(opts, ADMIN_CLIENT);
+
+ Map<TopicPartition, OffsetAndMetadata> committedOffsets =
service.getCommittedOffsets(groupId);
+
+ // Should include all source topics and repartition topics from both
subtopologies
+ assertEquals(4, committedOffsets.size());
+ assertTrue(committedOffsets.containsKey(new TopicPartition(source1,
0)));
+ assertTrue(committedOffsets.containsKey(new TopicPartition(source2,
0)));
+ assertTrue(committedOffsets.containsKey(new
TopicPartition(repartition1, 0)));
+ assertTrue(committedOffsets.containsKey(new
TopicPartition(repartition2, 0)));
+
+ service.close();
+ }
+
private ListGroupsResult listGroupResult(String groupId) {
ListGroupsResult listGroupsResult = mock(ListGroupsResult.class);
when(listGroupsResult.all()).thenReturn(KafkaFuture.completedFuture(List.of(