This is an automated email from the ASF dual-hosted git repository.

acosentino pushed a commit to branch camel-master
in repository https://gitbox.apache.org/repos/asf/camel-kafka-connector.git

commit 4b7ba490a56b7196774ba9e5e00c87e0f4a7bc74
Author: Otavio Rodolfo Piske <opi...@redhat.com>
AuthorDate: Tue Feb 2 17:43:09 2021 +0100

    Added a basic test for idempotency
---
 .../common/BasicConnectorPropertyFactory.java      |   4 +
 .../common/IdempotencyConfigBuilder.java           |  78 +++++++++
 .../kafkaconnector/sjms2/clients/JMSClient.java    |  43 +++--
 .../sjms2/sink/CamelSinkIdempotentJMSITCase.java   | 192 +++++++++++++++++++++
 4 files changed, 307 insertions(+), 10 deletions(-)

diff --git 
a/tests/itests-common/src/test/java/org/apache/camel/kafkaconnector/common/BasicConnectorPropertyFactory.java
 
b/tests/itests-common/src/test/java/org/apache/camel/kafkaconnector/common/BasicConnectorPropertyFactory.java
index a9d012e..0e98490 100644
--- 
a/tests/itests-common/src/test/java/org/apache/camel/kafkaconnector/common/BasicConnectorPropertyFactory.java
+++ 
b/tests/itests-common/src/test/java/org/apache/camel/kafkaconnector/common/BasicConnectorPropertyFactory.java
@@ -63,6 +63,10 @@ public abstract class BasicConnectorPropertyFactory<T 
extends BasicConnectorProp
         return (T) this;
     }
 
+    /**
+     * Opens the idempotency section of this connector configuration.
+     * <p>
+     * The returned builder is bound to this factory and writes into this factory's
+     * connector properties.
+     *
+     * @return a new idempotency builder created from this factory and its connector properties
+     */
+    public IdempotencyConfigBuilder<T> withIdempotency() {
+        final T self = (T) this;
+
+        return new IdempotencyConfigBuilder<>(self, connectorProps);
+    }
+
     /**
     * This enables sending failed records to the DLQ. Note: it automatically 
configures other required/recommended
      * options!
diff --git 
a/tests/itests-common/src/test/java/org/apache/camel/kafkaconnector/common/IdempotencyConfigBuilder.java
 
b/tests/itests-common/src/test/java/org/apache/camel/kafkaconnector/common/IdempotencyConfigBuilder.java
new file mode 100644
index 0000000..2cde885
--- /dev/null
+++ 
b/tests/itests-common/src/test/java/org/apache/camel/kafkaconnector/common/IdempotencyConfigBuilder.java
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.camel.kafkaconnector.common;
+
+import java.util.Properties;
+
+/**
+ * Fluent builder for the idempotency options of a connector configuration.
+ * <p>
+ * Every option is written straight into the supplied {@link Properties} under the
+ * {@code camel.idempotency.} prefix. The {@code enabled} entry is set to {@code true} as soon
+ * as the builder is created; {@link #end()} hands back the object the builder was created with.
+ *
+ * @param <T> the type of the handle returned by {@link #end()}
+ */
+public class IdempotencyConfigBuilder<T extends ConnectorPropertyFactory> {
+    private static final String PREFIX = "camel.idempotency.";
+
+    private final T handle;
+    private final Properties properties;
+
+    /**
+     * Creates the builder and immediately sets {@code camel.idempotency.enabled} to {@code true}.
+     *
+     * @param handle the object returned by {@link #end()}
+     * @param properties the properties every option is written into
+     */
+    public IdempotencyConfigBuilder(T handle, Properties properties) {
+        this.handle = handle;
+        this.properties = properties;
+
+        // Uses the private helper instead of the public (overridable) withEnabled(), so that a
+        // subclass override can never run against a partially constructed instance
+        withEntry("enabled", true);
+    }
+
+    /**
+     * Stores a value under the {@code camel.idempotency.} prefix.
+     *
+     * @param key the option name, without the prefix
+     * @param value the value to store
+     * @return this builder
+     */
+    private IdempotencyConfigBuilder<T> withEntry(String key, Object value) {
+        properties.put(PREFIX + key, value);
+
+        return this;
+    }
+
+    /**
+     * Sets {@code camel.idempotency.enabled}.
+     *
+     * @param value the value to store
+     * @return this builder
+     */
+    public IdempotencyConfigBuilder<T> withEnabled(boolean value) {
+        return withEntry("enabled", value);
+    }
+
+    /**
+     * Sets {@code camel.idempotency.repository.type}.
+     *
+     * @param value the value to store
+     * @return this builder
+     */
+    public IdempotencyConfigBuilder<T> withRepositoryType(String value) {
+        return withEntry("repository.type", value);
+    }
+
+    /**
+     * Sets {@code camel.idempotency.expression.type}.
+     *
+     * @param value the value to store
+     * @return this builder
+     */
+    public IdempotencyConfigBuilder<T> withExpressionType(String value) {
+        return withEntry("expression.type", value);
+    }
+
+    /**
+     * Sets {@code camel.idempotency.expression.header}.
+     *
+     * @param value the value to store
+     * @return this builder
+     */
+    public IdempotencyConfigBuilder<T> withExpressionHeader(String value) {
+        return withEntry("expression.header", value);
+    }
+
+    /**
+     * Sets {@code camel.idempotency.memory.dimension}.
+     *
+     * @param value the value to store
+     * @return this builder
+     */
+    public IdempotencyConfigBuilder<T> withMemoryDimension(String value) {
+        return withEntry("memory.dimension", value);
+    }
+
+    /**
+     * Sets {@code camel.idempotency.kafka.topic}.
+     *
+     * @param value the value to store
+     * @return this builder
+     */
+    public IdempotencyConfigBuilder<T> withKafkaTopic(String value) {
+        return withEntry("kafka.topic", value);
+    }
+
+    /**
+     * Sets {@code camel.idempotency.kafka.bootstrap.servers}.
+     *
+     * @param value the value to store
+     * @return this builder
+     */
+    public IdempotencyConfigBuilder<T> withKafkaBootstrapServers(String value) {
+        return withEntry("kafka.bootstrap.servers", value);
+    }
+
+    /**
+     * Sets {@code camel.idempotency.kafka.max.cache.size}.
+     *
+     * @param value the value to store
+     * @return this builder
+     */
+    public IdempotencyConfigBuilder<T> withKafkaMaxCacheSize(String value) {
+        return withEntry("kafka.max.cache.size", value);
+    }
+
+    /**
+     * Sets {@code camel.idempotency.kafka.poll.duration.ms}.
+     *
+     * @param value the value to store
+     * @return this builder
+     */
+    public IdempotencyConfigBuilder<T> withKafkaPollDurationMs(String value) {
+        return withEntry("kafka.poll.duration.ms", value);
+    }
+
+    /**
+     * Finishes the idempotency section.
+     *
+     * @return the handle this builder was created with
+     */
+    public T end() {
+        return handle;
+    }
+}
diff --git 
a/tests/itests-sjms2/src/test/java/org/apache/camel/kafkaconnector/sjms2/clients/JMSClient.java
 
