chia7712 commented on code in PR #15761:
URL: https://github.com/apache/kafka/pull/15761#discussion_r1576492165
##########
core/src/test/java/kafka/testkit/BrokerNode.java:
##########
@@ -66,17 +69,30 @@ public Builder setNumLogDirectories(int numLogDirectories) {
return this;
}
- public BrokerNode build(
- String baseDirectory,
- Uuid clusterId,
- boolean combined
- ) {
+ public Builder setClusterId(Uuid clusterId) {
+ this.clusterId = clusterId;
+ return this;
+ }
+
+ public Builder setBaseDirectory(String baseDirectory) {
+ this.baseDirectory = baseDirectory;
+ return this;
+ }
+
+ public Builder setCombined(boolean combined) {
+ this.combined = combined;
+ return this;
+ }
+
+ public Builder setPropertyOverrides(Map<String, String>
propertyOverrides) {
+ this.propertyOverrides = Collections.unmodifiableMap(new
HashMap<>(propertyOverrides));
+ return this;
+ }
+
+ public BrokerNode build() {
if (id == -1) {
throw new RuntimeException("You must set the node id.");
}
- if (incarnationId == null) {
- incarnationId = Uuid.randomUuid();
- }
List<String> logDataDirectories = IntStream
Review Comment:
Could you add a null check for `baseDirectory`?
##########
core/src/test/java/kafka/testkit/TestKitNodes.java:
##########
@@ -59,73 +65,64 @@ public Builder setCombined(boolean combined) {
}
public Builder setNumControllerNodes(int numControllerNodes) {
- if (numControllerNodes < 0) {
- throw new RuntimeException("Invalid negative value for
numControllerNodes");
- }
-
- while (controllerNodeBuilders.size() > numControllerNodes) {
- controllerNodeBuilders.pollFirstEntry();
- }
- while (controllerNodeBuilders.size() < numControllerNodes) {
- int nextId = startControllerId();
- if (!controllerNodeBuilders.isEmpty()) {
- nextId = controllerNodeBuilders.lastKey() + 1;
- }
- controllerNodeBuilders.put(nextId,
- new ControllerNode.Builder().
- setId(nextId));
- }
+ this.numControllerNodes = numControllerNodes;
return this;
}
public Builder setNumBrokerNodes(int numBrokerNodes) {
- return setBrokerNodes(numBrokerNodes, 1);
+ this.numBrokerNodes = numBrokerNodes;
+ return this;
+ }
+
+ public Builder setNumDisksPerBroker(int numDisksPerBroker) {
+ this.numDisksPerBroker = numDisksPerBroker;
+ return this;
+ }
+
+ public Builder setPerBrokerProperties(Map<Integer, Map<String,
String>> perBrokerProperties) {
+ this.perBrokerProperties = Collections.unmodifiableMap(
+ perBrokerProperties.entrySet().stream()
+ .collect(Collectors.toMap(Map.Entry::getKey, e ->
Collections.unmodifiableMap(new HashMap<>(e.getValue())))));
+ return this;
}
- public Builder setBrokerNodes(int numBrokerNodes, int disksPerBroker) {
+ public TestKitNodes build() {
+ if (numControllerNodes < 0) {
+ throw new RuntimeException("Invalid negative value for
numControllerNodes");
+ }
if (numBrokerNodes < 0) {
throw new RuntimeException("Invalid negative value for
numBrokerNodes");
}
- if (disksPerBroker <= 0) {
- throw new RuntimeException("Invalid value for disksPerBroker");
- }
- while (brokerNodeBuilders.size() > numBrokerNodes) {
- brokerNodeBuilders.pollFirstEntry();
+ if (numDisksPerBroker <= 0) {
+ throw new RuntimeException("Invalid value for
numDisksPerBroker");
}
- while (brokerNodeBuilders.size() < numBrokerNodes) {
- int nextId = startBrokerId();
- if (!brokerNodeBuilders.isEmpty()) {
- nextId = brokerNodeBuilders.lastKey() + 1;
- }
- BrokerNode.Builder brokerNodeBuilder = new BrokerNode.Builder()
- .setId(nextId)
- .setNumLogDirectories(disksPerBroker);
- brokerNodeBuilders.put(nextId, brokerNodeBuilder);
- }
- return this;
- }
- public TestKitNodes build() {
String baseDirectory = TestUtils.tempDirectory().getAbsolutePath();
Review Comment:
We don't need to delete `baseDirectory` since `TestUtils.tempDirectory()`
will delete the returned folder when terminating.
##########
core/src/test/java/kafka/testkit/TestKitNodes.java:
##########
@@ -167,11 +164,11 @@ private TestKitNodes(
NavigableMap<Integer, ControllerNode> controllerNodes,
NavigableMap<Integer, BrokerNode> brokerNodes
) {
- this.baseDirectory = baseDirectory;
- this.clusterId = clusterId;
- this.bootstrapMetadata = bootstrapMetadata;
- this.controllerNodes = controllerNodes;
- this.brokerNodes = brokerNodes;
+ this.baseDirectory = Objects.requireNonNull(baseDirectory);
+ this.clusterId = Objects.requireNonNull(clusterId);
+ this.bootstrapMetadata = Objects.requireNonNull(bootstrapMetadata);
+ this.controllerNodes = new
TreeMap<>(Objects.requireNonNull(controllerNodes));
Review Comment:
Please wrap this in an immutable wrapper.
##########
core/src/test/java/kafka/testkit/TestKitNodes.java:
##########
@@ -167,11 +164,11 @@ private TestKitNodes(
NavigableMap<Integer, ControllerNode> controllerNodes,
NavigableMap<Integer, BrokerNode> brokerNodes
) {
- this.baseDirectory = baseDirectory;
- this.clusterId = clusterId;
- this.bootstrapMetadata = bootstrapMetadata;
- this.controllerNodes = controllerNodes;
- this.brokerNodes = brokerNodes;
+ this.baseDirectory = Objects.requireNonNull(baseDirectory);
+ this.clusterId = Objects.requireNonNull(clusterId);
+ this.bootstrapMetadata = Objects.requireNonNull(bootstrapMetadata);
+ this.controllerNodes = new
TreeMap<>(Objects.requireNonNull(controllerNodes));
+ this.brokerNodes = new TreeMap<>(Objects.requireNonNull(brokerNodes));
Review Comment:
ditto
##########
core/src/test/java/kafka/testkit/TestKitNodes.java:
##########
@@ -167,11 +164,11 @@ private TestKitNodes(
NavigableMap<Integer, ControllerNode> controllerNodes,
NavigableMap<Integer, BrokerNode> brokerNodes
Review Comment:
We should use `Map` by default. Please check the usage.
##########
core/src/test/java/kafka/testkit/TestKitNodes.java:
##########
@@ -59,73 +65,64 @@ public Builder setCombined(boolean combined) {
}
public Builder setNumControllerNodes(int numControllerNodes) {
- if (numControllerNodes < 0) {
- throw new RuntimeException("Invalid negative value for
numControllerNodes");
- }
-
- while (controllerNodeBuilders.size() > numControllerNodes) {
- controllerNodeBuilders.pollFirstEntry();
- }
- while (controllerNodeBuilders.size() < numControllerNodes) {
- int nextId = startControllerId();
- if (!controllerNodeBuilders.isEmpty()) {
- nextId = controllerNodeBuilders.lastKey() + 1;
- }
- controllerNodeBuilders.put(nextId,
- new ControllerNode.Builder().
- setId(nextId));
- }
+ this.numControllerNodes = numControllerNodes;
return this;
}
public Builder setNumBrokerNodes(int numBrokerNodes) {
- return setBrokerNodes(numBrokerNodes, 1);
+ this.numBrokerNodes = numBrokerNodes;
+ return this;
+ }
+
+ public Builder setNumDisksPerBroker(int numDisksPerBroker) {
+ this.numDisksPerBroker = numDisksPerBroker;
+ return this;
+ }
+
+ public Builder setPerBrokerProperties(Map<Integer, Map<String,
String>> perBrokerProperties) {
+ this.perBrokerProperties = Collections.unmodifiableMap(
+ perBrokerProperties.entrySet().stream()
+ .collect(Collectors.toMap(Map.Entry::getKey, e ->
Collections.unmodifiableMap(new HashMap<>(e.getValue())))));
+ return this;
}
- public Builder setBrokerNodes(int numBrokerNodes, int disksPerBroker) {
+ public TestKitNodes build() {
+ if (numControllerNodes < 0) {
+ throw new RuntimeException("Invalid negative value for
numControllerNodes");
+ }
if (numBrokerNodes < 0) {
throw new RuntimeException("Invalid negative value for
numBrokerNodes");
}
- if (disksPerBroker <= 0) {
- throw new RuntimeException("Invalid value for disksPerBroker");
- }
- while (brokerNodeBuilders.size() > numBrokerNodes) {
- brokerNodeBuilders.pollFirstEntry();
+ if (numDisksPerBroker <= 0) {
+ throw new RuntimeException("Invalid value for
numDisksPerBroker");
}
- while (brokerNodeBuilders.size() < numBrokerNodes) {
- int nextId = startBrokerId();
- if (!brokerNodeBuilders.isEmpty()) {
- nextId = brokerNodeBuilders.lastKey() + 1;
- }
- BrokerNode.Builder brokerNodeBuilder = new BrokerNode.Builder()
- .setId(nextId)
- .setNumLogDirectories(disksPerBroker);
- brokerNodeBuilders.put(nextId, brokerNodeBuilder);
- }
- return this;
- }
- public TestKitNodes build() {
String baseDirectory = TestUtils.tempDirectory().getAbsolutePath();
try {
if (clusterId == null) {
clusterId = Uuid.randomUuid();
}
TreeMap<Integer, ControllerNode> controllerNodes = new
TreeMap<>();
- for (ControllerNode.Builder builder :
controllerNodeBuilders.values()) {
- ControllerNode node = builder.
- build(baseDirectory, clusterId,
brokerNodeBuilders.containsKey(builder.id()));
- if (controllerNodes.put(node.id(), node) != null) {
- throw new RuntimeException("Duplicate builder for
controller " + node.id());
- }
+ for (int id = startControllerId(); id < startControllerId() +
numControllerNodes; id++) {
+ ControllerNode node = ControllerNode.builder()
+ .setId(id)
+ .setBaseDirectory(baseDirectory)
+ .setClusterId(clusterId)
+ .setCombined(combined)
+ .build();
+ controllerNodes.put(node.id(), node);
}
TreeMap<Integer, BrokerNode> brokerNodes = new TreeMap<>();
Review Comment:
We can leverage a lambda here.
```java
Map<Integer, BrokerNode> brokerNodes =
IntStream.range(startBrokerId(), startBrokerId() + numBrokerNodes)
.boxed().collect(Collectors.toMap(Function.identity(), id ->
BrokerNode.builder()
.setId(id)
.setNumLogDirectories(numDisksPerBroker)
.setBaseDirectory(baseDirectory)
.setClusterId(clusterId)
.setCombined(combined)
.setPropertyOverrides(perBrokerProperties.getOrDefault(id,
Collections.emptyMap()))
.build()));
```
##########
core/src/test/java/kafka/testkit/TestKitNodes.java:
##########
@@ -59,73 +65,64 @@ public Builder setCombined(boolean combined) {
}
public Builder setNumControllerNodes(int numControllerNodes) {
- if (numControllerNodes < 0) {
- throw new RuntimeException("Invalid negative value for
numControllerNodes");
- }
-
- while (controllerNodeBuilders.size() > numControllerNodes) {
- controllerNodeBuilders.pollFirstEntry();
- }
- while (controllerNodeBuilders.size() < numControllerNodes) {
- int nextId = startControllerId();
- if (!controllerNodeBuilders.isEmpty()) {
- nextId = controllerNodeBuilders.lastKey() + 1;
- }
- controllerNodeBuilders.put(nextId,
- new ControllerNode.Builder().
- setId(nextId));
- }
+ this.numControllerNodes = numControllerNodes;
return this;
}
public Builder setNumBrokerNodes(int numBrokerNodes) {
- return setBrokerNodes(numBrokerNodes, 1);
+ this.numBrokerNodes = numBrokerNodes;
+ return this;
+ }
+
+ public Builder setNumDisksPerBroker(int numDisksPerBroker) {
+ this.numDisksPerBroker = numDisksPerBroker;
+ return this;
+ }
+
+ public Builder setPerBrokerProperties(Map<Integer, Map<String,
String>> perBrokerProperties) {
+ this.perBrokerProperties = Collections.unmodifiableMap(
+ perBrokerProperties.entrySet().stream()
+ .collect(Collectors.toMap(Map.Entry::getKey, e ->
Collections.unmodifiableMap(new HashMap<>(e.getValue())))));
+ return this;
}
- public Builder setBrokerNodes(int numBrokerNodes, int disksPerBroker) {
+ public TestKitNodes build() {
+ if (numControllerNodes < 0) {
+ throw new RuntimeException("Invalid negative value for
numControllerNodes");
+ }
if (numBrokerNodes < 0) {
throw new RuntimeException("Invalid negative value for
numBrokerNodes");
}
- if (disksPerBroker <= 0) {
- throw new RuntimeException("Invalid value for disksPerBroker");
- }
- while (brokerNodeBuilders.size() > numBrokerNodes) {
- brokerNodeBuilders.pollFirstEntry();
+ if (numDisksPerBroker <= 0) {
+ throw new RuntimeException("Invalid value for
numDisksPerBroker");
}
- while (brokerNodeBuilders.size() < numBrokerNodes) {
- int nextId = startBrokerId();
- if (!brokerNodeBuilders.isEmpty()) {
- nextId = brokerNodeBuilders.lastKey() + 1;
- }
- BrokerNode.Builder brokerNodeBuilder = new BrokerNode.Builder()
- .setId(nextId)
- .setNumLogDirectories(disksPerBroker);
- brokerNodeBuilders.put(nextId, brokerNodeBuilder);
- }
- return this;
- }
- public TestKitNodes build() {
String baseDirectory = TestUtils.tempDirectory().getAbsolutePath();
try {
if (clusterId == null) {
clusterId = Uuid.randomUuid();
}
TreeMap<Integer, ControllerNode> controllerNodes = new
TreeMap<>();
Review Comment:
Ditto. Please try to use a lambda to simplify the code.
##########
core/src/test/scala/unit/kafka/server/ApiVersionsRequestTest.scala:
##########
@@ -84,17 +125,38 @@ class ApiVersionsRequestTest(cluster: ClusterInstance)
extends AbstractApiVersio
assertEquals(ApiKeys.API_VERSIONS.latestVersion(), apiVersion.maxVersion())
}
- @ClusterTest(metadataVersion = MetadataVersion.IBP_3_7_IV4, serverProperties
= Array(
- new ClusterConfigProperty(key = "unstable.api.versions.enable", value =
"false"),
- new ClusterConfigProperty(key = "unstable.metadata.versions.enable", value
= "false"),
+ @ClusterTests(Array(
Review Comment:
Please add a TODO comment referencing the KAFKA-xxxx ticket.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]