This is an automated email from the ASF dual-hosted git repository.

davsclaus pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/camel.git


The following commit(s) were added to refs/heads/main by this push:
     new 307bcdf7a50 CAMEL-20593 use ASF docker image (#15926)
307bcdf7a50 is described below

commit 307bcdf7a50a779d02de3ef15849d3c543d199c1
Author: Jono Morris <j...@apache.org>
AuthorDate: Sat Oct 12 18:47:47 2024 +1300

    CAMEL-20593 use ASF docker image (#15926)
---
 ...KafkaConsumerAutoInstResumeRouteStrategyIT.java |  1 +
 .../KafkaConsumerIdempotentGroupIdIT.java          | 30 ++++++++++++++++----
 .../integration/KafkaConsumerIdempotentIT.java     | 30 ++++++++++++++++----
 ...kaConsumerIdempotentWithCustomSerializerIT.java | 32 +++++++++++++++------
 .../KafkaConsumerIdempotentWithProcessorIT.java    | 33 +++++++++++++++++-----
 .../kafka/integration/common/KafkaTestUtil.java    | 27 ++++++++++++++++++
 .../kafka/KafkaIdempotentRepositoryEagerIT.java    | 11 +++++++-
 .../kafka/KafkaIdempotentRepositoryNonEagerIT.java | 10 ++++++-
 .../KafkaIdempotentRepositoryPersistenceIT.java    | 15 ++++++++--
 .../services/ContainerLocalAuthKafkaService.java   |  4 +--
 .../kafka/services/ContainerLocalKafkaService.java |  7 ++---
 .../test/infra/kafka/services/container.properties |  2 +-
 12 files changed, 161 insertions(+), 41 deletions(-)

diff --git a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/integration/KafkaConsumerAutoInstResumeRouteStrategyIT.java b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/integration/KafkaConsumerAutoInstResumeRouteStrategyIT.java
index ece0f903fe5..8c61cd65eb6 100644
--- a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/integration/KafkaConsumerAutoInstResumeRouteStrategyIT.java
+++ b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/integration/KafkaConsumerAutoInstResumeRouteStrategyIT.java
@@ -53,6 +53,7 @@ public class KafkaConsumerAutoInstResumeRouteStrategyIT extends BaseKafkaTestSup
     @BeforeEach
     public void before() {
         Properties props = KafkaTestUtil.getDefaultProperties(service);
+        KafkaTestUtil.createTopic(service, TOPIC, 1);
         KafkaProducer<Object, Object> producer = new KafkaProducer<>(props);
 
         for (int i = 0; i < 10; i++) {
diff --git a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/integration/KafkaConsumerIdempotentGroupIdIT.java b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/integration/KafkaConsumerIdempotentGroupIdIT.java
index edd237cef27..316da6c8c51 100644
--- a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/integration/KafkaConsumerIdempotentGroupIdIT.java
+++ b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/integration/KafkaConsumerIdempotentGroupIdIT.java
@@ -16,14 +16,17 @@
  */
 package org.apache.camel.component.kafka.integration;
 
-import java.util.Arrays;
+import java.util.Collections;
+import java.util.UUID;
 
 import org.apache.camel.BindToRegistry;
 import org.apache.camel.builder.RouteBuilder;
 import org.apache.camel.component.kafka.integration.common.KafkaTestUtil;
 import org.apache.camel.component.mock.MockEndpoint;
 import org.apache.camel.processor.idempotent.kafka.KafkaIdempotentRepository;
+import org.junit.jupiter.api.AfterAll;
 import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeAll;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.DisplayName;
 import org.junit.jupiter.api.Test;
@@ -34,23 +37,38 @@ import static org.apache.camel.component.kafka.serde.KafkaSerdeHelper.numericHea
 @DisabledIfSystemProperty(named = "enable.kafka.consumer.idempotency.tests", matches = "false")
 public class KafkaConsumerIdempotentGroupIdIT extends KafkaConsumerIdempotentTestSupport {
 
-    public static final String TOPIC = "idempt";
-
+    private static final String TOPIC;
+    private static final String REPOSITORY_TOPIC;
     private final int size = 200;
 
+    static {
+        UUID topicId = UUID.randomUUID();
+        TOPIC = "idempt_" + topicId;
+        REPOSITORY_TOPIC = "TEST_IDEMPOTENT_" + topicId;
+    }
+
+    @BeforeAll
+    public static void createRepositoryTopic() {
+        KafkaTestUtil.createTopic(service, REPOSITORY_TOPIC, 1);
+    }
+
+    @AfterAll
+    public static void removeRepositoryTopic() {
+        kafkaAdminClient.deleteTopics(Collections.singleton(REPOSITORY_TOPIC)).all();
+    }
+
     @BindToRegistry("kafkaIdempotentRepository")
     private final KafkaIdempotentRepository testIdempotent
-            = new KafkaIdempotentRepository("TEST_IDEMPOTENT", getBootstrapServers(), "test_1");
+            = new KafkaIdempotentRepository(REPOSITORY_TOPIC, getBootstrapServers(), "test_1");
 
     @BeforeEach
     public void before() {
-        kafkaAdminClient.deleteTopics(Arrays.asList(TOPIC, "TEST_IDEMPOTENT")).all();
         doSend(size, TOPIC);
     }
 
     @AfterEach
     public void after() {
-        kafkaAdminClient.deleteTopics(Arrays.asList(TOPIC, "TEST_IDEMPOTENT")).all();
+        kafkaAdminClient.deleteTopics(Collections.singleton(TOPIC)).all();
     }
 
     protected RouteBuilder createRouteBuilder() {
diff --git a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/integration/KafkaConsumerIdempotentIT.java b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/integration/KafkaConsumerIdempotentIT.java
index 242e2488755..3e979fa52c3 100644
--- a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/integration/KafkaConsumerIdempotentIT.java
+++ b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/integration/KafkaConsumerIdempotentIT.java
@@ -16,14 +16,17 @@
  */
 package org.apache.camel.component.kafka.integration;
 
-import java.util.Arrays;
+import java.util.Collections;
+import java.util.UUID;
 
 import org.apache.camel.BindToRegistry;
 import org.apache.camel.builder.RouteBuilder;
 import org.apache.camel.component.kafka.integration.common.KafkaTestUtil;
 import org.apache.camel.component.mock.MockEndpoint;
 import org.apache.camel.processor.idempotent.kafka.KafkaIdempotentRepository;
+import org.junit.jupiter.api.AfterAll;
 import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeAll;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.DisplayName;
 import org.junit.jupiter.api.Tag;
@@ -39,23 +42,38 @@ import static org.apache.camel.component.kafka.serde.KafkaSerdeHelper.numericHea
 @Tags({ @Tag("idempotent") })
 public class KafkaConsumerIdempotentIT extends KafkaConsumerIdempotentTestSupport {
 
-    public static final String TOPIC = "idempt";
-
+    private static final String TOPIC;
+    private static final String REPOSITORY_TOPIC;
     private final int size = 200;
 
+    static {
+        UUID topicId = UUID.randomUUID();
+        TOPIC = "idempt_" + topicId;
+        REPOSITORY_TOPIC = "TEST_IDEMPOTENT_" + topicId;
+    }
+
+    @BeforeAll
+    public static void createRepositoryTopic() {
+        KafkaTestUtil.createTopic(service, REPOSITORY_TOPIC, 1);
+    }
+
+    @AfterAll
+    public static void removeRepositoryTopic() {
+        kafkaAdminClient.deleteTopics(Collections.singleton(REPOSITORY_TOPIC)).all();
+    }
+
     @BindToRegistry("kafkaIdempotentRepository")
     private final KafkaIdempotentRepository testIdempotent
-            = new KafkaIdempotentRepository("TEST_IDEMPOTENT", getBootstrapServers());
+            = new KafkaIdempotentRepository(REPOSITORY_TOPIC, getBootstrapServers());
 
     @BeforeEach
     public void before() {
-        kafkaAdminClient.deleteTopics(Arrays.asList(TOPIC, "TEST_IDEMPOTENT")).all();
         doSend(size, TOPIC);
     }
 
     @AfterEach
     public void after() {
-        kafkaAdminClient.deleteTopics(Arrays.asList(TOPIC, "TEST_IDEMPOTENT")).all();
+        kafkaAdminClient.deleteTopics(Collections.singleton(TOPIC)).all();
     }
 
     protected RouteBuilder createRouteBuilder() {
diff --git a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/integration/KafkaConsumerIdempotentWithCustomSerializerIT.java b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/integration/KafkaConsumerIdempotentWithCustomSerializerIT.java
index e95d6f8066a..c504b41ca08 100644
--- a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/integration/KafkaConsumerIdempotentWithCustomSerializerIT.java
+++ b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/integration/KafkaConsumerIdempotentWithCustomSerializerIT.java
@@ -16,36 +16,50 @@
  */
 package org.apache.camel.component.kafka.integration;
 
-import java.util.Arrays;
+import java.util.Collections;
+import java.util.UUID;
 
 import org.apache.camel.BindToRegistry;
 import org.apache.camel.builder.RouteBuilder;
 import org.apache.camel.component.kafka.integration.common.KafkaTestUtil;
 import org.apache.camel.component.mock.MockEndpoint;
 import org.apache.camel.processor.idempotent.kafka.KafkaIdempotentRepository;
-import org.junit.jupiter.api.AfterEach;
-import org.junit.jupiter.api.BeforeEach;
-import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.*;
 
 public class KafkaConsumerIdempotentWithCustomSerializerIT extends KafkaConsumerIdempotentTestSupport {
 
-    public static final String TOPIC = "idempt2";
-
+    private static final String TOPIC;
+    private static final String REPOSITORY_TOPIC;
     private final int size = 200;
 
+    static {
+        UUID topicId = UUID.randomUUID();
+        TOPIC = "idempt_" + topicId;
+        REPOSITORY_TOPIC = "TEST_IDEMPOTENT_" + topicId;
+    }
+
+    @BeforeAll
+    public static void createRepositoryTopic() {
+        KafkaTestUtil.createTopic(service, REPOSITORY_TOPIC, 1);
+    }
+
+    @AfterAll
+    public static void removeRepositoryTopic() {
+        kafkaAdminClient.deleteTopics(Collections.singleton(REPOSITORY_TOPIC)).all();
+    }
+
     @BindToRegistry("kafkaIdempotentRepository")
     private final KafkaIdempotentRepository kafkaIdempotentRepository
-            = new KafkaIdempotentRepository("TEST_IDEMPOTENT", getBootstrapServers());
+            = new KafkaIdempotentRepository(REPOSITORY_TOPIC, getBootstrapServers());
 
     @BeforeEach
     public void before() {
-        kafkaAdminClient.deleteTopics(Arrays.asList(TOPIC, "TEST_IDEMPOTENT")).all();
         doSend(size, TOPIC);
     }
 
     @AfterEach
     public void after() {
-        kafkaAdminClient.deleteTopics(Arrays.asList(TOPIC, "TEST_IDEMPOTENT")).all();
+        kafkaAdminClient.deleteTopics(Collections.singleton(TOPIC)).all();
     }
 
     protected RouteBuilder createRouteBuilder() {
diff --git a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/integration/KafkaConsumerIdempotentWithProcessorIT.java b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/integration/KafkaConsumerIdempotentWithProcessorIT.java
index 1c89b7ec7cf..a0bb4bf6ea3 100644
--- a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/integration/KafkaConsumerIdempotentWithProcessorIT.java
+++ b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/integration/KafkaConsumerIdempotentWithProcessorIT.java
@@ -17,35 +17,54 @@
 package org.apache.camel.component.kafka.integration;
 
 import java.math.BigInteger;
-import java.util.Arrays;
+import java.util.Collections;
+import java.util.UUID;
 
 import org.apache.camel.BindToRegistry;
 import org.apache.camel.builder.RouteBuilder;
 import org.apache.camel.component.kafka.integration.common.KafkaTestUtil;
 import org.apache.camel.component.mock.MockEndpoint;
 import org.apache.camel.processor.idempotent.kafka.KafkaIdempotentRepository;
+import org.junit.jupiter.api.AfterAll;
 import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeAll;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
 
 public class KafkaConsumerIdempotentWithProcessorIT extends KafkaConsumerIdempotentTestSupport {
-    public static final String TOPIC = "testidemp3";
-
+    private static final String TOPIC;
+    private static final String REPOSITORY_TOPIC;
     private final int size = 200;
+
+    static {
+        UUID topicId = UUID.randomUUID();
+        TOPIC = "idempt_" + topicId;
+        REPOSITORY_TOPIC = "TEST_IDEMPOTENT_" + topicId;
+    }
+
+    @BeforeAll
+    public static void createRepositoryTopic() {
+        KafkaTestUtil.createTopic(service, REPOSITORY_TOPIC, 1);
+    }
+
+    @AfterAll
+    public static void removeRepositoryTopic() {
+        kafkaAdminClient.deleteTopics(Collections.singleton(REPOSITORY_TOPIC)).all();
+    }
+
     @BindToRegistry("kafkaIdempotentRepository")
     private final KafkaIdempotentRepository kafkaIdempotentRepository
-            = new KafkaIdempotentRepository("TEST_IDEMPOTENT", getBootstrapServers());
+            = new KafkaIdempotentRepository(REPOSITORY_TOPIC, getBootstrapServers());
 
     @BeforeEach
     public void before() {
-        kafkaAdminClient.deleteTopics(Arrays.asList(TOPIC, "TEST_IDEMPOTENT")).all();
         doSend(size, TOPIC);
     }
 
     @AfterEach
     public void after() {
-        // clean all test topics
-        kafkaAdminClient.deleteTopics(Arrays.asList(TOPIC, "TEST_IDEMPOTENT")).all();
+        // clean test topic
+        kafkaAdminClient.deleteTopics(Collections.singleton(TOPIC)).all();
     }
 
     @Override
diff --git a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/integration/common/KafkaTestUtil.java b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/integration/common/KafkaTestUtil.java
index 8d4d15e2361..d173d8d0c49 100644
--- a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/integration/common/KafkaTestUtil.java
+++ b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/integration/common/KafkaTestUtil.java
@@ -17,7 +17,10 @@
 
 package org.apache.camel.component.kafka.integration.common;
 
+import java.util.Collections;
+import java.util.List;
 import java.util.Properties;
+import java.util.concurrent.TimeUnit;
 
 import org.apache.camel.CamelContext;
 import org.apache.camel.component.kafka.KafkaComponent;
@@ -26,10 +29,18 @@ import org.apache.camel.test.infra.kafka.services.KafkaService;
 import org.apache.kafka.clients.admin.AdminClient;
 import org.apache.kafka.clients.admin.AdminClientConfig;
 import org.apache.kafka.clients.admin.KafkaAdminClient;
+import org.apache.kafka.clients.admin.NewTopic;
+import org.apache.kafka.clients.admin.TopicDescription;
 import org.apache.kafka.clients.producer.ProducerConfig;
+import org.apache.kafka.common.KafkaFuture;
+import org.apache.kafka.common.TopicPartitionInfo;
+import org.apache.kafka.common.requests.CreateTopicsRequest;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.fail;
+
 public final class KafkaTestUtil {
     public static final String MOCK_RESULT = "mock:result";
     public static final String MOCK_RESULT_BAR = "mock:resultBar";
@@ -78,4 +89,20 @@ public final class KafkaTestUtil {
         kafka.getConfiguration().setBrokers(bootstrapServers);
         context.addComponent("kafka", kafka);
     }
+
+    public static void createTopic(KafkaService service, String topic, int numPartitions) {
+        AdminClient kafkaAdminClient = createAdminClient(service);
+        NewTopic testTopic = new NewTopic(topic, numPartitions, CreateTopicsRequest.NO_REPLICATION_FACTOR);
+        kafkaAdminClient.createTopics(Collections.singleton(testTopic));
+        KafkaFuture<TopicDescription> tdFuture
+                = kafkaAdminClient.describeTopics(Collections.singletonList(topic)).topicNameValues().get(topic);
+
+        try {
+            TopicDescription td = tdFuture.get(5L, TimeUnit.SECONDS);
+            List<TopicPartitionInfo> pi = td.partitions();
+            assertEquals(numPartitions, pi.size());
+        } catch (Exception e) {
+            fail(e.getMessage());
+        }
+    }
 }
diff --git a/components/camel-kafka/src/test/java/org/apache/camel/processor/idempotent/kafka/KafkaIdempotentRepositoryEagerIT.java b/components/camel-kafka/src/test/java/org/apache/camel/processor/idempotent/kafka/KafkaIdempotentRepositoryEagerIT.java
index b08ef00b21f..d758afda1fb 100644
--- a/components/camel-kafka/src/test/java/org/apache/camel/processor/idempotent/kafka/KafkaIdempotentRepositoryEagerIT.java
+++ b/components/camel-kafka/src/test/java/org/apache/camel/processor/idempotent/kafka/KafkaIdempotentRepositoryEagerIT.java
@@ -22,7 +22,9 @@ import org.apache.camel.BindToRegistry;
 import org.apache.camel.CamelExecutionException;
 import org.apache.camel.ProducerTemplate;
 import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.component.kafka.integration.common.KafkaTestUtil;
 import org.apache.camel.component.mock.MockEndpoint;
+import org.junit.jupiter.api.BeforeAll;
 import org.junit.jupiter.api.Test;
 
 import static org.junit.jupiter.api.Assertions.assertEquals;
@@ -32,9 +34,16 @@ import static org.junit.jupiter.api.Assertions.assertEquals;
  */
 public class KafkaIdempotentRepositoryEagerIT extends SimpleIdempotentTest {
 
+    private static final String REPOSITORY_TOPIC = "TEST_EAGER_" + UUID.randomUUID();
+
+    @BeforeAll
+    public static void createRepositoryTopic() {
+        KafkaTestUtil.createTopic(service, REPOSITORY_TOPIC, 1);
+    }
+
     @BindToRegistry("kafkaIdempotentRepositoryEager")
     private final KafkaIdempotentRepository idempotentRepository
-            = new KafkaIdempotentRepository("TEST_EAGER_" + UUID.randomUUID(), service.getBootstrapServers());
+            = new KafkaIdempotentRepository(REPOSITORY_TOPIC, service.getBootstrapServers());
 
     @Override
     protected RouteBuilder createRouteBuilder() {
diff --git a/components/camel-kafka/src/test/java/org/apache/camel/processor/idempotent/kafka/KafkaIdempotentRepositoryNonEagerIT.java b/components/camel-kafka/src/test/java/org/apache/camel/processor/idempotent/kafka/KafkaIdempotentRepositoryNonEagerIT.java
index 8c47892e7e1..985848008a4 100644
--- a/components/camel-kafka/src/test/java/org/apache/camel/processor/idempotent/kafka/KafkaIdempotentRepositoryNonEagerIT.java
+++ b/components/camel-kafka/src/test/java/org/apache/camel/processor/idempotent/kafka/KafkaIdempotentRepositoryNonEagerIT.java
@@ -27,6 +27,7 @@ import org.apache.camel.builder.RouteBuilder;
 import org.apache.camel.component.kafka.integration.common.KafkaTestUtil;
 import org.apache.camel.component.mock.MockEndpoint;
 import org.apache.camel.test.infra.core.annotations.ContextFixture;
+import org.junit.jupiter.api.BeforeAll;
 import org.junit.jupiter.api.DisplayName;
 import org.junit.jupiter.api.MethodOrderer;
 import org.junit.jupiter.api.Order;
@@ -42,9 +43,16 @@ import static org.junit.jupiter.api.Assertions.assertEquals;
 @TestMethodOrder(MethodOrderer.OrderAnnotation.class)
 public class KafkaIdempotentRepositoryNonEagerIT extends SimpleIdempotentTest {
 
+    private static final String REPOSITORY_TOPIC = "TEST_NON_EAGER_" + UUID.randomUUID();
+
+    @BeforeAll
+    public static void createRepositoryTopic() {
+        KafkaTestUtil.createTopic(service, REPOSITORY_TOPIC, 1);
+    }
+
     @BindToRegistry("kafkaIdempotentRepositoryNonEager")
     private final KafkaIdempotentRepository kafkaIdempotentRepository
-            = new KafkaIdempotentRepository("TEST_NON_EAGER_" + UUID.randomUUID(), service.getBootstrapServers());
+            = new KafkaIdempotentRepository(REPOSITORY_TOPIC, service.getBootstrapServers());
 
     @ContextFixture
     public void configureKafka(CamelContext context) {
diff --git a/components/camel-kafka/src/test/java/org/apache/camel/processor/idempotent/kafka/KafkaIdempotentRepositoryPersistenceIT.java b/components/camel-kafka/src/test/java/org/apache/camel/processor/idempotent/kafka/KafkaIdempotentRepositoryPersistenceIT.java
index 1de12059224..83dca938bfc 100644
--- a/components/camel-kafka/src/test/java/org/apache/camel/processor/idempotent/kafka/KafkaIdempotentRepositoryPersistenceIT.java
+++ b/components/camel-kafka/src/test/java/org/apache/camel/processor/idempotent/kafka/KafkaIdempotentRepositoryPersistenceIT.java
@@ -16,7 +16,8 @@
  */
 package org.apache.camel.processor.idempotent.kafka;
 
-import java.util.Arrays;
+import java.util.Collections;
+import java.util.UUID;
 import java.util.concurrent.ThreadLocalRandom;
 import java.util.concurrent.TimeUnit;
 import java.util.stream.Stream;
@@ -25,9 +26,11 @@ import org.apache.camel.CamelContext;
 import org.apache.camel.ProducerTemplate;
 import org.apache.camel.builder.RouteBuilder;
 import org.apache.camel.component.kafka.integration.BaseKafkaTestSupport;
+import org.apache.camel.component.kafka.integration.common.KafkaTestUtil;
 import org.apache.camel.component.mock.MockEndpoint;
 import org.apache.camel.test.infra.core.annotations.ContextFixture;
 import org.apache.camel.test.infra.core.api.ConfigurableContext;
+import org.junit.jupiter.api.BeforeAll;
 import org.junit.jupiter.api.DisplayName;
 import org.junit.jupiter.api.MethodOrderer;
 import org.junit.jupiter.api.Order;
@@ -55,16 +58,22 @@ import static org.junit.jupiter.api.Assertions.assertEquals;
 @TestInstance(TestInstance.Lifecycle.PER_CLASS)
 public class KafkaIdempotentRepositoryPersistenceIT extends BaseKafkaTestSupport implements ConfigurableContext {
 
+    private static String REPOSITORY_TOPIC = "TEST_PERSISTENCE_" + UUID.randomUUID();
     private KafkaIdempotentRepository kafkaIdempotentRepository;
 
+    @BeforeAll
+    public static void createRepositoryTopic() {
+        KafkaTestUtil.createTopic(service, REPOSITORY_TOPIC, 1);
+    }
+
     void clearTopics() {
-        kafkaAdminClient.deleteTopics(Arrays.asList("TEST_PERSISTENCE")).all();
+        kafkaAdminClient.deleteTopics(Collections.singleton(REPOSITORY_TOPIC)).all();
     }
 
     @Override
     @ContextFixture
     public void configureContext(CamelContext context) {
-        kafkaIdempotentRepository = new KafkaIdempotentRepository("TEST_PERSISTENCE", getBootstrapServers());
+        kafkaIdempotentRepository = new KafkaIdempotentRepository(REPOSITORY_TOPIC, getBootstrapServers());
         context.getRegistry().bind("kafkaIdempotentRepositoryPersistence", kafkaIdempotentRepository);
     }
 
diff --git a/test-infra/camel-test-infra-kafka/src/test/java/org/apache/camel/test/infra/kafka/services/ContainerLocalAuthKafkaService.java b/test-infra/camel-test-infra-kafka/src/test/java/org/apache/camel/test/infra/kafka/services/ContainerLocalAuthKafkaService.java
index 1d878169cb1..e0934710c80 100644
--- a/test-infra/camel-test-infra-kafka/src/test/java/org/apache/camel/test/infra/kafka/services/ContainerLocalAuthKafkaService.java
+++ b/test-infra/camel-test-infra-kafka/src/test/java/org/apache/camel/test/infra/kafka/services/ContainerLocalAuthKafkaService.java
@@ -21,7 +21,7 @@ import org.apache.camel.test.infra.common.services.ContainerService;
 import org.apache.camel.test.infra.kafka.common.KafkaProperties;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
-import org.testcontainers.containers.KafkaContainer;
+import org.testcontainers.kafka.KafkaContainer;
 import org.testcontainers.utility.DockerImageName;
 import org.testcontainers.utility.MountableFile;
 
@@ -36,8 +36,6 @@ public class ContainerLocalAuthKafkaService implements KafkaService, ContainerSe
                     ContainerLocalKafkaService.KAFKA3_IMAGE_NAME))
                     .asCompatibleSubstituteFor(ContainerLocalKafkaService.KAFKA3_IMAGE_NAME));
 
-            withEmbeddedZookeeper();
-
             final MountableFile mountableFile = MountableFile.forClasspathResource(jaasConfigFile);
             LOG.debug("Using mountable file at: {}", mountableFile.getFilesystemPath());
             withCopyFileToContainer(mountableFile, "/tmp/kafka-jaas.config")
diff --git a/test-infra/camel-test-infra-kafka/src/test/java/org/apache/camel/test/infra/kafka/services/ContainerLocalKafkaService.java b/test-infra/camel-test-infra-kafka/src/test/java/org/apache/camel/test/infra/kafka/services/ContainerLocalKafkaService.java
index 5e958377de6..4450498288f 100644
--- a/test-infra/camel-test-infra-kafka/src/test/java/org/apache/camel/test/infra/kafka/services/ContainerLocalKafkaService.java
+++ b/test-infra/camel-test-infra-kafka/src/test/java/org/apache/camel/test/infra/kafka/services/ContainerLocalKafkaService.java
@@ -22,7 +22,7 @@ import org.apache.camel.test.infra.common.services.ContainerService;
 import org.apache.camel.test.infra.kafka.common.KafkaProperties;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
-import org.testcontainers.containers.KafkaContainer;
+import org.testcontainers.kafka.KafkaContainer;
 import org.testcontainers.utility.DockerImageName;
 
 public class ContainerLocalKafkaService implements KafkaService, ContainerService<KafkaContainer> {
@@ -42,8 +42,8 @@ public class ContainerLocalKafkaService implements KafkaService, ContainerServic
     }
 
     protected KafkaContainer initContainer() {
-        return new KafkaContainer(DockerImageName.parse(System.getProperty(KafkaProperties.KAFKA_CONTAINER, KAFKA3_IMAGE_NAME)))
-                .withEmbeddedZookeeper();
+        return new KafkaContainer(
+                DockerImageName.parse(System.getProperty(KafkaProperties.KAFKA_CONTAINER, KAFKA3_IMAGE_NAME)));
     }
 
     public String getBootstrapServers() {
@@ -79,7 +79,6 @@ public class ContainerLocalKafkaService implements KafkaService, ContainerServic
                 = new KafkaContainer(
                         DockerImageName.parse(System.getProperty(KafkaProperties.KAFKA_CONTAINER, KAFKA3_IMAGE_NAME))
                                 .asCompatibleSubstituteFor(ContainerLocalKafkaService.KAFKA3_IMAGE_NAME));
-        container = container.withEmbeddedZookeeper();
 
         return new ContainerLocalKafkaService(container);
     }
diff --git a/test-infra/camel-test-infra-kafka/src/test/resources/org/apache/camel/test/infra/kafka/services/container.properties b/test-infra/camel-test-infra-kafka/src/test/resources/org/apache/camel/test/infra/kafka/services/container.properties
index baf466772b4..d456dddc934 100644
--- a/test-infra/camel-test-infra-kafka/src/test/resources/org/apache/camel/test/infra/kafka/services/container.properties
+++ b/test-infra/camel-test-infra-kafka/src/test/resources/org/apache/camel/test/infra/kafka/services/container.properties
@@ -14,6 +14,6 @@
 ## See the License for the specific language governing permissions and
 ## limitations under the License.
 ## ---------------------------------------------------------------------------
-kafka3.container=confluentinc/cp-kafka:7.4.5
+kafka3.container=apache/kafka:3.8.0
 redpanda.container.image=redpandadata/redpanda:v24.1.16
 strimzi.container.image=quay.io/strimzi/kafka:latest-kafka-3.7.0

Reply via email to