This is an automated email from the ASF dual-hosted git repository. orpiske pushed a commit to branch camel-master in repository https://gitbox.apache.org/repos/asf/camel-kafka-connector.git
commit e95b9fb3bd2f1874625fa81587a0400b52ce505f Author: Otavio Rodolfo Piske <opi...@redhat.com> AuthorDate: Mon Feb 8 21:21:28 2021 +0100 Converted the SJMS2 source test cases to use the reusable source base class --- .../common/test/AbstractTestMessageConsumer.java | 8 +- .../common/test/CamelSourceTestSupport.java | 45 +++++++- .../common/test/IntegerMessageConsumer.java | 29 +++++ .../sjms2/source/CamelSourceJMSITCase.java | 127 ++++++++------------- .../source/CamelSourceJMSWithAggregation.java | 110 +++++++++++------- 5 files changed, 186 insertions(+), 133 deletions(-) diff --git a/tests/itests-common/src/test/java/org/apache/camel/kafkaconnector/common/test/AbstractTestMessageConsumer.java b/tests/itests-common/src/test/java/org/apache/camel/kafkaconnector/common/test/AbstractTestMessageConsumer.java index 2fcf42f..744abbf 100644 --- a/tests/itests-common/src/test/java/org/apache/camel/kafkaconnector/common/test/AbstractTestMessageConsumer.java +++ b/tests/itests-common/src/test/java/org/apache/camel/kafkaconnector/common/test/AbstractTestMessageConsumer.java @@ -28,8 +28,8 @@ import org.slf4j.LoggerFactory; public abstract class AbstractTestMessageConsumer<T> implements TestMessageConsumer<T> { private static final Logger LOG = LoggerFactory.getLogger(AbstractTestMessageConsumer.class); - private final KafkaClient<String, T> kafkaClient; - private final String topicName; + protected final KafkaClient<String, T> kafkaClient; + protected final String topicName; private final int count; private final List<ConsumerRecord<String, T>> receivedMessages; private volatile int received; @@ -42,7 +42,7 @@ public abstract class AbstractTestMessageConsumer<T> implements TestMessageConsu receivedMessages = new ArrayList<>(count); } - private boolean checkRecord(ConsumerRecord<String, T> record) { + public boolean checkRecord(ConsumerRecord<String, T> record) { LOG.debug("Received: {}", record.value()); received++; receivedMessages.add(record); @@ -63,4 +63,6 @@ public abstract class AbstractTestMessageConsumer<T> implements TestMessageConsu public List<ConsumerRecord<String, T>> consumedMessages() { return receivedMessages; } + + } diff --git a/tests/itests-common/src/test/java/org/apache/camel/kafkaconnector/common/test/CamelSourceTestSupport.java b/tests/itests-common/src/test/java/org/apache/camel/kafkaconnector/common/test/CamelSourceTestSupport.java index 35626a3..7f8b03c 100644 --- a/tests/itests-common/src/test/java/org/apache/camel/kafkaconnector/common/test/CamelSourceTestSupport.java +++ b/tests/itests-common/src/test/java/org/apache/camel/kafkaconnector/common/test/CamelSourceTestSupport.java @@ -33,7 +33,7 @@ public abstract class CamelSourceTestSupport extends AbstractKafkaTest { protected abstract void verifyMessages(TestMessageConsumer<?> consumer); /** - * A simple blocking test runner that follows the steps: initialize, start producer, consume messages, verify results + * A simple test runner that follows the steps: initialize, start producer, consume messages, verify results * * @param connectorPropertyFactory A factory for connector properties * @param topic the topic to send the messages to @@ -49,7 +49,7 @@ public abstract class CamelSourceTestSupport extends AbstractKafkaTest { /** - * A simple blocking test runner that follows the steps: initialize, start producer, consume messages, verify results + * A simple test runner that follows the steps: initialize, start producer, consume messages, verify results * * @param connectorPropertyFactory A factory for connector properties * @param consumer A Kafka consumer consumer for the test messages @@ -60,7 +60,7 @@ public abstract class CamelSourceTestSupport extends AbstractKafkaTest { } /** - * A simple blocking test runner that follows the steps: initialize, start producer, consume messages, verify results + * A simple test runner that follows the steps: initialize, start producer, consume messages, verify results * * @param connectorPropertyFactory A factory for connector properties * @param consumer A Kafka consumer consumer for the test messages @@ -71,7 +71,6 @@ public abstract class CamelSourceTestSupport extends AbstractKafkaTest { FunctionalTestMessageProducer producer) throws ExecutionException, InterruptedException { connectorPropertyFactory.log(); LOG.debug("Initialized the connector and put the data for the test execution"); -// getKafkaConnectService().initializeConnectorBlocking(connectorPropertyFactory, 1); getKafkaConnectService().initializeConnector(connectorPropertyFactory); LOG.debug("Producing test data to be collected by the connector and sent to Kafka"); @@ -86,4 +85,42 @@ public abstract class CamelSourceTestSupport extends AbstractKafkaTest { LOG.debug("Verified messages"); } + /** + * A simple blocking test runner that follows the steps: initialize, start producer, consume messages, verify results + * + * @param connectorPropertyFactory A factory for connector properties + * @param consumer A Kafka consumer consumer for the test messages + * @throws Exception For test-specific exceptions + */ + public void runTestBlocking(ConnectorPropertyFactory connectorPropertyFactory, TestMessageConsumer<?> consumer) throws ExecutionException, InterruptedException { + runTestBlocking(connectorPropertyFactory, consumer, this::produceTestData); + } + + /** + * A simple blocking test runner that follows the steps: initialize, start producer, consume messages, verify results + * + * @param connectorPropertyFactory A factory for connector properties + * @param consumer A Kafka consumer consumer for the test messages + * @param producer A producer for the test messages + * @throws Exception For test-specific exceptions + */ + public void runTestBlocking(ConnectorPropertyFactory connectorPropertyFactory, TestMessageConsumer<?> consumer, + FunctionalTestMessageProducer producer) throws ExecutionException, InterruptedException { + connectorPropertyFactory.log(); + LOG.debug("Initialized the connector and put the data for the test execution"); + getKafkaConnectService().initializeConnectorBlocking(connectorPropertyFactory, 1); + + LOG.debug("Producing test data to be collected by the connector and sent to Kafka"); + producer.produceMessages(); + + LOG.debug("Creating the Kafka consumer ..."); + consumer.consumeMessages(); + LOG.debug("Ran the Kafka consumer ..."); + + LOG.debug("Verifying messages"); + verifyMessages(consumer); + LOG.debug("Verified messages"); + } + + } diff --git a/tests/itests-common/src/test/java/org/apache/camel/kafkaconnector/common/test/IntegerMessageConsumer.java b/tests/itests-common/src/test/java/org/apache/camel/kafkaconnector/common/test/IntegerMessageConsumer.java new file mode 100644 index 0000000..a49e00e --- /dev/null +++ b/tests/itests-common/src/test/java/org/apache/camel/kafkaconnector/common/test/IntegerMessageConsumer.java @@ -0,0 +1,29 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.camel.kafkaconnector.common.test; + +import org.apache.camel.kafkaconnector.common.clients.kafka.KafkaClient; + +/** + * A consumer that receives the 'count' amount of text messages from the Kafka broker + */ +public class IntegerMessageConsumer extends AbstractTestMessageConsumer<Integer> { + public IntegerMessageConsumer(KafkaClient<String, Integer> kafkaClient, String topicName, int count) { + super(kafkaClient, topicName, count); + } +} diff --git a/tests/itests-sjms2/src/test/java/org/apache/camel/kafkaconnector/sjms2/source/CamelSourceJMSITCase.java b/tests/itests-sjms2/src/test/java/org/apache/camel/kafkaconnector/sjms2/source/CamelSourceJMSITCase.java index 5729c15..781e029 100644 --- a/tests/itests-sjms2/src/test/java/org/apache/camel/kafkaconnector/sjms2/source/CamelSourceJMSITCase.java +++ b/tests/itests-sjms2/src/test/java/org/apache/camel/kafkaconnector/sjms2/source/CamelSourceJMSITCase.java @@ -20,26 +20,24 @@ package org.apache.camel.kafkaconnector.sjms2.source; import java.util.Properties; import java.util.concurrent.ExecutionException; -import org.apache.camel.kafkaconnector.common.AbstractKafkaTest; import org.apache.camel.kafkaconnector.common.ConnectorPropertyFactory; import org.apache.camel.kafkaconnector.common.clients.kafka.KafkaClient; -import org.apache.camel.kafkaconnector.common.utils.TestUtils; +import org.apache.camel.kafkaconnector.common.test.CamelSourceTestSupport; +import org.apache.camel.kafkaconnector.common.test.IntegerMessageConsumer; +import org.apache.camel.kafkaconnector.common.test.TestMessageConsumer; import org.apache.camel.kafkaconnector.sjms2.clients.JMSClient; import org.apache.camel.kafkaconnector.sjms2.common.SJMS2Common; import org.apache.camel.test.infra.dispatch.router.services.DispatchRouterContainer; import org.apache.camel.test.infra.messaging.services.MessagingService; import org.apache.camel.test.infra.messaging.services.MessagingServiceBuilder; -import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.junit.jupiter.api.BeforeAll; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.TestInstance; import org.junit.jupiter.api.Timeout; import org.junit.jupiter.api.extension.RegisterExtension; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; import static org.junit.jupiter.api.Assertions.assertEquals; -import static org.junit.jupiter.api.Assertions.fail; /** @@ -47,16 +45,14 @@ import static org.junit.jupiter.api.Assertions.fail; * messages */ @TestInstance(TestInstance.Lifecycle.PER_CLASS) -public class CamelSourceJMSITCase extends AbstractKafkaTest { +public class CamelSourceJMSITCase extends CamelSourceTestSupport { @RegisterExtension public static MessagingService jmsService = MessagingServiceBuilder .newBuilder(DispatchRouterContainer::new) .withEndpointProvider(DispatchRouterContainer::defaultEndpoint) .build(); - private static final Logger LOG = LoggerFactory.getLogger(CamelSourceJMSITCase.class); - - private int received; + private String topicName; private final int expect = 10; private JMSClient jmsClient; @@ -76,101 +72,68 @@ public class CamelSourceJMSITCase extends AbstractKafkaTest { @BeforeEach public void setUp() { - received = 0; - jmsClient = JMSClient.newClient(jmsService.defaultEndpoint()); - } - - private <T> boolean checkRecord(ConsumerRecord<String, T> record) { - LOG.debug("Received: {}", record.value()); - received++; - - if (received == expect) { - return false; - } + topicName = getTopicForTest(this); - return true; } + @BeforeAll + public void setupClient() { + jmsClient = JMSClient.newClient(jmsService.defaultEndpoint()); + } - - public void runBasicStringTest(ConnectorPropertyFactory connectorPropertyFactory) throws ExecutionException, InterruptedException { - connectorPropertyFactory.log(); - getKafkaConnectService().initializeConnector(connectorPropertyFactory); - + @Override + protected void produceTestData() { JMSClient.produceMessages(jmsClient, SJMS2Common.DEFAULT_JMS_QUEUE, expect, "Test string message"); + } - LOG.debug("Creating the consumer ..."); - KafkaClient<String, String> kafkaClient = new KafkaClient<>(getKafkaService().getBootstrapServers()); - kafkaClient.consume(TestUtils.getDefaultTestTopic(this.getClass()), this::checkRecord); - LOG.debug("Created the consumer ..."); - + @Override + protected void verifyMessages(TestMessageConsumer<?> consumer) { + int received = consumer.consumedMessages().size(); assertEquals(received, expect, "Didn't process the expected amount of messages"); } + @Test @Timeout(90) - public void testBasicSendReceive() { - try { - ConnectorPropertyFactory connectorPropertyFactory = CamelJMSPropertyFactory - .basic() - .withKafkaTopic(TestUtils.getDefaultTestTopic(this.getClass())) - .withDestinationName(SJMS2Common.DEFAULT_JMS_QUEUE) - .withConnectionProperties(connectionProperties()); - - runBasicStringTest(connectorPropertyFactory); - } catch (Exception e) { - LOG.error("JMS test failed: {}", e.getMessage(), e); - fail(e.getMessage()); - } + public void testBasicSendReceive() throws ExecutionException, InterruptedException { + ConnectorPropertyFactory connectorPropertyFactory = CamelJMSPropertyFactory + .basic() + .withKafkaTopic(topicName) + .withDestinationName(SJMS2Common.DEFAULT_JMS_QUEUE) + .withConnectionProperties(connectionProperties()); + + runTest(connectorPropertyFactory, topicName, expect); } @Test @Timeout(90) - public void testBasicSendReceiveUsingUrl() { - try { - ConnectorPropertyFactory connectorPropertyFactory = CamelJMSPropertyFactory - .basic() - .withConnectionProperties(connectionProperties()) - .withKafkaTopic(TestUtils.getDefaultTestTopic(this.getClass())) - .withUrl(SJMS2Common.DEFAULT_JMS_QUEUE) - .buildUrl(); - - runBasicStringTest(connectorPropertyFactory); - } catch (Exception e) { - LOG.error("JMS test failed: {}", e.getMessage(), e); - fail(e.getMessage()); - } + public void testBasicSendReceiveUsingUrl() throws ExecutionException, InterruptedException { + ConnectorPropertyFactory connectorPropertyFactory = CamelJMSPropertyFactory + .basic() + .withConnectionProperties(connectionProperties()) + .withKafkaTopic(topicName) + .withUrl(SJMS2Common.DEFAULT_JMS_QUEUE) + .buildUrl(); + + runTest(connectorPropertyFactory, topicName, expect); } @Test @Timeout(90) - public void testIntSendReceive() { - try { - final String jmsQueueName = "testIntSendReceive"; - - ConnectorPropertyFactory connectorPropertyFactory = CamelJMSPropertyFactory - .basic() - .withKafkaTopic(TestUtils.getDefaultTestTopic(this.getClass()) + jmsQueueName) - .withDestinationName(jmsQueueName) - .withConnectionProperties(connectionProperties()); - - connectorPropertyFactory.log(); - getKafkaConnectService().initializeConnector(connectorPropertyFactory); - - JMSClient.produceMessages(jmsClient, jmsQueueName, expect); + public void testIntSendReceive() throws ExecutionException, InterruptedException { + final String jmsQueueName = "testIntSendReceive"; - LOG.debug("Creating the consumer ..."); - KafkaClient<String, Integer> kafkaClient = new KafkaClient<>(getKafkaService().getBootstrapServers()); - kafkaClient.consume(TestUtils.getDefaultTestTopic(this.getClass()) + "testIntSendReceive", this::checkRecord); - LOG.debug("Created the consumer ..."); + ConnectorPropertyFactory connectorPropertyFactory = CamelJMSPropertyFactory + .basic() + .withKafkaTopic(topicName) + .withDestinationName(jmsQueueName) + .withConnectionProperties(connectionProperties()); - assertEquals(received, expect, "Didn't process the expected amount of messages"); - } catch (Exception e) { - LOG.error("JMS test failed: {}", e.getMessage(), e); - fail(e.getMessage()); - } + KafkaClient<String, Integer> kafkaClient = new KafkaClient<>(getKafkaService().getBootstrapServers()); + IntegerMessageConsumer consumer = new IntegerMessageConsumer(kafkaClient, topicName, expect); + runTest(connectorPropertyFactory, consumer, () -> JMSClient.produceMessages(jmsClient, jmsQueueName, expect)); } diff --git a/tests/itests-sjms2/src/test/java/org/apache/camel/kafkaconnector/sjms2/source/CamelSourceJMSWithAggregation.java b/tests/itests-sjms2/src/test/java/org/apache/camel/kafkaconnector/sjms2/source/CamelSourceJMSWithAggregation.java index 0603a78..6b95304 100644 --- a/tests/itests-sjms2/src/test/java/org/apache/camel/kafkaconnector/sjms2/source/CamelSourceJMSWithAggregation.java +++ b/tests/itests-sjms2/src/test/java/org/apache/camel/kafkaconnector/sjms2/source/CamelSourceJMSWithAggregation.java @@ -20,44 +20,65 @@ package org.apache.camel.kafkaconnector.sjms2.source; import java.util.Properties; import java.util.concurrent.ExecutionException; -import org.apache.camel.kafkaconnector.common.AbstractKafkaTest; import org.apache.camel.kafkaconnector.common.ConnectorPropertyFactory; import org.apache.camel.kafkaconnector.common.clients.kafka.KafkaClient; +import org.apache.camel.kafkaconnector.common.test.CamelSourceTestSupport; +import org.apache.camel.kafkaconnector.common.test.StringMessageConsumer; +import org.apache.camel.kafkaconnector.common.test.TestMessageConsumer; import org.apache.camel.kafkaconnector.common.utils.TestUtils; import org.apache.camel.kafkaconnector.sjms2.clients.JMSClient; import org.apache.camel.kafkaconnector.sjms2.common.SJMS2Common; import org.apache.camel.test.infra.dispatch.router.services.DispatchRouterContainer; import org.apache.camel.test.infra.messaging.services.MessagingService; import org.apache.camel.test.infra.messaging.services.MessagingServiceBuilder; -import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.junit.jupiter.api.BeforeAll; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.TestInstance; import org.junit.jupiter.api.Timeout; import org.junit.jupiter.api.extension.RegisterExtension; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.fail; @TestInstance(TestInstance.Lifecycle.PER_CLASS) -public class CamelSourceJMSWithAggregation extends AbstractKafkaTest { +public class CamelSourceJMSWithAggregation extends CamelSourceTestSupport { @RegisterExtension public static MessagingService jmsService = MessagingServiceBuilder .newBuilder(DispatchRouterContainer::new) .withEndpointProvider(DispatchRouterContainer::defaultEndpoint) .build(); - private static final Logger LOG = LoggerFactory.getLogger(CamelSourceJMSITCase.class); - - private int received; private final int sentSize = 10; private final int expect = 1; private JMSClient jmsClient; - private String receivedMessage = ""; private String expectedMessage = ""; private String queueName; + private String topicName; + + class GreedyConsumer extends StringMessageConsumer { + + public GreedyConsumer(KafkaClient<String, String> kafkaClient, String topicName, int count) { + super(kafkaClient, topicName, count); + } + + @Override + public void consumeMessages() { + int retries = 10; + + do { + kafkaClient.consumeAvailable(super.topicName, super::checkRecord); + if (consumedMessages().size() == 0) { + try { + Thread.sleep(1000); + } catch (InterruptedException e) { + break; + } + } + + } while (consumedMessages().size() == 0); + } + } private Properties connectionProperties() { Properties properties = new Properties(); @@ -73,9 +94,8 @@ public class CamelSourceJMSWithAggregation extends AbstractKafkaTest { return new String[] {"camel-sjms2-kafka-connector"}; } - @BeforeEach - public void setUp() { - received = 0; + @BeforeAll + public void setupClient() { jmsClient = JMSClient.newClient(jmsService.defaultEndpoint()); for (int i = 0; i < sentSize - 1; i++) { @@ -83,53 +103,55 @@ public class CamelSourceJMSWithAggregation extends AbstractKafkaTest { } expectedMessage += "hello;"; - queueName = SJMS2Common.DEFAULT_JMS_QUEUE + "." + TestUtils.randomWithRange(1, 100); } - private void checkRecord(ConsumerRecord<String, String> record) { - receivedMessage += record.value(); - LOG.debug("Received: {}", receivedMessage); + @BeforeEach + public void setUp() { + topicName = getTopicForTest(this); - received++; + queueName = SJMS2Common.DEFAULT_JMS_QUEUE + "." + TestUtils.randomWithRange(1, 100); } - private static String textToSend(Integer i) { - return "hello;"; + @Override + protected void produceTestData() { + JMSClient.produceMessages(jmsClient, queueName, sentSize, + CamelSourceJMSWithAggregation::textToSend); } + @Override + protected void verifyMessages(TestMessageConsumer<?> consumer) { + int received = consumer.consumedMessages().size(); - public void runBasicStringTest(ConnectorPropertyFactory connectorPropertyFactory) throws ExecutionException, InterruptedException { - connectorPropertyFactory.log(); - getKafkaConnectService().initializeConnectorBlocking(connectorPropertyFactory, 1); - - JMSClient.produceMessages(jmsClient, queueName, sentSize, - CamelSourceJMSWithAggregation::textToSend); + Object receivedObject = consumer.consumedMessages().get(0).value(); + if (!(receivedObject instanceof String)) { + fail("Unexpected message type"); + } - LOG.debug("Creating the consumer ..."); - KafkaClient<String, String> kafkaClient = new KafkaClient<>(getKafkaService().getBootstrapServers()); - kafkaClient.consumeAvailable(TestUtils.getDefaultTestTopic(this.getClass()), this::checkRecord); - LOG.debug("Created the consumer ..."); + String receivedMessage = (String) receivedObject; assertEquals(expect, received, "Didn't process the expected amount of messages"); assertEquals(expectedMessage, receivedMessage, "The messages don't match"); } + + private static String textToSend(Integer i) { + return "hello;"; + } + @Test @Timeout(90) - public void testBasicSendReceive() { - try { - ConnectorPropertyFactory connectorPropertyFactory = CamelJMSPropertyFactory - .basic() - .withKafkaTopic(TestUtils.getDefaultTestTopic(this.getClass())) - .withDestinationName(queueName) - .withConnectionProperties(connectionProperties()) - .withAggregate("org.apache.camel.kafkaconnector.aggregator.StringAggregator", sentSize, - 1000); - - runBasicStringTest(connectorPropertyFactory); - } catch (Exception e) { - LOG.error("JMS test failed: {}", e.getMessage(), e); - fail(e.getMessage()); - } + public void testBasicSendReceive() throws ExecutionException, InterruptedException { + ConnectorPropertyFactory connectorPropertyFactory = CamelJMSPropertyFactory + .basic() + .withKafkaTopic(topicName) + .withDestinationName(queueName) + .withConnectionProperties(connectionProperties()) + .withAggregate("org.apache.camel.kafkaconnector.aggregator.StringAggregator", sentSize, + 1000); + + KafkaClient<String, String> kafkaClient = new KafkaClient<>(getKafkaService().getBootstrapServers()); + GreedyConsumer greedyConsumer = new GreedyConsumer(kafkaClient, topicName, expect); + + runTestBlocking(connectorPropertyFactory, greedyConsumer); } }