AndrewJSchofield commented on code in PR #19820:
URL: https://github.com/apache/kafka/pull/19820#discussion_r2218503657
##########
tools/src/main/java/org/apache/kafka/tools/consumer/group/ShareGroupCommand.java:
##########
@@ -366,6 +378,75 @@ Entry<Throwable, Map<String, Throwable>>
sendDeleteShareGroupOffsetsRequest(Stri
return new SimpleImmutableEntry<>(topLevelException,
topicLevelResult);
}
+ void resetOffsets() {
+ String groupId = opts.options.valueOf(opts.groupOpt);
+ try {
+ ShareGroupDescription shareGroupDescription =
describeShareGroups(List.of(groupId)).get(groupId);
+ if
(!GroupState.EMPTY.equals(shareGroupDescription.groupState())) {
+ CommandLineUtils.printErrorAndExit(String.format("Share
group '%s' is not empty.", groupId));
+ }
+ Map<TopicPartition, OffsetAndMetadata> offsetsToReset =
prepareOffsetsToReset(groupId);
+ if (offsetsToReset == null) {
+ return;
+ }
+ boolean dryRun = opts.options.has(opts.dryRunOpt) ||
!opts.options.has(opts.executeOpt);
+ if (!dryRun) {
+ adminClient.alterShareGroupOffsets(groupId,
+ offsetsToReset.entrySet().stream()
+ .collect(Collectors.toMap(
+ Entry::getKey, entry ->
entry.getValue().offset()
+ ))
+ ).all().get();
+ }
+ OffsetsUtils.printOffsetsToReset(Map.of(groupId,
offsetsToReset));
+ } catch (InterruptedException ie) {
+ throw new RuntimeException(ie);
+ } catch (ExecutionException ee) {
+ Throwable cause = ee.getCause();
+ if (cause instanceof KafkaException) {
+ CommandLineUtils.printErrorAndExit(cause.getMessage());
+ } else {
+ throw new RuntimeException(cause);
+ }
+ }
+ }
+
+ protected Map<TopicPartition, OffsetAndMetadata>
prepareOffsetsToReset(String groupId) throws ExecutionException,
InterruptedException {
+ Map<String, ListShareGroupOffsetsSpec> groupSpecs =
Map.of(groupId, new ListShareGroupOffsetsSpec());
+ Map<TopicPartition, OffsetAndMetadata> offsetsByTopicPartitions =
adminClient.listShareGroupOffsets(groupSpecs).all().get().get(groupId);
+ Set<String> existsTopics = adminClient.listTopics().names().get();
+
+ if (opts.options.has(opts.topicOpt)) {
Review Comment:
Yes, this is a good point. You cannot delete the offset for individual
topic-partitions. You can reset for individual topic-partitions.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]