b/tests/itests-sjms2/src/test/java/org/apache/camel/kafkaconnector/sjms2/clients/JMSClient.java
index 43586c6..42e4103 100644
--- 
a/tests/itests-sjms2/src/test/java/org/apache/camel/kafkaconnector/sjms2/clients/JMSClient.java
+++ 
b/tests/itests-sjms2/src/test/java/org/apache/camel/kafkaconnector/sjms2/clients/JMSClient.java
@@ -161,6 +161,37 @@ public class JMSClient {
         }
     }
 
+    /**
+     * Keeps receiving from the given consumer for as long as the predicate accepts what was received.
+     * <p>
+     * Whatever {@code consumer.receive(timeout)} returns (possibly {@code null}) is handed to
+     * the predicate unchanged; the method returns as soon as the predicate evaluates to {@code false}.
+     *
+     * @param consumer the consumer to receive from
+     * @param predicate the predicate used to test each received message
+     * @param timeout the timeout passed to every {@code consumer.receive} call
+     * @throws JMSException if receiving from the consumer fails
+     */
+    public void receive(MessageConsumer consumer, Predicate<Message> predicate, long timeout) throws JMSException {
+        Message received;
+
+        do {
+            received = consumer.receive(timeout);
+        } while (predicate.test(received));
+    }
+
+
+    /**
+     * Receives from the given consumer with a fixed timeout of 3000, delegating to the
+     * overload that takes an explicit timeout.
+     *
+     * @param consumer the consumer to receive from
+     * @param predicate the predicate used to test each received message
+     * @throws JMSException if receiving from the consumer fails
+     */
+    public void receive(MessageConsumer consumer, Predicate<Message> predicate) throws JMSException {
+        final long timeout = 3000;
+
+        receive(consumer, predicate, timeout);
+    }
+
+    /**
+     * Creates a consumer on this client's session for the destination resolved from the given name.
+     * <p>
+     * The consumer is not closed here; closing it is left to the caller.
+     *
+     * @param queue the name passed to {@code createDestination}
+     * @return the consumer returned by {@code session.createConsumer}
+     * @throws JMSException if the destination or the consumer cannot be created
+     */
+    public MessageConsumer createConsumer(String queue) throws JMSException {
+        return session.createConsumer(createDestination(queue));
+    }
+
 
     /**
      * Receives data from a JMS queue or topic
@@ -170,20 +201,12 @@ public class JMSClient {
      * @throws JMSException
      */
     public void receive(final String queue, Predicate<Message> predicate) 
