lucasbru commented on code in PR #19758:
URL: https://github.com/apache/kafka/pull/19758#discussion_r2123533779
##########
tools/src/main/java/org/apache/kafka/tools/streams/StreamsGroupCommand.java:
##########
@@ -398,6 +403,127 @@ protected Admin createAdminClient(Map<String, String>
configOverrides) throws IO
props.putAll(configOverrides);
return Admin.create(props);
}
+
+ Map<String, Throwable> deleteGroups() {
+ List<String> groupIds = opts.options.has(opts.allGroupsOpt)
+ ? listStreamsGroups()
+ : opts.options.valuesOf(opts.groupOpt);
+
+ Map<String, Throwable> success = new HashMap<>();
+ Map<String, Throwable> failed = new HashMap<>();
+
+ // retrieve internal topics before deleting groups
+ Map<String, List<String>> internalTopics =
retrieveInternalTopics(groupIds);
+
+ Map<String, KafkaFuture<Void>> groupsToDelete =
adminClient.deleteStreamsGroups(
+ groupIds
+ ).deletedGroups();
+
+ groupsToDelete.forEach((g, f) -> {
+ try {
+ f.get();
+ success.put(g, null);
+ } catch (InterruptedException ie) {
+ failed.put(g, ie);
+ } catch (ExecutionException e) {
+ failed.put(g, e.getCause());
+ }
+ });
+
+ // delete internal topics
+ Map<String, Throwable> internalTopicsDeletionFailures = new
HashMap<>();
+ if (!success.isEmpty()) {
+ for (String groupId : success.keySet()) {
+ List<String> internalTopicsToDelete =
internalTopics.get(groupId);
+ if (internalTopicsToDelete != null &&
!internalTopicsToDelete.isEmpty()) {
+ try {
+ DeleteTopicsResult deleteTopicsResult =
adminClient.deleteTopics(internalTopicsToDelete);
+ deleteTopicsResult.all().get();
+ } catch (InterruptedException | ExecutionException e) {
+ internalTopicsDeletionFailures.put(groupId,
e.getCause());
+ }
+ }
+ }
+ }
+
+ if (failed.isEmpty()) {
+ System.out.println("Deletion of requested streams groups (" +
"'" +
success.keySet().stream().map(Object::toString).collect(Collectors.joining(",
")) + "'" + ") was successful.");
+ } else {
+ printError("Deletion of some streams groups failed:",
Optional.empty());
+ failed.forEach((group, error) -> System.out.println("* Group
'" + group + "' could not be deleted due to: " + error));
+
+ if (!success.isEmpty()) {
+ System.out.println("\nThese streams groups were deleted
successfully: " + "'" +
success.keySet().stream().map(Object::toString).collect(Collectors.joining("'"))
+ "', '");
+ }
+ }
+
+ if (!internalTopics.keySet().isEmpty()) {
+ printInternalTopicErrors(internalTopicsDeletionFailures,
success.keySet(), internalTopics.keySet());
+ }
+
+ failed.putAll(success);
+ failed.putAll(internalTopicsDeletionFailures);
+ return failed;
+ }
+
+ private void printInternalTopicErrors(Map<String, Throwable>
internalTopicsDeletionFailures,
+ Set<String> deletedGroupIds,
+ Set<String>
groupIdsWithInternalTopics) {
+ if (!deletedGroupIds.isEmpty()) {
+ if (internalTopicsDeletionFailures.isEmpty()) {
+ List<String> successfulGroups = deletedGroupIds.stream()
+ .filter(groupIdsWithInternalTopics::contains)
+ .collect(Collectors.toList());
+ System.out.println("Deletion of associated internal topics
of the streams groups ('" +
+ String.join(", ", successfulGroups) + "') was
successful.");
+ } else {
+ System.out.println("Deletion of some associated internal
topics failed:");
+ internalTopicsDeletionFailures.forEach((group, error) ->
+ System.out.println("* Group '" + group + "' could not
be deleted due to: " + error));
+ }
+ }
+ }
+
+ // Visibility for testing
+ Map<String, List<String>> retrieveInternalTopics(List<String>
groupIds) {
+ Map<String, List<String>> groupToInternalTopics = new HashMap<>();
+ try {
+ Map<String, StreamsGroupDescription> descriptionMap =
adminClient.describeStreamsGroups(groupIds).all().get();
+ for (StreamsGroupDescription description :
descriptionMap.values()) {
+
+ List<String> sourceTopics =
description.subtopologies().stream()
+ .flatMap(subtopology ->
subtopology.sourceTopics().stream())
+ .toList();
+
+ List<String> internalTopics =
description.subtopologies().stream()
+ .flatMap(subtopology -> Stream.concat(
+
subtopology.repartitionSourceTopics().keySet().stream(),
+
subtopology.stateChangelogTopics().keySet().stream()))
+ .filter(topic -> !sourceTopics.contains(topic))
+ .collect(Collectors.toList());
+ if (!internalTopics.isEmpty()) {
+ groupToInternalTopics.put(description.groupId(),
internalTopics);
+ }
+ }
+ } catch (InterruptedException | ExecutionException e) {
+ if (e.getCause() instanceof UnsupportedVersionException) {
+ printError("Retrieving internal topics is not supported by
the broker version. " +
+ "Please execute --delete --internal-topics <topic
names> to delete the group's associated internal topics.",
Optional.of(e.getCause()));
Review Comment:
The way internal topics are named is an application detail that many users,
and even on-call engineers, may not know. The user would first have to figure
out the naming scheme from the documentation, then list the topics, and then
apply the filter. If we can do it for the user in a few lines of code, why not?
Also, a list-topics request shouldn't hurt. Can you create a small subtask in
the AK ticket for KIP-1071? You could add the label `newbie`, since it should
be trivial to do; I'm sure somebody will pick it up.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]