chia7712 commented on code in PR #15761:
URL: https://github.com/apache/kafka/pull/15761#discussion_r1580513112
##########
core/src/test/scala/integration/kafka/zk/ZkMigrationIntegrationTest.scala:
##########
@@ -57,16 +57,18 @@ import org.slf4j.{Logger, LoggerFactory}
import java.util
import java.util.concurrent.{CompletableFuture, ExecutionException, TimeUnit}
-import java.util.{Collections, Optional, Properties, UUID}
+import java.util.{Collections, Optional, UUID}
import scala.collection.Seq
import scala.jdk.CollectionConverters._
object ZkMigrationIntegrationTest {
- def addZkBrokerProps(props: Properties): Unit = {
- props.setProperty("inter.broker.listener.name", "EXTERNAL")
- props.setProperty("listeners",
"PLAINTEXT://localhost:0,EXTERNAL://localhost:0")
- props.setProperty("advertised.listeners",
"PLAINTEXT://localhost:0,EXTERNAL://localhost:0")
- props.setProperty("listener.security.protocol.map",
"EXTERNAL:PLAINTEXT,PLAINTEXT:PLAINTEXT")
+ def addZkBrokerProps(builder: ClusterConfig.Builder): Unit = {
Review Comment:
It seems this method is used by `zkClustersForAllMigrationVersions` only. We
can merge them into one method. For example:
```scala
val serverProperties = new util.HashMap[String, String]()
serverProperties.put("inter.broker.listener.name", "EXTERNAL")
serverProperties.put("listeners",
"PLAINTEXT://localhost:0,EXTERNAL://localhost:0")
serverProperties.put("advertised.listeners",
"PLAINTEXT://localhost:0,EXTERNAL://localhost:0")
serverProperties.put("listener.security.protocol.map",
"EXTERNAL:PLAINTEXT,PLAINTEXT:PLAINTEXT")
clusterGenerator.accept(ClusterConfig.defaultBuilder()
.setMetadataVersion(mv)
.setBrokers(3)
.setType(Type.ZK)
.setServerProperties(serverProperties).build())
```
##########
core/src/test/scala/integration/kafka/zk/ZkMigrationIntegrationTest.scala:
##########
@@ -453,11 +464,16 @@ class ZkMigrationIntegrationTest {
// Enable migration configs and restart brokers
log.info("Restart brokers in migration mode")
-
zkCluster.config().serverProperties().put(KafkaConfig.MigrationEnabledProp,
"true")
-
zkCluster.config().serverProperties().put(QuorumConfig.QUORUM_VOTERS_CONFIG,
kraftCluster.quorumVotersConfig())
-
zkCluster.config().serverProperties().put(KafkaConfig.ControllerListenerNamesProp,
"CONTROLLER")
-
zkCluster.config().serverProperties().put(SocketServerConfigs.LISTENER_SECURITY_PROTOCOL_MAP_CONFIG,
"CONTROLLER:PLAINTEXT,EXTERNAL:PLAINTEXT,PLAINTEXT:PLAINTEXT")
- zkCluster.rollingBrokerRestart()
+ val serverProperties = new util.HashMap[String, String]()
+ serverProperties.putAll(zkCluster.config().serverProperties())
Review Comment:
ditto
##########
core/src/test/scala/integration/kafka/zk/ZkMigrationIntegrationTest.scala:
##########
@@ -517,11 +533,16 @@ class ZkMigrationIntegrationTest {
// Enable migration configs and restart brokers
log.info("Restart brokers in migration mode")
-
zkCluster.config().serverProperties().put(KafkaConfig.MigrationEnabledProp,
"true")
-
zkCluster.config().serverProperties().put(QuorumConfig.QUORUM_VOTERS_CONFIG,
kraftCluster.quorumVotersConfig())
-
zkCluster.config().serverProperties().put(KafkaConfig.ControllerListenerNamesProp,
"CONTROLLER")
-
zkCluster.config().serverProperties().put(SocketServerConfigs.LISTENER_SECURITY_PROTOCOL_MAP_CONFIG,
"CONTROLLER:PLAINTEXT,EXTERNAL:PLAINTEXT,PLAINTEXT:PLAINTEXT")
- zkCluster.rollingBrokerRestart() // This would throw if authorizers
weren't allowed
+ val serverProperties = new util.HashMap[String, String]()
+ serverProperties.putAll(zkCluster.config().serverProperties())
Review Comment:
ditto
##########
core/src/test/scala/integration/kafka/zk/ZkMigrationIntegrationTest.scala:
##########
@@ -802,11 +838,16 @@ class ZkMigrationIntegrationTest {
// Enable migration configs and restart brokers
log.info("Restart brokers in migration mode")
-
zkCluster.config().serverProperties().put(KafkaConfig.MigrationEnabledProp,
"true")
-
zkCluster.config().serverProperties().put(QuorumConfig.QUORUM_VOTERS_CONFIG,
kraftCluster.quorumVotersConfig())
-
zkCluster.config().serverProperties().put(KafkaConfig.ControllerListenerNamesProp,
"CONTROLLER")
-
zkCluster.config().serverProperties().put(SocketServerConfigs.LISTENER_SECURITY_PROTOCOL_MAP_CONFIG,
"CONTROLLER:PLAINTEXT,EXTERNAL:PLAINTEXT,PLAINTEXT:PLAINTEXT")
- zkCluster.rollingBrokerRestart()
+ val serverProperties = new util.HashMap[String, String]()
+ serverProperties.putAll(zkCluster.config().serverProperties())
Review Comment:
ditto
##########
core/src/test/scala/integration/kafka/zk/ZkMigrationIntegrationTest.scala:
##########
@@ -667,11 +693,16 @@ class ZkMigrationIntegrationTest {
// Enable migration configs and restart brokers
log.info("Restart brokers in migration mode")
-
zkCluster.config().serverProperties().put(KafkaConfig.MigrationEnabledProp,
"true")
-
zkCluster.config().serverProperties().put(QuorumConfig.QUORUM_VOTERS_CONFIG,
kraftCluster.quorumVotersConfig())
-
zkCluster.config().serverProperties().put(KafkaConfig.ControllerListenerNamesProp,
"CONTROLLER")
-
zkCluster.config().serverProperties().put(SocketServerConfigs.LISTENER_SECURITY_PROTOCOL_MAP_CONFIG,
"CONTROLLER:PLAINTEXT,EXTERNAL:PLAINTEXT,PLAINTEXT:PLAINTEXT")
- zkCluster.rollingBrokerRestart()
+ val serverProperties = new util.HashMap[String, String]()
+ serverProperties.putAll(zkCluster.config().serverProperties())
Review Comment:
ditto
##########
core/src/test/java/kafka/test/ClusterConfig.java:
##########
@@ -139,28 +151,38 @@ public Map<String, String> nameTags() {
return tags;
}
- public ClusterConfig copyOf() {
- ClusterConfig copy = new ClusterConfig(type, brokers, controllers,
name, autoStart, securityProtocol, listenerName, trustStoreFile,
metadataVersion);
- copy.serverProperties.putAll(serverProperties);
- copy.producerProperties.putAll(producerProperties);
- copy.consumerProperties.putAll(consumerProperties);
- copy.saslServerProperties.putAll(saslServerProperties);
- copy.saslClientProperties.putAll(saslClientProperties);
- perBrokerOverrideProperties.forEach((brokerId, props) -> {
- Properties propsCopy = new Properties();
- propsCopy.putAll(props);
- copy.perBrokerOverrideProperties.put(brokerId, propsCopy);
- });
- return copy;
+ public static Builder defaultBuilder() {
+ return new Builder()
+ .setType(Type.ZK)
+ .setBrokers(1)
+ .setControllers(1)
+ .setAutoStart(true)
+ .setSecurityProtocol(SecurityProtocol.PLAINTEXT)
+ .setMetadataVersion(MetadataVersion.latestTesting());
}
- public static Builder defaultClusterBuilder() {
- return new Builder(Type.ZK, 1, 1, true, SecurityProtocol.PLAINTEXT,
MetadataVersion.latestTesting());
+ public static Builder builder() {
+ return new Builder();
}
- public static Builder clusterBuilder(Type type, int brokers, int
controllers, boolean autoStart,
- SecurityProtocol securityProtocol,
MetadataVersion metadataVersion) {
- return new Builder(type, brokers, controllers, autoStart,
securityProtocol, metadataVersion);
+ public static Builder builder(ClusterConfig clusterConfig) {
Review Comment:
Could you please add unit test for this method? That deep copy is
error-prone.
##########
core/src/test/scala/integration/kafka/zk/ZkMigrationIntegrationTest.scala:
##########
@@ -319,11 +325,16 @@ class ZkMigrationIntegrationTest {
// Enable migration configs and restart brokers
log.info("Restart brokers in migration mode")
-
zkCluster.config().serverProperties().put(KafkaConfig.MigrationEnabledProp,
"true")
-
zkCluster.config().serverProperties().put(QuorumConfig.QUORUM_VOTERS_CONFIG,
kraftCluster.quorumVotersConfig())
-
zkCluster.config().serverProperties().put(KafkaConfig.ControllerListenerNamesProp,
"CONTROLLER")
-
zkCluster.config().serverProperties().put(SocketServerConfigs.LISTENER_SECURITY_PROTOCOL_MAP_CONFIG,
"CONTROLLER:PLAINTEXT,EXTERNAL:PLAINTEXT,PLAINTEXT:PLAINTEXT")
- zkCluster.rollingBrokerRestart()
+ val serverProperties = new util.HashMap[String, String]()
+ serverProperties.putAll(zkCluster.config().serverProperties())
Review Comment:
ditto
##########
core/src/test/scala/integration/kafka/zk/ZkMigrationIntegrationTest.scala:
##########
@@ -602,11 +623,16 @@ class ZkMigrationIntegrationTest {
// Enable migration configs and restart brokers
log.info("Restart brokers in migration mode")
-
zkCluster.config().serverProperties().put(KafkaConfig.MigrationEnabledProp,
"true")
-
zkCluster.config().serverProperties().put(QuorumConfig.QUORUM_VOTERS_CONFIG,
kraftCluster.quorumVotersConfig())
-
zkCluster.config().serverProperties().put(KafkaConfig.ControllerListenerNamesProp,
"CONTROLLER")
-
zkCluster.config().serverProperties().put(SocketServerConfigs.LISTENER_SECURITY_PROTOCOL_MAP_CONFIG,
"CONTROLLER:PLAINTEXT,EXTERNAL:PLAINTEXT,PLAINTEXT:PLAINTEXT")
- zkCluster.rollingBrokerRestart()
+ val serverProperties = new util.HashMap[String, String]()
+ serverProperties.putAll(zkCluster.config().serverProperties())
Review Comment:
ditto
##########
core/src/test/scala/integration/kafka/zk/ZkMigrationIntegrationTest.scala:
##########
@@ -187,11 +188,16 @@ class ZkMigrationIntegrationTest {
// Enable migration configs and restart brokers
log.info("Restart brokers in migration mode")
-
zkCluster.config().serverProperties().put(KafkaConfig.MigrationEnabledProp,
"true")
-
zkCluster.config().serverProperties().put(QuorumConfig.QUORUM_VOTERS_CONFIG,
kraftCluster.quorumVotersConfig())
-
zkCluster.config().serverProperties().put(KafkaConfig.ControllerListenerNamesProp,
"CONTROLLER")
-
zkCluster.config().serverProperties().put(SocketServerConfigs.LISTENER_SECURITY_PROTOCOL_MAP_CONFIG,
"CONTROLLER:PLAINTEXT,EXTERNAL:PLAINTEXT,PLAINTEXT:PLAINTEXT")
- zkCluster.rollingBrokerRestart() // This would throw if authorizers
weren't allowed
+ val serverProperties = new util.HashMap[String, String]()
Review Comment:
```java
val serverProperties = new util.HashMap[String,
String](zkCluster.config().serverProperties())
```
##########
core/src/test/java/kafka/test/ClusterTestExtensionsTest.java:
##########
@@ -48,14 +51,18 @@ public class ClusterTestExtensionsTest {
// Static methods can generate cluster configurations
static void generate1(ClusterGenerator clusterGenerator) {
-
clusterGenerator.accept(ClusterConfig.defaultClusterBuilder().name("Generated
Test").build());
+ Map<String, String> serverProperties = new HashMap<>();
+ serverProperties.put("foo", "bar");
+ clusterGenerator.accept(ClusterConfig.defaultBuilder()
+ .setName("Generated Test")
+ .setServerProperties(serverProperties)
+ .build());
}
// BeforeEach run after class construction, but before cluster
initialization and test invocation
@BeforeEach
public void beforeEach(ClusterConfig config) {
Review Comment:
As `ClusterConfig` becomes immutable object, developers can modify the
configs by injection of `ClusterConfig`. Maybe we should remove this usage from
code base. WDYT?
##########
core/src/test/java/kafka/test/ClusterTestExtensionsTest.java:
##########
@@ -48,14 +51,18 @@ public class ClusterTestExtensionsTest {
// Static methods can generate cluster configurations
static void generate1(ClusterGenerator clusterGenerator) {
-
clusterGenerator.accept(ClusterConfig.defaultClusterBuilder().name("Generated
Test").build());
+ Map<String, String> serverProperties = new HashMap<>();
+ serverProperties.put("foo", "bar");
+ clusterGenerator.accept(ClusterConfig.defaultBuilder()
+ .setName("Generated Test")
+ .setServerProperties(serverProperties)
Review Comment:
We can use `Collections.singletonMap("foo", "bar")`
##########
core/src/test/scala/integration/kafka/zk/ZkMigrationIntegrationTest.scala:
##########
@@ -727,11 +758,16 @@ class ZkMigrationIntegrationTest {
// Enable migration configs and restart brokers
log.info("Restart brokers in migration mode")
-
zkCluster.config().serverProperties().put(KafkaConfig.MigrationEnabledProp,
"true")
-
zkCluster.config().serverProperties().put(QuorumConfig.QUORUM_VOTERS_CONFIG,
kraftCluster.quorumVotersConfig())
-
zkCluster.config().serverProperties().put(KafkaConfig.ControllerListenerNamesProp,
"CONTROLLER")
-
zkCluster.config().serverProperties().put(SocketServerConfigs.LISTENER_SECURITY_PROTOCOL_MAP_CONFIG,
"CONTROLLER:PLAINTEXT,EXTERNAL:PLAINTEXT,PLAINTEXT:PLAINTEXT")
- zkCluster.rollingBrokerRestart()
+ val serverProperties = new util.HashMap[String, String]()
+ serverProperties.putAll(zkCluster.config().serverProperties())
Review Comment:
ditto
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]