This is an automated email from the ASF dual-hosted git repository.

acosentino pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/camel-kafka-connector.git


The following commit(s) were added to refs/heads/master by this push:
     new 8a4ffef  Added support for using an external JMS broker
     new 1abbc79  Merge pull request #83 from 
orpiske/add-support-for-remote-jms-broker
8a4ffef is described below

commit 8a4ffef5fa28a96025cd9de2c4aed3d1d3177c58
Author: Otavio Rodolfo Piske <opi...@redhat.com>
AuthorDate: Wed Jan 29 18:09:03 2020 +0100

    Added support for using an external JMS broker
---
 .../apache/camel/kafkaconnector/PropertyUtils.java |  7 +-
 .../kafkaconnector/clients/jms/JMSClient.java      | 79 ++++++----------------
 .../{ArtemisService.java => ArtemisContainer.java} | 27 ++++++--
 .../services/jms/ContainerLocalService.java        | 60 ++++++++++++++++
 .../jms/{JMSService.java => JMSContainer.java}     | 28 ++++++--
 .../kafkaconnector/services/jms/JMSService.java    | 35 ++++++++--
 .../services/jms/JMSServiceFactory.java            | 10 ++-
 ...rvice.java => QpidDispatchRouterContainer.java} | 22 +++++-
 .../jms/{JMSService.java => RemoteJMSService.java} | 38 ++++++++---
 .../sink/jms/CamelSinkJMSITCase.java               |  8 +--
 .../source/jms/CamelSourceJMSITCase.java           | 62 +++++++++--------
 11 files changed, 251 insertions(+), 125 deletions(-)

