CAMEL-7914: MQTT Endpoint disconnects on failure. Does not reconnect

Conflicts:
        
components/camel-mqtt/src/main/java/org/apache/camel/component/mqtt/MQTTConfiguration.java


Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/e26a0864
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/e26a0864
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/e26a0864

Branch: refs/heads/camel-2.13.x
Commit: e26a08640cf3e37135fa58d3b3ef15fb9d0c6d44
Parents: dc7a1a9
Author: Claus Ibsen <davscl...@apache.org>
Authored: Sun Feb 15 11:15:29 2015 +0100
Committer: Claus Ibsen <davscl...@apache.org>
Committed: Sun Feb 15 11:33:36 2015 +0100

----------------------------------------------------------------------
 .../camel/component/mqtt/MQTTEndpoint.java      | 54 ++++++++++-
 .../mqtt/MQTTProducerReconnectTest.java         | 96 ++++++++++++++++++++
 2 files changed, 146 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/camel/blob/e26a0864/components/camel-mqtt/src/main/java/org/apache/camel/component/mqtt/MQTTEndpoint.java
----------------------------------------------------------------------
diff --git 
a/components/camel-mqtt/src/main/java/org/apache/camel/component/mqtt/MQTTEndpoint.java
 
b/components/camel-mqtt/src/main/java/org/apache/camel/component/mqtt/MQTTEndpoint.java
index 5e167d7..a6524f4 100644
--- 
a/components/camel-mqtt/src/main/java/org/apache/camel/component/mqtt/MQTTEndpoint.java
+++ 
b/components/camel-mqtt/src/main/java/org/apache/camel/component/mqtt/MQTTEndpoint.java
@@ -19,6 +19,7 @@ package org.apache.camel.component.mqtt;
 import java.util.List;
 import java.util.concurrent.CopyOnWriteArrayList;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
 
 import org.apache.camel.Consumer;
 import org.apache.camel.Exchange;
