Repository: camel
Updated Branches:
  refs/heads/camel-2.13.x dc7a1a901 -> e26a08640
  refs/heads/camel-2.14.x c6ff77dda -> 0218061f0
  refs/heads/master 594842a20 -> 15d7c596c


CAMEL-7914: MQTT Endpoint disconnects on failure. Does not reconnect


Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/15d7c596
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/15d7c596
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/15d7c596

Branch: refs/heads/master
Commit: 15d7c596c4556ba5c85d0ebf4801228df85a2a58
Parents: 594842a
Author: Claus Ibsen <davscl...@apache.org>
Authored: Sun Feb 15 11:15:29 2015 +0100
Committer: Claus Ibsen <davscl...@apache.org>
Committed: Sun Feb 15 11:32:15 2015 +0100

----------------------------------------------------------------------
 .../camel/component/mqtt/MQTTConfiguration.java | 261 +++++++++++++++++++
 .../camel/component/mqtt/MQTTEndpoint.java      |  54 +++-
 .../mqtt/MQTTProducerReconnectTest.java         |  96 +++++++
 3 files changed, 407 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/camel/blob/15d7c596/components/camel-mqtt/src/main/java/org/apache/camel/component/mqtt/MQTTConfiguration.java
----------------------------------------------------------------------
diff --git a/components/camel-mqtt/src/main/java/org/apache/camel/component/mqtt/MQTTConfiguration.java b/components/camel-mqtt/src/main/java/org/apache/camel/component/mqtt/MQTTConfiguration.java
index 428ffad..97e906c 100644
--- a/components/camel-mqtt/src/main/java/org/apache/camel/component/mqtt/MQTTConfiguration.java
+++ b/components/camel-mqtt/src/main/java/org/apache/camel/component/mqtt/MQTTConfiguration.java
@@ -17,11 +17,13 @@
 package org.apache.camel.component.mqtt;
 
 import java.net.URI;
+import java.net.URISyntaxException;
 import java.util.concurrent.Executor;
 import javax.net.ssl.SSLContext;
 
 import org.apache.camel.spi.UriParam;
 import org.apache.camel.spi.UriParams;
+import org.fusesource.hawtbuf.UTF8Buffer;
 import org.fusesource.hawtdispatch.DispatchQueue;
 import org.fusesource.hawtdispatch.transport.TcpTransport;
 import org.fusesource.mqtt.client.MQTT;
@@ -206,6 +208,265 @@ public class MQTTConfiguration extends MQTT {
         throw new IllegalArgumentException("There is no QoS with name " + qualityOfService);
     }
 
