oneby-wang commented on code in PR #25126:
URL: https://github.com/apache/pulsar/pull/25126#discussion_r3144647017


##########
pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/PulsarAdminToolTest.java:
##########
@@ -1738,10 +1737,6 @@ public void topics() throws Exception {
         verify(mockTopics).createSubscription("persistent://myprop/ns1/ds1", 
"sub1",
                 MessageId.earliest, false, null);
 
-        cmdTopics.run(split("analyze-backlog persistent://myprop/ns1/ds1 -s 
sub1"));
-        
verify(mockTopics).analyzeSubscriptionBacklog("persistent://myprop/ns1/ds1", 
"sub1",
-                Optional.empty());
-

Review Comment:
   Addressed.



##########
pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTopics.java:
##########
@@ -3002,24 +3003,44 @@ private class AnalyzeBacklog extends CliCommand {
         @Parameters(description = "persistent://tenant/namespace/topic", arity 
= "1")
         private String topicName;
 
-        @Option(names = { "-s", "--subscription" }, description = 
"Subscription to be analyzed", required = true)
+        @Option(names = {"-s", "--subscription"}, description = "Subscription 
to be analyzed", required = true)
         private String subName;
 
-        @Option(names = { "--position",
-                "-p" }, description = "message position to start the scan from 
(ledgerId:entryId)", required = false)
+        @Option(names = {"--position",
+                "-p"}, description = "Message position to start the scan from 
(ledgerId:entryId)", required = false)
         private String messagePosition;
 
+        @Option(names = {"--backlog-scan-max-entries",
+                "-b"}, description = "The maximum number of backlog entries 
the client will scan before terminating "
+                + "its loop", required = false)
+        private long backlogScanMaxEntries = -1;
+
+        @Option(names = {"--quiet", "-q"}, description = "Disable 
analyze-backlog progress reporting", required = false)
+        private boolean quiet = false;
+
+        @Option(names = {"--plain-print",
+                "-pp"}, description = "Plain(Non-pretty) print the final 
result output as NDJSON", required = false)

Review Comment:
   Addressed.



##########
pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTopics.java:
##########
@@ -3002,24 +3003,44 @@ private class AnalyzeBacklog extends CliCommand {
         @Parameters(description = "persistent://tenant/namespace/topic", arity 
= "1")
         private String topicName;
 
-        @Option(names = { "-s", "--subscription" }, description = 
"Subscription to be analyzed", required = true)
+        @Option(names = {"-s", "--subscription"}, description = "Subscription 
to be analyzed", required = true)
         private String subName;
 
-        @Option(names = { "--position",
-                "-p" }, description = "message position to start the scan from 
(ledgerId:entryId)", required = false)
+        @Option(names = {"--position",
+                "-p"}, description = "Message position to start the scan from 
(ledgerId:entryId)", required = false)
         private String messagePosition;
 
+        @Option(names = {"--backlog-scan-max-entries",
+                "-b"}, description = "The maximum number of backlog entries 
the client will scan before terminating "
+                + "its loop", required = false)
+        private long backlogScanMaxEntries = -1;
+
+        @Option(names = {"--quiet", "-q"}, description = "Disable 
analyze-backlog progress reporting", required = false)
+        private boolean quiet = false;
+
+        @Option(names = {"--plain-print",
+                "-pp"}, description = "Plain(Non-pretty) print the final 
result output as NDJSON", required = false)
+        private boolean plainPrint = false;
+
         @Override
-        void run() throws PulsarAdminException {
+        void run() throws Exception {
             String persistentTopic = validatePersistentTopic(topicName);
             Optional<MessageId> startPosition = Optional.empty();
+            int partitionIndex = 
TopicName.get(persistentTopic).getPartitionIndex();
             if (isNotBlank(messagePosition)) {
-                int partitionIndex = 
TopicName.get(persistentTopic).getPartitionIndex();
                 MessageId messageId = validateMessageIdString(messagePosition, 
partitionIndex);
                 startPosition = Optional.of(messageId);
             }
-            print(getTopics().analyzeSubscriptionBacklog(persistentTopic, 
subName, startPosition));
 
+            AnalyzeSubscriptionBacklogResult backlogResult =
+                    
getTopics().analyzeSubscriptionBacklogAsync(persistentTopic, subName, 
startPosition, result -> {
+                        boolean terminate = result.getEntries() >= 
backlogScanMaxEntries;
+                        if (!quiet && !terminate) {
+                            print(result, false);
+                        }
+                        return terminate;
+                    }).get();

Review Comment:
   Addressed.



##########
pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTopics.java:
##########
@@ -3002,24 +3003,44 @@ private class AnalyzeBacklog extends CliCommand {
         @Parameters(description = "persistent://tenant/namespace/topic", arity 
= "1")
         private String topicName;
 
-        @Option(names = { "-s", "--subscription" }, description = 
"Subscription to be analyzed", required = true)
+        @Option(names = {"-s", "--subscription"}, description = "Subscription 
to be analyzed", required = true)
         private String subName;
 
-        @Option(names = { "--position",
-                "-p" }, description = "message position to start the scan from 
(ledgerId:entryId)", required = false)
+        @Option(names = {"--position",
+                "-p"}, description = "Message position to start the scan from 
(ledgerId:entryId)", required = false)
         private String messagePosition;
 
+        @Option(names = {"--backlog-scan-max-entries",
+                "-b"}, description = "The maximum number of backlog entries 
the client will scan before terminating "
+                + "its loop", required = false)
+        private long backlogScanMaxEntries = -1;

Review Comment:
   Addressed.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to