kirktrue commented on code in PR #15265:
URL: https://github.com/apache/kafka/pull/15265#discussion_r1503175975
##########
clients/src/main/java/org/apache/kafka/clients/admin/DescribeTopicsOptions.java:
##########
@@ -47,8 +49,25 @@ public DescribeTopicsOptions
includeAuthorizedOperations(boolean includeAuthoriz
return this;
}
+ public DescribeTopicsOptions useDescribeTopicsApi(boolean
useDescribeTopicsApi) {
+ this.useDescribeTopicsApi = useDescribeTopicsApi;
+ return this;
+ }
Review Comment:
Can we add some comments here for a developer to know _why_ to use the
topics API or not?
##########
clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java:
##########
@@ -994,6 +1002,36 @@ public boolean isInternal() {
}
}
+ abstract class RecurringCall {
+ private final String name;
+ final long deadlineMs;
+ private final AdminClientRunnable runnable;
+ KafkaFutureImpl<Boolean> nextRun;
+ abstract Call generateCall();
+
+ public RecurringCall(String name, long deadlineMs, AdminClientRunnable
runnable) {
+ this.name = name;
+ this.deadlineMs = deadlineMs;
+ this.runnable = runnable;
+ }
+
+ public String toString() {
+ return "RecurCall(name=" + name + ", deadlineMs=" + deadlineMs +
")";
Review Comment:
```suggestion
return "RecurringCall(name=" + name + ", deadlineMs=" +
deadlineMs + ")";
```
##########
clients/src/main/java/org/apache/kafka/common/TopicPartitionInfo.java:
##########
@@ -79,9 +95,24 @@ public List<Node> isr() {
return isr;
}
+ /**
+ * Return the eligible leader replicas of the partition. Note that the
ordering of the result is unspecified.
+ */
+ public List<Node> elr() {
+ return elr;
+ }
+
+ /**
+ * Return the last known eligible leader replicas of the partition. Note
that the ordering of the result is unspecified.
+ */
+ public List<Node> lastKnownElr() {
+ return lastKnownElr;
+ }
+
public String toString() {
return "(partition=" + partition + ", leader=" + leader + ",
replicas=" +
- Utils.join(replicas, ", ") + ", isr=" + Utils.join(isr, ", ") +
")";
+ Utils.join(replicas, ", ") + ", isr=" + Utils.join(isr, ", ") +
+ Utils.join(elr, ", ") + Utils.join(lastKnownElr, ", ") + ")";
Review Comment:
```suggestion
Utils.join(replicas, ", ") + ", isr=" + Utils.join(isr, ", ") +
", elr=" +
Utils.join(elr, ", ") + ", lastKnownElr=" +
Utils.join(lastKnownElr, ", ") + ")";
```
##########
clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java:
##########
@@ -994,6 +1002,36 @@ public boolean isInternal() {
}
}
+ abstract class RecurringCall {
+ private final String name;
+ final long deadlineMs;
+ private final AdminClientRunnable runnable;
+ KafkaFutureImpl<Boolean> nextRun;
+ abstract Call generateCall();
+
+ public RecurringCall(String name, long deadlineMs, AdminClientRunnable
runnable) {
+ this.name = name;
+ this.deadlineMs = deadlineMs;
+ this.runnable = runnable;
+ }
+
+ public String toString() {
+ return "RecurCall(name=" + name + ", deadlineMs=" + deadlineMs +
")";
+ }
+
+ public void run() {
+ try {
+ do {
+ nextRun = new KafkaFutureImpl<>();
+ Call call = generateCall();
+ runnable.call(call, time.milliseconds());
+ } while (nextRun.get());
+ } catch (Exception e) {
+ log.info("Stop the recurring call " + name + " because " + e);
Review Comment:
Are we specifically wanting to avoid outputting a stack trace?
##########
clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java:
##########
@@ -2108,9 +2146,12 @@ void handleFailure(Throwable throwable) {
public DescribeTopicsResult describeTopics(final TopicCollection topics,
DescribeTopicsOptions options) {
if (topics instanceof TopicIdCollection)
return
DescribeTopicsResult.ofTopicIds(handleDescribeTopicsByIds(((TopicIdCollection)
topics).topicIds(), options));
- else if (topics instanceof TopicNameCollection)
+ else if (topics instanceof TopicNameCollection) {
+ if (options.useDescribeTopicsApi()) {
+ return DescribeTopicsResult.ofTopicNameIterator(new
DescribeTopicPartitionsIterator(((TopicNameCollection) topics).topicNames(),
options));
Review Comment:
It's been my experience that it's "dangerous" 😨 to run arbitrary user code
from within the context of the client code. User code can (and will) do
unpredictable things with state, errors, threads, etc. The surrounding code
inside the client has to be very careful to make sure it handles many different
cases.
That said, I like the ergonomics of the `Consumer`-based approach over the
iterator.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]