This is an automated email from the ASF dual-hosted git repository.

orpiske pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/camel-kafka-connector.git

commit 58d450c2c44fa89f0d5f2d21e56a780088dab4d5
Author: Otavio Rodolfo Piske <opi...@redhat.com>
AuthorDate: Wed Feb 3 14:52:35 2021 +0100

    Convert the SJMS2 tests to the new reusable sink test base class
---
 .../common/test/CamelSinkTestSupport.java          |  29 ++++-
 .../common/test/TestMessageProducer.java           |  23 ++++
 .../sjms2/sink/CamelSinkIdempotentJMSITCase.java   |  73 ++++++-------
 .../sjms2/sink/CamelSinkJMSITCase.java             | 118 ++++++++-------------
 4 files changed, 127 insertions(+), 116 deletions(-)

diff --git a/tests/itests-common/src/test/java/org/apache/camel/kafkaconnector/common/test/CamelSinkTestSupport.java b/tests/itests-common/src/test/java/org/apache/camel/kafkaconnector/common/test/CamelSinkTestSupport.java
index 9f8460f..bd02eef 100644
--- a/tests/itests-common/src/test/java/org/apache/camel/kafkaconnector/common/test/CamelSinkTestSupport.java
+++ b/tests/itests-common/src/test/java/org/apache/camel/kafkaconnector/common/test/CamelSinkTestSupport.java
@@ -20,6 +20,7 @@ package org.apache.camel.kafkaconnector.common.test;
 import java.time.Duration;
 import java.util.Map;
 import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutionException;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 
@@ -60,7 +61,26 @@ public abstract class CamelSinkTestSupport extends AbstractKafkaTest {
         }
     }
 
