This is an automated email from the ASF dual-hosted git repository.

fmariani pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/camel.git


The following commit(s) were added to refs/heads/main by this push:
     new 978b6665b24 Adding confluent kafka instance type
978b6665b24 is described below

commit 978b6665b249c9f8071224bb4defbed4167254d3
Author: Salvatore Mongiardo <smong...@redhat.com>
AuthorDate: Thu Apr 17 10:56:23 2025 +0200

    Adding confluent kafka instance type
---
 .../apache/camel/catalog/test-infra/metadata.json  |  9 +++
 .../src/generated/resources/META-INF/metadata.json |  9 +++
 .../test/infra/kafka/common/KafkaProperties.java   |  1 +
 .../infra/kafka/services/ConfluentContainer.java   | 91 +++++++++++++++++++++
 .../kafka/services/ConfluentInfraService.java      | 93 ++++++++++++++++++++++
 .../infra/kafka/services/KafkaServiceFactory.java  |  6 ++
 6 files changed, 209 insertions(+)

diff --git 
a/catalog/camel-catalog/src/generated/resources/org/apache/camel/catalog/test-infra/metadata.json
 
b/catalog/camel-catalog/src/generated/resources/org/apache/camel/catalog/test-infra/metadata.json
index 0e030762fa2..6ae17303bd6 100644
--- 
a/catalog/camel-catalog/src/generated/resources/org/apache/camel/catalog/test-infra/metadata.json
+++ 
b/catalog/camel-catalog/src/generated/resources/org/apache/camel/catalog/test-infra/metadata.json
@@ -124,6 +124,15 @@
   "groupId" : "org.apache.camel",
   "artifactId" : "camel-test-infra-kafka",
   "version" : "4.12.0-SNAPSHOT"
