FrankYang0529 commented on code in PR #19433:
URL: https://github.com/apache/kafka/pull/19433#discussion_r2039384250


##########
tools/src/main/java/org/apache/kafka/tools/streams/StreamsGroupCommand.java:
##########
@@ -154,6 +166,173 @@ private void printGroupInfo(List<GroupListing> groups) {
             }
         }
 
+        public void describeGroups() throws ExecutionException, 
InterruptedException {
+            String group = opts.options.valueOf(opts.groupOpt);
+            StreamsGroupDescription description = getDescribeGroup(group);
+            if (description == null)
+                return;
+            boolean verbose =  opts.options.has(opts.verboseOpt);
+            if (opts.options.has(opts.membersOpt)) {
+                printMembers(description, verbose);
+            } else if (opts.options.has(opts.stateOpt)) {
+                printStates(description, verbose);
+            } else {
+                printOffsets(description, verbose);
+            }
+        }
+
+        StreamsGroupDescription getDescribeGroup(String group) throws 
ExecutionException, InterruptedException {
+            DescribeStreamsGroupsResult result = 
adminClient.describeStreamsGroups(List.of(group));
+            Map<String, StreamsGroupDescription> descriptionMap = 
result.all().get();
+            return descriptionMap.get(group);
+        }
+
+        private void printMembers(StreamsGroupDescription description, boolean 
verbose) {
+            int groupLen = Math.max(15, description.groupId().length());
+            int maxMemberIdLen = 15, maxHostLen = 15, maxClientIdLen = 15;
+            Collection<StreamsGroupMemberDescription> members = 
description.members();
+            if (maybePrintEmptyGroupState(description.groupId(), 
description.groupState(), description.members().size())) {
+                for (StreamsGroupMemberDescription member : members) {
+                    maxMemberIdLen = Math.max(maxMemberIdLen, 
member.memberId().length());
+                    maxHostLen = Math.max(maxHostLen, 
member.processId().length());
+                    maxClientIdLen = Math.max(maxClientIdLen, 
member.clientId().length());
+                }
+
+                if (!verbose) {
+                    String fmt = "%" + -groupLen + "s %" + -maxMemberIdLen + 
"s %" + -maxHostLen + "s %" + -maxClientIdLen + "s\n";
+                    for (StreamsGroupMemberDescription member : members) {
+                        System.out.printf(fmt, "GROUP", "MEMBER", "PROCESS", 
"CLIENT-ID");
+                        System.out.printf(fmt, description.groupId(), 
member.memberId(), member.processId(), member.clientId());
+                        printTasks(member.assignment(), false);
+                        System.out.println();
+                    }
+                } else {
+                    String fmt = "%" + -groupLen + "s %s %-15s%" + 
-maxMemberIdLen + "s %s %15s %" + -maxHostLen + "s %" + -maxClientIdLen + "s\n";

Review Comment:
   ```suggestion
                       String fmt = "%" + -groupLen + "s %-23s %-15s%" + 
-maxMemberIdLen + "s %-15s %-15s %" + -maxHostLen + "s %" + -maxClientIdLen + 
"s\n";
   ```
   
   The current format may produce misaligned output like the following:
   
   ```
   GROUP           TARGET-ASSIGNMENT-EPOCH TOPOLOGY-EPOCH MEMBER          MEMBER-PROTOCOL    MEMBER-EPOCH PROCESS         CLIENT-ID      
   group1          0 0              foo             streams               0 qux             bar            
   
   ```



##########
tools/src/main/java/org/apache/kafka/tools/streams/StreamsGroupCommand.java:
##########
@@ -154,6 +166,173 @@ private void printGroupInfo(List<GroupListing> groups) {
             }
         }
 
+        public void describeGroups() throws ExecutionException, 
InterruptedException {
+            String group = opts.options.valueOf(opts.groupOpt);
+            StreamsGroupDescription description = getDescribeGroup(group);
+            if (description == null)
+                return;
+            boolean verbose =  opts.options.has(opts.verboseOpt);
+            if (opts.options.has(opts.membersOpt)) {
+                printMembers(description, verbose);
+            } else if (opts.options.has(opts.stateOpt)) {
+                printStates(description, verbose);
+            } else {
+                printOffsets(description, verbose);
+            }
+        }
+
+        StreamsGroupDescription getDescribeGroup(String group) throws 
ExecutionException, InterruptedException {
+            DescribeStreamsGroupsResult result = 
adminClient.describeStreamsGroups(List.of(group));
+            Map<String, StreamsGroupDescription> descriptionMap = 
result.all().get();
+            return descriptionMap.get(group);
+        }
+
+        private void printMembers(StreamsGroupDescription description, boolean 
verbose) {
+            int groupLen = Math.max(15, description.groupId().length());
+            int maxMemberIdLen = 15, maxHostLen = 15, maxClientIdLen = 15;
+            Collection<StreamsGroupMemberDescription> members = 
description.members();
+            if (maybePrintEmptyGroupState(description.groupId(), 
description.groupState(), description.members().size())) {
+                for (StreamsGroupMemberDescription member : members) {
+                    maxMemberIdLen = Math.max(maxMemberIdLen, 
member.memberId().length());
+                    maxHostLen = Math.max(maxHostLen, 
member.processId().length());
+                    maxClientIdLen = Math.max(maxClientIdLen, 
member.clientId().length());
+                }
+
+                if (!verbose) {
+                    String fmt = "%" + -groupLen + "s %" + -maxMemberIdLen + 
"s %" + -maxHostLen + "s %" + -maxClientIdLen + "s\n";
+                    for (StreamsGroupMemberDescription member : members) {
+                        System.out.printf(fmt, "GROUP", "MEMBER", "PROCESS", 
"CLIENT-ID");
+                        System.out.printf(fmt, description.groupId(), 
member.memberId(), member.processId(), member.clientId());
+                        printTasks(member.assignment(), false);
+                        System.out.println();
+                    }
+                } else {
+                    String fmt = "%" + -groupLen + "s %s %-15s%" + 
-maxMemberIdLen + "s %s %15s %" + -maxHostLen + "s %" + -maxClientIdLen + "s\n";
+                    for (StreamsGroupMemberDescription member : members) {
+                        System.out.printf(fmt, "GROUP", 
"TARGET-ASSIGNMENT-EPOCH", "TOPOLOGY-EPOCH", "MEMBER", "MEMBER-PROTOCOL", 
"MEMBER-EPOCH", "PROCESS", "CLIENT-ID");
+                        System.out.printf(fmt, description.groupId(), 
description.targetAssignmentEpoch(), description.topologyEpoch(), 
member.memberId(),
+                            member.isClassic() ? "classic" : "streams", 
member.memberEpoch(), member.processId(), member.clientId());
+                        printTasks(member.assignment(), false);
+                        printTasks(member.targetAssignment(), true);
+                        System.out.println();
+                    }
+                }
+            }
+        }
+
+        private void printTaskType(List<StreamsGroupMemberAssignment.TaskIds> 
tasks, String taskType) {
+            System.out.printf("%s%n", taskType + ": " + 
tasks.stream().map(taskId -> taskId.subtopologyId() + ": [" + 
taskId.partitions()).collect(Collectors.joining(",")) + "] ");
+        }
+
+        private void printTasks(StreamsGroupMemberAssignment assignment, 
boolean isTarget) {
+            String typePrefix = isTarget ? "TARGET-" : "";
+            printTaskType(assignment.activeTasks(), typePrefix + 
"ACTIVE-TASKS:");
+            printTaskType(assignment.standbyTasks(), typePrefix + 
"STANDBY-TASKS:");
+            printTaskType(assignment.warmupTasks(), typePrefix + 
"WARMUP-TASKS:");

Review Comment:
   ```suggestion
               printTaskType(assignment.activeTasks(), typePrefix + 
"ACTIVE-TASKS");
               printTaskType(assignment.standbyTasks(), typePrefix + 
"STANDBY-TASKS");
               printTaskType(assignment.warmupTasks(), typePrefix + 
"WARMUP-TASKS");
   ```
   
   Remove the colon here because `printTaskType` already appends one
   (`... taskType + ": " + ...`).



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to