akhileshchg commented on code in PR #21654:
URL: https://github.com/apache/kafka/pull/21654#discussion_r2898472500


##########
tools/src/test/java/org/apache/kafka/tools/reassign/ReassignPartitionsUnitTest.java:
##########
@@ -696,6 +717,82 @@ public void testModifyLogDirThrottle() throws Exception {
         }
     }
 
+    /**
+     * Test that modifyInterBrokerThrottle only applies throttle to live 
brokers.
+     * When broker 3 is down (not in describeCluster), it should be skipped to 
avoid TimeoutException.
+     */
+    @Test
+    public void testModifyInterBrokerThrottleSkipsDownBrokers() throws 
Exception {
+        try (MockAdminClient adminClient = new 
MockAdminClient.Builder().numBrokers(3).build()) {
+            modifyInterBrokerThrottle(adminClient, Set.of(0, 1, 2, 3), 1000);
+
+            List<ConfigResource> brokers = new ArrayList<>();
+            for (int i = 0; i < 3; i++)
+                brokers.add(new ConfigResource(ConfigResource.Type.BROKER, 
Integer.toString(i)));
+            Map<ConfigResource, Config> results = 
adminClient.describeConfigs(brokers).all().get();
+            verifyBrokerThrottleResults(results.get(brokers.get(0)), 1000, -1);
+            verifyBrokerThrottleResults(results.get(brokers.get(1)), 1000, -1);
+            verifyBrokerThrottleResults(results.get(brokers.get(2)), 1000, -1);
+        }
+    }
+
+    /**
+     * Test that modifyLogDirThrottle only applies throttle to live brokers.
+     * When broker 3 is down (not in describeCluster), it should be skipped to 
avoid TimeoutException.
+     */
+    @Test
+    public void testModifyLogDirThrottleSkipsDownBrokers() throws Exception {
+        try (MockAdminClient adminClient = new 
MockAdminClient.Builder().numBrokers(3).build()) {
+            modifyLogDirThrottle(adminClient, Set.of(0, 1, 2, 3), 2000);
+
+            List<ConfigResource> brokers = new ArrayList<>();
+            for (int i = 0; i < 3; i++)
+                brokers.add(new ConfigResource(ConfigResource.Type.BROKER, 
Integer.toString(i)));
+            Map<ConfigResource, Config> results = 
adminClient.describeConfigs(brokers).all().get();
+            verifyBrokerThrottleResults(results.get(brokers.get(0)), -1, 2000);
+            verifyBrokerThrottleResults(results.get(brokers.get(1)), -1, 2000);
+            verifyBrokerThrottleResults(results.get(brokers.get(2)), -1, 2000);
+        }
+    }
+
+    /**
+     * Test that executeAssignment with throttle succeeds when some brokers 
are down.
+     * Uses MockAdminClient with 3 brokers; reassignment only involves live 
brokers (0, 1, 2).
+     * The throttle is applied only to live brokers via 
modifyInterBrokerThrottle/modifyLogDirThrottle.
+     */
+    @Test
+    public void testExecuteAssignmentWithThrottleWhenBrokerDown() throws 
Exception {
+        try (MockAdminClient adminClient = new 
MockAdminClient.Builder().numBrokers(3).build()) {
+            addTopicsForThreeBrokers(adminClient);
+            String assignment = "{\"version\":1,\"partitions\":" +
+                
"[{\"topic\":\"foo\",\"partition\":0,\"replicas\":[0,1,2],\"log_dirs\":[\"any\",\"any\",\"any\"]},"
 +
+                
"{\"topic\":\"foo\",\"partition\":1,\"replicas\":[1,2,0],\"log_dirs\":[\"any\",\"any\",\"any\"]}"
 +
+                "]}";
+            executeAssignment(adminClient, false, assignment, 1000L, 2000L, 
10000L, Time.SYSTEM, false);
+
+            List<ConfigResource> brokers = new ArrayList<>();
+            for (int i = 0; i < 3; i++) {
+                brokers.add(new ConfigResource(ConfigResource.Type.BROKER, 
Integer.toString(i)));
+            }
+            Map<ConfigResource, Config> results = 
adminClient.describeConfigs(brokers).all().get();
+            // Only inter-broker throttle is applied; log-dir throttle is not 
set since log_dirs are all "any"
+            verifyBrokerThrottleResults(results.get(brokers.get(0)), 1000, -1);
+            verifyBrokerThrottleResults(results.get(brokers.get(1)), 1000, -1);
+            verifyBrokerThrottleResults(results.get(brokers.get(2)), 1000, -1);
+        }
+    }
+
+    /**
+     * Test that modifyInterBrokerThrottle succeeds when all reassigning 
brokers are down.
+     * Should return early without calling incrementalAlterConfigs.
+     */
+    @Test
+    public void testModifyInterBrokerThrottleWhenAllBrokersDown() throws 
Exception {

Review Comment:
   Can we actually verify in the test that incrementalAlterConfigs was not called?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to