+}, {
+  "service" : "org.apache.camel.test.infra.kafka.services.KafkaInfraService",
+  "description" : "Apache Kafka, Distributed event streaming platform",
+  "implementation" : 
"org.apache.camel.test.infra.kafka.services.ConfluentInfraService",
+  "alias" : [ "kafka" ],
+  "aliasImplementation" : [ "confluent" ],
+  "groupId" : "org.apache.camel",
+  "artifactId" : "camel-test-infra-kafka",
+  "version" : "4.12.0-SNAPSHOT"
 }, {
   "service" : "org.apache.camel.test.infra.nats.services.NatsInfraService",
   "description" : "Messaging Platform NATS",
diff --git 
a/test-infra/camel-test-infra-all/src/generated/resources/META-INF/metadata.json
 
b/test-infra/camel-test-infra-all/src/generated/resources/META-INF/metadata.json
index 0e030762fa2..6ae17303bd6 100644
--- 
a/test-infra/camel-test-infra-all/src/generated/resources/META-INF/metadata.json
+++ 
b/test-infra/camel-test-infra-all/src/generated/resources/META-INF/metadata.json
@@ -124,6 +124,15 @@
   "groupId" : "org.apache.camel",
   "artifactId" : "camel-test-infra-kafka",
   "version" : "4.12.0-SNAPSHOT"
+}, {
+  "service" : "org.apache.camel.test.infra.kafka.services.KafkaInfraService",
+  "description" : "Apache Kafka, Distributed event streaming platform",
+  "implementation" : 
"org.apache.camel.test.infra.kafka.services.ConfluentInfraService",
+  "alias" : [ "kafka" ],
+  "aliasImplementation" : [ "confluent" ],
+  "groupId" : "org.apache.camel",
+  "artifactId" : "camel-test-infra-kafka",
+  "version" : "4.12.0-SNAPSHOT"
 }, {
   "service" : "org.apache.camel.test.infra.nats.services.NatsInfraService",
   "description" : "Messaging Platform NATS",
diff --git 
a/test-infra/camel-test-infra-kafka/src/main/java/org/apache/camel/test/infra/kafka/common/KafkaProperties.java
 
b/test-infra/camel-test-infra-kafka/src/main/java/org/apache/camel/test/infra/kafka/common/KafkaProperties.java
index 1b936895073..ce4de407b29 100644
--- 
a/test-infra/camel-test-infra-kafka/src/main/java/org/apache/camel/test/infra/kafka/common/KafkaProperties.java
+++ 
b/test-infra/camel-test-infra-kafka/src/main/java/org/apache/camel/test/infra/kafka/common/KafkaProperties.java
@@ -20,6 +20,7 @@ package org.apache.camel.test.infra.kafka.common;
 public final class KafkaProperties {
     public static final String KAFKA_BOOTSTRAP_SERVERS = 
"kafka.bootstrap.servers";
     public static final String KAFKA_ZOOKEEPER_ADDRESS = 
"kafka.zookeeper.address";
+    public static final String CONFLUENT_CONTAINER = 
"confluent.container.image";
     public static final String KAFKA_CONTAINER = "kafka.container";
     public static final String KAFKA3_CONTAINER = "kafka3.container";
     public static final String REDPANDA_CONTAINER = "redpanda.container.image";
diff --git 
a/test-infra/camel-test-infra-kafka/src/main/java/org/apache/camel/test/infra/kafka/services/ConfluentContainer.java
 
b/test-infra/camel-test-infra-kafka/src/main/java/org/apache/camel/test/infra/kafka/services/ConfluentContainer.java
new file mode 100644
index 00000000000..42d60218c53
--- /dev/null
+++ 
b/test-infra/camel-test-infra-kafka/src/main/java/org/apache/camel/test/infra/kafka/services/ConfluentContainer.java
@@ -0,0 +1,91 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.camel.test.infra.kafka.services;
+
+import java.util.UUID;
+
+import com.github.dockerjava.api.command.CreateContainerCmd;
+import org.apache.camel.test.infra.common.LocalPropertyResolver;
+import org.apache.camel.test.infra.kafka.common.KafkaProperties;
+import org.testcontainers.containers.GenericContainer;
+import org.testcontainers.containers.Network;
+import org.testcontainers.containers.wait.strategy.Wait;
+
+public class ConfluentContainer extends GenericContainer<ConfluentContainer> {
+    static final String CONFLUENT_CONTAINER = 
LocalPropertyResolver.getProperty(
+            ConfluentContainer.class,
+            KafkaProperties.CONFLUENT_CONTAINER);
+    private static final int KAFKA_PORT = 9092;
+
+    public ConfluentContainer(Network network, String name) {
+        this(network, name, CONFLUENT_CONTAINER);
+    }
+
+    public ConfluentContainer(Network network, String name, String 
containerName) {
+        super(containerName);
+
+        withEnv("LOG_DIR", "/tmp/logs")
+                .withExposedPorts(KAFKA_PORT)
+                .withEnv("KAFKA_BROKER_ID", "1")
+                .withEnv("KAFKA_LISTENER_SECURITY_PROTOCOL_MAP",
+                        
"BROKER:PLAINTEXT,PLAINTEXT:PLAINTEXT,CONTROLLER:PLAINTEXT")
+                .withEnv("KAFKA_ADVERTISED_LISTENERS",
+                        String.format("PLAINTEXT://%s:9092,BROKER://%s:9093", 
getHost(), getHost()))
+                .withEnv("KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS", "0")
+                .withEnv("KAFKA_TRANSACTION_STATE_LOG_MIN_ISR", "1")
+                .withEnv("KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR", "1")
+                .withEnv("KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR", "1")
+                .withEnv("KAFKA_PROCESS_ROLES", "broker,controller")
+                .withEnv("KAFKA_NODE_ID", "1")
+                .withEnv("KAFKA_CONTROLLER_QUORUM_VOTERS", "1@0.0.0.0:9094")
+                .withEnv("KAFKA_LISTENERS",
+                        
"PLAINTEXT://0.0.0.0:9092,BROKER://0.0.0.0:9093,CONTROLLER://0.0.0.0:9094")
+                .withEnv("KAFKA_INTER_BROKER_LISTENER_NAME", "BROKER")
+                .withEnv("KAFKA_CONTROLLER_LISTENER_NAMES", "CONTROLLER")
+                .withEnv("KAFKA_LOG_DIRS", "/tmp/kraft-combined-logs")
+                .withEnv("KAFKA_REST_HOST_NAME", "rest-proxy")
+                .withEnv("KAFKA_REST_LISTENERS", 
String.format("http://%s:9092", getHost()))
+                .withEnv("KAFKA_REST_BOOTSTRAP_SERVERS", "localhost:9092")
+                .withEnv("PATH", 
"PATH=/usr/local/sbin:/usr/local/bin:/usr/sbin:/usr/bin:/sbin:/bin")
+                .withEnv("container", "oci")
+                .withEnv("LANG", "C.UTF-8")
+                .withEnv("UB_CLASSPATH", "/usr/share/java/cp-base-lite/*")
+                .withEnv("KAFKA_ZOOKEEPER_CONNECT", "")
+                .withEnv("CLUSTER_ID", 
UUID.randomUUID().toString().replace("-", "").substring(0, 22))
+                .withNetwork(network)
+                .withCreateContainerCmdModifier(createContainerCmd -> 
setupContainer(name, createContainerCmd))
+                .withCommand("sh", "-c",
+                        "/etc/confluent/docker/run")
+                .waitingFor(Wait.forLogMessage(".*Kafka Server started.*", 1));
+    }
+
+    private void setupContainer(String name, CreateContainerCmd 
createContainerCmd) {
+        createContainerCmd.withHostName(name);
+        createContainerCmd.withName(name);
+    }
+
+    public int getKafkaPort() {
+        return getMappedPort(KAFKA_PORT);
+    }
+
+    @Override
+    public void start() {
+        addFixedExposedPort(KAFKA_PORT, KAFKA_PORT);
+        super.start();
+    }
+}
diff --git 
a/test-infra/camel-test-infra-kafka/src/main/java/org/apache/camel/test/infra/kafka/services/ConfluentInfraService.java
 
b/test-infra/camel-test-infra-kafka/src/main/java/org/apache/camel/test/infra/kafka/services/ConfluentInfraService.java
new file mode 100644
index 00000000000..42c79fc717b
--- /dev/null
+++ 
b/test-infra/camel-test-infra-kafka/src/main/java/org/apache/camel/test/infra/kafka/services/ConfluentInfraService.java
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.camel.test.infra.kafka.services;
+
+import org.apache.camel.spi.annotations.InfraService;
+import org.apache.camel.test.infra.common.TestUtils;
+import org.apache.camel.test.infra.common.services.ContainerService;
+import org.apache.camel.test.infra.kafka.common.KafkaProperties;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.testcontainers.containers.Network;
+
+@InfraService(service = KafkaInfraService.class,
+              description = "Apache Kafka, Distributed event streaming 
platform",
+              serviceAlias = "kafka", serviceImplementationAlias = "confluent")
+public class ConfluentInfraService implements KafkaInfraService, 
ContainerService<ConfluentContainer> {
+    private static final Logger LOG = 
LoggerFactory.getLogger(ConfluentInfraService.class);
+
+    private final ConfluentContainer confluentContainer;
+
+    public ConfluentInfraService() {
+        this("confluent-" + TestUtils.randomWithRange(1, 100));
+    }
+
+    public ConfluentInfraService(String confluentInstanceName) {
+        Network network = Network.newNetwork();
+        confluentContainer = initConfluentContainer(network, 
confluentInstanceName);
+    }
+
+    public ConfluentInfraService(ConfluentContainer confluentContainer) {
+        this.confluentContainer = confluentContainer;
+    }
+
+    protected ConfluentContainer initConfluentContainer(Network network, 
String instanceName) {
+        return new ConfluentContainer(network, instanceName);
+    }
+
+    protected Integer getKafkaPort() {
+        return confluentContainer.getKafkaPort();
+    }
+
+    @Override
+    public String getBootstrapServers() {
+        return confluentContainer.getHost() + ":" + getKafkaPort();
+    }
+
+    @Override
+    public void registerProperties() {
+        System.setProperty(KafkaProperties.KAFKA_BOOTSTRAP_SERVERS, 
getBootstrapServers());
+    }
+
+    @Override
+    public void initialize() {
+        confluentContainer.start();
+
+        registerProperties();
+        LOG.info("Kafka bootstrap server running at address {}", 
getBootstrapServers());
+    }
+
+    private boolean stopped() {
+        return !confluentContainer.isRunning();
+    }
+
+    @Override
+    public void shutdown() {
+        try {
+            LOG.info("Stopping Kafka container");
+            confluentContainer.stop();
+        } finally {
+            TestUtils.waitFor(this::stopped);
+        }
+    }
+
+    @Override
+    public ConfluentContainer getContainer() {
+        return confluentContainer;
+    }
+}
diff --git 
a/test-infra/camel-test-infra-kafka/src/test/java/org/apache/camel/test/infra/kafka/services/KafkaServiceFactory.java
 
b/test-infra/camel-test-infra-kafka/src/test/java/org/apache/camel/test/infra/kafka/services/KafkaServiceFactory.java
index 0ca9ac80075..fc0fa2a7368 100644
--- 
a/test-infra/camel-test-infra-kafka/src/test/java/org/apache/camel/test/infra/kafka/services/KafkaServiceFactory.java
+++ 
b/test-infra/camel-test-infra-kafka/src/test/java/org/apache/camel/test/infra/kafka/services/KafkaServiceFactory.java
@@ -60,6 +60,7 @@ public final class KafkaServiceFactory {
 
         return 
builder.addLocalMapping(ContainerLocalKafkaService::kafka3Container)
                 .addMapping("local-strimzi-container", StrimziService::new)
+                .addMapping("local-confluent-container", ConfluentService::new)
                 .addRemoteMapping(RemoteKafkaService::new)
                 .addMapping("local-kafka3-container", 
ContainerLocalKafkaService::kafka3Container)
                 .addMapping("local-redpanda-container", RedpandaService::new)
@@ -80,6 +81,8 @@ public final class KafkaServiceFactory {
                     .addRemoteMapping(RemoteKafkaService::new)
                     .addMapping("local-kafka3-container",
                             () -> new 
SingletonKafkaService(ContainerLocalKafkaService.kafka3Container(), "kafka3"))
+                    .addMapping("local-confluent-container",
+                            () -> new SingletonKafkaService(new 
ConfluentService(), "confluent"))
                     .addMapping("local-strimzi-container",
                             () -> new SingletonKafkaService(new 
StrimziService(), "strimzi"))
                     .addMapping("local-redpanda-container",
@@ -104,6 +107,9 @@ public final class KafkaServiceFactory {
         }
     }
 
+    public static class ConfluentService extends ConfluentInfraService 
implements KafkaService {
+    }
+
     public static class StrimziService extends StrimziInfraService implements 
KafkaService {
     }
 

Reply via email to