aliehsaeedii commented on code in PR #19646:
URL: https://github.com/apache/kafka/pull/19646#discussion_r2107042451
##########
tools/src/main/java/org/apache/kafka/tools/streams/StreamsGroupCommand.java:
##########
@@ -330,13 +400,433 @@ Map<TopicPartition, OffsetsInfo>
getOffsets(StreamsGroupDescription description)
Map<TopicPartition, OffsetAndMetadata> getCommittedOffsets(String
groupId) {
try {
- return adminClient.listConsumerGroupOffsets(
- Map.of(groupId, new
ListConsumerGroupOffsetsSpec())).partitionsToOffsetAndMetadata(groupId).get();
+ return adminClient.listStreamsGroupOffsets(
+ Map.of(groupId, new
ListStreamsGroupOffsetsSpec())).partitionsToOffsetAndMetadata(groupId).get();
} catch (InterruptedException | ExecutionException e) {
throw new RuntimeException(e);
}
}
+ Map<String, Map<TopicPartition, OffsetAndMetadata>> resetOffsets() {
+ Map<String, Map<TopicPartition, OffsetAndMetadata>> result = new
HashMap<>();
+ List<String> groupIds = listStreamsGroups();
+ if (!groupIds.isEmpty()) {
+ Map<String, KafkaFuture<StreamsGroupDescription>>
streamsGroups = adminClient.describeStreamsGroups(
+ groupIds
+ ).describedGroups();
+
+ streamsGroups.forEach((groupId, groupDescription) -> {
+ try {
+ String state =
groupDescription.get().groupState().toString();
+ switch (state) {
+ case "Empty":
+ case "Dead":
+ result.put(groupId,
resetOffsetsForInactiveGroup(groupId));
+ break;
+ default:
+ printError("Assignments can only be reset if
the group '" + groupId + "' is inactive, but the current state is " + state +
".", Optional.empty());
+ result.put(groupId, Collections.emptyMap());
+ }
+ } catch (InterruptedException ie) {
+ throw new RuntimeException(ie);
+ } catch (ExecutionException ee) {
+ if (ee.getCause() instanceof GroupIdNotFoundException)
{
+ result.put(groupId,
resetOffsetsForInactiveGroup(groupId));
+ } else {
+ throw new RuntimeException(ee);
+ }
+ }
+ });
+ }
+ return result;
+ }
+
+ private Map<TopicPartition, OffsetAndMetadata>
resetOffsetsForInactiveGroup(String groupId) {
+ try {
+ Collection<TopicPartition> partitionsToReset =
getPartitionsToReset(groupId);
+ Map<TopicPartition, OffsetAndMetadata> preparedOffsets =
prepareOffsetsToReset(groupId, partitionsToReset);
+
+ // Dry-run is the default behavior if --execute is not
specified
+ boolean dryRun = opts.options.has(opts.dryRunOpt) ||
!opts.options.has(opts.executeOpt);
+ if (!dryRun) {
+ adminClient.alterStreamsGroupOffsets(
+ groupId,
+ preparedOffsets
+ ).all().get();
+ }
+
+ return preparedOffsets;
+ } catch (InterruptedException ie) {
+ throw new RuntimeException(ie);
+ } catch (ExecutionException ee) {
+ Throwable cause = ee.getCause();
+ if (cause instanceof KafkaException) {
+ throw (KafkaException) cause;
+ } else {
+ throw new RuntimeException(cause);
+ }
+ }
+ }
+
+ private Collection<TopicPartition> getPartitionsToReset(String
groupId) throws ExecutionException, InterruptedException {
+ if (opts.options.has(opts.allTopicsOpt)) {
+ return getCommittedOffsets(groupId).keySet();
+ } else if (opts.options.has(opts.topicOpt)) {
+ List<String> topics = opts.options.valuesOf(opts.topicOpt);
+ return parseTopicPartitionsToReset(topics);
+ } else {
+ if (!opts.options.has(opts.resetFromFileOpt))
+ CommandLineUtils.printUsageAndExit(opts.parser, "One of
the reset scopes should be defined: --all-topics, --topic.");
+
+ return Collections.emptyList();
+ }
+ }
+
+ private List<TopicPartition> parseTopicPartitionsToReset(List<String>
topicArgs) throws ExecutionException, InterruptedException {
+ List<String> topicsWithPartitions = new ArrayList<>();
+ List<String> topics = new ArrayList<>();
+
+ topicArgs.forEach(topicArg -> {
+ if (topicArg.contains(":"))
+ topicsWithPartitions.add(topicArg);
+ else
+ topics.add(topicArg);
+ });
+
+ List<TopicPartition> specifiedPartitions =
+
topicsWithPartitions.stream().flatMap(this::parseTopicsWithPartitions).collect(Collectors.toList());
+
+ List<TopicPartition> unspecifiedPartitions = new ArrayList<>();
+
+ if (!topics.isEmpty()) {
+ Map<String, TopicDescription> descriptionMap =
adminClient.describeTopics(
+ topics
+ ).allTopicNames().get();
+
+ descriptionMap.forEach((topic, description) ->
+ description.partitions().forEach(tpInfo ->
unspecifiedPartitions.add(new TopicPartition(topic, tpInfo.partition())))
+ );
+ }
+
+ specifiedPartitions.addAll(unspecifiedPartitions);
+
+ return specifiedPartitions;
+ }
+
+ private Stream<TopicPartition> parseTopicsWithPartitions(String
topicArg) {
+ ToIntFunction<String> partitionNum = partition -> {
+ try {
+ return Integer.parseInt(partition);
+ } catch (NumberFormatException e) {
+ throw new IllegalArgumentException("Invalid partition '" +
partition + "' specified in topic arg '" + topicArg + "''");
+ }
+ };
+
+ String[] arr = topicArg.split(":");
+
+ if (arr.length != 2)
+ throw new IllegalArgumentException("Invalid topic arg '" +
topicArg + "', expected topic name and partitions");
+
+ String topic = arr[0];
+ String partitions = arr[1];
+
+ return Arrays.stream(partitions.split(",")).
+ map(partition -> new TopicPartition(topic,
partitionNum.applyAsInt(partition)));
+ }
+
+ @SuppressWarnings("CyclomaticComplexity")
+ private Map<TopicPartition, OffsetAndMetadata>
prepareOffsetsToReset(String groupId, Collection<TopicPartition>
partitionsToReset) {
+ if (opts.options.has(opts.resetToOffsetOpt)) {
+ long offset = opts.options.valueOf(opts.resetToOffsetOpt);
+ return
checkOffsetsRange(partitionsToReset.stream().collect(Collectors.toMap(Function.identity(),
tp -> offset)))
+
.entrySet().stream().collect(Collectors.toMap(Map.Entry::getKey, e -> new
OffsetAndMetadata(e.getValue())));
+ } else if (opts.options.has(opts.resetToEarliestOpt)) {
+ Map<TopicPartition, LogOffsetResult> logStartOffsets =
getLogStartOffsets(partitionsToReset);
+ return
partitionsToReset.stream().collect(Collectors.toMap(Function.identity(),
topicPartition -> {
+ LogOffsetResult logOffsetResult =
logStartOffsets.get(topicPartition);
+
+ if (!(logOffsetResult instanceof LogOffset)) {
+ CommandLineUtils.printUsageAndExit(opts.parser, "Error
getting starting offset of topic partition: " + topicPartition);
+ }
+
+ return new OffsetAndMetadata(((LogOffset)
logOffsetResult).value);
+ }));
+ } else if (opts.options.has(opts.resetToLatestOpt)) {
+ Map<TopicPartition, LogOffsetResult> logEndOffsets =
getLogEndOffsets(partitionsToReset);
+ return
partitionsToReset.stream().collect(Collectors.toMap(Function.identity(),
topicPartition -> {
+ LogOffsetResult logOffsetResult =
logEndOffsets.get(topicPartition);
+
+ if (!(logOffsetResult instanceof LogOffset)) {
+ CommandLineUtils.printUsageAndExit(opts.parser, "Error
getting ending offset of topic partition: " + topicPartition);
+ }
+
+ return new OffsetAndMetadata(((LogOffset)
logOffsetResult).value);
+ }));
+ } else if (opts.options.has(opts.resetShiftByOpt)) {
+ Map<TopicPartition, OffsetAndMetadata> currentCommittedOffsets
= getCommittedOffsets(groupId);
+ Map<TopicPartition, Long> requestedOffsets =
partitionsToReset.stream().collect(Collectors.toMap(Function.identity(),
topicPartition -> {
+ long shiftBy = opts.options.valueOf(opts.resetShiftByOpt);
+ OffsetAndMetadata currentOffset =
currentCommittedOffsets.get(topicPartition);
+
+ if (currentOffset == null) {
+ throw new IllegalArgumentException("Cannot shift
offset for partition " + topicPartition + " since there is no current committed
offset");
+ }
+
+ return currentOffset.offset() + shiftBy;
Review Comment:
I assume any long value should be accepted. Good point. I'll add some tests.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]