mimaison commented on code in PR #16599:
URL: https://github.com/apache/kafka/pull/16599#discussion_r1682557520


##########
connect/runtime/src/test/java/org/apache/kafka/connect/util/clusters/EmbeddedKafkaCluster.java:
##########
@@ -95,98 +88,60 @@
 import static 
org.apache.kafka.clients.consumer.ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG;
 import static 
org.apache.kafka.clients.producer.ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG;
 import static 
org.apache.kafka.clients.producer.ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG;
-import static 
org.apache.kafka.server.config.ReplicationConfigs.INTER_BROKER_LISTENER_NAME_CONFIG;
-import static 
org.apache.kafka.server.config.ReplicationConfigs.INTER_BROKER_SECURITY_PROTOCOL_CONFIG;
-import static 
org.apache.kafka.server.config.ServerLogConfigs.AUTO_CREATE_TOPICS_ENABLE_CONFIG;
-import static org.apache.kafka.server.config.ServerLogConfigs.LOG_DIR_CONFIG;
 
 /**
- * Setup an embedded Kafka cluster with specified number of brokers and 
specified broker properties. To be used for
- * integration tests.
+ * Setup an embedded Kafka KRaft cluster (using {@link 
kafka.testkit.KafkaClusterTestKit} internally) with the
+ * specified number of brokers and the specified broker properties. This can 
be used for integration tests and is
+ * typically used in conjunction with {@link EmbeddedConnectCluster}. 
Additional Kafka client properties can also be
+ * supplied if required. This class also provides various utility methods to 
easily create Kafka topics, produce data,
+ * consume data etc.
  */
 public class EmbeddedKafkaCluster {
 
     private static final Logger log = 
LoggerFactory.getLogger(EmbeddedKafkaCluster.class);
 
-    private static final long DEFAULT_PRODUCE_SEND_DURATION_MS = 
TimeUnit.SECONDS.toMillis(120); 
+    private static final long DEFAULT_PRODUCE_SEND_DURATION_MS = 
TimeUnit.SECONDS.toMillis(120);
 
-    // Kafka Config
-    private final KafkaServer[] brokers;
+    private final KafkaClusterTestKit cluster;
     private final Properties brokerConfig;
-    private final Time time = Time.SYSTEM;
-    private final int[] currentBrokerPorts;
-    private final String[] currentBrokerLogDirs;
-    private final boolean hasListenerConfig;
+    private final Map<String, String> clientConfigs;
 
-    final Map<String, String> clientConfigs;
-
-    private EmbeddedZookeeper zookeeper = null;
-    private ListenerName listenerName = new ListenerName("PLAINTEXT");
     private KafkaProducer<byte[], byte[]> producer;
 
-    public EmbeddedKafkaCluster(final int numBrokers,
-                                final Properties brokerConfig) {
+    public EmbeddedKafkaCluster(final int numBrokers, final Properties 
brokerConfig) {
         this(numBrokers, brokerConfig, Collections.emptyMap());
     }
 
     public EmbeddedKafkaCluster(final int numBrokers,
-                                final Properties brokerConfig,
-                                final Map<String, String> clientConfigs) {
-        brokers = new KafkaServer[numBrokers];
-        currentBrokerPorts = new int[numBrokers];
-        currentBrokerLogDirs = new String[numBrokers];
-        this.brokerConfig = brokerConfig;
-        // Since we support `stop` followed by `startOnlyKafkaOnSamePorts`, we 
track whether
-        // a listener config is defined during initialization in order to know 
if it's
-        // safe to override it
-        hasListenerConfig = 
brokerConfig.get(SocketServerConfigs.LISTENERS_CONFIG) != null;
+                                   final Properties brokerConfig,
+                                   final Map<String, String> clientConfigs) {
+        addDefaultBrokerPropsIfAbsent(brokerConfig, numBrokers);
+        try {
+            KafkaClusterTestKit.Builder clusterBuilder = new 
KafkaClusterTestKit.Builder(
+                    new TestKitNodes.Builder()
+                            .setCombined(true)

Review Comment:
   Have you tried having a separate controller? My concern is that we have a 
method `stopOnlyBrokers()` which would stop controllers too in combined mode.



##########
connect/runtime/src/test/java/org/apache/kafka/connect/util/clusters/EmbeddedKafkaCluster.java:
##########
@@ -748,9 +625,10 @@ public KafkaProducer<byte[], byte[]> 
createProducer(Map<String, Object> producer
         return producer;
     }
 
-    private static void putIfAbsent(final Map<String, Object> props, final 
String propertyKey, final Object propertyValue) {
-        if (!props.containsKey(propertyKey)) {
-            props.put(propertyKey, propertyValue);
-        }
+    private void addDefaultBrokerPropsIfAbsent(Properties brokerConfig, int 
numBrokers) {
+        brokerConfig.putIfAbsent(ServerConfigs.DELETE_TOPIC_ENABLE_CONFIG, 
"true");

Review Comment:
   Do we need that? This is already the default value, so this line can likely be dropped.



##########
connect/runtime/src/test/java/org/apache/kafka/connect/integration/ExactlyOnceSourceIntegrationTest.java:
##########
@@ -694,12 +701,29 @@ public void testTasksFailOnInabilityToFence() throws 
Exception {
             )).all().get();
         }
 
-        StartAndStopLatch connectorStart = connectorAndTaskStart(tasksMax);
-
         log.info("Bringing up connector with fresh slate; fencing should not 
be necessary");
         connect.configureConnector(CONNECTOR_NAME, props);
-        assertConnectorStarted(connectorStart);
-        // Verify that the connector and its tasks have been able to start 
successfully
+
+        // Hack: There is a small chance that our recent ACL updates for the 
connector have
+        // not yet been propagated across the entire Kafka cluster, and that 
our connector
+        // will fail on startup when it tries to list the end offsets of the 
worker's offsets topic
+        // So, we implement some retry logic here to add a layer of resiliency 
in that case
+        waitForCondition(
+                () -> {
+                    ConnectorStateInfo status = 
connect.connectorStatus(CONNECTOR_NAME);
+                    if ("RUNNING".equals(status.connector().state())) {
+                        return true;
+                    } else if ("FAILED".equals(status.connector().state())) {
+                        log.debug("Restarting failed connector {}", 
CONNECTOR_NAME);
+                        connect.restartConnector(CONNECTOR_NAME);
+                    }
+                    return false;
+                },
+                30_000,

Review Comment:
   Could we use `VALIDATION_DURATION_MS` here?



##########
connect/runtime/src/test/java/org/apache/kafka/connect/util/clusters/EmbeddedKafkaCluster.java:
##########
@@ -199,149 +154,72 @@ private void doStart() {
         producer = new KafkaProducer<>(producerProps, new 
ByteArraySerializer(), new ByteArraySerializer());
     }
 
-    public void stopOnlyKafka() {
-        stop(false, false);
-    }
-
-    public void stop() {
-        stop(true, true);
-    }
-
-    private void stop(boolean deleteLogDirs, boolean stopZK) {
-        maybeShutDownProducer();
-        triggerBrokerShutdown();
-        awaitBrokerShutdown();
-
-        if (deleteLogDirs)
-            deleteLogDirs();
-
-        if (stopZK)
-            stopZK();
-    }
-
-    private void maybeShutDownProducer() {
-        try {
-            if (producer != null) {
-                producer.close();
-            }
-        } catch (Exception e) {
-            log.error("Could not shutdown producer ", e);
-            throw new RuntimeException("Could not shutdown producer", e);
-        }
-    }
-
-    private void triggerBrokerShutdown() {
-        for (KafkaServer broker : brokers) {
-            try {
-                broker.shutdown();
-            } catch (Throwable t) {
-                String msg = String.format("Could not shutdown broker at %s", 
address(broker));
-                log.error(msg, t);
-                throw new RuntimeException(msg, t);
-            }
-        }
-    }
-
-    private void awaitBrokerShutdown() {
-        for (KafkaServer broker : brokers) {
-            try {
-                broker.awaitShutdown();
-            } catch (Throwable t) {
-                String msg = String.format("Failed while awaiting shutdown of 
broker at %s", address(broker));
-                log.error(msg, t);
-                throw new RuntimeException(msg, t);
-            }
-        }
-    }
-
-    private void deleteLogDirs() {
-        for (KafkaServer broker : brokers) {
-            try {
-                log.info("Cleaning up kafka log dirs at {}", 
broker.config().logDirs());
-                CoreUtils.delete(broker.config().logDirs());
-            } catch (Throwable t) {
-                String msg = String.format("Could not clean up log dirs for 
broker at %s",
-                        address(broker));
-                log.error(msg, t);
-                throw new RuntimeException(msg, t);
-            }
-        }
-    }
-
-    private void stopZK() {
-        try {
-            zookeeper.shutdown();
-        } catch (Throwable t) {
-            String msg = String.format("Could not shutdown zookeeper at %s", 
zKConnectString());
-            log.error(msg, t);
-            throw new RuntimeException(msg, t);
-        }
+    /**
+     * Restarts the Kafka brokers. This can be called after {@link 
#stopOnlyBrokers()}. Note that if the Kafka brokers
+     * need to be listening on the same ports as earlier, the {@link 
#brokerConfig} should contain the
+     * {@link SocketServerConfigs#LISTENERS_CONFIG} property and it should use 
a fixed non-zero free port. Also note that this is
+     * only possible when {@code numBrokers} is 1.
+     */
+    public void restartOnlyBrokers() {
+        cluster.brokers().values().forEach(BrokerServer::startup);
     }
 
-    private static void putIfAbsent(final Properties props, final String 
propertyKey, final Object propertyValue) {
-        if (!props.containsKey(propertyKey)) {
-            props.put(propertyKey, propertyValue);
-        }
+    /**
+     * Stop only the Kafka brokers (and not the KRaft controllers). This can 
be used to test Connect's functionality
+     * when the backing Kafka cluster goes offline.
+     */
+    public void stopOnlyBrokers() {
+        cluster.brokers().values().forEach(BrokerServer::shutdown);
+        cluster.brokers().values().forEach(BrokerServer::awaitShutdown);
     }
 
-    private String createLogDir() {
-        try {
-            return 
Files.createTempDirectory(getClass().getSimpleName()).toString();
-        } catch (IOException e) {
-            log.error("Unable to create temporary log directory", e);
-            throw new ConnectException("Unable to create temporary log 
directory", e);
+    public void stop() {
+        AtomicReference<Throwable> shutdownFailure = new AtomicReference<>();
+        Utils.closeQuietly(producer, "producer for embedded Kafka cluster", 
shutdownFailure);
+        Utils.closeQuietly(cluster, "embedded Kafka cluster", shutdownFailure);
+        if (shutdownFailure.get() != null) {
+            throw new ConnectException("Failed to shut down producer / 
embedded Kafka cluster", shutdownFailure.get());
         }
     }
 
     public String bootstrapServers() {
-        return Arrays.stream(brokers)
-                .map(this::address)
-                .collect(Collectors.joining(","));
-    }
-
-    public String address(KafkaServer server) {
-        final EndPoint endPoint = server.advertisedListeners().head();
-        return endPoint.host() + ":" + endPoint.port();
-    }
-
-    public String zKConnectString() {
-        return "127.0.0.1:" + zookeeper.port();
+        return cluster.bootstrapServers();
     }
 
     /**
      * Get the brokers that have a {@link BrokerState#RUNNING} state.
      *
-     * @return the list of {@link KafkaServer} instances that are running;
-     *         never null but  possibly empty
+     * @return the set of {@link BrokerServer} instances that are running;
+     *         never null but possibly empty
      */
-    public Set<KafkaServer> runningBrokers() {
+    public Set<BrokerServer> runningBrokers() {
         return brokersInState(state -> state == BrokerState.RUNNING);
     }
 
     /**
      * Get the brokers whose state match the given predicate.
      *
-     * @return the list of {@link KafkaServer} instances with states that 
match the predicate;
-     *         never null but  possibly empty
+     * @return the set of {@link BrokerServer} instances with states that 
match the predicate;
+     *         never null but possibly empty
      */
-    public Set<KafkaServer> brokersInState(Predicate<BrokerState> 
desiredState) {
-        return Arrays.stream(brokers)
-                     .filter(b -> hasState(b, desiredState))
-                     .collect(Collectors.toSet());
+    public Set<BrokerServer> brokersInState(Predicate<BrokerState> 
desiredState) {
+        return cluster.brokers().values().stream()
+                .filter(b -> hasState(b, desiredState))
+                .collect(Collectors.toSet());
     }
 
-    protected boolean hasState(KafkaServer server, Predicate<BrokerState> 
desiredState) {
+    protected boolean hasState(BrokerServer server, Predicate<BrokerState> 
desiredState) {
         try {
             return desiredState.test(server.brokerState());
         } catch (Throwable e) {
             // Broker failed to respond.
             return false;
         }
     }
-    
+
     public boolean sslEnabled() {
-        final String listeners = 
brokerConfig.getProperty(SocketServerConfigs.LISTENERS_CONFIG);
-        return listeners != null && listeners.contains("SSL");
+        final String listenerSecurityProtocolMap = 
brokerConfig.getProperty(SocketServerConfigs.LISTENER_SECURITY_PROTOCOL_MAP_CONFIG);
+        return listenerSecurityProtocolMap != null && 
listenerSecurityProtocolMap.contains("SSL");

Review Comment:
   This was already there before, but to be pedantic, we'd need to match `:SSL` in the 
protocol map to be sure SSL is enabled. As written, this would return `true` for 
`NOT_SSL:PLAINTEXT`.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to