lucasbru commented on code in PR #19758:
URL: https://github.com/apache/kafka/pull/19758#discussion_r2121834766
##########
tools/src/main/java/org/apache/kafka/tools/streams/StreamsGroupCommand.java:
##########
@@ -398,8 +407,201 @@ protected Admin createAdminClient(Map<String, String>
configOverrides) throws IO
props.putAll(configOverrides);
return Admin.create(props);
}
+
+ Map<String, Throwable> deleteGroups() {
+ List<String> groupIds = opts.options.has(opts.allGroupsOpt)
+ ? new ArrayList<>(listStreamsGroups())
+ : new ArrayList<>(opts.options.valuesOf(opts.groupOpt));
+
+ // pre admin call checks
+ Map<String, Throwable> failed = preAdminCallChecks(groupIds);
+
+ groupIds.removeAll(failed.keySet());
+ Map<String, Throwable> success = new HashMap<>();
+ Map<String, List<String>> internalTopics = new HashMap<>();
+ Map<String, Throwable> internalTopicsDeletionFailures = new
HashMap<>();
+ if (!groupIds.isEmpty()) {
+ // retrieve internal topics before deleting groups
+ internalTopics = retrieveInternalTopics(groupIds);
+
+ // delete streams groups
+ Map<String, KafkaFuture<Void>> groupsToDelete =
adminClient.deleteStreamsGroups(
+ groupIds,
+ withTimeoutMs(new DeleteStreamsGroupsOptions())
+ ).deletedGroups();
+
+ groupsToDelete.forEach((g, f) -> {
+ try {
+ f.get();
+ success.put(g, null);
+ } catch (InterruptedException ie) {
+ failed.put(g, ie);
+ } catch (ExecutionException e) {
+ failed.put(g, e.getCause());
+ }
+ });
+
+ // delete internal topics
+ if (!success.isEmpty()) {
+ for (String groupId : success.keySet()) {
+ List<String> internalTopicsToDelete =
internalTopics.get(groupId);
+ if (internalTopicsToDelete != null &&
!internalTopicsToDelete.isEmpty()) {
+ try {
+ DeleteTopicsResult deleteTopicsResult =
adminClient.deleteTopics(internalTopicsToDelete);
+ deleteTopicsResult.all().get();
+ } catch (InterruptedException | ExecutionException
e) {
+ internalTopicsDeletionFailures.put(groupId,
e.getCause());
+ }
+ }
+ }
+ }
+ }
+
+ // display outcome messages based on the results
+ if (failed.isEmpty()) {
+ System.out.println("Deletion of requested streams groups (" +
"'" +
success.keySet().stream().map(Object::toString).collect(Collectors.joining("',
'")) + "') was successful.");
+ } else {
+ printError("Deletion of some streams groups failed:",
Optional.empty());
+ failed.forEach((group, error) -> System.out.println("* Group
'" + group + "' could not be deleted due to: " + error));
+
+ if (!success.isEmpty()) {
+ System.out.println("\nThese streams groups were deleted
successfully: " + "'" +
success.keySet().stream().map(Object::toString).collect(Collectors.joining("',
'")) + "'.");
+ }
+ }
+ if (!internalTopics.keySet().isEmpty()) {
+ printInternalTopicErrors(internalTopicsDeletionFailures,
success.keySet(), internalTopics.keySet());
+ }
+ // for testing purpose: return all failures, including internal
topics deletion failures
+ failed.putAll(success);
+ failed.putAll(internalTopicsDeletionFailures);
+ return failed;
+ }
+
+ private Map<String, Throwable> preAdminCallChecks(List<String>
groupIds) {
+ List<GroupListing> streamsGroupIds = listDetailedStreamsGroups();
+ LinkedHashSet<String> groupIdSet = new LinkedHashSet<>(groupIds);
+
+ Map<String, Throwable> failed = new HashMap<>();
+
+ for (String groupId : groupIdSet) {
+ Optional<GroupListing> listing =
streamsGroupIds.stream().filter(item ->
item.groupId().equals(groupId)).findAny();
+ if (listing.isEmpty()) {
+ failed.put(groupId, new IllegalArgumentException("Group '"
+ groupId + "' does not exist or is not a streams group."));
+ } else {
+ Optional<GroupState> groupState =
listing.get().groupState();
+ groupState.ifPresent(state -> {
+ if (state == GroupState.DEAD) {
+ failed.put(groupId, new
IllegalStateException("Streams group '" + groupId + "' group state is DEAD."));
+ } else if (state != GroupState.EMPTY) {
+ failed.put(groupId, new
GroupNotEmptyException("Streams group '" + groupId + "' is not EMPTY."));
+ }
+ });
+ }
+ }
+ return failed;
+ }
+
+ List<GroupListing> listDetailedStreamsGroups() {
+ try {
+ ListGroupsResult result = adminClient.listGroups(new
ListGroupsOptions()
+
.timeoutMs(opts.options.valueOf(opts.timeoutMsOpt).intValue())
+ .withTypes(Set.of(GroupType.STREAMS)));
+ Collection<GroupListing> listings = result.all().get();
+ return listings.stream().toList();
+ } catch (InterruptedException | ExecutionException e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ private void printInternalTopicErrors(Map<String, Throwable>
internalTopicsDeletionFailures,
+ Set<String> deletedGroupIds,
+ Set<String>
groupIdsWithInternalTopics) {
+ if (!deletedGroupIds.isEmpty()) {
+ if (internalTopicsDeletionFailures.isEmpty()) {
+ List<String> successfulGroups = deletedGroupIds.stream()
+ .filter(groupIdsWithInternalTopics::contains)
+ .collect(Collectors.toList());
+ System.out.println("Deletion of associated internal topics
of the streams groups ('" +
+ String.join("', '", successfulGroups) + "') was
successful.");
+ } else {
+ System.out.println("Deletion of some associated internal
topics failed:");
+ internalTopicsDeletionFailures.forEach((group, error) ->
+ System.out.println("* Internal topics of the streams
group '" + group + "' could not be deleted due to: " + error));
+ }
+ }
+ }
+
+ // Visibility for testing
+ Map<String, List<String>> retrieveInternalTopics(List<String>
groupIds) {
+ Map<String, List<String>> groupToInternalTopics = new HashMap<>();
+ try {
+ Map<String, StreamsGroupDescription> descriptionMap =
adminClient.describeStreamsGroups(groupIds).all().get();
+ for (StreamsGroupDescription description :
descriptionMap.values()) {
+
+ List<String> nonInternalTopics =
description.subtopologies().stream()
+ .flatMap(subtopology -> Stream.concat(
+ subtopology.sourceTopics().stream(),
+ subtopology.repartitionSinkTopics().stream()))
Review Comment:
This is not correct. I think the previous version filtered out only
sourceTopics, which was correct. repartitionSinkTopics are _always_ internal
topics, and each one must also be a repartitionSourceTopic of some other
subtopology, so they must not be added to nonInternalTopics.
##########
tools/src/main/java/org/apache/kafka/tools/streams/StreamsGroupCommand.java:
##########
@@ -398,8 +407,201 @@ protected Admin createAdminClient(Map<String, String>
configOverrides) throws IO
props.putAll(configOverrides);
return Admin.create(props);
}
+
+ Map<String, Throwable> deleteGroups() {
+ List<String> groupIds = opts.options.has(opts.allGroupsOpt)
+ ? new ArrayList<>(listStreamsGroups())
+ : new ArrayList<>(opts.options.valuesOf(opts.groupOpt));
+
+ // pre admin call checks
+ Map<String, Throwable> failed = preAdminCallChecks(groupIds);
+
+ groupIds.removeAll(failed.keySet());
+ Map<String, Throwable> success = new HashMap<>();
+ Map<String, List<String>> internalTopics = new HashMap<>();
+ Map<String, Throwable> internalTopicsDeletionFailures = new
HashMap<>();
+ if (!groupIds.isEmpty()) {
+ // retrieve internal topics before deleting groups
+ internalTopics = retrieveInternalTopics(groupIds);
+
+ // delete streams groups
+ Map<String, KafkaFuture<Void>> groupsToDelete =
adminClient.deleteStreamsGroups(
+ groupIds,
+ withTimeoutMs(new DeleteStreamsGroupsOptions())
+ ).deletedGroups();
+
+ groupsToDelete.forEach((g, f) -> {
+ try {
+ f.get();
+ success.put(g, null);
+ } catch (InterruptedException ie) {
+ failed.put(g, ie);
+ } catch (ExecutionException e) {
+ failed.put(g, e.getCause());
+ }
+ });
+
+ // delete internal topics
+ if (!success.isEmpty()) {
+ for (String groupId : success.keySet()) {
+ List<String> internalTopicsToDelete =
internalTopics.get(groupId);
+ if (internalTopicsToDelete != null &&
!internalTopicsToDelete.isEmpty()) {
+ try {
+ DeleteTopicsResult deleteTopicsResult =
adminClient.deleteTopics(internalTopicsToDelete);
+ deleteTopicsResult.all().get();
+ } catch (InterruptedException | ExecutionException
e) {
+ internalTopicsDeletionFailures.put(groupId,
e.getCause());
+ }
+ }
+ }
+ }
+ }
+
+ // display outcome messages based on the results
+ if (failed.isEmpty()) {
+ System.out.println("Deletion of requested streams groups (" +
"'" +
success.keySet().stream().map(Object::toString).collect(Collectors.joining("',
'")) + "') was successful.");
+ } else {
+ printError("Deletion of some streams groups failed:",
Optional.empty());
+ failed.forEach((group, error) -> System.out.println("* Group
'" + group + "' could not be deleted due to: " + error));
+
+ if (!success.isEmpty()) {
+ System.out.println("\nThese streams groups were deleted
successfully: " + "'" +
success.keySet().stream().map(Object::toString).collect(Collectors.joining("',
'")) + "'.");
+ }
+ }
+ if (!internalTopics.keySet().isEmpty()) {
+ printInternalTopicErrors(internalTopicsDeletionFailures,
success.keySet(), internalTopics.keySet());
+ }
+ // for testing purpose: return all failures, including internal
topics deletion failures
+ failed.putAll(success);
+ failed.putAll(internalTopicsDeletionFailures);
+ return failed;
+ }
+
+ private Map<String, Throwable> preAdminCallChecks(List<String>
groupIds) {
+ List<GroupListing> streamsGroupIds = listDetailedStreamsGroups();
+ LinkedHashSet<String> groupIdSet = new LinkedHashSet<>(groupIds);
+
+ Map<String, Throwable> failed = new HashMap<>();
+
+ for (String groupId : groupIdSet) {
+ Optional<GroupListing> listing =
streamsGroupIds.stream().filter(item ->
item.groupId().equals(groupId)).findAny();
+ if (listing.isEmpty()) {
+ failed.put(groupId, new IllegalArgumentException("Group '"
+ groupId + "' does not exist or is not a streams group."));
+ } else {
+ Optional<GroupState> groupState =
listing.get().groupState();
+ groupState.ifPresent(state -> {
+ if (state == GroupState.DEAD) {
+ failed.put(groupId, new
IllegalStateException("Streams group '" + groupId + "' group state is DEAD."));
+ } else if (state != GroupState.EMPTY) {
+ failed.put(groupId, new
GroupNotEmptyException("Streams group '" + groupId + "' is not EMPTY."));
+ }
+ });
+ }
+ }
+ return failed;
+ }
+
+ List<GroupListing> listDetailedStreamsGroups() {
+ try {
+ ListGroupsResult result = adminClient.listGroups(new
ListGroupsOptions()
+
.timeoutMs(opts.options.valueOf(opts.timeoutMsOpt).intValue())
+ .withTypes(Set.of(GroupType.STREAMS)));
+ Collection<GroupListing> listings = result.all().get();
+ return listings.stream().toList();
+ } catch (InterruptedException | ExecutionException e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ private void printInternalTopicErrors(Map<String, Throwable>
internalTopicsDeletionFailures,
+ Set<String> deletedGroupIds,
+ Set<String>
groupIdsWithInternalTopics) {
+ if (!deletedGroupIds.isEmpty()) {
+ if (internalTopicsDeletionFailures.isEmpty()) {
+ List<String> successfulGroups = deletedGroupIds.stream()
+ .filter(groupIdsWithInternalTopics::contains)
+ .collect(Collectors.toList());
+ System.out.println("Deletion of associated internal topics
of the streams groups ('" +
+ String.join("', '", successfulGroups) + "') was
successful.");
+ } else {
+ System.out.println("Deletion of some associated internal
topics failed:");
+ internalTopicsDeletionFailures.forEach((group, error) ->
+ System.out.println("* Internal topics of the streams
group '" + group + "' could not be deleted due to: " + error));
+ }
+ }
+ }
+
+ // Visibility for testing
+ Map<String, List<String>> retrieveInternalTopics(List<String>
groupIds) {
+ Map<String, List<String>> groupToInternalTopics = new HashMap<>();
+ try {
+ Map<String, StreamsGroupDescription> descriptionMap =
adminClient.describeStreamsGroups(groupIds).all().get();
+ for (StreamsGroupDescription description :
descriptionMap.values()) {
+
+ List<String> nonInternalTopics =
description.subtopologies().stream()
+ .flatMap(subtopology -> Stream.concat(
+ subtopology.sourceTopics().stream(),
+ subtopology.repartitionSinkTopics().stream()))
+ .distinct()
+ .toList();
+
+
+ List<String> internalTopics =
description.subtopologies().stream()
+ .flatMap(subtopology -> Stream.concat(
+
subtopology.repartitionSourceTopics().keySet().stream(),
+
subtopology.stateChangelogTopics().keySet().stream()))
+ .filter(topic -> !nonInternalTopics.contains(topic))
+ .collect(Collectors.toList());
+ internalTopics.removeIf(topic -> {
+ if (!isInferredInternalTopic(topic,
description.groupId())) {
Review Comment:
Thank you.
##########
tools/src/main/java/org/apache/kafka/tools/streams/StreamsGroupCommandOptions.java:
##########
@@ -52,13 +59,19 @@ public class StreamsGroupCommandOptions extends
CommandDefaultOptions {
public final OptionSpec<String> groupOpt;
public final OptionSpec<Void> listOpt;
public final OptionSpec<Void> describeOpt;
+ final OptionSpec<Void> allGroupsOpt;
+ final OptionSpec<Void> deleteOpt;
public final OptionSpec<Long> timeoutMsOpt;
public final OptionSpec<String> commandConfigOpt;
public final OptionSpec<String> stateOpt;
public final OptionSpec<Void> membersOpt;
public final OptionSpec<Void> offsetsOpt;
public final OptionSpec<Void> verboseOpt;
+ final Set<OptionSpec<?>> allGroupSelectionScopeOpts;
+ final Set<OptionSpec<?>> allStreamsGroupLevelOpts;
+
Review Comment:
```suggestion
```
##########
tools/src/main/java/org/apache/kafka/tools/streams/StreamsGroupCommand.java:
##########
@@ -398,8 +407,201 @@ protected Admin createAdminClient(Map<String, String>
configOverrides) throws IO
props.putAll(configOverrides);
return Admin.create(props);
}
+
+ Map<String, Throwable> deleteGroups() {
+ List<String> groupIds = opts.options.has(opts.allGroupsOpt)
+ ? new ArrayList<>(listStreamsGroups())
+ : new ArrayList<>(opts.options.valuesOf(opts.groupOpt));
+
+ // pre admin call checks
+ Map<String, Throwable> failed = preAdminCallChecks(groupIds);
+
+ groupIds.removeAll(failed.keySet());
+ Map<String, Throwable> success = new HashMap<>();
+ Map<String, List<String>> internalTopics = new HashMap<>();
+ Map<String, Throwable> internalTopicsDeletionFailures = new
HashMap<>();
+ if (!groupIds.isEmpty()) {
+ // retrieve internal topics before deleting groups
+ internalTopics = retrieveInternalTopics(groupIds);
+
+ // delete streams groups
+ Map<String, KafkaFuture<Void>> groupsToDelete =
adminClient.deleteStreamsGroups(
+ groupIds,
+ withTimeoutMs(new DeleteStreamsGroupsOptions())
+ ).deletedGroups();
+
+ groupsToDelete.forEach((g, f) -> {
+ try {
+ f.get();
+ success.put(g, null);
+ } catch (InterruptedException ie) {
+ failed.put(g, ie);
+ } catch (ExecutionException e) {
+ failed.put(g, e.getCause());
+ }
+ });
+
+ // delete internal topics
+ if (!success.isEmpty()) {
+ for (String groupId : success.keySet()) {
+ List<String> internalTopicsToDelete =
internalTopics.get(groupId);
+ if (internalTopicsToDelete != null &&
!internalTopicsToDelete.isEmpty()) {
+ try {
+ DeleteTopicsResult deleteTopicsResult =
adminClient.deleteTopics(internalTopicsToDelete);
+ deleteTopicsResult.all().get();
+ } catch (InterruptedException | ExecutionException
e) {
+ internalTopicsDeletionFailures.put(groupId,
e.getCause());
+ }
+ }
+ }
+ }
+ }
+
+ // display outcome messages based on the results
+ if (failed.isEmpty()) {
+ System.out.println("Deletion of requested streams groups (" +
"'" +
success.keySet().stream().map(Object::toString).collect(Collectors.joining("',
'")) + "') was successful.");
+ } else {
+ printError("Deletion of some streams groups failed:",
Optional.empty());
+ failed.forEach((group, error) -> System.out.println("* Group
'" + group + "' could not be deleted due to: " + error));
+
+ if (!success.isEmpty()) {
+ System.out.println("\nThese streams groups were deleted
successfully: " + "'" +
success.keySet().stream().map(Object::toString).collect(Collectors.joining("',
'")) + "'.");
+ }
+ }
+ if (!internalTopics.keySet().isEmpty()) {
+ printInternalTopicErrors(internalTopicsDeletionFailures,
success.keySet(), internalTopics.keySet());
+ }
+ // for testing purpose: return all failures, including internal
topics deletion failures
+ failed.putAll(success);
+ failed.putAll(internalTopicsDeletionFailures);
+ return failed;
+ }
+
+ private Map<String, Throwable> preAdminCallChecks(List<String>
groupIds) {
+ List<GroupListing> streamsGroupIds = listDetailedStreamsGroups();
+ LinkedHashSet<String> groupIdSet = new LinkedHashSet<>(groupIds);
+
+ Map<String, Throwable> failed = new HashMap<>();
+
+ for (String groupId : groupIdSet) {
+ Optional<GroupListing> listing =
streamsGroupIds.stream().filter(item ->
item.groupId().equals(groupId)).findAny();
+ if (listing.isEmpty()) {
+ failed.put(groupId, new IllegalArgumentException("Group '"
+ groupId + "' does not exist or is not a streams group."));
+ } else {
+ Optional<GroupState> groupState =
listing.get().groupState();
+ groupState.ifPresent(state -> {
+ if (state == GroupState.DEAD) {
+ failed.put(groupId, new
IllegalStateException("Streams group '" + groupId + "' group state is DEAD."));
+ } else if (state != GroupState.EMPTY) {
+ failed.put(groupId, new
GroupNotEmptyException("Streams group '" + groupId + "' is not EMPTY."));
+ }
+ });
+ }
+ }
+ return failed;
+ }
+
+ List<GroupListing> listDetailedStreamsGroups() {
+ try {
+ ListGroupsResult result = adminClient.listGroups(new
ListGroupsOptions()
+
.timeoutMs(opts.options.valueOf(opts.timeoutMsOpt).intValue())
+ .withTypes(Set.of(GroupType.STREAMS)));
+ Collection<GroupListing> listings = result.all().get();
+ return listings.stream().toList();
+ } catch (InterruptedException | ExecutionException e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ private void printInternalTopicErrors(Map<String, Throwable>
internalTopicsDeletionFailures,
+ Set<String> deletedGroupIds,
+ Set<String>
groupIdsWithInternalTopics) {
+ if (!deletedGroupIds.isEmpty()) {
+ if (internalTopicsDeletionFailures.isEmpty()) {
+ List<String> successfulGroups = deletedGroupIds.stream()
+ .filter(groupIdsWithInternalTopics::contains)
+ .collect(Collectors.toList());
+ System.out.println("Deletion of associated internal topics
of the streams groups ('" +
+ String.join("', '", successfulGroups) + "') was
successful.");
+ } else {
+ System.out.println("Deletion of some associated internal
topics failed:");
+ internalTopicsDeletionFailures.forEach((group, error) ->
+ System.out.println("* Internal topics of the streams
group '" + group + "' could not be deleted due to: " + error));
+ }
+ }
+ }
+
+ // Visibility for testing
+ Map<String, List<String>> retrieveInternalTopics(List<String>
groupIds) {
+ Map<String, List<String>> groupToInternalTopics = new HashMap<>();
+ try {
+ Map<String, StreamsGroupDescription> descriptionMap =
adminClient.describeStreamsGroups(groupIds).all().get();
+ for (StreamsGroupDescription description :
descriptionMap.values()) {
+
+ List<String> nonInternalTopics =
description.subtopologies().stream()
+ .flatMap(subtopology -> Stream.concat(
+ subtopology.sourceTopics().stream(),
+ subtopology.repartitionSinkTopics().stream()))
+ .distinct()
+ .toList();
+
Review Comment:
```suggestion
```
##########
tools/src/main/java/org/apache/kafka/tools/streams/StreamsGroupCommand.java:
##########
@@ -398,8 +407,201 @@ protected Admin createAdminClient(Map<String, String>
configOverrides) throws IO
props.putAll(configOverrides);
return Admin.create(props);
}
+
+ Map<String, Throwable> deleteGroups() {
+ List<String> groupIds = opts.options.has(opts.allGroupsOpt)
+ ? new ArrayList<>(listStreamsGroups())
+ : new ArrayList<>(opts.options.valuesOf(opts.groupOpt));
+
+ // pre admin call checks
+ Map<String, Throwable> failed = preAdminCallChecks(groupIds);
+
+ groupIds.removeAll(failed.keySet());
+ Map<String, Throwable> success = new HashMap<>();
+ Map<String, List<String>> internalTopics = new HashMap<>();
+ Map<String, Throwable> internalTopicsDeletionFailures = new
HashMap<>();
+ if (!groupIds.isEmpty()) {
+ // retrieve internal topics before deleting groups
+ internalTopics = retrieveInternalTopics(groupIds);
+
+ // delete streams groups
+ Map<String, KafkaFuture<Void>> groupsToDelete =
adminClient.deleteStreamsGroups(
+ groupIds,
+ withTimeoutMs(new DeleteStreamsGroupsOptions())
+ ).deletedGroups();
+
+ groupsToDelete.forEach((g, f) -> {
+ try {
+ f.get();
+ success.put(g, null);
+ } catch (InterruptedException ie) {
+ failed.put(g, ie);
+ } catch (ExecutionException e) {
+ failed.put(g, e.getCause());
+ }
+ });
+
+ // delete internal topics
+ if (!success.isEmpty()) {
+ for (String groupId : success.keySet()) {
+ List<String> internalTopicsToDelete =
internalTopics.get(groupId);
+ if (internalTopicsToDelete != null &&
!internalTopicsToDelete.isEmpty()) {
+ try {
+ DeleteTopicsResult deleteTopicsResult =
adminClient.deleteTopics(internalTopicsToDelete);
+ deleteTopicsResult.all().get();
+ } catch (InterruptedException | ExecutionException
e) {
+ internalTopicsDeletionFailures.put(groupId,
e.getCause());
+ }
+ }
+ }
+ }
+ }
+
+ // display outcome messages based on the results
+ if (failed.isEmpty()) {
+ System.out.println("Deletion of requested streams groups (" +
"'" +
success.keySet().stream().map(Object::toString).collect(Collectors.joining("',
'")) + "') was successful.");
+ } else {
+ printError("Deletion of some streams groups failed:",
Optional.empty());
+ failed.forEach((group, error) -> System.out.println("* Group
'" + group + "' could not be deleted due to: " + error));
+
+ if (!success.isEmpty()) {
+ System.out.println("\nThese streams groups were deleted
successfully: " + "'" +
success.keySet().stream().map(Object::toString).collect(Collectors.joining("',
'")) + "'.");
+ }
+ }
+ if (!internalTopics.keySet().isEmpty()) {
+ printInternalTopicErrors(internalTopicsDeletionFailures,
success.keySet(), internalTopics.keySet());
+ }
+ // for testing purpose: return all failures, including internal
topics deletion failures
+ failed.putAll(success);
+ failed.putAll(internalTopicsDeletionFailures);
+ return failed;
+ }
+
+ private Map<String, Throwable> preAdminCallChecks(List<String>
groupIds) {
+ List<GroupListing> streamsGroupIds = listDetailedStreamsGroups();
+ LinkedHashSet<String> groupIdSet = new LinkedHashSet<>(groupIds);
+
+ Map<String, Throwable> failed = new HashMap<>();
+
+ for (String groupId : groupIdSet) {
+ Optional<GroupListing> listing =
streamsGroupIds.stream().filter(item ->
item.groupId().equals(groupId)).findAny();
+ if (listing.isEmpty()) {
+ failed.put(groupId, new IllegalArgumentException("Group '"
+ groupId + "' does not exist or is not a streams group."));
+ } else {
+ Optional<GroupState> groupState =
listing.get().groupState();
+ groupState.ifPresent(state -> {
+ if (state == GroupState.DEAD) {
+ failed.put(groupId, new
IllegalStateException("Streams group '" + groupId + "' group state is DEAD."));
+ } else if (state != GroupState.EMPTY) {
+ failed.put(groupId, new
GroupNotEmptyException("Streams group '" + groupId + "' is not EMPTY."));
+ }
+ });
+ }
+ }
+ return failed;
+ }
+
+ List<GroupListing> listDetailedStreamsGroups() {
+ try {
+ ListGroupsResult result = adminClient.listGroups(new
ListGroupsOptions()
+
.timeoutMs(opts.options.valueOf(opts.timeoutMsOpt).intValue())
+ .withTypes(Set.of(GroupType.STREAMS)));
+ Collection<GroupListing> listings = result.all().get();
+ return listings.stream().toList();
+ } catch (InterruptedException | ExecutionException e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ private void printInternalTopicErrors(Map<String, Throwable>
internalTopicsDeletionFailures,
+ Set<String> deletedGroupIds,
+ Set<String>
groupIdsWithInternalTopics) {
+ if (!deletedGroupIds.isEmpty()) {
+ if (internalTopicsDeletionFailures.isEmpty()) {
+ List<String> successfulGroups = deletedGroupIds.stream()
+ .filter(groupIdsWithInternalTopics::contains)
+ .collect(Collectors.toList());
+ System.out.println("Deletion of associated internal topics
of the streams groups ('" +
+ String.join("', '", successfulGroups) + "') was
successful.");
+ } else {
+ System.out.println("Deletion of some associated internal
topics failed:");
+ internalTopicsDeletionFailures.forEach((group, error) ->
+ System.out.println("* Internal topics of the streams
group '" + group + "' could not be deleted due to: " + error));
+ }
+ }
+ }
+
+ // Visibility for testing
+ Map<String, List<String>> retrieveInternalTopics(List<String>
groupIds) {
+ Map<String, List<String>> groupToInternalTopics = new HashMap<>();
+ try {
+ Map<String, StreamsGroupDescription> descriptionMap =
adminClient.describeStreamsGroups(groupIds).all().get();
+ for (StreamsGroupDescription description :
descriptionMap.values()) {
+
+ List<String> nonInternalTopics =
description.subtopologies().stream()
+ .flatMap(subtopology -> Stream.concat(
+ subtopology.sourceTopics().stream(),
+ subtopology.repartitionSinkTopics().stream()))
+ .distinct()
+ .toList();
+
+
+ List<String> internalTopics =
description.subtopologies().stream()
+ .flatMap(subtopology -> Stream.concat(
+
subtopology.repartitionSourceTopics().keySet().stream(),
+
subtopology.stateChangelogTopics().keySet().stream()))
+ .filter(topic -> !nonInternalTopics.contains(topic))
+ .collect(Collectors.toList());
+ internalTopics.removeIf(topic -> {
+ if (!isInferredInternalTopic(topic,
description.groupId())) {
+ printError("The internal topic '" + topic + "' is
not inferred as internal " +
+ "and thus will not be deleted with the group
'" + description.groupId() + "'.", Optional.empty());
+ return true;
+ }
+ return false;
+ });
+ if (!internalTopics.isEmpty()) {
+ groupToInternalTopics.put(description.groupId(),
internalTopics);
+ }
+ }
+ } catch (InterruptedException | ExecutionException e) {
+ if (e.getCause() instanceof UnsupportedVersionException) {
+ printError("Retrieving internal topics is not supported by
the broker version. " +
+ "Use 'kafka-topics.sh' to list and delete the group's
internal topics.", Optional.of(e.getCause()));
+ } else {
+ printError("Retrieving internal topics failed due to " +
e.getMessage(), Optional.of(e));
+ }
+ }
+ return groupToInternalTopics;
+ }
+
+ private boolean isInferredInternalTopic(final String topicName, final
String applicationId) {
+ return topicName.startsWith(applicationId + "-") &&
matchesInternalTopicFormat(topicName);
+ }
+
+ public static boolean matchesInternalTopicFormat(final String
topicName) {
+ return topicName.endsWith("-changelog") ||
topicName.endsWith("-repartition")
+ || topicName.endsWith("-subscription-registration-topic")
+ || topicName.endsWith("-subscription-response-topic")
+ ||
topicName.matches(".+-KTABLE-FK-JOIN-SUBSCRIPTION-REGISTRATION-\\d+-topic")
+ ||
topicName.matches(".+-KTABLE-FK-JOIN-SUBSCRIPTION-RESPONSE-\\d+-topic");
+ }
+
+ Collection<StreamsGroupMemberDescription> collectGroupMembers(String
groupId) throws Exception {
+ return getDescribeGroup(groupId).members();
+ }
+
+ GroupState collectGroupState(String groupId) throws Exception {
+ return getDescribeGroup(groupId).groupState();
+ }
+
+ private <T extends AbstractOptions<T>> T withTimeoutMs(T options) {
+ int t = opts.options.valueOf(opts.timeoutMsOpt).intValue();
+ return options.timeoutMs(t);
+ }
}
+
Review Comment:
```suggestion
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]