AndrewJSchofield commented on code in PR #19758:
URL: https://github.com/apache/kafka/pull/19758#discussion_r2108655166


##########
tools/src/main/java/org/apache/kafka/tools/streams/StreamsGroupCommand.java:
##########
@@ -398,6 +403,127 @@ protected Admin createAdminClient(Map<String, String> 
configOverrides) throws IO
             props.putAll(configOverrides);
             return Admin.create(props);
         }
+
+        Map<String, Throwable> deleteGroups() {
+            List<String> groupIds = opts.options.has(opts.allGroupsOpt)
+                ? listStreamsGroups()
+                : opts.options.valuesOf(opts.groupOpt);
+
+            Map<String, Throwable> success = new HashMap<>();
+            Map<String, Throwable> failed = new HashMap<>();
+
+            // retrieve internal topics before deleting groups
+            Map<String, List<String>> internalTopics = 
retrieveInternalTopics(groupIds);
+
+            Map<String, KafkaFuture<Void>> groupsToDelete = 
adminClient.deleteStreamsGroups(
+                groupIds
+            ).deletedGroups();
+
+            groupsToDelete.forEach((g, f) -> {
+                try {
+                    f.get();
+                    success.put(g, null);
+                } catch (InterruptedException ie) {
+                    failed.put(g, ie);
+                } catch (ExecutionException e) {
+                    failed.put(g, e.getCause());
+                }
+            });
+
+            // delete internal topics
+            Map<String, Throwable> internalTopicsDeletionFailures = new 
HashMap<>();
+            if (!success.isEmpty()) {
+                for (String groupId : success.keySet()) {
+                    List<String> internalTopicsToDelete = 
internalTopics.get(groupId);
+                    if (internalTopicsToDelete != null && 
!internalTopicsToDelete.isEmpty()) {
+                        try {
+                            DeleteTopicsResult deleteTopicsResult = 
adminClient.deleteTopics(internalTopicsToDelete);
+                            deleteTopicsResult.all().get();
+                        } catch (InterruptedException | ExecutionException e) {
+                            internalTopicsDeletionFailures.put(groupId, 
e.getCause());
+                        }
+                    }
+                }
+            }
+
+            if (failed.isEmpty()) {
+                System.out.println("Deletion of requested streams groups (" + 
"'" + 
success.keySet().stream().map(Object::toString).collect(Collectors.joining(", 
")) + "'" + ") was successful.");
+            } else {
+                printError("Deletion of some streams groups failed:", 
Optional.empty());
+                failed.forEach((group, error) -> System.out.println("* Group 
'" + group + "' could not be deleted due to: " + error));
+
+                if (!success.isEmpty()) {
+                    System.out.println("\nThese streams groups were deleted 
successfully: " + "'" + 
success.keySet().stream().map(Object::toString).collect(Collectors.joining("'"))
 + "', '");
+                }
+            }
+
+            if (!internalTopics.keySet().isEmpty()) {
+                printInternalTopicErrors(internalTopicsDeletionFailures, 
success.keySet(), internalTopics.keySet());
+            }
+
+            failed.putAll(success);
+            failed.putAll(internalTopicsDeletionFailures);

Review Comment:
   If the deletion of a streams group succeeds but the deletion of its internal 
topics fails, it appears as if this failure is reported as a group-level 
failure via the `failed` map. This seems misleading because the group deletion 
succeeded. I suppose a case could be made that this is only used for testing 
purposes, and I could understand that might be useful. If so, please could we 
have a comment making clear what's going on here?



##########
tools/src/main/java/org/apache/kafka/tools/streams/StreamsGroupCommand.java:
##########
@@ -398,6 +403,127 @@ protected Admin createAdminClient(Map<String, String> 
configOverrides) throws IO
             props.putAll(configOverrides);
             return Admin.create(props);
         }
+
+        Map<String, Throwable> deleteGroups() {
+            List<String> groupIds = opts.options.has(opts.allGroupsOpt)
+                ? listStreamsGroups()
+                : opts.options.valuesOf(opts.groupOpt);
+
+            Map<String, Throwable> success = new HashMap<>();
+            Map<String, Throwable> failed = new HashMap<>();
+
+            // retrieve internal topics before deleting groups
+            Map<String, List<String>> internalTopics = 
retrieveInternalTopics(groupIds);
+
+            Map<String, KafkaFuture<Void>> groupsToDelete = 
adminClient.deleteStreamsGroups(
+                groupIds

Review Comment:
   You should apply the configured timeout on this operation.



##########
tools/src/main/java/org/apache/kafka/tools/streams/StreamsGroupCommand.java:
##########
@@ -398,6 +403,127 @@ protected Admin createAdminClient(Map<String, String> 
configOverrides) throws IO
             props.putAll(configOverrides);
             return Admin.create(props);
         }
+
+        Map<String, Throwable> deleteGroups() {
+            List<String> groupIds = opts.options.has(opts.allGroupsOpt)
+                ? listStreamsGroups()
+                : opts.options.valuesOf(opts.groupOpt);
+
+            Map<String, Throwable> success = new HashMap<>();
+            Map<String, Throwable> failed = new HashMap<>();
+
+            // retrieve internal topics before deleting groups
+            Map<String, List<String>> internalTopics = 
retrieveInternalTopics(groupIds);
+
+            Map<String, KafkaFuture<Void>> groupsToDelete = 
adminClient.deleteStreamsGroups(

Review Comment:
   If specific group IDs are provided but they are not streams groups, the 
command still deletes them. In `kafka-share-groups.sh`, there is a specific 
test for group type to guard against this. It seems to me this would be a good 
safety measure here too.



##########
tools/src/main/java/org/apache/kafka/tools/streams/StreamsGroupCommand.java:
##########
@@ -398,6 +403,127 @@ protected Admin createAdminClient(Map<String, String> 
configOverrides) throws IO
             props.putAll(configOverrides);
             return Admin.create(props);
         }
+
+        Map<String, Throwable> deleteGroups() {
+            List<String> groupIds = opts.options.has(opts.allGroupsOpt)
+                ? listStreamsGroups()
+                : opts.options.valuesOf(opts.groupOpt);
+
+            Map<String, Throwable> success = new HashMap<>();
+            Map<String, Throwable> failed = new HashMap<>();
+
+            // retrieve internal topics before deleting groups
+            Map<String, List<String>> internalTopics = 
retrieveInternalTopics(groupIds);
+
+            Map<String, KafkaFuture<Void>> groupsToDelete = 
adminClient.deleteStreamsGroups(
+                groupIds
+            ).deletedGroups();
+
+            groupsToDelete.forEach((g, f) -> {
+                try {
+                    f.get();
+                    success.put(g, null);
+                } catch (InterruptedException ie) {
+                    failed.put(g, ie);
+                } catch (ExecutionException e) {
+                    failed.put(g, e.getCause());
+                }
+            });
+
+            // delete internal topics
+            Map<String, Throwable> internalTopicsDeletionFailures = new 
HashMap<>();
+            if (!success.isEmpty()) {
+                for (String groupId : success.keySet()) {
+                    List<String> internalTopicsToDelete = 
internalTopics.get(groupId);
+                    if (internalTopicsToDelete != null && 
!internalTopicsToDelete.isEmpty()) {
+                        try {
+                            DeleteTopicsResult deleteTopicsResult = 
adminClient.deleteTopics(internalTopicsToDelete);
+                            deleteTopicsResult.all().get();
+                        } catch (InterruptedException | ExecutionException e) {
+                            internalTopicsDeletionFailures.put(groupId, 
e.getCause());
+                        }
+                    }
+                }
+            }
+
+            if (failed.isEmpty()) {
+                System.out.println("Deletion of requested streams groups (" + 
"'" + 
success.keySet().stream().map(Object::toString).collect(Collectors.joining(", 
")) + "'" + ") was successful.");

Review Comment:
   The output here would be more elegant if the group IDs were surrounded by 
the `'`, as opposed to surrounding the list. For example, this strikes as a bit 
odd: `Deletion of requested streams groups ('') was successful.`.



##########
tools/src/main/java/org/apache/kafka/tools/streams/StreamsGroupCommand.java:
##########
@@ -398,6 +403,127 @@ protected Admin createAdminClient(Map<String, String> 
configOverrides) throws IO
             props.putAll(configOverrides);
             return Admin.create(props);
         }
+
+        Map<String, Throwable> deleteGroups() {
+            List<String> groupIds = opts.options.has(opts.allGroupsOpt)
+                ? listStreamsGroups()
+                : opts.options.valuesOf(opts.groupOpt);
+
+            Map<String, Throwable> success = new HashMap<>();
+            Map<String, Throwable> failed = new HashMap<>();
+
+            // retrieve internal topics before deleting groups
+            Map<String, List<String>> internalTopics = 
retrieveInternalTopics(groupIds);
+
+            Map<String, KafkaFuture<Void>> groupsToDelete = 
adminClient.deleteStreamsGroups(
+                groupIds
+            ).deletedGroups();
+
+            groupsToDelete.forEach((g, f) -> {
+                try {
+                    f.get();
+                    success.put(g, null);
+                } catch (InterruptedException ie) {
+                    failed.put(g, ie);
+                } catch (ExecutionException e) {
+                    failed.put(g, e.getCause());
+                }
+            });
+
+            // delete internal topics
+            Map<String, Throwable> internalTopicsDeletionFailures = new 
HashMap<>();
+            if (!success.isEmpty()) {
+                for (String groupId : success.keySet()) {
+                    List<String> internalTopicsToDelete = 
internalTopics.get(groupId);
+                    if (internalTopicsToDelete != null && 
!internalTopicsToDelete.isEmpty()) {
+                        try {
+                            DeleteTopicsResult deleteTopicsResult = 
adminClient.deleteTopics(internalTopicsToDelete);
+                            deleteTopicsResult.all().get();
+                        } catch (InterruptedException | ExecutionException e) {
+                            internalTopicsDeletionFailures.put(groupId, 
e.getCause());
+                        }
+                    }
+                }
+            }
+
+            if (failed.isEmpty()) {
+                System.out.println("Deletion of requested streams groups (" + 
"'" + 
success.keySet().stream().map(Object::toString).collect(Collectors.joining(", 
")) + "'" + ") was successful.");
+            } else {
+                printError("Deletion of some streams groups failed:", 
Optional.empty());
+                failed.forEach((group, error) -> System.out.println("* Group 
'" + group + "' could not be deleted due to: " + error));
+
+                if (!success.isEmpty()) {
+                    System.out.println("\nThese streams groups were deleted 
successfully: " + "'" + 
success.keySet().stream().map(Object::toString).collect(Collectors.joining("'"))
 + "', '");
+                }
+            }
+
+            if (!internalTopics.keySet().isEmpty()) {
+                printInternalTopicErrors(internalTopicsDeletionFailures, 
success.keySet(), internalTopics.keySet());
+            }
+
+            failed.putAll(success);
+            failed.putAll(internalTopicsDeletionFailures);
+            return failed;
+        }
+
+        private void printInternalTopicErrors(Map<String, Throwable> 
internalTopicsDeletionFailures,
+                                              Set<String> deletedGroupIds,
+                                              Set<String> 
groupIdsWithInternalTopics) {
+            if (!deletedGroupIds.isEmpty()) {
+                if (internalTopicsDeletionFailures.isEmpty()) {
+                    List<String> successfulGroups = deletedGroupIds.stream()
+                        .filter(groupIdsWithInternalTopics::contains)
+                        .collect(Collectors.toList());
+                    System.out.println("Deletion of associated internal topics 
of the streams groups ('" +
+                        String.join(", ", successfulGroups) + "') was 
successful.");
+                } else {
+                    System.out.println("Deletion of some associated internal 
topics failed:");
+                    internalTopicsDeletionFailures.forEach((group, error) ->
+                        System.out.println("* Group '" + group + "' could not 
be deleted due to: " + error));
+                }
+            }
+        }
+
+        // Visibility for testing
+        Map<String, List<String>> retrieveInternalTopics(List<String> 
groupIds) {
+            Map<String, List<String>> groupToInternalTopics = new HashMap<>();
+            try {
+                Map<String, StreamsGroupDescription> descriptionMap = 
adminClient.describeStreamsGroups(groupIds).all().get();
+                for (StreamsGroupDescription description : 
descriptionMap.values()) {
+
+                    List<String> sourceTopics = 
description.subtopologies().stream()
+                        .flatMap(subtopology -> 
subtopology.sourceTopics().stream())
+                        .toList();
+
+                    List<String> internalTopics = 
description.subtopologies().stream()
+                        .flatMap(subtopology -> Stream.concat(
+                            
subtopology.repartitionSourceTopics().keySet().stream(),
+                            
subtopology.stateChangelogTopics().keySet().stream()))
+                        .filter(topic -> !sourceTopics.contains(topic))
+                        .collect(Collectors.toList());
+                    if (!internalTopics.isEmpty()) {
+                        groupToInternalTopics.put(description.groupId(), 
internalTopics);
+                    }
+                }
+            } catch (InterruptedException | ExecutionException e) {
+                if (e.getCause() instanceof UnsupportedVersionException) {
+                    printError("Retrieving internal topics is not supported by 
the broker version. " +
+                        "Please execute --delete --internal-topics <topic 
names> to delete the group's associated internal topics.", 
Optional.of(e.getCause()));

Review Comment:
   This is referring to a non-existent option I think (or at least not yet 
implemented). I would prefer not to introduce the error message in advance of 
the code that the user would use to rectify the problem.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to