This is an automated email from the ASF dual-hosted git repository.

davsclaus pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/camel.git


The following commit(s) were added to refs/heads/main by this push:
     new 58966a9af30 CAMEL-19358 - changes to address flakiness of 
KafkaConsumerFullIT test (#14423)
58966a9af30 is described below

commit 58966a9af30239549c702c880ceaa86f5547d0d0
Author: Jang-Vijay Singh <jvs...@gmail.com>
AuthorDate: Mon Jun 10 05:56:49 2024 +0100

    CAMEL-19358 - changes to address flakiness of KafkaConsumerFullIT test 
(#14423)
    
    changes to address flakiness of KafkaConsumerFullIT test:
    - unique topic & route names
    - ensure topic is actually deleted in @AfterEach
---
 .../kafka/integration/KafkaConsumerFullIT.java     | 31 +++++++++++++++-------
 1 file changed, 21 insertions(+), 10 deletions(-)

diff --git 
a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/integration/KafkaConsumerFullIT.java
 
b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/integration/KafkaConsumerFullIT.java
index 318ecc59e22..0957733834c 100644
--- 
a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/integration/KafkaConsumerFullIT.java
+++ 
b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/integration/KafkaConsumerFullIT.java
@@ -19,6 +19,7 @@ package org.apache.camel.component.kafka.integration;
 import java.util.Collections;
 import java.util.Map;
 import java.util.Properties;
+import java.util.concurrent.TimeUnit;
 import java.util.stream.StreamSupport;
 
 import org.apache.camel.BindToRegistry;
@@ -33,7 +34,9 @@ import 
org.apache.camel.component.kafka.integration.common.KafkaTestUtil;
 import org.apache.camel.component.kafka.serde.DefaultKafkaHeaderDeserializer;
 import org.apache.camel.component.mock.MockEndpoint;
 import org.apache.camel.test.infra.core.annotations.RouteFixture;
+import org.apache.kafka.clients.admin.DeleteTopicsResult;
 import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.common.Uuid;
 import org.apache.kafka.common.header.internals.RecordHeader;
 import org.junit.jupiter.api.AfterEach;
 import org.junit.jupiter.api.BeforeEach;
@@ -54,7 +57,8 @@ import static org.junit.jupiter.api.Assertions.assertTrue;
 @TestMethodOrder(MethodOrderer.OrderAnnotation.class)
 @TestInstance(TestInstance.Lifecycle.PER_CLASS)
 public class KafkaConsumerFullIT extends BaseKafkaTestSupport {
-    public static final String TOPIC = "test-full-KafkaConsumerFullIT"; 
//CAMEL-20722: try a more unique name to avoid clash
+    public static final String TOPIC = "test-full-" + Uuid.randomUuid(); 
//CAMEL-20722: a more unique name to avoid clash
+    public static final String ROUTE = "full-it-" + Uuid.randomUuid();   
//CAMEL-20722: a more unique name to avoid clash
 
     private static final Logger LOG = 
LoggerFactory.getLogger(KafkaConsumerFullIT.class);
 
@@ -80,7 +84,14 @@ public class KafkaConsumerFullIT extends 
BaseKafkaTestSupport {
             producer.close();
         }
         // clean all test topics
-        kafkaAdminClient.deleteTopics(Collections.singletonList(TOPIC)).all();
+        DeleteTopicsResult r = 
kafkaAdminClient.deleteTopics(Collections.singletonList(TOPIC));
+
+        // wait necessary to ensure the topic is actually deleted, and avoid 
chance of clash in unrelate tests
+        Awaitility.await()
+                .timeout(60, TimeUnit.SECONDS)
+                .pollDelay(3, TimeUnit.SECONDS)
+                .untilAsserted(() -> assertTrue(r.all().isDone()));
+
     }
 
     @RouteFixture
@@ -94,7 +105,7 @@ public class KafkaConsumerFullIT extends 
BaseKafkaTestSupport {
             public void configure() {
                 from(FROM_URI)
                         .process(exchange -> LOG.trace("Captured on the 
processor: {}", exchange.getMessage().getBody()))
-                        .routeId("full-it").to(KafkaTestUtil.MOCK_RESULT);
+                        .routeId(ROUTE).to(KafkaTestUtil.MOCK_RESULT);
             }
         };
     }
@@ -174,12 +185,12 @@ public class KafkaConsumerFullIT extends 
BaseKafkaTestSupport {
 
         // Restart endpoint
         CamelContext context = contextExtension.getContext();
-        context.getRouteController().stopRoute("full-it");
+        context.getRouteController().stopRoute(ROUTE);
 
         KafkaEndpoint kafkaEndpoint = (KafkaEndpoint) 
context.getEndpoint(FROM_URI);
         kafkaEndpoint.getConfiguration().setSeekTo(SeekPolicy.BEGINNING);
 
-        context.getRouteController().startRoute("full-it");
+        context.getRouteController().startRoute(ROUTE);
 
         // As wee set seek to beginning we should re-consume all messages
         to.assertIsSatisfied(3000);
@@ -205,12 +216,12 @@ public class KafkaConsumerFullIT extends 
BaseKafkaTestSupport {
 
         // Restart endpoint
         CamelContext context = contextExtension.getContext();
-        context.getRouteController().stopRoute("full-it");
+        context.getRouteController().stopRoute(ROUTE);
 
         KafkaEndpoint kafkaEndpoint = (KafkaEndpoint) 
context.getEndpoint(FROM_URI);
         kafkaEndpoint.getConfiguration().setSeekTo(SeekPolicy.END);
 
-        context.getRouteController().startRoute("full-it");
+        context.getRouteController().startRoute(ROUTE);
 
         to.assertIsSatisfied(3000);
     }
@@ -246,17 +257,17 @@ public class KafkaConsumerFullIT extends 
BaseKafkaTestSupport {
 
         // suspend route
         CamelContext context = contextExtension.getContext();
-        context.getRouteController().suspendRoute("full-it");
+        context.getRouteController().suspendRoute(ROUTE);
 
         // wait until the kafka client is really paused
-        KafkaConsumer kc = (KafkaConsumer) 
context.getRoute("full-it").getConsumer();
+        KafkaConsumer kc = (KafkaConsumer) 
context.getRoute(ROUTE).getConsumer();
         Awaitility.await().until(() -> {
             boolean paused = kc.isKafkaPaused();
             LOG.info("Waiting for kafka client to be paused: {}", paused);
             return paused;
         });
 
-        context.getRouteController().resumeRoute("full-it");
+        context.getRouteController().resumeRoute(ROUTE);
 
         to.reset();
 

Reply via email to