This is an automated email from the ASF dual-hosted git repository.

orpiske pushed a commit to branch camel-master
in repository https://gitbox.apache.org/repos/asf/camel-kafka-connector.git

commit f2dc1aa69b420ec84e65b949ac15a3959a60e5c5
Author: Otavio Rodolfo Piske <opi...@redhat.com>
AuthorDate: Wed Feb 3 14:52:35 2021 +0100

    Convert the SJMS2 tests to the new reusable sink test base class
---
 .../common/test/CamelSinkTestSupport.java          |  29 ++++-
 .../common/test/TestMessageProducer.java           |  23 ++++
 .../sjms2/sink/CamelSinkIdempotentJMSITCase.java   |  73 ++++++-------
 .../sjms2/sink/CamelSinkJMSITCase.java             | 118 ++++++++-------------
 4 files changed, 127 insertions(+), 116 deletions(-)

diff --git a/tests/itests-common/src/test/java/org/apache/camel/kafkaconnector/common/test/CamelSinkTestSupport.java b/tests/itests-common/src/test/java/org/apache/camel/kafkaconnector/common/test/CamelSinkTestSupport.java
index 9f8460f..bd02eef 100644
--- a/tests/itests-common/src/test/java/org/apache/camel/kafkaconnector/common/test/CamelSinkTestSupport.java
+++ b/tests/itests-common/src/test/java/org/apache/camel/kafkaconnector/common/test/CamelSinkTestSupport.java
@@ -20,6 +20,7 @@ package org.apache.camel.kafkaconnector.common.test;
 import java.time.Duration;
 import java.util.Map;
 import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutionException;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 
@@ -60,7 +61,26 @@ public abstract class CamelSinkTestSupport extends AbstractKafkaTest {
         }
     }
 
