This is an automated email from the ASF dual-hosted git repository.

davsclaus pushed a commit to branch camel-3.x
in repository https://gitbox.apache.org/repos/asf/camel.git

The following commit(s) were added to refs/heads/camel-3.x by this push: new c2c6bd85797 CAMEL-18779: Paho (#9046) c2c6bd85797 is described below commit c2c6bd857970eeb89f067bb521bdf0e44f9a7cda Author: Federico Mariani <34543311+cro...@users.noreply.github.com> AuthorDate: Wed Jan 11 18:00:37 2023 +0100 CAMEL-18779: Paho (#9046) --- components/camel-paho/pom.xml | 8 ++++- .../camel/component/paho/PahoComponentTest.java | 42 +++++++++------------- .../component/paho/PahoOverrideTopicTest.java | 18 ++-------- .../paho/PahoReconnectAfterFailureTest.java | 38 +++++++------------- .../camel/component/paho/PahoTestSupport.java | 28 +++++++++++++++ .../component/paho/PahoToDSendDynamicTest.java | 18 ++-------- .../apache/camel/component/paho/PahoToDTest.java | 27 +++++--------- .../services/AbstractArtemisEmbeddedService.java | 14 ++++++-- ...otocolsService.java => ArtemisMQTTService.java} | 31 +++++++--------- .../artemis/services/ArtemisServiceFactory.java | 17 +++++++++ .../services/ArtemisTCPAllProtocolsService.java | 2 +- 11 files changed, 118 insertions(+), 125 deletions(-) diff --git a/components/camel-paho/pom.xml b/components/camel-paho/pom.xml index 574d8747aa3..b65c92bc70f 100644 --- a/components/camel-paho/pom.xml +++ b/components/camel-paho/pom.xml @@ -57,9 +57,15 @@ <scope>test</scope> </dependency> <!-- test infra --> + <dependency> + <groupId>org.apache.activemq</groupId> + <artifactId>artemis-mqtt-protocol</artifactId> + <version>${activemq-artemis-version}</version> + <scope>test</scope> + </dependency> <dependency> <groupId>org.apache.camel</groupId> - <artifactId>camel-test-infra-activemq</artifactId> + <artifactId>camel-test-infra-artemis</artifactId> <version>${project.version}</version> <scope>test</scope> <type>test-jar</type> diff --git a/components/camel-paho/src/test/java/org/apache/camel/component/paho/PahoComponentTest.java b/components/camel-paho/src/test/java/org/apache/camel/component/paho/PahoComponentTest.java index 
6ccf127c8a0..f705df2a1ea 100644 --- a/components/camel-paho/src/test/java/org/apache/camel/component/paho/PahoComponentTest.java +++ b/components/camel-paho/src/test/java/org/apache/camel/component/paho/PahoComponentTest.java @@ -22,27 +22,13 @@ import org.apache.camel.EndpointInject; import org.apache.camel.Exchange; import org.apache.camel.builder.RouteBuilder; import org.apache.camel.component.mock.MockEndpoint; -import org.apache.camel.test.AvailablePortFinder; -import org.apache.camel.test.infra.activemq.services.ActiveMQEmbeddedService; -import org.apache.camel.test.infra.activemq.services.ActiveMQEmbeddedServiceBuilder; -import org.apache.camel.test.junit5.CamelTestSupport; import org.eclipse.paho.client.mqttv3.MqttMessage; import org.junit.jupiter.api.Test; -import org.junit.jupiter.api.extension.RegisterExtension; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertNotNull; -public class PahoComponentTest extends CamelTestSupport { - - static int mqttPort = AvailablePortFinder.getNextAvailable(); - - @RegisterExtension - public ActiveMQEmbeddedService service = ActiveMQEmbeddedServiceBuilder - .bare() - .withPersistent(false) - .withMqttTransport(mqttPort) - .build(); +public class PahoComponentTest extends PahoTestSupport { @EndpointInject("mock:test") MockEndpoint mock; @@ -63,15 +49,17 @@ public class PahoComponentTest extends CamelTestSupport { PahoComponent customizedPaho = new PahoComponent(); context.addComponent("customizedPaho", customizedPaho); - from("direct:test").to("paho:queue?brokerUrl=tcp://localhost:" + mqttPort); - from("paho:queue?brokerUrl=tcp://localhost:" + mqttPort).to("mock:test"); + from("direct:test").to("paho:queue?brokerUrl=" + service.serviceAddress()); + from("paho:queue?brokerUrl=" + service.serviceAddress()).to("mock:test"); - from("direct:test2").to("paho:queue?brokerUrl=tcp://localhost:" + mqttPort); + from("direct:test2").to("paho:queue?brokerUrl=" + 
service.serviceAddress()); - from("paho:persistenceTest?persistence=FILE&brokerUrl=tcp://localhost:" + mqttPort).to("mock:persistenceTest"); + from("paho:persistenceTest?persistence=FILE&brokerUrl=" + service.serviceAddress()) + .to("mock:persistenceTest"); - from("direct:testCustomizedPaho").to("customizedPaho:testCustomizedPaho?brokerUrl=tcp://localhost:" + mqttPort); - from("paho:testCustomizedPaho?brokerUrl=tcp://localhost:" + mqttPort).to("mock:testCustomizedPaho"); + from("direct:testCustomizedPaho") + .to("customizedPaho:testCustomizedPaho?brokerUrl=" + service.serviceAddress()); + from("paho:testCustomizedPaho?brokerUrl=" + service.serviceAddress()).to("mock:testCustomizedPaho"); } }; } @@ -80,7 +68,8 @@ public class PahoComponentTest extends CamelTestSupport { @Test public void checkOptions() { - String uri = "paho:/test/topic" + "?clientId=sampleClient" + "&brokerUrl=tcp://localhost:" + mqttPort + "&qos=2" + String uri = "paho:/test/topic" + "?clientId=sampleClient" + "&brokerUrl=" + service.serviceAddress() + + "&qos=2" + "&persistence=file"; PahoEndpoint endpoint = getMandatoryEndpoint(uri, PahoEndpoint.class); @@ -88,7 +77,7 @@ public class PahoComponentTest extends CamelTestSupport { // Then assertEquals("/test/topic", endpoint.getTopic()); assertEquals("sampleClient", endpoint.getConfiguration().getClientId()); - assertEquals("tcp://localhost:" + mqttPort, endpoint.getConfiguration().getBrokerUrl()); + assertEquals("" + service.serviceAddress(), endpoint.getConfiguration().getBrokerUrl()); assertEquals(2, endpoint.getConfiguration().getQos()); assertEquals(PahoPersistence.FILE, endpoint.getConfiguration().getPersistence()); } @@ -112,7 +101,7 @@ public class PahoComponentTest extends CamelTestSupport { mock.expectedMessageCount(0); // When - template.sendBody("paho:someRandomQueue?brokerUrl=tcp://localhost:" + mqttPort, "msg"); + template.sendBody("paho:someRandomQueue?brokerUrl=" + service.serviceAddress(), "msg"); // Then mock.assertIsSatisfied(); 
@@ -174,10 +163,11 @@ public class PahoComponentTest extends CamelTestSupport { mock.expectedMessageCount(0); // When - template.sendBody("paho:someRandomQueue?brokerUrl=tcp://localhost:" + mqttPort + "&userName=test&password=test", "msg"); + template.sendBody( + "paho:someRandomQueue?brokerUrl=" + service.serviceAddress() + "&userName=test&password=test", + "msg"); // Then mock.assertIsSatisfied(); } - } diff --git a/components/camel-paho/src/test/java/org/apache/camel/component/paho/PahoOverrideTopicTest.java b/components/camel-paho/src/test/java/org/apache/camel/component/paho/PahoOverrideTopicTest.java index 75fac1a680b..7520bcf2102 100644 --- a/components/camel-paho/src/test/java/org/apache/camel/component/paho/PahoOverrideTopicTest.java +++ b/components/camel-paho/src/test/java/org/apache/camel/component/paho/PahoOverrideTopicTest.java @@ -18,23 +18,9 @@ package org.apache.camel.component.paho; import org.apache.camel.builder.RouteBuilder; import org.apache.camel.component.mock.MockEndpoint; -import org.apache.camel.test.AvailablePortFinder; -import org.apache.camel.test.infra.activemq.services.ActiveMQEmbeddedService; -import org.apache.camel.test.infra.activemq.services.ActiveMQEmbeddedServiceBuilder; -import org.apache.camel.test.junit5.CamelTestSupport; import org.junit.jupiter.api.Test; -import org.junit.jupiter.api.extension.RegisterExtension; -public class PahoOverrideTopicTest extends CamelTestSupport { - - static int mqttPort = AvailablePortFinder.getNextAvailable(); - - @RegisterExtension - public ActiveMQEmbeddedService service = ActiveMQEmbeddedServiceBuilder - .bare() - .withPersistent(false) - .withMqttTransport(mqttPort) - .build(); +public class PahoOverrideTopicTest extends PahoTestSupport { @Override protected boolean useJmx() { @@ -47,7 +33,7 @@ public class PahoOverrideTopicTest extends CamelTestSupport { @Override public void configure() { PahoComponent paho = context.getComponent("paho", PahoComponent.class); - 
paho.getConfiguration().setBrokerUrl("tcp://localhost:" + mqttPort); + paho.getConfiguration().setBrokerUrl("tcp://localhost:" + service.brokerPort()); from("direct:test").to("paho:queue").log("Message sent"); diff --git a/components/camel-paho/src/test/java/org/apache/camel/component/paho/PahoReconnectAfterFailureTest.java b/components/camel-paho/src/test/java/org/apache/camel/component/paho/PahoReconnectAfterFailureTest.java index 2065523b1d4..856c0d1b2ad 100644 --- a/components/camel-paho/src/test/java/org/apache/camel/component/paho/PahoReconnectAfterFailureTest.java +++ b/components/camel-paho/src/test/java/org/apache/camel/component/paho/PahoReconnectAfterFailureTest.java @@ -19,7 +19,6 @@ package org.apache.camel.component.paho; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; -import org.apache.activemq.broker.BrokerService; import org.apache.camel.CamelContext; import org.apache.camel.EndpointInject; import org.apache.camel.Route; @@ -30,7 +29,7 @@ import org.apache.camel.spi.RouteController; import org.apache.camel.spi.SupervisingRouteController; import org.apache.camel.support.RoutePolicySupport; import org.apache.camel.test.AvailablePortFinder; -import org.apache.camel.test.infra.activemq.services.ActiveMQEmbeddedServiceBuilder; +import org.apache.camel.test.infra.artemis.services.ArtemisMQTTService; import org.apache.camel.test.junit5.CamelTestSupport; import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.Test; @@ -42,10 +41,11 @@ import static org.junit.jupiter.api.Assertions.fail; public class PahoReconnectAfterFailureTest extends CamelTestSupport { public static final String TESTING_ROUTE_ID = "testingRoute"; - BrokerService broker; - int mqttPort = AvailablePortFinder.getNextAvailable(); + ArtemisMQTTService broker; + CountDownLatch routeStartedLatch = new CountDownLatch(1); + int port = AvailablePortFinder.getNextAvailable(); @EndpointInject("mock:test") MockEndpoint mock; @@ -55,23 +55,10 @@ public 
class PahoReconnectAfterFailureTest extends CamelTestSupport { return false; } - @Override - public void doPreSetup() throws Exception { - super.doPreSetup(); - - broker = ActiveMQEmbeddedServiceBuilder - .bare() - .withPersistent(false) - .build().getBrokerService(); - - // Broker will be started later, after camel context is started, - // to ensure first consumer connection fails - } - @Override protected CamelContext createCamelContext() throws Exception { CamelContext context = super.createCamelContext(); - // Setup supervisor to restart routes because paho consumer + // Setup supervisor to restart routes because paho consumer // is not able to recover automatically on startup SupervisingRouteController supervising = context.getRouteController().supervising(); supervising.setBackOffDelay(500); @@ -83,7 +70,9 @@ public class PahoReconnectAfterFailureTest extends CamelTestSupport { @AfterEach public void tearDown() throws Exception { super.tearDown(); - broker.stop(); + if (broker != null) { + broker.shutdown(); + } } @Override @@ -92,8 +81,8 @@ public class PahoReconnectAfterFailureTest extends CamelTestSupport { @Override public void configure() { - from("direct:test").to("paho:queue?lazyStartProducer=true&brokerUrl=tcp://localhost:" + mqttPort); - from("paho:queue?brokerUrl=tcp://localhost:" + mqttPort) + from("direct:test").to("paho:queue?lazyStartProducer=true&brokerUrl=tcp://localhost:" + port); + from("paho:queue?brokerUrl=tcp://localhost:" + port) .id(TESTING_ROUTE_ID) .routePolicy(new RoutePolicySupport() { @Override @@ -125,11 +114,10 @@ public class PahoReconnectAfterFailureTest extends CamelTestSupport { mock.expectedBodiesReceived(msg); // When - template.sendBody("paho:queue?lazyStartProducer=true&brokerUrl=tcp://localhost:" + mqttPort, msg); + template.sendBody("paho:queue?lazyStartProducer=true&brokerUrl=tcp://localhost:" + port, msg); // Then mock.assertIsSatisfied(); - } @Test @@ -153,7 +141,7 @@ public class PahoReconnectAfterFailureTest 
extends CamelTestSupport { } private void startBroker() throws Exception { - broker.addConnector("mqtt://localhost:" + mqttPort); - broker.start(); + broker = new ArtemisMQTTService(port); + broker.initialize(); } } diff --git a/components/camel-paho/src/test/java/org/apache/camel/component/paho/PahoTestSupport.java b/components/camel-paho/src/test/java/org/apache/camel/component/paho/PahoTestSupport.java new file mode 100644 index 00000000000..56f315c8fd1 --- /dev/null +++ b/components/camel-paho/src/test/java/org/apache/camel/component/paho/PahoTestSupport.java @@ -0,0 +1,28 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.camel.component.paho; + +import org.apache.camel.test.infra.artemis.services.ArtemisService; +import org.apache.camel.test.infra.artemis.services.ArtemisServiceFactory; +import org.apache.camel.test.junit5.CamelTestSupport; +import org.junit.jupiter.api.extension.RegisterExtension; + +public class PahoTestSupport extends CamelTestSupport { + + @RegisterExtension + public static ArtemisService service = ArtemisServiceFactory.createSingletonMQTTService(); +} diff --git a/components/camel-paho/src/test/java/org/apache/camel/component/paho/PahoToDSendDynamicTest.java b/components/camel-paho/src/test/java/org/apache/camel/component/paho/PahoToDSendDynamicTest.java index 1deeb5abf60..48e3e4c3393 100644 --- a/components/camel-paho/src/test/java/org/apache/camel/component/paho/PahoToDSendDynamicTest.java +++ b/components/camel-paho/src/test/java/org/apache/camel/component/paho/PahoToDSendDynamicTest.java @@ -17,25 +17,11 @@ package org.apache.camel.component.paho; import org.apache.camel.builder.RouteBuilder; -import org.apache.camel.test.AvailablePortFinder; -import org.apache.camel.test.infra.activemq.services.ActiveMQEmbeddedService; -import org.apache.camel.test.infra.activemq.services.ActiveMQEmbeddedServiceBuilder; -import org.apache.camel.test.junit5.CamelTestSupport; import org.junit.jupiter.api.Test; -import org.junit.jupiter.api.extension.RegisterExtension; import static org.junit.jupiter.api.Assertions.assertEquals; -public class PahoToDSendDynamicTest extends CamelTestSupport { - - static int mqttPort = AvailablePortFinder.getNextAvailable(); - - @RegisterExtension - public ActiveMQEmbeddedService service = ActiveMQEmbeddedServiceBuilder - .bare() - .withPersistent(false) - .withMqttTransport(mqttPort) - .build(); +public class PahoToDSendDynamicTest extends PahoTestSupport { @Override protected boolean useJmx() { @@ -71,7 +57,7 @@ public class PahoToDSendDynamicTest extends CamelTestSupport { @Override public void configure() { 
PahoComponent paho = context.getComponent("paho", PahoComponent.class); - paho.getConfiguration().setBrokerUrl("tcp://localhost:" + mqttPort); + paho.getConfiguration().setBrokerUrl("tcp://localhost:" + service.brokerPort()); // route message dynamic using toD from("direct:start").toD("paho:${header.where}?retained=true"); diff --git a/components/camel-paho/src/test/java/org/apache/camel/component/paho/PahoToDTest.java b/components/camel-paho/src/test/java/org/apache/camel/component/paho/PahoToDTest.java index e7c762715ea..7ed4ba9179d 100644 --- a/components/camel-paho/src/test/java/org/apache/camel/component/paho/PahoToDTest.java +++ b/components/camel-paho/src/test/java/org/apache/camel/component/paho/PahoToDTest.java @@ -18,23 +18,9 @@ package org.apache.camel.component.paho; import org.apache.camel.builder.RouteBuilder; import org.apache.camel.component.mock.MockEndpoint; -import org.apache.camel.test.AvailablePortFinder; -import org.apache.camel.test.infra.activemq.services.ActiveMQEmbeddedService; -import org.apache.camel.test.infra.activemq.services.ActiveMQEmbeddedServiceBuilder; -import org.apache.camel.test.junit5.CamelTestSupport; import org.junit.jupiter.api.Test; -import org.junit.jupiter.api.extension.RegisterExtension; -public class PahoToDTest extends CamelTestSupport { - - static int mqttPort = AvailablePortFinder.getNextAvailable(); - - @RegisterExtension - public ActiveMQEmbeddedService service = ActiveMQEmbeddedServiceBuilder - .bare() - .withPersistent(false) - .withMqttTransport(mqttPort) - .build(); +public class PahoToDTest extends PahoTestSupport { @Override protected boolean useJmx() { @@ -43,13 +29,16 @@ public class PahoToDTest extends CamelTestSupport { @Test public void testToD() throws Exception { - getMockEndpoint("mock:bar").expectedBodiesReceived("Hello bar"); - getMockEndpoint("mock:beer").expectedBodiesReceived("Hello beer"); + MockEndpoint bar = getMockEndpoint("mock:bar"); + bar.expectedBodiesReceived("Hello bar", null); // 
issue with Artemis + MockEndpoint beer = getMockEndpoint("mock:beer"); + beer.expectedBodiesReceived("Hello beer"); template.sendBodyAndHeader("direct:start", "Hello bar", "where", "bar"); template.sendBodyAndHeader("direct:start", "Hello beer", "where", "beer"); - MockEndpoint.assertIsSatisfied(context); + bar.assertIsSatisfied(); + beer.assertIsSatisfied(); } @Override @@ -58,7 +47,7 @@ public class PahoToDTest extends CamelTestSupport { @Override public void configure() { PahoComponent paho = context.getComponent("paho", PahoComponent.class); - paho.getConfiguration().setBrokerUrl("tcp://localhost:" + mqttPort); + paho.getConfiguration().setBrokerUrl("tcp://localhost:" + service.brokerPort()); // route message dynamic using toD from("direct:start").toD("paho:${header.where}"); diff --git a/test-infra/camel-test-infra-artemis/src/test/java/org/apache/camel/test/infra/artemis/services/AbstractArtemisEmbeddedService.java b/test-infra/camel-test-infra-artemis/src/test/java/org/apache/camel/test/infra/artemis/services/AbstractArtemisEmbeddedService.java index 0d977d6e804..b5556dfef53 100644 --- a/test-infra/camel-test-infra-artemis/src/test/java/org/apache/camel/test/infra/artemis/services/AbstractArtemisEmbeddedService.java +++ b/test-infra/camel-test-infra-artemis/src/test/java/org/apache/camel/test/infra/artemis/services/AbstractArtemisEmbeddedService.java @@ -48,6 +48,18 @@ public abstract class AbstractArtemisEmbeddedService implements ArtemisService, private Configuration artemisConfiguration; public AbstractArtemisEmbeddedService() { + defaultConfigturation(); + + embeddedBrokerService.setConfiguration(getConfiguration(artemisConfiguration, AvailablePortFinder.getNextAvailable())); + } + + public AbstractArtemisEmbeddedService(int port) { + defaultConfigturation(); + + embeddedBrokerService.setConfiguration(getConfiguration(artemisConfiguration, port)); + } + + private void defaultConfigturation() { embeddedBrokerService = new EmbeddedActiveMQ(); // Base 
configuration @@ -56,8 +68,6 @@ public abstract class AbstractArtemisEmbeddedService implements ArtemisService, BROKER_COUNT.increment(); artemisConfiguration.setBrokerInstance(new File("target", "artemis-" + BROKER_COUNT.intValue())); artemisConfiguration.setJMXManagementEnabled(false); - - embeddedBrokerService.setConfiguration(getConfiguration(artemisConfiguration, AvailablePortFinder.getNextAvailable())); } protected abstract Configuration getConfiguration(Configuration artemisConfiguration, int port); diff --git a/test-infra/camel-test-infra-artemis/src/test/java/org/apache/camel/test/infra/artemis/services/ArtemisTCPAllProtocolsService.java b/test-infra/camel-test-infra-artemis/src/test/java/org/apache/camel/test/infra/artemis/services/ArtemisMQTTService.java similarity index 52% copy from test-infra/camel-test-infra-artemis/src/test/java/org/apache/camel/test/infra/artemis/services/ArtemisTCPAllProtocolsService.java copy to test-infra/camel-test-infra-artemis/src/test/java/org/apache/camel/test/infra/artemis/services/ArtemisMQTTService.java index 76d6d14dafa..e0d38e3c268 100644 --- a/test-infra/camel-test-infra-artemis/src/test/java/org/apache/camel/test/infra/artemis/services/ArtemisTCPAllProtocolsService.java +++ b/test-infra/camel-test-infra-artemis/src/test/java/org/apache/camel/test/infra/artemis/services/ArtemisMQTTService.java @@ -16,41 +16,34 @@ */ package org.apache.camel.test.infra.artemis.services; -import org.apache.activemq.artemis.api.core.SimpleString; -import org.apache.activemq.artemis.api.core.TransportConfiguration; import org.apache.activemq.artemis.core.config.Configuration; -import org.apache.activemq.artemis.core.remoting.impl.netty.NettyConnectorFactory; -import org.apache.activemq.artemis.core.settings.impl.AddressSettings; -import org.apache.camel.test.AvailablePortFinder; import static org.junit.jupiter.api.Assertions.fail; -public class ArtemisTCPAllProtocolsService extends AbstractArtemisEmbeddedService { +public class 
ArtemisMQTTService extends AbstractArtemisEmbeddedService { private String brokerURL; private int port; + public ArtemisMQTTService(int port) { + super(port); + } + + public ArtemisMQTTService() { + super(); + } + @Override protected Configuration getConfiguration(Configuration configuration, int port) { - final int brokerId = super.BROKER_COUNT.intValue(); - port = AvailablePortFinder.getNextAvailable(); + this.port = port; brokerURL = "tcp://0.0.0.0:" + port; - configuration.setPersistenceEnabled(false); try { - configuration.addAcceptorConfiguration("in-vm", "vm://" + brokerId); - configuration.addAcceptorConfiguration("connector", brokerURL + "?protocols=CORE,AMQP,HORNETQ,OPENWIRE"); - configuration.addConnectorConfiguration("connector", - new TransportConfiguration(NettyConnectorFactory.class.getName())); - configuration.setJournalDirectory("target/data/journal"); + configuration.addAcceptorConfiguration("mqtt", brokerURL + "?protocols=MQTT"); } catch (Exception e) { LOG.warn(e.getMessage(), e); - fail("vm acceptor cannot be configured"); + fail("mqtt acceptor cannot be configured"); } - configuration.addAddressSetting("#", - new AddressSettings() - .setDeadLetterAddress(SimpleString.toSimpleString("DLQ")) - .setExpiryAddress(SimpleString.toSimpleString("ExpiryQueue"))); return configuration; } diff --git a/test-infra/camel-test-infra-artemis/src/test/java/org/apache/camel/test/infra/artemis/services/ArtemisServiceFactory.java b/test-infra/camel-test-infra-artemis/src/test/java/org/apache/camel/test/infra/artemis/services/ArtemisServiceFactory.java index 59756053b5f..1c5080e0b30 100644 --- a/test-infra/camel-test-infra-artemis/src/test/java/org/apache/camel/test/infra/artemis/services/ArtemisServiceFactory.java +++ b/test-infra/camel-test-infra-artemis/src/test/java/org/apache/camel/test/infra/artemis/services/ArtemisServiceFactory.java @@ -26,10 +26,12 @@ public final class ArtemisServiceFactory { private static SimpleTestServiceBuilder<ArtemisService> 
nonPersistentInstanceBuilder; private static SimpleTestServiceBuilder<ArtemisService> persistentInstanceBuilder; private static SimpleTestServiceBuilder<ArtemisService> amqpInstanceBuilder; + private static SimpleTestServiceBuilder<ArtemisService> mqttInstanceBuilder; private static ArtemisService persistentService; private static ArtemisService nonPersistentService; private static ArtemisService amqpService; + private static ArtemisService mqttService; public static class SingletonArtemisService extends SingletonService<ArtemisService> implements ArtemisService { @@ -159,6 +161,21 @@ public final class ArtemisServiceFactory { return amqpService; } + public static synchronized ArtemisService createSingletonMQTTService() { + if (mqttService == null) { + if (mqttInstanceBuilder == null) { + mqttInstanceBuilder = new SimpleTestServiceBuilder<>("artemis"); + + mqttInstanceBuilder + .addLocalMapping(() -> new SingletonArtemisService(new ArtemisMQTTService(), "artemis-mqtt")); + } + + mqttService = mqttInstanceBuilder.build(); + } + + return mqttService; + } + public static ArtemisService createTCPAllProtocolsService() { return new ArtemisTCPAllProtocolsService(); } diff --git a/test-infra/camel-test-infra-artemis/src/test/java/org/apache/camel/test/infra/artemis/services/ArtemisTCPAllProtocolsService.java b/test-infra/camel-test-infra-artemis/src/test/java/org/apache/camel/test/infra/artemis/services/ArtemisTCPAllProtocolsService.java index 76d6d14dafa..ac494d1a41c 100644 --- a/test-infra/camel-test-infra-artemis/src/test/java/org/apache/camel/test/infra/artemis/services/ArtemisTCPAllProtocolsService.java +++ b/test-infra/camel-test-infra-artemis/src/test/java/org/apache/camel/test/infra/artemis/services/ArtemisTCPAllProtocolsService.java @@ -39,7 +39,7 @@ public class ArtemisTCPAllProtocolsService extends AbstractArtemisEmbeddedServic configuration.setPersistenceEnabled(false); try { configuration.addAcceptorConfiguration("in-vm", "vm://" + brokerId); - 
configuration.addAcceptorConfiguration("connector", brokerURL + "?protocols=CORE,AMQP,HORNETQ,OPENWIRE"); + configuration.addAcceptorConfiguration("connector", brokerURL + "?protocols=CORE,AMQP,HORNETQ,OPENWIRE,MQTT"); configuration.addConnectorConfiguration("connector", new TransportConfiguration(NettyConnectorFactory.class.getName())); configuration.setJournalDirectory("target/data/journal");