This is an automated email from the ASF dual-hosted git repository.

acosentino pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/camel-kafka-connector.git


The following commit(s) were added to refs/heads/master by this push:
     new d20fcdd  Adds basic aggregation test (github issue #291)
     new 0fa09ac  Merge pull request #388 from orpiske/aggregation-test
d20fcdd is described below

commit d20fcdd5aca5ddfef25fee60d0a6dcf3936a9f9a
Author: Otavio Rodolfo Piske <opi...@redhat.com>
AuthorDate: Thu Aug 20 16:56:32 2020 +0200

    Adds basic aggregation test (github issue #291)
---
 .../common/SourceConnectorPropertyFactory.java     |   8 ++
 .../common/clients/kafka/KafkaClient.java          |  16 +++
 .../kafkaconnector/sjms2/clients/JMSClient.java    |  54 ++++++++++
 .../sjms2/source/CamelSourceJMSITCase.java         |  50 ++-------
 .../source/CamelSourceJMSWithAggregation.java      | 120 +++++++++++++++++++++
 5 files changed, 204 insertions(+), 44 deletions(-)

diff --git 
a/tests/itests-common/src/test/java/org/apache/camel/kafkaconnector/common/SourceConnectorPropertyFactory.java
 
b/tests/itests-common/src/test/java/org/apache/camel/kafkaconnector/common/SourceConnectorPropertyFactory.java
index 8a6d661..b1d5cb9 100644
--- 
a/tests/itests-common/src/test/java/org/apache/camel/kafkaconnector/common/SourceConnectorPropertyFactory.java
+++ 
b/tests/itests-common/src/test/java/org/apache/camel/kafkaconnector/common/SourceConnectorPropertyFactory.java
@@ -30,4 +30,12 @@ public abstract class SourceConnectorPropertyFactory<T 
extends SourceConnectorPr
 
         return (T) this;
     }
+
+    public T withAggregate(String aggregate, int size, int timeout) {
+        withBeans("aggregate", classRef(aggregate));
+        getProperties().put("camel.beans.aggregation.size", size);
+        getProperties().put("camel.beans.aggregation.timeout", timeout);
+
+        return (T) this;
+    }
 }
diff --git 
a/tests/itests-common/src/test/java/org/apache/camel/kafkaconnector/common/clients/kafka/KafkaClient.java
 
b/tests/itests-common/src/test/java/org/apache/camel/kafkaconnector/common/clients/kafka/KafkaClient.java
index e9846ed..a124a5d 100644
--- 
a/tests/itests-common/src/test/java/org/apache/camel/kafkaconnector/common/clients/kafka/KafkaClient.java
+++ 
b/tests/itests-common/src/test/java/org/apache/camel/kafkaconnector/common/clients/kafka/KafkaClient.java
@@ -24,6 +24,7 @@ import java.util.Map;
 import java.util.Properties;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.Future;
+import java.util.function.Consumer;
 import java.util.function.Predicate;
 
 import org.apache.kafka.clients.admin.AdminClient;
@@ -82,6 +83,21 @@ public class KafkaClient<K, V> {
         consumer = new 
KafkaConsumer<>(consumerPropertyFactory.getProperties());
     }
 
+    /**
+     * Consumes the messages currently available on the given topic
+     *
+     * @param topic     the topic to consume the messages from
+     * @param recordConsumer a function to consume the received messages
+     */
+    public void consumeAvailable(String topic, Consumer<ConsumerRecord<K, V>> 
recordConsumer) {
+        consumer.subscribe(Arrays.asList(topic));
+
+        ConsumerRecords<K, V> records = consumer.poll(Duration.ofMillis(100));
+        for (ConsumerRecord<K, V> record : records) {
+            recordConsumer.accept(record);
+        }
+    }
+
 
     /**
      * Consumes message from the given topic until the predicate returns false
diff --git 
a/tests/itests-sjms2/src/test/java/org/apache/camel/kafkaconnector/sjms2/clients/JMSClient.java
 
b/tests/itests-sjms2/src/test/java/org/apache/camel/kafkaconnector/sjms2/clients/JMSClient.java
index 0e5a175..7aefc5a 100644
--- 
a/tests/itests-sjms2/src/test/java/org/apache/camel/kafkaconnector/sjms2/clients/JMSClient.java
+++ 
b/tests/itests-sjms2/src/test/java/org/apache/camel/kafkaconnector/sjms2/clients/JMSClient.java
@@ -34,6 +34,8 @@ import org.junit.jupiter.api.Assertions;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import static org.junit.jupiter.api.Assertions.fail;
+
 /**
  * A basic multi-protocol JMS client
  */
