jack2012aa commented on code in PR #20461:
URL: https://github.com/apache/kafka/pull/20461#discussion_r2323355531
##########
clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java:
##########
@@ -2517,6 +2517,55 @@ public void testDescribeReplicaLogDirsUnexpected()
throws ExecutionException, In
}
}
+ @Test
+ public void testDescribeReplicaLogDirsWithAuthorizationException() throws
ExecutionException, InterruptedException {
+ TopicPartitionReplica tpr = new TopicPartitionReplica("topic", 12, 1);
+
+ try (AdminClientUnitTestEnv env = mockClientEnv()) {
+ env.kafkaClient().setNodeApiVersions(NodeApiVersions.create());
+ String broker1log0 = "/var/data/kafka0";
+ env.kafkaClient().prepareResponseFrom(
+
prepareDescribeLogDirsResponse(Errors.CLUSTER_AUTHORIZATION_FAILED,
broker1log0),
+ env.cluster().nodeById(tpr.brokerId()));
+
+ DescribeReplicaLogDirsResult result =
env.adminClient().describeReplicaLogDirs(singletonList(tpr));
+ Map<TopicPartitionReplica,
KafkaFuture<DescribeReplicaLogDirsResult.ReplicaLogDirInfo>> values =
result.values();
+
+ Throwable e = assertThrows(Exception.class, () ->
values.get(tpr).get());
+ assertInstanceOf(ClusterAuthorizationException.class,
e.getCause());
+ }
+
+ }
+
+ @Test
+ public void testDescribeReplicaLogDirsWithSingleDirException() throws
ExecutionException, InterruptedException {
+ int brokerId = 1;
+ TopicPartitionReplica successfulTpr = new
TopicPartitionReplica("topic", 12, brokerId);
+ TopicPartitionReplica failedTpr = new TopicPartitionReplica("failed",
12, brokerId);
+
+ try (AdminClientUnitTestEnv env = mockClientEnv()) {
+ env.kafkaClient().setNodeApiVersions(NodeApiVersions.create());
+ String broker1log0 = "/var/data/kafka0";
+ String broker1log1 = "/var/data/kafka1";
+ int broker1Log0PartitionSize = 987654321;
+ int broker1Log0OffsetLag = 24;
+
+ DescribeLogDirsResponseData.DescribeLogDirsResult successfulResult
= prepareDescribeLogDirsResult(
+ successfulTpr, broker1log0, broker1Log0PartitionSize,
broker1Log0OffsetLag, false);
+ DescribeLogDirsResponseData.DescribeLogDirsResult failedResult =
new DescribeLogDirsResponseData.DescribeLogDirsResult()
+ .setErrorCode(Errors.LOG_DIR_NOT_FOUND.code())
+ .setLogDir(broker1log1);
+ DescribeLogDirsResponse response = new DescribeLogDirsResponse(new
DescribeLogDirsResponseData().setResults(asList(successfulResult,
failedResult)));
+ env.kafkaClient().prepareResponseFrom(response,
env.cluster().nodeById(successfulTpr.brokerId()));
+
+ DescribeReplicaLogDirsResult result =
env.adminClient().describeReplicaLogDirs(asList(successfulTpr, failedTpr));
+ Map<TopicPartitionReplica,
KafkaFuture<DescribeReplicaLogDirsResult.ReplicaLogDirInfo>> values =
result.values();
+
+ assertNotNull(values.get(successfulTpr).get());
+ assertThrows(Exception.class, () -> values.get(failedTpr).get());
Review Comment:
Yes, assertions on the exception type could better ensure that the
future fails for the expected reason. They are included in the new commit.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]