C0urante commented on code in PR #16599:
URL: https://github.com/apache/kafka/pull/16599#discussion_r1684857716


##########
connect/runtime/src/test/java/org/apache/kafka/connect/util/clusters/EmbeddedKafkaCluster.java:
##########
@@ -95,98 +88,60 @@
 import static org.apache.kafka.clients.consumer.ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG;
 import static org.apache.kafka.clients.producer.ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG;
 import static org.apache.kafka.clients.producer.ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG;
-import static org.apache.kafka.server.config.ReplicationConfigs.INTER_BROKER_LISTENER_NAME_CONFIG;
-import static org.apache.kafka.server.config.ReplicationConfigs.INTER_BROKER_SECURITY_PROTOCOL_CONFIG;
-import static org.apache.kafka.server.config.ServerLogConfigs.AUTO_CREATE_TOPICS_ENABLE_CONFIG;
-import static org.apache.kafka.server.config.ServerLogConfigs.LOG_DIR_CONFIG;
 
 /**
- * Setup an embedded Kafka cluster with specified number of brokers and specified broker properties. To be used for
- * integration tests.
+ * Setup an embedded Kafka KRaft cluster (using {@link kafka.testkit.KafkaClusterTestKit} internally) with the
+ * specified number of brokers and the specified broker properties. This can be used for integration tests and is
+ * typically used in conjunction with {@link EmbeddedConnectCluster}. Additional Kafka client properties can also be
+ * supplied if required. This class also provides various utility methods to easily create Kafka topics, produce data,
+ * consume data etc.
  */
 public class EmbeddedKafkaCluster {
 
     private static final Logger log = LoggerFactory.getLogger(EmbeddedKafkaCluster.class);
 
-    private static final long DEFAULT_PRODUCE_SEND_DURATION_MS = TimeUnit.SECONDS.toMillis(120); 
+    private static final long DEFAULT_PRODUCE_SEND_DURATION_MS = TimeUnit.SECONDS.toMillis(120);
 
-    // Kafka Config
-    private final KafkaServer[] brokers;
+    private final KafkaClusterTestKit cluster;
     private final Properties brokerConfig;
-    private final Time time = Time.SYSTEM;
-    private final int[] currentBrokerPorts;
-    private final String[] currentBrokerLogDirs;
-    private final boolean hasListenerConfig;
+    private final Map<String, String> clientConfigs;
 
-    final Map<String, String> clientConfigs;
-
-    private EmbeddedZookeeper zookeeper = null;
-    private ListenerName listenerName = new ListenerName("PLAINTEXT");
     private KafkaProducer<byte[], byte[]> producer;
 
-    public EmbeddedKafkaCluster(final int numBrokers,
-                                final Properties brokerConfig) {
+    public EmbeddedKafkaCluster(final int numBrokers, final Properties brokerConfig) {
         this(numBrokers, brokerConfig, Collections.emptyMap());
     }
 
     public EmbeddedKafkaCluster(final int numBrokers,
-                                final Properties brokerConfig,
-                                final Map<String, String> clientConfigs) {
-        brokers = new KafkaServer[numBrokers];
-        currentBrokerPorts = new int[numBrokers];
-        currentBrokerLogDirs = new String[numBrokers];
-        this.brokerConfig = brokerConfig;
-        // Since we support `stop` followed by `startOnlyKafkaOnSamePorts`, we track whether
-        // a listener config is defined during initialization in order to know if it's
-        // safe to override it
-        hasListenerConfig = brokerConfig.get(SocketServerConfigs.LISTENERS_CONFIG) != null;
+                                   final Properties brokerConfig,
+                                   final Map<String, String> clientConfigs) {
+        addDefaultBrokerPropsIfAbsent(brokerConfig, numBrokers);
+        try {
+            KafkaClusterTestKit.Builder clusterBuilder = new KafkaClusterTestKit.Builder(
+                    new TestKitNodes.Builder()
+                            .setCombined(true)

Review Comment:
   Ah, interesting! I think it's possible to use a separate controller 
node/cluster but it's unnecessary for the purposes of our tests. IMO we should 
instead just rename the `stopOnlyBrokers` and `restartOnlyBrokers` methods to 
`stop` and `restart`, respectively.
   
   For the Connect integration tests, we largely treat Kafka like a black box 
that's usually on and, for some scenarios, may be turned off (or generally 
unavailable). FWICT we don't really do anything that requires the kind of 
granularity where we'd distinguish between brokers being unavailable and 
controllers being unavailable.
   
   It's also a little tricky to use separate controllers and brokers because we 
still have to be able to turn our embedded Kafka cluster off and back on again 
with the same port, which becomes difficult when you have to specify a fixed 
port to use across restarts for brokers only. You end up having to abuse the 
`setPerServerProperties` API to accomplish that, and while it's certainly 
possible, it is a bit ugly to expose that cleanly in our internal 
`EmbeddedKafkaCluster` API.
   
   Thoughts?
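
   To make the restart-on-the-same-port constraint discussed above concrete, here is a minimal sketch (not code from this PR; the class and helper names are illustrative) of pinning a single broker's listener to a fixed port so that a stop/restart cycle reuses the same address:

   ```java
   import java.util.Properties;

   public class FixedPortBrokerConfig {

       // Hypothetical helper: pins the single broker's listener to a fixed port
       // so that stopping and restarting the embedded cluster reuses the same
       // address. "listeners" is the standard Kafka broker config key; in a real
       // test the port would have to be a known free port chosen up front.
       static Properties withFixedListenerPort(int port) {
           Properties brokerConfig = new Properties();
           brokerConfig.put("listeners", "PLAINTEXT://localhost:" + port);
           return brokerConfig;
       }
   }
   ```

   With a broker config built this way, the broker cannot fall back to an ephemeral port on restart, which is what makes "turn Kafka off and back on at the same address" workable with a single broker.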



##########
connect/runtime/src/test/java/org/apache/kafka/connect/util/clusters/EmbeddedKafkaCluster.java:
##########
@@ -199,149 +154,72 @@ private void doStart() {
         producer = new KafkaProducer<>(producerProps, new ByteArraySerializer(), new ByteArraySerializer());
     }
 
-    public void stopOnlyKafka() {
-        stop(false, false);
-    }
-
-    public void stop() {
-        stop(true, true);
-    }
-
-    private void stop(boolean deleteLogDirs, boolean stopZK) {
-        maybeShutDownProducer();
-        triggerBrokerShutdown();
-        awaitBrokerShutdown();
-
-        if (deleteLogDirs)
-            deleteLogDirs();
-
-        if (stopZK)
-            stopZK();
-    }
-
-    private void maybeShutDownProducer() {
-        try {
-            if (producer != null) {
-                producer.close();
-            }
-        } catch (Exception e) {
-            log.error("Could not shutdown producer ", e);
-            throw new RuntimeException("Could not shutdown producer", e);
-        }
-    }
-
-    private void triggerBrokerShutdown() {
-        for (KafkaServer broker : brokers) {
-            try {
-                broker.shutdown();
-            } catch (Throwable t) {
-                String msg = String.format("Could not shutdown broker at %s", address(broker));
-                log.error(msg, t);
-                throw new RuntimeException(msg, t);
-            }
-        }
-    }
-
-    private void awaitBrokerShutdown() {
-        for (KafkaServer broker : brokers) {
-            try {
-                broker.awaitShutdown();
-            } catch (Throwable t) {
-                String msg = String.format("Failed while awaiting shutdown of broker at %s", address(broker));
-                log.error(msg, t);
-                throw new RuntimeException(msg, t);
-            }
-        }
-    }
-
-    private void deleteLogDirs() {
-        for (KafkaServer broker : brokers) {
-            try {
-                log.info("Cleaning up kafka log dirs at {}", broker.config().logDirs());
-                CoreUtils.delete(broker.config().logDirs());
-            } catch (Throwable t) {
-                String msg = String.format("Could not clean up log dirs for broker at %s",
-                        address(broker));
-                log.error(msg, t);
-                throw new RuntimeException(msg, t);
-            }
-        }
-    }
-
-    private void stopZK() {
-        try {
-            zookeeper.shutdown();
-        } catch (Throwable t) {
-            String msg = String.format("Could not shutdown zookeeper at %s", zKConnectString());
-            log.error(msg, t);
-            throw new RuntimeException(msg, t);
-        }
+    /**
+     * Restarts the Kafka brokers. This can be called after {@link #stopOnlyBrokers()}. Note that if the Kafka brokers
+     * need to be listening on the same ports as earlier, the {@link #brokerConfig} should contain the
+     * {@link SocketServerConfigs#LISTENERS_CONFIG} property and it should use a fixed non-zero free port. Also note that this is
+     * only possible when {@code numBrokers} is 1.
+     */
+    public void restartOnlyBrokers() {
+        cluster.brokers().values().forEach(BrokerServer::startup);
     }
 
-    private static void putIfAbsent(final Properties props, final String propertyKey, final Object propertyValue) {
-        if (!props.containsKey(propertyKey)) {
-            props.put(propertyKey, propertyValue);
-        }
+    /**
+     * Stop only the Kafka brokers (and not the KRaft controllers). This can be used to test Connect's functionality
+     * when the backing Kafka cluster goes offline.
+     */
+    public void stopOnlyBrokers() {
+        cluster.brokers().values().forEach(BrokerServer::shutdown);
+        cluster.brokers().values().forEach(BrokerServer::awaitShutdown);
     }
 
-    private String createLogDir() {
-        try {
-            return Files.createTempDirectory(getClass().getSimpleName()).toString();
-        } catch (IOException e) {
-            log.error("Unable to create temporary log directory", e);
-            throw new ConnectException("Unable to create temporary log directory", e);
+    public void stop() {
+        AtomicReference<Throwable> shutdownFailure = new AtomicReference<>();
+        Utils.closeQuietly(producer, "producer for embedded Kafka cluster", shutdownFailure);
+        Utils.closeQuietly(cluster, "embedded Kafka cluster", shutdownFailure);
+        if (shutdownFailure.get() != null) {
+            throw new ConnectException("Failed to shut down producer / embedded Kafka cluster", shutdownFailure.get());
         }
     }
 
     public String bootstrapServers() {
-        return Arrays.stream(brokers)
-                .map(this::address)
-                .collect(Collectors.joining(","));
-    }
-
-    public String address(KafkaServer server) {
-        final EndPoint endPoint = server.advertisedListeners().head();
-        return endPoint.host() + ":" + endPoint.port();
-    }
-
-    public String zKConnectString() {
-        return "127.0.0.1:" + zookeeper.port();
+        return cluster.bootstrapServers();
     }
 
     /**
      * Get the brokers that have a {@link BrokerState#RUNNING} state.
      *
-     * @return the list of {@link KafkaServer} instances that are running;
-     *         never null but  possibly empty
+     * @return the set of {@link BrokerServer} instances that are running;
+     *         never null but possibly empty
      */
-    public Set<KafkaServer> runningBrokers() {
+    public Set<BrokerServer> runningBrokers() {
         return brokersInState(state -> state == BrokerState.RUNNING);
     }
 
     /**
      * Get the brokers whose state match the given predicate.
      *
-     * @return the list of {@link KafkaServer} instances with states that match the predicate;
-     *         never null but  possibly empty
+     * @return the set of {@link BrokerServer} instances with states that match the predicate;
+     *         never null but possibly empty
      */
-    public Set<KafkaServer> brokersInState(Predicate<BrokerState> desiredState) {
-        return Arrays.stream(brokers)
-                     .filter(b -> hasState(b, desiredState))
-                     .collect(Collectors.toSet());
+    public Set<BrokerServer> brokersInState(Predicate<BrokerState> desiredState) {
+        return cluster.brokers().values().stream()
+                .filter(b -> hasState(b, desiredState))
+                .collect(Collectors.toSet());
     }
 
-    protected boolean hasState(KafkaServer server, Predicate<BrokerState> desiredState) {
+    protected boolean hasState(BrokerServer server, Predicate<BrokerState> desiredState) {
         try {
             return desiredState.test(server.brokerState());
         } catch (Throwable e) {
             // Broker failed to respond.
             return false;
         }
     }
-    
+
     public boolean sslEnabled() {
-        final String listeners = brokerConfig.getProperty(SocketServerConfigs.LISTENERS_CONFIG);
-        return listeners != null && listeners.contains("SSL");
+        final String listenerSecurityProtocolMap = brokerConfig.getProperty(SocketServerConfigs.LISTENER_SECURITY_PROTOCOL_MAP_CONFIG);
+        return listenerSecurityProtocolMap != null && listenerSecurityProtocolMap.contains("SSL");

Review Comment:
   I <3 pedantry, thanks for the catch! Updated to include `:` and to also 
recognize `SASL_SSL` as an SSL-enabled listener protocol.
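
   For reference, a sketch of the described fix (illustrative only, not the exact code merged in the PR): entries in `listener.security.protocol.map` have the form `LISTENER_NAME:PROTOCOL`, so including the `:` anchors the match to the protocol side of each mapping rather than a listener name, and `SASL_SSL` is also treated as SSL-enabled.

   ```java
   public class SslListenerCheck {

       // Sketch of the check described above: match ":SSL" / ":SASL_SSL" so the
       // comparison hits the protocol after the colon, not the listener name.
       static boolean sslEnabled(String listenerSecurityProtocolMap) {
           if (listenerSecurityProtocolMap == null) {
               return false;
           }
           return listenerSecurityProtocolMap.contains(":SSL")
                   || listenerSecurityProtocolMap.contains(":SASL_SSL");
       }
   }
   ```

   Note that `":SSL"` alone would not match `"EXTERNAL:SASL_SSL"` (the `SSL` there is preceded by `_`, not `:`), which is why the second clause is needed.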



##########
connect/runtime/src/test/java/org/apache/kafka/connect/util/clusters/EmbeddedKafkaCluster.java:
##########
@@ -748,9 +625,10 @@ public KafkaProducer<byte[], byte[]> createProducer(Map<String, Object> producer
         return producer;
     }
 
-    private static void putIfAbsent(final Map<String, Object> props, final String propertyKey, final Object propertyValue) {
-        if (!props.containsKey(propertyKey)) {
-            props.put(propertyKey, propertyValue);
-        }
+    private void addDefaultBrokerPropsIfAbsent(Properties brokerConfig, int numBrokers) {
+        brokerConfig.putIfAbsent(ServerConfigs.DELETE_TOPIC_ENABLE_CONFIG, "true");

Review Comment:
   Correct, we can remove. I think it was left over from a lateral refactor 
that moved from a custom `putIfAbsent` implementation to using the Java 
standard library.
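
   A small sketch of why the custom helper is redundant: `java.util.Properties` implements `Map<Object, Object>`, so the JDK's `Map.putIfAbsent` already provides the set-a-default-only-if-unset behavior. The key below mirrors the `delete.topic.enable` default from the diff; the class and method names here are illustrative.

   ```java
   import java.util.Properties;

   public class DefaultBrokerProps {

       // Properties inherits Map.putIfAbsent (via Hashtable), so the JDK method
       // replaces the custom helper: it only writes the default value when the
       // key is not already present in the broker config.
       static void addDeleteTopicDefault(Properties brokerConfig) {
           brokerConfig.putIfAbsent("delete.topic.enable", "true");
       }
   }
   ```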


