CAMEL-7914: MQTT Endpoint disconnects on failure. Does not reconnect Conflicts: components/camel-mqtt/src/main/java/org/apache/camel/component/mqtt/MQTTConfiguration.java
Project: http://git-wip-us.apache.org/repos/asf/camel/repo Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/e26a0864 Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/e26a0864 Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/e26a0864 Branch: refs/heads/camel-2.13.x Commit: e26a08640cf3e37135fa58d3b3ef15fb9d0c6d44 Parents: dc7a1a9 Author: Claus Ibsen <davscl...@apache.org> Authored: Sun Feb 15 11:15:29 2015 +0100 Committer: Claus Ibsen <davscl...@apache.org> Committed: Sun Feb 15 11:33:36 2015 +0100 ---------------------------------------------------------------------- .../camel/component/mqtt/MQTTEndpoint.java | 54 ++++++++++- .../mqtt/MQTTProducerReconnectTest.java | 96 ++++++++++++++++++++ 2 files changed, 146 insertions(+), 4 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/camel/blob/e26a0864/components/camel-mqtt/src/main/java/org/apache/camel/component/mqtt/MQTTEndpoint.java ---------------------------------------------------------------------- diff --git a/components/camel-mqtt/src/main/java/org/apache/camel/component/mqtt/MQTTEndpoint.java b/components/camel-mqtt/src/main/java/org/apache/camel/component/mqtt/MQTTEndpoint.java index 5e167d7..a6524f4 100644 --- a/components/camel-mqtt/src/main/java/org/apache/camel/component/mqtt/MQTTEndpoint.java +++ b/components/camel-mqtt/src/main/java/org/apache/camel/component/mqtt/MQTTEndpoint.java @@ -19,6 +19,7 @@ package org.apache.camel.component.mqtt; import java.util.List; import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; import org.apache.camel.Consumer; import org.apache.camel.Exchange; @@ -43,6 +44,8 @@ import org.slf4j.LoggerFactory; public class MQTTEndpoint extends DefaultEndpoint { private static final Logger LOG = LoggerFactory.getLogger(MQTTEndpoint.class); + private static final int PUBLISH_MAX_RECONNECT_ATTEMPTS = 3; + private CallbackConnection connection; private final MQTTConfiguration configuration; private volatile boolean connected; @@ -72,14 +75,21 @@ public class MQTTEndpoint extends DefaultEndpoint { @Override protected void doStart() throws Exception { super.doStart(); + + createConnection(); + } + + protected void createConnection() { connection = configuration.callbackConnection(); connection.listener(new Listener() { public void onConnected() { + connected = true; LOG.info("MQTT Connection connected to {}", configuration.getHost()); } public void onDisconnected() { + connected = false; LOG.debug("MQTT Connection disconnected from {}", configuration.getHost()); } @@ -98,6 +108,9 @@ public class MQTTEndpoint extends DefaultEndpoint { } public void onFailure(Throwable value) { + // mark this connection as disconnected so we force re-connect + connected = false; + LOG.warn("Connection to " + configuration.getHost() + " failure due " + value.getMessage() + ". Forcing a disconnect to re-connect on next attempt."); connection.disconnect(new Callback<Void>() { public void onSuccess(Void value) { } @@ -108,11 +121,11 @@ public class MQTTEndpoint extends DefaultEndpoint { }); } }); - - } protected void doStop() throws Exception { + super.doStop(); + if (connection != null) { final Promise<Void> promise = new Promise<Void>(); connection.getDispatchQueue().execute(new Task() { @@ -131,13 +144,14 @@ public class MQTTEndpoint extends DefaultEndpoint { }); promise.await(configuration.getDisconnectWaitInSeconds(), TimeUnit.SECONDS); } - super.doStop(); } - + void connect() throws Exception { final Promise<Object> promise = new Promise<Object>(); connection.connect(new Callback<Void>() { public void onSuccess(Void value) { + LOG.debug("Connected to {}", configuration.getHost()); + String subscribeTopicName = configuration.getSubscribeTopicName(); subscribeTopicName = subscribeTopicName != null ? subscribeTopicName.trim() : null; @@ -163,11 +177,13 @@ public class MQTTEndpoint extends DefaultEndpoint { } public void onFailure(Throwable value) { + LOG.warn("Failed to connect to " + configuration.getHost() + " due " + value.getMessage()); promise.onFailure(value); connection.disconnect(null); connected = false; } }); + LOG.info("Connecting to {} using {} seconds timeout", configuration.getHost(), configuration.getConnectWaitInSeconds()); promise.await(configuration.getConnectWaitInSeconds(), TimeUnit.SECONDS); } @@ -176,9 +192,39 @@ public class MQTTEndpoint extends DefaultEndpoint { } void publish(final String topic, final byte[] payload, final QoS qoS, final boolean retain, final Callback<Void> callback) throws Exception { + // if not connected then create a new connection to re-connect + boolean done = isConnected(); + int attempt = 0; + TimeoutException timeout = null; + while (!done && attempt <= PUBLISH_MAX_RECONNECT_ATTEMPTS) { + attempt++; + try { + LOG.warn("#{} attempt to re-create connection to {} before publishing", attempt, configuration.getHost()); + createConnection(); + connect(); + } catch (TimeoutException e) { + timeout = e; + LOG.debug("Timed out after {} seconds after {} attempt to re-create connection to {}", + new Object[]{configuration.getConnectWaitInSeconds(), attempt, configuration.getHost()}); + } catch (Throwable e) { + // other kind of error then exit asap + callback.onFailure(e); + return; + } + + done = isConnected(); + } + + if (attempt > 3 && !isConnected()) { + LOG.warn("Cannot re-connect to {} after {} attempts", configuration.getHost(), attempt); + callback.onFailure(timeout); + return; + } + connection.getDispatchQueue().execute(new Task() { @Override public void run() { + LOG.debug("Publishing to {}", configuration.getHost()); connection.publish(topic, payload, qoS, retain, callback); } }); http://git-wip-us.apache.org/repos/asf/camel/blob/e26a0864/components/camel-mqtt/src/test/java/org/apache/camel/component/mqtt/MQTTProducerReconnectTest.java ---------------------------------------------------------------------- diff --git a/components/camel-mqtt/src/test/java/org/apache/camel/component/mqtt/MQTTProducerReconnectTest.java b/components/camel-mqtt/src/test/java/org/apache/camel/component/mqtt/MQTTProducerReconnectTest.java new file mode 100644 index 0000000..15281a8 --- /dev/null +++ b/components/camel-mqtt/src/test/java/org/apache/camel/component/mqtt/MQTTProducerReconnectTest.java @@ -0,0 +1,96 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.component.mqtt; + +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; + +import org.apache.camel.builder.RouteBuilder; +import org.fusesource.mqtt.client.BlockingConnection; +import org.fusesource.mqtt.client.MQTT; +import org.fusesource.mqtt.client.Message; +import org.fusesource.mqtt.client.QoS; +import org.fusesource.mqtt.client.Topic; +import org.junit.Test; + +public class MQTTProducerReconnectTest extends MQTTBaseTest { + + @Test + public void testProduce() throws Exception { + MQTT mqtt = new MQTT(); + final BlockingConnection subscribeConnection = mqtt.blockingConnection(); + subscribeConnection.connect(); + Topic topic = new Topic(TEST_TOPIC, QoS.AT_MOST_ONCE); + Topic[] topics = {topic}; + subscribeConnection.subscribe(topics); + final CountDownLatch latch = new CountDownLatch(numberOfMessages); + + Thread thread = new Thread(new Runnable() { + public void run() { + for (int i = 0; i < numberOfMessages; i++) { + try { + Message message = subscribeConnection.receive(); + message.ack(); + latch.countDown(); + } catch (Exception e) { + e.printStackTrace(); + break; + } + } + } + }); + thread.start(); + + for (int i = 0; i < numberOfMessages; i++) { + + if (i == 5) { + LOG.info("Stopping MQTT transport"); + brokerService.getTransportConnectorByScheme("mqtt").stop(); + + Thread starter = new Thread(new Runnable() { + public void run() { + try { + Thread.sleep(3000); + LOG.info("Starting MQTT transport again"); + brokerService.getTransportConnectorByScheme("mqtt").start(); + } catch (Exception e) { + // ignore + } + } + }); + starter.start(); + } + + try { + template.sendBody("direct:foo", "test message " + i); + } catch (Exception e) { + // ignore + } + } + + latch.await(20, TimeUnit.SECONDS); + assertTrue("Messages not consumed = " + latch.getCount(), latch.getCount() == 0); + } + + protected RouteBuilder createRouteBuilder() { + return new RouteBuilder() { + public void configure() { + from("direct:foo").to("mqtt:boo?reconnectDelay=1000&publishTopicName=" + TEST_TOPIC); + } + }; + } +}