Repository: camel Updated Branches: refs/heads/camel-2.13.x dc7a1a901 -> e26a08640 refs/heads/camel-2.14.x c6ff77dda -> 0218061f0 refs/heads/master 594842a20 -> 15d7c596c
CAMEL-7914: MQTT Endpoint disconnects on failure. Does not reconnect Project: http://git-wip-us.apache.org/repos/asf/camel/repo Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/15d7c596 Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/15d7c596 Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/15d7c596 Branch: refs/heads/master Commit: 15d7c596c4556ba5c85d0ebf4801228df85a2a58 Parents: 594842a Author: Claus Ibsen <davscl...@apache.org> Authored: Sun Feb 15 11:15:29 2015 +0100 Committer: Claus Ibsen <davscl...@apache.org> Committed: Sun Feb 15 11:32:15 2015 +0100 ---------------------------------------------------------------------- .../camel/component/mqtt/MQTTConfiguration.java | 261 +++++++++++++++++++ .../camel/component/mqtt/MQTTEndpoint.java | 54 +++- .../mqtt/MQTTProducerReconnectTest.java | 96 +++++++ 3 files changed, 407 insertions(+), 4 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/camel/blob/15d7c596/components/camel-mqtt/src/main/java/org/apache/camel/component/mqtt/MQTTConfiguration.java ---------------------------------------------------------------------- diff --git a/components/camel-mqtt/src/main/java/org/apache/camel/component/mqtt/MQTTConfiguration.java b/components/camel-mqtt/src/main/java/org/apache/camel/component/mqtt/MQTTConfiguration.java index 428ffad..97e906c 100644 --- a/components/camel-mqtt/src/main/java/org/apache/camel/component/mqtt/MQTTConfiguration.java +++ b/components/camel-mqtt/src/main/java/org/apache/camel/component/mqtt/MQTTConfiguration.java @@ -17,11 +17,13 @@ package org.apache.camel.component.mqtt; import java.net.URI; +import java.net.URISyntaxException; import java.util.concurrent.Executor; import javax.net.ssl.SSLContext; import org.apache.camel.spi.UriParam; import org.apache.camel.spi.UriParams; +import org.fusesource.hawtbuf.UTF8Buffer; import org.fusesource.hawtdispatch.DispatchQueue; import org.fusesource.hawtdispatch.transport.TcpTransport; import org.fusesource.mqtt.client.MQTT; @@ -206,6 +208,265 @@ public class MQTTConfiguration extends MQTT { throw new IllegalArgumentException("There is no QoS with name " + qualityOfService); } + @Override + public void setTracer(Tracer tracer) { + super.setTracer(tracer); + } + + @Override + public void setClientId(String clientId) { + super.setClientId(clientId); + } + + @Override + public void setClientId(UTF8Buffer clientId) { + super.setClientId(clientId); + } + + @Override + public void setKeepAlive(short keepAlive) { + super.setKeepAlive(keepAlive); + } + + @Override + public void setPassword(String password) { + super.setPassword(password); + } + + @Override + public void setPassword(UTF8Buffer password) { + super.setPassword(password); + } + + @Override + public void setUserName(String userName) { + super.setUserName(userName); + } + + @Override + public void setUserName(UTF8Buffer userName) { + super.setUserName(userName); + } + + @Override + public void setWillMessage(String willMessage) { + super.setWillMessage(willMessage); + } + + @Override + public void setWillMessage(UTF8Buffer willMessage) { + super.setWillMessage(willMessage); + } + + @Override + public void setWillQos(QoS willQos) { + super.setWillQos(willQos); + } + + @Override + public void setVersion(String version) { + super.setVersion(version); + } + + @Override + public String getVersion() { + return super.getVersion(); + } + + @Override + public void setWillRetain(boolean willRetain) { + super.setWillRetain(willRetain); + } + + @Override + public void setWillTopic(String willTopic) { + super.setWillTopic(willTopic); + } + + @Override + public void setWillTopic(UTF8Buffer willTopic) { + super.setWillTopic(willTopic); + } + + @Override + public Executor getBlockingExecutor() { + return super.getBlockingExecutor(); + } + + @Override + public void setBlockingExecutor(Executor blockingExecutor) { + super.setBlockingExecutor(blockingExecutor); + } + + @Override + public DispatchQueue getDispatchQueue() { + return super.getDispatchQueue(); + } + + @Override + public void setDispatchQueue(DispatchQueue dispatchQueue) { + super.setDispatchQueue(dispatchQueue); + } + + @Override + public URI getLocalAddress() { + return super.getLocalAddress(); + } + + @Override + public void setLocalAddress(String localAddress) throws URISyntaxException { + super.setLocalAddress(localAddress); + } + + @Override + public void setLocalAddress(URI localAddress) { + super.setLocalAddress(localAddress); + } + + @Override + public int getMaxReadRate() { + return super.getMaxReadRate(); + } + + @Override + public void setMaxReadRate(int maxReadRate) { + super.setMaxReadRate(maxReadRate); + } + + @Override + public int getMaxWriteRate() { + return super.getMaxWriteRate(); + } + + @Override + public void setMaxWriteRate(int maxWriteRate) { + super.setMaxWriteRate(maxWriteRate); + } + + @Override + public int getReceiveBufferSize() { + return super.getReceiveBufferSize(); + } + + @Override + public void setReceiveBufferSize(int receiveBufferSize) { + super.setReceiveBufferSize(receiveBufferSize); + } + + @Override + public URI getHost() { + return super.getHost(); + } + + @Override + public void setHost(String host, int port) throws URISyntaxException { + super.setHost(host, port); + } + + @Override + public void setHost(String host) throws URISyntaxException { + super.setHost(host); + } + + @Override + public void setHost(URI host) { + super.setHost(host); + } + + @Override + public int getSendBufferSize() { + return super.getSendBufferSize(); + } + + @Override + public void setSendBufferSize(int sendBufferSize) { + super.setSendBufferSize(sendBufferSize); + } + + @Override + public SSLContext getSslContext() { + return super.getSslContext(); + } + + @Override + public void setSslContext(SSLContext sslContext) { + super.setSslContext(sslContext); + } + + @Override + public int getTrafficClass() { + return super.getTrafficClass(); + } + + @Override + public void setTrafficClass(int trafficClass) { + super.setTrafficClass(trafficClass); + } + + @Override + public boolean isUseLocalHost() { + return super.isUseLocalHost(); + } + + @Override + public void setUseLocalHost(boolean useLocalHost) { + super.setUseLocalHost(useLocalHost); + } + + @Override + public long getConnectAttemptsMax() { + return super.getConnectAttemptsMax(); + } + + @Override + public void setConnectAttemptsMax(long connectAttemptsMax) { + super.setConnectAttemptsMax(connectAttemptsMax); + } + + @Override + public long getReconnectAttemptsMax() { + return super.getReconnectAttemptsMax(); + } + + @Override + public void setReconnectAttemptsMax(long reconnectAttemptsMax) { + super.setReconnectAttemptsMax(reconnectAttemptsMax); + } + + @Override + public double getReconnectBackOffMultiplier() { + return super.getReconnectBackOffMultiplier(); + } + + @Override + public void setReconnectBackOffMultiplier(double reconnectBackOffMultiplier) { + super.setReconnectBackOffMultiplier(reconnectBackOffMultiplier); + } + + @Override + public long getReconnectDelay() { + return super.getReconnectDelay(); + } + + @Override + public void setReconnectDelay(long reconnectDelay) { + super.setReconnectDelay(reconnectDelay); + } + + @Override + public long getReconnectDelayMax() { + return super.getReconnectDelayMax(); + } + + @Override + public void setReconnectDelayMax(long reconnectDelayMax) { + super.setReconnectDelayMax(reconnectDelayMax); + } + + @Override + public Tracer getTracer() { + return super.getTracer(); + } } http://git-wip-us.apache.org/repos/asf/camel/blob/15d7c596/components/camel-mqtt/src/main/java/org/apache/camel/component/mqtt/MQTTEndpoint.java ---------------------------------------------------------------------- diff --git a/components/camel-mqtt/src/main/java/org/apache/camel/component/mqtt/MQTTEndpoint.java b/components/camel-mqtt/src/main/java/org/apache/camel/component/mqtt/MQTTEndpoint.java index 7e1737b..3563f6e 100644 --- a/components/camel-mqtt/src/main/java/org/apache/camel/component/mqtt/MQTTEndpoint.java +++ b/components/camel-mqtt/src/main/java/org/apache/camel/component/mqtt/MQTTEndpoint.java @@ -19,6 +19,7 @@ package org.apache.camel.component.mqtt; import java.util.List; import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; import org.apache.camel.Consumer; import org.apache.camel.Exchange; @@ -47,6 +48,8 @@ import org.slf4j.LoggerFactory; public class MQTTEndpoint extends DefaultEndpoint { private static final Logger LOG = LoggerFactory.getLogger(MQTTEndpoint.class); + private static final int PUBLISH_MAX_RECONNECT_ATTEMPTS = 3; + private CallbackConnection connection; @UriPath private String name; @@ -87,14 +90,21 @@ public class MQTTEndpoint extends DefaultEndpoint { @Override protected void doStart() throws Exception { super.doStart(); + + createConnection(); + } + + protected void createConnection() { connection = configuration.callbackConnection(); connection.listener(new Listener() { public void onConnected() { + connected = true; LOG.info("MQTT Connection connected to {}", configuration.getHost()); } public void onDisconnected() { + connected = false; LOG.debug("MQTT Connection disconnected from {}", configuration.getHost()); } @@ -113,6 +123,9 @@ public class MQTTEndpoint extends DefaultEndpoint { } public void onFailure(Throwable value) { + // mark this connection as disconnected so we force re-connect + connected = false; + LOG.warn("Connection to " + configuration.getHost() + " failure due " + value.getMessage() + ". Forcing a disconnect to re-connect on next attempt."); connection.disconnect(new Callback<Void>() { public void onSuccess(Void value) { } @@ -123,11 +136,11 @@ public class MQTTEndpoint extends DefaultEndpoint { }); } }); - - } protected void doStop() throws Exception { + super.doStop(); + if (connection != null) { final Promise<Void> promise = new Promise<Void>(); connection.getDispatchQueue().execute(new Task() { @@ -146,13 +159,14 @@ public class MQTTEndpoint extends DefaultEndpoint { }); promise.await(configuration.getDisconnectWaitInSeconds(), TimeUnit.SECONDS); } - super.doStop(); } - + void connect() throws Exception { final Promise<Object> promise = new Promise<Object>(); connection.connect(new Callback<Void>() { public void onSuccess(Void value) { + LOG.debug("Connected to {}", configuration.getHost()); + String subscribeTopicName = configuration.getSubscribeTopicName(); subscribeTopicName = subscribeTopicName != null ? subscribeTopicName.trim() : null; @@ -178,11 +192,13 @@ public class MQTTEndpoint extends DefaultEndpoint { } public void onFailure(Throwable value) { + LOG.warn("Failed to connect to " + configuration.getHost() + " due " + value.getMessage()); promise.onFailure(value); connection.disconnect(null); connected = false; } }); + LOG.info("Connecting to {} using {} seconds timeout", configuration.getHost(), configuration.getConnectWaitInSeconds()); promise.await(configuration.getConnectWaitInSeconds(), TimeUnit.SECONDS); } @@ -191,9 +207,39 @@ public class MQTTEndpoint extends DefaultEndpoint { } void publish(final String topic, final byte[] payload, final QoS qoS, final boolean retain, final Callback<Void> callback) throws Exception { + // if not connected then create a new connection to re-connect + boolean done = isConnected(); + int attempt = 0; + TimeoutException timeout = null; + while (!done && attempt <= PUBLISH_MAX_RECONNECT_ATTEMPTS) { + attempt++; + try { + LOG.warn("#{} attempt to re-create connection to {} before publishing", attempt, configuration.getHost()); + createConnection(); + connect(); + } catch (TimeoutException e) { + timeout = e; + LOG.debug("Timed out after {} seconds after {} attempt to re-create connection to {}", + new Object[]{configuration.getConnectWaitInSeconds(), attempt, configuration.getHost()}); + } catch (Throwable e) { + // other kind of error then exit asap + callback.onFailure(e); + return; + } + + done = isConnected(); + } + + if (attempt > 3 && !isConnected()) { + LOG.warn("Cannot re-connect to {} after {} attempts", configuration.getHost(), attempt); + callback.onFailure(timeout); + return; + } + connection.getDispatchQueue().execute(new Task() { @Override public void run() { + LOG.debug("Publishing to {}", configuration.getHost()); connection.publish(topic, payload, qoS, retain, callback); } }); http://git-wip-us.apache.org/repos/asf/camel/blob/15d7c596/components/camel-mqtt/src/test/java/org/apache/camel/component/mqtt/MQTTProducerReconnectTest.java ---------------------------------------------------------------------- diff --git a/components/camel-mqtt/src/test/java/org/apache/camel/component/mqtt/MQTTProducerReconnectTest.java b/components/camel-mqtt/src/test/java/org/apache/camel/component/mqtt/MQTTProducerReconnectTest.java new file mode 100644 index 0000000..15281a8 --- /dev/null +++ b/components/camel-mqtt/src/test/java/org/apache/camel/component/mqtt/MQTTProducerReconnectTest.java @@ -0,0 +1,96 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.component.mqtt; + +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; + +import org.apache.camel.builder.RouteBuilder; +import org.fusesource.mqtt.client.BlockingConnection; +import org.fusesource.mqtt.client.MQTT; +import org.fusesource.mqtt.client.Message; +import org.fusesource.mqtt.client.QoS; +import org.fusesource.mqtt.client.Topic; +import org.junit.Test; + +public class MQTTProducerReconnectTest extends MQTTBaseTest { + + @Test + public void testProduce() throws Exception { + MQTT mqtt = new MQTT(); + final BlockingConnection subscribeConnection = mqtt.blockingConnection(); + subscribeConnection.connect(); + Topic topic = new Topic(TEST_TOPIC, QoS.AT_MOST_ONCE); + Topic[] topics = {topic}; + subscribeConnection.subscribe(topics); + final CountDownLatch latch = new CountDownLatch(numberOfMessages); + + Thread thread = new Thread(new Runnable() { + public void run() { + for (int i = 0; i < numberOfMessages; i++) { + try { + Message message = subscribeConnection.receive(); + message.ack(); + latch.countDown(); + } catch (Exception e) { + e.printStackTrace(); + break; + } + } + } + }); + thread.start(); + + for (int i = 0; i < numberOfMessages; i++) { + + if (i == 5) { + LOG.info("Stopping MQTT transport"); + brokerService.getTransportConnectorByScheme("mqtt").stop(); + + Thread starter = new Thread(new Runnable() { + public void run() { + try { + Thread.sleep(3000); + LOG.info("Starting MQTT transport again"); + brokerService.getTransportConnectorByScheme("mqtt").start(); + } catch (Exception e) { + // ignore + } + } + }); + starter.start(); + } + + try { + template.sendBody("direct:foo", "test message " + i); + } catch (Exception e) { + // ignore + } + } + + latch.await(20, TimeUnit.SECONDS); + assertTrue("Messages not consumed = " + latch.getCount(), latch.getCount() == 0); + } + + protected RouteBuilder createRouteBuilder() { + return new RouteBuilder() { + public void configure() { + from("direct:foo").to("mqtt:boo?reconnectDelay=1000&publishTopicName=" + TEST_TOPIC); + } + }; + } +}