C0urante commented on code in PR #16599:
URL: https://github.com/apache/kafka/pull/16599#discussion_r1688407540
##########
connect/runtime/src/test/java/org/apache/kafka/connect/util/clusters/EmbeddedKafkaCluster.java:
##########
@@ -95,98 +88,60 @@
 import static org.apache.kafka.clients.consumer.ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG;
 import static org.apache.kafka.clients.producer.ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG;
 import static org.apache.kafka.clients.producer.ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG;
-import static org.apache.kafka.server.config.ReplicationConfigs.INTER_BROKER_LISTENER_NAME_CONFIG;
-import static org.apache.kafka.server.config.ReplicationConfigs.INTER_BROKER_SECURITY_PROTOCOL_CONFIG;
-import static org.apache.kafka.server.config.ServerLogConfigs.AUTO_CREATE_TOPICS_ENABLE_CONFIG;
-import static org.apache.kafka.server.config.ServerLogConfigs.LOG_DIR_CONFIG;
 /**
- * Setup an embedded Kafka cluster with specified number of brokers and specified broker properties. To be used for
- * integration tests.
+ * Setup an embedded Kafka KRaft cluster (using {@link kafka.testkit.KafkaClusterTestKit} internally) with the
+ * specified number of brokers and the specified broker properties. This can be used for integration tests and is
+ * typically used in conjunction with {@link EmbeddedConnectCluster}. Additional Kafka client properties can also be
+ * supplied if required. This class also provides various utility methods to easily create Kafka topics, produce data,
+ * consume data etc.
  */
 public class EmbeddedKafkaCluster {
     private static final Logger log = LoggerFactory.getLogger(EmbeddedKafkaCluster.class);
-    private static final long DEFAULT_PRODUCE_SEND_DURATION_MS = TimeUnit.SECONDS.toMillis(120);
+    private static final long DEFAULT_PRODUCE_SEND_DURATION_MS = TimeUnit.SECONDS.toMillis(120);
-    // Kafka Config
-    private final KafkaServer[] brokers;
+    private final KafkaClusterTestKit cluster;
     private final Properties brokerConfig;
-    private final Time time = Time.SYSTEM;
-    private final int[] currentBrokerPorts;
-    private final String[] currentBrokerLogDirs;
-    private final boolean hasListenerConfig;
+    private final Map<String, String> clientConfigs;
-    final Map<String, String> clientConfigs;
-
-    private EmbeddedZookeeper zookeeper = null;
-    private ListenerName listenerName = new ListenerName("PLAINTEXT");
     private KafkaProducer<byte[], byte[]> producer;
-    public EmbeddedKafkaCluster(final int numBrokers,
-                                final Properties brokerConfig) {
+    public EmbeddedKafkaCluster(final int numBrokers, final Properties brokerConfig) {
         this(numBrokers, brokerConfig, Collections.emptyMap());
     }
     public EmbeddedKafkaCluster(final int numBrokers,
-                                final Properties brokerConfig,
-                                final Map<String, String> clientConfigs) {
-        brokers = new KafkaServer[numBrokers];
-        currentBrokerPorts = new int[numBrokers];
-        currentBrokerLogDirs = new String[numBrokers];
-        this.brokerConfig = brokerConfig;
-        // Since we support `stop` followed by `startOnlyKafkaOnSamePorts`, we track whether
-        // a listener config is defined during initialization in order to know if it's
-        // safe to override it
-        hasListenerConfig = brokerConfig.get(SocketServerConfigs.LISTENERS_CONFIG) != null;
+                                final Properties brokerConfig,
+                                final Map<String, String> clientConfigs) {
+        addDefaultBrokerPropsIfAbsent(brokerConfig, numBrokers);
+        try {
+            KafkaClusterTestKit.Builder clusterBuilder = new KafkaClusterTestKit.Builder(
Review Comment:
I agree that a unified testing style will be better in the long run, but
we're pretty far away from that with Connect's integration tests.
For the vast majority of our tests, we can't easily switch over to the
`ClusterTestExtensions` API because our `EmbeddedConnectCluster` automatically
spins up its own `EmbeddedKafkaCluster`. If we used the annotation-based API,
we'd have to modify our `EmbeddedConnectCluster` class to not create its own
embedded Kafka cluster and to instead accept a `ClusterInstance` that's been
created and possibly started out-of-band, which would actually be less
convenient than our current embedded cluster API and IMO is not a path we
should pursue.
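To illustrate the contrast (a rough sketch only; the `EmbeddedConnectCluster.Builder` calls follow the Connect test framework, while the `withExistingKafka` method is purely hypothetical and does not exist):

```java
// Current style: EmbeddedConnectCluster owns its embedded Kafka cluster end to end.
EmbeddedConnectCluster connect = new EmbeddedConnectCluster.Builder()
        .name("connect-cluster")
        .numWorkers(3)
        .numBrokers(1)
        .build();
connect.start(); // brings up the embedded Kafka cluster and the Connect workers together

// Hypothetical annotation-based style (NOT the current API): the test framework
// creates the Kafka cluster out-of-band and Connect would have to accept it.
// @ClusterTest
// void testSomething(ClusterInstance kafka) {
//     EmbeddedConnectCluster connect = new EmbeddedConnectCluster.Builder()
//             .withExistingKafka(kafka) // hypothetical method, does not exist
//             .build();
// }
```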
I also don't think that adopting the `ClusterTestExtensions` API piecemeal
in Connect (like the proposal in KAFKA-17174 to migrate the
`OffsetsApiIntegrationTest`) without a long-term strategy to migrate our entire
collection of integration tests is a good idea. Right now Connect uses a consistent style for all of our integration tests, and fragmenting that style would just make life harder for the people actually writing and maintaining these tests.
TL;DR: While I'm also not opposed to adopting the `ClusterTestExtensions`
API in Connect, I don't think we should start work on that until we have a plan
in place for how we can do it for all of our integration tests instead of just
some.
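For context, the `KafkaClusterTestKit` bring-up that the diff above migrates to looks roughly like the following sketch (class and method names are from the `kafka.testkit` package but may differ slightly between Kafka versions, so treat this as illustrative rather than definitive):

```java
import kafka.testkit.KafkaClusterTestKit;
import kafka.testkit.TestKitNodes;

// Build a KRaft test cluster with one broker and one controller.
KafkaClusterTestKit cluster = new KafkaClusterTestKit.Builder(
        new TestKitNodes.Builder()
                .setNumBrokerNodes(1)
                .setNumControllerNodes(1)
                .build())
        .build();
cluster.format();   // format the metadata log directories
cluster.startup();  // start controllers and brokers
cluster.waitForReadyBrokers();
// ... run the test ...
cluster.close();
```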
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]