This is an automated email from the ASF dual-hosted git repository. davsclaus pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/camel.git
View the commit online: https://github.com/apache/camel/commit/9e7cbbbd143e2b9a92634429b08c316a3687fa25 The following commit(s) were added to refs/heads/master by this push: new 9e7cbbb CAMEL-14173: camel-paho - Do not unsubscribe for durable topics when stopping. Thanks to Roland Beisel for reporting. Removed support for suspend/resume as it wouldn't work properly either with durable topics. 9e7cbbb is described below commit 9e7cbbbd143e2b9a92634429b08c316a3687fa25 Author: Claus Ibsen <claus.ib...@gmail.com> AuthorDate: Wed Nov 20 21:26:00 2019 +0100 CAMEL-14173: camel-paho - Do not unsubscribe for durable topics when stopping. Thanks to Roland Beisel for reporting. Removed support for suspend/resume as it wouldn't work properly either with durable topics. --- .../apache/camel/component/paho/PahoConsumer.java | 34 ++++++---------------- 1 file changed, 9 insertions(+), 25 deletions(-) diff --git a/components/camel-paho/src/main/java/org/apache/camel/component/paho/PahoConsumer.java b/components/camel-paho/src/main/java/org/apache/camel/component/paho/PahoConsumer.java index 9d05842..413a476 100644 --- a/components/camel-paho/src/main/java/org/apache/camel/component/paho/PahoConsumer.java +++ b/components/camel-paho/src/main/java/org/apache/camel/component/paho/PahoConsumer.java @@ -20,7 +20,6 @@ import org.apache.camel.AsyncCallback; import org.apache.camel.Endpoint; import org.apache.camel.Exchange; import org.apache.camel.Processor; -import org.apache.camel.SuspendableService; import org.apache.camel.support.DefaultConsumer; import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken; import org.eclipse.paho.client.mqttv3.MqttCallbackExtended; @@ -29,7 +28,7 @@ import org.eclipse.paho.client.mqttv3.MqttConnectOptions; import org.eclipse.paho.client.mqttv3.MqttException; import org.eclipse.paho.client.mqttv3.MqttMessage; -public class PahoConsumer extends DefaultConsumer implements SuspendableService { +public class PahoConsumer extends DefaultConsumer { private 
volatile MqttClient client; private volatile String clientId; @@ -114,35 +113,20 @@ public class PahoConsumer extends DefaultConsumer implements SuspendableService if (stopClient && client != null && client.isConnected()) { String topic = getEndpoint().getTopic(); - log.debug("Un-unsubscribing client: {} from topic: {}", clientId, topic); - client.unsubscribe(topic); - log.debug("Connecting client: {} from broker: {}", clientId, getEndpoint().getConfiguration().getBrokerUrl()); + // only unsubscribe if we are not durable + if (getEndpoint().getConfiguration().isCleanSession()) { + log.debug("Unsubscribing client: {} from topic: {}", clientId, topic); + client.unsubscribe(topic); + } else { + log.debug("Client: {} is durable so will not unsubscribe from topic: {}", clientId, topic); + } + log.debug("Disconnecting client: {} from broker: {}", clientId, getEndpoint().getConfiguration().getBrokerUrl()); client.disconnect(); client = null; } } @Override - protected void doSuspend() throws Exception { - super.doSuspend(); - if (client != null) { - String topic = getEndpoint().getTopic(); - log.debug("Un-unsubscribing client: {} from topic: {}", clientId, topic); - client.unsubscribe(topic); - } - } - - @Override - protected void doResume() throws Exception { - super.doResume(); - if (client != null) { - String topic = getEndpoint().getTopic(); - log.debug("Subscribing client: {} to topic: {}", clientId, topic); - client.subscribe(topic, getEndpoint().getConfiguration().getQos()); - } - } - - @Override public PahoEndpoint getEndpoint() { return (PahoEndpoint) super.getEndpoint(); }