Fixes #635. Create the exchange the correct way, via the endpoint's createExchange() instead of ExchangeBuilder.
Project: http://git-wip-us.apache.org/repos/asf/camel/repo Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/9e5a51ca Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/9e5a51ca Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/9e5a51ca Branch: refs/heads/master Commit: 9e5a51ca51c495e1038f7bd861372e032cd1dbc9 Parents: dbe9aa5 Author: Claus Ibsen <davscl...@apache.org> Authored: Sat Oct 17 10:15:40 2015 +0200 Committer: Claus Ibsen <davscl...@apache.org> Committed: Sat Oct 17 10:41:19 2015 +0200 ---------------------------------------------------------------------- .../apache/camel/component/paho/PahoConsumer.java | 16 ++++++++-------- 1 file changed, 8 insertions(+), 8 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/camel/blob/9e5a51ca/components/camel-paho/src/main/java/org/apache/camel/component/paho/PahoConsumer.java ---------------------------------------------------------------------- diff --git a/components/camel-paho/src/main/java/org/apache/camel/component/paho/PahoConsumer.java b/components/camel-paho/src/main/java/org/apache/camel/component/paho/PahoConsumer.java index 86dee14..82b6c5f 100644 --- a/components/camel-paho/src/main/java/org/apache/camel/component/paho/PahoConsumer.java +++ b/components/camel-paho/src/main/java/org/apache/camel/component/paho/PahoConsumer.java @@ -20,7 +20,6 @@ import org.apache.camel.AsyncCallback; import org.apache.camel.Endpoint; import org.apache.camel.Exchange; import org.apache.camel.Processor; -import org.apache.camel.builder.ExchangeBuilder; import org.apache.camel.impl.DefaultConsumer; import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken; import org.eclipse.paho.client.mqttv3.MqttCallback; @@ -44,7 +43,7 @@ public class PahoConsumer extends DefaultConsumer { getEndpoint().getClient().setCallback(new MqttCallback() { @Override public void connectionLost(Throwable cause) { - LOG.debug("MQTT broker connection lost:", 
cause); + LOG.debug("MQTT broker connection lost due " + cause.getMessage(), cause); } @Override @@ -65,15 +64,15 @@ public class PahoConsumer extends DefaultConsumer { headerKey = PahoConstants.HEASER_MQTT_PROPERTIES; headerValue = props; } - - Exchange exchange = ExchangeBuilder.anExchange(getEndpoint().getCamelContext()). - withBody(message.getPayload()). - withHeader(headerKey, headerValue). - build(); + + Exchange exchange = getEndpoint().createExchange(); + exchange.getIn().setBody(message.getPayload()); + exchange.getIn().setHeader(headerKey, headerValue); + getAsyncProcessor().process(exchange, new AsyncCallback() { @Override public void done(boolean doneSync) { - + // noop } }); } @@ -88,6 +87,7 @@ public class PahoConsumer extends DefaultConsumer { @Override protected void doStop() throws Exception { super.doStop(); + if (getEndpoint().getClient().isConnected()) { String topic = getEndpoint().getTopic(); getEndpoint().getClient().unsubscribe(topic);