CAMEL-7922 start the MQTT connection when consumer or producer is started
Project: http://git-wip-us.apache.org/repos/asf/camel/repo Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/feea16ad Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/feea16ad Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/feea16ad Branch: refs/heads/master Commit: feea16ada605803f81945db55b2344f2b000f0b5 Parents: 73f69d8 Author: Willem Jiang <willem.ji...@gmail.com> Authored: Fri Oct 17 14:33:39 2014 +0800 Committer: Willem Jiang <willem.ji...@gmail.com> Committed: Fri Oct 17 14:33:39 2014 +0800 ---------------------------------------------------------------------- .../camel/component/mqtt/MQTTConsumer.java | 3 ++ .../camel/component/mqtt/MQTTEndpoint.java | 47 +++++++++++++------- .../camel/component/mqtt/MQTTProducer.java | 9 ++++ 3 files changed, 42 insertions(+), 17 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/camel/blob/feea16ad/components/camel-mqtt/src/main/java/org/apache/camel/component/mqtt/MQTTConsumer.java ---------------------------------------------------------------------- diff --git a/components/camel-mqtt/src/main/java/org/apache/camel/component/mqtt/MQTTConsumer.java b/components/camel-mqtt/src/main/java/org/apache/camel/component/mqtt/MQTTConsumer.java index 934e419..449a767 100644 --- a/components/camel-mqtt/src/main/java/org/apache/camel/component/mqtt/MQTTConsumer.java +++ b/components/camel-mqtt/src/main/java/org/apache/camel/component/mqtt/MQTTConsumer.java @@ -33,6 +33,9 @@ public class MQTTConsumer extends DefaultConsumer { protected void doStart() throws Exception { getEndpoint().addConsumer(this); + if (!getEndpoint().isConnected()) { + getEndpoint().connect(); + } super.doStart(); } http://git-wip-us.apache.org/repos/asf/camel/blob/feea16ad/components/camel-mqtt/src/main/java/org/apache/camel/component/mqtt/MQTTEndpoint.java ---------------------------------------------------------------------- diff --git a/components/camel-mqtt/src/main/java/org/apache/camel/component/mqtt/MQTTEndpoint.java b/components/camel-mqtt/src/main/java/org/apache/camel/component/mqtt/MQTTEndpoint.java index 651049c..07014ad 100644 --- a/components/camel-mqtt/src/main/java/org/apache/camel/component/mqtt/MQTTEndpoint.java +++ b/components/camel-mqtt/src/main/java/org/apache/camel/component/mqtt/MQTTEndpoint.java @@ -44,6 +44,7 @@ public class MQTTEndpoint extends DefaultEndpoint { private CallbackConnection connection; private final MQTTConfiguration configuration; + private volatile boolean connected; private final List<MQTTConsumer> consumers = new CopyOnWriteArrayList<MQTTConsumer>(); public MQTTEndpoint(String uri, MQTTComponent component, MQTTConfiguration properties) { @@ -107,6 +108,27 @@ public class MQTTEndpoint extends DefaultEndpoint { } }); + + } + + protected void doStop() throws Exception { + if (connection != null) { + final Promise<Void> promise = new Promise<Void>(); + connection.disconnect(new Callback<Void>() { + public void onSuccess(Void value) { + promise.onSuccess(value); + } + + public void onFailure(Throwable value) { + promise.onFailure(value); + } + }); + promise.await(configuration.getDisconnectWaitInSeconds(), TimeUnit.SECONDS); + } + super.doStop(); + } + + void connect() throws Exception { final Promise<Object> promise = new Promise<Object>(); connection.connect(new Callback<Void>() { public void onSuccess(Void value) { @@ -118,15 +140,18 @@ public class MQTTEndpoint extends DefaultEndpoint { connection.subscribe(topics, new Callback<byte[]>() { public void onSuccess(byte[] value) { promise.onSuccess(value); + connected = true; } public void onFailure(Throwable value) { promise.onFailure(value); connection.disconnect(null); + connected = false; } }); } else { promise.onSuccess(value); + connected = true; } } @@ -134,28 +159,16 @@ public class MQTTEndpoint extends DefaultEndpoint { public void onFailure(Throwable value) { promise.onFailure(value); connection.disconnect(null); + connected = false; } }); promise.await(configuration.getConnectWaitInSeconds(), TimeUnit.SECONDS); } - - protected void doStop() throws Exception { - if (connection != null) { - final Promise<Void> promise = new Promise<Void>(); - connection.disconnect(new Callback<Void>() { - public void onSuccess(Void value) { - promise.onSuccess(value); - } - - public void onFailure(Throwable value) { - promise.onFailure(value); - } - }); - promise.await(configuration.getDisconnectWaitInSeconds(), TimeUnit.SECONDS); - } - super.doStop(); + + boolean isConnected() { + return connected; } - + void publish(String topic, byte[] payload, QoS qoS, boolean retain, Callback<Void> callback) throws Exception { connection.publish(topic, payload, qoS, retain, callback); } http://git-wip-us.apache.org/repos/asf/camel/blob/feea16ad/components/camel-mqtt/src/main/java/org/apache/camel/component/mqtt/MQTTProducer.java ---------------------------------------------------------------------- diff --git a/components/camel-mqtt/src/main/java/org/apache/camel/component/mqtt/MQTTProducer.java b/components/camel-mqtt/src/main/java/org/apache/camel/component/mqtt/MQTTProducer.java index 59ff90b..751453a 100644 --- a/components/camel-mqtt/src/main/java/org/apache/camel/component/mqtt/MQTTProducer.java +++ b/components/camel-mqtt/src/main/java/org/apache/camel/component/mqtt/MQTTProducer.java @@ -31,6 +31,15 @@ public class MQTTProducer extends DefaultAsyncProducer implements Processor { super(mqttEndpoint); this.mqttEndpoint = mqttEndpoint; } + + protected void doStart() throws Exception { + // check the mqttEndpoint connection when it is started + if (!mqttEndpoint.isConnected()) { + mqttEndpoint.connect(); + } + super.doStart(); + } + @Override public boolean process(final Exchange exchange, final AsyncCallback callback) {