-    public void runTest(ConnectorPropertyFactory connectorPropertyFactory, String topic, int count) throws Exception {
+    /**
+     * A simple test runner that follows the steps: initialize, start consumer, produce messages, verify results
+     *
+     * @param connectorPropertyFactory A factory for connector properties
+     * @param topic the topic to send the messages to
+     * @param count the number of messages to send
+     * @throws Exception For test-specific exceptions
+     */
+    protected void runTest(ConnectorPropertyFactory connectorPropertyFactory, String topic, int count) throws Exception {
+        runTest(connectorPropertyFactory, () -> produceMessages(topic, count));
+    }
+
+    /**
+     * A more flexible test runner that can use a custom producer of test messages
+     * @param connectorPropertyFactory a factory for connector properties
+     * @param producer the test message producer
+     * @throws ExecutionException
+     * @throws InterruptedException
+     */
+    protected void runTest(ConnectorPropertyFactory connectorPropertyFactory, TestMessageProducer producer) throws ExecutionException, InterruptedException {
         connectorPropertyFactory.log();
         getKafkaConnectService().initializeConnectorBlocking(connectorPropertyFactory, 1);
 
@@ -70,13 +90,16 @@ public abstract class CamelSinkTestSupport extends AbstractKafkaTest {
         CountDownLatch latch = new CountDownLatch(1);
         service.submit(() -> consumeMessages(latch));
 
-        LOG.debug("Creating the producer and sending messages ...");
-        produceMessages(topic, count);
+        producer.producerMessages();
+
+        LOG.debug("Waiting for the messages to be processed");
+        service.shutdown();
 
         LOG.debug("Waiting for the test to complete");
         verifyMessages(latch);
     }
 
+
     protected boolean waitForData() {
         try {
             Thread.sleep(Duration.ofSeconds(1).toMillis());
diff --git a/tests/itests-common/src/test/java/org/apache/camel/kafkaconnector/common/test/TestMessageProducer.java b/tests/itests-common/src/test/java/org/apache/camel/kafkaconnector/common/test/TestMessageProducer.java
new file mode 100644
index 0000000..dedcf97
--- /dev/null
+++ b/tests/itests-common/src/test/java/org/apache/camel/kafkaconnector/common/test/TestMessageProducer.java
@@ -0,0 +1,23 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.camel.kafkaconnector.common.test;
+
+@FunctionalInterface
+public interface TestMessageProducer {
+    void producerMessages();
+}
diff --git a/tests/itests-sjms2/src/test/java/org/apache/camel/kafkaconnector/sjms2/sink/CamelSinkIdempotentJMSITCase.java b/tests/itests-sjms2/src/test/java/org/apache/camel/kafkaconnector/sjms2/sink/CamelSinkIdempotentJMSITCase.java
index d2f06a7..432a20a 100644
--- a/tests/itests-sjms2/src/test/java/org/apache/camel/kafkaconnector/sjms2/sink/CamelSinkIdempotentJMSITCase.java
+++ b/tests/itests-sjms2/src/test/java/org/apache/camel/kafkaconnector/sjms2/sink/CamelSinkIdempotentJMSITCase.java
@@ -20,9 +20,7 @@ package org.apache.camel.kafkaconnector.sjms2.sink;
 import java.util.HashMap;
 import java.util.Map;
 import java.util.Properties;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
+import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
 
 import javax.jms.JMSException;
@@ -31,9 +29,9 @@ import javax.jms.MessageConsumer;
 import javax.jms.TextMessage;
 
 import org.apache.camel.kafkaconnector.CamelSinkTask;
-import org.apache.camel.kafkaconnector.common.AbstractKafkaTest;
 import org.apache.camel.kafkaconnector.common.ConnectorPropertyFactory;
 import org.apache.camel.kafkaconnector.common.clients.kafka.KafkaClient;
+import org.apache.camel.kafkaconnector.common.test.CamelSinkTestSupport;
 import org.apache.camel.kafkaconnector.common.utils.TestUtils;
 import org.apache.camel.kafkaconnector.sjms2.clients.JMSClient;
 import org.apache.camel.kafkaconnector.sjms2.common.SJMS2Common;
@@ -55,12 +53,7 @@ import static org.junit.jupiter.api.Assertions.fail;
  * Integration tests for the JMS sink using idempotent features
  */
 @TestInstance(TestInstance.Lifecycle.PER_CLASS)
-public class CamelSinkIdempotentJMSITCase extends AbstractKafkaTest {
-    @FunctionalInterface
-    interface Producer {
-        void producerMessages();
-    }
-
+public class CamelSinkIdempotentJMSITCase extends CamelSinkTestSupport {
     @RegisterExtension
     public static MessagingService jmsService = MessagingServiceBuilder
             .newBuilder(DispatchRouterContainer::new)
@@ -97,25 +90,13 @@ public class CamelSinkIdempotentJMSITCase extends AbstractKafkaTest {
         destinationName = SJMS2Common.DEFAULT_JMS_QUEUE + "-" + TestUtils.randomWithRange(0, 100);
     }
 
-    private boolean checkRecord(Message jmsMessage) {
-        if (jmsMessage instanceof TextMessage) {
-            try {
-                LOG.debug("Received: {}", ((TextMessage) jmsMessage).getText());
-
-                received++;
-
-                return true;
-            } catch (JMSException e) {
-                LOG.error("Failed to read message: {}", e.getMessage(), e);
-                fail("Failed to read message: " + e.getMessage());
-            }
-        }
-
-        return false;
+    @Override
+    protected Map<String, String> messageHeaders(String text, int current) {
+        return null;
     }
 
-
-    private void consumeJMSMessages() {
+    @Override
+    protected void consumeMessages(CountDownLatch latch) {
         JMSClient jmsClient = null;
 
         try {
@@ -145,31 +126,39 @@ public class CamelSinkIdempotentJMSITCase extends AbstractKafkaTest {
             LOG.error("JMS test failed: {}", e.getMessage(), e);
             fail(e.getMessage());
         } finally {
+            latch.countDown();
+
             if (jmsClient != null) {
                 jmsClient.stop();
             }
         }
     }
 
-    private void runTest(ConnectorPropertyFactory connectorPropertyFactory, Producer producer) throws ExecutionException, InterruptedException {
-        connectorPropertyFactory.log();
-        getKafkaConnectService().initializeConnector(connectorPropertyFactory);
-
-        ExecutorService service = Executors.newCachedThreadPool();
-
-        LOG.debug("Creating the consumer ...");
-        service.submit(() -> consumeJMSMessages());
+    @Override
+    protected void verifyMessages(CountDownLatch latch) throws InterruptedException {
+        if (latch.await(25, TimeUnit.SECONDS)) {
+            assertEquals(received, expect, "Didn't process the expected amount of messages: " + received
+                    + " != " + expect);
+        } else {
+            fail("Failed to receive the messages within the specified time");
+        }
+    }
 
-        producer.producerMessages();
+    private boolean checkRecord(Message jmsMessage) {
+        if (jmsMessage instanceof TextMessage) {
+            try {
+                LOG.debug("Received: {}", ((TextMessage) jmsMessage).getText());
 
-        LOG.debug("Waiting for the messages to be processed");
-        service.shutdown();
+                received++;
 
-        if (service.awaitTermination(25, TimeUnit.SECONDS)) {
-            assertEquals(received, expect, "Didn't process the expected amount of messages: " + received + " != " + expect);
-        } else {
-            fail("Failed to receive the messages within the specified time");
+                return true;
+            } catch (JMSException e) {
+                LOG.error("Failed to read message: {}", e.getMessage(), e);
+                fail("Failed to read message: " + e.getMessage());
+            }
         }
+
+        return false;
     }
 
     private void produceMessagesNoProperties() {
diff --git a/tests/itests-sjms2/src/test/java/org/apache/camel/kafkaconnector/sjms2/sink/CamelSinkJMSITCase.java b/tests/itests-sjms2/src/test/java/org/apache/camel/kafkaconnector/sjms2/sink/CamelSinkJMSITCase.java
index 41b87a8..5e9b66d 100644
--- a/tests/itests-sjms2/src/test/java/org/apache/camel/kafkaconnector/sjms2/sink/CamelSinkJMSITCase.java
+++ b/tests/itests-sjms2/src/test/java/org/apache/camel/kafkaconnector/sjms2/sink/CamelSinkJMSITCase.java
@@ -17,11 +17,9 @@
 
 package org.apache.camel.kafkaconnector.sjms2.sink;
 
+import java.util.Map;
 import java.util.Properties;
 import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
 import java.util.concurrent.TimeUnit;
 
 import javax.jms.JMSException;
@@ -29,10 +27,8 @@ import javax.jms.Message;
 import javax.jms.MessageConsumer;
 import javax.jms.TextMessage;
 
-import org.apache.camel.kafkaconnector.common.AbstractKafkaTest;
 import org.apache.camel.kafkaconnector.common.ConnectorPropertyFactory;
-import org.apache.camel.kafkaconnector.common.clients.kafka.KafkaClient;
-import org.apache.camel.kafkaconnector.common.utils.TestUtils;
+import org.apache.camel.kafkaconnector.common.test.CamelSinkTestSupport;
 import org.apache.camel.kafkaconnector.sjms2.clients.JMSClient;
 import org.apache.camel.kafkaconnector.sjms2.common.SJMS2Common;
 import org.apache.camel.test.infra.dispatch.router.services.DispatchRouterContainer;
@@ -53,7 +49,7 @@ import static org.junit.jupiter.api.Assertions.fail;
  * Integration tests for the JMS sink
  */
 @TestInstance(TestInstance.Lifecycle.PER_CLASS)
-public class CamelSinkJMSITCase extends AbstractKafkaTest {
+public class CamelSinkJMSITCase extends CamelSinkTestSupport {
     @RegisterExtension
     public static MessagingService jmsService = MessagingServiceBuilder
             .newBuilder(DispatchRouterContainer::new)
@@ -62,6 +58,7 @@ public class CamelSinkJMSITCase extends AbstractKafkaTest {
 
     private static final Logger LOG = LoggerFactory.getLogger(CamelSinkJMSITCase.class);
 
+    private String topicName;
     private int received;
     private final int expect = 10;
 
@@ -83,6 +80,22 @@ public class CamelSinkJMSITCase extends AbstractKafkaTest {
     public void setUp() {
         LOG.info("JMS service running at {}", jmsService.defaultEndpoint());
         received = 0;
+
+        topicName = getTopicForTest(this);
+    }
+
+    @Override
+    protected Map<String, String> messageHeaders(String text, int current) {
+        return null;
+    }
+
+    @Override
+    protected void verifyMessages(CountDownLatch latch) throws InterruptedException {
+        if (latch.await(35, TimeUnit.SECONDS)) {
+            assertEquals(received, expect, "Didn't process the expected amount of messages: " + received + " != " + expect);
+        } else {
+            fail("Failed to receive the messages within the specified time");
+        }
     }
 
     private boolean checkRecord(Message jmsMessage) {
@@ -106,70 +119,8 @@ public class CamelSinkJMSITCase extends AbstractKafkaTest {
         return false;
     }
 
-    private void runTest(ConnectorPropertyFactory connectorPropertyFactory) throws ExecutionException, InterruptedException {
-        connectorPropertyFactory.log();
-        getKafkaConnectService().initializeConnector(connectorPropertyFactory);
-
-        CountDownLatch latch = new CountDownLatch(1);
-
-        ExecutorService service = Executors.newCachedThreadPool();
-
-        LOG.debug("Creating the consumer ...");
-        service.submit(() -> consumeJMSMessages(latch));
-
-        KafkaClient<String, String> kafkaClient = new KafkaClient<>(getKafkaService().getBootstrapServers());
-
-        for (int i = 0; i < expect; i++) {
-            kafkaClient.produce(TestUtils.getDefaultTestTopic(this.getClass()), "Sink test message " + i);
-        }
-
-        LOG.debug("Created the consumer ... About to receive messages");
-
-        if (latch.await(35, TimeUnit.SECONDS)) {
-            assertEquals(received, expect, "Didn't process the expected amount of messages: " + received + " != " + expect);
-        } else {
-            fail("Failed to receive the messages within the specified time");
-        }
-    }
-
-    @Test
-    @Timeout(90)
-    public void testBasicSendReceive() {
-        try {
-            ConnectorPropertyFactory connectorPropertyFactory = CamelJMSPropertyFactory
-                    .basic()
-                    .withTopics(TestUtils.getDefaultTestTopic(this.getClass()))
-                    .withConnectionProperties(connectionProperties())
-                    .withDestinationName(SJMS2Common.DEFAULT_JMS_QUEUE);
-
-            runTest(connectorPropertyFactory);
-
-        } catch (Exception e) {
-            LOG.error("JMS test failed: {}", e.getMessage(), e);
-            fail(e.getMessage());
-        }
-    }
-
-    @Test
-    @Timeout(90)
-    public void testBasicSendReceiveUsingUrl() {
-        try {
-            ConnectorPropertyFactory connectorPropertyFactory = CamelJMSPropertyFactory
-                    .basic()
-                    .withTopics(TestUtils.getDefaultTestTopic(this.getClass()))
-                    .withConnectionProperties(connectionProperties())
-                        .withUrl(SJMS2Common.DEFAULT_JMS_QUEUE)
-                        .buildUrl();
-
-            runTest(connectorPropertyFactory);
-
-        } catch (Exception e) {
-            LOG.error("JMS test failed: {}", e.getMessage(), e);
-            fail(e.getMessage());
-        }
-    }
-
-    private void consumeJMSMessages(CountDownLatch latch) {
+    @Override
+    protected void consumeMessages(CountDownLatch latch) {
         JMSClient jmsClient = null;
 
         try {
@@ -193,4 +144,29 @@ public class CamelSinkJMSITCase extends AbstractKafkaTest {
             }
         }
     }
+
+    @Test
+    @Timeout(90)
+    public void testBasicSendReceive() throws Exception {
+        ConnectorPropertyFactory connectorPropertyFactory = CamelJMSPropertyFactory
+                .basic()
+                .withTopics(topicName)
+                .withConnectionProperties(connectionProperties())
+                .withDestinationName(SJMS2Common.DEFAULT_JMS_QUEUE);
+
+        runTest(connectorPropertyFactory, topicName, expect);
+    }
+
+    @Test
+    @Timeout(90)
+    public void testBasicSendReceiveUsingUrl() throws Exception {
+        ConnectorPropertyFactory connectorPropertyFactory = CamelJMSPropertyFactory
+                .basic()
+                .withTopics(topicName)
+                .withConnectionProperties(connectionProperties())
+                    .withUrl(SJMS2Common.DEFAULT_JMS_QUEUE)
+                    .buildUrl();
+
+        runTest(connectorPropertyFactory, topicName, expect);
+    }
 }

Reply via email to