Repository: camel Updated Branches: refs/heads/master 14b345130 -> 83eca0b3f
CAMEL-9420 camel-paho : provide dynamic qos and rentained option Project: http://git-wip-us.apache.org/repos/asf/camel/repo Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/1cdcba13 Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/1cdcba13 Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/1cdcba13 Branch: refs/heads/master Commit: 1cdcba13ca9f0e913d113c3f83a72d0d69c3f35d Parents: 48faba5 Author: gautric <gaut...@redhat.com> Authored: Tue Dec 15 12:23:37 2015 +0100 Committer: Claus Ibsen <davscl...@apache.org> Committed: Wed Dec 16 13:25:39 2015 +0100 ---------------------------------------------------------------------- .../camel/component/paho/PahoProducer.java | 28 +++++++++++++++++--- 1 file changed, 24 insertions(+), 4 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/camel/blob/1cdcba13/components/camel-paho/src/main/java/org/apache/camel/component/paho/PahoProducer.java ---------------------------------------------------------------------- diff --git a/components/camel-paho/src/main/java/org/apache/camel/component/paho/PahoProducer.java b/components/camel-paho/src/main/java/org/apache/camel/component/paho/PahoProducer.java index 2369015..99265d2 100644 --- a/components/camel-paho/src/main/java/org/apache/camel/component/paho/PahoProducer.java +++ b/components/camel-paho/src/main/java/org/apache/camel/component/paho/PahoProducer.java @@ -27,12 +27,30 @@ public class PahoProducer extends DefaultProducer { super(endpoint); } + private int retrieveQos(Exchange exchange) { + if (exchange.getIn().getHeaders().containsKey(PahoConstants.CAMEL_PAHO_MSG_QOS)) { + return exchange.getIn().getHeader(PahoConstants.CAMEL_PAHO_MSG_QOS, Integer.class); + } else { + return getEndpoint().getQos(); + } + } + + private boolean retrieveRetained(Exchange exchange) { + if (exchange.getIn().getHeaders().containsKey(PahoConstants.CAMEL_PAHO_MSG_RETAINED)) { + return exchange.getIn().getHeader(PahoConstants.CAMEL_PAHO_MSG_RETAINED, Boolean.class); + } else { + return getEndpoint().isRetained(); + } + } + @Override public void process(Exchange exchange) throws Exception { MqttClient client = getEndpoint().getClient(); String topic = getEndpoint().getTopic(); - int qos = getEndpoint().getQos(); - boolean retained = getEndpoint().isRetained(); + + int qos = retrieveQos(exchange); + boolean retained = retrieveRetained(exchange); + byte[] payload = exchange.getIn().getBody(byte[].class); MqttMessage message = new MqttMessage(payload); @@ -41,9 +59,11 @@ public class PahoProducer extends DefaultProducer { client.publish(topic, message); } + + @Override public PahoEndpoint getEndpoint() { - return (PahoEndpoint) super.getEndpoint(); + return (PahoEndpoint)super.getEndpoint(); } -} \ No newline at end of file +}