This is an automated email from the ASF dual-hosted git repository.

acosentino pushed a commit to branch camel-master
in repository https://gitbox.apache.org/repos/asf/camel-kafka-connector.git

commit 4b5dace6f2130b7fe2079fd8fbb890d752ea9c93
Author: Otavio Rodolfo Piske <opi...@redhat.com>
AuthorDate: Wed Feb 3 09:11:34 2021 +0100

    Added idempotency test for SJMS2 using header expressions
---
 .../sjms2/sink/CamelSinkIdempotentJMSITCase.java   | 80 +++++++++++++++++++---
 1 file changed, 69 insertions(+), 11 deletions(-)

diff --git 
a/tests/itests-sjms2/src/test/java/org/apache/camel/kafkaconnector/sjms2/sink/CamelSinkIdempotentJMSITCase.java
 
b/tests/itests-sjms2/src/test/java/org/apache/camel/kafkaconnector/sjms2/sink/CamelSinkIdempotentJMSITCase.java
index 566e823..d2f06a7 100644
--- 
a/tests/itests-sjms2/src/test/java/org/apache/camel/kafkaconnector/sjms2/sink/CamelSinkIdempotentJMSITCase.java
+++ 
b/tests/itests-sjms2/src/test/java/org/apache/camel/kafkaconnector/sjms2/sink/CamelSinkIdempotentJMSITCase.java
@@ -17,6 +17,8 @@
 
 package org.apache.camel.kafkaconnector.sjms2.sink;
 
+import java.util.HashMap;
+import java.util.Map;
 import java.util.Properties;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.ExecutorService;
@@ -28,6 +30,7 @@ import javax.jms.Message;
 import javax.jms.MessageConsumer;
 import javax.jms.TextMessage;
 
+import org.apache.camel.kafkaconnector.CamelSinkTask;
 import org.apache.camel.kafkaconnector.common.AbstractKafkaTest;
 import org.apache.camel.kafkaconnector.common.ConnectorPropertyFactory;
 import org.apache.camel.kafkaconnector.common.clients.kafka.KafkaClient;