-    public void runTest(ConnectorPropertyFactory connectorPropertyFactory, String topic, int count) throws Exception {
+    /**
+     * A simple test runner that follows the steps: initialize, start consumer, produce messages, verify results
+     *
+     * @param connectorPropertyFactory A factory for connector properties
+     * @param topic the topic to send the messages to
+     * @param count the number of messages to send
+     * @throws Exception For test-specific exceptions
+     */
+    protected void runTest(ConnectorPropertyFactory connectorPropertyFactory, String topic, int count) throws Exception {
+        runTest(connectorPropertyFactory, () -> produceMessages(topic, count));
+    }
+
+    /**
+     * A more flexible test runner that can use a custom producer of test messages
+     * @param connectorPropertyFactory a factory for connector properties
+     * @param producer the test message producer
+     * @throws ExecutionException
+     * @throws InterruptedException
+     */
+    protected void runTest(ConnectorPropertyFactory connectorPropertyFactory, TestMessageProducer producer) throws ExecutionException, InterruptedException {
         connectorPropertyFactory.log();
         getKafkaConnectService().initializeConnectorBlocking(connectorPropertyFactory, 1);
 
@@ -70,13 +90,16 @@ public abstract class CamelSinkTestSupport extends AbstractKafkaTest {
         CountDownLatch latch = new CountDownLatch(1);
         service.submit(() -> consumeMessages(latch));
 
-        LOG.debug("Creating the producer and sending messages ...");
-        produceMessages(topic, count);
+        producer.producerMessages();
+
+        LOG.debug("Waiting for the messages to be processed");
+        service.shutdown();
 
         LOG.debug("Waiting for the test to complete");
         verifyMessages(latch);
     }
 
+
     protected boolean waitForData() {
         try {
             Thread.sleep(Duration.ofSeconds(1).toMillis());
diff --git a/tests/itests-common/src/test/java/org/apache/camel/kafkaconnector/common/test/TestMessageProducer.java b/tests/itests-common/src/test/java/org/apache/camel/kafkaconnector/common/test/TestMessageProducer.java
new file mode 100644
index 0000000..dedcf97
--- /dev/null
+++ b/tests/itests-common/src/test/java/org/apache/camel/kafkaconnector/common/test/TestMessageProducer.java
@@ -0,0 +1,23 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.camel.kafkaconnector.common.test;
+
+@FunctionalInterface
+public interface TestMessageProducer {
+    void producerMessages();
+}
diff --git a/tests/itests-sjms2/src/test/java/org/apache/camel/kafkaconnector/sjms2/sink/CamelSinkIdempotentJMSITCase.java b/tests/itests-sjms2/src/test/java/org/apache/camel/kafkaconnector/sjms2/sink/CamelSinkIdempotentJMSITCase.java
index d2f06a7..432a20a 100644
--- a/tests/itests-sjms2/src/test/java/org/apache/camel/kafkaconnector/sjms2/sink/CamelSinkIdempotentJMSITCase.java
+++ b/tests/itests-sjms2/src/test/java/org/apache/camel/kafkaconnector/sjms2/sink/CamelSinkIdempotentJMSITCase.java
@@ -20,9 +20,7 @@ package org.apache.camel.kafkaconnector.sjms2.sink;
 import java.util.HashMap;
 import java.util.Map;
 import java.util.Properties;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
+import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
 
 import javax.jms.JMSException;
@@ -31,9 +29,9 @@ import javax.jms.MessageConsumer;
 import javax.jms.TextMessage;
 
 import org.apache.camel.kafkaconnector.CamelSinkTask;
-import org.apache.camel.kafkaconnector.common.AbstractKafkaTest;
 import org.apache.camel.kafkaconnector.common.ConnectorPropertyFactory;
 import org.apache.camel.kafkaconnector.common.clients.kafka.KafkaClient;
+import org.apache.camel.kafkaconnector.common.test.CamelSinkTestSupport;
 import org.apache.camel.kafkaconnector.common.utils.TestUtils;
 import org.apache.camel.kafkaconnector.sjms2.clients.JMSClient;
 import org.apache.camel.kafkaconnector.sjms2.common.SJMS2Common;
@@ -55,12 +53,7 @@ import static org.junit.jupiter.api.Assertions.fail;
  * Integration tests for the JMS sink using idempotent features
  */
 @TestInstance(TestInstance.Lifecycle.PER_CLASS)
-public class CamelSinkIdempotentJMSITCase extends AbstractKafkaTest {
-    @FunctionalInterface
-    interface Producer {
-        void producerMessages();
-    }
-
+public class CamelSinkIdempotentJMSITCase extends CamelSinkTestSupport {
     @RegisterExtension
     public static MessagingService jmsService = MessagingServiceBuilder
             .newBuilder(DispatchRouterContainer::new)
@@ -97,25 +90,13 @@ public class CamelSinkIdempotentJMSITCase extends AbstractKafkaTest {
         destinationName = SJMS2Common.DEFAULT_JMS_QUEUE + "-" + TestUtils.randomWithRange(0, 100);
     }
 
-    private boolean checkRecord(Message jmsMessage) {
-        if (jmsMessage instanceof TextMessage) {
-            try {
-                LOG.debug("Received: {}", ((TextMessage) jmsMessage).getText());
-
-                received++;
-
-                return true;
-            } catch (JMSException e) {
-                LOG.error("Failed to read message: {}", e.getMessage(), e);
-                fail("Failed to read message: " + e.getMessage());
-            }
-        }
-
-        return false;
+    @Override
+    protected Map<String, String> messageHeaders(String text, int current) {
+        return null;
     }
 
-
-    private void consumeJMSMessages() {
+    @Override
+    protected void consumeMessages(CountDownLatch latch) {
         JMSClient jmsClient = null;
 
         try {
@@ -145,31 +126,39 @@ public class CamelSinkIdempotentJMSITCase extends AbstractKafkaTest {
             LOG.error("JMS test failed: {}", e.getMessage(), e);
             fail(e.getMessage());
         } finally {
+            latch.countDown();
+
             if (jmsClient != null) {
                 jmsClient.stop();
             }
         }
     }
 
-    private void runTest(ConnectorPropertyFactory connectorPropertyFactory, Producer producer) throws ExecutionException, InterruptedException {
-        connectorPropertyFactory.log();
-        getKafkaConnectService().initializeConnector(connectorPropertyFactory);
-
-        ExecutorService service = Executors.newCachedThreadPool();
-
-        LOG.debug("Creating the consumer ...");
-        service.submit(() -> consumeJMSMessages());
+    @Override
+    protected void verifyMessages(CountDownLatch latch) throws InterruptedException {
+        if (latch.await(25, TimeUnit.SECONDS)) {
+            assertEquals(received, expect, "Didn't process the expected amount of messages: " + received
+                    + " != " + expect);
+        } else {
+            fail("Failed to receive the messages within the specified time");
+        }
+    }
 
-        producer.producerMessages();
+    private boolean checkRecord(Message jmsMessage) {
+        if (jmsMessage instanceof TextMessage) {
+            try {
+                LOG.debug("Received: {}", ((TextMessage) jmsMessage).getText());
 
-        LOG.debug("Waiting for the messages to be processed");
-        service.shutdown();
+                received++;
 
-        if (service.awaitTermination(25, TimeUnit.SECONDS)) {
-            assertEquals(received, expect, "Didn't process the expected amount of messages: " + received + " != " + expect);
-        } else {
-            fail("Failed to receive the messages within the specified time");
+                return true;
+            } catch (JMSException e) {
+                LOG.error("Failed to read message: {}", e.getMessage(), e);
+                fail("Failed to read message: " + e.getMessage());
+            }
         }
+
+        return false;
     }
 
     private void produceMessagesNoProperties() {
diff --git a/tests/itests-sjms2/src/test/java/org/apache/camel/kafkaconnector/sjms2/sink/CamelSinkJMSITCase.java b/tests/itests-sjms2/src/test/java/org/apache/camel/kafkaconnector/sjms2/sink/CamelSinkJMSITCase.java
index 41b87a8..5e9b66d 100644
--- a/tests/itests-sjms2/src/test/java/org/apache/camel/kafkaconnector/sjms2/sink/CamelSinkJMSITCase.java
+++ b/tests/itests-sjms2/src/test/java/org/apache/camel/kafkaconnector/sjms2/sink/CamelSinkJMSITCase.java
@@ -17,11 +17,9 @@
 
 package org.apache.camel.kafkaconnector.sjms2.sink;
 
+import java.util.Map;
 import java.util.Properties;
 import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
 import java.util.concurrent.TimeUnit;
 
 import javax.jms.JMSException;
@@ -29,10 +27,8 @@ import javax.jms.Message;
 import javax.jms.MessageConsumer;
 import javax.jms.TextMessage;
 
-import org.apache.camel.kafkaconnector.common.AbstractKafkaTest;
 import org.apache.camel.kafkaconnector.common.ConnectorPropertyFactory;
-import org.apache.camel.kafkaconnector.common.clients.kafka.KafkaClient;
-import org.apache.camel.kafkaconnector.common.utils.TestUtils;
+import org.apache.camel.kafkaconnector.common.test.CamelSinkTestSupport;
 import org.apache.camel.kafkaconnector.sjms2.clients.JMSClient;
 import org.apache.camel.kafkaconnector.sjms2.common.SJMS2Common;
 import org.apache.camel.test.infra.dispatch.router.services.DispatchRouterContainer;
@@ -53,7 +49,7 @@ import static org.junit.jupiter.api.Assertions.fail;
  * Integration tests for the JMS sink
  */
 @TestInstance(TestInstance.Lifecycle.PER_CLASS)
-public class CamelSinkJMSITCase extends AbstractKafkaTest {
+public class CamelSinkJMSITCase extends CamelSinkTestSupport {
     @RegisterExtension
     public static MessagingService jmsService = MessagingServiceBuilder
             .newBuilder(DispatchRouterContainer::new)
@@ -62,6 +58,7 @@ public class CamelSinkJMSITCase extends AbstractKafkaTest {
 
     private static final Logger LOG = LoggerFactory.getLogger(CamelSinkJMSITCase.class);
 
+    private String topicName;
     private int received;
     private final int expect = 10;
 
@@ -83,6 +80,22 @@ public class CamelSinkJMSITCase extends AbstractKafkaTest {
     public void setUp() {
         LOG.info("JMS service running at {}", jmsService.defaultEndpoint());
         received = 0;
+
+        topicName = getTopicForTest(this);
+    }
+
+    @Override
+    protected Map<String, String> messageHeaders(String text, int current) {
+        return null;
+    }
+
+    @Override
+    protected void verifyMessages(CountDownLatch latch) throws InterruptedException {
+        if (latch.await(35, TimeUnit.SECONDS)) {
+            assertEquals(received, expect, "Didn't process the expected amount of messages: " + received + " != " + expect);
+        } else {
+            fail("Failed to receive the messages within the specified time");
+        }
     }
 
     private boolean checkRecord(Message jmsMessage) {
@@ -106,70 +119,8 @@ public class CamelSinkJMSITCase extends AbstractKafkaTest {
         return false;
     }
 
-    private void runTest(ConnectorPropertyFactory connectorPropertyFactory) throws ExecutionException, InterruptedException {
-        connectorPropertyFactory.log();
-        getKafkaConnectService().initializeConnector(connectorPropertyFactory);
-
-        CountDownLatch latch = new CountDownLatch(1);
-
-        ExecutorService service = Executors.newCachedThreadPool();
-
-        LOG.debug("Creating the consumer ...");
-        service.submit(() -> consumeJMSMessages(latch));
-
-        KafkaClient<String, String> kafkaClient = new KafkaClient<>(getKafkaService().getBootstrapServers());
-
-        for (int i = 0; i < expect; i++) {
-            kafkaClient.produce(TestUtils.getDefaultTestTopic(this.getClass()), "Sink test message " + i);
-        }
-
-        LOG.debug("Created the consumer ... About to receive messages");
-
-        if (latch.await(35, TimeUnit.SECONDS)) {
-            assertEquals(received, expect, "Didn't process the expected amount of messages: " + received + " != " + expect);
-        } else {
-            fail("Failed to receive the messages within the specified time");
-        }
-    }
-
-    @Test
-    @Timeout(90)
-    public void testBasicSendReceive() {
-        try {
-            ConnectorPropertyFactory connectorPropertyFactory = CamelJMSPropertyFactory
-                    .basic()
-                    .withTopics(TestUtils.getDefaultTestTopic(this.getClass()))
-                    .withConnectionProperties(connectionProperties())
-                    .withDestinationName(SJMS2Common.DEFAULT_JMS_QUEUE);
-
-            runTest(connectorPropertyFactory);
-
-        } catch (Exception e) {
-            LOG.error("JMS test failed: {}", e.getMessage(), e);
-            fail(e.getMessage());
-        }
-    }
-
-    @Test
-    @Timeout(90)
-    public void testBasicSendReceiveUsingUrl() {
-        try {
-            ConnectorPropertyFactory connectorPropertyFactory = CamelJMSPropertyFactory
-                    .basic()
-                    .withTopics(TestUtils.getDefaultTestTopic(this.getClass()))
-                    .withConnectionProperties(connectionProperties())
-                    .withUrl(SJMS2Common.DEFAULT_JMS_QUEUE)
-                    .buildUrl();
-
-            runTest(connectorPropertyFactory);
-
-        } catch (Exception e) {
-            LOG.error("JMS test failed: {}", e.getMessage(), e);
-            fail(e.getMessage());
-        }
-    }
-
-    private void consumeJMSMessages(CountDownLatch latch) {
+    @Override
+    protected void consumeMessages(CountDownLatch latch) {
         JMSClient jmsClient = null;
 
         try {
@@ -193,4 +144,29 @@ public class CamelSinkJMSITCase extends AbstractKafkaTest {
             }
         }
     }
+
+    @Test
+    @Timeout(90)
+    public void testBasicSendReceive() throws Exception {
+        ConnectorPropertyFactory connectorPropertyFactory = CamelJMSPropertyFactory
+                .basic()
+                .withTopics(topicName)
+                .withConnectionProperties(connectionProperties())
+                .withDestinationName(SJMS2Common.DEFAULT_JMS_QUEUE);
+
+        runTest(connectorPropertyFactory, topicName, expect);
+    }
+
+    @Test
+    @Timeout(90)
+    public void testBasicSendReceiveUsingUrl() throws Exception {
+        ConnectorPropertyFactory connectorPropertyFactory = CamelJMSPropertyFactory
+                .basic()
+                .withTopics(topicName)
+                .withConnectionProperties(connectionProperties())
+                .withUrl(SJMS2Common.DEFAULT_JMS_QUEUE)
+                .buildUrl();
+
+        runTest(connectorPropertyFactory, topicName, expect);
+    }
 }