chia7712 commented on code in PR #15761:
URL: https://github.com/apache/kafka/pull/15761#discussion_r1573839545
##########
core/src/test/java/kafka/test/junit/RaftClusterInvocationContext.java:
##########
@@ -290,7 +287,7 @@ public void waitForReadyBrokers() throws
InterruptedException {
}
@Override
- public void rollingBrokerRestart() {
+ public void rollingBrokerRestart(Optional<ClusterConfig>
clusterConfig) {
Review Comment:
Since not all implementations support this method, we should remove it from the
interface. Callers can use `getUnderlying` to get the ZK instance and call that
method directly.
##########
core/src/test/java/kafka/test/ClusterConfig.java:
##########
@@ -211,13 +186,36 @@ public static class Builder {
private String listenerName;
private File trustStoreFile;
private MetadataVersion metadataVersion;
- private Properties serverProperties = new Properties();
- private Properties producerProperties = new Properties();
- private Properties consumerProperties = new Properties();
- private Properties adminClientProperties = new Properties();
- private Properties saslServerProperties = new Properties();
- private Properties saslClientProperties = new Properties();
- private final Map<Integer, Properties> perBrokerOverrideProperties =
new HashMap<>();
+ private Map<String, String> serverProperties = new HashMap<>();
+ private Map<String, String> producerProperties = new HashMap<>();
+ private Map<String, String> consumerProperties = new HashMap<>();
+ private Map<String, String> adminClientProperties = new HashMap<>();
+ private Map<String, String> saslServerProperties = new HashMap<>();
+ private Map<String, String> saslClientProperties = new HashMap<>();
+ private Map<Integer, Map<String, String>> perBrokerOverrideProperties
= new HashMap<>();
+
+ Builder() {}
+
+ Builder(ClusterConfig clusterConfig) {
+ this.type = clusterConfig.type;
+ this.brokers = clusterConfig.brokers;
+ this.controllers = clusterConfig.controllers;
+ this.name = clusterConfig.name;
+ this.autoStart = clusterConfig.autoStart;
+ this.securityProtocol = clusterConfig.securityProtocol;
+ this.listenerName = clusterConfig.listenerName;
+ this.trustStoreFile = clusterConfig.trustStoreFile;
+ this.metadataVersion = clusterConfig.metadataVersion;
+ this.serverProperties = new
HashMap<>(clusterConfig.serverProperties);
+ this.producerProperties = new
HashMap<>(clusterConfig.producerProperties);
+ this.consumerProperties = new
HashMap<>(clusterConfig.consumerProperties);
+ this.adminClientProperties = new
HashMap<>(clusterConfig.adminClientProperties);
+ this.saslServerProperties = new
HashMap<>(clusterConfig.saslServerProperties);
+ this.saslClientProperties = new
HashMap<>(clusterConfig.saslClientProperties);
+ Map<Integer, Map<String, String>> perBrokerOverrideProps = new
HashMap<>();
+ clusterConfig.perBrokerOverrideProperties.forEach((k, v) ->
perBrokerOverrideProps.put(k, new HashMap<>(v)));
+ this.perBrokerOverrideProperties = perBrokerOverrideProps;
Review Comment:
```java
this.perBrokerOverrideProperties =
clusterConfig.perBrokerOverrideProperties.entrySet().stream()
.collect(Collectors.toMap(Map.Entry::getKey, e -> new
HashMap<>(e.getValue())));
```
##########
core/src/test/scala/unit/kafka/server/SaslApiVersionsRequestTest.scala:
##########
@@ -18,41 +18,58 @@ package kafka.server
import java.net.Socket
import java.util.Collections
-
import kafka.api.{KafkaSasl, SaslSetup}
-import kafka.test.annotation.{ClusterTest, Type}
+import kafka.server.SaslApiVersionsRequestTest.{kafkaClientSaslMechanism,
kafkaServerSaslMechanisms}
+import kafka.test.annotation.{ClusterTemplate, Type}
import kafka.test.junit.ClusterTestExtensions
-import kafka.test.{ClusterConfig, ClusterInstance}
+import kafka.test.{ClusterConfig, ClusterGenerator, ClusterInstance}
import kafka.utils.JaasTestUtils
+import org.apache.kafka.common.config.SaslConfigs
+import org.apache.kafka.common.config.internals.BrokerSecurityConfigs
import org.apache.kafka.common.message.SaslHandshakeRequestData
import org.apache.kafka.common.protocol.{ApiKeys, Errors}
import org.apache.kafka.common.requests.{ApiVersionsRequest,
ApiVersionsResponse, SaslHandshakeRequest, SaslHandshakeResponse}
import org.apache.kafka.common.security.auth.SecurityProtocol
+import org.apache.kafka.server.config.KafkaSecurityConfigs
import org.junit.jupiter.api.Assertions._
import org.junit.jupiter.api.extension.ExtendWith
import org.junit.jupiter.api.{AfterEach, BeforeEach}
import scala.jdk.CollectionConverters._
+object SaslApiVersionsRequestTest {
+ val kafkaClientSaslMechanism = "PLAIN"
+ val kafkaServerSaslMechanisms: Seq[String] = List("PLAIN")
+ val controlPlaneListenerName = "CONTROL_PLANE"
+ val securityProtocol = SecurityProtocol.SASL_PLAINTEXT
+
+ def saslApiVersionsRequestClusterConfig(clusterGenerator: ClusterGenerator):
Unit = {
+ clusterGenerator.accept(ClusterConfig.defaultBuilder
+ .securityProtocol(securityProtocol)
+ .`type`(Type.ZK)
+
.putSaslServerProperty(KafkaSecurityConfigs.SASL_MECHANISM_INTER_BROKER_PROTOCOL_CONFIG,
kafkaClientSaslMechanism)
+
.putSaslServerProperty(BrokerSecurityConfigs.SASL_ENABLED_MECHANISMS_CONFIG,
kafkaServerSaslMechanisms.mkString(","))
+ .putSaslClientProperty(SaslConfigs.SASL_MECHANISM,
kafkaClientSaslMechanism)
+ // Configure control plane listener to make sure we have separate
listeners for testing.
+ .putServerProperty(KafkaConfig.ControlPlaneListenerNameProp,
controlPlaneListenerName)
+ .putServerProperty(KafkaConfig.ListenerSecurityProtocolMapProp,
s"$controlPlaneListenerName:$securityProtocol,$securityProtocol:$securityProtocol")
+ .putServerProperty("listeners",
s"$securityProtocol://localhost:0,$controlPlaneListenerName://localhost:0")
+ .putServerProperty(KafkaConfig.AdvertisedListenersProp,
s"$securityProtocol://localhost:0,$controlPlaneListenerName://localhost:0")
+ .build())
+ }
+}
@ExtendWith(value = Array(classOf[ClusterTestExtensions]))
class SaslApiVersionsRequestTest(cluster: ClusterInstance) extends
AbstractApiVersionsRequestTest(cluster) {
-
- val kafkaClientSaslMechanism = "PLAIN"
- val kafkaServerSaslMechanisms = List("PLAIN")
-
private var sasl: SaslSetup = _
@BeforeEach
- def setupSasl(config: ClusterConfig): Unit = {
+ def setupSasl(): Unit = {
sasl = new SaslSetup() {}
Review Comment:
Could we create `sasl` in initialization?
##########
core/src/test/java/kafka/test/ClusterConfig.java:
##########
@@ -67,13 +69,16 @@ public class ClusterConfig {
this.listenerName = listenerName;
this.trustStoreFile = trustStoreFile;
this.metadataVersion = metadataVersion;
- this.serverProperties = copyOf(serverProperties);
- this.producerProperties = copyOf(producerProperties);
- this.consumerProperties = copyOf(consumerProperties);
- this.adminClientProperties = copyOf(adminClientProperties);
- this.saslServerProperties = copyOf(saslServerProperties);
- this.saslClientProperties = copyOf(saslClientProperties);
- perBrokerOverrideProperties.forEach((brokerId, props) ->
this.perBrokerOverrideProperties.put(brokerId, copyOf(props)));
+ this.serverProperties = Collections.unmodifiableMap(serverProperties);
+ this.producerProperties =
Collections.unmodifiableMap(producerProperties);
+ this.consumerProperties =
Collections.unmodifiableMap(consumerProperties);
+ this.adminClientProperties =
Collections.unmodifiableMap(adminClientProperties);
+ this.saslServerProperties =
Collections.unmodifiableMap(saslServerProperties);
+ this.saslClientProperties =
Collections.unmodifiableMap(saslClientProperties);
+ this.perBrokerOverrideProperties = Collections.unmodifiableMap(
Review Comment:
To keep this consistent, please do the deep copy in either the Builder or this
constructor.
##########
core/src/test/scala/unit/kafka/server/ApiVersionsRequestTest.scala:
##########
@@ -17,48 +17,78 @@
package kafka.server
-import kafka.test.{ClusterConfig, ClusterInstance}
+import kafka.test.ClusterInstance
import org.apache.kafka.common.message.ApiVersionsRequestData
import org.apache.kafka.common.protocol.{ApiKeys, Errors}
import org.apache.kafka.common.requests.ApiVersionsRequest
-import kafka.test.annotation.{ClusterConfigProperty, ClusterTest,
ClusterTestDefaults, Type}
+import kafka.test.annotation.{ClusterConfigProperty, ClusterTest,
ClusterTestDefaults, ClusterTests, Type}
import kafka.test.junit.ClusterTestExtensions
import org.apache.kafka.server.common.MetadataVersion
import org.junit.jupiter.api.Assertions._
-import org.junit.jupiter.api.BeforeEach
import org.junit.jupiter.api.extension.ExtendWith
@ExtendWith(value = Array(classOf[ClusterTestExtensions]))
-@ClusterTestDefaults(clusterType = Type.ALL, brokers = 1)
+@ClusterTestDefaults(brokers = 1)
class ApiVersionsRequestTest(cluster: ClusterInstance) extends
AbstractApiVersionsRequestTest(cluster) {
- @BeforeEach
- def setup(config: ClusterConfig): Unit = {
- super.brokerPropertyOverrides(config.serverProperties())
- }
-
- @ClusterTest(metadataVersion = MetadataVersion.IBP_3_8_IV0, serverProperties
= Array(
- new ClusterConfigProperty(key = "unstable.api.versions.enable", value =
"false"),
- new ClusterConfigProperty(key = "unstable.metadata.versions.enable", value
= "true"),
+ @ClusterTests(Array(
+ new ClusterTest(clusterType = Type.ZK, metadataVersion =
MetadataVersion.IBP_3_8_IV0, serverProperties = Array(
Review Comment:
Could we simplify the annotation by allowing `ClusterTemplate` to be defined in
`ClusterTests`?
##########
core/src/test/java/kafka/testkit/TestKitNodes.java:
##########
@@ -97,14 +96,32 @@ public Builder setBrokerNodes(int numBrokerNodes, int
disksPerBroker) {
if (!brokerNodeBuilders.isEmpty()) {
nextId = brokerNodeBuilders.lastKey() + 1;
}
- BrokerNode.Builder brokerNodeBuilder = new BrokerNode.Builder()
+ BrokerNode.Builder brokerNodeBuilder = BrokerNode.builder()
.setId(nextId)
.setNumLogDirectories(disksPerBroker);
brokerNodeBuilders.put(nextId, brokerNodeBuilder);
}
return this;
}
+ /**
+ * Set per broker properties overrides, this setter must be invoked
after setBrokerNodes which
Review Comment:
A happens-before requirement is an anti-pattern in the builder pattern. Could we merge
`PerBrokerPropertiesOverrides` into `setNumControllerNodes/setNumBrokerNodes`?
For example:
```java
setBrokerNodes(int numBrokerNodes, int disksPerBroker, Function<Integer,
Map<String, Object>> perBrokerConfigs)
```
##########
core/src/test/java/kafka/test/ClusterConfig.java:
##########
@@ -22,11 +22,13 @@
import org.apache.kafka.server.common.MetadataVersion;
import java.io.File;
+import java.util.AbstractMap;
+import java.util.Collections;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Optional;
-import java.util.Properties;
+import java.util.stream.Collectors;
/**
Review Comment:
Please add a comment stating that this class is an immutable object.
##########
core/src/test/java/kafka/testkit/TestKitNodes.java:
##########
@@ -97,14 +96,32 @@ public Builder setBrokerNodes(int numBrokerNodes, int
disksPerBroker) {
if (!brokerNodeBuilders.isEmpty()) {
nextId = brokerNodeBuilders.lastKey() + 1;
}
- BrokerNode.Builder brokerNodeBuilder = new BrokerNode.Builder()
+ BrokerNode.Builder brokerNodeBuilder = BrokerNode.builder()
.setId(nextId)
.setNumLogDirectories(disksPerBroker);
brokerNodeBuilders.put(nextId, brokerNodeBuilder);
}
return this;
}
+ /**
+ * Set per broker properties overrides, this setter must be invoked
after setBrokerNodes which
+ * setup broker id and broker builder.
+ * @param perBrokerPropertiesOverrides properties to override in each
broker
+ * @return Builder
+ */
+ public Builder setPerBrokerPropertiesOverrides(Map<Integer,
Map<String, String>> perBrokerPropertiesOverrides) {
+ perBrokerPropertiesOverrides.forEach((brokerId, properties) -> {
+ if (!brokerNodeBuilders.containsKey(brokerId)) {
+ throw new RuntimeException("Broker id " + brokerId + "
does not exist");
+ }
+ Map<String, String> propertiesOverride = new HashMap<>();
+ properties.forEach((key, value) ->
propertiesOverride.put(key.toString(), value.toString()));
Review Comment:
`toString` is redundant.
##########
core/src/test/scala/integration/kafka/server/KRaftClusterTest.scala:
##########
@@ -435,15 +443,13 @@ class KRaftClusterTest {
private def doOnStartedKafkaCluster(numControllerNodes: Int = 1,
numBrokerNodes: Int,
- brokerPropertyOverrides: (TestKitNodes,
BrokerNode) => Map[String, String])
+ brokerPropertyOverrides:
util.Map[Integer, util.Map[String, String]])
(action: KafkaClusterTestKit => Unit):
Unit = {
val nodes = new TestKitNodes.Builder()
Review Comment:
Could we change the style to `TestKitNodes.builder()` so as to unify the
builder pattern in testing?
##########
core/src/test/java/kafka/testkit/BrokerNode.java:
##########
@@ -66,11 +74,27 @@ public Builder setNumLogDirectories(int numLogDirectories) {
return this;
}
- public BrokerNode build(
- String baseDirectory,
- Uuid clusterId,
- boolean combined
- ) {
+ public Builder setClusterId(Uuid clusterId) {
Review Comment:
The builder of `ClusterConfig` does not use the `set` prefix. Could we unify the
naming?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]