@@ -43,6 +44,8 @@ import org.slf4j.LoggerFactory;
 public class MQTTEndpoint extends DefaultEndpoint {
     private static final Logger LOG = 
LoggerFactory.getLogger(MQTTEndpoint.class);
 
+    private static final int PUBLISH_MAX_RECONNECT_ATTEMPTS = 3;
+
     private CallbackConnection connection;
     private final MQTTConfiguration configuration;
     private volatile boolean connected;
@@ -72,14 +75,21 @@ public class MQTTEndpoint extends DefaultEndpoint {
     @Override
     protected void doStart() throws Exception {
         super.doStart();
+
+        createConnection();
+    }
+
+    protected void createConnection() {
         connection = configuration.callbackConnection();
 
         connection.listener(new Listener() {
             public void onConnected() {
+                connected = true;
                 LOG.info("MQTT Connection connected to {}", 
configuration.getHost());
             }
 
             public void onDisconnected() {
+                connected = false;
                 LOG.debug("MQTT Connection disconnected from {}", 
configuration.getHost());
             }
 
@@ -98,6 +108,9 @@ public class MQTTEndpoint extends DefaultEndpoint {
             }
 
             public void onFailure(Throwable value) {
+                // mark this connection as disconnected so we force re-connect
+                connected = false;
+                LOG.warn("Connection to " + configuration.getHost() + " 
failure due " + value.getMessage() + ". Forcing a disconnect to re-connect on 
next attempt.");
                 connection.disconnect(new Callback<Void>() {
                     public void onSuccess(Void value) {
                     }
@@ -108,11 +121,11 @@ public class MQTTEndpoint extends DefaultEndpoint {
                 });
             }
         });
-
-        
     }
 
     protected void doStop() throws Exception {
+        super.doStop();
+
         if (connection != null) {
             final Promise<Void> promise = new Promise<Void>();
             connection.getDispatchQueue().execute(new Task() {
@@ -131,13 +144,14 @@ public class MQTTEndpoint extends DefaultEndpoint {
             });
             promise.await(configuration.getDisconnectWaitInSeconds(), 
TimeUnit.SECONDS);
         }
-        super.doStop();
     }
-    
+
     void connect() throws Exception {
         final Promise<Object> promise = new Promise<Object>();
         connection.connect(new Callback<Void>() {
             public void onSuccess(Void value) {
+                LOG.debug("Connected to {}", configuration.getHost());
+
                 String subscribeTopicName = 
configuration.getSubscribeTopicName();
                 subscribeTopicName = subscribeTopicName != null ? 
subscribeTopicName.trim() : null;
 
@@ -163,11 +177,13 @@ public class MQTTEndpoint extends DefaultEndpoint {
             }
 
             public void onFailure(Throwable value) {
+                LOG.warn("Failed to connect to " + configuration.getHost() + " 
due " + value.getMessage());
                 promise.onFailure(value);
                 connection.disconnect(null);
                 connected = false;
             }
         });
+        LOG.info("Connecting to {} using {} seconds timeout", 
configuration.getHost(), configuration.getConnectWaitInSeconds());
         promise.await(configuration.getConnectWaitInSeconds(), 
TimeUnit.SECONDS);
     }
     
@@ -176,9 +192,39 @@ public class MQTTEndpoint extends DefaultEndpoint {
     }
  
     void publish(final String topic, final byte[] payload, final QoS qoS, 
final boolean retain, final Callback<Void> callback) throws Exception {
+        // if not connected then create a new connection to re-connect
+        boolean done = isConnected();
+        int attempt = 0;
+        TimeoutException timeout = null;
+        while (!done && attempt <= PUBLISH_MAX_RECONNECT_ATTEMPTS) {
+            attempt++;
+            try {
+                LOG.warn("#{} attempt to re-create connection to {} before 
publishing", attempt, configuration.getHost());
+                createConnection();
+                connect();
+            } catch (TimeoutException e) {
+                timeout = e;
+                LOG.debug("Timed out after {} seconds after {} attempt to 
re-create connection to {}",
+                        new Object[]{configuration.getConnectWaitInSeconds(), 
attempt, configuration.getHost()});
+            } catch (Throwable e) {
+                // other kind of error then exit asap
+                callback.onFailure(e);
+                return;
+            }
+
+            done = isConnected();
+        }
+
+        if (attempt > 3 && !isConnected()) {
+            LOG.warn("Cannot re-connect to {} after {} attempts", 
configuration.getHost(), attempt);
+            callback.onFailure(timeout);
+            return;
+        }
+
         connection.getDispatchQueue().execute(new Task() {
             @Override
             public void run() {
+                LOG.debug("Publishing to {}", configuration.getHost());
                 connection.publish(topic, payload, qoS, retain, callback);
             }
         });

http://git-wip-us.apache.org/repos/asf/camel/blob/e26a0864/components/camel-mqtt/src/test/java/org/apache/camel/component/mqtt/MQTTProducerReconnectTest.java
----------------------------------------------------------------------
diff --git 
a/components/camel-mqtt/src/test/java/org/apache/camel/component/mqtt/MQTTProducerReconnectTest.java
 
b/components/camel-mqtt/src/test/java/org/apache/camel/component/mqtt/MQTTProducerReconnectTest.java
new file mode 100644
index 0000000..15281a8
--- /dev/null
+++ 
b/components/camel-mqtt/src/test/java/org/apache/camel/component/mqtt/MQTTProducerReconnectTest.java
@@ -0,0 +1,96 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.mqtt;
+
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.camel.builder.RouteBuilder;
+import org.fusesource.mqtt.client.BlockingConnection;
+import org.fusesource.mqtt.client.MQTT;
+import org.fusesource.mqtt.client.Message;
+import org.fusesource.mqtt.client.QoS;
+import org.fusesource.mqtt.client.Topic;
+import org.junit.Test;
+
+public class MQTTProducerReconnectTest extends MQTTBaseTest {
+
+    @Test
+    public void testProduce() throws Exception {
+        MQTT mqtt = new MQTT();
+        final BlockingConnection subscribeConnection = 
mqtt.blockingConnection();
+        subscribeConnection.connect();
+        Topic topic = new Topic(TEST_TOPIC, QoS.AT_MOST_ONCE);
+        Topic[] topics = {topic};
+        subscribeConnection.subscribe(topics);
+        final CountDownLatch latch = new CountDownLatch(numberOfMessages);
+
+        Thread thread = new Thread(new Runnable() {
+            public void run() {
+                for (int i = 0; i < numberOfMessages; i++) {
+                    try {
+                        Message message = subscribeConnection.receive();
+                        message.ack();
+                        latch.countDown();
+                    } catch (Exception e) {
+                        e.printStackTrace();
+                        break;
+                    }
+                }
+            }
+        });
+        thread.start();
+
+        for (int i = 0; i < numberOfMessages; i++) {
+
+            if (i == 5) {
+                LOG.info("Stopping MQTT transport");
+                brokerService.getTransportConnectorByScheme("mqtt").stop();
+
+                Thread starter = new Thread(new Runnable() {
+                    public void run() {
+                        try {
+                            Thread.sleep(3000);
+                            LOG.info("Starting MQTT transport again");
+                            
brokerService.getTransportConnectorByScheme("mqtt").start();
+                        } catch (Exception e) {
+                            // ignore
+                        }
+                    }
+                });
+                starter.start();
+            }
+
+            try {
+                template.sendBody("direct:foo", "test message " + i);
+            } catch (Exception e) {
+                // ignore
+            }
+        }
+
+        latch.await(20, TimeUnit.SECONDS);
+        assertTrue("Messages not consumed = " + latch.getCount(), 
latch.getCount() == 0);
+    }
+
+    protected RouteBuilder createRouteBuilder() {
+        return new RouteBuilder() {
+            public void configure() {
+                
from("direct:foo").to("mqtt:boo?reconnectDelay=1000&publishTopicName=" + 
TEST_TOPIC);
+            }
+        };
+    }
+}

Reply via email to