chia7712 commented on code in PR #15928:
URL: https://github.com/apache/kafka/pull/15928#discussion_r1597930023
##########
core/src/test/java/kafka/test/junit/RaftClusterInvocationContext.java:
##########
@@ -111,6 +111,9 @@ public List<Extension> getAdditionalExtensions() {
cluster.format();
if (clusterConfig.isAutoStart()) {
cluster.startup();
+ if(!clusterInstance.started.compareAndSet(false,true)){
Review Comment:
Could you please refactor this a bit? It seems to me that the code used to
"start" a cluster should be moved to `RaftClusterInstance#start`. Otherwise,
`RaftClusterInstance#start` is unused, and it would not work anyway because it
does not format the storage.
In short, the implementation of this extension should look like the following code.
```java
@Override
public List<Extension> getAdditionalExtensions() {
    RaftClusterInstance clusterInstance = new RaftClusterInstance(clusterConfig, isCombined);
    return Arrays.asList(
        (BeforeTestExecutionCallback) context -> {
            if (clusterConfig.isAutoStart()) {
                clusterInstance.start();
            }
        },
        (AfterTestExecutionCallback) context -> clusterInstance.stop(),
        new ClusterInstanceParameterResolver(clusterInstance)
    );
}
```
Also, `RaftClusterInstance#start` could be:
```java
@Override
public void start() {
    if (started.compareAndSet(false, true)) {
        try {
            TestKitNodes nodes = new TestKitNodes.Builder().
                setBootstrapMetadataVersion(clusterConfig.metadataVersion()).
                setCombined(isCombined).
                setNumBrokerNodes(clusterConfig.numBrokers()).
                setPerServerProperties(clusterConfig.perServerOverrideProperties()).
                setNumDisksPerBroker(clusterConfig.numDisksPerBroker()).
                setNumControllerNodes(clusterConfig.numControllers()).build();
            KafkaClusterTestKit.Builder builder = new KafkaClusterTestKit.Builder(nodes);
            if (Boolean.parseBoolean(clusterConfig.serverProperties()
                    .getOrDefault("zookeeper.metadata.migration.enable", "false"))) {
                embeddedZookeeper = new EmbeddedZookeeper();
                builder.setConfigProp("zookeeper.connect",
                    String.format("localhost:%d", embeddedZookeeper.port()));
            }
            // Copy properties into the TestKit builder
            clusterConfig.serverProperties().forEach(builder::setConfigProp);
            // KAFKA-12512 need to pass security protocol and listener name here
            testKit = builder.build();
            testKit.format();
            testKit.startup();
            kafka.utils.TestUtils.waitUntilTrue(
                () -> testKit.brokers().get(0).brokerState() == BrokerState.RUNNING,
                () -> "Broker never made it to RUNNING state.",
                org.apache.kafka.test.TestUtils.DEFAULT_MAX_WAIT_MS,
                100L);
        } catch (Exception e) {
            throw new RuntimeException("Failed to start Raft server", e);
        }
    }
}
```
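The `compareAndSet(false, true)` guard is what makes a second call to `start()` a no-op, so the extension callback and a test can both call it safely. The following is only a minimal standalone sketch of that guard: `FakeCluster` and its startup counter are hypothetical stand-ins, not Kafka classes, and the counter increment takes the place of the real format and startup calls.

```java
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical sketch; none of these names exist in Kafka.
final class IdempotentStartSketch {

    static final class FakeCluster {
        private final AtomicBoolean started = new AtomicBoolean(false);
        private final AtomicInteger startups = new AtomicInteger();

        void start() {
            // Only the caller that flips false -> true runs the startup work;
            // every later caller sees true and skips the body.
            if (started.compareAndSet(false, true)) {
                startups.incrementAndGet(); // placeholder for format() + startup()
            }
        }

        int startups() {
            return startups.get();
        }
    }

    public static void main(String[] args) {
        FakeCluster cluster = new FakeCluster();
        cluster.start(); // e.g. triggered by the auto-start callback
        cluster.start(); // e.g. called again from a test; does nothing
        System.out.println(cluster.startups()); // prints 1
    }
}
```

Because the flag is flipped before the startup work runs, a failed startup leaves it set in this sketch; whether the real `start()` should reset it on failure is a separate design question.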