lucasbru commented on code in PR #19758:
URL: https://github.com/apache/kafka/pull/19758#discussion_r2121834766


##########
tools/src/main/java/org/apache/kafka/tools/streams/StreamsGroupCommand.java:
##########
@@ -398,8 +407,201 @@ protected Admin createAdminClient(Map<String, String> 
configOverrides) throws IO
             props.putAll(configOverrides);
             return Admin.create(props);
         }
+
+        Map<String, Throwable> deleteGroups() {
+            List<String> groupIds = opts.options.has(opts.allGroupsOpt)
+                ? new ArrayList<>(listStreamsGroups())
+                : new ArrayList<>(opts.options.valuesOf(opts.groupOpt));
+
+            // pre admin call checks
+            Map<String, Throwable> failed = preAdminCallChecks(groupIds);
+
+            groupIds.removeAll(failed.keySet());
+            Map<String, Throwable> success = new HashMap<>();
+            Map<String, List<String>> internalTopics = new HashMap<>();
+            Map<String, Throwable> internalTopicsDeletionFailures = new 
HashMap<>();
+            if (!groupIds.isEmpty()) {
+                // retrieve internal topics before deleting groups
+                internalTopics = retrieveInternalTopics(groupIds);
+
+                // delete streams groups
+                Map<String, KafkaFuture<Void>> groupsToDelete = 
adminClient.deleteStreamsGroups(
+                    groupIds,
+                    withTimeoutMs(new DeleteStreamsGroupsOptions())
+                ).deletedGroups();
+
+                groupsToDelete.forEach((g, f) -> {
+                    try {
+                        f.get();
+                        success.put(g, null);
+                    } catch (InterruptedException ie) {
+                        failed.put(g, ie);
+                    } catch (ExecutionException e) {
+                        failed.put(g, e.getCause());
+                    }
+                });
+
+                // delete internal topics
+                if (!success.isEmpty()) {
+                    for (String groupId : success.keySet()) {
+                        List<String> internalTopicsToDelete = 
internalTopics.get(groupId);
+                        if (internalTopicsToDelete != null && 
!internalTopicsToDelete.isEmpty()) {
+                            try {
+                                DeleteTopicsResult deleteTopicsResult = 
adminClient.deleteTopics(internalTopicsToDelete);
+                                deleteTopicsResult.all().get();
+                            } catch (InterruptedException | ExecutionException 
e) {
+                                internalTopicsDeletionFailures.put(groupId, 
e.getCause());
+                            }
+                        }
+                    }
+                }
+            }
+
+            // display outcome messages based on the results
+            if (failed.isEmpty()) {
+                System.out.println("Deletion of requested streams groups (" + 
"'" + 
success.keySet().stream().map(Object::toString).collect(Collectors.joining("', 
'")) + "') was successful.");
+            } else {
+                printError("Deletion of some streams groups failed:", 
Optional.empty());
+                failed.forEach((group, error) -> System.out.println("* Group 
'" + group + "' could not be deleted due to: " + error));
+
+                if (!success.isEmpty()) {
+                    System.out.println("\nThese streams groups were deleted 
successfully: " + "'" + 
success.keySet().stream().map(Object::toString).collect(Collectors.joining("', 
'")) + "'.");
+                }
+            }
+            if (!internalTopics.keySet().isEmpty()) {
+                printInternalTopicErrors(internalTopicsDeletionFailures, 
success.keySet(), internalTopics.keySet());
+            }
+            // for testing purpose: return all failures, including internal 
topics deletion failures
+            failed.putAll(success);
+            failed.putAll(internalTopicsDeletionFailures);
+            return failed;
+        }
+
+        private Map<String, Throwable> preAdminCallChecks(List<String> 
groupIds) {
+            List<GroupListing> streamsGroupIds = listDetailedStreamsGroups();
+            LinkedHashSet<String> groupIdSet = new LinkedHashSet<>(groupIds);
+
+            Map<String, Throwable> failed = new HashMap<>();
+
+            for (String groupId : groupIdSet) {
+                Optional<GroupListing> listing = 
streamsGroupIds.stream().filter(item -> 
item.groupId().equals(groupId)).findAny();
+                if (listing.isEmpty()) {
+                    failed.put(groupId, new IllegalArgumentException("Group '" 
+ groupId + "' does not exist or is not a streams group."));
+                } else {
+                    Optional<GroupState> groupState = 
listing.get().groupState();
+                    groupState.ifPresent(state -> {
+                        if (state == GroupState.DEAD) {
+                            failed.put(groupId, new 
IllegalStateException("Streams group '" + groupId + "' group state is DEAD."));
+                        } else if (state != GroupState.EMPTY) {
+                            failed.put(groupId, new 
GroupNotEmptyException("Streams group '" + groupId + "' is not EMPTY."));
+                        }
+                    });
+                }
+            }
+            return failed;
+        }
+
+        List<GroupListing> listDetailedStreamsGroups() {
+            try {
+                ListGroupsResult result = adminClient.listGroups(new 
ListGroupsOptions()
+                    
.timeoutMs(opts.options.valueOf(opts.timeoutMsOpt).intValue())
+                    .withTypes(Set.of(GroupType.STREAMS)));
+                Collection<GroupListing> listings = result.all().get();
+                return listings.stream().toList();
+            } catch (InterruptedException | ExecutionException e) {
+                throw new RuntimeException(e);
+            }
+        }
+
+        private void printInternalTopicErrors(Map<String, Throwable> 
internalTopicsDeletionFailures,
+                                              Set<String> deletedGroupIds,
+                                              Set<String> 
groupIdsWithInternalTopics) {
+            if (!deletedGroupIds.isEmpty()) {
+                if (internalTopicsDeletionFailures.isEmpty()) {
+                    List<String> successfulGroups = deletedGroupIds.stream()
+                        .filter(groupIdsWithInternalTopics::contains)
+                        .collect(Collectors.toList());
+                    System.out.println("Deletion of associated internal topics 
of the streams groups ('" +
+                        String.join("', '", successfulGroups) + "') was 
successful.");
+                } else {
+                    System.out.println("Deletion of some associated internal 
topics failed:");
+                    internalTopicsDeletionFailures.forEach((group, error) ->
+                        System.out.println("* Internal topics of the streams 
group '" + group + "' could not be deleted due to: " + error));
+                }
+            }
+        }
+
+        // Visibility for testing
+        Map<String, List<String>> retrieveInternalTopics(List<String> 
groupIds) {
+            Map<String, List<String>> groupToInternalTopics = new HashMap<>();
+            try {
+                Map<String, StreamsGroupDescription> descriptionMap = 
adminClient.describeStreamsGroups(groupIds).all().get();
+                for (StreamsGroupDescription description : 
descriptionMap.values()) {
+
+                    List<String> nonInternalTopics = 
description.subtopologies().stream()
+                        .flatMap(subtopology -> Stream.concat(
+                            subtopology.sourceTopics().stream(),
+                            subtopology.repartitionSinkTopics().stream()))

Review Comment:
   This is not correct. I think in the previous version you only filtered out 
sourceTopics, which was correct. repartitionSinkTopics are _always_ internal 
topics, and must also be repartitionSourceTopics (of some other subtopology).



##########
tools/src/main/java/org/apache/kafka/tools/streams/StreamsGroupCommand.java:
##########
@@ -398,8 +407,201 @@ protected Admin createAdminClient(Map<String, String> 
configOverrides) throws IO
             props.putAll(configOverrides);
             return Admin.create(props);
         }
+
+        Map<String, Throwable> deleteGroups() {
+            List<String> groupIds = opts.options.has(opts.allGroupsOpt)
+                ? new ArrayList<>(listStreamsGroups())
+                : new ArrayList<>(opts.options.valuesOf(opts.groupOpt));
+
+            // pre admin call checks
+            Map<String, Throwable> failed = preAdminCallChecks(groupIds);
+
+            groupIds.removeAll(failed.keySet());
+            Map<String, Throwable> success = new HashMap<>();
+            Map<String, List<String>> internalTopics = new HashMap<>();
+            Map<String, Throwable> internalTopicsDeletionFailures = new 
HashMap<>();
+            if (!groupIds.isEmpty()) {
+                // retrieve internal topics before deleting groups
+                internalTopics = retrieveInternalTopics(groupIds);
+
+                // delete streams groups
+                Map<String, KafkaFuture<Void>> groupsToDelete = 
adminClient.deleteStreamsGroups(
+                    groupIds,
+                    withTimeoutMs(new DeleteStreamsGroupsOptions())
+                ).deletedGroups();
+
+                groupsToDelete.forEach((g, f) -> {
+                    try {
+                        f.get();
+                        success.put(g, null);
+                    } catch (InterruptedException ie) {
+                        failed.put(g, ie);
+                    } catch (ExecutionException e) {
+                        failed.put(g, e.getCause());
+                    }
+                });
+
+                // delete internal topics
+                if (!success.isEmpty()) {
+                    for (String groupId : success.keySet()) {
+                        List<String> internalTopicsToDelete = 
internalTopics.get(groupId);
+                        if (internalTopicsToDelete != null && 
!internalTopicsToDelete.isEmpty()) {
+                            try {
+                                DeleteTopicsResult deleteTopicsResult = 
adminClient.deleteTopics(internalTopicsToDelete);
+                                deleteTopicsResult.all().get();
+                            } catch (InterruptedException | ExecutionException 
e) {
+                                internalTopicsDeletionFailures.put(groupId, 
e.getCause());
+                            }
+                        }
+                    }
+                }
+            }
+
+            // display outcome messages based on the results
+            if (failed.isEmpty()) {
+                System.out.println("Deletion of requested streams groups (" + 
"'" + 
success.keySet().stream().map(Object::toString).collect(Collectors.joining("', 
'")) + "') was successful.");
+            } else {
+                printError("Deletion of some streams groups failed:", 
Optional.empty());
+                failed.forEach((group, error) -> System.out.println("* Group 
'" + group + "' could not be deleted due to: " + error));
+
+                if (!success.isEmpty()) {
+                    System.out.println("\nThese streams groups were deleted 
successfully: " + "'" + 
success.keySet().stream().map(Object::toString).collect(Collectors.joining("', 
'")) + "'.");
+                }
+            }
+            if (!internalTopics.keySet().isEmpty()) {
+                printInternalTopicErrors(internalTopicsDeletionFailures, 
success.keySet(), internalTopics.keySet());
+            }
+            // for testing purpose: return all failures, including internal 
topics deletion failures
+            failed.putAll(success);
+            failed.putAll(internalTopicsDeletionFailures);
+            return failed;
+        }
+
+        private Map<String, Throwable> preAdminCallChecks(List<String> 
groupIds) {
+            List<GroupListing> streamsGroupIds = listDetailedStreamsGroups();
+            LinkedHashSet<String> groupIdSet = new LinkedHashSet<>(groupIds);
+
+            Map<String, Throwable> failed = new HashMap<>();
+
+            for (String groupId : groupIdSet) {
+                Optional<GroupListing> listing = 
streamsGroupIds.stream().filter(item -> 
item.groupId().equals(groupId)).findAny();
+                if (listing.isEmpty()) {
+                    failed.put(groupId, new IllegalArgumentException("Group '" 
+ groupId + "' does not exist or is not a streams group."));
+                } else {
+                    Optional<GroupState> groupState = 
listing.get().groupState();
+                    groupState.ifPresent(state -> {
+                        if (state == GroupState.DEAD) {
+                            failed.put(groupId, new 
IllegalStateException("Streams group '" + groupId + "' group state is DEAD."));
+                        } else if (state != GroupState.EMPTY) {
+                            failed.put(groupId, new 
GroupNotEmptyException("Streams group '" + groupId + "' is not EMPTY."));
+                        }
+                    });
+                }
+            }
+            return failed;
+        }
+
+        List<GroupListing> listDetailedStreamsGroups() {
+            try {
+                ListGroupsResult result = adminClient.listGroups(new 
ListGroupsOptions()
+                    
.timeoutMs(opts.options.valueOf(opts.timeoutMsOpt).intValue())
+                    .withTypes(Set.of(GroupType.STREAMS)));
+                Collection<GroupListing> listings = result.all().get();
+                return listings.stream().toList();
+            } catch (InterruptedException | ExecutionException e) {
+                throw new RuntimeException(e);
+            }
+        }
+
+        private void printInternalTopicErrors(Map<String, Throwable> 
internalTopicsDeletionFailures,
+                                              Set<String> deletedGroupIds,
+                                              Set<String> 
groupIdsWithInternalTopics) {
+            if (!deletedGroupIds.isEmpty()) {
+                if (internalTopicsDeletionFailures.isEmpty()) {
+                    List<String> successfulGroups = deletedGroupIds.stream()
+                        .filter(groupIdsWithInternalTopics::contains)
+                        .collect(Collectors.toList());
+                    System.out.println("Deletion of associated internal topics 
of the streams groups ('" +
+                        String.join("', '", successfulGroups) + "') was 
successful.");
+                } else {
+                    System.out.println("Deletion of some associated internal 
topics failed:");
+                    internalTopicsDeletionFailures.forEach((group, error) ->
+                        System.out.println("* Internal topics of the streams 
group '" + group + "' could not be deleted due to: " + error));
+                }
+            }
+        }
+
+        // Visibility for testing
+        Map<String, List<String>> retrieveInternalTopics(List<String> 
groupIds) {
+            Map<String, List<String>> groupToInternalTopics = new HashMap<>();
+            try {
+                Map<String, StreamsGroupDescription> descriptionMap = 
adminClient.describeStreamsGroups(groupIds).all().get();
+                for (StreamsGroupDescription description : 
descriptionMap.values()) {
+
+                    List<String> nonInternalTopics = 
description.subtopologies().stream()
+                        .flatMap(subtopology -> Stream.concat(
+                            subtopology.sourceTopics().stream(),
+                            subtopology.repartitionSinkTopics().stream()))
+                        .distinct()
+                        .toList();
+
+
+                    List<String> internalTopics = 
description.subtopologies().stream()
+                        .flatMap(subtopology -> Stream.concat(
+                            
subtopology.repartitionSourceTopics().keySet().stream(),
+                            
subtopology.stateChangelogTopics().keySet().stream()))
+                        .filter(topic -> !nonInternalTopics.contains(topic))
+                        .collect(Collectors.toList());
+                    internalTopics.removeIf(topic -> {
+                        if (!isInferredInternalTopic(topic, 
description.groupId())) {

Review Comment:
   Thank you.



##########
tools/src/main/java/org/apache/kafka/tools/streams/StreamsGroupCommandOptions.java:
##########
@@ -52,13 +59,19 @@ public class StreamsGroupCommandOptions extends 
CommandDefaultOptions {
     public final OptionSpec<String> groupOpt;
     public final OptionSpec<Void> listOpt;
     public final OptionSpec<Void> describeOpt;
+    final OptionSpec<Void> allGroupsOpt;
+    final OptionSpec<Void> deleteOpt;
     public final OptionSpec<Long> timeoutMsOpt;
     public final OptionSpec<String> commandConfigOpt;
     public final OptionSpec<String> stateOpt;
     public final OptionSpec<Void> membersOpt;
     public final OptionSpec<Void> offsetsOpt;
     public final OptionSpec<Void> verboseOpt;
 
+    final Set<OptionSpec<?>> allGroupSelectionScopeOpts;
+    final Set<OptionSpec<?>> allStreamsGroupLevelOpts;
+

Review Comment:
   ```suggestion
   ```



##########
tools/src/main/java/org/apache/kafka/tools/streams/StreamsGroupCommand.java:
##########
@@ -398,8 +407,201 @@ protected Admin createAdminClient(Map<String, String> 
configOverrides) throws IO
             props.putAll(configOverrides);
             return Admin.create(props);
         }
+
+        Map<String, Throwable> deleteGroups() {
+            List<String> groupIds = opts.options.has(opts.allGroupsOpt)
+                ? new ArrayList<>(listStreamsGroups())
+                : new ArrayList<>(opts.options.valuesOf(opts.groupOpt));
+
+            // pre admin call checks
+            Map<String, Throwable> failed = preAdminCallChecks(groupIds);
+
+            groupIds.removeAll(failed.keySet());
+            Map<String, Throwable> success = new HashMap<>();
+            Map<String, List<String>> internalTopics = new HashMap<>();
+            Map<String, Throwable> internalTopicsDeletionFailures = new 
HashMap<>();
+            if (!groupIds.isEmpty()) {
+                // retrieve internal topics before deleting groups
+                internalTopics = retrieveInternalTopics(groupIds);
+
+                // delete streams groups
+                Map<String, KafkaFuture<Void>> groupsToDelete = 
adminClient.deleteStreamsGroups(
+                    groupIds,
+                    withTimeoutMs(new DeleteStreamsGroupsOptions())
+                ).deletedGroups();
+
+                groupsToDelete.forEach((g, f) -> {
+                    try {
+                        f.get();
+                        success.put(g, null);
+                    } catch (InterruptedException ie) {
+                        failed.put(g, ie);
+                    } catch (ExecutionException e) {
+                        failed.put(g, e.getCause());
+                    }
+                });
+
+                // delete internal topics
+                if (!success.isEmpty()) {
+                    for (String groupId : success.keySet()) {
+                        List<String> internalTopicsToDelete = 
internalTopics.get(groupId);
+                        if (internalTopicsToDelete != null && 
!internalTopicsToDelete.isEmpty()) {
+                            try {
+                                DeleteTopicsResult deleteTopicsResult = 
adminClient.deleteTopics(internalTopicsToDelete);
+                                deleteTopicsResult.all().get();
+                            } catch (InterruptedException | ExecutionException 
e) {
+                                internalTopicsDeletionFailures.put(groupId, 
e.getCause());
+                            }
+                        }
+                    }
+                }
+            }
+
+            // display outcome messages based on the results
+            if (failed.isEmpty()) {
+                System.out.println("Deletion of requested streams groups (" + 
"'" + 
success.keySet().stream().map(Object::toString).collect(Collectors.joining("', 
'")) + "') was successful.");
+            } else {
+                printError("Deletion of some streams groups failed:", 
Optional.empty());
+                failed.forEach((group, error) -> System.out.println("* Group 
'" + group + "' could not be deleted due to: " + error));
+
+                if (!success.isEmpty()) {
+                    System.out.println("\nThese streams groups were deleted 
successfully: " + "'" + 
success.keySet().stream().map(Object::toString).collect(Collectors.joining("', 
'")) + "'.");
+                }
+            }
+            if (!internalTopics.keySet().isEmpty()) {
+                printInternalTopicErrors(internalTopicsDeletionFailures, 
success.keySet(), internalTopics.keySet());
+            }
+            // for testing purpose: return all failures, including internal 
topics deletion failures
+            failed.putAll(success);
+            failed.putAll(internalTopicsDeletionFailures);
+            return failed;
+        }
+
+        private Map<String, Throwable> preAdminCallChecks(List<String> 
groupIds) {
+            List<GroupListing> streamsGroupIds = listDetailedStreamsGroups();
+            LinkedHashSet<String> groupIdSet = new LinkedHashSet<>(groupIds);
+
+            Map<String, Throwable> failed = new HashMap<>();
+
+            for (String groupId : groupIdSet) {
+                Optional<GroupListing> listing = 
streamsGroupIds.stream().filter(item -> 
item.groupId().equals(groupId)).findAny();
+                if (listing.isEmpty()) {
+                    failed.put(groupId, new IllegalArgumentException("Group '" 
+ groupId + "' does not exist or is not a streams group."));
+                } else {
+                    Optional<GroupState> groupState = 
listing.get().groupState();
+                    groupState.ifPresent(state -> {
+                        if (state == GroupState.DEAD) {
+                            failed.put(groupId, new 
IllegalStateException("Streams group '" + groupId + "' group state is DEAD."));
+                        } else if (state != GroupState.EMPTY) {
+                            failed.put(groupId, new 
GroupNotEmptyException("Streams group '" + groupId + "' is not EMPTY."));
+                        }
+                    });
+                }
+            }
+            return failed;
+        }
+
+        List<GroupListing> listDetailedStreamsGroups() {
+            try {
+                ListGroupsResult result = adminClient.listGroups(new 
ListGroupsOptions()
+                    
.timeoutMs(opts.options.valueOf(opts.timeoutMsOpt).intValue())
+                    .withTypes(Set.of(GroupType.STREAMS)));
+                Collection<GroupListing> listings = result.all().get();
+                return listings.stream().toList();
+            } catch (InterruptedException | ExecutionException e) {
+                throw new RuntimeException(e);
+            }
+        }
+
+        private void printInternalTopicErrors(Map<String, Throwable> 
internalTopicsDeletionFailures,
+                                              Set<String> deletedGroupIds,
+                                              Set<String> 
groupIdsWithInternalTopics) {
+            if (!deletedGroupIds.isEmpty()) {
+                if (internalTopicsDeletionFailures.isEmpty()) {
+                    List<String> successfulGroups = deletedGroupIds.stream()
+                        .filter(groupIdsWithInternalTopics::contains)
+                        .collect(Collectors.toList());
+                    System.out.println("Deletion of associated internal topics 
of the streams groups ('" +
+                        String.join("', '", successfulGroups) + "') was 
successful.");
+                } else {
+                    System.out.println("Deletion of some associated internal 
topics failed:");
+                    internalTopicsDeletionFailures.forEach((group, error) ->
+                        System.out.println("* Internal topics of the streams 
group '" + group + "' could not be deleted due to: " + error));
+                }
+            }
+        }
+
+        // Visibility for testing
+        Map<String, List<String>> retrieveInternalTopics(List<String> 
groupIds) {
+            Map<String, List<String>> groupToInternalTopics = new HashMap<>();
+            try {
+                Map<String, StreamsGroupDescription> descriptionMap = 
adminClient.describeStreamsGroups(groupIds).all().get();
+                for (StreamsGroupDescription description : 
descriptionMap.values()) {
+
+                    List<String> nonInternalTopics = 
description.subtopologies().stream()
+                        .flatMap(subtopology -> Stream.concat(
+                            subtopology.sourceTopics().stream(),
+                            subtopology.repartitionSinkTopics().stream()))
+                        .distinct()
+                        .toList();
+

Review Comment:
   ```suggestion
   ```



##########
tools/src/main/java/org/apache/kafka/tools/streams/StreamsGroupCommand.java:
##########
@@ -398,8 +407,201 @@ protected Admin createAdminClient(Map<String, String> 
configOverrides) throws IO
             props.putAll(configOverrides);
             return Admin.create(props);
         }
+
+        Map<String, Throwable> deleteGroups() {
+            List<String> groupIds = opts.options.has(opts.allGroupsOpt)
+                ? new ArrayList<>(listStreamsGroups())
+                : new ArrayList<>(opts.options.valuesOf(opts.groupOpt));
+
+            // pre admin call checks
+            Map<String, Throwable> failed = preAdminCallChecks(groupIds);
+
+            groupIds.removeAll(failed.keySet());
+            Map<String, Throwable> success = new HashMap<>();
+            Map<String, List<String>> internalTopics = new HashMap<>();
+            Map<String, Throwable> internalTopicsDeletionFailures = new 
HashMap<>();
+            if (!groupIds.isEmpty()) {
+                // retrieve internal topics before deleting groups
+                internalTopics = retrieveInternalTopics(groupIds);
+
+                // delete streams groups
+                Map<String, KafkaFuture<Void>> groupsToDelete = 
adminClient.deleteStreamsGroups(
+                    groupIds,
+                    withTimeoutMs(new DeleteStreamsGroupsOptions())
+                ).deletedGroups();
+
+                groupsToDelete.forEach((g, f) -> {
+                    try {
+                        f.get();
+                        success.put(g, null);
+                    } catch (InterruptedException ie) {
+                        failed.put(g, ie);
+                    } catch (ExecutionException e) {
+                        failed.put(g, e.getCause());
+                    }
+                });
+
+                // delete internal topics
+                if (!success.isEmpty()) {
+                    for (String groupId : success.keySet()) {
+                        List<String> internalTopicsToDelete = 
internalTopics.get(groupId);
+                        if (internalTopicsToDelete != null && 
!internalTopicsToDelete.isEmpty()) {
+                            try {
+                                DeleteTopicsResult deleteTopicsResult = 
adminClient.deleteTopics(internalTopicsToDelete);
+                                deleteTopicsResult.all().get();
+                            } catch (InterruptedException | ExecutionException 
e) {
+                                internalTopicsDeletionFailures.put(groupId, 
e.getCause());
+                            }
+                        }
+                    }
+                }
+            }
+
+            // display outcome messages based on the results
+            if (failed.isEmpty()) {
+                System.out.println("Deletion of requested streams groups (" + 
"'" + 
success.keySet().stream().map(Object::toString).collect(Collectors.joining("', 
'")) + "') was successful.");
+            } else {
+                printError("Deletion of some streams groups failed:", 
Optional.empty());
+                failed.forEach((group, error) -> System.out.println("* Group 
'" + group + "' could not be deleted due to: " + error));
+
+                if (!success.isEmpty()) {
+                    System.out.println("\nThese streams groups were deleted 
successfully: " + "'" + 
success.keySet().stream().map(Object::toString).collect(Collectors.joining("', 
'")) + "'.");
+                }
+            }
+            if (!internalTopics.keySet().isEmpty()) {
+                printInternalTopicErrors(internalTopicsDeletionFailures, 
success.keySet(), internalTopics.keySet());
+            }
+            // for testing purpose: return all failures, including internal 
topics deletion failures
+            failed.putAll(success);
+            failed.putAll(internalTopicsDeletionFailures);
+            return failed;
+        }
+
+        private Map<String, Throwable> preAdminCallChecks(List<String> 
groupIds) {
+            List<GroupListing> streamsGroupIds = listDetailedStreamsGroups();
+            LinkedHashSet<String> groupIdSet = new LinkedHashSet<>(groupIds);
+
+            Map<String, Throwable> failed = new HashMap<>();
+
+            for (String groupId : groupIdSet) {
+                Optional<GroupListing> listing = 
streamsGroupIds.stream().filter(item -> 
item.groupId().equals(groupId)).findAny();
+                if (listing.isEmpty()) {
+                    failed.put(groupId, new IllegalArgumentException("Group '" 
+ groupId + "' does not exist or is not a streams group."));
+                } else {
+                    Optional<GroupState> groupState = 
listing.get().groupState();
+                    groupState.ifPresent(state -> {
+                        if (state == GroupState.DEAD) {
+                            failed.put(groupId, new 
IllegalStateException("Streams group '" + groupId + "' group state is DEAD."));
+                        } else if (state != GroupState.EMPTY) {
+                            failed.put(groupId, new 
GroupNotEmptyException("Streams group '" + groupId + "' is not EMPTY."));
+                        }
+                    });
+                }
+            }
+            return failed;
+        }
+
+        List<GroupListing> listDetailedStreamsGroups() {
+            try {
+                ListGroupsResult result = adminClient.listGroups(new 
ListGroupsOptions()
+                    
.timeoutMs(opts.options.valueOf(opts.timeoutMsOpt).intValue())
+                    .withTypes(Set.of(GroupType.STREAMS)));
+                Collection<GroupListing> listings = result.all().get();
+                return listings.stream().toList();
+            } catch (InterruptedException | ExecutionException e) {
+                throw new RuntimeException(e);
+            }
+        }
+
+        private void printInternalTopicErrors(Map<String, Throwable> 
internalTopicsDeletionFailures,
+                                              Set<String> deletedGroupIds,
+                                              Set<String> 
groupIdsWithInternalTopics) {
+            if (!deletedGroupIds.isEmpty()) {
+                if (internalTopicsDeletionFailures.isEmpty()) {
+                    List<String> successfulGroups = deletedGroupIds.stream()
+                        .filter(groupIdsWithInternalTopics::contains)
+                        .collect(Collectors.toList());
+                    System.out.println("Deletion of associated internal topics 
of the streams groups ('" +
+                        String.join("', '", successfulGroups) + "') was 
successful.");
+                } else {
+                    System.out.println("Deletion of some associated internal 
topics failed:");
+                    internalTopicsDeletionFailures.forEach((group, error) ->
+                        System.out.println("* Internal topics of the streams 
group '" + group + "' could not be deleted due to: " + error));
+                }
+            }
+        }
+
+        // Visibility for testing
+        Map<String, List<String>> retrieveInternalTopics(List<String> 
groupIds) {
+            Map<String, List<String>> groupToInternalTopics = new HashMap<>();
+            try {
+                Map<String, StreamsGroupDescription> descriptionMap = 
adminClient.describeStreamsGroups(groupIds).all().get();
+                for (StreamsGroupDescription description : 
descriptionMap.values()) {
+
+                    List<String> nonInternalTopics = 
description.subtopologies().stream()
+                        .flatMap(subtopology -> Stream.concat(
+                            subtopology.sourceTopics().stream(),
+                            subtopology.repartitionSinkTopics().stream()))
+                        .distinct()
+                        .toList();
+
+
+                    List<String> internalTopics = 
description.subtopologies().stream()
+                        .flatMap(subtopology -> Stream.concat(
+                            
subtopology.repartitionSourceTopics().keySet().stream(),
+                            
subtopology.stateChangelogTopics().keySet().stream()))
+                        .filter(topic -> !nonInternalTopics.contains(topic))
+                        .collect(Collectors.toList());
+                    internalTopics.removeIf(topic -> {
+                        if (!isInferredInternalTopic(topic, 
description.groupId())) {
+                            printError("The internal topic '" + topic + "' is 
not inferred as internal " +
+                                "and thus will not be deleted with the group 
'" + description.groupId() + "'.", Optional.empty());
+                            return true;
+                        }
+                        return false;
+                    });
+                    if (!internalTopics.isEmpty()) {
+                        groupToInternalTopics.put(description.groupId(), 
internalTopics);
+                    }
+                }
+            } catch (InterruptedException | ExecutionException e) {
+                if (e.getCause() instanceof UnsupportedVersionException) {
+                    printError("Retrieving internal topics is not supported by 
the broker version. " +
+                        "Use 'kafka-topics.sh' to list and delete the group's 
internal topics.", Optional.of(e.getCause()));
+                } else {
+                    printError("Retrieving internal topics failed due to " + 
e.getMessage(), Optional.of(e));
+                }
+            }
+            return groupToInternalTopics;
+        }
+
+        private boolean isInferredInternalTopic(final String topicName, final 
String applicationId) {
+            return topicName.startsWith(applicationId + "-") && 
matchesInternalTopicFormat(topicName);
+        }
+
+        public static boolean matchesInternalTopicFormat(final String 
topicName) {
+            return topicName.endsWith("-changelog") || 
topicName.endsWith("-repartition")
+                || topicName.endsWith("-subscription-registration-topic")
+                || topicName.endsWith("-subscription-response-topic")
+                || 
topicName.matches(".+-KTABLE-FK-JOIN-SUBSCRIPTION-REGISTRATION-\\d+-topic")
+                || 
topicName.matches(".+-KTABLE-FK-JOIN-SUBSCRIPTION-RESPONSE-\\d+-topic");
+        }
+
+        Collection<StreamsGroupMemberDescription> collectGroupMembers(String 
groupId) throws Exception {
+            return getDescribeGroup(groupId).members();
+        }
+
+        GroupState collectGroupState(String groupId) throws Exception {
+            return getDescribeGroup(groupId).groupState();
+        }
+
+        private <T extends AbstractOptions<T>> T withTimeoutMs(T options) {
+            int t = opts.options.valueOf(opts.timeoutMsOpt).intValue();
+            return options.timeoutMs(t);
+        }
     }
 
+

Review Comment:
   ```suggestion
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to