chia7712 commented on code in PR #15946:
URL: https://github.com/apache/kafka/pull/15946#discussion_r1600992789
##########
core/src/test/java/kafka/test/junit/RaftClusterInvocationContext.java:
##########
@@ -66,96 +65,66 @@ public class RaftClusterInvocationContext implements
TestTemplateInvocationConte
private final String baseDisplayName;
private final ClusterConfig clusterConfig;
- private final AtomicReference<KafkaClusterTestKit> clusterReference;
- private final AtomicReference<EmbeddedZookeeper> zkReference;
private final boolean isCombined;
public RaftClusterInvocationContext(String baseDisplayName, ClusterConfig
clusterConfig, boolean isCombined) {
this.baseDisplayName = baseDisplayName;
this.clusterConfig = clusterConfig;
- this.clusterReference = new AtomicReference<>();
- this.zkReference = new AtomicReference<>();
this.isCombined = isCombined;
}
@Override
public String getDisplayName(int invocationIndex) {
String clusterDesc = clusterConfig.nameTags().entrySet().stream()
- .map(Object::toString)
- .collect(Collectors.joining(", "));
+ .map(Object::toString)
Review Comment:
please avoid those unrelated changes. smaller is better
##########
core/src/test/java/kafka/test/junit/RaftClusterInvocationContext.java:
##########
@@ -252,7 +220,13 @@ public Admin createAdminClient(Properties configOverrides)
{
public void start() {
Review Comment:
in this method we should always call `format` first. That is a big sugar to
users
##########
core/src/test/java/kafka/test/junit/RaftClusterInvocationContext.java:
##########
@@ -171,39 +140,39 @@ public Optional<ListenerName> controllerListenerName() {
@Override
public Collection<SocketServer> controllerSocketServers() {
return controllers()
- .map(ControllerServer::socketServer)
- .collect(Collectors.toList());
+ .map(ControllerServer::socketServer)
+ .collect(Collectors.toList());
}
@Override
public SocketServer anyBrokerSocketServer() {
return brokers()
- .map(BrokerServer::socketServer)
- .findFirst()
- .orElseThrow(() -> new RuntimeException("No broker
SocketServers found"));
+ .map(BrokerServer::socketServer)
Review Comment:
ditto. please revert those changes.
##########
core/src/test/java/kafka/test/junit/RaftClusterInvocationContext.java:
##########
@@ -284,24 +258,51 @@ public void startBroker(int brokerId) {
@Override
public void waitForReadyBrokers() throws InterruptedException {
try {
- clusterReference.get().waitForReadyBrokers();
+ clusterTestKit.waitForReadyBrokers();
} catch (ExecutionException e) {
throw new AssertionError("Failed while waiting for brokers to
become ready", e);
}
}
- private BrokerServer findBrokerOrThrow(int brokerId) {
- return
Optional.ofNullable(clusterReference.get().brokers().get(brokerId))
- .orElseThrow(() -> new IllegalArgumentException("Unknown
brokerId " + brokerId));
- }
-
public Stream<BrokerServer> brokers() {
- return clusterReference.get().brokers().values().stream();
+ return clusterTestKit.brokers().values().stream();
}
public Stream<ControllerServer> controllers() {
- return clusterReference.get().controllers().values().stream();
+ return clusterTestKit.controllers().values().stream();
}
+ public void format() throws Exception {
Review Comment:
`format` and `buildAndFormatCluster` can be merged. for example:
```java
public void format() {
if (this.clusterTestKit == null) {
try {
KafkaClusterTestKit.Builder builder = new
KafkaClusterTestKit.Builder(new TestKitNodes.Builder()
.setBootstrapMetadataVersion(clusterConfig.metadataVersion())
.setCombined(isCombined)
.setNumBrokerNodes(clusterConfig.numBrokers())
.setNumDisksPerBroker(clusterConfig.numDisksPerBroker())
.setPerServerProperties(clusterConfig.perServerOverrideProperties())
.setNumControllerNodes(clusterConfig.numControllers()).build());
if (Boolean.parseBoolean(clusterConfig.serverProperties()
.getOrDefault("zookeeper.metadata.migration.enable",
"false"))) {
this.embeddedZookeeper = new EmbeddedZookeeper();
builder.setConfigProp("zookeeper.connect",
String.format("localhost:%d", embeddedZookeeper.port()));
}
// Copy properties into the TestKit builder
clusterConfig.serverProperties().forEach(builder::setConfigProp);
// KAFKA-12512 need to pass security protocol and
listener name here
this.clusterTestKit = builder.build();
this.clusterTestKit.format();
} catch (Exception e) {
throw new RuntimeException("Failed to format Raft
server", e);
}
}
}
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]