This is an automated email from the ASF dual-hosted git repository.

orpiske pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/camel.git


The following commit(s) were added to refs/heads/main by this push:
     new 748a184  camel-kafka: test fixes for remote mode and Kafka 3.0
748a184 is described below

commit 748a184977fe33e9b4bb7e74ddd0c7a5986d77ed
Author: Otavio Rodolfo Piske <opi...@redhat.com>
AuthorDate: Thu Sep 23 13:42:52 2021 +0200

    camel-kafka: test fixes for remote mode and Kafka 3.0
---
 .../kafka/integration/KafkaConsumerFullIT.java          |  2 +-
 .../kafka/integration/KafkaConsumerIdempotentIT.java    |  7 +++----
 .../integration/KafkaConsumerIdempotentTestSupport.java |  6 +++---
 .../KafkaConsumerIdempotentWithCustomSerializerIT.java  |  8 ++++----
 .../KafkaConsumerIdempotentWithProcessorIT.java         |  7 ++++---
 .../kafka/KafkaIdempotentRepositoryPersistenceIT.java   | 17 ++++++++++++++---
 6 files changed, 29 insertions(+), 18 deletions(-)

diff --git a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/integration/KafkaConsumerFullIT.java b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/integration/KafkaConsumerFullIT.java
index daf1454..b7dd102 100644
--- a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/integration/KafkaConsumerFullIT.java
+++ b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/integration/KafkaConsumerFullIT.java
@@ -80,7 +80,7 @@ public class KafkaConsumerFullIT extends BaseEmbeddedKafkaTestSupport {
             producer.close();
         }
         // clean all test topics
-        kafkaAdminClient.deleteTopics(Collections.singletonList(TOPIC));
+        kafkaAdminClient.deleteTopics(Collections.singletonList(TOPIC)).all();
     }
 
     @Override
diff --git a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/integration/KafkaConsumerIdempotentIT.java b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/integration/KafkaConsumerIdempotentIT.java
index 058b885..3f222d5 100644
--- a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/integration/KafkaConsumerIdempotentIT.java
+++ b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/integration/KafkaConsumerIdempotentIT.java
@@ -16,7 +16,7 @@
  */
 package org.apache.camel.component.kafka.integration;
 
-import java.util.Collections;
+import java.util.Arrays;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.TimeoutException;
 
@@ -58,14 +58,13 @@ public class KafkaConsumerIdempotentIT extends KafkaConsumerIdempotentTestSuppor
 
     @BeforeEach
     public void before() throws ExecutionException, InterruptedException, TimeoutException {
+        kafkaAdminClient.deleteTopics(Arrays.asList(TOPIC, "TEST_IDEMPOTENT")).all();
         doSend(size, TOPIC);
     }
 
     @AfterEach
     public void after() {
-
-        // clean all test topics
-        kafkaAdminClient.deleteTopics(Collections.singletonList(TOPIC));
+        kafkaAdminClient.deleteTopics(Arrays.asList(TOPIC, "TEST_IDEMPOTENT")).all();
     }
 
     @Override
diff --git a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/integration/KafkaConsumerIdempotentTestSupport.java b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/integration/KafkaConsumerIdempotentTestSupport.java
index e3f62d6..fed1fee 100644
--- a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/integration/KafkaConsumerIdempotentTestSupport.java
+++ b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/integration/KafkaConsumerIdempotentTestSupport.java
@@ -21,8 +21,6 @@ import java.math.BigInteger;
 import java.util.List;
 import java.util.Map;
 import java.util.Properties;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.TimeoutException;
 
 import org.apache.camel.Exchange;
 import org.apache.camel.component.mock.MockEndpoint;
