[
https://issues.apache.org/jira/browse/KAFKA-18999?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17936182#comment-17936182
]
Chia-Ping Tsai commented on KAFKA-18999:
----------------------------------------
[~ijuma] you are right that using `entry`/`map` makes the code less readable. We
can use `UsableBroker` to replace `BrokerMetadata`, as
`ReassignPartitionsCommand` eventually converts `BrokerMetadata` to
`UsableBroker`. For example:
{code:java}
diff --git
a/tools/src/main/java/org/apache/kafka/tools/reassign/ReassignPartitionsCommand.java
b/tools/src/main/java/org/apache/kafka/tools/reassign/ReassignPartitionsCommand.java
index 33bf23d13d..4fa0fc5285 100644
---
a/tools/src/main/java/org/apache/kafka/tools/reassign/ReassignPartitionsCommand.java
+++
b/tools/src/main/java/org/apache/kafka/tools/reassign/ReassignPartitionsCommand.java
@@ -569,8 +569,8 @@ public class ReassignPartitionsCommand {
List<String> topicsToReassign = t0.getValue();
Map<TopicPartition, List<Integer>> currentAssignments =
getReplicaAssignmentForTopics(adminClient, topicsToReassign);
- List<BrokerMetadata> brokerMetadatas = getBrokerMetadata(adminClient,
brokersToReassign, enableRackAwareness);
- Map<TopicPartition, List<Integer>> proposedAssignments =
calculateAssignment(currentAssignments, brokerMetadatas);
+ List<UsableBroker> usableBrokers = getBrokerMetadata(adminClient,
brokersToReassign, enableRackAwareness);
+ Map<TopicPartition, List<Integer>> proposedAssignments =
calculateAssignment(currentAssignments, usableBrokers);
System.out.printf("Current partition replica assignment%n%s%n%n",
formatAsReassignmentJson(currentAssignments,
Collections.emptyMap()));
System.out.printf("Proposed partition reassignment
configuration%n%s%n",
@@ -582,12 +582,12 @@ public class ReassignPartitionsCommand {
* Calculate the new partition assignments to suggest in --generate.
*
* @param currentAssignment The current partition assignments.
- * @param brokerMetadatas The rack information for each broker.
+ * @param usableBrokers The rack information for each broker.
*
* @return A map from partitions to the proposed
assignments for each.
*/
private static Map<TopicPartition, List<Integer>>
calculateAssignment(Map<TopicPartition, List<Integer>> currentAssignment,
-
List<BrokerMetadata> brokerMetadatas) {
+
List<UsableBroker> usableBrokers) {
Map<String, List<Entry<TopicPartition, List<Integer>>>> groupedByTopic
= new HashMap<>();
for (Entry<TopicPartition, List<Integer>> e :
currentAssignment.entrySet())
groupedByTopic.computeIfAbsent(e.getKey().topic(), k -> new
ArrayList<>()).add(e);
@@ -601,11 +601,7 @@ public class ReassignPartitionsCommand {
new ClusterDescriber() {
@Override
public Iterator<UsableBroker> usableBrokers() {
- return brokerMetadatas.stream().map(brokerMetadata
-> new UsableBroker(
- brokerMetadata.id,
- brokerMetadata.rack,
- false
- )).iterator();
+ return usableBrokers.iterator();
}
@Override
@@ -701,16 +697,16 @@ public class ReassignPartitionsCommand {
* @return The metadata for each broker that was found.
* Brokers that were not found will be omitted.
*/
- static List<BrokerMetadata> getBrokerMetadata(Admin adminClient,
List<Integer> brokers, boolean enableRackAwareness) throws ExecutionException,
InterruptedException {
+ static List<UsableBroker> getBrokerMetadata(Admin adminClient,
List<Integer> brokers, boolean enableRackAwareness) throws ExecutionException,
InterruptedException {
Set<Integer> brokerSet = new HashSet<>(brokers);
- List<BrokerMetadata> results =
adminClient.describeCluster().nodes().get().stream()
+ List<UsableBroker> results =
adminClient.describeCluster().nodes().get().stream()
.filter(node -> brokerSet.contains(node.id()))
.map(node -> (enableRackAwareness && node.rack() != null)
- ? new BrokerMetadata(node.id(), Optional.of(node.rack()))
- : new BrokerMetadata(node.id(), Optional.empty())
+ ? new UsableBroker(node.id(), Optional.of(node.rack()), false)
+ : new UsableBroker(node.id(), Optional.empty(), false)
).collect(Collectors.toList());
- long numRackless = results.stream().filter(m ->
m.rack.isEmpty()).count();
+ long numRackless = results.stream().filter(m ->
m.rack().isEmpty()).count();
if (enableRackAwareness && numRackless != 0 && numRackless !=
results.size()) {
throw new AdminOperationException("Not all brokers have rack
information. Add " +
"--disable-rack-aware in command line to make replica
assignment without rack " + {code}
> Remove BrokerMetadata
> ---------------------
>
> Key: KAFKA-18999
> URL: https://issues.apache.org/jira/browse/KAFKA-18999
> Project: Kafka
> Issue Type: Improvement
> Reporter: Chia-Ping Tsai
> Assignee: PoAn Yang
> Priority: Major
>
> After KAFKA-17565 gets merged, `BrokerMetadata` will be a temporary data structure used by
> `ReassignPartitionsCommand`, so we can replace `BrokerMetadata` with either a
> map or a `List<Entry>`.
>
> ref:
> https://github.com/apache/kafka/blob/trunk/server-common/src/main/java/org/apache/kafka/admin/BrokerMetadata.java
--
This message was sent by Atlassian Jira
(v8.20.10#820010)