throws JMSException {
-        final long timeout = 3000;
-
         MessageConsumer consumer = null;
 
         try {
-            consumer = session.createConsumer(createDestination(queue));
-
-            while (true) {
-                final Message message = consumer.receive(timeout);
+            consumer = createConsumer(queue);
 
-                if (!predicate.test(message)) {
-                    return;
-                }
-            }
+            receive(consumer, predicate);
         } finally {
             capturingClose(consumer);
         }
diff --git 
a/tests/itests-sjms2/src/test/java/org/apache/camel/kafkaconnector/sjms2/sink/CamelSinkIdempotentJMSITCase.java
 
b/tests/itests-sjms2/src/test/java/org/apache/camel/kafkaconnector/sjms2/sink/CamelSinkIdempotentJMSITCase.java
new file mode 100644
index 0000000..8eceee2
--- /dev/null
+++ 
b/tests/itests-sjms2/src/test/java/org/apache/camel/kafkaconnector/sjms2/sink/CamelSinkIdempotentJMSITCase.java
@@ -0,0 +1,192 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.camel.kafkaconnector.sjms2.sink;
+
+import java.util.Properties;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+
+import javax.jms.JMSException;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.TextMessage;
+
+import org.apache.camel.kafkaconnector.common.AbstractKafkaTest;
+import org.apache.camel.kafkaconnector.common.ConnectorPropertyFactory;
+import org.apache.camel.kafkaconnector.common.clients.kafka.KafkaClient;
+import org.apache.camel.kafkaconnector.common.utils.TestUtils;
+import org.apache.camel.kafkaconnector.sjms2.clients.JMSClient;
+import org.apache.camel.kafkaconnector.sjms2.common.SJMS2Common;
+import 
org.apache.camel.test.infra.dispatch.router.services.DispatchRouterContainer;
+import org.apache.camel.test.infra.messaging.services.MessagingService;
+import org.apache.camel.test.infra.messaging.services.MessagingServiceBuilder;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.TestInstance;
+import org.junit.jupiter.api.Timeout;
+import org.junit.jupiter.api.extension.RegisterExtension;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.fail;
+
+/**
+ * Integration tests for the JMS sink using idempotent features.
+ * <p>
+ * Every payload is produced to Kafka twice while the connector is configured with an
+ * idempotency expression based on the message body; the test then expects to count each
+ * payload only once on the JMS side.
+ */
+@TestInstance(TestInstance.Lifecycle.PER_CLASS)
+public class CamelSinkIdempotentJMSITCase extends AbstractKafkaTest {
+    @RegisterExtension
+    public static MessagingService jmsService = MessagingServiceBuilder
+            .newBuilder(DispatchRouterContainer::new)
+            .withEndpointProvider(DispatchRouterContainer::defaultEndpoint)
+            .build();
+
+    private static final Logger LOG = LoggerFactory.getLogger(CamelSinkIdempotentJMSITCase.class);
+
+    private String topic;
+    // Number of text messages counted by checkRecord() since the last setUp()
+    private int received;
+    // Number of distinct payloads produced, and therefore of messages expected on the JMS side
+    private final int expect = 10;
+
+    /**
+     * Builds the sjms2 component properties pointing the connection factory at the JMS service.
+     *
+     * @return the connection properties for the connector
+     */
+    private Properties connectionProperties() {
+        Properties properties = new Properties();
+
+        properties.put("camel.component.sjms2.connection-factory", "#class:org.apache.qpid.jms.JmsConnectionFactory");
+        properties.put("camel.component.sjms2.connection-factory.remoteURI", jmsService.defaultEndpoint());
+
+        return properties;
+    }
+
+    @Override
+    protected String[] getConnectorsInTest() {
+        return new String[] {"camel-sjms2-kafka-connector"};
+    }
+
+    /**
+     * Resets the received counter and resolves the test topic before each test.
+     */
+    @BeforeEach
+    public void setUp() {
+        LOG.info("JMS service running at {}", jmsService.defaultEndpoint());
+        received = 0;
+        topic = TestUtils.getDefaultTestTopic(this.getClass());
+    }
+
+    /**
+     * Counts a received message if it is a {@link TextMessage}.
+     *
+     * @param jmsMessage the received message; may be {@code null}
+     * @return {@code true} if the message was a text message and was counted, {@code false}
+     *         for anything else (including {@code null}), which ends the current receive round
+     */
+    private boolean checkRecord(Message jmsMessage) {
+        if (jmsMessage instanceof TextMessage) {
+            try {
+                LOG.debug("Received: {}", ((TextMessage) jmsMessage).getText());
+
+                received++;
+
+                return true;
+            } catch (JMSException e) {
+                LOG.error("Failed to read message: {}", e.getMessage(), e);
+                fail("Failed to read message: " + e.getMessage(), e);
+            }
+        }
+
+        return false;
+    }
+
+
+    /**
+     * Consumes from the default JMS queue until the expected amount of messages was reached
+     * and 10 further receive rounds have been run, or until the thread is interrupted.
+     * <p>
+     * The extra rounds keep counting, so that duplicates that were not filtered out show up
+     * in {@code received}.
+     */
+    private void consumeJMSMessages() {
+        JMSClient jmsClient = null;
+
+        try {
+            jmsClient = JMSClient.newClient(jmsService.defaultEndpoint());
+            jmsClient.start();
+
+            try (MessageConsumer consumer = jmsClient.createConsumer(SJMS2Common.DEFAULT_JMS_QUEUE)) {
+                // number of retries until stale
+                int retries = 10;
+
+                // The interruption check lets runTest() stop this loop when it gives up waiting
+                while (retries > 0 && !Thread.currentThread().isInterrupted()) {
+                    LOG.debug("Waiting for JMS messages (received {} of {} / retry {})", received, expect, retries);
+                    jmsClient.receive(consumer, this::checkRecord, 1000);
+
+                    // Once the expected amount was reached, count down the remaining rounds. This
+                    // uses >= (not ==) on purpose: if duplicates get through, received goes past
+                    // expect and an equality check would never let this loop finish
+                    if (received >= expect) {
+                        retries--;
+                    } else {
+                        retries = 10;
+                    }
+                }
+            }
+        } catch (InterruptedException e) {
+            LOG.warn("Interrupted, stopping ...");
+            Thread.currentThread().interrupt();
+        } catch (Exception e) {
+            LOG.error("JMS test failed: {}", e.getMessage(), e);
+            fail(e.getMessage(), e);
+        } finally {
+            if (jmsClient != null) {
+                jmsClient.stop();
+            }
+        }
+    }
+
+    /**
+     * Initializes the connector, starts the JMS consumer in the background, produces every
+     * payload twice and then checks how many messages were counted on the JMS side.
+     *
+     * @param connectorPropertyFactory the connector configuration to run the test with
+     * @throws ExecutionException declared for the Kafka producer calls
+     * @throws InterruptedException if interrupted while producing or while waiting for the consumer
+     */
+    private void runTest(ConnectorPropertyFactory connectorPropertyFactory) throws ExecutionException, InterruptedException {
+        connectorPropertyFactory.log();
+        getKafkaConnectService().initializeConnector(connectorPropertyFactory);
+
+        ExecutorService service = Executors.newCachedThreadPool();
+
+        LOG.debug("Creating the consumer ...");
+        service.submit(() -> consumeJMSMessages());
+
+        KafkaClient<String, String> kafkaClient = new KafkaClient<>(getKafkaService().getBootstrapServers());
+
+        for (int i = 0; i < expect; i++) {
+            LOG.debug("Sending message 1/2");
+            kafkaClient.produce(topic, "Sink test message " + i);
+            LOG.debug("Sending message 2/2");
+            kafkaClient.produce(topic, "Sink test message " + i);
+        }
+
+        LOG.debug("Waiting for the messages to be processed");
+        service.shutdown();
+
+        if (service.awaitTermination(25, TimeUnit.SECONDS)) {
+            // JUnit expects (expected, actual, message)
+            assertEquals(expect, received, "Didn't process the expected amount of messages: " + received + " != " + expect);
+        } else {
+            // Interrupt the consumer task so it does not outlive the test
+            service.shutdownNow();
+            fail("Failed to receive the messages within the specified time");
+        }
+    }
+
+    /**
+     * Runs the sink with an in-memory idempotent repository keyed on the message body.
+     */
+    @Test
+    @Timeout(90)
+    public void testIdempotentBodySendReceive() {
+        try {
+            ConnectorPropertyFactory connectorPropertyFactory = CamelJMSPropertyFactory
+                    .basic()
+                    .withTopics(topic)
+                    .withConnectionProperties(connectionProperties())
+                    .withDestinationName(SJMS2Common.DEFAULT_JMS_QUEUE)
+                    .withIdempotency()
+                        .withRepositoryType("memory")
+                        .withExpressionType("body")
+                        .end();
+
+            runTest(connectorPropertyFactory);
+
+        } catch (Exception e) {
+            LOG.error("JMS test failed: {}", e.getMessage(), e);
+            fail(e.getMessage(), e);
+        }
+    }
+}

Reply via email to