@@ -34,7 +32,7 @@ import static org.junit.jupiter.api.Assertions.assertTrue;
 
 public abstract class KafkaConsumerIdempotentTestSupport extends BaseEmbeddedKafkaTestSupport {
 
-    protected void doSend(int size, String topic) throws ExecutionException, InterruptedException, TimeoutException {
+    protected void doSend(int size, String topic) {
         Properties props = getDefaultProperties();
         org.apache.kafka.clients.producer.KafkaProducer<String, String> producer
                 = new org.apache.kafka.clients.producer.KafkaProducer<>(props);
@@ -59,6 +57,8 @@ public abstract class KafkaConsumerIdempotentTestSupport extends BaseEmbeddedKaf
 
         List<Exchange> exchangeList = mockEndpoint.getReceivedExchanges();
 
+        Thread.sleep(5000);
+
         mockEndpoint.assertIsSatisfied(10000);
 
         assertEquals(size, exchangeList.size());
diff --git a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/integration/KafkaConsumerIdempotentWithCustomSerializerIT.java b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/integration/KafkaConsumerIdempotentWithCustomSerializerIT.java
index 14a5d92..d11ec71 100644
--- a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/integration/KafkaConsumerIdempotentWithCustomSerializerIT.java
+++ b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/integration/KafkaConsumerIdempotentWithCustomSerializerIT.java
@@ -16,7 +16,7 @@
  */
 package org.apache.camel.component.kafka.integration;
 
-import java.util.Collections;
+import java.util.Arrays;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.TimeoutException;
 
@@ -54,14 +54,14 @@ public class KafkaConsumerIdempotentWithCustomSerializerIT extends KafkaConsumer
 
     @BeforeEach
     public void before() throws ExecutionException, InterruptedException, TimeoutException {
+        kafkaIdempotentRepository.clear();
+        kafkaAdminClient.deleteTopics(Arrays.asList(TOPIC, "TEST_IDEMPOTENT")).all();
         doSend(size, TOPIC);
     }
 
     @AfterEach
     public void after() {
-
-        // clean all test topics
-        kafkaAdminClient.deleteTopics(Collections.singletonList(TOPIC));
+        kafkaAdminClient.deleteTopics(Arrays.asList(TOPIC, "TEST_IDEMPOTENT")).all();
     }
 
     @Override
diff --git a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/integration/KafkaConsumerIdempotentWithProcessorIT.java b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/integration/KafkaConsumerIdempotentWithProcessorIT.java
index c8c2e14..d4cbf2b 100644
--- a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/integration/KafkaConsumerIdempotentWithProcessorIT.java
+++ b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/integration/KafkaConsumerIdempotentWithProcessorIT.java
@@ -17,7 +17,7 @@
 package org.apache.camel.component.kafka.integration;
 
 import java.math.BigInteger;
-import java.util.Collections;
+import java.util.Arrays;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.TimeoutException;
 
@@ -53,14 +53,15 @@ public class KafkaConsumerIdempotentWithProcessorIT extends KafkaConsumerIdempot
 
     @BeforeEach
     public void before() throws ExecutionException, InterruptedException, TimeoutException {
+        kafkaIdempotentRepository.clear();
+        kafkaAdminClient.deleteTopics(Arrays.asList(TOPIC, "TEST_IDEMPOTENT")).all();
         doSend(size, TOPIC);
     }
 
     @AfterEach
     public void after() {
-
         // clean all test topics
-        kafkaAdminClient.deleteTopics(Collections.singletonList(TOPIC));
+        kafkaAdminClient.deleteTopics(Arrays.asList(TOPIC, "TEST_IDEMPOTENT")).all();
     }
 
     @Override
diff --git a/components/camel-kafka/src/test/java/org/apache/camel/processor/idempotent/kafka/KafkaIdempotentRepositoryPersistenceIT.java b/components/camel-kafka/src/test/java/org/apache/camel/processor/idempotent/kafka/KafkaIdempotentRepositoryPersistenceIT.java
index 15e5f06..433dcd6 100644
--- a/components/camel-kafka/src/test/java/org/apache/camel/processor/idempotent/kafka/KafkaIdempotentRepositoryPersistenceIT.java
+++ b/components/camel-kafka/src/test/java/org/apache/camel/processor/idempotent/kafka/KafkaIdempotentRepositoryPersistenceIT.java
@@ -16,16 +16,20 @@
  */
 package org.apache.camel.processor.idempotent.kafka;
 
+import java.util.Arrays;
+
 import org.apache.camel.BindToRegistry;
 import org.apache.camel.EndpointInject;
 import org.apache.camel.RoutesBuilder;
 import org.apache.camel.builder.RouteBuilder;
 import org.apache.camel.component.kafka.integration.BaseEmbeddedKafkaTestSupport;
 import org.apache.camel.component.mock.MockEndpoint;
+import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.MethodOrderer;
 import org.junit.jupiter.api.Order;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.TestMethodOrder;
+import org.junit.jupiter.api.condition.DisabledIfSystemProperty;
 
 import static org.junit.jupiter.api.Assertions.assertEquals;
 
@@ -41,7 +45,7 @@ public class KafkaIdempotentRepositoryPersistenceIT extends BaseEmbeddedKafkaTes
 
     // Every instance of the repository must use a different topic to guarantee isolation between tests
     @BindToRegistry("kafkaIdempotentRepository")
-    private KafkaIdempotentRepository kafkaIdempotentRepository
+    private final KafkaIdempotentRepository kafkaIdempotentRepository
             = new KafkaIdempotentRepository("TEST_PERSISTENCE", getBootstrapServers());
 
     @EndpointInject("mock:out")
@@ -50,6 +54,11 @@ public class KafkaIdempotentRepositoryPersistenceIT extends BaseEmbeddedKafkaTes
     @EndpointInject("mock:before")
     private MockEndpoint mockBefore;
 
+    @BeforeEach
+    void clearTopics() {
+        kafkaAdminClient.deleteTopics(Arrays.asList("TEST_PERSISTENCE")).all();
+    }
+
     @Override
     protected RoutesBuilder createRouteBuilder() throws Exception {
         return new RouteBuilder() {
@@ -63,7 +72,7 @@ public class KafkaIdempotentRepositoryPersistenceIT extends BaseEmbeddedKafkaTes
 
     @Order(1)
     @Test
-    public void testFirstPassFiltersAsExpected() throws InterruptedException {
+    public void testFirstPassFiltersAsExpected() {
         for (int i = 0; i < 10; i++) {
             template.sendBodyAndHeader("direct:in", "Test message", "id", i % 5);
         }
@@ -80,7 +89,9 @@ public class KafkaIdempotentRepositoryPersistenceIT extends BaseEmbeddedKafkaTes
 
     @Order(2)
     @Test
-    public void testSecondPassFiltersEverything() throws InterruptedException {
+    @DisabledIfSystemProperty(named = "kafka.instance.type", matches = "remote",
+                              disabledReason = "Remote may not allow deleting the topic, may contain data, etc")
+    public void testSecondPassFiltersEverything() {
         for (int i = 0; i < 10; i++) {
             template.sendBodyAndHeader("direct:in", "Test message", "id", i % 5);
         }

Reply via email to