AndrewJSchofield commented on code in PR #19758:
URL: https://github.com/apache/kafka/pull/19758#discussion_r2108655166
##########
tools/src/main/java/org/apache/kafka/tools/streams/StreamsGroupCommand.java:
##########
@@ -398,6 +403,127 @@ protected Admin createAdminClient(Map<String, String>
configOverrides) throws IO
props.putAll(configOverrides);
return Admin.create(props);
}
+
+ Map<String, Throwable> deleteGroups() {
+ List<String> groupIds = opts.options.has(opts.allGroupsOpt)
+ ? listStreamsGroups()
+ : opts.options.valuesOf(opts.groupOpt);
+
+ Map<String, Throwable> success = new HashMap<>();
+ Map<String, Throwable> failed = new HashMap<>();
+
+ // retrieve internal topics before deleting groups
+ Map<String, List<String>> internalTopics =
retrieveInternalTopics(groupIds);
+
+ Map<String, KafkaFuture<Void>> groupsToDelete =
adminClient.deleteStreamsGroups(
+ groupIds
+ ).deletedGroups();
+
+ groupsToDelete.forEach((g, f) -> {
+ try {
+ f.get();
+ success.put(g, null);
+ } catch (InterruptedException ie) {
+ failed.put(g, ie);
+ } catch (ExecutionException e) {
+ failed.put(g, e.getCause());
+ }
+ });
+
+ // delete internal topics
+ Map<String, Throwable> internalTopicsDeletionFailures = new
HashMap<>();
+ if (!success.isEmpty()) {
+ for (String groupId : success.keySet()) {
+ List<String> internalTopicsToDelete =
internalTopics.get(groupId);
+ if (internalTopicsToDelete != null &&
!internalTopicsToDelete.isEmpty()) {
+ try {
+ DeleteTopicsResult deleteTopicsResult =
adminClient.deleteTopics(internalTopicsToDelete);
+ deleteTopicsResult.all().get();
+ } catch (InterruptedException | ExecutionException e) {
+ internalTopicsDeletionFailures.put(groupId,
e.getCause());
+ }
+ }
+ }
+ }
+
+ if (failed.isEmpty()) {
+ System.out.println("Deletion of requested streams groups (" +
"'" +
success.keySet().stream().map(Object::toString).collect(Collectors.joining(",
")) + "'" + ") was successful.");
+ } else {
+ printError("Deletion of some streams groups failed:",
Optional.empty());
+ failed.forEach((group, error) -> System.out.println("* Group
'" + group + "' could not be deleted due to: " + error));
+
+ if (!success.isEmpty()) {
+ System.out.println("\nThese streams groups were deleted
successfully: " + "'" +
success.keySet().stream().map(Object::toString).collect(Collectors.joining("'"))
+ "', '");
+ }
+ }
+
+ if (!internalTopics.keySet().isEmpty()) {
+ printInternalTopicErrors(internalTopicsDeletionFailures,
success.keySet(), internalTopics.keySet());
+ }
+
+ failed.putAll(success);
+ failed.putAll(internalTopicsDeletionFailures);
Review Comment:
If the deletion of a streams group succeeds but the deletion of its internal
topics fails, it appears as if this failure is reported as a group-level
failure via the `failed` map. This seems misleading because the group deletion
succeeded. I suppose a case could be made that this is only used for testing
purposes, and I could understand that might be useful. If so, please could we
have a comment making clear what's going on here?
##########
tools/src/main/java/org/apache/kafka/tools/streams/StreamsGroupCommand.java:
##########
@@ -398,6 +403,127 @@ protected Admin createAdminClient(Map<String, String>
configOverrides) throws IO
props.putAll(configOverrides);
return Admin.create(props);
}
+
+ Map<String, Throwable> deleteGroups() {
+ List<String> groupIds = opts.options.has(opts.allGroupsOpt)
+ ? listStreamsGroups()
+ : opts.options.valuesOf(opts.groupOpt);
+
+ Map<String, Throwable> success = new HashMap<>();
+ Map<String, Throwable> failed = new HashMap<>();
+
+ // retrieve internal topics before deleting groups
+ Map<String, List<String>> internalTopics =
retrieveInternalTopics(groupIds);
+
+ Map<String, KafkaFuture<Void>> groupsToDelete =
adminClient.deleteStreamsGroups(
+ groupIds
Review Comment:
You should apply the configured timeout on this operation.
##########
tools/src/main/java/org/apache/kafka/tools/streams/StreamsGroupCommand.java:
##########
@@ -398,6 +403,127 @@ protected Admin createAdminClient(Map<String, String>
configOverrides) throws IO
props.putAll(configOverrides);
return Admin.create(props);
}
+
+ Map<String, Throwable> deleteGroups() {
+ List<String> groupIds = opts.options.has(opts.allGroupsOpt)
+ ? listStreamsGroups()
+ : opts.options.valuesOf(opts.groupOpt);
+
+ Map<String, Throwable> success = new HashMap<>();
+ Map<String, Throwable> failed = new HashMap<>();
+
+ // retrieve internal topics before deleting groups
+ Map<String, List<String>> internalTopics =
retrieveInternalTopics(groupIds);
+
+ Map<String, KafkaFuture<Void>> groupsToDelete =
adminClient.deleteStreamsGroups(
Review Comment:
If specific group IDs are provided but they are not streams groups, the
command still deletes them. In `kafka-share-groups.sh`, there is a specific
test for group type to guard against this. It seems to me this would be a good
safety measure here too.
##########
tools/src/main/java/org/apache/kafka/tools/streams/StreamsGroupCommand.java:
##########
@@ -398,6 +403,127 @@ protected Admin createAdminClient(Map<String, String>
configOverrides) throws IO
props.putAll(configOverrides);
return Admin.create(props);
}
+
+ Map<String, Throwable> deleteGroups() {
+ List<String> groupIds = opts.options.has(opts.allGroupsOpt)
+ ? listStreamsGroups()
+ : opts.options.valuesOf(opts.groupOpt);
+
+ Map<String, Throwable> success = new HashMap<>();
+ Map<String, Throwable> failed = new HashMap<>();
+
+ // retrieve internal topics before deleting groups
+ Map<String, List<String>> internalTopics =
retrieveInternalTopics(groupIds);
+
+ Map<String, KafkaFuture<Void>> groupsToDelete =
adminClient.deleteStreamsGroups(
+ groupIds
+ ).deletedGroups();
+
+ groupsToDelete.forEach((g, f) -> {
+ try {
+ f.get();
+ success.put(g, null);
+ } catch (InterruptedException ie) {
+ failed.put(g, ie);
+ } catch (ExecutionException e) {
+ failed.put(g, e.getCause());
+ }
+ });
+
+ // delete internal topics
+ Map<String, Throwable> internalTopicsDeletionFailures = new
HashMap<>();
+ if (!success.isEmpty()) {
+ for (String groupId : success.keySet()) {
+ List<String> internalTopicsToDelete =
internalTopics.get(groupId);
+ if (internalTopicsToDelete != null &&
!internalTopicsToDelete.isEmpty()) {
+ try {
+ DeleteTopicsResult deleteTopicsResult =
adminClient.deleteTopics(internalTopicsToDelete);
+ deleteTopicsResult.all().get();
+ } catch (InterruptedException | ExecutionException e) {
+ internalTopicsDeletionFailures.put(groupId,
e.getCause());
+ }
+ }
+ }
+ }
+
+ if (failed.isEmpty()) {
+ System.out.println("Deletion of requested streams groups (" +
"'" +
success.keySet().stream().map(Object::toString).collect(Collectors.joining(",
")) + "'" + ") was successful.");
Review Comment:
The output here would be more elegant if each group ID were surrounded by
`'`, as opposed to the quotes surrounding the whole list. For example, this
strikes me as a bit odd: `Deletion of requested streams groups ('') was successful.`.
##########
tools/src/main/java/org/apache/kafka/tools/streams/StreamsGroupCommand.java:
##########
@@ -398,6 +403,127 @@ protected Admin createAdminClient(Map<String, String>
configOverrides) throws IO
props.putAll(configOverrides);
return Admin.create(props);
}
+
+ Map<String, Throwable> deleteGroups() {
+ List<String> groupIds = opts.options.has(opts.allGroupsOpt)
+ ? listStreamsGroups()
+ : opts.options.valuesOf(opts.groupOpt);
+
+ Map<String, Throwable> success = new HashMap<>();
+ Map<String, Throwable> failed = new HashMap<>();
+
+ // retrieve internal topics before deleting groups
+ Map<String, List<String>> internalTopics =
retrieveInternalTopics(groupIds);
+
+ Map<String, KafkaFuture<Void>> groupsToDelete =
adminClient.deleteStreamsGroups(
+ groupIds
+ ).deletedGroups();
+
+ groupsToDelete.forEach((g, f) -> {
+ try {
+ f.get();
+ success.put(g, null);
+ } catch (InterruptedException ie) {
+ failed.put(g, ie);
+ } catch (ExecutionException e) {
+ failed.put(g, e.getCause());
+ }
+ });
+
+ // delete internal topics
+ Map<String, Throwable> internalTopicsDeletionFailures = new
HashMap<>();
+ if (!success.isEmpty()) {
+ for (String groupId : success.keySet()) {
+ List<String> internalTopicsToDelete =
internalTopics.get(groupId);
+ if (internalTopicsToDelete != null &&
!internalTopicsToDelete.isEmpty()) {
+ try {
+ DeleteTopicsResult deleteTopicsResult =
adminClient.deleteTopics(internalTopicsToDelete);
+ deleteTopicsResult.all().get();
+ } catch (InterruptedException | ExecutionException e) {
+ internalTopicsDeletionFailures.put(groupId,
e.getCause());
+ }
+ }
+ }
+ }
+
+ if (failed.isEmpty()) {
+ System.out.println("Deletion of requested streams groups (" +
"'" +
success.keySet().stream().map(Object::toString).collect(Collectors.joining(",
")) + "'" + ") was successful.");
+ } else {
+ printError("Deletion of some streams groups failed:",
Optional.empty());
+ failed.forEach((group, error) -> System.out.println("* Group
'" + group + "' could not be deleted due to: " + error));
+
+ if (!success.isEmpty()) {
+ System.out.println("\nThese streams groups were deleted
successfully: " + "'" +
success.keySet().stream().map(Object::toString).collect(Collectors.joining("'"))
+ "', '");
+ }
+ }
+
+ if (!internalTopics.keySet().isEmpty()) {
+ printInternalTopicErrors(internalTopicsDeletionFailures,
success.keySet(), internalTopics.keySet());
+ }
+
+ failed.putAll(success);
+ failed.putAll(internalTopicsDeletionFailures);
+ return failed;
+ }
+
+ private void printInternalTopicErrors(Map<String, Throwable>
internalTopicsDeletionFailures,
+ Set<String> deletedGroupIds,
+ Set<String>
groupIdsWithInternalTopics) {
+ if (!deletedGroupIds.isEmpty()) {
+ if (internalTopicsDeletionFailures.isEmpty()) {
+ List<String> successfulGroups = deletedGroupIds.stream()
+ .filter(groupIdsWithInternalTopics::contains)
+ .collect(Collectors.toList());
+ System.out.println("Deletion of associated internal topics
of the streams groups ('" +
+ String.join(", ", successfulGroups) + "') was
successful.");
+ } else {
+ System.out.println("Deletion of some associated internal
topics failed:");
+ internalTopicsDeletionFailures.forEach((group, error) ->
+ System.out.println("* Group '" + group + "' could not
be deleted due to: " + error));
+ }
+ }
+ }
+
+ // Visibility for testing
+ Map<String, List<String>> retrieveInternalTopics(List<String>
groupIds) {
+ Map<String, List<String>> groupToInternalTopics = new HashMap<>();
+ try {
+ Map<String, StreamsGroupDescription> descriptionMap =
adminClient.describeStreamsGroups(groupIds).all().get();
+ for (StreamsGroupDescription description :
descriptionMap.values()) {
+
+ List<String> sourceTopics =
description.subtopologies().stream()
+ .flatMap(subtopology ->
subtopology.sourceTopics().stream())
+ .toList();
+
+ List<String> internalTopics =
description.subtopologies().stream()
+ .flatMap(subtopology -> Stream.concat(
+
subtopology.repartitionSourceTopics().keySet().stream(),
+
subtopology.stateChangelogTopics().keySet().stream()))
+ .filter(topic -> !sourceTopics.contains(topic))
+ .collect(Collectors.toList());
+ if (!internalTopics.isEmpty()) {
+ groupToInternalTopics.put(description.groupId(),
internalTopics);
+ }
+ }
+ } catch (InterruptedException | ExecutionException e) {
+ if (e.getCause() instanceof UnsupportedVersionException) {
+ printError("Retrieving internal topics is not supported by
the broker version. " +
+ "Please execute --delete --internal-topics <topic
names> to delete the group's associated internal topics.",
Optional.of(e.getCause()));
Review Comment:
I think this refers to a non-existent option (or at least one that is not yet
implemented). I would prefer not to introduce the error message in advance of
the code that the user would use to rectify the problem.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]