@@ -53,6 +56,11 @@ import static org.junit.jupiter.api.Assertions.fail;
  */
 @TestInstance(TestInstance.Lifecycle.PER_CLASS)
 public class CamelSinkIdempotentJMSITCase extends AbstractKafkaTest {
+    /**
+     * Callback that sends the Kafka records for a test. It is passed to runTest,
+     * which invokes it after submitting the JMS consumer task.
+     */
+    @FunctionalInterface
+    interface Producer {
+        void producerMessages();
+    }
+
     @RegisterExtension
     public static MessagingService jmsService = MessagingServiceBuilder
             .newBuilder(DispatchRouterContainer::new)
@@ -84,7 +92,8 @@ public class CamelSinkIdempotentJMSITCase extends 
AbstractKafkaTest {
     public void setUp() {
         LOG.info("JMS service running at {}", jmsService.defaultEndpoint());
         received = 0;
-        topic = TestUtils.getDefaultTestTopic(this.getClass());
+
+        topic = TestUtils.getDefaultTestTopic(this.getClass()) + 
TestUtils.randomWithRange(0, 100);
         destinationName = SJMS2Common.DEFAULT_JMS_QUEUE + "-" + 
TestUtils.randomWithRange(0, 100);
     }
 
@@ -142,7 +151,7 @@ public class CamelSinkIdempotentJMSITCase extends 
AbstractKafkaTest {
         }
     }
 
-    private void runTest(ConnectorPropertyFactory connectorPropertyFactory) 
throws ExecutionException, InterruptedException {
+    private void runTest(ConnectorPropertyFactory connectorPropertyFactory, 
Producer producer) throws ExecutionException, InterruptedException {
         connectorPropertyFactory.log();
         getKafkaConnectService().initializeConnector(connectorPropertyFactory);
 
@@ -151,14 +160,7 @@ public class CamelSinkIdempotentJMSITCase extends 
AbstractKafkaTest {
         LOG.debug("Creating the consumer ...");
         service.submit(() -> consumeJMSMessages());
 
-        KafkaClient<String, String> kafkaClient = new 
KafkaClient<>(getKafkaService().getBootstrapServers());
-
-        for (int i = 0; i < expect; i++) {
-            LOG.debug("Sending message 1/2");
-            kafkaClient.produce(topic, "Sink test message " + i);
-            LOG.debug("Sending message 2/2");
-            kafkaClient.produce(topic, "Sink test message " + i);
-        }
+        producer.producerMessages();
 
         LOG.debug("Waiting for the messages to be processed");
         service.shutdown();
@@ -170,6 +172,39 @@ public class CamelSinkIdempotentJMSITCase extends 
AbstractKafkaTest {
         }
     }
 
+    /**
+     * Sends each test body to the Kafka topic twice, without any headers. Used by
+     * the body-expression idempotency test, where the duplicated body is the
+     * duplicate.
+     */
+    private void produceMessagesNoProperties() {
+        try {
+            KafkaClient<String, String> kafkaClient = new KafkaClient<>(getKafkaService().getBootstrapServers());
+
+            for (int i = 0; i < expect; i++) {
+                String body = "Sink test message " + i;
+
+                LOG.debug("Sending message 1/2");
+                kafkaClient.produce(topic, body);
+                LOG.debug("Sending message 2/2");
+                kafkaClient.produce(topic, body);
+            }
+        } catch (Exception e) {
+            // Pass the exception as the cause so the failure keeps its stack trace,
+            // even when the exception carries no message.
+            fail(e.getMessage(), e);
+        }
+    }
+
+    /**
+     * Sends, for each message number, two records whose bodies differ but which
+     * share the same "MessageNumber" header (prefixed with
+     * CamelSinkTask.HEADER_CAMEL_PREFIX). Used by the header-expression
+     * idempotency test.
+     */
+    private void produceMessagesWithProperties() {
+        try {
+            KafkaClient<String, String> kafkaClient = new KafkaClient<>(getKafkaService().getBootstrapServers());
+
+            for (int i = 0; i < expect; i++) {
+                Map<String, String> headers = new HashMap<>();
+                int randomNumber = TestUtils.randomWithRange(1, 1000);
+
+                headers.put(CamelSinkTask.HEADER_CAMEL_PREFIX + "MessageNumber", String.valueOf(i));
+
+                kafkaClient.produce(topic, "Sink test message " + randomNumber, headers);
+                // Parenthesized so the second body carries the next number; without the
+                // parentheses string concatenation just appends the character '1'.
+                kafkaClient.produce(topic, "Sink test message " + (randomNumber + 1), headers);
+            }
+        } catch (Exception e) {
+            // Pass the exception as the cause so the failure keeps its stack trace,
+            // even when the exception carries no message.
+            fail(e.getMessage(), e);
+        }
+    }
+
     @Test
     @Timeout(90)
     public void testIdempotentBodySendReceive() {
@@ -184,7 +219,30 @@ public class CamelSinkIdempotentJMSITCase extends 
AbstractKafkaTest {
                         .withExpressionType("body")
                         .end();
 
-            runTest(connectorPropertyFactory);
+            runTest(connectorPropertyFactory, 
this::produceMessagesNoProperties);
+
+        } catch (Exception e) {
+            LOG.error("JMS test failed: {}", e.getMessage(), e);
+            fail(e.getMessage());
+        }
+    }
+
+    @Test
+    @Timeout(90)
+    public void testIdempotentHeaderSendReceive() {
+        try {
+            ConnectorPropertyFactory connectorPropertyFactory = 
CamelJMSPropertyFactory
+                    .basic()
+                    .withTopics(topic)
+                    .withConnectionProperties(connectionProperties())
+                    .withDestinationName(destinationName)
+                    .withIdempotency()
+                    .withRepositoryType("memory")
+                    .withExpressionType("header")
+                    .withExpressionHeader("MessageNumber")
+                    .end();
+
+            runTest(connectorPropertyFactory, 
this::produceMessagesWithProperties);
 
         } catch (Exception e) {
             LOG.error("JMS test failed: {}", e.getMessage(), e);

Reply via email to