This is an automated email from the ASF dual-hosted git repository. acosentino pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/camel-kafka-connector.git
The following commit(s) were added to refs/heads/master by this push: new d20fcdd Adds basic aggregation test (github issue #291) new 0fa09ac Merge pull request #388 from orpiske/aggregation-test d20fcdd is described below commit d20fcdd5aca5ddfef25fee60d0a6dcf3936a9f9a Author: Otavio Rodolfo Piske <opi...@redhat.com> AuthorDate: Thu Aug 20 16:56:32 2020 +0200 Adds basic aggregation test (github issue #291) --- .../common/SourceConnectorPropertyFactory.java | 8 ++ .../common/clients/kafka/KafkaClient.java | 16 +++ .../kafkaconnector/sjms2/clients/JMSClient.java | 54 ++++++++++ .../sjms2/source/CamelSourceJMSITCase.java | 50 ++------- .../source/CamelSourceJMSWithAggregation.java | 120 +++++++++++++++++++++ 5 files changed, 204 insertions(+), 44 deletions(-) diff --git a/tests/itests-common/src/test/java/org/apache/camel/kafkaconnector/common/SourceConnectorPropertyFactory.java b/tests/itests-common/src/test/java/org/apache/camel/kafkaconnector/common/SourceConnectorPropertyFactory.java index 8a6d661..b1d5cb9 100644 --- a/tests/itests-common/src/test/java/org/apache/camel/kafkaconnector/common/SourceConnectorPropertyFactory.java +++ b/tests/itests-common/src/test/java/org/apache/camel/kafkaconnector/common/SourceConnectorPropertyFactory.java @@ -30,4 +30,12 @@ public abstract class SourceConnectorPropertyFactory<T extends SourceConnectorPr return (T) this; } + + public T withAggregate(String aggregate, int size, int timeout) { + withBeans("aggregate", classRef(aggregate)); + getProperties().put("camel.beans.aggregation.size", size); + getProperties().put("camel.beans.aggregation.timeout", timeout); + + return (T) this; + } } diff --git a/tests/itests-common/src/test/java/org/apache/camel/kafkaconnector/common/clients/kafka/KafkaClient.java b/tests/itests-common/src/test/java/org/apache/camel/kafkaconnector/common/clients/kafka/KafkaClient.java index e9846ed..a124a5d 100644 --- a/tests/itests-common/src/test/java/org/apache/camel/kafkaconnector/common/clients/kafka/KafkaClient.java +++ b/tests/itests-common/src/test/java/org/apache/camel/kafkaconnector/common/clients/kafka/KafkaClient.java @@ -24,6 +24,7 @@ import java.util.Map; import java.util.Properties; import java.util.concurrent.ExecutionException; import java.util.concurrent.Future; +import java.util.function.Consumer; import java.util.function.Predicate; import org.apache.kafka.clients.admin.AdminClient; @@ -82,6 +83,21 @@ public class KafkaClient<K, V> { consumer = new KafkaConsumer<>(consumerPropertyFactory.getProperties()); } + /** + * Consumes message from the given topic + * + * @param topic the topic to consume the messages from + * @param recordConsumer the a function to consume the received messages + */ + public void consumeAvailable(String topic, Consumer<ConsumerRecord<K, V>> recordConsumer) { + consumer.subscribe(Arrays.asList(topic)); + + ConsumerRecords<K, V> records = consumer.poll(Duration.ofMillis(100)); + for (ConsumerRecord<K, V> record : records) { + recordConsumer.accept(record); + } + } + /** * Consumes message from the given topic until the predicate returns false diff --git a/tests/itests-sjms2/src/test/java/org/apache/camel/kafkaconnector/sjms2/clients/JMSClient.java b/tests/itests-sjms2/src/test/java/org/apache/camel/kafkaconnector/sjms2/clients/JMSClient.java index 0e5a175..7aefc5a 100644 --- a/tests/itests-sjms2/src/test/java/org/apache/camel/kafkaconnector/sjms2/clients/JMSClient.java +++ b/tests/itests-sjms2/src/test/java/org/apache/camel/kafkaconnector/sjms2/clients/JMSClient.java @@ -34,6 +34,8 @@ import org.junit.jupiter.api.Assertions; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import static org.junit.jupiter.api.Assertions.fail; + /** * A basic multi-protocol JMS client */ @@ -235,4 +237,56 @@ public class JMSClient { capturingClose(producer); } } + + + public static void produceMessages(JMSClient jmsProducer, String queue, int count, Function<Integer, String> supplier) { + try { + jmsProducer.start(); + for (int i = 0; i < count; i++) { + jmsProducer.send(queue, supplier.apply(i)); + } + } catch (JMSException e) { + LOG.error("JMS exception trying to send messages to the queue: {}", e.getMessage(), e); + fail(e.getMessage()); + } catch (Exception e) { + LOG.error("Failed to send messages to the queue: {}", e.getMessage(), e); + fail(e.getMessage()); + } finally { + jmsProducer.stop(); + } + } + + public static void produceMessages(JMSClient jmsProducer, String queue, int count, String baseText) { + try { + jmsProducer.start(); + for (int i = 0; i < count; i++) { + jmsProducer.send(queue, baseText + " " + i); + } + } catch (JMSException e) { + LOG.error("JMS exception trying to send messages to the queue: {}", e.getMessage(), e); + fail(e.getMessage()); + } catch (Exception e) { + LOG.error("Failed to send messages to the queue: {}", e.getMessage(), e); + fail(e.getMessage()); + } finally { + jmsProducer.stop(); + } + } + + public static void produceMessages(JMSClient jmsProducer, String queue, int count) { + try { + jmsProducer.start(); + for (int i = 0; i < count; i++) { + jmsProducer.send(queue, i); + } + } catch (JMSException e) { + LOG.error("JMS exception trying to send messages to the queue: {}", e.getMessage(), e); + fail(e.getMessage()); + } catch (Exception e) { + LOG.error("Failed to send messages to the queue: {}", e.getMessage(), e); + fail(e.getMessage()); + } finally { + jmsProducer.stop(); + } + } } diff --git a/tests/itests-sjms2/src/test/java/org/apache/camel/kafkaconnector/sjms2/source/CamelSourceJMSITCase.java b/tests/itests-sjms2/src/test/java/org/apache/camel/kafkaconnector/sjms2/source/CamelSourceJMSITCase.java index 1dd8cae..addc2b5 100644 --- a/tests/itests-sjms2/src/test/java/org/apache/camel/kafkaconnector/sjms2/source/CamelSourceJMSITCase.java +++ b/tests/itests-sjms2/src/test/java/org/apache/camel/kafkaconnector/sjms2/source/CamelSourceJMSITCase.java @@ -19,8 +19,6 @@ package org.apache.camel.kafkaconnector.sjms2.source; import java.util.concurrent.ExecutionException; -import javax.jms.JMSException; - import org.apache.camel.kafkaconnector.common.AbstractKafkaTest; import org.apache.camel.kafkaconnector.common.ConnectorPropertyFactory; import org.apache.camel.kafkaconnector.common.clients.kafka.KafkaClient; @@ -55,6 +53,7 @@ public class CamelSourceJMSITCase extends AbstractKafkaTest { private int received; private final int expect = 10; + private JMSClient jmsClient; @Override protected String[] getConnectorsInTest() { @@ -64,6 +63,7 @@ public class CamelSourceJMSITCase extends AbstractKafkaTest { @BeforeEach public void setUp() { received = 0; + jmsClient = jmsService.getClient(); } private <T> boolean checkRecord(ConsumerRecord<String, T> record) { @@ -77,53 +77,13 @@ public class CamelSourceJMSITCase extends AbstractKafkaTest { return true; } - private void produceMessages(String queue, String baseText) { - JMSClient jmsProducer = null; - try { - jmsProducer = jmsService.getClient(); - - jmsProducer.start(); - for (int i = 0; i < expect; i++) { - jmsProducer.send(queue, baseText + " " + i); - } - } catch (JMSException e) { - LOG.error("JMS exception trying to send messages to the queue: {}", e.getMessage(), e); - fail(e.getMessage()); - } catch (Exception e) { - LOG.error("Failed to send messages to the queue: {}", e.getMessage(), e); - fail(e.getMessage()); - } finally { - jmsProducer.stop(); - } - } - - private void produceMessages(String queue) { - JMSClient jmsProducer = null; - - try { - jmsProducer = jmsService.getClient(); - - jmsProducer.start(); - for (int i = 0; i < expect; i++) { - jmsProducer.send(queue, i); - } - } catch (JMSException e) { - LOG.error("JMS exception trying to send messages to the queue: {}", e.getMessage(), e); - fail(e.getMessage()); - } catch (Exception e) { - LOG.error("Failed to send messages to the queue: {}", e.getMessage(), e); - fail(e.getMessage()); - } finally { - jmsProducer.stop(); - } - } public void runBasicStringTest(ConnectorPropertyFactory connectorPropertyFactory) throws ExecutionException, InterruptedException { connectorPropertyFactory.log(); getKafkaConnectService().initializeConnector(connectorPropertyFactory); - produceMessages(SJMS2Common.DEFAULT_JMS_QUEUE, "Test string message"); + JMSClient.produceMessages(jmsClient, SJMS2Common.DEFAULT_JMS_QUEUE, expect, "Test string message"); LOG.debug("Creating the consumer ..."); KafkaClient<String, String> kafkaClient = new KafkaClient<>(getKafkaService().getBootstrapServers()); @@ -184,7 +144,7 @@ public class CamelSourceJMSITCase extends AbstractKafkaTest { connectorPropertyFactory.log(); getKafkaConnectService().initializeConnector(connectorPropertyFactory); - produceMessages(jmsQueueName); + JMSClient.produceMessages(jmsClient, jmsQueueName, expect); LOG.debug("Creating the consumer ..."); KafkaClient<String, Integer> kafkaClient = new KafkaClient<>(getKafkaService().getBootstrapServers()); @@ -198,4 +158,6 @@ public class CamelSourceJMSITCase extends AbstractKafkaTest { } } + + } diff --git a/tests/itests-sjms2/src/test/java/org/apache/camel/kafkaconnector/sjms2/source/CamelSourceJMSWithAggregation.java b/tests/itests-sjms2/src/test/java/org/apache/camel/kafkaconnector/sjms2/source/CamelSourceJMSWithAggregation.java new file mode 100644 index 0000000..e9ea5f4 --- /dev/null +++ b/tests/itests-sjms2/src/test/java/org/apache/camel/kafkaconnector/sjms2/source/CamelSourceJMSWithAggregation.java @@ -0,0 +1,120 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.camel.kafkaconnector.sjms2.source; + +import java.util.concurrent.ExecutionException; + +import org.apache.camel.kafkaconnector.common.AbstractKafkaTest; +import org.apache.camel.kafkaconnector.common.ConnectorPropertyFactory; +import org.apache.camel.kafkaconnector.common.clients.kafka.KafkaClient; +import org.apache.camel.kafkaconnector.common.utils.TestUtils; +import org.apache.camel.kafkaconnector.sjms2.clients.JMSClient; +import org.apache.camel.kafkaconnector.sjms2.common.SJMS2Common; +import org.apache.camel.kafkaconnector.sjms2.services.JMSService; +import org.apache.camel.kafkaconnector.sjms2.services.JMSServiceFactory; +import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.Timeout; +import org.junit.jupiter.api.extension.RegisterExtension; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.fail; + +public class CamelSourceJMSWithAggregation extends AbstractKafkaTest { + @RegisterExtension + public static JMSService jmsService = JMSServiceFactory.createService(); + + private static final Logger LOG = LoggerFactory.getLogger(CamelSourceJMSITCase.class); + + private int received; + private final int sentSize = 10; + private final int expect = 1; + private JMSClient jmsClient; + private String receivedMessage = ""; + private String expectedMessage = ""; + private String queueName; + + + @Override + protected String[] getConnectorsInTest() { + return new String[] {"camel-sjms2-kafka-connector"}; + } + + @BeforeEach + public void setUp() { + received = 0; + jmsClient = jmsService.getClient(); + + for (int i = 0; i < sentSize - 1; i++) { + expectedMessage += "hello;\n"; + } + + expectedMessage += "hello;"; + queueName = SJMS2Common.DEFAULT_JMS_QUEUE + "." + TestUtils.randomWithRange(1, 100); + } + + private void checkRecord(ConsumerRecord<String, String> record) { + receivedMessage += record.value(); + LOG.debug("Received: {}", receivedMessage); + + received++; + } + + private static String textToSend(Integer i) { + return "hello;"; + } + + + public void runBasicStringTest(ConnectorPropertyFactory connectorPropertyFactory) throws ExecutionException, InterruptedException { + connectorPropertyFactory.log(); + getKafkaConnectService().initializeConnectorBlocking(connectorPropertyFactory, 1); + + JMSClient.produceMessages(jmsClient, queueName, sentSize, + CamelSourceJMSWithAggregation::textToSend); + + LOG.debug("Creating the consumer ..."); + KafkaClient<String, String> kafkaClient = new KafkaClient<>(getKafkaService().getBootstrapServers()); + kafkaClient.consumeAvailable(TestUtils.getDefaultTestTopic(this.getClass()), this::checkRecord); + LOG.debug("Created the consumer ..."); + + assertEquals(expect, received, "Didn't process the expected amount of messages"); + assertEquals(expectedMessage, receivedMessage, "The messages don't match"); + } + + @Test + @Timeout(90) + public void testBasicSendReceive() { + try { + ConnectorPropertyFactory connectorPropertyFactory = CamelJMSPropertyFactory + .basic() + .withKafkaTopic(TestUtils.getDefaultTestTopic(this.getClass())) + .withDestinationName(queueName) + .withConnectionProperties(jmsService.getConnectionProperties()) + .withAggregate("org.apache.camel.kafkaconnector.aggregator.StringAggregator", sentSize, + 1000); + + runBasicStringTest(connectorPropertyFactory); + } catch (Exception e) { + LOG.error("JMS test failed: {}", e.getMessage(), e); + fail(e.getMessage()); + } + } +}