This is an automated email from the ASF dual-hosted git repository. orpiske pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/camel.git
The following commit(s) were added to refs/heads/main by this push: new 748a184 camel-kafka: test fixes for remote mode and Kafka 3.0 748a184 is described below commit 748a184977fe33e9b4bb7e74ddd0c7a5986d77ed Author: Otavio Rodolfo Piske <opi...@redhat.com> AuthorDate: Thu Sep 23 13:42:52 2021 +0200 camel-kafka: test fixes for remote mode and Kafka 3.0 --- .../kafka/integration/KafkaConsumerFullIT.java | 2 +- .../kafka/integration/KafkaConsumerIdempotentIT.java | 7 +++---- .../integration/KafkaConsumerIdempotentTestSupport.java | 6 +++--- .../KafkaConsumerIdempotentWithCustomSerializerIT.java | 8 ++++---- .../KafkaConsumerIdempotentWithProcessorIT.java | 7 ++++--- .../kafka/KafkaIdempotentRepositoryPersistenceIT.java | 17 ++++++++++++++--- 6 files changed, 29 insertions(+), 18 deletions(-) diff --git a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/integration/KafkaConsumerFullIT.java b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/integration/KafkaConsumerFullIT.java index daf1454..b7dd102 100644 --- a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/integration/KafkaConsumerFullIT.java +++ b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/integration/KafkaConsumerFullIT.java @@ -80,7 +80,7 @@ public class KafkaConsumerFullIT extends BaseEmbeddedKafkaTestSupport { producer.close(); } // clean all test topics - kafkaAdminClient.deleteTopics(Collections.singletonList(TOPIC)); + kafkaAdminClient.deleteTopics(Collections.singletonList(TOPIC)).all(); } @Override diff --git a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/integration/KafkaConsumerIdempotentIT.java b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/integration/KafkaConsumerIdempotentIT.java index 058b885..3f222d5 100644 --- a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/integration/KafkaConsumerIdempotentIT.java +++ b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/integration/KafkaConsumerIdempotentIT.java @@ -16,7 +16,7 @@ */ package org.apache.camel.component.kafka.integration; -import java.util.Collections; +import java.util.Arrays; import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeoutException; @@ -58,14 +58,13 @@ public class KafkaConsumerIdempotentIT extends KafkaConsumerIdempotentTestSuppor @BeforeEach public void before() throws ExecutionException, InterruptedException, TimeoutException { + kafkaAdminClient.deleteTopics(Arrays.asList(TOPIC, "TEST_IDEMPOTENT")).all(); doSend(size, TOPIC); } @AfterEach public void after() { - - // clean all test topics - kafkaAdminClient.deleteTopics(Collections.singletonList(TOPIC)); + kafkaAdminClient.deleteTopics(Arrays.asList(TOPIC, "TEST_IDEMPOTENT")).all(); } @Override diff --git a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/integration/KafkaConsumerIdempotentTestSupport.java b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/integration/KafkaConsumerIdempotentTestSupport.java index e3f62d6..fed1fee 100644 --- a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/integration/KafkaConsumerIdempotentTestSupport.java +++ b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/integration/KafkaConsumerIdempotentTestSupport.java @@ -21,8 +21,6 @@ import java.math.BigInteger; import java.util.List; import java.util.Map; import java.util.Properties; -import java.util.concurrent.ExecutionException; -import java.util.concurrent.TimeoutException; import org.apache.camel.Exchange; import org.apache.camel.component.mock.MockEndpoint; @@ -34,7 +32,7 @@ import static org.junit.jupiter.api.Assertions.assertTrue; public abstract class KafkaConsumerIdempotentTestSupport extends BaseEmbeddedKafkaTestSupport { - protected void doSend(int size, String topic) throws ExecutionException, InterruptedException, TimeoutException { + protected void doSend(int size, String topic) { Properties props = getDefaultProperties(); org.apache.kafka.clients.producer.KafkaProducer<String, String> producer = new org.apache.kafka.clients.producer.KafkaProducer<>(props); @@ -59,6 +57,8 @@ public abstract class KafkaConsumerIdempotentTestSupport extends BaseEmbeddedKaf List<Exchange> exchangeList = mockEndpoint.getReceivedExchanges(); + Thread.sleep(5000); + mockEndpoint.assertIsSatisfied(10000); assertEquals(size, exchangeList.size()); diff --git a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/integration/KafkaConsumerIdempotentWithCustomSerializerIT.java b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/integration/KafkaConsumerIdempotentWithCustomSerializerIT.java index 14a5d92..d11ec71 100644 --- a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/integration/KafkaConsumerIdempotentWithCustomSerializerIT.java +++ b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/integration/KafkaConsumerIdempotentWithCustomSerializerIT.java @@ -16,7 +16,7 @@ */ package org.apache.camel.component.kafka.integration; -import java.util.Collections; +import java.util.Arrays; import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeoutException; @@ -54,14 +54,14 @@ public class KafkaConsumerIdempotentWithCustomSerializerIT extends KafkaConsumer @BeforeEach public void before() throws ExecutionException, InterruptedException, TimeoutException { + kafkaIdempotentRepository.clear(); + kafkaAdminClient.deleteTopics(Arrays.asList(TOPIC, "TEST_IDEMPOTENT")).all(); doSend(size, TOPIC); } @AfterEach public void after() { - - // clean all test topics - kafkaAdminClient.deleteTopics(Collections.singletonList(TOPIC)); + kafkaAdminClient.deleteTopics(Arrays.asList(TOPIC, "TEST_IDEMPOTENT")).all(); } @Override diff --git a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/integration/KafkaConsumerIdempotentWithProcessorIT.java b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/integration/KafkaConsumerIdempotentWithProcessorIT.java index c8c2e14..d4cbf2b 100644 --- a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/integration/KafkaConsumerIdempotentWithProcessorIT.java +++ b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/integration/KafkaConsumerIdempotentWithProcessorIT.java @@ -17,7 +17,7 @@ package org.apache.camel.component.kafka.integration; import java.math.BigInteger; -import java.util.Collections; +import java.util.Arrays; import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeoutException; @@ -53,14 +53,15 @@ public class KafkaConsumerIdempotentWithProcessorIT extends KafkaConsumerIdempot @BeforeEach public void before() throws ExecutionException, InterruptedException, TimeoutException { + kafkaIdempotentRepository.clear(); + kafkaAdminClient.deleteTopics(Arrays.asList(TOPIC, "TEST_IDEMPOTENT")).all(); doSend(size, TOPIC); } @AfterEach public void after() { - // clean all test topics - kafkaAdminClient.deleteTopics(Collections.singletonList(TOPIC)); + kafkaAdminClient.deleteTopics(Arrays.asList(TOPIC, "TEST_IDEMPOTENT")).all(); } @Override diff --git a/components/camel-kafka/src/test/java/org/apache/camel/processor/idempotent/kafka/KafkaIdempotentRepositoryPersistenceIT.java b/components/camel-kafka/src/test/java/org/apache/camel/processor/idempotent/kafka/KafkaIdempotentRepositoryPersistenceIT.java index 15e5f06..433dcd6 100644 --- a/components/camel-kafka/src/test/java/org/apache/camel/processor/idempotent/kafka/KafkaIdempotentRepositoryPersistenceIT.java +++ b/components/camel-kafka/src/test/java/org/apache/camel/processor/idempotent/kafka/KafkaIdempotentRepositoryPersistenceIT.java @@ -16,16 +16,20 @@ */ package org.apache.camel.processor.idempotent.kafka; +import java.util.Arrays; + import org.apache.camel.BindToRegistry; import org.apache.camel.EndpointInject; import org.apache.camel.RoutesBuilder; import org.apache.camel.builder.RouteBuilder; import org.apache.camel.component.kafka.integration.BaseEmbeddedKafkaTestSupport; import org.apache.camel.component.mock.MockEndpoint; +import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.MethodOrderer; import org.junit.jupiter.api.Order; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.TestMethodOrder; +import org.junit.jupiter.api.condition.DisabledIfSystemProperty; import static org.junit.jupiter.api.Assertions.assertEquals; @@ -41,7 +45,7 @@ public class KafkaIdempotentRepositoryPersistenceIT extends BaseEmbeddedKafkaTes // Every instance of the repository must use a different topic to guarantee isolation between tests @BindToRegistry("kafkaIdempotentRepository") - private KafkaIdempotentRepository kafkaIdempotentRepository + private final KafkaIdempotentRepository kafkaIdempotentRepository = new KafkaIdempotentRepository("TEST_PERSISTENCE", getBootstrapServers()); @EndpointInject("mock:out") @@ -50,6 +54,11 @@ public class KafkaIdempotentRepositoryPersistenceIT extends BaseEmbeddedKafkaTes @EndpointInject("mock:before") private MockEndpoint mockBefore; + @BeforeEach + void clearTopics() { + kafkaAdminClient.deleteTopics(Arrays.asList("TEST_PERSISTENCE")).all(); + } + @Override protected RoutesBuilder createRouteBuilder() throws Exception { return new RouteBuilder() { @@ -63,7 +72,7 @@ public class KafkaIdempotentRepositoryPersistenceIT extends BaseEmbeddedKafkaTes @Order(1) @Test - public void testFirstPassFiltersAsExpected() throws InterruptedException { + public void testFirstPassFiltersAsExpected() { for (int i = 0; i < 10; i++) { template.sendBodyAndHeader("direct:in", "Test message", "id", i % 5); } @@ -80,7 +89,9 @@ public class KafkaIdempotentRepositoryPersistenceIT extends BaseEmbeddedKafkaTes @Order(2) @Test - public void testSecondPassFiltersEverything() throws InterruptedException { + @DisabledIfSystemProperty(named = "kafka.instance.type", matches = "remote", + disabledReason = "Remote may not allow deleting the topic, may contain data, etc") + public void testSecondPassFiltersEverything() { for (int i = 0; i < 10; i++) { template.sendBodyAndHeader("direct:in", "Test message", "id", i % 5); }