fonsdant commented on code in PR #17038:
URL: https://github.com/apache/kafka/pull/17038#discussion_r1746786740
##########
connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorCheckpointConnector.java:
##########
@@ -262,12 +264,13 @@ private void createInternalTopics() {
);
}
- Map<TopicPartition, OffsetAndMetadata> listConsumerGroupOffsets(String group)
+ ListConsumerGroupOffsetsResult listConsumerGroupOffsets(List<String> groups)
throws InterruptedException, ExecutionException {
+ Map<String, ListConsumerGroupOffsetsSpec> groupSpecs = groups.stream()
+ .collect(Collectors.toMap(group -> group, group -> new ListConsumerGroupOffsetsSpec()));
return adminCall(
- () -> sourceAdminClient.listConsumerGroupOffsets(group).partitionsToOffsetAndMetadata().get(),
- () -> String.format("list offsets for consumer group %s on %s cluster", group,
- config.sourceClusterAlias())
+ () -> sourceAdminClient.listConsumerGroupOffsets(groupSpecs),
+ () -> String.format("list offsets for consumer groups %s on %s cluster", groups, config.sourceClusterAlias())
);
Review Comment:
While looping, we need to get the topic partitions and metadata for each
group. For this reason, I chose `ListConsumerGroupOffsetsResult` as the return
type of `listConsumerGroupOffsets`: it lets us call
`result.partitionsToOffsetAndMetadata(group).get()` for each group after a
single call to `listConsumerGroupOffsets(filteredGroups)`.
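
To illustrate the pattern (one batched call, then per-group resolution inside the loop), here is a minimal stdlib-only sketch. `OffsetsResult`, `listOffsets` and `collectByGroup` are hypothetical stand-ins for the Kafka types, not the real API: partitions are plain strings, offsets are plain longs, and `join()` replaces `get()` only to keep checked exceptions out of the sketch.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;

// Hypothetical stand-ins only; none of these names exist in Kafka.
class PerGroupOffsetsSketch {

    // Stand-in for a result object that holds one future per requested group.
    static final class OffsetsResult {
        private final Map<String, CompletableFuture<Map<String, Long>>> futures;

        OffsetsResult(Map<String, CompletableFuture<Map<String, Long>>> futures) {
            this.futures = futures;
        }

        // Same shape as a per-group lookup: returns the future for one group.
        CompletableFuture<Map<String, Long>> partitionsToOffsets(String group) {
            CompletableFuture<Map<String, Long>> future = futures.get(group);
            if (future == null) {
                throw new IllegalArgumentException("Group was not requested: " + group);
            }
            return future;
        }
    }

    // Stand-in for the batched admin call: a single request covering all groups.
    static OffsetsResult listOffsets(List<String> groups) {
        Map<String, CompletableFuture<Map<String, Long>>> futures = new LinkedHashMap<>();
        for (String group : groups) {
            // Fake data: one partition per group, always at offset 42.
            futures.put(group, CompletableFuture.completedFuture(Map.of(group + "-topic-0", 42L)));
        }
        return new OffsetsResult(futures);
    }

    // One batched call, then each group's offsets are resolved inside the loop.
    static Map<String, Map<String, Long>> collectByGroup(List<String> groups) {
        OffsetsResult result = listOffsets(groups);
        Map<String, Map<String, Long>> byGroup = new LinkedHashMap<>();
        for (String group : groups) {
            byGroup.put(group, result.partitionsToOffsets(group).join());
        }
        return byGroup;
    }

    public static void main(String[] args) {
        System.out.println(collectByGroup(List.of("group-a", "group-b")));
    }
}
```

Because the result object keeps the futures keyed by group, the loop never has to guess which group a partition belongs to.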
With the other approach, calling `.all().get()` inside
`listConsumerGroupOffsets`, we would get the topic partitions and metadata for
all consumer groups at once, but we would no longer be able to tell which
group each entry belongs to while looping.
An alternative would be to add a variant of
`ListConsumerGroupOffsetsResult.all()` that keys the future's results by group.
That way, we could still fetch the offsets in one batch and know which group
each one belongs to. But I think that would require a KIP, right? So it does
not seem appropriate for this PR.
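
For reference, a grouped variant like that could be sketched with plain `CompletableFuture`s as below. `GroupedAllSketch` and `allByGroup` are hypothetical names, and partitions and offsets are again simplified to strings and longs. Before building anything like this, it is worth checking the Javadoc of `ListConsumerGroupOffsetsResult.all()` for the client version in use, since it may already yield a map keyed by group id.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.concurrent.CompletableFuture;

// Hypothetical helper; not part of the Kafka API.
class GroupedAllSketch {

    // Combines per-group futures into one future whose value is keyed by group id.
    static CompletableFuture<Map<String, Map<String, Long>>> allByGroup(
            Map<String, CompletableFuture<Map<String, Long>>> perGroup) {
        CompletableFuture<Void> allDone =
                CompletableFuture.allOf(perGroup.values().toArray(new CompletableFuture<?>[0]));
        return allDone.thenApply(ignored -> {
            Map<String, Map<String, Long>> byGroup = new LinkedHashMap<>();
            // Every future has completed successfully here, so join() does not block.
            perGroup.forEach((group, future) -> byGroup.put(group, future.join()));
            return byGroup;
        });
    }

    public static void main(String[] args) {
        Map<String, CompletableFuture<Map<String, Long>>> perGroup = new LinkedHashMap<>();
        perGroup.put("group-a", CompletableFuture.completedFuture(Map.of("topic-0", 10L)));
        perGroup.put("group-b", CompletableFuture.completedFuture(Map.of("topic-0", 20L)));
        System.out.println(allByGroup(perGroup).join());
    }
}
```

If any per-group future fails, the combined future completes exceptionally, so the caller gets either every group's offsets or an error.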
Let me know if I have missed something.
Many thanks for your review, @gharris1727! :)
--
This is an automated message from the Apache Git Service.