diff --git 
a/tests/src/test/java/org/apache/camel/kafkaconnector/PropertyUtils.java 
b/tests/src/test/java/org/apache/camel/kafkaconnector/PropertyUtils.java
index a18971c..4081b78 100644
--- a/tests/src/test/java/org/apache/camel/kafkaconnector/PropertyUtils.java
+++ b/tests/src/test/java/org/apache/camel/kafkaconnector/PropertyUtils.java
@@ -29,11 +29,16 @@ import org.slf4j.LoggerFactory;
 
 public final class PropertyUtils {
     private static final Logger LOG = 
LoggerFactory.getLogger(PropertyUtils.class);
+    private static Properties properties = new Properties();
 
     private PropertyUtils() {
 
     }
 
+    public static Properties getProperties() {
+        return properties;
+    }
+
     public static void load() {
         String fileName = System.getProperty("test.properties");
 
@@ -44,8 +49,6 @@ public final class PropertyUtils {
         }
 
         try (InputStream stream = new FileInputStream(fileName)) {
-            Properties properties = new Properties();
-
             properties.load(stream);
 
             System.getProperties().putAll(properties);
diff --git 
a/tests/src/test/java/org/apache/camel/kafkaconnector/clients/jms/JMSClient.java
 
b/tests/src/test/java/org/apache/camel/kafkaconnector/clients/jms/JMSClient.java
index 2a24264..4949126 100644
--- 
a/tests/src/test/java/org/apache/camel/kafkaconnector/clients/jms/JMSClient.java
+++ 
b/tests/src/test/java/org/apache/camel/kafkaconnector/clients/jms/JMSClient.java
@@ -17,7 +17,6 @@
 
 package org.apache.camel.kafkaconnector.clients.jms;
 
-import java.util.Properties;
 import java.util.function.Function;
 import java.util.function.Predicate;
 
@@ -29,9 +28,9 @@ import javax.jms.JMSException;
 import javax.jms.Message;
 import javax.jms.MessageConsumer;
 import javax.jms.MessageProducer;
-import javax.jms.Queue;
 import javax.jms.Session;
 
+import org.junit.jupiter.api.Assertions;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -41,21 +40,27 @@ import org.slf4j.LoggerFactory;
 public class JMSClient {
     private static final Logger LOG = LoggerFactory.getLogger(JMSClient.class);
 
-    private final String url;
     private Connection connection;
     private Session session;
 
-    private final Function<String, ? extends ConnectionFactory> 
connectionFactory;
-    private final Function<String, ? extends Queue> destinationFactory;
+    private ConnectionFactory factory;
 
     public JMSClient(Function<String, ? extends ConnectionFactory> 
connectionFactory,
-                     Function<String, ? extends Queue> destinationFactory,
                      String url) {
-        this.connectionFactory = connectionFactory;
-        this.destinationFactory = destinationFactory;
-        this.url = url;
+        factory = connectionFactory.apply(url);
     }
 
+    public JMSClient(String className, String url) {
+        Class<? extends ConnectionFactory> clazz;
+        try {
+            clazz = (Class<? extends ConnectionFactory>) 
Class.forName(className);
+
+            factory = clazz.getConstructor(String.class).newInstance(url);
+        } catch (Exception e) {
+            LOG.error("Unable to create the JMS client classL {}", 
e.getMessage(), e);
+            Assertions.fail(e);
+        }
+    }
 
     @SuppressWarnings("UnusedReturnValue")
     public static Throwable capturingClose(MessageProducer closeable) {
@@ -113,8 +118,6 @@ public class JMSClient {
         LOG.debug("Starting the JMS client");
 
         try {
-            final ConnectionFactory factory = connectionFactory.apply(url);
-
             LOG.debug("Creating the connection");
             connection = factory.createConnection();
             LOG.debug("Connection created successfully");
@@ -146,7 +149,14 @@ public class JMSClient {
     }
 
     private Destination createDestination(final String destinationName) {
-        return destinationFactory.apply(destinationName);
+        try {
+            return session.createQueue(destinationName);
+        } catch (JMSException e) {
+            Assertions.fail(e.getMessage());
+
+            // unreachable
+            return null;
+        }
     }
 
 
@@ -225,49 +235,4 @@ public class JMSClient {
             capturingClose(producer);
         }
     }
-
-    public static JMSClient createClient(String url) {
-        String jmsInstanceType = 
System.getProperty("jms-service.instance.type");
-
-        if (jmsInstanceType == null || 
jmsInstanceType.equals("local-dispatch-router-container")) {
-            return new JMSClient(org.apache.qpid.jms.JmsConnectionFactory::new,
-                    org.apache.qpid.jms.JmsQueue::new, url);
-        }
-
-        if (jmsInstanceType.equals("local-artemis-container")) {
-            return new JMSClient(
-                    org.apache.activemq.ActiveMQConnectionFactory::new,
-                    org.apache.activemq.command.ActiveMQQueue::new,
-                    url);
-        }
-
-        LOG.error("Invalid JMS instance type: {}. Must be one of 
'local-artemis-container' or 'local-dispatch-router-container",
-                jmsInstanceType);
-        throw new UnsupportedOperationException("Invalid JMS instance type:");
-    }
-
-    public static Properties getConnectionProperties(String url) {
-        Properties properties = new Properties();
-
-        String jmsInstanceType = 
System.getProperty("jms-service.instance.type");
-
-        if (jmsInstanceType == null || 
jmsInstanceType.equals("local-dispatch-router-container")) {
-            properties.put("camel.component.sjms2.connection-factory", 
"#class:org.apache.qpid.jms.JmsConnectionFactory");
-            
properties.put("camel.component.sjms2.connection-factory.remoteURI", url);
-
-            return properties;
-        }
-
-        if (jmsInstanceType.equals("local-artemis-container")) {
-            properties.put("camel.component.sjms2.connection-factory", 
"#class:org.apache.activemq.ActiveMQConnectionFactory");
-            
properties.put("camel.component.sjms2.connection-factory.brokerURL", url);
-
-            return properties;
-        }
-
-        LOG.error("Invalid JMS instance type: {}. Must be one of 
'local-artemis-container' or 'local-dispatch-router-container",
-                jmsInstanceType);
-        throw new UnsupportedOperationException("Invalid JMS instance type:");
-    }
-
 }
diff --git 
a/tests/src/test/java/org/apache/camel/kafkaconnector/services/jms/ArtemisService.java
 
b/tests/src/test/java/org/apache/camel/kafkaconnector/services/jms/ArtemisContainer.java
similarity index 85%
rename from 
tests/src/test/java/org/apache/camel/kafkaconnector/services/jms/ArtemisService.java
rename to 
tests/src/test/java/org/apache/camel/kafkaconnector/services/jms/ArtemisContainer.java
index a1e828f..e9ee3a7 100644
--- 
a/tests/src/test/java/org/apache/camel/kafkaconnector/services/jms/ArtemisService.java
+++ 
b/tests/src/test/java/org/apache/camel/kafkaconnector/services/jms/ArtemisContainer.java
@@ -17,21 +17,20 @@
 
 package org.apache.camel.kafkaconnector.services.jms;
 
+import java.util.Properties;
+
+import org.apache.camel.kafkaconnector.clients.jms.JMSClient;
 import org.testcontainers.containers.wait.strategy.Wait;
 import org.testcontainers.images.builder.ImageFromDockerfile;
 
-/**
- * A specialized container that can be used to create Apache Artemis broker
- * instances.
- */
-public class ArtemisService extends JMSService {
+public class ArtemisContainer extends JMSContainer {
     private static final int DEFAULT_MQTT_PORT = 1883;
     private static final int DEFAULT_AMQP_PORT = 5672;
     private static final int DEFAULT_ADMIN_PORT = 8161;
     private static final int DEFAULT_ACCEPTOR_PORT = 61616;
 
 
-    public ArtemisService() {
+    public ArtemisContainer() {
         super(new ImageFromDockerfile("apache-artemis:ckc", false)
                 .withFileFromClasspath("Dockerfile",
                         
"org/apache/camel/kafkaconnector/services/jms/artemis/Dockerfile"));
@@ -115,7 +114,6 @@ public class ArtemisService extends JMSService {
     }
 
 
-
     /**
      * Gets the port number used for exchanging messages using the Openwire 
protocol
      * @return the port number
@@ -132,4 +130,19 @@ public class ArtemisService extends JMSService {
     public String getOpenwireEndpoint() {
         return String.format("tcp://localhost:%d", getOpenwirePort());
     }
+
+    @Override
+    public Properties getConnectionProperties() {
+        Properties properties = new Properties();
+
+        properties.put("camel.component.sjms2.connection-factory", 
"#class:org.apache.activemq.ActiveMQConnectionFactory");
+        properties.put("camel.component.sjms2.connection-factory.brokerURL", 
getDefaultEndpoint());
+
+        return properties;
+    }
+
+    @Override
+    public JMSClient getClient() {
+        return new 
JMSClient(org.apache.activemq.ActiveMQConnectionFactory::new, 
getDefaultEndpoint());
+    }
 }
diff --git 
a/tests/src/test/java/org/apache/camel/kafkaconnector/services/jms/ContainerLocalService.java
 
b/tests/src/test/java/org/apache/camel/kafkaconnector/services/jms/ContainerLocalService.java
new file mode 100644
index 0000000..93e81c8
--- /dev/null
+++ 
b/tests/src/test/java/org/apache/camel/kafkaconnector/services/jms/ContainerLocalService.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.camel.kafkaconnector.services.jms;
+
+import java.util.Properties;
+
+import org.apache.camel.kafkaconnector.clients.jms.JMSClient;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * A specialized container that can be used to create Apache Artemis broker
+ * instances.
+ */
+public class ContainerLocalService implements JMSService {
+    private static final Logger LOG = 
LoggerFactory.getLogger(ContainerLocalService.class);
+
+    private final JMSContainer container;
+
+    public ContainerLocalService(JMSContainer container) {
+        this.container = container;
+
+        container.start();
+    }
+
+    @Override
+    public Properties getConnectionProperties() {
+        return container.getConnectionProperties();
+    }
+
+    @Override
+    public JMSClient getClient() {
+        return container.getClient();
+    }
+
+    @Override
+    public String getDefaultEndpoint() {
+        return container.getDefaultEndpoint();
+    }
+
+    @Override
+    public void initialize() {
+        LOG.info("JMS broker running at address {}", 
container.getDefaultEndpoint());
+    }
+}
diff --git 
a/tests/src/test/java/org/apache/camel/kafkaconnector/services/jms/JMSService.java
 
b/tests/src/test/java/org/apache/camel/kafkaconnector/services/jms/JMSContainer.java
similarity index 58%
copy from 
tests/src/test/java/org/apache/camel/kafkaconnector/services/jms/JMSService.java
copy to 
tests/src/test/java/org/apache/camel/kafkaconnector/services/jms/JMSContainer.java
index 7480d28..9366373 100644
--- 
a/tests/src/test/java/org/apache/camel/kafkaconnector/services/jms/JMSService.java
+++ 
b/tests/src/test/java/org/apache/camel/kafkaconnector/services/jms/JMSContainer.java
@@ -17,19 +17,35 @@
 
 package org.apache.camel.kafkaconnector.services.jms;
 
-import java.util.concurrent.Future;
+import java.util.Properties;
 
+import org.apache.camel.kafkaconnector.clients.jms.JMSClient;
 import org.testcontainers.containers.GenericContainer;
+import org.testcontainers.images.builder.ImageFromDockerfile;
 
-public abstract class JMSService extends GenericContainer {
+public abstract class JMSContainer extends GenericContainer {
 
-    public JMSService(Future<String> image) {
-        super(image);
+
+    public JMSContainer(ImageFromDockerfile dockerfile) {
+        super(dockerfile);
     }
 
     /**
-     * Gets the default endpoint for the JMS service (ie.: amqp://host:port, 
or tcp://host:port, etc)
-     * @return the endpoint URL as a string in the specific format used by the 
service
+     * Gets the connection properties for accessing the service
+     * @return
+     */
+    public abstract Properties getConnectionProperties();
+
+
+    /**
+     * Get a client that can access the container
+     * @return
+     */
+    public abstract JMSClient getClient();
+
+    /**
+     * Gets the end point URL used exchanging messages through the default 
acceptor port
+     * @return the end point URL as a string
      */
     public abstract String getDefaultEndpoint();
 }
diff --git 
a/tests/src/test/java/org/apache/camel/kafkaconnector/services/jms/JMSService.java
 
b/tests/src/test/java/org/apache/camel/kafkaconnector/services/jms/JMSService.java
index 7480d28..99a5abd 100644
--- 
a/tests/src/test/java/org/apache/camel/kafkaconnector/services/jms/JMSService.java
+++ 
b/tests/src/test/java/org/apache/camel/kafkaconnector/services/jms/JMSService.java
@@ -17,19 +17,40 @@
 
 package org.apache.camel.kafkaconnector.services.jms;
 
-import java.util.concurrent.Future;
+import java.util.Properties;
 
-import org.testcontainers.containers.GenericContainer;
+import org.apache.camel.kafkaconnector.clients.jms.JMSClient;
+import org.junit.jupiter.api.extension.BeforeAllCallback;
+import org.junit.jupiter.api.extension.ExtensionContext;
 
-public abstract class JMSService extends GenericContainer {
+public interface JMSService extends BeforeAllCallback {
 
-    public JMSService(Future<String> image) {
-        super(image);
-    }
+    /**
+     * Gets the connection properties for accessing the service
+     * @return
+     */
+    Properties getConnectionProperties();
+
+    /**
+     * Get the appropriate client for the service
+     * @return
+     */
+    JMSClient getClient();
 
     /**
      * Gets the default endpoint for the JMS service (ie.: amqp://host:port, 
or tcp://host:port, etc)
      * @return the endpoint URL as a string in the specific format used by the 
service
      */
-    public abstract String getDefaultEndpoint();
+    String getDefaultEndpoint();
+
+    /**
+     * Perform any initialization necessary
+     */
+    void initialize();
+
+
+    @Override
+    default void beforeAll(ExtensionContext extensionContext) throws Exception 
{
+        initialize();
+    }
 }
diff --git 
a/tests/src/test/java/org/apache/camel/kafkaconnector/services/jms/JMSServiceFactory.java
 
b/tests/src/test/java/org/apache/camel/kafkaconnector/services/jms/JMSServiceFactory.java
index aa18498..0855010 100644
--- 
a/tests/src/test/java/org/apache/camel/kafkaconnector/services/jms/JMSServiceFactory.java
+++ 
b/tests/src/test/java/org/apache/camel/kafkaconnector/services/jms/JMSServiceFactory.java
@@ -30,14 +30,18 @@ public final class JMSServiceFactory {
         String jmsInstanceType = 
System.getProperty("jms-service.instance.type");
 
         if (jmsInstanceType == null || 
jmsInstanceType.equals("local-dispatch-router-container")) {
-            return new QpidDispatchRouterService();
+            return new ContainerLocalService(new 
QpidDispatchRouterContainer());
         }
 
         if (jmsInstanceType.equals("local-artemis-container")) {
-            return new ArtemisService();
+            return new ContainerLocalService(new ArtemisContainer());
         }
 
-        LOG.error("Invalid JMS instance type: {}. Must be one of 
'local-artemis-container' or 'local-dispatch-router-container",
+        if (jmsInstanceType.equals("remote")) {
+            return new RemoteJMSService();
+        }
+
+        LOG.error("Invalid JMS instance type: {}. Must be one of 'remote', 
'local-artemis-container' or 'local-dispatch-router-container",
                 jmsInstanceType);
         throw new UnsupportedOperationException("Invalid JMS instance type:");
     }
diff --git 
a/tests/src/test/java/org/apache/camel/kafkaconnector/services/jms/QpidDispatchRouterService.java
 
b/tests/src/test/java/org/apache/camel/kafkaconnector/services/jms/QpidDispatchRouterContainer.java
similarity index 73%
rename from 
tests/src/test/java/org/apache/camel/kafkaconnector/services/jms/QpidDispatchRouterService.java
rename to 
tests/src/test/java/org/apache/camel/kafkaconnector/services/jms/QpidDispatchRouterContainer.java
index 5eaebdb..1d93d21 100644
--- 
a/tests/src/test/java/org/apache/camel/kafkaconnector/services/jms/QpidDispatchRouterService.java
+++ 
b/tests/src/test/java/org/apache/camel/kafkaconnector/services/jms/QpidDispatchRouterContainer.java
@@ -17,14 +17,17 @@
 
 package org.apache.camel.kafkaconnector.services.jms;
 
+import java.util.Properties;
+
+import org.apache.camel.kafkaconnector.clients.jms.JMSClient;
 import org.testcontainers.containers.wait.strategy.Wait;
 import org.testcontainers.images.builder.ImageFromDockerfile;
 
-public class QpidDispatchRouterService extends JMSService {
+public class QpidDispatchRouterContainer extends JMSContainer {
     private static final int DEFAULT_AMQP_PORT = 5672;
 
 
-    public QpidDispatchRouterService() {
+    public QpidDispatchRouterContainer() {
         super(new ImageFromDockerfile("qpid-dispatch:ckc", false)
                 .withFileFromClasspath("Dockerfile",
                         
"org/apache/camel/kafkaconnector/services/jms/qpid-dispatch-router/Dockerfile"));
@@ -56,4 +59,19 @@ public class QpidDispatchRouterService extends JMSService {
     public String getDefaultEndpoint() {
         return getAMQPEndpoint();
     }
+
+    @Override
+    public Properties getConnectionProperties() {
+        Properties properties = new Properties();
+
+        properties.put("camel.component.sjms2.connection-factory", 
"#class:org.apache.qpid.jms.JmsConnectionFactory");
+        properties.put("camel.component.sjms2.connection-factory.remoteURI", 
getDefaultEndpoint());
+
+        return properties;
+    }
+
+    @Override
+    public JMSClient getClient() {
+        return new JMSClient(org.apache.qpid.jms.JmsConnectionFactory::new, 
getDefaultEndpoint());
+    }
 }
diff --git 
a/tests/src/test/java/org/apache/camel/kafkaconnector/services/jms/JMSService.java
 
b/tests/src/test/java/org/apache/camel/kafkaconnector/services/jms/RemoteJMSService.java
similarity index 51%
copy from 
tests/src/test/java/org/apache/camel/kafkaconnector/services/jms/JMSService.java
copy to 
tests/src/test/java/org/apache/camel/kafkaconnector/services/jms/RemoteJMSService.java
index 7480d28..c0f88f0 100644
--- 
a/tests/src/test/java/org/apache/camel/kafkaconnector/services/jms/JMSService.java
+++ 
b/tests/src/test/java/org/apache/camel/kafkaconnector/services/jms/RemoteJMSService.java
@@ -17,19 +17,37 @@
 
 package org.apache.camel.kafkaconnector.services.jms;
 
-import java.util.concurrent.Future;
+import java.util.Properties;
 
-import org.testcontainers.containers.GenericContainer;
+import org.apache.camel.kafkaconnector.PropertyUtils;
+import org.apache.camel.kafkaconnector.clients.jms.JMSClient;
 
-public abstract class JMSService extends GenericContainer {
+public class RemoteJMSService implements JMSService {
 
-    public JMSService(Future<String> image) {
-        super(image);
+
+    @Override
+    public void initialize() {
+        // NO-OP
+    }
+
+    @Override
+    public Properties getConnectionProperties() {
+        return PropertyUtils.getProperties();
+    }
+
+    @Override
+    public String getDefaultEndpoint() {
+        return System.getProperty("jms.broker.address");
     }
 
-    /**
-     * Gets the default endpoint for the JMS service (ie.: amqp://host:port, 
or tcp://host:port, etc)
-     * @return the endpoint URL as a string in the specific format used by the 
service
-     */
-    public abstract String getDefaultEndpoint();
+    @Override
+    public JMSClient getClient() {
+        String tmpConnectionFactory = 
System.getProperty("camel.component.sjms2.connection-factory");
+
+        String connectionFactory = tmpConnectionFactory.replace("#class:", "");
+
+        return new JMSClient(connectionFactory, getDefaultEndpoint());
+
+
+    }
 }
diff --git 
a/tests/src/test/java/org/apache/camel/kafkaconnector/sink/jms/CamelSinkJMSITCase.java
 
b/tests/src/test/java/org/apache/camel/kafkaconnector/sink/jms/CamelSinkJMSITCase.java
index 899349c..f14c801 100644
--- 
a/tests/src/test/java/org/apache/camel/kafkaconnector/sink/jms/CamelSinkJMSITCase.java
+++ 
b/tests/src/test/java/org/apache/camel/kafkaconnector/sink/jms/CamelSinkJMSITCase.java
@@ -37,9 +37,9 @@ import 
org.apache.camel.kafkaconnector.services.jms.JMSServiceFactory;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.Timeout;
+import org.junit.jupiter.api.extension.RegisterExtension;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
-import org.testcontainers.junit.jupiter.Container;
 import org.testcontainers.junit.jupiter.Testcontainers;
 
 import static org.junit.Assert.fail;
@@ -53,7 +53,7 @@ import static org.junit.jupiter.api.Assertions.assertEquals;
 public class CamelSinkJMSITCase extends AbstractKafkaTest {
     private static final Logger LOG = 
LoggerFactory.getLogger(CamelSinkJMSITCase.class);
 
-    @Container
+    @RegisterExtension
     public JMSService jmsService = JMSServiceFactory.createService();
 
     private int received;
@@ -89,7 +89,7 @@ public class CamelSinkJMSITCase extends AbstractKafkaTest {
     @Timeout(90)
     public void testBasicSendReceive() {
         try {
-            Properties connectionProperties = 
JMSClient.getConnectionProperties(jmsService.getDefaultEndpoint());
+            Properties connectionProperties = 
jmsService.getConnectionProperties();
 
             ConnectorPropertyFactory testProperties = new 
CamelJMSPropertyFactory(1,
                     TestCommon.getDefaultTestTopic(this.getClass()),
@@ -128,7 +128,7 @@ public class CamelSinkJMSITCase extends AbstractKafkaTest {
         JMSClient jmsClient = null;
 
         try {
-            jmsClient = 
JMSClient.createClient(jmsService.getDefaultEndpoint());
+            jmsClient = jmsService.getClient();
 
             jmsClient.start();
 
diff --git 
a/tests/src/test/java/org/apache/camel/kafkaconnector/source/jms/CamelSourceJMSITCase.java
 
b/tests/src/test/java/org/apache/camel/kafkaconnector/source/jms/CamelSourceJMSITCase.java
index 6e76879..91e4f39 100644
--- 
a/tests/src/test/java/org/apache/camel/kafkaconnector/source/jms/CamelSourceJMSITCase.java
+++ 
b/tests/src/test/java/org/apache/camel/kafkaconnector/source/jms/CamelSourceJMSITCase.java
@@ -19,6 +19,8 @@ package org.apache.camel.kafkaconnector.source.jms;
 
 import java.util.Properties;
 
+import javax.jms.JMSException;
+
 import org.apache.camel.kafkaconnector.AbstractKafkaTest;
 import org.apache.camel.kafkaconnector.ConnectorPropertyFactory;
 import org.apache.camel.kafkaconnector.TestCommon;
@@ -27,12 +29,11 @@ import 
org.apache.camel.kafkaconnector.clients.kafka.KafkaClient;
 import org.apache.camel.kafkaconnector.services.jms.JMSService;
 import org.apache.camel.kafkaconnector.services.jms.JMSServiceFactory;
 import org.apache.kafka.clients.consumer.ConsumerRecord;
-import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.Timeout;
+import org.junit.jupiter.api.extension.RegisterExtension;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
-import org.testcontainers.junit.jupiter.Container;
 import org.testcontainers.junit.jupiter.Testcontainers;
 
 import static org.junit.jupiter.api.Assertions.assertEquals;
@@ -47,17 +48,12 @@ import static org.junit.jupiter.api.Assertions.fail;
 public class CamelSourceJMSITCase extends AbstractKafkaTest {
     private static final Logger LOG = 
LoggerFactory.getLogger(CamelSourceJMSITCase.class);
 
-    @Container
+    @RegisterExtension
     public JMSService jmsService = JMSServiceFactory.createService();
 
     private int received;
     private final int expect = 10;
 
-    @BeforeEach
-    public void setUp() {
-        LOG.info("JMS service running at {}", jmsService.getDefaultEndpoint());
-    }
-
     private boolean checkRecord(ConsumerRecord<String, String> record) {
         LOG.debug("Received: {}", record.value());
         received++;
@@ -69,24 +65,39 @@ public class CamelSourceJMSITCase extends AbstractKafkaTest 
{
         return true;
     }
 
+    private void produceMessages(String queue, String baseText) {
+        JMSClient jmsProducer = null;
+
+        try {
+            jmsProducer = jmsService.getClient();
+
+            jmsProducer.start();
+            for (int i = 0; i < expect; i++) {
+                jmsProducer.send(queue, baseText + " " + i);
+            }
+        } catch (JMSException e) {
+            LOG.error("JMS exception trying to send messages to the queue: 
{}", e.getMessage(), e);
+            fail(e.getMessage());
+        } catch (Exception e) {
+            LOG.error("Failed to send messages to the queue: {}", 
e.getMessage(), e);
+            fail(e.getMessage());
+        } finally {
+            jmsProducer.stop();
+        }
+    }
+
     @Test
     @Timeout(90)
     public void testBasicSendReceive() {
         try {
-            Properties connectionProperties = 
JMSClient.getConnectionProperties(jmsService.getDefaultEndpoint());
+            Properties connectionProperties = 
jmsService.getConnectionProperties();
 
             ConnectorPropertyFactory testProperties = new 
CamelJMSPropertyFactory(1,
                     TestCommon.getDefaultTestTopic(this.getClass()), 
TestCommon.DEFAULT_JMS_QUEUE, connectionProperties);
 
             getKafkaConnectService().initializeConnector(testProperties);
 
-            JMSClient jmsProducer = 
JMSClient.createClient(jmsService.getDefaultEndpoint());
-
-            jmsProducer.start();
-            for (int i = 0; i < expect; i++) {
-                jmsProducer.send(TestCommon.DEFAULT_JMS_QUEUE, "Test message " 
+ i);
-            }
-            jmsProducer.stop();
+            produceMessages(TestCommon.DEFAULT_JMS_QUEUE, "Test string 
message");
 
             LOG.debug("Creating the consumer ...");
             KafkaClient<String, String> kafkaClient = new 
KafkaClient<>(getKafkaService().getBootstrapServers());
@@ -98,28 +109,25 @@ public class CamelSourceJMSITCase extends 
AbstractKafkaTest {
             LOG.error("JMS test failed: {}", e.getMessage(), e);
             fail(e.getMessage());
         }
-
     }
 
+
+
     @Test
     @Timeout(90)
     public void testIntSendReceive() {
         try {
-            Properties connectionProperties = 
JMSClient.getConnectionProperties(jmsService.getDefaultEndpoint());
+            final String jmsQueueName = "testIntSendReceive";
+
+            Properties connectionProperties = 
jmsService.getConnectionProperties();
 
             ConnectorPropertyFactory testProperties = new 
CamelJMSPropertyFactory(1,
-                    TestCommon.getDefaultTestTopic(this.getClass()) + 
"testIntSendReceive",
-                    "testIntSendReceive", connectionProperties);
+                    TestCommon.getDefaultTestTopic(this.getClass()) + 
jmsQueueName,
+                    jmsQueueName, connectionProperties);
 
             getKafkaConnectService().initializeConnector(testProperties);
 
-            JMSClient jmsProducer = 
JMSClient.createClient(jmsService.getDefaultEndpoint());
-
-            jmsProducer.start();
-            for (int i = 0; i < expect; i++) {
-                jmsProducer.send("testIntSendReceive", i);
-            }
-            jmsProducer.stop();
+            produceMessages(jmsQueueName, "Test string message");
 
             LOG.debug("Creating the consumer ...");
             KafkaClient<String, String> kafkaClient = new 
KafkaClient<>(getKafkaService().getBootstrapServers());

Reply via email to