lucasbru commented on code in PR #19646:
URL: https://github.com/apache/kafka/pull/19646#discussion_r2157280851
##########
tools/src/main/java/org/apache/kafka/tools/streams/StreamsGroupCommand.java:
##########
@@ -678,29 +692,7 @@ Map<String, Throwable> deleteGroups() {
});
// delete internal topics
- if (!success.isEmpty()) {
- for (String groupId : success.keySet()) {
- List<String> internalTopicsToDelete =
internalTopics.get(groupId);
- if (internalTopicsToDelete != null &&
!internalTopicsToDelete.isEmpty()) {
- DeleteTopicsResult deleteTopicsResult = null;
- try {
- deleteTopicsResult =
adminClient.deleteTopics(internalTopicsToDelete);
- deleteTopicsResult.all().get();
- } catch (InterruptedException | ExecutionException
e) {
- if (deleteTopicsResult != null) {
-
deleteTopicsResult.topicNameValues().forEach((topic, future) -> {
- try {
- future.get();
- } catch (Exception topicException) {
- System.out.println("Failed to
delete internal topic: " + topic);
- }
- });
- }
- internalTopicsDeletionFailures.put(groupId,
e.getCause());
- }
- }
- }
- }
+ internalTopicsDeletionFailures =
maybeDeleteInternalTopics(deleteInternalTopics, success, internalTopics);
Review Comment:
If you pass `internalTopicsToBeDeleted`, you don't need to pass
`deleteInternalTopics` as well.
##########
tools/src/main/java/org/apache/kafka/tools/streams/StreamsGroupCommand.java:
##########
@@ -483,8 +483,8 @@ Map<String, Map<TopicPartition, OffsetAndMetadata>>
resetOffsets() {
result.put(groupId,
resetOffsetsForInactiveGroup(groupId, dryRun));
// delete internal topics
if (!dryRun) {
- List<String> internalTopics =
retrieveInternalTopics(List.of(groupId)).get(groupId);
- if (internalTopics != null &&
!internalTopics.isEmpty()) {
+ List<String> internalTopics =
getInternalTopicsForGroup(groupId);
Review Comment:
Maybe rename this method to `getInternalTopicsToBeDeleted`?
##########
tools/src/main/java/org/apache/kafka/tools/streams/StreamsGroupCommandOptions.java:
##########
@@ -73,6 +73,10 @@ public class StreamsGroupCommandOptions extends
CommandDefaultOptions {
private static final String RESET_TO_LATEST_DOC = "Reset offsets to latest
offset.";
private static final String RESET_TO_CURRENT_DOC = "Reset offsets to
current offset.";
private static final String RESET_SHIFT_BY_DOC = "Reset offsets shifting
current offset by 'n', where 'n' can be positive or negative.";
+ private static final String DELETE_INTERNAL_TOPIC_DOC = "Delete specified
internal topic of the streams group. Supported operations: reset-offsets." +
Review Comment:
This option is also supported with --delete, so the doc string should list it as a supported operation too.
##########
tools/src/main/java/org/apache/kafka/tools/streams/StreamsGroupCommand.java:
##########
@@ -723,6 +715,36 @@ Map<String, Throwable> deleteGroups() {
return failed;
}
+ private Map<String, Throwable> maybeDeleteInternalTopics(boolean
deleteInternalTopics,
+ Map<String, Throwable> success,
Review Comment:
formatting is off
##########
tools/src/main/java/org/apache/kafka/tools/streams/StreamsGroupCommandOptions.java:
##########
@@ -73,6 +73,10 @@ public class StreamsGroupCommandOptions extends
CommandDefaultOptions {
private static final String RESET_TO_LATEST_DOC = "Reset offsets to latest
offset.";
private static final String RESET_TO_CURRENT_DOC = "Reset offsets to
current offset.";
private static final String RESET_SHIFT_BY_DOC = "Reset offsets shifting
current offset by 'n', where 'n' can be positive or negative.";
+ private static final String DELETE_INTERNAL_TOPIC_DOC = "Delete specified
internal topic of the streams group. Supported operations: reset-offsets." +
+ "This option is applicable only when --execute is used.";
+ private static final String DELETE_ALL_INTERNAL_TOPICS_DOC = "Delete all
internal topics linked to the streams group. Supported operations:
reset-offsets, delete." +
Review Comment:
also with --delete
##########
tools/src/main/java/org/apache/kafka/tools/streams/StreamsGroupCommand.java:
##########
@@ -659,7 +671,9 @@ Map<String, Throwable> deleteGroups() {
Map<String, Throwable> internalTopicsDeletionFailures = new
HashMap<>();
if (!groupIds.isEmpty()) {
// retrieve internal topics before deleting groups
- internalTopics = retrieveInternalTopics(groupIds);
+ if (deleteInternalTopics) {
+ internalTopics = retrieveInternalTopics(groupIds);
Review Comment:
Maybe rename this variable to `internalTopicsToBeDeleted`?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]