This is an automated email from the ASF dual-hosted git repository.

acosentino pushed a commit to branch camel-master in repository https://gitbox.apache.org/repos/asf/camel-kafka-connector.git
commit 4b5dace6f2130b7fe2079fd8fbb890d752ea9c93 Author: Otavio Rodolfo Piske <opi...@redhat.com> AuthorDate: Wed Feb 3 09:11:34 2021 +0100 Added idempotency test for SJMS2 using header expressions --- .../sjms2/sink/CamelSinkIdempotentJMSITCase.java | 80 +++++++++++++++++++--- 1 file changed, 69 insertions(+), 11 deletions(-) diff --git a/tests/itests-sjms2/src/test/java/org/apache/camel/kafkaconnector/sjms2/sink/CamelSinkIdempotentJMSITCase.java b/tests/itests-sjms2/src/test/java/org/apache/camel/kafkaconnector/sjms2/sink/CamelSinkIdempotentJMSITCase.java index 566e823..d2f06a7 100644 --- a/tests/itests-sjms2/src/test/java/org/apache/camel/kafkaconnector/sjms2/sink/CamelSinkIdempotentJMSITCase.java +++ b/tests/itests-sjms2/src/test/java/org/apache/camel/kafkaconnector/sjms2/sink/CamelSinkIdempotentJMSITCase.java @@ -17,6 +17,8 @@ package org.apache.camel.kafkaconnector.sjms2.sink; +import java.util.HashMap; +import java.util.Map; import java.util.Properties; import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorService; @@ -28,6 +30,7 @@ import javax.jms.Message; import javax.jms.MessageConsumer; import javax.jms.TextMessage; +import org.apache.camel.kafkaconnector.CamelSinkTask; import org.apache.camel.kafkaconnector.common.AbstractKafkaTest; import org.apache.camel.kafkaconnector.common.ConnectorPropertyFactory; import org.apache.camel.kafkaconnector.common.clients.kafka.KafkaClient; @@ -53,6 +56,11 @@ import static org.junit.jupiter.api.Assertions.fail; */ @TestInstance(TestInstance.Lifecycle.PER_CLASS) public class CamelSinkIdempotentJMSITCase extends AbstractKafkaTest { + @FunctionalInterface + interface Producer { + void producerMessages(); + } + @RegisterExtension public static MessagingService jmsService = MessagingServiceBuilder .newBuilder(DispatchRouterContainer::new) @@ -84,7 +92,8 @@ public class CamelSinkIdempotentJMSITCase extends AbstractKafkaTest { public void setUp() { LOG.info("JMS service running at {}", 
jmsService.defaultEndpoint()); received = 0; - topic = TestUtils.getDefaultTestTopic(this.getClass()); + + topic = TestUtils.getDefaultTestTopic(this.getClass()) + TestUtils.randomWithRange(0, 100); destinationName = SJMS2Common.DEFAULT_JMS_QUEUE + "-" + TestUtils.randomWithRange(0, 100); } @@ -142,7 +151,7 @@ public class CamelSinkIdempotentJMSITCase extends AbstractKafkaTest { } } - private void runTest(ConnectorPropertyFactory connectorPropertyFactory) throws ExecutionException, InterruptedException { + private void runTest(ConnectorPropertyFactory connectorPropertyFactory, Producer producer) throws ExecutionException, InterruptedException { connectorPropertyFactory.log(); getKafkaConnectService().initializeConnector(connectorPropertyFactory); @@ -151,14 +160,7 @@ public class CamelSinkIdempotentJMSITCase extends AbstractKafkaTest { LOG.debug("Creating the consumer ..."); service.submit(() -> consumeJMSMessages()); - KafkaClient<String, String> kafkaClient = new KafkaClient<>(getKafkaService().getBootstrapServers()); - - for (int i = 0; i < expect; i++) { - LOG.debug("Sending message 1/2"); - kafkaClient.produce(topic, "Sink test message " + i); - LOG.debug("Sending message 2/2"); - kafkaClient.produce(topic, "Sink test message " + i); - } + producer.producerMessages(); LOG.debug("Waiting for the messages to be processed"); service.shutdown(); @@ -170,6 +172,39 @@ public class CamelSinkIdempotentJMSITCase extends AbstractKafkaTest { } } + private void produceMessagesNoProperties() { + try { + KafkaClient<String, String> kafkaClient = new KafkaClient<>(getKafkaService().getBootstrapServers()); + + for (int i = 0; i < expect; i++) { + LOG.debug("Sending message 1/2"); + kafkaClient.produce(topic, "Sink test message " + i); + LOG.debug("Sending message 2/2"); + kafkaClient.produce(topic, "Sink test message " + i); + } + } catch (Exception e) { + fail(e.getMessage()); + } + } + + private void produceMessagesWithProperties() { + try { + KafkaClient<String, String> 
kafkaClient = new KafkaClient<>(getKafkaService().getBootstrapServers()); + + for (int i = 0; i < expect; i++) { + Map<String, String> headers = new HashMap<>(); + int randomNumber = TestUtils.randomWithRange(1, 1000); + + headers.put(CamelSinkTask.HEADER_CAMEL_PREFIX + "MessageNumber", String.valueOf(i)); + + kafkaClient.produce(topic, "Sink test message " + randomNumber, headers); + kafkaClient.produce(topic, "Sink test message " + randomNumber + 1, headers); + } + } catch (Exception e) { + fail(e.getMessage()); + } + } + @Test @Timeout(90) public void testIdempotentBodySendReceive() { @@ -184,7 +219,30 @@ public class CamelSinkIdempotentJMSITCase extends AbstractKafkaTest { .withExpressionType("body") .end(); - runTest(connectorPropertyFactory); + runTest(connectorPropertyFactory, this::produceMessagesNoProperties); + + } catch (Exception e) { + LOG.error("JMS test failed: {}", e.getMessage(), e); + fail(e.getMessage()); + } + } + + @Test + @Timeout(90) + public void testIdempotentHeaderSendReceive() { + try { + ConnectorPropertyFactory connectorPropertyFactory = CamelJMSPropertyFactory + .basic() + .withTopics(topic) + .withConnectionProperties(connectionProperties()) + .withDestinationName(destinationName) + .withIdempotency() + .withRepositoryType("memory") + .withExpressionType("header") + .withExpressionHeader("MessageNumber") + .end(); + + runTest(connectorPropertyFactory, this::produceMessagesWithProperties); } catch (Exception e) { LOG.error("JMS test failed: {}", e.getMessage(), e);