Repository: camel Updated Branches: refs/heads/master 879781312 -> 66a11e0c9
CAMEL-10695: camel-mqtt: TimeoutException thrown on MQTTEndpoint stop Project: http://git-wip-us.apache.org/repos/asf/camel/repo Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/66a11e0c Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/66a11e0c Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/66a11e0c Branch: refs/heads/master Commit: 66a11e0c96a4abb3d8902f1be96ac4bc33293d44 Parents: 8797813 Author: lburgazzoli <lburgazz...@gmail.com> Authored: Wed Jan 11 15:22:19 2017 +0100 Committer: lburgazzoli <lburgazz...@gmail.com> Committed: Wed Jan 11 15:23:30 2017 +0100 ---------------------------------------------------------------------- components/camel-mqtt/pom.xml | 5 +++ .../camel/component/mqtt/MQTTEndpoint.java | 8 ++-- .../camel/component/mqtt/MQTTBaseTest.java | 5 +-- .../camel/component/mqtt/MQTTTestSupport.java | 19 +++++++++ .../camel/component/mqtt/SpringMQTTTest.java | 35 ++++++++++++++++ .../camel/component/mqtt/SpringMQTTTest.xml | 44 ++++++++++++++++++++ 6 files changed, 109 insertions(+), 7 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/camel/blob/66a11e0c/components/camel-mqtt/pom.xml ---------------------------------------------------------------------- diff --git a/components/camel-mqtt/pom.xml b/components/camel-mqtt/pom.xml index 357f7f6..8a91e18 100644 --- a/components/camel-mqtt/pom.xml +++ b/components/camel-mqtt/pom.xml @@ -52,6 +52,11 @@ <scope>test</scope> </dependency> <dependency> + <groupId>org.apache.camel</groupId> + <artifactId>camel-test-spring</artifactId> + <scope>test</scope> + </dependency> + <dependency> <groupId>org.apache.activemq</groupId> <artifactId>activemq-broker</artifactId> <scope>test</scope> http://git-wip-us.apache.org/repos/asf/camel/blob/66a11e0c/components/camel-mqtt/src/main/java/org/apache/camel/component/mqtt/MQTTEndpoint.java ---------------------------------------------------------------------- diff --git a/components/camel-mqtt/src/main/java/org/apache/camel/component/mqtt/MQTTEndpoint.java b/components/camel-mqtt/src/main/java/org/apache/camel/component/mqtt/MQTTEndpoint.java index 71e333e..455b7a8 100644 --- a/components/camel-mqtt/src/main/java/org/apache/camel/component/mqtt/MQTTEndpoint.java +++ b/components/camel-mqtt/src/main/java/org/apache/camel/component/mqtt/MQTTEndpoint.java @@ -278,25 +278,27 @@ public class MQTTEndpoint extends DefaultEndpoint implements AsyncEndpoint { }); } + @Override protected void doStop() throws Exception { super.doStop(); - if (connection != null) { - final Promise<Void> promise = new Promise<Void>(); + if (connection != null && connected) { + final Promise<Void> promise = new Promise<>(); connection.getDispatchQueue().execute(new Task() { @Override public void run() { connection.disconnect(new Callback<Void>() { public void onSuccess(Void value) { + connected = false; promise.onSuccess(value); } - public void onFailure(Throwable value) { promise.onFailure(value); } }); } }); + promise.await(configuration.getDisconnectWaitInSeconds(), TimeUnit.SECONDS); } } http://git-wip-us.apache.org/repos/asf/camel/blob/66a11e0c/components/camel-mqtt/src/test/java/org/apache/camel/component/mqtt/MQTTBaseTest.java ---------------------------------------------------------------------- diff --git a/components/camel-mqtt/src/test/java/org/apache/camel/component/mqtt/MQTTBaseTest.java b/components/camel-mqtt/src/test/java/org/apache/camel/component/mqtt/MQTTBaseTest.java index 7d6280f..476abf4 100644 --- a/components/camel-mqtt/src/test/java/org/apache/camel/component/mqtt/MQTTBaseTest.java +++ b/components/camel-mqtt/src/test/java/org/apache/camel/component/mqtt/MQTTBaseTest.java @@ -36,10 +36,7 @@ public abstract class MQTTBaseTest extends CamelTestSupport { public void setUp() throws Exception { - brokerService = new BrokerService(); - brokerService.setPersistent(false); - brokerService.setAdvisorySupport(false); - brokerService.addConnector(MQTTTestSupport.getConnection()); + brokerService = MQTTTestSupport.newBrokerService(); brokerService.start(); super.setUp(); } http://git-wip-us.apache.org/repos/asf/camel/blob/66a11e0c/components/camel-mqtt/src/test/java/org/apache/camel/component/mqtt/MQTTTestSupport.java ---------------------------------------------------------------------- diff --git a/components/camel-mqtt/src/test/java/org/apache/camel/component/mqtt/MQTTTestSupport.java b/components/camel-mqtt/src/test/java/org/apache/camel/component/mqtt/MQTTTestSupport.java index d521b10..f870855 100644 --- a/components/camel-mqtt/src/test/java/org/apache/camel/component/mqtt/MQTTTestSupport.java +++ b/components/camel-mqtt/src/test/java/org/apache/camel/component/mqtt/MQTTTestSupport.java @@ -17,6 +17,7 @@ package org.apache.camel.component.mqtt; +import org.apache.activemq.broker.BrokerService; import org.apache.camel.test.AvailablePortFinder; /** @@ -58,4 +59,22 @@ public final class MQTTTestSupport { public static String getHostForMQTTEndpoint() { return HOST; } + + + + public static BrokerService newBrokerService() throws Exception { + BrokerService service = new BrokerService(); + service.setPersistent(false); + service.setAdvisorySupport(false); + service.addConnector(getConnection()); + + return service; + } + + public static MQTTComponent newComponent() throws Exception { + MQTTComponent component = new MQTTComponent(); + component.setHost(getHostForMQTTEndpoint()); + + return component; + } } http://git-wip-us.apache.org/repos/asf/camel/blob/66a11e0c/components/camel-mqtt/src/test/java/org/apache/camel/component/mqtt/SpringMQTTTest.java ---------------------------------------------------------------------- diff --git a/components/camel-mqtt/src/test/java/org/apache/camel/component/mqtt/SpringMQTTTest.java b/components/camel-mqtt/src/test/java/org/apache/camel/component/mqtt/SpringMQTTTest.java new file mode 100644 index 0000000..d5cdb09 --- /dev/null +++ b/components/camel-mqtt/src/test/java/org/apache/camel/component/mqtt/SpringMQTTTest.java @@ -0,0 +1,35 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.component.mqtt; + +import org.apache.camel.test.spring.CamelSpringTestSupport; +import org.junit.Test; +import org.springframework.context.support.AbstractApplicationContext; +import org.springframework.context.support.ClassPathXmlApplicationContext; +import org.springframework.test.annotation.DirtiesContext; + +@DirtiesContext +public class SpringMQTTTest extends CamelSpringTestSupport { + @Override + protected AbstractApplicationContext createApplicationContext() { + return new ClassPathXmlApplicationContext("org/apache/camel/component/mqtt/SpringMQTTTest.xml"); + } + + @Test + public void simpleTest() throws Exception { + } +} http://git-wip-us.apache.org/repos/asf/camel/blob/66a11e0c/components/camel-mqtt/src/test/resources/org/apache/camel/component/mqtt/SpringMQTTTest.xml ---------------------------------------------------------------------- diff --git a/components/camel-mqtt/src/test/resources/org/apache/camel/component/mqtt/SpringMQTTTest.xml b/components/camel-mqtt/src/test/resources/org/apache/camel/component/mqtt/SpringMQTTTest.xml new file mode 100644 index 0000000..7574675 --- /dev/null +++ b/components/camel-mqtt/src/test/resources/org/apache/camel/component/mqtt/SpringMQTTTest.xml @@ -0,0 +1,44 @@ +<?xml version="1.0" encoding="UTF-8"?> +<!-- + ~ Licensed to the Apache Software Foundation (ASF) under one or more + ~ contributor license agreements. See the NOTICE file distributed with + ~ this work for additional information regarding copyright ownership. + ~ The ASF licenses this file to You under the Apache License, Version 2.0 + ~ (the "License"); you may not use this file except in compliance with + ~ the License. You may obtain a copy of the License at + ~ + ~ http://www.apache.org/licenses/LICENSE-2.0 + ~ + ~ Unless required by applicable law or agreed to in writing, software + ~ distributed under the License is distributed on an "AS IS" BASIS, + ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + ~ See the License for the specific language governing permissions and + ~ limitations under the License. + --> +<beans xmlns="http://www.springframework.org/schema/beans" + xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" + xsi:schemaLocation=" + http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-3.0.xsd + http://camel.apache.org/schema/spring http://camel.apache.org/schema/spring/camel-spring.xsd"> + + <bean id="broker" + class="org.apache.camel.component.mqtt.MQTTTestSupport" + factory-method="newBrokerService" + init-method="start" + destroy-method="stop"/> + + <bean id="mqtt" + class="org.apache.camel.component.mqtt.MQTTTestSupport" + factory-method="newComponent"/> + + <camelContext id="basic-mqtt-component" xmlns="http://camel.apache.org/schema/spring" autoStartup="true"> + + <endpoint id="mqtt-endpoint" uri="mqtt:mqtt-endpoint?publishTopicName=basicMqtt/message/basic-1"/> + + <route> + <from uri="direct:in"/> + <to uri="ref:mqtt-endpoint"/> + </route> + </camelContext> + +</beans>