chia7712 commented on code in PR #20199:
URL: https://github.com/apache/kafka/pull/20199#discussion_r2261155159
##########
test-common/test-common-runtime/src/main/java/org/apache/kafka/common/test/ClusterInstance.java:
##########
@@ -447,4 +447,15 @@ default int waitUntilLeaderIsElectedOrChangedWithAdmin(Admin admin,
throw new AssertionError("Timing out after " + timeoutMs +
" ms since a leader was not elected for partition " +
topicPartition);
}
+
+ default void restartDeadBrokers() throws ExecutionException {
Review Comment:
The `throws ExecutionException` clause is unnecessary; please remove it.
##########
server/src/test/java/org/apache/kafka/server/EligibleLeaderReplicasIntegrationTest.java:
##########
@@ -180,35 +155,31 @@ public void testHighWatermarkShouldNotAdvanceIfUnderMinIsr() throws ExecutionExc
producer.send(new ProducerRecord<>(testTopicName, "0", "0")).get();
waitUntilOneMessageIsConsumed(consumer);
- killBroker(initialReplicas.get(0).id());
- killBroker(initialReplicas.get(1).id());
+ clusterInstance.shutdownBroker(initialReplicas.get(0).id());
+ clusterInstance.shutdownBroker(initialReplicas.get(1).id());
- waitForIsrAndElr((isrSize, elrSize) -> {
- return isrSize == 2 && elrSize == 1;
- });
+ waitForIsrAndElr((isrSize, elrSize) -> isrSize == 2 && elrSize == 1);
// Now the partition is under min ISR. HWM should not advance.
producer.send(new ProducerRecord<>(testTopicName, "1", "1")).get();
Thread.sleep(100);
Review Comment:
Please use `TimeUnit` instead of `Thread.sleep(100)`, e.g. `TimeUnit.MILLISECONDS.sleep(100)`.
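A minimal sketch of what that could look like, assuming the 100 ms pause itself stays unchanged. `SleepSketch` and `pauseMillis` are hypothetical names used only for illustration and are not part of the PR.

```java
import java.util.concurrent.TimeUnit;

public class SleepSketch {
    // Hypothetical helper (not in the PR): pauses the current thread
    // with the time unit spelled out at the call site.
    static void pauseMillis(long millis) throws InterruptedException {
        // Same effect as Thread.sleep(millis), but the unit is explicit.
        TimeUnit.MILLISECONDS.sleep(millis);
    }

    public static void main(String[] args) throws InterruptedException {
        long start = System.nanoTime();
        pauseMillis(100);
        long elapsedMs = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start);
        System.out.println("paused for about " + elapsedMs + " ms");
    }
}
```

In the test itself this would amount to swapping the single `Thread.sleep(100)` call; the behaviour is the same, only the unit becomes visible to the reader.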
##########
server/src/test/java/org/apache/kafka/server/EligibleLeaderReplicasIntegrationTest.java:
##########
@@ -432,16 +383,16 @@ public void testLastKnownLeaderShouldBeElectedIfEmptyElr() throws ExecutionExcep
return false;
}
},
- () -> String.format("Partition metadata for %s is not correct", testTopicName),
- DEFAULT_MAX_WAIT_MS, 100L
+ DEFAULT_MAX_WAIT_MS,
+ () -> String.format("Partition metadata for %s is not correct", testTopicName)
);
} finally {
- restartDeadBrokers(false);
+ clusterInstance.restartDeadBrokers();
Review Comment:
This helper method is very simple, so perhaps we could move it into the test class instead of adding it to `ClusterInstance`.
##########
test-common/test-common-runtime/src/main/java/org/apache/kafka/common/test/ClusterInstance.java:
##########
@@ -447,4 +447,15 @@ default int waitUntilLeaderIsElectedOrChangedWithAdmin(Admin admin,
throw new AssertionError("Timing out after " + timeoutMs +
" ms since a leader was not elected for partition " +
topicPartition);
}
+
+ default void restartDeadBrokers() throws ExecutionException {
+ if (brokers().isEmpty()) {
+ throw new RuntimeException("Must supply at least one server config.");
+ }
+ brokers().entrySet().forEach(entry -> {
Review Comment:
```java
brokers().forEach((key, value) -> {
if (value.isShutdown()) value.startup();
});
```
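For illustration, a self-contained sketch of that shape. `RestartSketch` and `FakeBroker` are hypothetical stand-ins, not Kafka classes. The sketch assumes only what the diff and the suggestion show: that `brokers()` is a `Map` (an `Integer` key is assumed here) and that each broker exposes `isShutdown()` and `startup()`. The returned count is an addition made purely so the behaviour can be checked.

```java
import java.util.LinkedHashMap;
import java.util.Map;

public class RestartSketch {
    // Hypothetical stand-in for a broker; models only the two calls
    // used in the suggestion above.
    static final class FakeBroker {
        private boolean shutdown;

        FakeBroker(boolean shutdown) {
            this.shutdown = shutdown;
        }

        boolean isShutdown() {
            return shutdown;
        }

        void startup() {
            shutdown = false;
        }
    }

    // Restarts every broker that is currently shut down and returns how
    // many were restarted. Nothing here throws a checked exception.
    static int restartDeadBrokers(Map<Integer, FakeBroker> brokers) {
        int[] restarted = {0};
        // Map.forEach hands over key and value directly, so there is no
        // need to go through entrySet() and unpack each entry.
        brokers.forEach((id, broker) -> {
            if (broker.isShutdown()) {
                broker.startup();
                restarted[0]++;
            }
        });
        return restarted[0];
    }

    public static void main(String[] args) {
        Map<Integer, FakeBroker> brokers = new LinkedHashMap<>();
        brokers.put(0, new FakeBroker(true));
        brokers.put(1, new FakeBroker(false));
        brokers.put(2, new FakeBroker(true));
        System.out.println("restarted " + restartDeadBrokers(brokers) + " broker(s)");
    }
}
```

Brokers that are already running are left untouched, so calling the helper a second time is a no-op.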