C0urante commented on code in PR #16599:
URL: https://github.com/apache/kafka/pull/16599#discussion_r1688407540


##########
connect/runtime/src/test/java/org/apache/kafka/connect/util/clusters/EmbeddedKafkaCluster.java:
##########
@@ -95,98 +88,60 @@
 import static 
org.apache.kafka.clients.consumer.ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG;
 import static 
org.apache.kafka.clients.producer.ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG;
 import static 
org.apache.kafka.clients.producer.ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG;
-import static 
org.apache.kafka.server.config.ReplicationConfigs.INTER_BROKER_LISTENER_NAME_CONFIG;
-import static 
org.apache.kafka.server.config.ReplicationConfigs.INTER_BROKER_SECURITY_PROTOCOL_CONFIG;
-import static 
org.apache.kafka.server.config.ServerLogConfigs.AUTO_CREATE_TOPICS_ENABLE_CONFIG;
-import static org.apache.kafka.server.config.ServerLogConfigs.LOG_DIR_CONFIG;
 
 /**
- * Setup an embedded Kafka cluster with specified number of brokers and 
specified broker properties. To be used for
- * integration tests.
+ * Setup an embedded Kafka KRaft cluster (using {@link 
kafka.testkit.KafkaClusterTestKit} internally) with the
+ * specified number of brokers and the specified broker properties. This can 
be used for integration tests and is
+ * typically used in conjunction with {@link EmbeddedConnectCluster}. 
Additional Kafka client properties can also be
+ * supplied if required. This class also provides various utility methods to 
easily create Kafka topics, produce data,
+ * consume data etc.
  */
 public class EmbeddedKafkaCluster {
 
     private static final Logger log = 
LoggerFactory.getLogger(EmbeddedKafkaCluster.class);
 
-    private static final long DEFAULT_PRODUCE_SEND_DURATION_MS = 
TimeUnit.SECONDS.toMillis(120); 
+    private static final long DEFAULT_PRODUCE_SEND_DURATION_MS = 
TimeUnit.SECONDS.toMillis(120);
 
-    // Kafka Config
-    private final KafkaServer[] brokers;
+    private final KafkaClusterTestKit cluster;
     private final Properties brokerConfig;
-    private final Time time = Time.SYSTEM;
-    private final int[] currentBrokerPorts;
-    private final String[] currentBrokerLogDirs;
-    private final boolean hasListenerConfig;
+    private final Map<String, String> clientConfigs;
 
-    final Map<String, String> clientConfigs;
-
-    private EmbeddedZookeeper zookeeper = null;
-    private ListenerName listenerName = new ListenerName("PLAINTEXT");
     private KafkaProducer<byte[], byte[]> producer;
 
-    public EmbeddedKafkaCluster(final int numBrokers,
-                                final Properties brokerConfig) {
+    public EmbeddedKafkaCluster(final int numBrokers, final Properties 
brokerConfig) {
         this(numBrokers, brokerConfig, Collections.emptyMap());
     }
 
     public EmbeddedKafkaCluster(final int numBrokers,
-                                final Properties brokerConfig,
-                                final Map<String, String> clientConfigs) {
-        brokers = new KafkaServer[numBrokers];
-        currentBrokerPorts = new int[numBrokers];
-        currentBrokerLogDirs = new String[numBrokers];
-        this.brokerConfig = brokerConfig;
-        // Since we support `stop` followed by `startOnlyKafkaOnSamePorts`, we 
track whether
-        // a listener config is defined during initialization in order to know 
if it's
-        // safe to override it
-        hasListenerConfig = 
brokerConfig.get(SocketServerConfigs.LISTENERS_CONFIG) != null;
+                                   final Properties brokerConfig,
+                                   final Map<String, String> clientConfigs) {
+        addDefaultBrokerPropsIfAbsent(brokerConfig, numBrokers);
+        try {
+            KafkaClusterTestKit.Builder clusterBuilder = new 
KafkaClusterTestKit.Builder(

Review Comment:
   I agree that a unified testing style will be better in the long run, but 
we're pretty far away from that with Connect's integration tests.
   
   For the vast majority of our tests, we can't easily switch over to the 
`ClusterTestExtensions` API because our `EmbeddedConnectCluster` automatically 
spins up its own `EmbeddedKafkaCluster`. If we used the annotation-based API, 
we'd have to modify our `EmbeddedConnectCluster` class to not do that and to 
instead accept a `ClusterInstance` that's been created and possibly started 
out-of-band, which would actually be less convenient than our current embedded 
cluster API and IMO is not a path we should pursue.
   
   I also don't think that adopting the `ClusterTestExtensions` API piecemeal 
in Connect (like the proposal in KAFKA-17174 to migrate the 
`OffsetsApiIntegrationTest`) without a long-term strategy to migrate our entire 
collection of integration tests is a good idea. Right now Connect uses a 
consistent style for our all of our integration tests and fragmenting that 
style will just make life harder for the people actually writing and 
maintaining these tests.
   
   TL;DR: While I'm also not opposed to adopting the `ClusterTestExtensions` 
API in Connect, I don't think we should start work on that until we have a plan 
in place for how we can do it for all of our integration tests instead of just 
some.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to