chia7712 commented on code in PR #15761:
URL: https://github.com/apache/kafka/pull/15761#discussion_r1573452044
##########
core/src/test/java/kafka/test/ClusterConfig.java:
##########
@@ -139,28 +159,46 @@ public Map<String, String> nameTags() {
return tags;
}
- public ClusterConfig copyOf() {
- ClusterConfig copy = new ClusterConfig(type, brokers, controllers,
name, autoStart, securityProtocol, listenerName, trustStoreFile,
metadataVersion);
- copy.serverProperties.putAll(serverProperties);
- copy.producerProperties.putAll(producerProperties);
- copy.consumerProperties.putAll(consumerProperties);
- copy.saslServerProperties.putAll(saslServerProperties);
- copy.saslClientProperties.putAll(saslClientProperties);
- perBrokerOverrideProperties.forEach((brokerId, props) -> {
- Properties propsCopy = new Properties();
- propsCopy.putAll(props);
- copy.perBrokerOverrideProperties.put(brokerId, propsCopy);
- });
- return copy;
+ public static Builder defaultClusterBuilder() {
+ return new Builder()
+ .type(Type.ZK)
+ .brokers(1)
+ .controllers(1)
+ .autoStart(true)
+ .securityProtocol(SecurityProtocol.PLAINTEXT)
+ .metadataVersion(MetadataVersion.latestTesting());
}
- public static Builder defaultClusterBuilder() {
- return new Builder(Type.ZK, 1, 1, true, SecurityProtocol.PLAINTEXT,
MetadataVersion.latestTesting());
+ public static Builder clusterBuilder() {
Review Comment:
It seems to me `builder` is good enough.
##########
core/src/test/java/kafka/test/junit/ZkClusterInvocationContext.java:
##########
@@ -349,4 +277,86 @@ public Stream<KafkaServer> servers() {
return
JavaConverters.asJavaCollection(clusterReference.get().servers()).stream();
}
}
+
+ private static class ClusterConfigurableIntegrationHarness extends
IntegrationTestHarness {
+ private ClusterConfig clusterConfig;
+
+ public ClusterConfigurableIntegrationHarness(ClusterConfig
clusterConfig) {
Review Comment:
`public` -> `private`
##########
core/src/test/java/kafka/test/ClusterConfig.java:
##########
@@ -139,28 +159,46 @@ public Map<String, String> nameTags() {
return tags;
}
- public ClusterConfig copyOf() {
- ClusterConfig copy = new ClusterConfig(type, brokers, controllers,
name, autoStart, securityProtocol, listenerName, trustStoreFile,
metadataVersion);
- copy.serverProperties.putAll(serverProperties);
- copy.producerProperties.putAll(producerProperties);
- copy.consumerProperties.putAll(consumerProperties);
- copy.saslServerProperties.putAll(saslServerProperties);
- copy.saslClientProperties.putAll(saslClientProperties);
- perBrokerOverrideProperties.forEach((brokerId, props) -> {
- Properties propsCopy = new Properties();
- propsCopy.putAll(props);
- copy.perBrokerOverrideProperties.put(brokerId, propsCopy);
- });
- return copy;
+ public static Builder defaultClusterBuilder() {
+ return new Builder()
+ .type(Type.ZK)
+ .brokers(1)
+ .controllers(1)
+ .autoStart(true)
+ .securityProtocol(SecurityProtocol.PLAINTEXT)
+ .metadataVersion(MetadataVersion.latestTesting());
}
- public static Builder defaultClusterBuilder() {
- return new Builder(Type.ZK, 1, 1, true, SecurityProtocol.PLAINTEXT,
MetadataVersion.latestTesting());
+ public static Builder clusterBuilder() {
+ return new Builder();
}
- public static Builder clusterBuilder(Type type, int brokers, int
controllers, boolean autoStart,
- SecurityProtocol securityProtocol,
MetadataVersion metadataVersion) {
- return new Builder(type, brokers, controllers, autoStart,
securityProtocol, metadataVersion);
+ public static Builder clusterBuilder(ClusterConfig clusterConfig) {
+ ClusterConfig.Builder builder = new Builder()
+ .type(clusterConfig.type)
+ .brokers(clusterConfig.brokers)
+ .controllers(clusterConfig.controllers)
+ .name(clusterConfig.name)
+ .autoStart(clusterConfig.autoStart)
+ .securityProtocol(clusterConfig.securityProtocol)
+ .listenerName(clusterConfig.listenerName)
+ .trustStoreFile(clusterConfig.trustStoreFile)
+ .metadataVersion(clusterConfig.metadataVersion);
+ builder.serverProperties = copyOf(clusterConfig.serverProperties);
+ builder.producerProperties = copyOf(clusterConfig.producerProperties);
+ builder.consumerProperties = copyOf(clusterConfig.consumerProperties);
+ builder.adminClientProperties =
copyOf(clusterConfig.adminClientProperties);
+ builder.saslServerProperties =
copyOf(clusterConfig.saslServerProperties);
+ builder.saslClientProperties =
copyOf(clusterConfig.saslClientProperties);
+ clusterConfig.perBrokerOverrideProperties.forEach((brokerId, props) ->
+ builder.perBrokerOverrideProperties.put(brokerId,
copyOf(props)));
+ return builder;
+ }
+
+ private static Properties copyOf(Properties properties) {
Review Comment:
Instead of creating a copy, we can change the type of the inner variable from
`Properties` to `Map` and return an unmodifiable view.
##########
core/src/test/java/kafka/test/ClusterConfig.java:
##########
@@ -139,28 +159,46 @@ public Map<String, String> nameTags() {
return tags;
}
- public ClusterConfig copyOf() {
- ClusterConfig copy = new ClusterConfig(type, brokers, controllers,
name, autoStart, securityProtocol, listenerName, trustStoreFile,
metadataVersion);
- copy.serverProperties.putAll(serverProperties);
- copy.producerProperties.putAll(producerProperties);
- copy.consumerProperties.putAll(consumerProperties);
- copy.saslServerProperties.putAll(saslServerProperties);
- copy.saslClientProperties.putAll(saslClientProperties);
- perBrokerOverrideProperties.forEach((brokerId, props) -> {
- Properties propsCopy = new Properties();
- propsCopy.putAll(props);
- copy.perBrokerOverrideProperties.put(brokerId, propsCopy);
- });
- return copy;
+ public static Builder defaultClusterBuilder() {
+ return new Builder()
+ .type(Type.ZK)
+ .brokers(1)
+ .controllers(1)
+ .autoStart(true)
+ .securityProtocol(SecurityProtocol.PLAINTEXT)
+ .metadataVersion(MetadataVersion.latestTesting());
}
- public static Builder defaultClusterBuilder() {
- return new Builder(Type.ZK, 1, 1, true, SecurityProtocol.PLAINTEXT,
MetadataVersion.latestTesting());
+ public static Builder clusterBuilder() {
+ return new Builder();
}
- public static Builder clusterBuilder(Type type, int brokers, int
controllers, boolean autoStart,
- SecurityProtocol securityProtocol,
MetadataVersion metadataVersion) {
- return new Builder(type, brokers, controllers, autoStart,
securityProtocol, metadataVersion);
+ public static Builder clusterBuilder(ClusterConfig clusterConfig) {
Review Comment:
Could we add a constructor to `Builder` that accepts a `ClusterConfig` and
uses it to initialize the builder?
##########
core/src/test/scala/unit/kafka/server/ApiVersionsRequestTest.scala:
##########
@@ -17,48 +17,78 @@
package kafka.server
-import kafka.test.{ClusterConfig, ClusterInstance}
+import kafka.test.ClusterInstance
import org.apache.kafka.common.message.ApiVersionsRequestData
import org.apache.kafka.common.protocol.{ApiKeys, Errors}
import org.apache.kafka.common.requests.ApiVersionsRequest
-import kafka.test.annotation.{ClusterConfigProperty, ClusterTest,
ClusterTestDefaults, Type}
+import kafka.test.annotation.{ClusterConfigProperty, ClusterTest,
ClusterTestDefaults, ClusterTests, Type}
import kafka.test.junit.ClusterTestExtensions
import org.apache.kafka.server.common.MetadataVersion
import org.junit.jupiter.api.Assertions._
-import org.junit.jupiter.api.BeforeEach
import org.junit.jupiter.api.extension.ExtendWith
@ExtendWith(value = Array(classOf[ClusterTestExtensions]))
-@ClusterTestDefaults(clusterType = Type.ALL, brokers = 1)
+@ClusterTestDefaults(brokers = 1)
class ApiVersionsRequestTest(cluster: ClusterInstance) extends
AbstractApiVersionsRequestTest(cluster) {
- @BeforeEach
- def setup(config: ClusterConfig): Unit = {
- super.brokerPropertyOverrides(config.serverProperties())
- }
-
- @ClusterTest(metadataVersion = MetadataVersion.IBP_3_8_IV0, serverProperties
= Array(
- new ClusterConfigProperty(key = "unstable.api.versions.enable", value =
"false"),
- new ClusterConfigProperty(key = "unstable.metadata.versions.enable", value
= "true"),
+ @ClusterTests(Array(
+ new ClusterTest(clusterType = Type.ZK, metadataVersion =
MetadataVersion.IBP_3_8_IV0, serverProperties = Array(
Review Comment:
Pardon me, but why do we need those changes?
##########
core/src/test/java/kafka/testkit/BrokerNode.java:
##########
@@ -66,11 +68,27 @@ public Builder setNumLogDirectories(int numLogDirectories) {
return this;
}
- public BrokerNode build(
- String baseDirectory,
- Uuid clusterId,
- boolean combined
- ) {
+ public Builder setClusterId(Uuid clusterId) {
+ this.clusterId = clusterId;
+ return this;
+ }
+
+ public Builder setBaseDirectory(String baseDirectory) {
+ this.baseDirectory = baseDirectory;
+ return this;
+ }
+
+ public Builder setCombined(boolean combined) {
+ this.combined = combined;
+ return this;
+ }
+
+ public Builder setPropertyOverrides(Map<String, String>
propertyOverrides) {
+ this.propertyOverrides = propertyOverrides;
Review Comment:
We should do a deep copy to guard against modification by the caller in the future.
##########
core/src/test/java/kafka/test/ClusterConfig.java:
##########
@@ -139,28 +159,46 @@ public Map<String, String> nameTags() {
return tags;
}
- public ClusterConfig copyOf() {
- ClusterConfig copy = new ClusterConfig(type, brokers, controllers,
name, autoStart, securityProtocol, listenerName, trustStoreFile,
metadataVersion);
- copy.serverProperties.putAll(serverProperties);
- copy.producerProperties.putAll(producerProperties);
- copy.consumerProperties.putAll(consumerProperties);
- copy.saslServerProperties.putAll(saslServerProperties);
- copy.saslClientProperties.putAll(saslClientProperties);
- perBrokerOverrideProperties.forEach((brokerId, props) -> {
- Properties propsCopy = new Properties();
- propsCopy.putAll(props);
- copy.perBrokerOverrideProperties.put(brokerId, propsCopy);
- });
- return copy;
+ public static Builder defaultClusterBuilder() {
+ return new Builder()
+ .type(Type.ZK)
+ .brokers(1)
+ .controllers(1)
+ .autoStart(true)
+ .securityProtocol(SecurityProtocol.PLAINTEXT)
+ .metadataVersion(MetadataVersion.latestTesting());
}
- public static Builder defaultClusterBuilder() {
- return new Builder(Type.ZK, 1, 1, true, SecurityProtocol.PLAINTEXT,
MetadataVersion.latestTesting());
+ public static Builder clusterBuilder() {
+ return new Builder();
}
- public static Builder clusterBuilder(Type type, int brokers, int
controllers, boolean autoStart,
- SecurityProtocol securityProtocol,
MetadataVersion metadataVersion) {
- return new Builder(type, brokers, controllers, autoStart,
securityProtocol, metadataVersion);
+ public static Builder clusterBuilder(ClusterConfig clusterConfig) {
+ ClusterConfig.Builder builder = new Builder()
+ .type(clusterConfig.type)
+ .brokers(clusterConfig.brokers)
+ .controllers(clusterConfig.controllers)
+ .name(clusterConfig.name)
+ .autoStart(clusterConfig.autoStart)
+ .securityProtocol(clusterConfig.securityProtocol)
+ .listenerName(clusterConfig.listenerName)
+ .trustStoreFile(clusterConfig.trustStoreFile)
+ .metadataVersion(clusterConfig.metadataVersion);
+ builder.serverProperties = copyOf(clusterConfig.serverProperties);
Review Comment:
Why don't we use a setter here?
##########
core/src/test/java/kafka/test/junit/ZkClusterInvocationContext.java:
##########
@@ -96,83 +98,7 @@ public List<Extension> getAdditionalExtensions() {
// have run. This allows tests to set up external dependencies
like ZK, MiniKDC, etc.
// However, since we cannot create this instance until we are
inside the test invocation, we have
// to use a container class (AtomicReference) to provide this
cluster object to the test itself
-
- // This is what tests normally extend from to start a cluster,
here we create it anonymously and
Review Comment:
Could we keep this comment?
##########
core/src/test/java/kafka/test/ClusterConfig.java:
##########
@@ -139,28 +159,46 @@ public Map<String, String> nameTags() {
return tags;
}
- public ClusterConfig copyOf() {
- ClusterConfig copy = new ClusterConfig(type, brokers, controllers,
name, autoStart, securityProtocol, listenerName, trustStoreFile,
metadataVersion);
- copy.serverProperties.putAll(serverProperties);
- copy.producerProperties.putAll(producerProperties);
- copy.consumerProperties.putAll(consumerProperties);
- copy.saslServerProperties.putAll(saslServerProperties);
- copy.saslClientProperties.putAll(saslClientProperties);
- perBrokerOverrideProperties.forEach((brokerId, props) -> {
- Properties propsCopy = new Properties();
- propsCopy.putAll(props);
- copy.perBrokerOverrideProperties.put(brokerId, propsCopy);
- });
- return copy;
+ public static Builder defaultClusterBuilder() {
+ return new Builder()
+ .type(Type.ZK)
+ .brokers(1)
+ .controllers(1)
+ .autoStart(true)
+ .securityProtocol(SecurityProtocol.PLAINTEXT)
+ .metadataVersion(MetadataVersion.latestTesting());
}
- public static Builder defaultClusterBuilder() {
- return new Builder(Type.ZK, 1, 1, true, SecurityProtocol.PLAINTEXT,
MetadataVersion.latestTesting());
+ public static Builder clusterBuilder() {
Review Comment:
BTW, could we apply this pattern to `BrokerNode` and `ControllerNode` as well,
so that we encourage users to create the builder via a static method?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]