This is an automated email from the ASF dual-hosted git repository. davsclaus pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/camel.git
The following commit(s) were added to refs/heads/main by this push: new 58966a9af30 CAMEL-19358 - changes to address flakiness of KafkaConsumerFullIT test (#14423) 58966a9af30 is described below commit 58966a9af30239549c702c880ceaa86f5547d0d0 Author: Jang-Vijay Singh <jvs...@gmail.com> AuthorDate: Mon Jun 10 05:56:49 2024 +0100 CAMEL-19358 - changes to address flakiness of KafkaConsumerFullIT test (#14423) changes to address flakiness of KafkaConsumerFullIT test: - unique topic & route names - ensure topic is actually deleted in @AfterEach --- .../kafka/integration/KafkaConsumerFullIT.java | 31 +++++++++++++++------- 1 file changed, 21 insertions(+), 10 deletions(-) diff --git a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/integration/KafkaConsumerFullIT.java b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/integration/KafkaConsumerFullIT.java index 318ecc59e22..0957733834c 100644 --- a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/integration/KafkaConsumerFullIT.java +++ b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/integration/KafkaConsumerFullIT.java @@ -19,6 +19,7 @@ package org.apache.camel.component.kafka.integration; import java.util.Collections; import java.util.Map; import java.util.Properties; +import java.util.concurrent.TimeUnit; import java.util.stream.StreamSupport; import org.apache.camel.BindToRegistry; @@ -33,7 +34,9 @@ import org.apache.camel.component.kafka.integration.common.KafkaTestUtil; import org.apache.camel.component.kafka.serde.DefaultKafkaHeaderDeserializer; import org.apache.camel.component.mock.MockEndpoint; import org.apache.camel.test.infra.core.annotations.RouteFixture; +import org.apache.kafka.clients.admin.DeleteTopicsResult; import org.apache.kafka.clients.producer.ProducerRecord; +import org.apache.kafka.common.Uuid; import org.apache.kafka.common.header.internals.RecordHeader; import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.BeforeEach; @@ -54,7 +57,8 @@ import static org.junit.jupiter.api.Assertions.assertTrue; @TestMethodOrder(MethodOrderer.OrderAnnotation.class) @TestInstance(TestInstance.Lifecycle.PER_CLASS) public class KafkaConsumerFullIT extends BaseKafkaTestSupport { - public static final String TOPIC = "test-full-KafkaConsumerFullIT"; //CAMEL-20722: try a more unique name to avoid clash + public static final String TOPIC = "test-full-" + Uuid.randomUuid(); //CAMEL-20722: a more unique name to avoid clash + public static final String ROUTE = "full-it-" + Uuid.randomUuid(); //CAMEL-20722: a more unique name to avoid clash private static final Logger LOG = LoggerFactory.getLogger(KafkaConsumerFullIT.class); @@ -80,7 +84,14 @@ public class KafkaConsumerFullIT extends BaseKafkaTestSupport { producer.close(); } // clean all test topics - kafkaAdminClient.deleteTopics(Collections.singletonList(TOPIC)).all(); + DeleteTopicsResult r = kafkaAdminClient.deleteTopics(Collections.singletonList(TOPIC)); + + // wait necessary to ensure the topic is actually deleted, and avoid chance of clash in unrelate tests + Awaitility.await() + .timeout(60, TimeUnit.SECONDS) + .pollDelay(3, TimeUnit.SECONDS) + .untilAsserted(() -> assertTrue(r.all().isDone())); + } @RouteFixture @@ -94,7 +105,7 @@ public class KafkaConsumerFullIT extends BaseKafkaTestSupport { public void configure() { from(FROM_URI) .process(exchange -> LOG.trace("Captured on the processor: {}", exchange.getMessage().getBody())) - .routeId("full-it").to(KafkaTestUtil.MOCK_RESULT); + .routeId(ROUTE).to(KafkaTestUtil.MOCK_RESULT); } }; } @@ -174,12 +185,12 @@ public class KafkaConsumerFullIT extends BaseKafkaTestSupport { // Restart endpoint CamelContext context = contextExtension.getContext(); - context.getRouteController().stopRoute("full-it"); + context.getRouteController().stopRoute(ROUTE); KafkaEndpoint kafkaEndpoint = (KafkaEndpoint) context.getEndpoint(FROM_URI); kafkaEndpoint.getConfiguration().setSeekTo(SeekPolicy.BEGINNING); - context.getRouteController().startRoute("full-it"); + context.getRouteController().startRoute(ROUTE); // As wee set seek to beginning we should re-consume all messages to.assertIsSatisfied(3000); @@ -205,12 +216,12 @@ public class KafkaConsumerFullIT extends BaseKafkaTestSupport { // Restart endpoint CamelContext context = contextExtension.getContext(); - context.getRouteController().stopRoute("full-it"); + context.getRouteController().stopRoute(ROUTE); KafkaEndpoint kafkaEndpoint = (KafkaEndpoint) context.getEndpoint(FROM_URI); kafkaEndpoint.getConfiguration().setSeekTo(SeekPolicy.END); - context.getRouteController().startRoute("full-it"); + context.getRouteController().startRoute(ROUTE); to.assertIsSatisfied(3000); } @@ -246,17 +257,17 @@ public class KafkaConsumerFullIT extends BaseKafkaTestSupport { // suspend route CamelContext context = contextExtension.getContext(); - context.getRouteController().suspendRoute("full-it"); + context.getRouteController().suspendRoute(ROUTE); // wait until the kafka client is really paused - KafkaConsumer kc = (KafkaConsumer) context.getRoute("full-it").getConsumer(); + KafkaConsumer kc = (KafkaConsumer) context.getRoute(ROUTE).getConsumer(); Awaitility.await().until(() -> { boolean paused = kc.isKafkaPaused(); LOG.info("Waiting for kafka client to be paused: {}", paused); return paused; }); - context.getRouteController().resumeRoute("full-it"); + context.getRouteController().resumeRoute(ROUTE); to.reset();