kirktrue commented on code in PR #15265:
URL: https://github.com/apache/kafka/pull/15265#discussion_r1506629098
##########
clients/src/main/java/org/apache/kafka/clients/admin/Admin.java:
##########
@@ -335,6 +335,21 @@ default DescribeTopicsResult
describeTopics(TopicCollection topics) {
*/
DescribeTopicsResult describeTopics(TopicCollection topics,
DescribeTopicsOptions options);
+ /**
+ * Describe some topics in the cluster.
+ *
+ * When using topic IDs, this operation is supported by brokers with
version 3.1.0 or higher.
+ *
+ * @param topics The topics to describe.
+ * @param options The options to use when describing the topics.
+ * @param subscriber The subscriber to consumer the results.
+ */
+ default void describeTopics(
+ TopicCollection topics,
+ DescribeTopicsOptions options,
+ AdminResultsSubscriber<DescribeTopicPartitionsResult> subscriber) {
+ };
Review Comment:
Just so I understand, is the default of a no-op intentional?
##########
clients/src/main/java/org/apache/kafka/clients/admin/DescribeTopicsOptions.java:
##########
@@ -30,6 +30,7 @@
public class DescribeTopicsOptions extends
AbstractOptions<DescribeTopicsOptions> {
private boolean includeAuthorizedOperations;
+ private int partitionSizeLimitPerResponse;
Review Comment:
Is there an upper limit (besides Integer.MAX_VALUE 😏)?
##########
tools/src/main/java/org/apache/kafka/tools/TopicCommand.java:
##########
@@ -367,6 +372,14 @@ public void printDescription() {
.map(node -> node.toString())
.collect(Collectors.joining(",")));
}
+
+ System.out.print("\tElr: " + info.eligibleLeaderReplicas().stream()
Review Comment:
```suggestion
System.out.print("\tELRs: " +
info.eligibleLeaderReplicas().stream()
```
##########
tools/src/main/java/org/apache/kafka/tools/TopicCommand.java:
##########
@@ -367,6 +372,14 @@ public void printDescription() {
.map(node -> node.toString())
.collect(Collectors.joining(",")));
}
+
+ System.out.print("\tElr: " + info.eligibleLeaderReplicas().stream()
+ .map(node -> Integer.toString(node.id()))
+ .collect(Collectors.joining(",")));
+ System.out.print("\tLastKnownElr: " +
info.lastKnownEligibleLeaderReplicas().stream()
Review Comment:
```suggestion
System.out.print("\tLast Known ELRs: " +
info.lastKnownEligibleLeaderReplicas().stream()
```
##########
clients/src/main/java/org/apache/kafka/clients/admin/Admin.java:
##########
@@ -335,6 +335,21 @@ default DescribeTopicsResult
describeTopics(TopicCollection topics) {
*/
DescribeTopicsResult describeTopics(TopicCollection topics,
DescribeTopicsOptions options);
+ /**
+ * Describe some topics in the cluster.
+ *
+ * When using topic IDs, this operation is supported by brokers with
version 3.1.0 or higher.
Review Comment:
What is the behavior when running sub-3.1.0?
##########
clients/src/main/java/org/apache/kafka/clients/admin/DescribeTopicPartitionsResult.java:
##########
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.clients.admin;
+
+import org.apache.kafka.common.protocol.Errors;
+
+import java.util.Collections;
+
+public class DescribeTopicPartitionsResult {
+ final public TopicDescription topicDescription;
+ final public Exception exception;
Review Comment:
Have we considered using `Optional<Exception>` to lessen the chance of
spurious NPEs?
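For illustration, here is a minimal sketch of what that could look like. The class name `DescribeResultSketch` and its factory methods are hypothetical, and a plain `String` stands in for `TopicDescription` so that the block compiles on its own; it is not the PR's code.

```java
import java.util.Optional;

// Hypothetical stand-in for DescribeTopicPartitionsResult; String replaces TopicDescription.
final class DescribeResultSketch {
    private final String topicDescription; // null when the describe failed
    private final Exception exception;     // null when the describe succeeded

    private DescribeResultSketch(String topicDescription, Exception exception) {
        this.topicDescription = topicDescription;
        this.exception = exception;
    }

    static DescribeResultSketch success(String topicDescription) {
        return new DescribeResultSketch(topicDescription, null);
    }

    static DescribeResultSketch failure(Exception exception) {
        return new DescribeResultSketch(null, exception);
    }

    // Callers have to unwrap explicitly, so an absent value is not dereferenced by accident.
    Optional<String> topicDescription() {
        return Optional.ofNullable(topicDescription);
    }

    Optional<Exception> exception() {
        return Optional.ofNullable(exception);
    }
}
```

With accessors like these, a subscriber could write `result.exception().ifPresent(...)` instead of null-checking a public field.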
##########
clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java:
##########
@@ -994,6 +1002,37 @@ public boolean isInternal() {
}
}
+ abstract class RecurringCall {
+ private final String name;
+ final long deadlineMs;
+ private final AdminClientRunnable runnable;
+ KafkaFutureImpl<Boolean> nextRun;
+ abstract Call generateCall();
+
+ public RecurringCall(String name, long deadlineMs, AdminClientRunnable
runnable) {
+ this.name = name;
+ this.deadlineMs = deadlineMs;
+ this.runnable = runnable;
+ }
+
+ public String toString() {
+ return "RecurringCall(name=" + name + ", deadlineMs=" + deadlineMs
+ ")";
+ }
+
+ public void run() {
+ try {
+ do {
+ nextRun = new KafkaFutureImpl<>();
+ Call call = generateCall();
+ runnable.call(call, time.milliseconds());
+ } while (nextRun.get());
+ } catch (Exception e) {
+ log.info("Stop the recurring call " + name + " because " + e);
+ e.printStackTrace();
Review Comment:
```suggestion
log.info("Stopping the recurring call " + name + " due to an error", e);
```
Feel free to change the wording, but we probably want the stack trace to go
through the logging subsystem.
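To show the difference, here is a rough sketch in which the exception is handed to the logger as an argument rather than concatenated into the message. It uses `java.util.logging` purely as a self-contained stand-in for the logger in `KafkaAdminClient`; the class `RecurringCallLoggingSketch` and the capturing handler are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.logging.Handler;
import java.util.logging.Level;
import java.util.logging.LogRecord;
import java.util.logging.Logger;

// Sketch only: java.util.logging stands in for the client's real logging facade.
final class RecurringCallLoggingSketch {
    // Collects records in memory so the attached Throwable can be inspected.
    static final class CapturingHandler extends Handler {
        final List<LogRecord> records = new ArrayList<>();
        @Override public void publish(LogRecord record) { records.add(record); }
        @Override public void flush() { }
        @Override public void close() { }
    }

    static LogRecord logFailure(String name, Exception e) {
        Logger log = Logger.getLogger("recurring-call-sketch");
        log.setLevel(Level.INFO);
        log.setUseParentHandlers(false);
        CapturingHandler handler = new CapturingHandler();
        log.addHandler(handler);
        try {
            // The exception travels with the record, so handlers decide how to render the stack trace.
            log.log(Level.INFO, "Stopping the recurring call " + name + " due to an error", e);
        } finally {
            log.removeHandler(handler);
        }
        return handler.records.get(0);
    }
}
```

Because the `Throwable` is part of the log record, the stack trace ends up wherever the logging configuration sends it, instead of going straight to stderr as `printStackTrace()` does.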
##########
clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java:
##########
@@ -2276,6 +2337,141 @@ private Node leader(PartitionInfo partitionInfo) {
return partitionInfo.leader();
}
+ @Override
+ public void describeTopics(
+ TopicCollection topics,
+ DescribeTopicsOptions options,
+ AdminResultsSubscriber<DescribeTopicPartitionsResult> subscriber) {
+ if (topics instanceof TopicIdCollection) {
+ subscriber.onError(
+ new IllegalArgumentException("Currently the describeTopics
subscription mode does not support topic IDs.")
+ );
+ return;
+ }
+ if (!(topics instanceof TopicNameCollection)) {
Review Comment:
```suggestion
} else if (!(topics instanceof TopicNameCollection)) {
```
?
##########
clients/src/main/java/org/apache/kafka/clients/admin/AdminResultsSubscriber.java:
##########
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.clients.admin;
+
+/**
+ * A subscriber interface for querying API results with pagination.
+ */
+public interface AdminResultsSubscriber<T> {
+
+ // Being called when there is no more subscribed content.
+ void onComplete();
+
+ // Being called when the publisher hits unrecoverable errors.
+ void onError(Exception e);
+
+ // The publisher feeds the next result.
+ void onNext(T result);
+
+ // Initiate the subscriber.
+ void run();
Review Comment:
Is this implying it's a separate thread of execution or something?
##########
clients/src/main/java/org/apache/kafka/clients/admin/Admin.java:
##########
@@ -335,6 +335,21 @@ default DescribeTopicsResult
describeTopics(TopicCollection topics) {
*/
DescribeTopicsResult describeTopics(TopicCollection topics,
DescribeTopicsOptions options);
+ /**
+ * Describe some topics in the cluster.
+ *
+ * When using topic IDs, this operation is supported by brokers with
version 3.1.0 or higher.
+ *
+ * @param topics The topics to describe.
+ * @param options The options to use when describing the topics.
+ * @param subscriber The subscriber to consumer the results.
+ */
+ default void describeTopics(
+ TopicCollection topics,
+ DescribeTopicsOptions options,
+ AdminResultsSubscriber<DescribeTopicPartitionsResult> subscriber) {
+ };
Review Comment:
```suggestion
}
```
##########
clients/src/main/java/org/apache/kafka/common/TopicPartitionInfo.java:
##########
@@ -79,9 +95,24 @@ public List<Node> isr() {
return isr;
}
+ /**
+ * Return the eligible leader replicas of the partition. Note that the
ordering of the result is unspecified.
+ */
+ public List<Node> eligibleLeaderReplicas() {
+ return elr;
+ }
+
+ /**
+ * Return the last known eligible leader replicas of the partition. Note
that the ordering of the result is unspecified.
+ */
+ public List<Node> lastKnownEligibleLeaderReplicas() {
+ return lastKnownElr;
+ }
+
public String toString() {
return "(partition=" + partition + ", leader=" + leader + ",
replicas=" +
- Utils.join(replicas, ", ") + ", isr=" + Utils.join(isr, ", ") +
")";
+ Utils.join(replicas, ", ") + ", isr=" + Utils.join(isr, ", ") +
+ ", elr=" +Utils.join(elr, ", ") + ", lastKnownElr=" +
Utils.join(lastKnownElr, ", ") + ")";
Review Comment:
Just the nitiest of picks...
```suggestion
", elr=" + Utils.join(elr, ", ") + ", lastKnownElr=" +
Utils.join(lastKnownElr, ", ") + ")";
```
##########
clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java:
##########
@@ -2276,6 +2337,141 @@ private Node leader(PartitionInfo partitionInfo) {
return partitionInfo.leader();
}
+ @Override
+ public void describeTopics(
+ TopicCollection topics,
+ DescribeTopicsOptions options,
+ AdminResultsSubscriber<DescribeTopicPartitionsResult> subscriber) {
+ if (topics instanceof TopicIdCollection) {
+ subscriber.onError(
+ new IllegalArgumentException("Currently the describeTopics
subscription mode does not support topic IDs.")
+ );
+ return;
+ }
+ if (!(topics instanceof TopicNameCollection)) {
+ subscriber.onError(
+ new IllegalArgumentException("The TopicCollection: " + topics
+ " provided did not match any supported classes for describeTopics.")
+ );
+ return;
+ }
+
+ TreeSet<String> topicNames = new TreeSet<>();
+ ((TopicNameCollection) topics).topicNames().forEach(topicName -> {
+ if (topicNameIsUnrepresentable(topicName)) {
+
subscriber.onNext(DescribeTopicPartitionsResult.onError(topicName, new
InvalidTopicException("The given topic name '" +
+ topicName + "' cannot be represented in a request.")));
+ } else {
+ topicNames.add(topicName);
+ }
+ });
+
+ RecurringCall call = new RecurringCall(
+ "DescribeTopics-Recurring",
+ calcDeadlineMs(time.milliseconds(), options.timeoutMs()),
+ runnable
+ ) {
+
+ Map<String, TopicRequest> pendingTopics = new TreeMap<>();
+ Iterator<String> pendingTopicIterator = topicNames.iterator();
+
+ String partiallyFinishedTopicName = "";
+ int partiallyFinishedTopicNextPartitionId = -1;
+
+ @Override
+ Call generateCall() {
+ return new Call("describeTopics", this.deadlineMs, new
LeastLoadedNodeProvider()) {
+ @Override
+ DescribeTopicPartitionsRequest.Builder createRequest(int
timeoutMs) {
+ DescribeTopicPartitionsRequestData request = new
DescribeTopicPartitionsRequestData()
+
.setResponsePartitionLimit(options.partitionSizeLimitPerResponse());
+
+ if (!partiallyFinishedTopicName.isEmpty()) {
+ request.setCursor(new
DescribeTopicPartitionsRequestData.Cursor()
+ .setTopicName(partiallyFinishedTopicName)
+
.setPartitionIndex(partiallyFinishedTopicNextPartitionId)
+ );
+ }
+
+ for (int ii = pendingTopics.size(); ii <
options.partitionSizeLimitPerResponse() && pendingTopicIterator.hasNext();
++ii) {
+ String topicName = pendingTopicIterator.next();
+ pendingTopics.put(topicName, new
TopicRequest().setName(topicName));
+ }
+
request.setTopics(pendingTopics.values().stream().collect(Collectors.toList()));
Review Comment:
This is a hint more concise, but arguably just preference:
```suggestion
request.setTopics(new ArrayList<>(pendingTopics.values()));
```
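A small sketch comparing the two forms, with `String` values standing in for `TopicRequest`; the class `PendingTopicsCopySketch` is hypothetical and only illustrates that both produce the same list contents in the same order.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

// Hypothetical helper; String values stand in for TopicRequest.
final class PendingTopicsCopySketch {
    // The form currently in the PR.
    static List<String> viaStream(Map<String, String> pending) {
        return pending.values().stream().collect(Collectors.toList());
    }

    // The suggested form: copy the values view with the ArrayList copy constructor.
    static List<String> viaCopyConstructor(Map<String, String> pending) {
        return new ArrayList<>(pending.values());
    }
}
```

One small difference: `Collectors.toList()` makes no guarantee about the type or mutability of the returned list, whereas the copy constructor always yields a mutable `ArrayList`.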
##########
clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java:
##########
@@ -2276,6 +2337,141 @@ private Node leader(PartitionInfo partitionInfo) {
return partitionInfo.leader();
}
+ @Override
+ public void describeTopics(
+ TopicCollection topics,
+ DescribeTopicsOptions options,
+ AdminResultsSubscriber<DescribeTopicPartitionsResult> subscriber) {
Review Comment:
```suggestion
public void describeTopics(
TopicNameCollection topics,
DescribeTopicsOptions options,
AdminResultsSubscriber<DescribeTopicPartitionsResult> subscriber) {
```
This makes more sense in that `TopicNameCollection` is really all we support
for this release. We could later generalize the API method signature to accept
`TopicIdCollection` once support is added, and it shouldn't break any existing
usage.
##########
clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java:
##########
@@ -2276,6 +2337,141 @@ private Node leader(PartitionInfo partitionInfo) {
return partitionInfo.leader();
}
+ @Override
+ public void describeTopics(
+ TopicCollection topics,
+ DescribeTopicsOptions options,
+ AdminResultsSubscriber<DescribeTopicPartitionsResult> subscriber) {
+ if (topics instanceof TopicIdCollection) {
+ subscriber.onError(
+ new IllegalArgumentException("Currently the describeTopics
subscription mode does not support topic IDs.")
+ );
+ return;
+ }
+ if (!(topics instanceof TopicNameCollection)) {
+ subscriber.onError(
+ new IllegalArgumentException("The TopicCollection: " + topics
+ " provided did not match any supported classes for describeTopics.")
+ );
+ return;
+ }
+
+ TreeSet<String> topicNames = new TreeSet<>();
+ ((TopicNameCollection) topics).topicNames().forEach(topicName -> {
+ if (topicNameIsUnrepresentable(topicName)) {
+
subscriber.onNext(DescribeTopicPartitionsResult.onError(topicName, new
InvalidTopicException("The given topic name '" +
+ topicName + "' cannot be represented in a request.")));
+ } else {
+ topicNames.add(topicName);
+ }
+ });
+
+ RecurringCall call = new RecurringCall(
+ "DescribeTopics-Recurring",
+ calcDeadlineMs(time.milliseconds(), options.timeoutMs()),
+ runnable
+ ) {
+
+ Map<String, TopicRequest> pendingTopics = new TreeMap<>();
+ Iterator<String> pendingTopicIterator = topicNames.iterator();
+
+ String partiallyFinishedTopicName = "";
+ int partiallyFinishedTopicNextPartitionId = -1;
Review Comment:
Could we replace this with a DescribeTopicPartitionsRequestData.Cursor
instead of the two separate variables? It makes its purpose a bit more clear
and we don't have to be worried about them getting out of sync.
```suggestion
DescribeTopicPartitionsRequestData.Cursor requestCursor = null;
```
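Roughly what I have in mind, as a self-contained sketch. The nested `Cursor` class here is a hypothetical stand-in for the generated `DescribeTopicPartitionsRequestData.Cursor`, and `CursorSketch` is not code from the PR. Since the request and response cursors are separate generated types, a small conversion step is presumably still needed in the real code.

```java
// Hypothetical miniature of the paging state; not the generated Kafka message classes.
final class CursorSketch {
    static final class Cursor {
        final String topicName;
        final int partitionIndex;

        Cursor(String topicName, int partitionIndex) {
            this.topicName = topicName;
            this.partitionIndex = partitionIndex;
        }
    }

    // null means "no partially described topic"; name and index can never disagree.
    private Cursor requestCursor = null;

    // Called with the cursor carried by a response (null when nothing is left to resume).
    void onResponse(Cursor nextCursor) {
        requestCursor = nextCursor;
    }

    // Describes what the next request would do, based on the single cursor field.
    String describeNextRequest() {
        return requestCursor == null
            ? "start"
            : "resume " + requestCursor.topicName + "-" + requestCursor.partitionIndex;
    }
}
```

Resetting the state then becomes a single assignment to `null` rather than two coordinated writes.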
##########
clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java:
##########
@@ -2276,6 +2337,141 @@ private Node leader(PartitionInfo partitionInfo) {
return partitionInfo.leader();
}
+ @Override
+ public void describeTopics(
+ TopicCollection topics,
+ DescribeTopicsOptions options,
+ AdminResultsSubscriber<DescribeTopicPartitionsResult> subscriber) {
+ if (topics instanceof TopicIdCollection) {
+ subscriber.onError(
+ new IllegalArgumentException("Currently the describeTopics
subscription mode does not support topic IDs.")
+ );
+ return;
+ }
+ if (!(topics instanceof TopicNameCollection)) {
+ subscriber.onError(
+ new IllegalArgumentException("The TopicCollection: " + topics
+ " provided did not match any supported classes for describeTopics.")
+ );
+ return;
+ }
+
+ TreeSet<String> topicNames = new TreeSet<>();
+ ((TopicNameCollection) topics).topicNames().forEach(topicName -> {
+ if (topicNameIsUnrepresentable(topicName)) {
+
subscriber.onNext(DescribeTopicPartitionsResult.onError(topicName, new
InvalidTopicException("The given topic name '" +
+ topicName + "' cannot be represented in a request.")));
+ } else {
+ topicNames.add(topicName);
+ }
+ });
+
+ RecurringCall call = new RecurringCall(
+ "DescribeTopics-Recurring",
+ calcDeadlineMs(time.milliseconds(), options.timeoutMs()),
+ runnable
+ ) {
+
+ Map<String, TopicRequest> pendingTopics = new TreeMap<>();
+ Iterator<String> pendingTopicIterator = topicNames.iterator();
+
+ String partiallyFinishedTopicName = "";
+ int partiallyFinishedTopicNextPartitionId = -1;
+
+ @Override
+ Call generateCall() {
+ return new Call("describeTopics", this.deadlineMs, new
LeastLoadedNodeProvider()) {
+ @Override
+ DescribeTopicPartitionsRequest.Builder createRequest(int
timeoutMs) {
+ DescribeTopicPartitionsRequestData request = new
DescribeTopicPartitionsRequestData()
+
.setResponsePartitionLimit(options.partitionSizeLimitPerResponse());
+
+ if (!partiallyFinishedTopicName.isEmpty()) {
+ request.setCursor(new
DescribeTopicPartitionsRequestData.Cursor()
+ .setTopicName(partiallyFinishedTopicName)
+
.setPartitionIndex(partiallyFinishedTopicNextPartitionId)
+ );
Review Comment:
```suggestion
request.setCursor(requestCursor);
```
##########
clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java:
##########
@@ -2108,9 +2147,9 @@ void handleFailure(Throwable throwable) {
public DescribeTopicsResult describeTopics(final TopicCollection topics,
DescribeTopicsOptions options) {
if (topics instanceof TopicIdCollection)
return
DescribeTopicsResult.ofTopicIds(handleDescribeTopicsByIds(((TopicIdCollection)
topics).topicIds(), options));
- else if (topics instanceof TopicNameCollection)
+ else if (topics instanceof TopicNameCollection) {
return
DescribeTopicsResult.ofTopicNames(handleDescribeTopicsByNames(((TopicNameCollection)
topics).topicNames(), options));
- else
+ } else
Review Comment:
Does including the braces help here?
##########
clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java:
##########
@@ -2276,6 +2337,141 @@ private Node leader(PartitionInfo partitionInfo) {
return partitionInfo.leader();
}
+ @Override
+ public void describeTopics(
+ TopicCollection topics,
+ DescribeTopicsOptions options,
+ AdminResultsSubscriber<DescribeTopicPartitionsResult> subscriber) {
+ if (topics instanceof TopicIdCollection) {
+ subscriber.onError(
+ new IllegalArgumentException("Currently the describeTopics
subscription mode does not support topic IDs.")
+ );
+ return;
+ }
+ if (!(topics instanceof TopicNameCollection)) {
+ subscriber.onError(
+ new IllegalArgumentException("The TopicCollection: " + topics
+ " provided did not match any supported classes for describeTopics.")
+ );
+ return;
+ }
+
+ TreeSet<String> topicNames = new TreeSet<>();
+ ((TopicNameCollection) topics).topicNames().forEach(topicName -> {
+ if (topicNameIsUnrepresentable(topicName)) {
+
subscriber.onNext(DescribeTopicPartitionsResult.onError(topicName, new
InvalidTopicException("The given topic name '" +
+ topicName + "' cannot be represented in a request.")));
+ } else {
+ topicNames.add(topicName);
+ }
+ });
+
+ RecurringCall call = new RecurringCall(
+ "DescribeTopics-Recurring",
+ calcDeadlineMs(time.milliseconds(), options.timeoutMs()),
+ runnable
+ ) {
+
+ Map<String, TopicRequest> pendingTopics = new TreeMap<>();
+ Iterator<String> pendingTopicIterator = topicNames.iterator();
+
+ String partiallyFinishedTopicName = "";
+ int partiallyFinishedTopicNextPartitionId = -1;
+
+ @Override
+ Call generateCall() {
+ return new Call("describeTopics", this.deadlineMs, new
LeastLoadedNodeProvider()) {
+ @Override
+ DescribeTopicPartitionsRequest.Builder createRequest(int
timeoutMs) {
+ DescribeTopicPartitionsRequestData request = new
DescribeTopicPartitionsRequestData()
+
.setResponsePartitionLimit(options.partitionSizeLimitPerResponse());
+
+ if (!partiallyFinishedTopicName.isEmpty()) {
+ request.setCursor(new
DescribeTopicPartitionsRequestData.Cursor()
+ .setTopicName(partiallyFinishedTopicName)
+
.setPartitionIndex(partiallyFinishedTopicNextPartitionId)
+ );
+ }
+
+ for (int ii = pendingTopics.size(); ii <
options.partitionSizeLimitPerResponse() && pendingTopicIterator.hasNext();
++ii) {
+ String topicName = pendingTopicIterator.next();
+ pendingTopics.put(topicName, new
TopicRequest().setName(topicName));
+ }
+
request.setTopics(pendingTopics.values().stream().collect(Collectors.toList()));
+
+ return new
DescribeTopicPartitionsRequest.Builder(request);
+ }
+
+ @Override
+ void handleResponse(AbstractResponse abstractResponse) {
+ DescribeTopicPartitionsResponse response =
(DescribeTopicPartitionsResponse) abstractResponse;
+ String cursorTopicName = "";
+ int cursorPartitionId = -1;
Review Comment:
Same idea as above, but with the response cursor:
```suggestion
DescribeTopicPartitionsResponseData.Cursor responseCursor = null;
```
##########
clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java:
##########
@@ -2276,6 +2337,141 @@ private Node leader(PartitionInfo partitionInfo) {
return partitionInfo.leader();
}
+ @Override
+ public void describeTopics(
+ TopicCollection topics,
+ DescribeTopicsOptions options,
+ AdminResultsSubscriber<DescribeTopicPartitionsResult> subscriber) {
+ if (topics instanceof TopicIdCollection) {
+ subscriber.onError(
+ new IllegalArgumentException("Currently the describeTopics
subscription mode does not support topic IDs.")
+ );
+ return;
+ }
+ if (!(topics instanceof TopicNameCollection)) {
+ subscriber.onError(
+ new IllegalArgumentException("The TopicCollection: " + topics
+ " provided did not match any supported classes for describeTopics.")
+ );
+ return;
+ }
+
+ TreeSet<String> topicNames = new TreeSet<>();
+ ((TopicNameCollection) topics).topicNames().forEach(topicName -> {
+ if (topicNameIsUnrepresentable(topicName)) {
+
subscriber.onNext(DescribeTopicPartitionsResult.onError(topicName, new
InvalidTopicException("The given topic name '" +
+ topicName + "' cannot be represented in a request.")));
+ } else {
+ topicNames.add(topicName);
+ }
+ });
+
+ RecurringCall call = new RecurringCall(
+ "DescribeTopics-Recurring",
+ calcDeadlineMs(time.milliseconds(), options.timeoutMs()),
+ runnable
+ ) {
+
+ Map<String, TopicRequest> pendingTopics = new TreeMap<>();
+ Iterator<String> pendingTopicIterator = topicNames.iterator();
+
+ String partiallyFinishedTopicName = "";
+ int partiallyFinishedTopicNextPartitionId = -1;
+
+ @Override
+ Call generateCall() {
+ return new Call("describeTopics", this.deadlineMs, new
LeastLoadedNodeProvider()) {
+ @Override
+ DescribeTopicPartitionsRequest.Builder createRequest(int
timeoutMs) {
+ DescribeTopicPartitionsRequestData request = new
DescribeTopicPartitionsRequestData()
+
.setResponsePartitionLimit(options.partitionSizeLimitPerResponse());
+
+ if (!partiallyFinishedTopicName.isEmpty()) {
+ request.setCursor(new
DescribeTopicPartitionsRequestData.Cursor()
+ .setTopicName(partiallyFinishedTopicName)
+
.setPartitionIndex(partiallyFinishedTopicNextPartitionId)
+ );
+ }
+
+ for (int ii = pendingTopics.size(); ii <
options.partitionSizeLimitPerResponse() && pendingTopicIterator.hasNext();
++ii) {
+ String topicName = pendingTopicIterator.next();
+ pendingTopics.put(topicName, new
TopicRequest().setName(topicName));
+ }
+
request.setTopics(pendingTopics.values().stream().collect(Collectors.toList()));
+
+ return new
DescribeTopicPartitionsRequest.Builder(request);
+ }
+
+ @Override
+ void handleResponse(AbstractResponse abstractResponse) {
+ DescribeTopicPartitionsResponse response =
(DescribeTopicPartitionsResponse) abstractResponse;
+ String cursorTopicName = "";
+ int cursorPartitionId = -1;
+ if (response.data().nextCursor() != null) {
+ DescribeTopicPartitionsResponseData.Cursor cursor
= response.data().nextCursor();
+ cursorTopicName = cursor.topicName();
+ cursorPartitionId = cursor.partitionIndex();
+ }
+
+ for (DescribeTopicPartitionsResponseTopic topic :
response.data().topics()) {
+ String topicName = topic.name();
+ Errors error = Errors.forCode(topic.errorCode());
+
+ if (error != Errors.NONE) {
+
subscriber.onNext(DescribeTopicPartitionsResult.onError(topicName,
error.exception()));
+ if (cursorTopicName.equals(topicName)) {
+ cursorTopicName = "";
+ cursorPartitionId = -1;
Review Comment:
```suggestion
responseCursor = null;
```
##########
clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java:
##########
@@ -2276,6 +2337,141 @@ private Node leader(PartitionInfo partitionInfo) {
return partitionInfo.leader();
}
+ @Override
+ public void describeTopics(
+ TopicCollection topics,
+ DescribeTopicsOptions options,
+ AdminResultsSubscriber<DescribeTopicPartitionsResult> subscriber) {
+ if (topics instanceof TopicIdCollection) {
+ subscriber.onError(
+ new IllegalArgumentException("Currently the describeTopics
subscription mode does not support topic IDs.")
+ );
+ return;
+ }
+ if (!(topics instanceof TopicNameCollection)) {
+ subscriber.onError(
+ new IllegalArgumentException("The TopicCollection: " + topics
+ " provided did not match any supported classes for describeTopics.")
+ );
+ return;
+ }
+
+ TreeSet<String> topicNames = new TreeSet<>();
+ ((TopicNameCollection) topics).topicNames().forEach(topicName -> {
+ if (topicNameIsUnrepresentable(topicName)) {
+
subscriber.onNext(DescribeTopicPartitionsResult.onError(topicName, new
InvalidTopicException("The given topic name '" +
+ topicName + "' cannot be represented in a request.")));
+ } else {
+ topicNames.add(topicName);
+ }
+ });
+
+ RecurringCall call = new RecurringCall(
+ "DescribeTopics-Recurring",
+ calcDeadlineMs(time.milliseconds(), options.timeoutMs()),
+ runnable
+ ) {
+
+ Map<String, TopicRequest> pendingTopics = new TreeMap<>();
+ Iterator<String> pendingTopicIterator = topicNames.iterator();
+
+ String partiallyFinishedTopicName = "";
+ int partiallyFinishedTopicNextPartitionId = -1;
+
+ @Override
+ Call generateCall() {
+ return new Call("describeTopics", this.deadlineMs, new
LeastLoadedNodeProvider()) {
+ @Override
+ DescribeTopicPartitionsRequest.Builder createRequest(int
timeoutMs) {
+ DescribeTopicPartitionsRequestData request = new
DescribeTopicPartitionsRequestData()
+
.setResponsePartitionLimit(options.partitionSizeLimitPerResponse());
+
+ if (!partiallyFinishedTopicName.isEmpty()) {
+ request.setCursor(new
DescribeTopicPartitionsRequestData.Cursor()
+ .setTopicName(partiallyFinishedTopicName)
+
.setPartitionIndex(partiallyFinishedTopicNextPartitionId)
+ );
+ }
+
+ for (int ii = pendingTopics.size(); ii <
options.partitionSizeLimitPerResponse() && pendingTopicIterator.hasNext();
++ii) {
+ String topicName = pendingTopicIterator.next();
+ pendingTopics.put(topicName, new
TopicRequest().setName(topicName));
+ }
+
request.setTopics(pendingTopics.values().stream().collect(Collectors.toList()));
+
+ return new
DescribeTopicPartitionsRequest.Builder(request);
+ }
+
+ @Override
+ void handleResponse(AbstractResponse abstractResponse) {
+ DescribeTopicPartitionsResponse response =
(DescribeTopicPartitionsResponse) abstractResponse;
+ String cursorTopicName = "";
+ int cursorPartitionId = -1;
+ if (response.data().nextCursor() != null) {
+ DescribeTopicPartitionsResponseData.Cursor cursor
= response.data().nextCursor();
+ cursorTopicName = cursor.topicName();
+ cursorPartitionId = cursor.partitionIndex();
Review Comment:
```suggestion
responseCursor = response.data().nextCursor();
```
##########
clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java:
##########
@@ -994,6 +1002,37 @@ public boolean isInternal() {
}
}
+ abstract class RecurringCall {
+ private final String name;
+ final long deadlineMs;
+ private final AdminClientRunnable runnable;
Review Comment:
Does `runnable` need to be passed in? Can't `runnable` be accessed directly
since `RecurringCall` is a non-static inner class of `KafkaAdminClient`?
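A minimal sketch of the language rule I am relying on, with hypothetical names (`OuterClientSketch`, `RecurringCallSketch`) and a `StringBuilder` standing in for `AdminClientRunnable`:

```java
// Hypothetical miniature of the KafkaAdminClient / RecurringCall relationship.
final class OuterClientSketch {
    // Stands in for the client's AdminClientRunnable field.
    private final StringBuilder runnable = new StringBuilder();

    // Non-static inner class: each instance holds an implicit reference to its OuterClientSketch.
    abstract class RecurringCallSketch {
        private final String name;

        RecurringCallSketch(String name) {
            this.name = name;
        }

        abstract String generateCall();

        void run() {
            // Reads the enclosing instance's field directly; nothing was passed to the constructor.
            runnable.append(name).append(':').append(generateCall());
        }
    }

    String runOnce() {
        RecurringCallSketch call = new RecurringCallSketch("DescribeTopics-Recurring") {
            @Override
            String generateCall() {
                return "describeTopics";
            }
        };
        call.run();
        return runnable.toString();
    }
}
```

If the same holds for `RecurringCall`, the constructor parameter and the private field could both be dropped.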