[ 
https://issues.apache.org/jira/browse/KAFKA-18999?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17936182#comment-17936182
 ] 

Chia-Ping Tsai commented on KAFKA-18999:
----------------------------------------

[~ijuma] you are right that using `entry`/`map` make the code less readable. We 
can use `UsableBroker` to replace `BrokerMetadata` as 
`ReassignPartitionsCommand` eventually convert `BrokerMetadata` to 
`UsableBroker`. for example:
{code:java}
diff --git 
a/tools/src/main/java/org/apache/kafka/tools/reassign/ReassignPartitionsCommand.java
 
b/tools/src/main/java/org/apache/kafka/tools/reassign/ReassignPartitionsCommand.java
index 33bf23d13d..4fa0fc5285 100644
--- 
a/tools/src/main/java/org/apache/kafka/tools/reassign/ReassignPartitionsCommand.java
+++ 
b/tools/src/main/java/org/apache/kafka/tools/reassign/ReassignPartitionsCommand.java
@@ -569,8 +569,8 @@ public class ReassignPartitionsCommand {
         List<String> topicsToReassign = t0.getValue();
 
         Map<TopicPartition, List<Integer>> currentAssignments = 
getReplicaAssignmentForTopics(adminClient, topicsToReassign);
-        List<BrokerMetadata> brokerMetadatas = getBrokerMetadata(adminClient, 
brokersToReassign, enableRackAwareness);
-        Map<TopicPartition, List<Integer>> proposedAssignments = 
calculateAssignment(currentAssignments, brokerMetadatas);
+        List<UsableBroker> usableBrokers = getBrokerMetadata(adminClient, 
brokersToReassign, enableRackAwareness);
+        Map<TopicPartition, List<Integer>> proposedAssignments = 
calculateAssignment(currentAssignments, usableBrokers);
         System.out.printf("Current partition replica assignment%n%s%n%n",
             formatAsReassignmentJson(currentAssignments, 
Collections.emptyMap()));
         System.out.printf("Proposed partition reassignment 
configuration%n%s%n",
@@ -582,12 +582,12 @@ public class ReassignPartitionsCommand {
      * Calculate the new partition assignments to suggest in --generate.
      *
      * @param currentAssignment  The current partition assignments.
-     * @param brokerMetadatas    The rack information for each broker.
+     * @param usableBrokers    The rack information for each broker.
      *
      * @return                   A map from partitions to the proposed 
assignments for each.
      */
     private static Map<TopicPartition, List<Integer>> 
calculateAssignment(Map<TopicPartition, List<Integer>> currentAssignment,
-                                                                          
List<BrokerMetadata> brokerMetadatas) {
+                                                                          
List<UsableBroker> usableBrokers) {
         Map<String, List<Entry<TopicPartition, List<Integer>>>> groupedByTopic 
= new HashMap<>();
         for (Entry<TopicPartition, List<Integer>> e : 
currentAssignment.entrySet())
             groupedByTopic.computeIfAbsent(e.getKey().topic(), k -> new 
ArrayList<>()).add(e);
@@ -601,11 +601,7 @@ public class ReassignPartitionsCommand {
                     new ClusterDescriber() {
                         @Override
                         public Iterator<UsableBroker> usableBrokers() {
-                            return brokerMetadatas.stream().map(brokerMetadata 
-> new UsableBroker(
-                                    brokerMetadata.id,
-                                    brokerMetadata.rack,
-                                    false
-                            )).iterator();
+                            return usableBrokers.iterator();
                         }
 
                         @Override
@@ -701,16 +697,16 @@ public class ReassignPartitionsCommand {
      * @return                    The metadata for each broker that was found.
      *                            Brokers that were not found will be omitted.
      */
-    static List<BrokerMetadata> getBrokerMetadata(Admin adminClient, 
List<Integer> brokers, boolean enableRackAwareness) throws ExecutionException, 
InterruptedException {
+    static List<UsableBroker> getBrokerMetadata(Admin adminClient, 
List<Integer> brokers, boolean enableRackAwareness) throws ExecutionException, 
InterruptedException {
         Set<Integer> brokerSet = new HashSet<>(brokers);
-        List<BrokerMetadata> results = 
adminClient.describeCluster().nodes().get().stream()
+        List<UsableBroker> results = 
adminClient.describeCluster().nodes().get().stream()
             .filter(node -> brokerSet.contains(node.id()))
             .map(node -> (enableRackAwareness && node.rack() != null)
-                ? new BrokerMetadata(node.id(), Optional.of(node.rack()))
-                : new BrokerMetadata(node.id(), Optional.empty())
+                ? new UsableBroker(node.id(), Optional.of(node.rack()), false)
+                : new UsableBroker(node.id(), Optional.empty(), false)
             ).collect(Collectors.toList());
 
-        long numRackless = results.stream().filter(m -> 
m.rack.isEmpty()).count();
+        long numRackless = results.stream().filter(m -> 
m.rack().isEmpty()).count();
         if (enableRackAwareness && numRackless != 0 && numRackless != 
results.size()) {
             throw new AdminOperationException("Not all brokers have rack 
information. Add " +
                 "--disable-rack-aware in command line to make replica 
assignment without rack " + {code}

> Remove BrokerMetadata
> ---------------------
>
>                 Key: KAFKA-18999
>                 URL: https://issues.apache.org/jira/browse/KAFKA-18999
>             Project: Kafka
>          Issue Type: Improvement
>            Reporter: Chia-Ping Tsai
>            Assignee: PoAn Yang
>            Priority: Major
>
> After KAFKA-17565 gets merged, It will be a temporary data structure used by  
> `ReassignPartitionsCommand`, so we can replace `BrokerMetadata` by either a 
> map or List<Entry>
>  
> ref: 
> https://github.com/apache/kafka/blob/trunk/server-common/src/main/java/org/apache/kafka/admin/BrokerMetadata.java



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

Reply via email to