+    @Override
+    public void setTracer(Tracer tracer) {
+        super.setTracer(tracer);
+    }
+
+    @Override
+    public void setClientId(String clientId) {
+        super.setClientId(clientId);
+    }
+
+    @Override
+    public void setClientId(UTF8Buffer clientId) {
+        super.setClientId(clientId);
+    }
+
+    @Override
+    public void setKeepAlive(short keepAlive) {
+        super.setKeepAlive(keepAlive);
+    }
+
+    @Override
+    public void setPassword(String password) {
+        super.setPassword(password);
+    }
+
+    @Override
+    public void setPassword(UTF8Buffer password) {
+        super.setPassword(password);
+    }
+
+    @Override
+    public void setUserName(String userName) {
+        super.setUserName(userName);
+    }
+
+    @Override
+    public void setUserName(UTF8Buffer userName) {
+        super.setUserName(userName);
+    }
+
+    @Override
+    public void setWillMessage(String willMessage) {
+        super.setWillMessage(willMessage);
+    }
+
+    @Override
+    public void setWillMessage(UTF8Buffer willMessage) {
+        super.setWillMessage(willMessage);
+    }
+
+    @Override
+    public void setWillQos(QoS willQos) {
+        super.setWillQos(willQos);
+    }
+
+    @Override
+    public void setVersion(String version) {
+        super.setVersion(version);
+    }
+
+    @Override
+    public String getVersion() {
+        return super.getVersion();
+    }
+
+    @Override
+    public void setWillRetain(boolean willRetain) {
+        super.setWillRetain(willRetain);
+    }
+
+    @Override
+    public void setWillTopic(String willTopic) {
+        super.setWillTopic(willTopic);
+    }
+
+    @Override
+    public void setWillTopic(UTF8Buffer willTopic) {
+        super.setWillTopic(willTopic);
+    }
+
+    @Override
+    public Executor getBlockingExecutor() {
+        return super.getBlockingExecutor();
+    }
+
+    @Override
+    public void setBlockingExecutor(Executor blockingExecutor) {
+        super.setBlockingExecutor(blockingExecutor);
+    }
+
+    @Override
+    public DispatchQueue getDispatchQueue() {
+        return super.getDispatchQueue();
+    }
+
+    @Override
+    public void setDispatchQueue(DispatchQueue dispatchQueue) {
+        super.setDispatchQueue(dispatchQueue);
+    }
+
+    @Override
+    public URI getLocalAddress() {
+        return super.getLocalAddress();
+    }
+
+    @Override
+    public void setLocalAddress(String localAddress) throws URISyntaxException {
+        super.setLocalAddress(localAddress);
+    }
+
+    @Override
+    public void setLocalAddress(URI localAddress) {
+        super.setLocalAddress(localAddress);
+    }
+
+    @Override
+    public int getMaxReadRate() {
+        return super.getMaxReadRate();
+    }
+
+    @Override
+    public void setMaxReadRate(int maxReadRate) {
+        super.setMaxReadRate(maxReadRate);
+    }
+
+    @Override
+    public int getMaxWriteRate() {
+        return super.getMaxWriteRate();
+    }
+
+    @Override
+    public void setMaxWriteRate(int maxWriteRate) {
+        super.setMaxWriteRate(maxWriteRate);
+    }
+
+    @Override
+    public int getReceiveBufferSize() {
+        return super.getReceiveBufferSize();
+    }
+
+    @Override
+    public void setReceiveBufferSize(int receiveBufferSize) {
+        super.setReceiveBufferSize(receiveBufferSize);
+    }
+
+    @Override
+    public URI getHost() {
+        return super.getHost();
+    }
+
+    @Override
+    public void setHost(String host, int port) throws URISyntaxException {
+        super.setHost(host, port);
+    }
+
+    @Override
+    public void setHost(String host) throws URISyntaxException {
+        super.setHost(host);
+    }
+
+    @Override
+    public void setHost(URI host) {
+        super.setHost(host);
+    }
+
+    @Override
+    public int getSendBufferSize() {
+        return super.getSendBufferSize();
+    }
+
+    @Override
+    public void setSendBufferSize(int sendBufferSize) {
+        super.setSendBufferSize(sendBufferSize);
+    }
+
+    @Override
+    public SSLContext getSslContext() {
+        return super.getSslContext();
+    }
+
+    @Override
+    public void setSslContext(SSLContext sslContext) {
+        super.setSslContext(sslContext);
+    }
+
+    @Override
+    public int getTrafficClass() {
+        return super.getTrafficClass();
+    }
+
+    @Override
+    public void setTrafficClass(int trafficClass) {
+        super.setTrafficClass(trafficClass);
+    }
+
+    @Override
+    public boolean isUseLocalHost() {
+        return super.isUseLocalHost();
+    }
+
+    @Override
+    public void setUseLocalHost(boolean useLocalHost) {
+        super.setUseLocalHost(useLocalHost);
+    }
+
+    @Override
+    public long getConnectAttemptsMax() {
+        return super.getConnectAttemptsMax();
+    }
+
+    @Override
+    public void setConnectAttemptsMax(long connectAttemptsMax) {
+        super.setConnectAttemptsMax(connectAttemptsMax);
+    }
+
+    @Override
+    public long getReconnectAttemptsMax() {
+        return super.getReconnectAttemptsMax();
+    }
+
+    @Override
+    public void setReconnectAttemptsMax(long reconnectAttemptsMax) {
+        super.setReconnectAttemptsMax(reconnectAttemptsMax);
+    }
+
+    @Override
+    public double getReconnectBackOffMultiplier() {
+        return super.getReconnectBackOffMultiplier();
+    }
+
+    @Override
+    public void setReconnectBackOffMultiplier(double reconnectBackOffMultiplier) {
+        super.setReconnectBackOffMultiplier(reconnectBackOffMultiplier);
+    }
+
+    @Override
+    public long getReconnectDelay() {
+        return super.getReconnectDelay();
+    }
+
+    @Override
+    public void setReconnectDelay(long reconnectDelay) {
+        super.setReconnectDelay(reconnectDelay);
+    }
+
+    @Override
+    public long getReconnectDelayMax() {
+        return super.getReconnectDelayMax();
+    }
+
+    @Override
+    public void setReconnectDelayMax(long reconnectDelayMax) {
+        super.setReconnectDelayMax(reconnectDelayMax);
+    }
+
+    @Override
+    public Tracer getTracer() {
+        return super.getTracer();
+    }
 }
 
 

