This is an automated email from the ASF dual-hosted git repository. oalsafi pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/camel.git
The following commit(s) were added to refs/heads/master by this push: new 131a2e6 CAMEL-14957: Fix Kafka testcontainers integration tests (#3807) 131a2e6 is described below commit 131a2e6a5e42585109db44197131b1b93d95168b Author: Omar Al-Safi <omars...@gmail.com> AuthorDate: Wed May 6 19:01:06 2020 +0200 CAMEL-14957: Fix Kafka testcontainers integration tests (#3807) --- .../component/kafka/BaseEmbeddedKafkaTest.java | 24 +++++++-- .../kafka/KafkaConsumerBatchSizeTest.java | 3 ++ .../component/kafka/KafkaConsumerFullTest.java | 3 ++ .../kafka/KafkaConsumerLastRecordHeaderTest.java | 3 ++ .../kafka/KafkaConsumerManualCommitTest.java | 3 ++ .../kafka/KafkaConsumerRebalanceTest.java | 8 +++ .../kafka/KafkaConsumerTopicIsPatternTest.java | 3 ++ .../component/kafka/KafkaProducerFullTest.java | 62 +++++++++++++++------- 8 files changed, 86 insertions(+), 23 deletions(-) diff --git a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/BaseEmbeddedKafkaTest.java b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/BaseEmbeddedKafkaTest.java index 4a2c772..3c1f2ef 100644 --- a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/BaseEmbeddedKafkaTest.java +++ b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/BaseEmbeddedKafkaTest.java @@ -20,23 +20,30 @@ import java.util.Properties; import org.apache.camel.CamelContext; import org.apache.camel.test.junit4.CamelTestSupport; +import org.apache.kafka.clients.admin.AdminClient; +import org.apache.kafka.clients.admin.KafkaAdminClient; import org.apache.kafka.clients.producer.ProducerConfig; import org.junit.BeforeClass; -import org.junit.ClassRule; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.testcontainers.containers.KafkaContainer; import org.testcontainers.containers.wait.strategy.Wait; -public class BaseEmbeddedKafkaTest extends CamelTestSupport { +public abstract class BaseEmbeddedKafkaTest extends CamelTestSupport { + protected static AdminClient kafkaAdminClient; + private static final String CONFLUENT_PLATFORM_VERSION = "5.5.0"; - @ClassRule - public static KafkaContainer kafkaBroker = new KafkaContainer(CONFLUENT_PLATFORM_VERSION) + private static final Logger LOG = LoggerFactory.getLogger(BaseEmbeddedKafkaTest.class); + + protected static KafkaContainer kafkaBroker = new KafkaContainer(CONFLUENT_PLATFORM_VERSION) .withEmbeddedZookeeper() .waitingFor(Wait.forListeningPort()); - private static final Logger LOG = LoggerFactory.getLogger(BaseEmbeddedKafkaTest.class); + static { + kafkaBroker.start(); + kafkaAdminClient = createAdminClient(); + } @BeforeClass public static void beforeClass() { @@ -71,4 +78,11 @@ public class BaseEmbeddedKafkaTest extends CamelTestSupport { protected static String getBootstrapServers() { return kafkaBroker.getBootstrapServers(); } + + private static AdminClient createAdminClient() { + final Properties properties = new Properties(); + properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaBroker.getBootstrapServers()); + + return KafkaAdminClient.create(properties); + } } diff --git a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaConsumerBatchSizeTest.java b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaConsumerBatchSizeTest.java index 06d5de0..4588d8f 100644 --- a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaConsumerBatchSizeTest.java +++ b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaConsumerBatchSizeTest.java @@ -16,6 +16,7 @@ */ package org.apache.camel.component.kafka; +import java.util.Collections; import java.util.Properties; import org.apache.camel.Endpoint; @@ -50,6 +51,8 @@ public class KafkaConsumerBatchSizeTest extends BaseEmbeddedKafkaTest { if (producer != null) { producer.close(); } + // clean all test topics + kafkaAdminClient.deleteTopics(Collections.singletonList(TOPIC)); } @Override diff --git a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaConsumerFullTest.java b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaConsumerFullTest.java index d6a3150..3673b3f 100644 --- a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaConsumerFullTest.java +++ b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaConsumerFullTest.java @@ -17,6 +17,7 @@ package org.apache.camel.component.kafka; import java.io.IOException; +import java.util.Collections; import java.util.Map; import java.util.Properties; import java.util.stream.StreamSupport; @@ -62,6 +63,8 @@ public class KafkaConsumerFullTest extends BaseEmbeddedKafkaTest { if (producer != null) { producer.close(); } + // clean all test topics + kafkaAdminClient.deleteTopics(Collections.singletonList(TOPIC)); } @Override diff --git a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaConsumerLastRecordHeaderTest.java b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaConsumerLastRecordHeaderTest.java index c29de33..1cb8673 100644 --- a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaConsumerLastRecordHeaderTest.java +++ b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaConsumerLastRecordHeaderTest.java @@ -16,6 +16,7 @@ */ package org.apache.camel.component.kafka; +import java.util.Collections; import java.util.List; import java.util.Properties; @@ -47,6 +48,8 @@ public class KafkaConsumerLastRecordHeaderTest extends BaseEmbeddedKafkaTest { if (producer != null) { producer.close(); } + // clean all test topics + kafkaAdminClient.deleteTopics(Collections.singletonList(TOPIC)); } /** diff --git a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaConsumerManualCommitTest.java b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaConsumerManualCommitTest.java index 476e346..0e6524d 100644 --- a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaConsumerManualCommitTest.java +++ b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaConsumerManualCommitTest.java @@ -17,6 +17,7 @@ package org.apache.camel.component.kafka; import java.io.IOException; +import java.util.Collections; import java.util.Properties; import java.util.stream.StreamSupport; @@ -55,6 +56,8 @@ public class KafkaConsumerManualCommitTest extends BaseEmbeddedKafkaTest { if (producer != null) { producer.close(); } + // clean all test topics + kafkaAdminClient.deleteTopics(Collections.singletonList(TOPIC)); } @Override diff --git a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaConsumerRebalanceTest.java b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaConsumerRebalanceTest.java index 856087a..ca19ce0 100644 --- a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaConsumerRebalanceTest.java +++ b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaConsumerRebalanceTest.java @@ -16,6 +16,7 @@ */ package org.apache.camel.component.kafka; +import java.util.Collections; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; @@ -24,6 +25,7 @@ import org.apache.camel.EndpointInject; import org.apache.camel.builder.RouteBuilder; import org.apache.camel.component.mock.MockEndpoint; import org.apache.camel.spi.StateRepository; +import org.junit.After; import org.junit.Test; public class KafkaConsumerRebalanceTest extends BaseEmbeddedKafkaTest { @@ -48,6 +50,12 @@ public class KafkaConsumerRebalanceTest extends BaseEmbeddedKafkaTest { assertTrue("StateRepository.getState should have been called twice for topic " + TOPIC + ". Remaining count : " + messagesLatch.getCount(), offsetGetStateCalled); } + @After + public void after() { + // clean all test topics + kafkaAdminClient.deleteTopics(Collections.singletonList(TOPIC)); + } + @Override protected RouteBuilder createRouteBuilder() throws Exception { return new RouteBuilder() { diff --git a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaConsumerTopicIsPatternTest.java b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaConsumerTopicIsPatternTest.java index 9a54c8f..65325aa 100644 --- a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaConsumerTopicIsPatternTest.java +++ b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaConsumerTopicIsPatternTest.java @@ -16,6 +16,7 @@ */ package org.apache.camel.component.kafka; +import java.util.Collections; import java.util.Properties; import java.util.stream.StreamSupport; @@ -54,6 +55,8 @@ public class KafkaConsumerTopicIsPatternTest extends BaseEmbeddedKafkaTest { if (producer != null) { producer.close(); } + // clean all test topics + kafkaAdminClient.deleteTopics(Collections.singletonList(TOPIC)); } @Override diff --git a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaProducerFullTest.java b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaProducerFullTest.java index 355c433..19d8b8d 100644 --- a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaProducerFullTest.java +++ b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaProducerFullTest.java @@ -104,24 +104,20 @@ public class KafkaProducerFullTest extends BaseEmbeddedKafkaTest { @BeforeClass public static void before() { - Properties stringsProps = new Properties(); + stringsConsumerConn = createStringKafkaConsumer("DemoConsumer"); + bytesConsumerConn = createByteKafkaConsumer(GROUP_BYTES); + } - stringsProps.put(org.apache.kafka.clients.consumer.ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, getBootstrapServers()); - stringsProps.put(org.apache.kafka.clients.consumer.ConsumerConfig.GROUP_ID_CONFIG, "DemoConsumer"); - stringsProps.put(org.apache.kafka.clients.consumer.ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true"); - stringsProps.put(org.apache.kafka.clients.consumer.ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "1000"); - stringsProps.put(org.apache.kafka.clients.consumer.ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "30000"); - stringsProps.put(org.apache.kafka.clients.consumer.ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer"); - stringsProps.put(org.apache.kafka.clients.consumer.ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer"); - stringsProps.put(org.apache.kafka.clients.consumer.ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); - stringsConsumerConn = new KafkaConsumer<>(stringsProps); - - Properties bytesProps = new Properties(); - bytesProps.putAll(stringsProps); - bytesProps.put("group.id", GROUP_BYTES); - bytesProps.put(org.apache.kafka.clients.consumer.ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArrayDeserializer"); - bytesProps.put(org.apache.kafka.clients.consumer.ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArrayDeserializer"); - bytesConsumerConn = new KafkaConsumer<>(bytesProps); + @AfterClass + public static void after() { + // clean all test topics + final List<String> topics = new ArrayList<>(); + topics.add(TOPIC_BYTES); + topics.add(TOPIC_INTERCEPTED); + topics.add(TOPIC_PROPAGATED_HEADERS); + topics.add(TOPIC_STRINGS); + + kafkaAdminClient.deleteTopics(topics); } @Override @@ -329,7 +325,7 @@ public class KafkaProducerFullTest extends BaseEmbeddedKafkaTest { CountDownLatch messagesLatch = new CountDownLatch(1); propagatedHeadersTemplate.sendBodyAndHeaders("Some test message", camelHeaders); - List<ConsumerRecord<String, String>> records = pollForRecords(stringsConsumerConn, TOPIC_PROPAGATED_HEADERS, messagesLatch); + List<ConsumerRecord<String, String>> records = pollForRecords(createStringKafkaConsumer("propagatedHeaderConsumer"), TOPIC_PROPAGATED_HEADERS, messagesLatch); boolean allMessagesReceived = messagesLatch.await(10_000, TimeUnit.MILLISECONDS); assertTrue("Not all messages were published to the kafka topics. Not received: " + messagesLatch.getCount(), allMessagesReceived); @@ -365,6 +361,36 @@ public class KafkaProducerFullTest extends BaseEmbeddedKafkaTest { return foundHeader.value(); } + private static KafkaConsumer<String, String> createStringKafkaConsumer(final String groupId) { + Properties stringsProps = new Properties(); + + stringsProps.put(org.apache.kafka.clients.consumer.ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, getBootstrapServers()); + stringsProps.put(org.apache.kafka.clients.consumer.ConsumerConfig.GROUP_ID_CONFIG, groupId); + stringsProps.put(org.apache.kafka.clients.consumer.ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true"); + stringsProps.put(org.apache.kafka.clients.consumer.ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "1000"); + stringsProps.put(org.apache.kafka.clients.consumer.ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "30000"); + stringsProps.put(org.apache.kafka.clients.consumer.ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer"); + stringsProps.put(org.apache.kafka.clients.consumer.ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer"); + stringsProps.put(org.apache.kafka.clients.consumer.ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); + + return new KafkaConsumer<>(stringsProps); + } + + private static KafkaConsumer<byte[], byte[]> createByteKafkaConsumer(final String groupId) { + Properties stringsProps = new Properties(); + + stringsProps.put(org.apache.kafka.clients.consumer.ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, getBootstrapServers()); + stringsProps.put(org.apache.kafka.clients.consumer.ConsumerConfig.GROUP_ID_CONFIG, groupId); + stringsProps.put(org.apache.kafka.clients.consumer.ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true"); + stringsProps.put(org.apache.kafka.clients.consumer.ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "1000"); + stringsProps.put(org.apache.kafka.clients.consumer.ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "30000"); + stringsProps.put(org.apache.kafka.clients.consumer.ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArrayDeserializer"); + stringsProps.put(org.apache.kafka.clients.consumer.ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArrayDeserializer"); + stringsProps.put(org.apache.kafka.clients.consumer.ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); + + return new KafkaConsumer<>(stringsProps); + } + private List<ConsumerRecord<String, String>> pollForRecords(KafkaConsumer<String, String> consumerConn, String topic, CountDownLatch messagesLatch) { List<ConsumerRecord<String, String>> consumedRecords = new ArrayList<>();