@@ -235,4 +237,56 @@ public class JMSClient {
             capturingClose(producer);
         }
     }
+
+
+    public static void produceMessages(JMSClient jmsProducer, String queue, 
int count, Function<Integer, String> supplier) {
+        try {
+            jmsProducer.start();
+            for (int i = 0; i < count; i++) {
+                jmsProducer.send(queue, supplier.apply(i));
+            }
+        } catch (JMSException e) {
+            LOG.error("JMS exception trying to send messages to the queue: 
{}", e.getMessage(), e);
+            fail(e.getMessage());
+        } catch (Exception e) {
+            LOG.error("Failed to send messages to the queue: {}", 
e.getMessage(), e);
+            fail(e.getMessage());
+        } finally {
+            jmsProducer.stop();
+        }
+    }
+
+    public static void produceMessages(JMSClient jmsProducer, String queue, 
int count, String baseText) {
+        try {
+            jmsProducer.start();
+            for (int i = 0; i < count; i++) {
+                jmsProducer.send(queue, baseText + " " + i);
+            }
+        } catch (JMSException e) {
+            LOG.error("JMS exception trying to send messages to the queue: 
{}", e.getMessage(), e);
+            fail(e.getMessage());
+        } catch (Exception e) {
+            LOG.error("Failed to send messages to the queue: {}", 
e.getMessage(), e);
+            fail(e.getMessage());
+        } finally {
+            jmsProducer.stop();
+        }
+    }
+
+    public static void produceMessages(JMSClient jmsProducer, String queue, 
int count) {
+        try {
+            jmsProducer.start();
+            for (int i = 0; i < count; i++) {
+                jmsProducer.send(queue, i);
+            }
+        } catch (JMSException e) {
+            LOG.error("JMS exception trying to send messages to the queue: 
{}", e.getMessage(), e);
+            fail(e.getMessage());
+        } catch (Exception e) {
+            LOG.error("Failed to send messages to the queue: {}", 
e.getMessage(), e);
+            fail(e.getMessage());
+        } finally {
+            jmsProducer.stop();
+        }
+    }
 }
diff --git 
a/tests/itests-sjms2/src/test/java/org/apache/camel/kafkaconnector/sjms2/source/CamelSourceJMSITCase.java
 
b/tests/itests-sjms2/src/test/java/org/apache/camel/kafkaconnector/sjms2/source/CamelSourceJMSITCase.java
index 1dd8cae..addc2b5 100644
--- 
a/tests/itests-sjms2/src/test/java/org/apache/camel/kafkaconnector/sjms2/source/CamelSourceJMSITCase.java
+++ 
b/tests/itests-sjms2/src/test/java/org/apache/camel/kafkaconnector/sjms2/source/CamelSourceJMSITCase.java
@@ -19,8 +19,6 @@ package org.apache.camel.kafkaconnector.sjms2.source;
 
 import java.util.concurrent.ExecutionException;
 
-import javax.jms.JMSException;
-
 import org.apache.camel.kafkaconnector.common.AbstractKafkaTest;
 import org.apache.camel.kafkaconnector.common.ConnectorPropertyFactory;
 import org.apache.camel.kafkaconnector.common.clients.kafka.KafkaClient;
@@ -55,6 +53,7 @@ public class CamelSourceJMSITCase extends AbstractKafkaTest {
 
     private int received;
     private final int expect = 10;
+    private JMSClient jmsClient;
 
     @Override
     protected String[] getConnectorsInTest() {
@@ -64,6 +63,7 @@ public class CamelSourceJMSITCase extends AbstractKafkaTest {
     @BeforeEach
     public void setUp() {
         received = 0;
+        jmsClient = jmsService.getClient();
     }
 
     private <T> boolean checkRecord(ConsumerRecord<String, T> record) {
@@ -77,53 +77,13 @@ public class CamelSourceJMSITCase extends AbstractKafkaTest 
{
         return true;
     }
 
-    private void produceMessages(String queue, String baseText) {
-        JMSClient jmsProducer = null;
 
-        try {
-            jmsProducer = jmsService.getClient();
-
-            jmsProducer.start();
-            for (int i = 0; i < expect; i++) {
-                jmsProducer.send(queue, baseText + " " + i);
-            }
-        } catch (JMSException e) {
-            LOG.error("JMS exception trying to send messages to the queue: 
{}", e.getMessage(), e);
-            fail(e.getMessage());
-        } catch (Exception e) {
-            LOG.error("Failed to send messages to the queue: {}", 
e.getMessage(), e);
-            fail(e.getMessage());
-        } finally {
-            jmsProducer.stop();
-        }
-    }
-
-    private void produceMessages(String queue) {
-        JMSClient jmsProducer = null;
-
-        try {
-            jmsProducer = jmsService.getClient();
-
-            jmsProducer.start();
-            for (int i = 0; i < expect; i++) {
-                jmsProducer.send(queue, i);
-            }
-        } catch (JMSException e) {
-            LOG.error("JMS exception trying to send messages to the queue: 
{}", e.getMessage(), e);
-            fail(e.getMessage());
-        } catch (Exception e) {
-            LOG.error("Failed to send messages to the queue: {}", 
e.getMessage(), e);
-            fail(e.getMessage());
-        } finally {
-            jmsProducer.stop();
-        }
-    }
 
     public void runBasicStringTest(ConnectorPropertyFactory 
connectorPropertyFactory) throws ExecutionException, InterruptedException {
         connectorPropertyFactory.log();
         getKafkaConnectService().initializeConnector(connectorPropertyFactory);
 
-        produceMessages(SJMS2Common.DEFAULT_JMS_QUEUE, "Test string message");
+        JMSClient.produceMessages(jmsClient, SJMS2Common.DEFAULT_JMS_QUEUE, 
expect, "Test string message");
 
         LOG.debug("Creating the consumer ...");
         KafkaClient<String, String> kafkaClient = new 
KafkaClient<>(getKafkaService().getBootstrapServers());
@@ -184,7 +144,7 @@ public class CamelSourceJMSITCase extends AbstractKafkaTest 
{
             connectorPropertyFactory.log();
             
getKafkaConnectService().initializeConnector(connectorPropertyFactory);
 
-            produceMessages(jmsQueueName);
+            JMSClient.produceMessages(jmsClient, jmsQueueName, expect);
 
             LOG.debug("Creating the consumer ...");
             KafkaClient<String, Integer> kafkaClient = new 
KafkaClient<>(getKafkaService().getBootstrapServers());
@@ -198,4 +158,6 @@ public class CamelSourceJMSITCase extends AbstractKafkaTest 
{
         }
 
     }
+
+
 }
diff --git 
a/tests/itests-sjms2/src/test/java/org/apache/camel/kafkaconnector/sjms2/source/CamelSourceJMSWithAggregation.java
 
b/tests/itests-sjms2/src/test/java/org/apache/camel/kafkaconnector/sjms2/source/CamelSourceJMSWithAggregation.java
new file mode 100644
index 0000000..e9ea5f4
--- /dev/null
+++ 
b/tests/itests-sjms2/src/test/java/org/apache/camel/kafkaconnector/sjms2/source/CamelSourceJMSWithAggregation.java
@@ -0,0 +1,120 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.camel.kafkaconnector.sjms2.source;
+
+import java.util.concurrent.ExecutionException;
+
+import org.apache.camel.kafkaconnector.common.AbstractKafkaTest;
+import org.apache.camel.kafkaconnector.common.ConnectorPropertyFactory;
+import org.apache.camel.kafkaconnector.common.clients.kafka.KafkaClient;
+import org.apache.camel.kafkaconnector.common.utils.TestUtils;
+import org.apache.camel.kafkaconnector.sjms2.clients.JMSClient;
+import org.apache.camel.kafkaconnector.sjms2.common.SJMS2Common;
+import org.apache.camel.kafkaconnector.sjms2.services.JMSService;
+import org.apache.camel.kafkaconnector.sjms2.services.JMSServiceFactory;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.Timeout;
+import org.junit.jupiter.api.extension.RegisterExtension;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.fail;
+
+public class CamelSourceJMSWithAggregation extends AbstractKafkaTest {
+    @RegisterExtension
+    public static JMSService jmsService = JMSServiceFactory.createService();
+
+    private static final Logger LOG = 
LoggerFactory.getLogger(CamelSourceJMSITCase.class);
+
+    private int received;
+    private final int sentSize = 10;
+    private final int expect = 1;
+    private JMSClient jmsClient;
+    private String receivedMessage = "";
+    private String expectedMessage = "";
+    private String queueName;
+
+
+    @Override
+    protected String[] getConnectorsInTest() {
+        return new String[] {"camel-sjms2-kafka-connector"};
+    }
+
+    @BeforeEach
+    public void setUp() {
+        received = 0;
+        jmsClient = jmsService.getClient();
+
+        for (int i = 0; i < sentSize - 1; i++) {
+            expectedMessage += "hello;\n";
+        }
+
+        expectedMessage += "hello;";
+        queueName = SJMS2Common.DEFAULT_JMS_QUEUE + "." + 
TestUtils.randomWithRange(1, 100);
+    }
+
+    private void checkRecord(ConsumerRecord<String, String> record) {
+        receivedMessage += record.value();
+        LOG.debug("Received: {}", receivedMessage);
+
+        received++;
+    }
+
+    private static String textToSend(Integer i) {
+        return "hello;";
+    }
+
+
+    public void runBasicStringTest(ConnectorPropertyFactory 
connectorPropertyFactory) throws ExecutionException, InterruptedException {
+        connectorPropertyFactory.log();
+        
getKafkaConnectService().initializeConnectorBlocking(connectorPropertyFactory, 
1);
+
+        JMSClient.produceMessages(jmsClient, queueName, sentSize,
+                CamelSourceJMSWithAggregation::textToSend);
+
+        LOG.debug("Creating the consumer ...");
+        KafkaClient<String, String> kafkaClient = new 
KafkaClient<>(getKafkaService().getBootstrapServers());
+        
kafkaClient.consumeAvailable(TestUtils.getDefaultTestTopic(this.getClass()), 
this::checkRecord);
+        LOG.debug("Created the consumer ...");
+
+        assertEquals(expect, received, "Didn't process the expected amount of 
messages");
+        assertEquals(expectedMessage, receivedMessage, "The messages don't 
match");
+    }
+
+    @Test
+    @Timeout(90)
+    public void testBasicSendReceive() {
+        try {
+            ConnectorPropertyFactory connectorPropertyFactory = 
CamelJMSPropertyFactory
+                    .basic()
+                    
.withKafkaTopic(TestUtils.getDefaultTestTopic(this.getClass()))
+                    .withDestinationName(queueName)
+                    
.withConnectionProperties(jmsService.getConnectionProperties())
+                    
.withAggregate("org.apache.camel.kafkaconnector.aggregator.StringAggregator", 
sentSize,
+                            1000);
+
+            runBasicStringTest(connectorPropertyFactory);
+        } catch (Exception e) {
+            LOG.error("JMS test failed: {}", e.getMessage(), e);
+            fail(e.getMessage());
+        }
+    }
+}

Reply via email to