http://git-wip-us.apache.org/repos/asf/camel/blob/15d7c596/components/camel-mqtt/src/main/java/org/apache/camel/component/mqtt/MQTTEndpoint.java
----------------------------------------------------------------------
diff --git a/components/camel-mqtt/src/main/java/org/apache/camel/component/mqtt/MQTTEndpoint.java b/components/camel-mqtt/src/main/java/org/apache/camel/component/mqtt/MQTTEndpoint.java
index 7e1737b..3563f6e 100644
--- a/components/camel-mqtt/src/main/java/org/apache/camel/component/mqtt/MQTTEndpoint.java
+++ b/components/camel-mqtt/src/main/java/org/apache/camel/component/mqtt/MQTTEndpoint.java
@@ -19,6 +19,7 @@ package org.apache.camel.component.mqtt;
 import java.util.List;
 import java.util.concurrent.CopyOnWriteArrayList;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
 
 import org.apache.camel.Consumer;
 import org.apache.camel.Exchange;
@@ -47,6 +48,8 @@ import org.slf4j.LoggerFactory;
 public class MQTTEndpoint extends DefaultEndpoint {
     private static final Logger LOG = LoggerFactory.getLogger(MQTTEndpoint.class);
 
+    private static final int PUBLISH_MAX_RECONNECT_ATTEMPTS = 3;
+
     private CallbackConnection connection;
     @UriPath
     private String name;
@@ -87,14 +90,21 @@ public class MQTTEndpoint extends DefaultEndpoint {
     @Override
     protected void doStart() throws Exception {
         super.doStart();
+
+        createConnection();
+    }
+
+    protected void createConnection() {
         connection = configuration.callbackConnection();
 
         connection.listener(new Listener() {
             public void onConnected() {
+                connected = true;
                 LOG.info("MQTT Connection connected to {}", configuration.getHost());
             }
 
             public void onDisconnected() {
+                connected = false;
                 LOG.debug("MQTT Connection disconnected from {}", configuration.getHost());
             }
 
@@ -113,6 +123,9 @@ public class MQTTEndpoint extends DefaultEndpoint {
             }
 
             public void onFailure(Throwable value) {
+                // mark this connection as disconnected so we force re-connect
+                connected = false;
+                LOG.warn("Connection to " + configuration.getHost() + " failure due " + value.getMessage() + ". Forcing a disconnect to re-connect on next attempt.");
                 connection.disconnect(new Callback<Void>() {
                     public void onSuccess(Void value) {
                     }
@@ -123,11 +136,11 @@ public class MQTTEndpoint extends DefaultEndpoint {
                 });
             }
         });
-
-        
     }
 
     protected void doStop() throws Exception {
+        super.doStop();
+
         if (connection != null) {
             final Promise<Void> promise = new Promise<Void>();
             connection.getDispatchQueue().execute(new Task() {
@@ -146,13 +159,14 @@ public class MQTTEndpoint extends DefaultEndpoint {
             });
             promise.await(configuration.getDisconnectWaitInSeconds(), TimeUnit.SECONDS);
         }
-        super.doStop();
     }
-    
+
     void connect() throws Exception {
         final Promise<Object> promise = new Promise<Object>();
         connection.connect(new Callback<Void>() {
             public void onSuccess(Void value) {
+                LOG.debug("Connected to {}", configuration.getHost());
+
                 String subscribeTopicName = configuration.getSubscribeTopicName();
                 subscribeTopicName = subscribeTopicName != null ? subscribeTopicName.trim() : null;
 
@@ -178,11 +192,13 @@ public class MQTTEndpoint extends DefaultEndpoint {
             }
 
             public void onFailure(Throwable value) {
+                LOG.warn("Failed to connect to " + configuration.getHost() + " due " + value.getMessage());
                 promise.onFailure(value);
                 connection.disconnect(null);
                 connected = false;
             }
         });
+        LOG.info("Connecting to {} using {} seconds timeout", configuration.getHost(), configuration.getConnectWaitInSeconds());
         promise.await(configuration.getConnectWaitInSeconds(), TimeUnit.SECONDS);
     }
     
@@ -191,9 +207,39 @@ public class MQTTEndpoint extends DefaultEndpoint {
     }
  
     void publish(final String topic, final byte[] payload, final QoS qoS, final boolean retain, final Callback<Void> callback) throws Exception {
+        // if not connected then create a new connection to re-connect
+        boolean done = isConnected();
+        int attempt = 0;
+        TimeoutException timeout = null;
+        while (!done && attempt <= PUBLISH_MAX_RECONNECT_ATTEMPTS) {
+            attempt++;
+            try {
+                LOG.warn("#{} attempt to re-create connection to {} before publishing", attempt, configuration.getHost());
+                createConnection();
+                connect();
+            } catch (TimeoutException e) {
+                timeout = e;
+                LOG.debug("Timed out after {} seconds after {} attempt to re-create connection to {}",
+                        new Object[]{configuration.getConnectWaitInSeconds(), attempt, configuration.getHost()});
+            } catch (Throwable e) {
+                // other kind of error then exit asap
+                callback.onFailure(e);
+                return;
+            }
+
+            done = isConnected();
+        }
+
+        if (attempt > 3 && !isConnected()) {
+            LOG.warn("Cannot re-connect to {} after {} attempts", configuration.getHost(), attempt);
+            callback.onFailure(timeout);
+            return;
+        }
+
         connection.getDispatchQueue().execute(new Task() {
             @Override
             public void run() {
+                LOG.debug("Publishing to {}", configuration.getHost());
                 connection.publish(topic, payload, qoS, retain, callback);
             }
         });

http://git-wip-us.apache.org/repos/asf/camel/blob/15d7c596/components/camel-mqtt/src/test/java/org/apache/camel/component/mqtt/MQTTProducerReconnectTest.java
----------------------------------------------------------------------
diff --git a/components/camel-mqtt/src/test/java/org/apache/camel/component/mqtt/MQTTProducerReconnectTest.java b/components/camel-mqtt/src/test/java/org/apache/camel/component/mqtt/MQTTProducerReconnectTest.java
new file mode 100644
index 0000000..15281a8
--- /dev/null
+++ b/components/camel-mqtt/src/test/java/org/apache/camel/component/mqtt/MQTTProducerReconnectTest.java
@@ -0,0 +1,96 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.mqtt;
+
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.camel.builder.RouteBuilder;
+import org.fusesource.mqtt.client.BlockingConnection;
+import org.fusesource.mqtt.client.MQTT;
+import org.fusesource.mqtt.client.Message;
+import org.fusesource.mqtt.client.QoS;
+import org.fusesource.mqtt.client.Topic;
+import org.junit.Test;
+
+public class MQTTProducerReconnectTest extends MQTTBaseTest {
+
+    @Test
+    public void testProduce() throws Exception {
+        MQTT mqtt = new MQTT();
+        final BlockingConnection subscribeConnection = mqtt.blockingConnection();
+        subscribeConnection.connect();
+        Topic topic = new Topic(TEST_TOPIC, QoS.AT_MOST_ONCE);
+        Topic[] topics = {topic};
+        subscribeConnection.subscribe(topics);
+        final CountDownLatch latch = new CountDownLatch(numberOfMessages);
+
+        Thread thread = new Thread(new Runnable() {
+            public void run() {
+                for (int i = 0; i < numberOfMessages; i++) {
+                    try {
+                        Message message = subscribeConnection.receive();
+                        message.ack();
+                        latch.countDown();
+                    } catch (Exception e) {
+                        e.printStackTrace();
+                        break;
+                    }
+                }
+            }
+        });
+        thread.start();
+
+        for (int i = 0; i < numberOfMessages; i++) {
+
+            if (i == 5) {
+                LOG.info("Stopping MQTT transport");
+                brokerService.getTransportConnectorByScheme("mqtt").stop();
+
+                Thread starter = new Thread(new Runnable() {
+                    public void run() {
+                        try {
+                            Thread.sleep(3000);
+                            LOG.info("Starting MQTT transport again");
+                            brokerService.getTransportConnectorByScheme("mqtt").start();
+                        } catch (Exception e) {
+                            // ignore
+                        }
+                    }
+                });
+                starter.start();
+            }
+
+            try {
+                template.sendBody("direct:foo", "test message " + i);
+            } catch (Exception e) {
+                // ignore
+            }
+        }
+
+        latch.await(20, TimeUnit.SECONDS);
+        assertTrue("Messages not consumed = " + latch.getCount(), latch.getCount() == 0);
+    }
+
+    protected RouteBuilder createRouteBuilder() {
+        return new RouteBuilder() {
+            public void configure() {
+                from("direct:foo").to("mqtt:boo?reconnectDelay=1000&publishTopicName=" + TEST_TOPIC);
+            }
+        };
+    }
+}

Reply via email to