This is an automated email from the ASF dual-hosted git repository. acosentino pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/camel-kafka-connector.git
The following commit(s) were added to refs/heads/master by this push: new 8a4ffef Added support for using an external JMS broker new 1abbc79 Merge pull request #83 from orpiske/add-support-for-remote-jms-broker 8a4ffef is described below commit 8a4ffef5fa28a96025cd9de2c4aed3d1d3177c58 Author: Otavio Rodolfo Piske <opi...@redhat.com> AuthorDate: Wed Jan 29 18:09:03 2020 +0100 Added support for using an external JMS broker --- .../apache/camel/kafkaconnector/PropertyUtils.java | 7 +- .../kafkaconnector/clients/jms/JMSClient.java | 79 ++++++---------------- .../{ArtemisService.java => ArtemisContainer.java} | 27 ++++++-- .../services/jms/ContainerLocalService.java | 60 ++++++++++++++++ .../jms/{JMSService.java => JMSContainer.java} | 28 ++++++-- .../kafkaconnector/services/jms/JMSService.java | 35 ++++++++-- .../services/jms/JMSServiceFactory.java | 10 ++- ...rvice.java => QpidDispatchRouterContainer.java} | 22 +++++- .../jms/{JMSService.java => RemoteJMSService.java} | 38 ++++++++--- .../sink/jms/CamelSinkJMSITCase.java | 8 +-- .../source/jms/CamelSourceJMSITCase.java | 62 +++++++++-------- 11 files changed, 251 insertions(+), 125 deletions(-) diff --git a/tests/src/test/java/org/apache/camel/kafkaconnector/PropertyUtils.java b/tests/src/test/java/org/apache/camel/kafkaconnector/PropertyUtils.java index a18971c..4081b78 100644 --- a/tests/src/test/java/org/apache/camel/kafkaconnector/PropertyUtils.java +++ b/tests/src/test/java/org/apache/camel/kafkaconnector/PropertyUtils.java @@ -29,11 +29,16 @@ import org.slf4j.LoggerFactory; public final class PropertyUtils { private static final Logger LOG = LoggerFactory.getLogger(PropertyUtils.class); + private static Properties properties = new Properties(); private PropertyUtils() { } + public static Properties getProperties() { + return properties; + } + public static void load() { String fileName = System.getProperty("test.properties"); @@ -44,8 +49,6 @@ public final class PropertyUtils { } try (InputStream stream = new FileInputStream(fileName)) { - Properties properties = new Properties(); - properties.load(stream); System.getProperties().putAll(properties); diff --git a/tests/src/test/java/org/apache/camel/kafkaconnector/clients/jms/JMSClient.java b/tests/src/test/java/org/apache/camel/kafkaconnector/clients/jms/JMSClient.java index 2a24264..4949126 100644 --- a/tests/src/test/java/org/apache/camel/kafkaconnector/clients/jms/JMSClient.java +++ b/tests/src/test/java/org/apache/camel/kafkaconnector/clients/jms/JMSClient.java @@ -17,7 +17,6 @@ package org.apache.camel.kafkaconnector.clients.jms; -import java.util.Properties; import java.util.function.Function; import java.util.function.Predicate; @@ -29,9 +28,9 @@ import javax.jms.JMSException; import javax.jms.Message; import javax.jms.MessageConsumer; import javax.jms.MessageProducer; -import javax.jms.Queue; import javax.jms.Session; +import org.junit.jupiter.api.Assertions; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -41,21 +40,27 @@ import org.slf4j.LoggerFactory; public class JMSClient { private static final Logger LOG = LoggerFactory.getLogger(JMSClient.class); - private final String url; private Connection connection; private Session session; - private final Function<String, ? extends ConnectionFactory> connectionFactory; - private final Function<String, ? extends Queue> destinationFactory; + private ConnectionFactory factory; public JMSClient(Function<String, ? extends ConnectionFactory> connectionFactory, - Function<String, ? extends Queue> destinationFactory, String url) { - this.connectionFactory = connectionFactory; - this.destinationFactory = destinationFactory; - this.url = url; + factory = connectionFactory.apply(url); } + public JMSClient(String className, String url) { + Class<? extends ConnectionFactory> clazz; + try { + clazz = (Class<? extends ConnectionFactory>) Class.forName(className); + + factory = clazz.getConstructor(String.class).newInstance(url); + } catch (Exception e) { + LOG.error("Unable to create the JMS client classL {}", e.getMessage(), e); + Assertions.fail(e); + } + } @SuppressWarnings("UnusedReturnValue") public static Throwable capturingClose(MessageProducer closeable) { @@ -113,8 +118,6 @@ public class JMSClient { LOG.debug("Starting the JMS client"); try { - final ConnectionFactory factory = connectionFactory.apply(url); - LOG.debug("Creating the connection"); connection = factory.createConnection(); LOG.debug("Connection created successfully"); @@ -146,7 +149,14 @@ public class JMSClient { } private Destination createDestination(final String destinationName) { - return destinationFactory.apply(destinationName); + try { + return session.createQueue(destinationName); + } catch (JMSException e) { + Assertions.fail(e.getMessage()); + + // unreachable + return null; + } } @@ -225,49 +235,4 @@ public class JMSClient { capturingClose(producer); } } - - public static JMSClient createClient(String url) { - String jmsInstanceType = System.getProperty("jms-service.instance.type"); - - if (jmsInstanceType == null || jmsInstanceType.equals("local-dispatch-router-container")) { - return new JMSClient(org.apache.qpid.jms.JmsConnectionFactory::new, - org.apache.qpid.jms.JmsQueue::new, url); - } - - if (jmsInstanceType.equals("local-artemis-container")) { - return new JMSClient( - org.apache.activemq.ActiveMQConnectionFactory::new, - org.apache.activemq.command.ActiveMQQueue::new, - url); - } - - LOG.error("Invalid JMS instance type: {}. Must be one of 'local-artemis-container' or 'local-dispatch-router-container", - jmsInstanceType); - throw new UnsupportedOperationException("Invalid JMS instance type:"); - } - - public static Properties getConnectionProperties(String url) { - Properties properties = new Properties(); - - String jmsInstanceType = System.getProperty("jms-service.instance.type"); - - if (jmsInstanceType == null || jmsInstanceType.equals("local-dispatch-router-container")) { - properties.put("camel.component.sjms2.connection-factory", "#class:org.apache.qpid.jms.JmsConnectionFactory"); - properties.put("camel.component.sjms2.connection-factory.remoteURI", url); - - return properties; - } - - if (jmsInstanceType.equals("local-artemis-container")) { - properties.put("camel.component.sjms2.connection-factory", "#class:org.apache.activemq.ActiveMQConnectionFactory"); - properties.put("camel.component.sjms2.connection-factory.brokerURL", url); - - return properties; - } - - LOG.error("Invalid JMS instance type: {}. Must be one of 'local-artemis-container' or 'local-dispatch-router-container", - jmsInstanceType); - throw new UnsupportedOperationException("Invalid JMS instance type:"); - } - } diff --git a/tests/src/test/java/org/apache/camel/kafkaconnector/services/jms/ArtemisService.java b/tests/src/test/java/org/apache/camel/kafkaconnector/services/jms/ArtemisContainer.java similarity index 85% rename from tests/src/test/java/org/apache/camel/kafkaconnector/services/jms/ArtemisService.java rename to tests/src/test/java/org/apache/camel/kafkaconnector/services/jms/ArtemisContainer.java index a1e828f..e9ee3a7 100644 --- a/tests/src/test/java/org/apache/camel/kafkaconnector/services/jms/ArtemisService.java +++ b/tests/src/test/java/org/apache/camel/kafkaconnector/services/jms/ArtemisContainer.java @@ -17,21 +17,20 @@ package org.apache.camel.kafkaconnector.services.jms; +import java.util.Properties; + +import org.apache.camel.kafkaconnector.clients.jms.JMSClient; import org.testcontainers.containers.wait.strategy.Wait; import org.testcontainers.images.builder.ImageFromDockerfile; -/** - * A specialized container that can be used to create Apache Artemis broker - * instances. - */ -public class ArtemisService extends JMSService { +public class ArtemisContainer extends JMSContainer { private static final int DEFAULT_MQTT_PORT = 1883; private static final int DEFAULT_AMQP_PORT = 5672; private static final int DEFAULT_ADMIN_PORT = 8161; private static final int DEFAULT_ACCEPTOR_PORT = 61616; - public ArtemisService() { + public ArtemisContainer() { super(new ImageFromDockerfile("apache-artemis:ckc", false) .withFileFromClasspath("Dockerfile", "org/apache/camel/kafkaconnector/services/jms/artemis/Dockerfile")); @@ -115,7 +114,6 @@ public class ArtemisService extends JMSService { } - /** * Gets the port number used for exchanging messages using the Openwire protocol * @return the port number @@ -132,4 +130,19 @@ public class ArtemisService extends JMSService { public String getOpenwireEndpoint() { return String.format("tcp://localhost:%d", getOpenwirePort()); } + + @Override + public Properties getConnectionProperties() { + Properties properties = new Properties(); + + properties.put("camel.component.sjms2.connection-factory", "#class:org.apache.activemq.ActiveMQConnectionFactory"); + properties.put("camel.component.sjms2.connection-factory.brokerURL", getDefaultEndpoint()); + + return properties; + } + + @Override + public JMSClient getClient() { + return new JMSClient(org.apache.activemq.ActiveMQConnectionFactory::new, getDefaultEndpoint()); + } } diff --git a/tests/src/test/java/org/apache/camel/kafkaconnector/services/jms/ContainerLocalService.java b/tests/src/test/java/org/apache/camel/kafkaconnector/services/jms/ContainerLocalService.java new file mode 100644 index 0000000..93e81c8 --- /dev/null +++ b/tests/src/test/java/org/apache/camel/kafkaconnector/services/jms/ContainerLocalService.java @@ -0,0 +1,60 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.camel.kafkaconnector.services.jms; + +import java.util.Properties; + +import org.apache.camel.kafkaconnector.clients.jms.JMSClient; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * A specialized container that can be used to create Apache Artemis broker + * instances. + */ +public class ContainerLocalService implements JMSService { + private static final Logger LOG = LoggerFactory.getLogger(ContainerLocalService.class); + + private final JMSContainer container; + + public ContainerLocalService(JMSContainer container) { + this.container = container; + + container.start(); + } + + @Override + public Properties getConnectionProperties() { + return container.getConnectionProperties(); + } + + @Override + public JMSClient getClient() { + return container.getClient(); + } + + @Override + public String getDefaultEndpoint() { + return container.getDefaultEndpoint(); + } + + @Override + public void initialize() { + LOG.info("JMS broker running at address {}", container.getDefaultEndpoint()); + } +} diff --git a/tests/src/test/java/org/apache/camel/kafkaconnector/services/jms/JMSService.java b/tests/src/test/java/org/apache/camel/kafkaconnector/services/jms/JMSContainer.java similarity index 58% copy from tests/src/test/java/org/apache/camel/kafkaconnector/services/jms/JMSService.java copy to tests/src/test/java/org/apache/camel/kafkaconnector/services/jms/JMSContainer.java index 7480d28..9366373 100644 --- a/tests/src/test/java/org/apache/camel/kafkaconnector/services/jms/JMSService.java +++ b/tests/src/test/java/org/apache/camel/kafkaconnector/services/jms/JMSContainer.java @@ -17,19 +17,35 @@ package org.apache.camel.kafkaconnector.services.jms; -import java.util.concurrent.Future; +import java.util.Properties; +import org.apache.camel.kafkaconnector.clients.jms.JMSClient; import org.testcontainers.containers.GenericContainer; +import org.testcontainers.images.builder.ImageFromDockerfile; -public abstract class JMSService extends GenericContainer { +public abstract class JMSContainer extends GenericContainer { - public JMSService(Future<String> image) { - super(image); + + public JMSContainer(ImageFromDockerfile dockerfile) { + super(dockerfile); } /** - * Gets the default endpoint for the JMS service (ie.: amqp://host:port, or tcp://host:port, etc) - * @return the endpoint URL as a string in the specific format used by the service + * Gets the connection properties for accessing the service + * @return + */ + public abstract Properties getConnectionProperties(); + + + /** + * Get a client that can access the container + * @return + */ + public abstract JMSClient getClient(); + + /** + * Gets the end point URL used exchanging messages through the default acceptor port + * @return the end point URL as a string */ public abstract String getDefaultEndpoint(); } diff --git a/tests/src/test/java/org/apache/camel/kafkaconnector/services/jms/JMSService.java b/tests/src/test/java/org/apache/camel/kafkaconnector/services/jms/JMSService.java index 7480d28..99a5abd 100644 --- a/tests/src/test/java/org/apache/camel/kafkaconnector/services/jms/JMSService.java +++ b/tests/src/test/java/org/apache/camel/kafkaconnector/services/jms/JMSService.java @@ -17,19 +17,40 @@ package org.apache.camel.kafkaconnector.services.jms; -import java.util.concurrent.Future; +import java.util.Properties; -import org.testcontainers.containers.GenericContainer; +import org.apache.camel.kafkaconnector.clients.jms.JMSClient; +import org.junit.jupiter.api.extension.BeforeAllCallback; +import org.junit.jupiter.api.extension.ExtensionContext; -public abstract class JMSService extends GenericContainer { +public interface JMSService extends BeforeAllCallback { - public JMSService(Future<String> image) { - super(image); - } + /** + * Gets the connection properties for accessing the service + * @return + */ + Properties getConnectionProperties(); + + /** + * Get the appropriate client for the service + * @return + */ + JMSClient getClient(); /** * Gets the default endpoint for the JMS service (ie.: amqp://host:port, or tcp://host:port, etc) * @return the endpoint URL as a string in the specific format used by the service */ - public abstract String getDefaultEndpoint(); + String getDefaultEndpoint(); + + /** + * Perform any initialization necessary + */ + void initialize(); + + + @Override + default void beforeAll(ExtensionContext extensionContext) throws Exception { + initialize(); + } } diff --git a/tests/src/test/java/org/apache/camel/kafkaconnector/services/jms/JMSServiceFactory.java b/tests/src/test/java/org/apache/camel/kafkaconnector/services/jms/JMSServiceFactory.java index aa18498..0855010 100644 --- a/tests/src/test/java/org/apache/camel/kafkaconnector/services/jms/JMSServiceFactory.java +++ b/tests/src/test/java/org/apache/camel/kafkaconnector/services/jms/JMSServiceFactory.java @@ -30,14 +30,18 @@ public final class JMSServiceFactory { String jmsInstanceType = System.getProperty("jms-service.instance.type"); if (jmsInstanceType == null || jmsInstanceType.equals("local-dispatch-router-container")) { - return new QpidDispatchRouterService(); + return new ContainerLocalService(new QpidDispatchRouterContainer()); } if (jmsInstanceType.equals("local-artemis-container")) { - return new ArtemisService(); + return new ContainerLocalService(new ArtemisContainer()); } - LOG.error("Invalid JMS instance type: {}. Must be one of 'local-artemis-container' or 'local-dispatch-router-container", + if (jmsInstanceType.equals("remote")) { + return new RemoteJMSService(); + } + + LOG.error("Invalid JMS instance type: {}. Must be one of 'remote', 'local-artemis-container' or 'local-dispatch-router-container", jmsInstanceType); throw new UnsupportedOperationException("Invalid JMS instance type:"); } diff --git a/tests/src/test/java/org/apache/camel/kafkaconnector/services/jms/QpidDispatchRouterService.java b/tests/src/test/java/org/apache/camel/kafkaconnector/services/jms/QpidDispatchRouterContainer.java similarity index 73% rename from tests/src/test/java/org/apache/camel/kafkaconnector/services/jms/QpidDispatchRouterService.java rename to tests/src/test/java/org/apache/camel/kafkaconnector/services/jms/QpidDispatchRouterContainer.java index 5eaebdb..1d93d21 100644 --- a/tests/src/test/java/org/apache/camel/kafkaconnector/services/jms/QpidDispatchRouterService.java +++ b/tests/src/test/java/org/apache/camel/kafkaconnector/services/jms/QpidDispatchRouterContainer.java @@ -17,14 +17,17 @@ package org.apache.camel.kafkaconnector.services.jms; +import java.util.Properties; + +import org.apache.camel.kafkaconnector.clients.jms.JMSClient; import org.testcontainers.containers.wait.strategy.Wait; import org.testcontainers.images.builder.ImageFromDockerfile; -public class QpidDispatchRouterService extends JMSService { +public class QpidDispatchRouterContainer extends JMSContainer { private static final int DEFAULT_AMQP_PORT = 5672; - public QpidDispatchRouterService() { + public QpidDispatchRouterContainer() { super(new ImageFromDockerfile("qpid-dispatch:ckc", false) .withFileFromClasspath("Dockerfile", "org/apache/camel/kafkaconnector/services/jms/qpid-dispatch-router/Dockerfile")); @@ -56,4 +59,19 @@ public class QpidDispatchRouterService extends JMSService { public String getDefaultEndpoint() { return getAMQPEndpoint(); } + + @Override + public Properties getConnectionProperties() { + Properties properties = new Properties(); + + properties.put("camel.component.sjms2.connection-factory", "#class:org.apache.qpid.jms.JmsConnectionFactory"); + properties.put("camel.component.sjms2.connection-factory.remoteURI", getDefaultEndpoint()); + + return properties; + } + + @Override + public JMSClient getClient() { + return new JMSClient(org.apache.qpid.jms.JmsConnectionFactory::new, getDefaultEndpoint()); + } } diff --git a/tests/src/test/java/org/apache/camel/kafkaconnector/services/jms/JMSService.java b/tests/src/test/java/org/apache/camel/kafkaconnector/services/jms/RemoteJMSService.java similarity index 51% copy from tests/src/test/java/org/apache/camel/kafkaconnector/services/jms/JMSService.java copy to tests/src/test/java/org/apache/camel/kafkaconnector/services/jms/RemoteJMSService.java index 7480d28..c0f88f0 100644 --- a/tests/src/test/java/org/apache/camel/kafkaconnector/services/jms/JMSService.java +++ b/tests/src/test/java/org/apache/camel/kafkaconnector/services/jms/RemoteJMSService.java @@ -17,19 +17,37 @@ package org.apache.camel.kafkaconnector.services.jms; -import java.util.concurrent.Future; +import java.util.Properties; -import org.testcontainers.containers.GenericContainer; +import org.apache.camel.kafkaconnector.PropertyUtils; +import org.apache.camel.kafkaconnector.clients.jms.JMSClient; -public abstract class JMSService extends GenericContainer { +public class RemoteJMSService implements JMSService { - public JMSService(Future<String> image) { - super(image); + + @Override + public void initialize() { + // NO-OP + } + + @Override + public Properties getConnectionProperties() { + return PropertyUtils.getProperties(); + } + + @Override + public String getDefaultEndpoint() { + return System.getProperty("jms.broker.address"); } - /** - * Gets the default endpoint for the JMS service (ie.: amqp://host:port, or tcp://host:port, etc) - * @return the endpoint URL as a string in the specific format used by the service - */ - public abstract String getDefaultEndpoint(); + @Override + public JMSClient getClient() { + String tmpConnectionFactory = System.getProperty("camel.component.sjms2.connection-factory"); + + String connectionFactory = tmpConnectionFactory.replace("#class:", ""); + + return new JMSClient(connectionFactory, getDefaultEndpoint()); + + + } } diff --git a/tests/src/test/java/org/apache/camel/kafkaconnector/sink/jms/CamelSinkJMSITCase.java b/tests/src/test/java/org/apache/camel/kafkaconnector/sink/jms/CamelSinkJMSITCase.java index 899349c..f14c801 100644 --- a/tests/src/test/java/org/apache/camel/kafkaconnector/sink/jms/CamelSinkJMSITCase.java +++ b/tests/src/test/java/org/apache/camel/kafkaconnector/sink/jms/CamelSinkJMSITCase.java @@ -37,9 +37,9 @@ import org.apache.camel.kafkaconnector.services.jms.JMSServiceFactory; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.Timeout; +import org.junit.jupiter.api.extension.RegisterExtension; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import org.testcontainers.junit.jupiter.Container; import org.testcontainers.junit.jupiter.Testcontainers; import static org.junit.Assert.fail; @@ -53,7 +53,7 @@ import static org.junit.jupiter.api.Assertions.assertEquals; public class CamelSinkJMSITCase extends AbstractKafkaTest { private static final Logger LOG = LoggerFactory.getLogger(CamelSinkJMSITCase.class); - @Container + @RegisterExtension public JMSService jmsService = JMSServiceFactory.createService(); private int received; @@ -89,7 +89,7 @@ public class CamelSinkJMSITCase extends AbstractKafkaTest { @Timeout(90) public void testBasicSendReceive() { try { - Properties connectionProperties = JMSClient.getConnectionProperties(jmsService.getDefaultEndpoint()); + Properties connectionProperties = jmsService.getConnectionProperties(); ConnectorPropertyFactory testProperties = new CamelJMSPropertyFactory(1, TestCommon.getDefaultTestTopic(this.getClass()), @@ -128,7 +128,7 @@ public class CamelSinkJMSITCase extends AbstractKafkaTest { JMSClient jmsClient = null; try { - jmsClient = JMSClient.createClient(jmsService.getDefaultEndpoint()); + jmsClient = jmsService.getClient(); jmsClient.start(); diff --git a/tests/src/test/java/org/apache/camel/kafkaconnector/source/jms/CamelSourceJMSITCase.java b/tests/src/test/java/org/apache/camel/kafkaconnector/source/jms/CamelSourceJMSITCase.java index 6e76879..91e4f39 100644 --- a/tests/src/test/java/org/apache/camel/kafkaconnector/source/jms/CamelSourceJMSITCase.java +++ b/tests/src/test/java/org/apache/camel/kafkaconnector/source/jms/CamelSourceJMSITCase.java @@ -19,6 +19,8 @@ package org.apache.camel.kafkaconnector.source.jms; import java.util.Properties; +import javax.jms.JMSException; + import org.apache.camel.kafkaconnector.AbstractKafkaTest; import org.apache.camel.kafkaconnector.ConnectorPropertyFactory; import org.apache.camel.kafkaconnector.TestCommon; @@ -27,12 +29,11 @@ import org.apache.camel.kafkaconnector.clients.kafka.KafkaClient; import org.apache.camel.kafkaconnector.services.jms.JMSService; import org.apache.camel.kafkaconnector.services.jms.JMSServiceFactory; import org.apache.kafka.clients.consumer.ConsumerRecord; -import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.Timeout; +import org.junit.jupiter.api.extension.RegisterExtension; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import org.testcontainers.junit.jupiter.Container; import org.testcontainers.junit.jupiter.Testcontainers; import static org.junit.jupiter.api.Assertions.assertEquals; @@ -47,17 +48,12 @@ import static org.junit.jupiter.api.Assertions.fail; public class CamelSourceJMSITCase extends AbstractKafkaTest { private static final Logger LOG = LoggerFactory.getLogger(CamelSourceJMSITCase.class); - @Container + @RegisterExtension public JMSService jmsService = JMSServiceFactory.createService(); private int received; private final int expect = 10; - @BeforeEach - public void setUp() { - LOG.info("JMS service running at {}", jmsService.getDefaultEndpoint()); - } - private boolean checkRecord(ConsumerRecord<String, String> record) { LOG.debug("Received: {}", record.value()); received++; @@ -69,24 +65,39 @@ public class CamelSourceJMSITCase extends AbstractKafkaTest { return true; } + private void produceMessages(String queue, String baseText) { + JMSClient jmsProducer = null; + + try { + jmsProducer = jmsService.getClient(); + + jmsProducer.start(); + for (int i = 0; i < expect; i++) { + jmsProducer.send(queue, baseText + " " + i); + } + } catch (JMSException e) { + LOG.error("JMS exception trying to send messages to the queue: {}", e.getMessage(), e); + fail(e.getMessage()); + } catch (Exception e) { + LOG.error("Failed to send messages to the queue: {}", e.getMessage(), e); + fail(e.getMessage()); + } finally { + jmsProducer.stop(); + } + } + @Test @Timeout(90) public void testBasicSendReceive() { try { - Properties connectionProperties = JMSClient.getConnectionProperties(jmsService.getDefaultEndpoint()); + Properties connectionProperties = jmsService.getConnectionProperties(); ConnectorPropertyFactory testProperties = new CamelJMSPropertyFactory(1, TestCommon.getDefaultTestTopic(this.getClass()), TestCommon.DEFAULT_JMS_QUEUE, connectionProperties); getKafkaConnectService().initializeConnector(testProperties); - JMSClient jmsProducer = JMSClient.createClient(jmsService.getDefaultEndpoint()); - - jmsProducer.start(); - for (int i = 0; i < expect; i++) { - jmsProducer.send(TestCommon.DEFAULT_JMS_QUEUE, "Test message " + i); - } - jmsProducer.stop(); + produceMessages(TestCommon.DEFAULT_JMS_QUEUE, "Test string message"); LOG.debug("Creating the consumer ..."); KafkaClient<String, String> kafkaClient = new KafkaClient<>(getKafkaService().getBootstrapServers()); @@ -98,28 +109,25 @@ public class CamelSourceJMSITCase extends AbstractKafkaTest { LOG.error("JMS test failed: {}", e.getMessage(), e); fail(e.getMessage()); } - } + + @Test @Timeout(90) public void testIntSendReceive() { try { - Properties connectionProperties = JMSClient.getConnectionProperties(jmsService.getDefaultEndpoint()); + final String jmsQueueName = "testIntSendReceive"; + + Properties connectionProperties = jmsService.getConnectionProperties(); ConnectorPropertyFactory testProperties = new CamelJMSPropertyFactory(1, - TestCommon.getDefaultTestTopic(this.getClass()) + "testIntSendReceive", - "testIntSendReceive", connectionProperties); + TestCommon.getDefaultTestTopic(this.getClass()) + jmsQueueName, + jmsQueueName, connectionProperties); getKafkaConnectService().initializeConnector(testProperties); - JMSClient jmsProducer = JMSClient.createClient(jmsService.getDefaultEndpoint()); - - jmsProducer.start(); - for (int i = 0; i < expect; i++) { - jmsProducer.send("testIntSendReceive", i); - } - jmsProducer.stop(); + produceMessages(jmsQueueName, "Test string message"); LOG.debug("Creating the consumer ..."); KafkaClient<String, String> kafkaClient = new KafkaClient<>(getKafkaService().getBootstrapServers());