lucasbru commented on code in PR #19895:
URL: https://github.com/apache/kafka/pull/19895#discussion_r2132131515
##########
tools/src/main/java/org/apache/kafka/tools/streams/StreamsGroupCommand.java:
##########
@@ -183,12 +197,127 @@ public void describeGroups() throws ExecutionException,
InterruptedException {
}
}
+ Map.Entry<Errors, Map<TopicPartition, Throwable>> deleteOffsets(String
groupId, List<String> topics) {
+ Map<TopicPartition, Throwable> partitionLevelResult = new
HashMap<>();
+ Set<String> topicWithPartitions = new HashSet<>();
+ Set<String> topicWithoutPartitions = new HashSet<>();
+
+ for (String topic : topics) {
+ if (topic.contains(":"))
+ topicWithPartitions.add(topic);
+ else
+ topicWithoutPartitions.add(topic);
+ }
+
+ List<TopicPartition> knownPartitions =
topicWithPartitions.stream().flatMap(this::parseTopicsWithPartitions).collect(Collectors.toList());
+
+ // Get the partitions of topics that the user did not explicitly
specify the partitions
+ DescribeTopicsResult describeTopicsResult =
adminClient.describeTopics(
+ topicWithoutPartitions,
+ withTimeoutMs(new DescribeTopicsOptions()));
+
+ Iterator<TopicPartition> unknownPartitions =
describeTopicsResult.topicNameValues().entrySet().stream().flatMap(e -> {
Review Comment:
Can we rename this variable to something like `unspecifiedTopicPartitions`
and `specifiedTopicPartition` (above)? This is very confusing.
##########
tools/src/main/java/org/apache/kafka/tools/streams/StreamsGroupCommand.java:
##########
@@ -192,12 +205,327 @@ public void describeGroups() throws ExecutionException,
InterruptedException {
}
}
+ Map<String, Throwable> deleteGroups() {
+ List<String> groupIds = opts.options.has(opts.allGroupsOpt)
+ ? new ArrayList<>(listStreamsGroups())
+ : new ArrayList<>(opts.options.valuesOf(opts.groupOpt));
+
+ // pre admin call checks
+ Map<String, Throwable> failed = preAdminCallChecks(groupIds);
+
+ groupIds.removeAll(failed.keySet());
+ Map<String, Throwable> success = new HashMap<>();
+ Map<String, List<String>> internalTopics = new HashMap<>();
+ Map<String, Throwable> internalTopicsDeletionFailures = new
HashMap<>();
+ if (!groupIds.isEmpty()) {
+ // retrieve internal topics before deleting groups
+ internalTopics = retrieveInternalTopics(groupIds);
+
+ // delete streams groups
+ Map<String, KafkaFuture<Void>> groupsToDelete =
adminClient.deleteStreamsGroups(
+ groupIds,
+ withTimeoutMs(new DeleteStreamsGroupsOptions())
+ ).deletedGroups();
+
+ groupsToDelete.forEach((g, f) -> {
+ try {
+ f.get();
+ success.put(g, null);
+ } catch (InterruptedException ie) {
+ failed.put(g, ie);
+ } catch (ExecutionException e) {
+ failed.put(g, e.getCause());
+ }
+ });
+
+ // delete internal topics
+ if (!success.isEmpty()) {
+ for (String groupId : success.keySet()) {
+ List<String> internalTopicsToDelete =
internalTopics.get(groupId);
+ if (internalTopicsToDelete != null &&
!internalTopicsToDelete.isEmpty()) {
+ DeleteTopicsResult deleteTopicsResult = null;
+ try {
+ deleteTopicsResult =
adminClient.deleteTopics(internalTopicsToDelete);
+ deleteTopicsResult.all().get();
+ } catch (InterruptedException | ExecutionException
e) {
+ if (deleteTopicsResult != null) {
+
deleteTopicsResult.topicNameValues().forEach((topic, future) -> {
+ try {
+ future.get();
+ } catch (Exception topicException) {
+ System.out.println("Failed to
delete internal topic: " + topic);
+ }
+ });
+ }
+ internalTopicsDeletionFailures.put(groupId,
e.getCause());
+ }
+ }
+ }
+ }
+ }
+ // display outcome messages based on the results
+ if (failed.isEmpty()) {
+ System.out.println("Deletion of requested streams groups (" +
"'" +
success.keySet().stream().map(Object::toString).collect(Collectors.joining("',
'")) + "') was successful.");
+ } else {
+ printError("Deletion of some streams groups failed:",
Optional.empty());
+ failed.forEach((group, error) -> System.out.println("* Group
'" + group + "' could not be deleted due to: " + error));
+
+ if (!success.isEmpty()) {
+ System.out.println("\nThese streams groups were deleted
successfully: " + "'" +
success.keySet().stream().map(Object::toString).collect(Collectors.joining("',
'")) + "'.");
+ }
+ }
+ if (!internalTopics.keySet().isEmpty()) {
+ printInternalTopicErrors(internalTopicsDeletionFailures,
success.keySet(), internalTopics.keySet());
+ }
+ // for testing purpose: return all failures, including internal
topics deletion failures
+ failed.putAll(success);
+ failed.putAll(internalTopicsDeletionFailures);
+ return failed;
+ }
+
+ private Map<String, Throwable> preAdminCallChecks(List<String>
groupIds) {
+ List<GroupListing> streamsGroupIds = listDetailedStreamsGroups();
+ LinkedHashSet<String> groupIdSet = new LinkedHashSet<>(groupIds);
+
+ Map<String, Throwable> failed = new HashMap<>();
+
+ for (String groupId : groupIdSet) {
+ Optional<GroupListing> listing =
streamsGroupIds.stream().filter(item ->
item.groupId().equals(groupId)).findAny();
+ if (listing.isEmpty()) {
+ failed.put(groupId, new IllegalArgumentException("Group '"
+ groupId + "' does not exist or is not a streams group."));
+ } else {
+ Optional<GroupState> groupState =
listing.get().groupState();
+ groupState.ifPresent(state -> {
+ if (state == GroupState.DEAD) {
+ failed.put(groupId, new
IllegalStateException("Streams group '" + groupId + "' group state is DEAD."));
+ } else if (state != GroupState.EMPTY) {
+ failed.put(groupId, new
GroupNotEmptyException("Streams group '" + groupId + "' is not EMPTY."));
+ }
+ });
+ }
+ }
+ return failed;
+ }
+
+ // Visibility for testing
+ Map<String, List<String>> retrieveInternalTopics(List<String>
groupIds) {
+ Map<String, List<String>> groupToInternalTopics = new HashMap<>();
+ try {
+ Map<String, StreamsGroupDescription> descriptionMap =
adminClient.describeStreamsGroups(groupIds).all().get();
+ for (StreamsGroupDescription description :
descriptionMap.values()) {
+
+ List<String> sourceTopics =
description.subtopologies().stream()
+ .flatMap(subtopology ->
subtopology.sourceTopics().stream()).toList();
+
+ List<String> internalTopics =
description.subtopologies().stream()
+ .flatMap(subtopology -> Stream.concat(
+
subtopology.repartitionSourceTopics().keySet().stream(),
+
subtopology.stateChangelogTopics().keySet().stream()))
+ .filter(topic -> !sourceTopics.contains(topic))
+ .collect(Collectors.toList());
+ internalTopics.removeIf(topic -> {
+ if (!isInferredInternalTopic(topic,
description.groupId())) {
+ printError("The internal topic '" + topic + "' is
not inferred as internal " +
+ "and thus will not be deleted with the group
'" + description.groupId() + "'.", Optional.empty());
+ return true;
+ }
+ return false;
+ });
+ if (!internalTopics.isEmpty()) {
+ groupToInternalTopics.put(description.groupId(),
internalTopics);
+ }
+ }
+ } catch (InterruptedException | ExecutionException e) {
+ if (e.getCause() instanceof UnsupportedVersionException) {
+ printError("Retrieving internal topics is not supported by
the broker version. " +
+ "Use 'kafka-topics.sh' to list and delete the group's
internal topics.", Optional.of(e.getCause()));
+ } else {
+ printError("Retrieving internal topics failed due to " +
e.getMessage(), Optional.of(e));
+ }
+ }
+ return groupToInternalTopics;
+ }
+
+ private boolean isInferredInternalTopic(final String topicName, final
String applicationId) {
+ return topicName.startsWith(applicationId + "-") &&
matchesInternalTopicFormat(topicName);
+ }
+
+ public static boolean matchesInternalTopicFormat(final String
topicName) {
+ return topicName.endsWith("-changelog") ||
topicName.endsWith("-repartition")
+ || topicName.endsWith("-subscription-registration-topic")
+ || topicName.endsWith("-subscription-response-topic")
+ ||
topicName.matches(".+-KTABLE-FK-JOIN-SUBSCRIPTION-REGISTRATION-\\d+-topic")
+ ||
topicName.matches(".+-KTABLE-FK-JOIN-SUBSCRIPTION-RESPONSE-\\d+-topic");
+ }
+
+
+ private void printInternalTopicErrors(Map<String, Throwable>
internalTopicsDeletionFailures,
+ Set<String> deletedGroupIds,
+ Set<String>
groupIdsWithInternalTopics) {
+ if (!deletedGroupIds.isEmpty()) {
+ if (internalTopicsDeletionFailures.isEmpty()) {
+ List<String> successfulGroups = deletedGroupIds.stream()
+ .filter(groupIdsWithInternalTopics::contains)
+ .collect(Collectors.toList());
+ System.out.println("Deletion of associated internal topics
of the streams groups ('" +
+ String.join("', '", successfulGroups) + "') was
successful.");
+ } else {
+ System.out.println("Deletion of some associated internal
topics failed:");
+ internalTopicsDeletionFailures.forEach((group, error) ->
+ System.out.println("* Internal topics of the streams
group '" + group + "' could not be deleted due to: " + error));
+ }
+ }
+ }
+
+ List<GroupListing> listDetailedStreamsGroups() {
+ try {
+ ListGroupsResult result = adminClient.listGroups(new
ListGroupsOptions()
+
.timeoutMs(opts.options.valueOf(opts.timeoutMsOpt).intValue())
+ .withTypes(Set.of(GroupType.STREAMS)));
+ Collection<GroupListing> listings = result.all().get();
+ return listings.stream().toList();
+ } catch (InterruptedException | ExecutionException e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ private Map.Entry<Errors, Map<TopicPartition, Throwable>>
deleteOffsets(String groupId, List<String> topics) {
+ Map<TopicPartition, Throwable> partitionLevelResult = new
HashMap<>();
+ Set<String> topicWithPartitions = new HashSet<>();
+ Set<String> topicWithoutPartitions = new HashSet<>();
+
+ for (String topic : topics) {
+ if (topic.contains(":"))
+ topicWithPartitions.add(topic);
+ else
+ topicWithoutPartitions.add(topic);
+ }
+
+ List<TopicPartition> knownPartitions =
topicWithPartitions.stream().flatMap(this::parseTopicsWithPartitions).toList();
+
+ // Get the partitions of topics that the user did not explicitly
specify the partitions
+ DescribeTopicsResult describeTopicsResult =
adminClient.describeTopics(
+ topicWithoutPartitions,
+ withTimeoutMs(new DescribeTopicsOptions()));
+
+ Iterator<TopicPartition> unknownPartitions =
describeTopicsResult.topicNameValues().entrySet().stream().flatMap(e -> {
+ String topic = e.getKey();
+ try {
+ return
e.getValue().get().partitions().stream().map(partition ->
+ new TopicPartition(topic, partition.partition()));
+ } catch (ExecutionException | InterruptedException err) {
+ partitionLevelResult.put(new TopicPartition(topic, -1),
err);
+ return Stream.empty();
+ }
+ }).iterator();
+
+ Set<TopicPartition> partitions = new HashSet<>(knownPartitions);
+
+ unknownPartitions.forEachRemaining(partitions::add);
+
+ return deleteOffsets(groupId, partitions, partitionLevelResult);
+ }
+
+ private Map.Entry<Errors, Map<TopicPartition, Throwable>>
deleteOffsets(String groupId, Set<TopicPartition> partitions,
Map<TopicPartition, Throwable> partitionLevelResult) {
+
+ DeleteStreamsGroupOffsetsResult deleteResult =
adminClient.deleteStreamsGroupOffsets(
+ groupId,
+ partitions,
+ withTimeoutMs(new DeleteStreamsGroupOffsetsOptions())
+ );
+
+ Errors topLevelException = Errors.NONE;
+
+ try {
+ deleteResult.all().get();
+ } catch (ExecutionException | InterruptedException e) {
+ topLevelException = Errors.forException(e.getCause());
+ }
+
+ partitions.forEach(partition -> {
+ try {
+ deleteResult.partitionResult(partition).get();
+ partitionLevelResult.put(partition, null);
+ } catch (ExecutionException | InterruptedException e) {
+ partitionLevelResult.put(partition, e);
+ }
+ });
+
+ return new AbstractMap.SimpleImmutableEntry<>(topLevelException,
partitionLevelResult);
+ }
+
+ Map.Entry<Errors, Map<TopicPartition, Throwable>> deleteOffsets() {
+ String groupId = opts.options.valueOf(opts.groupOpt);
+ Map.Entry<Errors, Map<TopicPartition, Throwable>> res;
+ if (opts.options.has(opts.allInputTopicsOpt)) {
+ Set<TopicPartition> partitions =
getCommittedOffsets(groupId).keySet();
+ res = deleteOffsets(groupId, partitions, new HashMap<>());
+ } else if (opts.options.has(opts.inputTopicOpt)) {
+ List<String> topics =
opts.options.valuesOf(opts.inputTopicOpt);
+ res = deleteOffsets(groupId, topics);
+ } else {
+ CommandLineUtils.printUsageAndExit(opts.parser, "Option
--delete-offsets requires either --all-topics or --topic to be specified.");
Review Comment:
Can you make this `--all-input-topics`?
##########
tools/src/main/java/org/apache/kafka/tools/streams/StreamsGroupCommand.java:
##########
@@ -326,10 +469,40 @@ Map<TopicPartition, OffsetsInfo>
getOffsets(StreamsGroupDescription description)
return output;
}
+ private Stream<TopicPartition> parseTopicsWithPartitions(String
topicArg) {
+ ToIntFunction<String> partitionNum = partition -> {
+ try {
+ return Integer.parseInt(partition);
+ } catch (NumberFormatException e) {
+ throw new IllegalArgumentException("Invalid partition '" +
partition + "' specified in topic arg '" + topicArg + "''");
Review Comment:
yes
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]