This is an automated email from the ASF dual-hosted git repository. fmariani pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/camel.git
The following commit(s) were added to refs/heads/main by this push: new 978b6665b24 Adding confluent kafka instance type 978b6665b24 is described below commit 978b6665b249c9f8071224bb4defbed4167254d3 Author: Salvatore Mongiardo <smong...@redhat.com> AuthorDate: Thu Apr 17 10:56:23 2025 +0200 Adding confluent kafka instance type --- .../apache/camel/catalog/test-infra/metadata.json | 9 +++ .../src/generated/resources/META-INF/metadata.json | 9 +++ .../test/infra/kafka/common/KafkaProperties.java | 1 + .../infra/kafka/services/ConfluentContainer.java | 91 +++++++++++++++++++++ .../kafka/services/ConfluentInfraService.java | 93 ++++++++++++++++++++++ .../infra/kafka/services/KafkaServiceFactory.java | 6 ++ 6 files changed, 209 insertions(+) diff --git a/catalog/camel-catalog/src/generated/resources/org/apache/camel/catalog/test-infra/metadata.json b/catalog/camel-catalog/src/generated/resources/org/apache/camel/catalog/test-infra/metadata.json index 0e030762fa2..6ae17303bd6 100644 --- a/catalog/camel-catalog/src/generated/resources/org/apache/camel/catalog/test-infra/metadata.json +++ b/catalog/camel-catalog/src/generated/resources/org/apache/camel/catalog/test-infra/metadata.json @@ -124,6 +124,15 @@ "groupId" : "org.apache.camel", "artifactId" : "camel-test-infra-kafka", "version" : "4.12.0-SNAPSHOT" +}, { + "service" : "org.apache.camel.test.infra.kafka.services.KafkaInfraService", + "description" : "Apache Kafka, Distributed event streaming platform", + "implementation" : "org.apache.camel.test.infra.kafka.services.ConfluentInfraService", + "alias" : [ "kafka" ], + "aliasImplementation" : [ "confluent" ], + "groupId" : "org.apache.camel", + "artifactId" : "camel-test-infra-kafka", + "version" : "4.12.0-SNAPSHOT" }, { "service" : "org.apache.camel.test.infra.nats.services.NatsInfraService", "description" : "Messaging Platform NATS", diff --git a/test-infra/camel-test-infra-all/src/generated/resources/META-INF/metadata.json 
b/test-infra/camel-test-infra-all/src/generated/resources/META-INF/metadata.json index 0e030762fa2..6ae17303bd6 100644 --- a/test-infra/camel-test-infra-all/src/generated/resources/META-INF/metadata.json +++ b/test-infra/camel-test-infra-all/src/generated/resources/META-INF/metadata.json @@ -124,6 +124,15 @@ "groupId" : "org.apache.camel", "artifactId" : "camel-test-infra-kafka", "version" : "4.12.0-SNAPSHOT" +}, { + "service" : "org.apache.camel.test.infra.kafka.services.KafkaInfraService", + "description" : "Apache Kafka, Distributed event streaming platform", + "implementation" : "org.apache.camel.test.infra.kafka.services.ConfluentInfraService", + "alias" : [ "kafka" ], + "aliasImplementation" : [ "confluent" ], + "groupId" : "org.apache.camel", + "artifactId" : "camel-test-infra-kafka", + "version" : "4.12.0-SNAPSHOT" }, { "service" : "org.apache.camel.test.infra.nats.services.NatsInfraService", "description" : "Messaging Platform NATS", diff --git a/test-infra/camel-test-infra-kafka/src/main/java/org/apache/camel/test/infra/kafka/common/KafkaProperties.java b/test-infra/camel-test-infra-kafka/src/main/java/org/apache/camel/test/infra/kafka/common/KafkaProperties.java index 1b936895073..ce4de407b29 100644 --- a/test-infra/camel-test-infra-kafka/src/main/java/org/apache/camel/test/infra/kafka/common/KafkaProperties.java +++ b/test-infra/camel-test-infra-kafka/src/main/java/org/apache/camel/test/infra/kafka/common/KafkaProperties.java @@ -20,6 +20,7 @@ package org.apache.camel.test.infra.kafka.common; public final class KafkaProperties { public static final String KAFKA_BOOTSTRAP_SERVERS = "kafka.bootstrap.servers"; public static final String KAFKA_ZOOKEEPER_ADDRESS = "kafka.zookeeper.address"; + public static final String CONFLUENT_CONTAINER = "confluent.container.image"; public static final String KAFKA_CONTAINER = "kafka.container"; public static final String KAFKA3_CONTAINER = "kafka3.container"; public static final String REDPANDA_CONTAINER = 
"redpanda.container.image"; diff --git a/test-infra/camel-test-infra-kafka/src/main/java/org/apache/camel/test/infra/kafka/services/ConfluentContainer.java b/test-infra/camel-test-infra-kafka/src/main/java/org/apache/camel/test/infra/kafka/services/ConfluentContainer.java new file mode 100644 index 00000000000..42d60218c53 --- /dev/null +++ b/test-infra/camel-test-infra-kafka/src/main/java/org/apache/camel/test/infra/kafka/services/ConfluentContainer.java @@ -0,0 +1,91 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.camel.test.infra.kafka.services; + +import java.util.UUID; + +import com.github.dockerjava.api.command.CreateContainerCmd; +import org.apache.camel.test.infra.common.LocalPropertyResolver; +import org.apache.camel.test.infra.kafka.common.KafkaProperties; +import org.testcontainers.containers.GenericContainer; +import org.testcontainers.containers.Network; +import org.testcontainers.containers.wait.strategy.Wait; +
+/**
+ * Testcontainers definition of a single-node Confluent Kafka broker configured for KRaft mode
+ * (combined {@code broker,controller} process roles, empty ZooKeeper connect string).
+ * <p>
+ * The image name is resolved from the {@link KafkaProperties#CONFLUENT_CONTAINER} property unless
+ * one is passed explicitly. The container is considered ready once its log contains
+ * {@code Kafka Server started}.
+ */
+public class ConfluentContainer extends GenericContainer<ConfluentContainer> {
+    static final String CONFLUENT_CONTAINER = LocalPropertyResolver.getProperty(
+            ConfluentContainer.class,
+            KafkaProperties.CONFLUENT_CONTAINER);
+    private static final int KAFKA_PORT = 9092;
+
+    /**
+     * Creates a container using the default image resolved from the test-infra properties.
+     *
+     * @param network the network the container is attached to
+     * @param name    used as both the container name and its host name
+     */
+    public ConfluentContainer(Network network, String name) {
+        this(network, name, CONFLUENT_CONTAINER);
+    }
+
+    /**
+     * Creates a container from an explicit image.
+     *
+     * @param network       the network the container is attached to
+     * @param name          used as both the container name and its host name
+     * @param containerName the image name passed to {@link GenericContainer}
+     */
+    public ConfluentContainer(Network network, String name, String containerName) {
+        super(containerName);
+
+        // Address advertised to clients; resolved once so both listeners agree.
+        final String host = getHost();
+
+        withEnv("LOG_DIR", "/tmp/logs")
+                .withExposedPorts(KAFKA_PORT)
+                .withEnv("KAFKA_BROKER_ID", "1")
+                .withEnv("KAFKA_LISTENER_SECURITY_PROTOCOL_MAP",
+                        "BROKER:PLAINTEXT,PLAINTEXT:PLAINTEXT,CONTROLLER:PLAINTEXT")
+                .withEnv("KAFKA_ADVERTISED_LISTENERS",
+                        String.format("PLAINTEXT://%s:9092,BROKER://%s:9093", host, host))
+                .withEnv("KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS", "0")
+                .withEnv("KAFKA_TRANSACTION_STATE_LOG_MIN_ISR", "1")
+                .withEnv("KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR", "1")
+                .withEnv("KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR", "1")
+                .withEnv("KAFKA_PROCESS_ROLES", "broker,controller")
+                .withEnv("KAFKA_NODE_ID", "1")
+                .withEnv("KAFKA_CONTROLLER_QUORUM_VOTERS", "1@0.0.0.0:9094")
+                .withEnv("KAFKA_LISTENERS",
+                        "PLAINTEXT://0.0.0.0:9092,BROKER://0.0.0.0:9093,CONTROLLER://0.0.0.0:9094")
+                .withEnv("KAFKA_INTER_BROKER_LISTENER_NAME", "BROKER")
+                .withEnv("KAFKA_CONTROLLER_LISTENER_NAMES", "CONTROLLER")
+                .withEnv("KAFKA_LOG_DIRS", "/tmp/kraft-combined-logs")
+                .withEnv("KAFKA_REST_HOST_NAME", "rest-proxy")
+                .withEnv("KAFKA_REST_LISTENERS", String.format("http://%s:9092", host))
+                .withEnv("KAFKA_REST_BOOTSTRAP_SERVERS", "localhost:9092")
+                // The value must be the search path itself; a leading "PATH=" would turn the
+                // first entry into the non-existent directory "PATH=/usr/local/sbin".
+                .withEnv("PATH", "/usr/local/sbin:/usr/local/bin:/usr/sbin:/usr/bin:/sbin:/bin")
+                .withEnv("container", "oci")
+                .withEnv("LANG", "C.UTF-8")
+                .withEnv("UB_CLASSPATH", "/usr/share/java/cp-base-lite/*")
+                .withEnv("KAFKA_ZOOKEEPER_CONNECT", "")
+                // 22 characters taken from a random UUID with the dashes removed.
+                .withEnv("CLUSTER_ID", UUID.randomUUID().toString().replace("-", "").substring(0, 22))
+                .withNetwork(network)
+                .withCreateContainerCmdModifier(createContainerCmd -> setupContainer(name, createContainerCmd))
+                .withCommand("sh", "-c",
+                        "/etc/confluent/docker/run")
+                .waitingFor(Wait.forLogMessage(".*Kafka Server started.*", 1));
+    }
+
+    /** Applies {@code name} as both the host name and the container name. */
+    private void setupContainer(String name, CreateContainerCmd createContainerCmd) {
+        createContainerCmd.withHostName(name);
+        createContainerCmd.withName(name);
+    }
+
+    /**
+     * @return the host port mapped to the Kafka listener port (9092) of the container
+     */
+    public int getKafkaPort() {
+        return getMappedPort(KAFKA_PORT);
+    }
+
+    /**
+     * Binds host port 9092 to container port 9092 and then starts the container.
+     * <p>
+     * NOTE(review): the fixed binding is presumably needed because the advertised PLAINTEXT
+     * listener hard-codes port 9092 - confirm before switching to a random host port.
+     */
+    @Override
+    public void start() {
+        addFixedExposedPort(KAFKA_PORT, KAFKA_PORT);
+        super.start();
+    }
+}
diff --git a/test-infra/camel-test-infra-kafka/src/main/java/org/apache/camel/test/infra/kafka/services/ConfluentInfraService.java b/test-infra/camel-test-infra-kafka/src/main/java/org/apache/camel/test/infra/kafka/services/ConfluentInfraService.java new file mode 100644 index 00000000000..42c79fc717b --- /dev/null +++ b/test-infra/camel-test-infra-kafka/src/main/java/org/apache/camel/test/infra/kafka/services/ConfluentInfraService.java @@ -0,0 +1,93 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.camel.test.infra.kafka.services; + +import org.apache.camel.spi.annotations.InfraService; +import org.apache.camel.test.infra.common.TestUtils; +import org.apache.camel.test.infra.common.services.ContainerService; +import org.apache.camel.test.infra.kafka.common.KafkaProperties; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.testcontainers.containers.Network; +
+/**
+ * Kafka infra service backed by a {@link ConfluentContainer}.
+ * <p>
+ * {@link #initialize()} starts the container and publishes the bootstrap address through the
+ * {@link KafkaProperties#KAFKA_BOOTSTRAP_SERVERS} system property; {@link #shutdown()} stops the
+ * container and releases the network this service created for it.
+ */
+@InfraService(service = KafkaInfraService.class,
+              description = "Apache Kafka, Distributed event streaming platform",
+              serviceAlias = "kafka", serviceImplementationAlias = "confluent")
+public class ConfluentInfraService implements KafkaInfraService, ContainerService<ConfluentContainer> {
+    private static final Logger LOG = LoggerFactory.getLogger(ConfluentInfraService.class);
+
+    private final ConfluentContainer confluentContainer;
+
+    // Network created by this service, or null when the container was supplied by the caller
+    // (in which case the caller owns whatever network that container uses).
+    private final Network network;
+
+    /** Creates the service with a container named {@code confluent-<random number in 1..100>}. */
+    public ConfluentInfraService() {
+        this("confluent-" + TestUtils.randomWithRange(1, 100));
+    }
+
+    /**
+     * Creates the service with a new network and a container built by
+     * {@link #initConfluentContainer(Network, String)}.
+     *
+     * @param confluentInstanceName name given to the container
+     */
+    public ConfluentInfraService(String confluentInstanceName) {
+        network = Network.newNetwork();
+        confluentContainer = initConfluentContainer(network, confluentInstanceName);
+    }
+
+    /**
+     * Wraps an already configured container; no network is created or managed by this service.
+     *
+     * @param confluentContainer the container to manage
+     */
+    public ConfluentInfraService(ConfluentContainer confluentContainer) {
+        this.network = null;
+        this.confluentContainer = confluentContainer;
+    }
+
+    /**
+     * Builds the container. Invoked from the constructor, so overrides must not rely on
+     * subclass state.
+     */
+    protected ConfluentContainer initConfluentContainer(Network network, String instanceName) {
+        return new ConfluentContainer(network, instanceName);
+    }
+
+    /** @return the host port on which the Kafka listener of the container is reachable */
+    protected Integer getKafkaPort() {
+        return confluentContainer.getKafkaPort();
+    }
+
+    /** @return the bootstrap address in {@code host:port} form */
+    @Override
+    public String getBootstrapServers() {
+        return confluentContainer.getHost() + ":" + getKafkaPort();
+    }
+
+    /** Publishes the bootstrap address as the {@code kafka.bootstrap.servers} system property. */
+    @Override
+    public void registerProperties() {
+        System.setProperty(KafkaProperties.KAFKA_BOOTSTRAP_SERVERS, getBootstrapServers());
+    }
+
+    /** Starts the container and registers the connection properties. */
+    @Override
+    public void initialize() {
+        confluentContainer.start();
+
+        registerProperties();
+        LOG.info("Kafka bootstrap server running at address {}", getBootstrapServers());
+    }
+
+    private boolean stopped() {
+        return !confluentContainer.isRunning();
+    }
+
+    /**
+     * Stops the container, waits until it is no longer running and then closes the network
+     * created by this service, if any.
+     */
+    @Override
+    public void shutdown() {
+        try {
+            LOG.info("Stopping Kafka container");
+            confluentContainer.stop();
+        } finally {
+            try {
+                TestUtils.waitFor(this::stopped);
+            } finally {
+                // Release the network only after the container had its chance to stop.
+                if (network != null) {
+                    network.close();
+                }
+            }
+        }
+    }
+
+    @Override
+    public ConfluentContainer getContainer() {
+        return confluentContainer;
+    }
+}
diff --git a/test-infra/camel-test-infra-kafka/src/test/java/org/apache/camel/test/infra/kafka/services/KafkaServiceFactory.java b/test-infra/camel-test-infra-kafka/src/test/java/org/apache/camel/test/infra/kafka/services/KafkaServiceFactory.java index 0ca9ac80075..fc0fa2a7368 100644 --- a/test-infra/camel-test-infra-kafka/src/test/java/org/apache/camel/test/infra/kafka/services/KafkaServiceFactory.java +++ b/test-infra/camel-test-infra-kafka/src/test/java/org/apache/camel/test/infra/kafka/services/KafkaServiceFactory.java @@ -60,6 +60,7 @@ public final class KafkaServiceFactory { return builder.addLocalMapping(ContainerLocalKafkaService::kafka3Container) .addMapping("local-strimzi-container", StrimziService::new) + .addMapping("local-confluent-container", ConfluentService::new) .addRemoteMapping(RemoteKafkaService::new) .addMapping("local-kafka3-container", ContainerLocalKafkaService::kafka3Container) .addMapping("local-redpanda-container", RedpandaService::new) @@ -80,6 +81,8 @@ public final class KafkaServiceFactory { .addRemoteMapping(RemoteKafkaService::new) .addMapping("local-kafka3-container", () -> new SingletonKafkaService(ContainerLocalKafkaService.kafka3Container(), "kafka3")) + .addMapping("local-confluent-container", + () -> new 
SingletonKafkaService(new ConfluentService(), "confluent")) .addMapping("local-strimzi-container", () -> new SingletonKafkaService(new StrimziService(), "strimzi")) .addMapping("local-redpanda-container", @@ -104,6 +107,9 @@ public final class KafkaServiceFactory { } } + public static class ConfluentService extends ConfluentInfraService implements KafkaService { + } + public static class StrimziService extends StrimziInfraService implements KafkaService { }