This is an automated email from the ASF dual-hosted git repository.

oalsafi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/camel.git


The following commit(s) were added to refs/heads/master by this push:
     new 131a2e6  CAMEL-14957: Fix Kafka testcontainers integration tests (#3807)
131a2e6 is described below

commit 131a2e6a5e42585109db44197131b1b93d95168b
Author: Omar Al-Safi <omars...@gmail.com>
AuthorDate: Wed May 6 19:01:06 2020 +0200

    CAMEL-14957: Fix Kafka testcontainers integration tests (#3807)
---
 .../component/kafka/BaseEmbeddedKafkaTest.java     | 24 +++++++--
 .../kafka/KafkaConsumerBatchSizeTest.java          |  3 ++
 .../component/kafka/KafkaConsumerFullTest.java     |  3 ++
 .../kafka/KafkaConsumerLastRecordHeaderTest.java   |  3 ++
 .../kafka/KafkaConsumerManualCommitTest.java       |  3 ++
 .../kafka/KafkaConsumerRebalanceTest.java          |  8 +++
 .../kafka/KafkaConsumerTopicIsPatternTest.java     |  3 ++
 .../component/kafka/KafkaProducerFullTest.java     | 62 +++++++++++++++-------
 8 files changed, 86 insertions(+), 23 deletions(-)

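In short, the diff below drops the JUnit @ClassRule and instead starts the Testcontainers Kafka broker from a static initializer of the (now abstract) base test class, exposing a Kafka AdminClient that the individual tests use to delete their topics after each run. A minimal sketch of that pattern, assuming the same library versions as the commit (class and method names below are illustrative, not the exact ones from the change):

    import java.util.Collections;
    import java.util.Properties;

    import org.apache.kafka.clients.admin.AdminClient;
    import org.apache.kafka.clients.admin.KafkaAdminClient;
    import org.apache.kafka.clients.producer.ProducerConfig;
    import org.testcontainers.containers.KafkaContainer;
    import org.testcontainers.containers.wait.strategy.Wait;

    public abstract class KafkaTestSupportSketch {
        // Broker is started once per JVM in a static initializer instead of a @ClassRule,
        // so it is already running before any subclass test (or its CamelContext) is created.
        protected static final KafkaContainer KAFKA = new KafkaContainer("5.5.0")
                .withEmbeddedZookeeper()
                .waitingFor(Wait.forListeningPort());

        protected static final AdminClient ADMIN;

        static {
            KAFKA.start();
            Properties props = new Properties();
            props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, KAFKA.getBootstrapServers());
            ADMIN = KafkaAdminClient.create(props);
        }

        // Subclasses call this from an @After/@AfterClass method so each test
        // starts against a fresh topic instead of leftover records.
        protected static void deleteTopic(String topic) {
            ADMIN.deleteTopics(Collections.singletonList(topic));
        }
    }
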
diff --git a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/BaseEmbeddedKafkaTest.java b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/BaseEmbeddedKafkaTest.java
index 4a2c772..3c1f2ef 100644
--- a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/BaseEmbeddedKafkaTest.java
+++ b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/BaseEmbeddedKafkaTest.java
@@ -20,23 +20,30 @@ import java.util.Properties;
 
 import org.apache.camel.CamelContext;
 import org.apache.camel.test.junit4.CamelTestSupport;
+import org.apache.kafka.clients.admin.AdminClient;
+import org.apache.kafka.clients.admin.KafkaAdminClient;
 import org.apache.kafka.clients.producer.ProducerConfig;
 import org.junit.BeforeClass;
-import org.junit.ClassRule;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.testcontainers.containers.KafkaContainer;
 import org.testcontainers.containers.wait.strategy.Wait;
 
-public class BaseEmbeddedKafkaTest extends CamelTestSupport {
+public abstract class BaseEmbeddedKafkaTest extends CamelTestSupport {
+    protected static AdminClient kafkaAdminClient;
+
     private static final String CONFLUENT_PLATFORM_VERSION = "5.5.0";
 
-    @ClassRule
-    public static KafkaContainer kafkaBroker = new KafkaContainer(CONFLUENT_PLATFORM_VERSION)
+    private static final Logger LOG = LoggerFactory.getLogger(BaseEmbeddedKafkaTest.class);
+
+    protected static KafkaContainer kafkaBroker = new KafkaContainer(CONFLUENT_PLATFORM_VERSION)
         .withEmbeddedZookeeper()
         .waitingFor(Wait.forListeningPort());
 
-    private static final Logger LOG = LoggerFactory.getLogger(BaseEmbeddedKafkaTest.class);
+    static {
+        kafkaBroker.start();
+        kafkaAdminClient = createAdminClient();
+    }
 
     @BeforeClass
     public static void beforeClass() {
@@ -71,4 +78,11 @@ public class BaseEmbeddedKafkaTest extends CamelTestSupport {
     protected static String getBootstrapServers() {
         return kafkaBroker.getBootstrapServers();
     }
+
+    private static AdminClient createAdminClient() {
+        final Properties properties = new Properties();
+        properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaBroker.getBootstrapServers());
+
+        return KafkaAdminClient.create(properties);
+    }
 }
diff --git a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaConsumerBatchSizeTest.java b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaConsumerBatchSizeTest.java
index 06d5de0..4588d8f 100644
--- a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaConsumerBatchSizeTest.java
+++ b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaConsumerBatchSizeTest.java
@@ -16,6 +16,7 @@
  */
 package org.apache.camel.component.kafka;
 
+import java.util.Collections;
 import java.util.Properties;
 
 import org.apache.camel.Endpoint;
@@ -50,6 +51,8 @@ public class KafkaConsumerBatchSizeTest extends BaseEmbeddedKafkaTest {
         if (producer != null) {
             producer.close();
         }
+        // clean all test topics
+        kafkaAdminClient.deleteTopics(Collections.singletonList(TOPIC));
     }
 
     @Override
diff --git a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaConsumerFullTest.java b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaConsumerFullTest.java
index d6a3150..3673b3f 100644
--- a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaConsumerFullTest.java
+++ b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaConsumerFullTest.java
@@ -17,6 +17,7 @@
 package org.apache.camel.component.kafka;
 
 import java.io.IOException;
+import java.util.Collections;
 import java.util.Map;
 import java.util.Properties;
 import java.util.stream.StreamSupport;
@@ -62,6 +63,8 @@ public class KafkaConsumerFullTest extends BaseEmbeddedKafkaTest {
         if (producer != null) {
             producer.close();
         }
+        // clean all test topics
+        kafkaAdminClient.deleteTopics(Collections.singletonList(TOPIC));
     }
 
     @Override
diff --git a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaConsumerLastRecordHeaderTest.java b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaConsumerLastRecordHeaderTest.java
index c29de33..1cb8673 100644
--- a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaConsumerLastRecordHeaderTest.java
+++ b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaConsumerLastRecordHeaderTest.java
@@ -16,6 +16,7 @@
  */
 package org.apache.camel.component.kafka;
 
+import java.util.Collections;
 import java.util.List;
 import java.util.Properties;
 
@@ -47,6 +48,8 @@ public class KafkaConsumerLastRecordHeaderTest extends BaseEmbeddedKafkaTest {
         if (producer != null) {
             producer.close();
         }
+        // clean all test topics
+        kafkaAdminClient.deleteTopics(Collections.singletonList(TOPIC));
     }
 
     /**
diff --git a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaConsumerManualCommitTest.java b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaConsumerManualCommitTest.java
index 476e346..0e6524d 100644
--- a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaConsumerManualCommitTest.java
+++ b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaConsumerManualCommitTest.java
@@ -17,6 +17,7 @@
 package org.apache.camel.component.kafka;
 
 import java.io.IOException;
+import java.util.Collections;
 import java.util.Properties;
 import java.util.stream.StreamSupport;
 
@@ -55,6 +56,8 @@ public class KafkaConsumerManualCommitTest extends BaseEmbeddedKafkaTest {
         if (producer != null) {
             producer.close();
         }
+        // clean all test topics
+        kafkaAdminClient.deleteTopics(Collections.singletonList(TOPIC));
     }
 
     @Override
diff --git a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaConsumerRebalanceTest.java b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaConsumerRebalanceTest.java
index 856087a..ca19ce0 100644
--- a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaConsumerRebalanceTest.java
+++ b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaConsumerRebalanceTest.java
@@ -16,6 +16,7 @@
  */
 package org.apache.camel.component.kafka;
 
+import java.util.Collections;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
 
@@ -24,6 +25,7 @@ import org.apache.camel.EndpointInject;
 import org.apache.camel.builder.RouteBuilder;
 import org.apache.camel.component.mock.MockEndpoint;
 import org.apache.camel.spi.StateRepository;
+import org.junit.After;
 import org.junit.Test;
 
 public class KafkaConsumerRebalanceTest extends BaseEmbeddedKafkaTest {
@@ -48,6 +50,12 @@ public class KafkaConsumerRebalanceTest extends BaseEmbeddedKafkaTest {
         assertTrue("StateRepository.getState should have been called twice for topic " + TOPIC + ". Remaining count : " + messagesLatch.getCount(), offsetGetStateCalled);
     }
 
+    @After
+    public void after() {
+        // clean all test topics
+        kafkaAdminClient.deleteTopics(Collections.singletonList(TOPIC));
+    }
+
     @Override
     protected RouteBuilder createRouteBuilder() throws Exception {
         return new RouteBuilder() {
diff --git a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaConsumerTopicIsPatternTest.java b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaConsumerTopicIsPatternTest.java
index 9a54c8f..65325aa 100644
--- a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaConsumerTopicIsPatternTest.java
+++ b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaConsumerTopicIsPatternTest.java
@@ -16,6 +16,7 @@
  */
 package org.apache.camel.component.kafka;
 
+import java.util.Collections;
 import java.util.Properties;
 import java.util.stream.StreamSupport;
 
@@ -54,6 +55,8 @@ public class KafkaConsumerTopicIsPatternTest extends BaseEmbeddedKafkaTest {
         if (producer != null) {
             producer.close();
         }
+        // clean all test topics
+        kafkaAdminClient.deleteTopics(Collections.singletonList(TOPIC));
     }
 
     @Override
diff --git a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaProducerFullTest.java b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaProducerFullTest.java
index 355c433..19d8b8d 100644
--- a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaProducerFullTest.java
+++ b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaProducerFullTest.java
@@ -104,24 +104,20 @@ public class KafkaProducerFullTest extends BaseEmbeddedKafkaTest {
 
     @BeforeClass
     public static void before() {
-        Properties stringsProps = new Properties();
+        stringsConsumerConn = createStringKafkaConsumer("DemoConsumer");
+        bytesConsumerConn = createByteKafkaConsumer(GROUP_BYTES);
+    }
 
-        stringsProps.put(org.apache.kafka.clients.consumer.ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, getBootstrapServers());
-        stringsProps.put(org.apache.kafka.clients.consumer.ConsumerConfig.GROUP_ID_CONFIG, "DemoConsumer");
-        stringsProps.put(org.apache.kafka.clients.consumer.ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true");
-        stringsProps.put(org.apache.kafka.clients.consumer.ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "1000");
-        stringsProps.put(org.apache.kafka.clients.consumer.ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "30000");
-        stringsProps.put(org.apache.kafka.clients.consumer.ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
-        stringsProps.put(org.apache.kafka.clients.consumer.ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
-        stringsProps.put(org.apache.kafka.clients.consumer.ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
-        stringsConsumerConn = new KafkaConsumer<>(stringsProps);
-
-        Properties bytesProps = new Properties();
-        bytesProps.putAll(stringsProps);
-        bytesProps.put("group.id", GROUP_BYTES);
-        bytesProps.put(org.apache.kafka.clients.consumer.ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArrayDeserializer");
-        bytesProps.put(org.apache.kafka.clients.consumer.ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArrayDeserializer");
-        bytesConsumerConn = new KafkaConsumer<>(bytesProps);
+    @AfterClass
+    public static void after() {
+        // clean all test topics
+        final List<String> topics = new ArrayList<>();
+        topics.add(TOPIC_BYTES);
+        topics.add(TOPIC_INTERCEPTED);
+        topics.add(TOPIC_PROPAGATED_HEADERS);
+        topics.add(TOPIC_STRINGS);
+
+        kafkaAdminClient.deleteTopics(topics);
     }
 
     @Override
@@ -329,7 +325,7 @@ public class KafkaProducerFullTest extends BaseEmbeddedKafkaTest {
         CountDownLatch messagesLatch = new CountDownLatch(1);
         propagatedHeadersTemplate.sendBodyAndHeaders("Some test message", camelHeaders);
 
-        List<ConsumerRecord<String, String>> records = pollForRecords(stringsConsumerConn, TOPIC_PROPAGATED_HEADERS, messagesLatch);
+        List<ConsumerRecord<String, String>> records = pollForRecords(createStringKafkaConsumer("propagatedHeaderConsumer"), TOPIC_PROPAGATED_HEADERS, messagesLatch);
         boolean allMessagesReceived = messagesLatch.await(10_000, TimeUnit.MILLISECONDS);
 
         assertTrue("Not all messages were published to the kafka topics. Not received: " + messagesLatch.getCount(), allMessagesReceived);
@@ -365,6 +361,36 @@ public class KafkaProducerFullTest extends BaseEmbeddedKafkaTest {
         return foundHeader.value();
     }
 
+    private static KafkaConsumer<String, String> createStringKafkaConsumer(final String groupId) {
+        Properties stringsProps = new Properties();
+
+        stringsProps.put(org.apache.kafka.clients.consumer.ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, getBootstrapServers());
+        stringsProps.put(org.apache.kafka.clients.consumer.ConsumerConfig.GROUP_ID_CONFIG, groupId);
+        stringsProps.put(org.apache.kafka.clients.consumer.ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true");
+        stringsProps.put(org.apache.kafka.clients.consumer.ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "1000");
+        stringsProps.put(org.apache.kafka.clients.consumer.ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "30000");
+        stringsProps.put(org.apache.kafka.clients.consumer.ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
+        stringsProps.put(org.apache.kafka.clients.consumer.ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
+        stringsProps.put(org.apache.kafka.clients.consumer.ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
+
+        return new KafkaConsumer<>(stringsProps);
+    }
+
+    private static KafkaConsumer<byte[], byte[]> createByteKafkaConsumer(final String groupId) {
+        Properties stringsProps = new Properties();
+
+        stringsProps.put(org.apache.kafka.clients.consumer.ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, getBootstrapServers());
+        stringsProps.put(org.apache.kafka.clients.consumer.ConsumerConfig.GROUP_ID_CONFIG, groupId);
+        stringsProps.put(org.apache.kafka.clients.consumer.ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true");
+        stringsProps.put(org.apache.kafka.clients.consumer.ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "1000");
+        stringsProps.put(org.apache.kafka.clients.consumer.ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "30000");
+        stringsProps.put(org.apache.kafka.clients.consumer.ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArrayDeserializer");
+        stringsProps.put(org.apache.kafka.clients.consumer.ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArrayDeserializer");
+        stringsProps.put(org.apache.kafka.clients.consumer.ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
+
+        return new KafkaConsumer<>(stringsProps);
+    }
+
     private List<ConsumerRecord<String, String>> pollForRecords(KafkaConsumer<String, String> consumerConn, String topic, CountDownLatch messagesLatch) {
 
         List<ConsumerRecord<String, String>> consumedRecords = new ArrayList<>();
