frankvicky commented on code in PR #20461:
URL: https://github.com/apache/kafka/pull/20461#discussion_r2322628958
##########
clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java:
##########
@@ -3108,34 +3109,54 @@ public DescribeLogDirsRequest.Builder createRequest(int
timeoutMs) {
@Override
public void handleResponse(AbstractResponse abstractResponse) {
DescribeLogDirsResponse response =
(DescribeLogDirsResponse) abstractResponse;
+
+ Set<TopicPartition> pendingPartitions = new
HashSet<>(replicaDirInfoByPartition.keySet());
+ Map<String, Throwable> directoryFailures = new HashMap<>();
+
for (Map.Entry<String, LogDirDescription> responseEntry :
logDirDescriptions(response).entrySet()) {
String logDir = responseEntry.getKey();
LogDirDescription logDirInfo =
responseEntry.getValue();
// No replica info will be provided if the log
directory is offline
if (logDirInfo.error() instanceof
KafkaStorageException)
continue;
- if (logDirInfo.error() != null)
- handleFailure(new IllegalStateException(
- "The error " +
logDirInfo.error().getClass().getName() + " for log directory " + logDir + " in
the response from broker " + brokerId + " is illegal"));
-
- for (Map.Entry<TopicPartition, ReplicaInfo>
replicaInfoEntry : logDirInfo.replicaInfos().entrySet()) {
- TopicPartition tp = replicaInfoEntry.getKey();
- ReplicaInfo replicaInfo =
replicaInfoEntry.getValue();
- ReplicaLogDirInfo replicaLogDirInfo =
replicaDirInfoByPartition.get(tp);
- if (replicaLogDirInfo == null) {
- log.warn("Server response from broker {}
mentioned unknown partition {}", brokerId, tp);
- } else if (replicaInfo.isFuture()) {
- replicaDirInfoByPartition.put(tp, new
ReplicaLogDirInfo(replicaLogDirInfo.getCurrentReplicaLogDir(),
-
replicaLogDirInfo.getCurrentReplicaOffsetLag(),
- logDir,
- replicaInfo.offsetLag()));
- } else {
- replicaDirInfoByPartition.put(tp, new
ReplicaLogDirInfo(logDir,
- replicaInfo.offsetLag(),
- replicaLogDirInfo.getFutureReplicaLogDir(),
-
replicaLogDirInfo.getFutureReplicaOffsetLag()));
+ if (logDirInfo.error() instanceof
ClusterAuthorizationException)
+ handleFailure(logDirInfo.error());
+
+ if (logDirInfo.error() == null) {
+ for (Map.Entry<TopicPartition, ReplicaInfo>
replicaInfoEntry : logDirInfo.replicaInfos().entrySet()) {
+ TopicPartition tp = replicaInfoEntry.getKey();
+ ReplicaInfo replicaInfo =
replicaInfoEntry.getValue();
+ ReplicaLogDirInfo replicaLogDirInfo =
replicaDirInfoByPartition.get(tp);
+ if (replicaLogDirInfo == null) {
+ log.warn("Server response from broker {}
mentioned unknown partition {}", brokerId, tp);
+ } else if (replicaInfo.isFuture()) {
+ replicaDirInfoByPartition.put(tp, new
ReplicaLogDirInfo(replicaLogDirInfo.getCurrentReplicaLogDir(),
+
replicaLogDirInfo.getCurrentReplicaOffsetLag(),
+ logDir,
+ replicaInfo.offsetLag()));
+ } else {
+ replicaDirInfoByPartition.put(tp, new
ReplicaLogDirInfo(logDir,
+ replicaInfo.offsetLag(),
+
replicaLogDirInfo.getFutureReplicaLogDir(),
+
replicaLogDirInfo.getFutureReplicaOffsetLag()));
+ }
+ pendingPartitions.remove(tp);
}
+ } else {
+ directoryFailures.put(logDir, logDirInfo.error());
+ }
+ }
+
+ if (!pendingPartitions.isEmpty() &&
!directoryFailures.isEmpty()) {
+ ArrayList<String> errorAtDir = new ArrayList<>();
Review Comment:
nit: program to the interface — declare this as `List<String>` rather than the concrete `ArrayList<String>`:
```suggestion
List<String> errorAtDir = new ArrayList<>();
```
##########
clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java:
##########
@@ -3108,34 +3109,54 @@ public DescribeLogDirsRequest.Builder createRequest(int
timeoutMs) {
@Override
public void handleResponse(AbstractResponse abstractResponse) {
DescribeLogDirsResponse response =
(DescribeLogDirsResponse) abstractResponse;
+
+ Set<TopicPartition> pendingPartitions = new
HashSet<>(replicaDirInfoByPartition.keySet());
+ Map<String, Throwable> directoryFailures = new HashMap<>();
+
for (Map.Entry<String, LogDirDescription> responseEntry :
logDirDescriptions(response).entrySet()) {
String logDir = responseEntry.getKey();
LogDirDescription logDirInfo =
responseEntry.getValue();
// No replica info will be provided if the log
directory is offline
if (logDirInfo.error() instanceof
KafkaStorageException)
continue;
- if (logDirInfo.error() != null)
- handleFailure(new IllegalStateException(
- "The error " +
logDirInfo.error().getClass().getName() + " for log directory " + logDir + " in
the response from broker " + brokerId + " is illegal"));
-
- for (Map.Entry<TopicPartition, ReplicaInfo>
replicaInfoEntry : logDirInfo.replicaInfos().entrySet()) {
- TopicPartition tp = replicaInfoEntry.getKey();
- ReplicaInfo replicaInfo =
replicaInfoEntry.getValue();
- ReplicaLogDirInfo replicaLogDirInfo =
replicaDirInfoByPartition.get(tp);
- if (replicaLogDirInfo == null) {
- log.warn("Server response from broker {}
mentioned unknown partition {}", brokerId, tp);
- } else if (replicaInfo.isFuture()) {
- replicaDirInfoByPartition.put(tp, new
ReplicaLogDirInfo(replicaLogDirInfo.getCurrentReplicaLogDir(),
-
replicaLogDirInfo.getCurrentReplicaOffsetLag(),
- logDir,
- replicaInfo.offsetLag()));
- } else {
- replicaDirInfoByPartition.put(tp, new
ReplicaLogDirInfo(logDir,
- replicaInfo.offsetLag(),
- replicaLogDirInfo.getFutureReplicaLogDir(),
-
replicaLogDirInfo.getFutureReplicaOffsetLag()));
+ if (logDirInfo.error() instanceof
ClusterAuthorizationException)
+ handleFailure(logDirInfo.error());
+
+ if (logDirInfo.error() == null) {
+ for (Map.Entry<TopicPartition, ReplicaInfo>
replicaInfoEntry : logDirInfo.replicaInfos().entrySet()) {
+ TopicPartition tp = replicaInfoEntry.getKey();
+ ReplicaInfo replicaInfo =
replicaInfoEntry.getValue();
+ ReplicaLogDirInfo replicaLogDirInfo =
replicaDirInfoByPartition.get(tp);
+ if (replicaLogDirInfo == null) {
+ log.warn("Server response from broker {}
mentioned unknown partition {}", brokerId, tp);
+ } else if (replicaInfo.isFuture()) {
+ replicaDirInfoByPartition.put(tp, new
ReplicaLogDirInfo(replicaLogDirInfo.getCurrentReplicaLogDir(),
+
replicaLogDirInfo.getCurrentReplicaOffsetLag(),
+ logDir,
+ replicaInfo.offsetLag()));
+ } else {
+ replicaDirInfoByPartition.put(tp, new
ReplicaLogDirInfo(logDir,
+ replicaInfo.offsetLag(),
+
replicaLogDirInfo.getFutureReplicaLogDir(),
+
replicaLogDirInfo.getFutureReplicaOffsetLag()));
+ }
+ pendingPartitions.remove(tp);
}
+ } else {
+ directoryFailures.put(logDir, logDirInfo.error());
+ }
+ }
+
+ if (!pendingPartitions.isEmpty() &&
!directoryFailures.isEmpty()) {
+ ArrayList<String> errorAtDir = new ArrayList<>();
+ for (Map.Entry<String, Throwable> entry :
directoryFailures.entrySet()) {
+
errorAtDir.add(entry.getValue().getClass().getName() + " at " + entry.getKey());
+ }
Review Comment:
nit: this loop can be simplified with `Map#forEach`:
```suggestion
directoryFailures.forEach((k, v) ->
errorAtDir.add(v.getClass().getName() + " at " + k));
```
##########
clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java:
##########
@@ -2517,6 +2517,55 @@ public void testDescribeReplicaLogDirsUnexpected()
throws ExecutionException, In
}
}
+ @Test
+ public void testDescribeReplicaLogDirsWithAuthorizationException() throws
ExecutionException, InterruptedException {
+ TopicPartitionReplica tpr = new TopicPartitionReplica("topic", 12, 1);
+
+ try (AdminClientUnitTestEnv env = mockClientEnv()) {
+ env.kafkaClient().setNodeApiVersions(NodeApiVersions.create());
+ String broker1log0 = "/var/data/kafka0";
+ env.kafkaClient().prepareResponseFrom(
+
prepareDescribeLogDirsResponse(Errors.CLUSTER_AUTHORIZATION_FAILED,
broker1log0),
+ env.cluster().nodeById(tpr.brokerId()));
+
+ DescribeReplicaLogDirsResult result =
env.adminClient().describeReplicaLogDirs(singletonList(tpr));
+ Map<TopicPartitionReplica,
KafkaFuture<DescribeReplicaLogDirsResult.ReplicaLogDirInfo>> values =
result.values();
+
+ Throwable e = assertThrows(Exception.class, () ->
values.get(tpr).get());
+ assertInstanceOf(ClusterAuthorizationException.class,
e.getCause());
Review Comment:
Please use `TestUtils#assertFutureThrows` instead.
##########
clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java:
##########
@@ -2517,6 +2517,55 @@ public void testDescribeReplicaLogDirsUnexpected()
throws ExecutionException, In
}
}
+ @Test
+ public void testDescribeReplicaLogDirsWithAuthorizationException() throws
ExecutionException, InterruptedException {
+ TopicPartitionReplica tpr = new TopicPartitionReplica("topic", 12, 1);
+
+ try (AdminClientUnitTestEnv env = mockClientEnv()) {
+ env.kafkaClient().setNodeApiVersions(NodeApiVersions.create());
+ String broker1log0 = "/var/data/kafka0";
+ env.kafkaClient().prepareResponseFrom(
+
prepareDescribeLogDirsResponse(Errors.CLUSTER_AUTHORIZATION_FAILED,
broker1log0),
+ env.cluster().nodeById(tpr.brokerId()));
+
+ DescribeReplicaLogDirsResult result =
env.adminClient().describeReplicaLogDirs(singletonList(tpr));
+ Map<TopicPartitionReplica,
KafkaFuture<DescribeReplicaLogDirsResult.ReplicaLogDirInfo>> values =
result.values();
+
+ Throwable e = assertThrows(Exception.class, () ->
values.get(tpr).get());
+ assertInstanceOf(ClusterAuthorizationException.class,
e.getCause());
+ }
+
+ }
+
+ @Test
+ public void testDescribeReplicaLogDirsWithSingleDirException() throws
ExecutionException, InterruptedException {
+ int brokerId = 1;
+ TopicPartitionReplica successfulTpr = new
TopicPartitionReplica("topic", 12, brokerId);
+ TopicPartitionReplica failedTpr = new TopicPartitionReplica("failed",
12, brokerId);
+
+ try (AdminClientUnitTestEnv env = mockClientEnv()) {
+ env.kafkaClient().setNodeApiVersions(NodeApiVersions.create());
+ String broker1log0 = "/var/data/kafka0";
+ String broker1log1 = "/var/data/kafka1";
+ int broker1Log0PartitionSize = 987654321;
+ int broker1Log0OffsetLag = 24;
+
+ DescribeLogDirsResponseData.DescribeLogDirsResult successfulResult
= prepareDescribeLogDirsResult(
+ successfulTpr, broker1log0, broker1Log0PartitionSize,
broker1Log0OffsetLag, false);
+ DescribeLogDirsResponseData.DescribeLogDirsResult failedResult =
new DescribeLogDirsResponseData.DescribeLogDirsResult()
+ .setErrorCode(Errors.LOG_DIR_NOT_FOUND.code())
+ .setLogDir(broker1log1);
+ DescribeLogDirsResponse response = new DescribeLogDirsResponse(new
DescribeLogDirsResponseData().setResults(asList(successfulResult,
failedResult)));
+ env.kafkaClient().prepareResponseFrom(response,
env.cluster().nodeById(successfulTpr.brokerId()));
+
+ DescribeReplicaLogDirsResult result =
env.adminClient().describeReplicaLogDirs(asList(successfulTpr, failedTpr));
Review Comment:
nit: use `List.of(...)` instead of `asList(...)` here.
##########
clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java:
##########
@@ -2517,6 +2517,55 @@ public void testDescribeReplicaLogDirsUnexpected()
throws ExecutionException, In
}
}
+ @Test
+ public void testDescribeReplicaLogDirsWithAuthorizationException() throws
ExecutionException, InterruptedException {
+ TopicPartitionReplica tpr = new TopicPartitionReplica("topic", 12, 1);
+
+ try (AdminClientUnitTestEnv env = mockClientEnv()) {
+ env.kafkaClient().setNodeApiVersions(NodeApiVersions.create());
+ String broker1log0 = "/var/data/kafka0";
+ env.kafkaClient().prepareResponseFrom(
+
prepareDescribeLogDirsResponse(Errors.CLUSTER_AUTHORIZATION_FAILED,
broker1log0),
+ env.cluster().nodeById(tpr.brokerId()));
+
+ DescribeReplicaLogDirsResult result =
env.adminClient().describeReplicaLogDirs(singletonList(tpr));
+ Map<TopicPartitionReplica,
KafkaFuture<DescribeReplicaLogDirsResult.ReplicaLogDirInfo>> values =
result.values();
+
+ Throwable e = assertThrows(Exception.class, () ->
values.get(tpr).get());
+ assertInstanceOf(ClusterAuthorizationException.class,
e.getCause());
+ }
+
+ }
+
+ @Test
+ public void testDescribeReplicaLogDirsWithSingleDirException() throws
ExecutionException, InterruptedException {
+ int brokerId = 1;
+ TopicPartitionReplica successfulTpr = new
TopicPartitionReplica("topic", 12, brokerId);
+ TopicPartitionReplica failedTpr = new TopicPartitionReplica("failed",
12, brokerId);
+
+ try (AdminClientUnitTestEnv env = mockClientEnv()) {
+ env.kafkaClient().setNodeApiVersions(NodeApiVersions.create());
+ String broker1log0 = "/var/data/kafka0";
+ String broker1log1 = "/var/data/kafka1";
+ int broker1Log0PartitionSize = 987654321;
+ int broker1Log0OffsetLag = 24;
+
+ DescribeLogDirsResponseData.DescribeLogDirsResult successfulResult
= prepareDescribeLogDirsResult(
+ successfulTpr, broker1log0, broker1Log0PartitionSize,
broker1Log0OffsetLag, false);
+ DescribeLogDirsResponseData.DescribeLogDirsResult failedResult =
new DescribeLogDirsResponseData.DescribeLogDirsResult()
+ .setErrorCode(Errors.LOG_DIR_NOT_FOUND.code())
+ .setLogDir(broker1log1);
+ DescribeLogDirsResponse response = new DescribeLogDirsResponse(new
DescribeLogDirsResponseData().setResults(asList(successfulResult,
failedResult)));
+ env.kafkaClient().prepareResponseFrom(response,
env.cluster().nodeById(successfulTpr.brokerId()));
+
+ DescribeReplicaLogDirsResult result =
env.adminClient().describeReplicaLogDirs(asList(successfulTpr, failedTpr));
+ Map<TopicPartitionReplica,
KafkaFuture<DescribeReplicaLogDirsResult.ReplicaLogDirInfo>> values =
result.values();
+
+ assertNotNull(values.get(successfulTpr).get());
+ assertThrows(Exception.class, () -> values.get(failedTpr).get());
Review Comment:
Do you think we should assert the class of this exception?
##########
clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java:
##########
@@ -2517,6 +2517,55 @@ public void testDescribeReplicaLogDirsUnexpected()
throws ExecutionException, In
}
}
+ @Test
+ public void testDescribeReplicaLogDirsWithAuthorizationException() throws
ExecutionException, InterruptedException {
+ TopicPartitionReplica tpr = new TopicPartitionReplica("topic", 12, 1);
+
+ try (AdminClientUnitTestEnv env = mockClientEnv()) {
+ env.kafkaClient().setNodeApiVersions(NodeApiVersions.create());
+ String broker1log0 = "/var/data/kafka0";
+ env.kafkaClient().prepareResponseFrom(
+
prepareDescribeLogDirsResponse(Errors.CLUSTER_AUTHORIZATION_FAILED,
broker1log0),
+ env.cluster().nodeById(tpr.brokerId()));
+
+ DescribeReplicaLogDirsResult result =
env.adminClient().describeReplicaLogDirs(singletonList(tpr));
Review Comment:
nit: use `List.of(...)` instead of `singletonList(...)` here.
##########
clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java:
##########
@@ -2517,6 +2517,55 @@ public void testDescribeReplicaLogDirsUnexpected()
throws ExecutionException, In
}
}
+ @Test
+ public void testDescribeReplicaLogDirsWithAuthorizationException() throws
ExecutionException, InterruptedException {
+ TopicPartitionReplica tpr = new TopicPartitionReplica("topic", 12, 1);
+
+ try (AdminClientUnitTestEnv env = mockClientEnv()) {
+ env.kafkaClient().setNodeApiVersions(NodeApiVersions.create());
+ String broker1log0 = "/var/data/kafka0";
+ env.kafkaClient().prepareResponseFrom(
+
prepareDescribeLogDirsResponse(Errors.CLUSTER_AUTHORIZATION_FAILED,
broker1log0),
+ env.cluster().nodeById(tpr.brokerId()));
+
+ DescribeReplicaLogDirsResult result =
env.adminClient().describeReplicaLogDirs(singletonList(tpr));
+ Map<TopicPartitionReplica,
KafkaFuture<DescribeReplicaLogDirsResult.ReplicaLogDirInfo>> values =
result.values();
+
+ Throwable e = assertThrows(Exception.class, () ->
values.get(tpr).get());
+ assertInstanceOf(ClusterAuthorizationException.class,
e.getCause());
+ }
+
+ }
+
+ @Test
+ public void testDescribeReplicaLogDirsWithSingleDirException() throws
ExecutionException, InterruptedException {
+ int brokerId = 1;
+ TopicPartitionReplica successfulTpr = new
TopicPartitionReplica("topic", 12, brokerId);
+ TopicPartitionReplica failedTpr = new TopicPartitionReplica("failed",
12, brokerId);
+
+ try (AdminClientUnitTestEnv env = mockClientEnv()) {
+ env.kafkaClient().setNodeApiVersions(NodeApiVersions.create());
+ String broker1log0 = "/var/data/kafka0";
+ String broker1log1 = "/var/data/kafka1";
+ int broker1Log0PartitionSize = 987654321;
+ int broker1Log0OffsetLag = 24;
+
+ DescribeLogDirsResponseData.DescribeLogDirsResult successfulResult
= prepareDescribeLogDirsResult(
+ successfulTpr, broker1log0, broker1Log0PartitionSize,
broker1Log0OffsetLag, false);
+ DescribeLogDirsResponseData.DescribeLogDirsResult failedResult =
new DescribeLogDirsResponseData.DescribeLogDirsResult()
+ .setErrorCode(Errors.LOG_DIR_NOT_FOUND.code())
+ .setLogDir(broker1log1);
+ DescribeLogDirsResponse response = new DescribeLogDirsResponse(new
DescribeLogDirsResponseData().setResults(asList(successfulResult,
failedResult)));
Review Comment:
nit: use `List.of(...)` instead of `asList(...)` here.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]