Modify parsing topic in the uri, and Exchange header value is changed because 
mqtt topic is able to use in after process.


Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/78b7681f
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/78b7681f
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/78b7681f

Branch: refs/heads/master
Commit: 78b7681f3d64f3c9f8667a55f233d8dd9bdcdf13
Parents: e2ef05d
Author: Takanori Suzuki <takano...@gmail.com>
Authored: Mon Oct 12 02:20:46 2015 +0900
Committer: Claus Ibsen <davscl...@apache.org>
Committed: Sat Oct 17 10:41:18 2015 +0200

----------------------------------------------------------------------
 .../camel/component/paho/MqttProperties.java    | 71 ++++++++++++++++++++
 .../camel/component/paho/PahoComponent.java     | 15 +++++
 .../camel/component/paho/PahoConstants.java     |  2 +
 .../camel/component/paho/PahoConsumer.java      | 21 +++++-
 .../camel/component/paho/PahoEndpoint.java      | 21 +++++-
 .../camel/component/paho/PahoComponentTest.java | 51 +++++++++++++-
 6 files changed, 175 insertions(+), 6 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/camel/blob/78b7681f/components/camel-paho/src/main/java/org/apache/camel/component/paho/MqttProperties.java
----------------------------------------------------------------------
diff --git 
a/components/camel-paho/src/main/java/org/apache/camel/component/paho/MqttProperties.java
 
b/components/camel-paho/src/main/java/org/apache/camel/component/paho/MqttProperties.java
new file mode 100644
index 0000000..782654a
--- /dev/null
+++ 
b/components/camel-paho/src/main/java/org/apache/camel/component/paho/MqttProperties.java
@@ -0,0 +1,71 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.paho;
+
+/**
+ * MQTT message properties.
+ */
+public class MqttProperties {
+
+    private String  topic;
+
+    private int     qos;
+
+    private boolean retain;
+
+    private boolean duplicate;
+
+    public MqttProperties() {}
+
+    public String getTopic() {
+        return topic;
+    }
+
+    public void setTopic(String topic) {
+        this.topic = topic;
+    }
+
+    public int getQos() {
+        return qos;
+    }
+
+    public void setQos(int qos) {
+        this.qos = qos;
+    }
+
+    public boolean isRetain() {
+        return retain;
+    }
+
+    public void setRetain(boolean retain) {
+        this.retain = retain;
+    }
+
+    public boolean isDuplicate() {
+        return duplicate;
+    }
+
+    public void setDuplicate(boolean duplicate) {
+        this.duplicate = duplicate;
+    }
+    
+    @Override
+    public String toString() {
+        return "PahoMqttProperties [topic=" + topic + ", qos=" + qos + "]";
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/camel/blob/78b7681f/components/camel-paho/src/main/java/org/apache/camel/component/paho/PahoComponent.java
----------------------------------------------------------------------
diff --git 
a/components/camel-paho/src/main/java/org/apache/camel/component/paho/PahoComponent.java
 
b/components/camel-paho/src/main/java/org/apache/camel/component/paho/PahoComponent.java
index 232d38e..ea0b627 100644
--- 
a/components/camel-paho/src/main/java/org/apache/camel/component/paho/PahoComponent.java
+++ 
b/components/camel-paho/src/main/java/org/apache/camel/component/paho/PahoComponent.java
@@ -27,6 +27,7 @@ public class PahoComponent extends UriEndpointComponent {
     private String brokerUrl;
     private String clientId;
     private MqttConnectOptions connectOptions;
+    private String headerType;
 
     public PahoComponent() {
         super(PahoEndpoint.class);
@@ -45,6 +46,9 @@ public class PahoComponent extends UriEndpointComponent {
         if (connectOptions != null) {
             answer.setConnectOptions(connectOptions);
         }
+        if (headerType != null) {
+            answer.setHeaderType(headerType);
+        }
 
         setProperties(answer, parameters);
         return answer;
@@ -82,4 +86,15 @@ public class PahoComponent extends UriEndpointComponent {
     public void setConnectOptions(MqttConnectOptions connectOptions) {
         this.connectOptions = connectOptions;
     }
+    
+    public String getHeaderType() {
+        return headerType;
+    }
+
+    /**
+     * Exchange header type.
+     */
+    public void setHeaderType(String headerType) {
+        this.headerType = headerType;
+    }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/camel/blob/78b7681f/components/camel-paho/src/main/java/org/apache/camel/component/paho/PahoConstants.java
----------------------------------------------------------------------
diff --git 
a/components/camel-paho/src/main/java/org/apache/camel/component/paho/PahoConstants.java
 
b/components/camel-paho/src/main/java/org/apache/camel/component/paho/PahoConstants.java
index 35b6a49..89d9994 100644
--- 
a/components/camel-paho/src/main/java/org/apache/camel/component/paho/PahoConstants.java
+++ 
b/components/camel-paho/src/main/java/org/apache/camel/component/paho/PahoConstants.java
@@ -18,6 +18,8 @@ package org.apache.camel.component.paho;
 
 public final class PahoConstants {
 
+    public static final String HEASER_MQTT_PROPERTIES = "MqttProperties";
+
     public static final String HEADER_ORIGINAL_MESSAGE = "PahoOriginalMessage";
 
     private PahoConstants() {

http://git-wip-us.apache.org/repos/asf/camel/blob/78b7681f/components/camel-paho/src/main/java/org/apache/camel/component/paho/PahoConsumer.java
----------------------------------------------------------------------
diff --git 
a/components/camel-paho/src/main/java/org/apache/camel/component/paho/PahoConsumer.java
 
b/components/camel-paho/src/main/java/org/apache/camel/component/paho/PahoConsumer.java
index 100644c..86dee14 100644
--- 
a/components/camel-paho/src/main/java/org/apache/camel/component/paho/PahoConsumer.java
+++ 
b/components/camel-paho/src/main/java/org/apache/camel/component/paho/PahoConsumer.java
@@ -28,8 +28,6 @@ import org.eclipse.paho.client.mqttv3.MqttMessage;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import static 
org.apache.camel.component.paho.PahoConstants.HEADER_ORIGINAL_MESSAGE;
-
 public class PahoConsumer extends DefaultConsumer {
 
     private static final Logger LOG = 
LoggerFactory.getLogger(PahoConsumer.class);
@@ -51,9 +49,26 @@ public class PahoConsumer extends DefaultConsumer {
 
             @Override
             public void messageArrived(String topic, MqttMessage message) 
throws Exception {
+                String headerKey;
+                Object headerValue;
+                String headerType = getEndpoint().getHeaderType();
+                if (PahoConstants.HEADER_ORIGINAL_MESSAGE.equals(headerType)) {
+                    headerKey = PahoConstants.HEADER_ORIGINAL_MESSAGE;
+                    headerValue = message;
+                } else {
+                    MqttProperties props = new MqttProperties();
+                    props.setTopic(topic);
+                    props.setQos(message.getQos());
+                    props.setRetain(message.isRetained());
+                    props.setDuplicate(message.isDuplicate());
+                    
+                    headerKey = PahoConstants.HEASER_MQTT_PROPERTIES;
+                    headerValue = props;
+                }
+                
                 Exchange exchange = 
ExchangeBuilder.anExchange(getEndpoint().getCamelContext()).
                         withBody(message.getPayload()).
-                        withHeader(HEADER_ORIGINAL_MESSAGE, message).
+                        withHeader(headerKey, headerValue).
                         build();
                 getAsyncProcessor().process(exchange, new AsyncCallback() {
                     @Override

http://git-wip-us.apache.org/repos/asf/camel/blob/78b7681f/components/camel-paho/src/main/java/org/apache/camel/component/paho/PahoEndpoint.java
----------------------------------------------------------------------
diff --git 
a/components/camel-paho/src/main/java/org/apache/camel/component/paho/PahoEndpoint.java
 
b/components/camel-paho/src/main/java/org/apache/camel/component/paho/PahoEndpoint.java
index 5ca943a..0562c9a 100644
--- 
a/components/camel-paho/src/main/java/org/apache/camel/component/paho/PahoEndpoint.java
+++ 
b/components/camel-paho/src/main/java/org/apache/camel/component/paho/PahoEndpoint.java
@@ -64,6 +64,8 @@ public class PahoEndpoint extends DefaultEndpoint {
     private int qos = DEFAULT_QOS;
     @UriParam(defaultValue = "MEMORY")
     private PahoPersistence persistence = MEMORY;
+    @UriParam(defaultValue = PahoConstants.HEASER_MQTT_PROPERTIES)
+    private String headerType = PahoConstants.HEASER_MQTT_PROPERTIES;
 
     // Collaboration members
     @UriParam
@@ -76,7 +78,12 @@ public class PahoEndpoint extends DefaultEndpoint {
     public PahoEndpoint(String uri, Component component) {
         super(uri, component);
         if (topic == null) {
-            topic = uri.substring(7);
+            int optionIndex = uri.indexOf("?");
+            if (optionIndex > 0) {
+                topic = uri.substring(7, optionIndex);
+            } else {
+                topic = uri.substring(7);
+            }
         }
     }
 
@@ -217,4 +224,16 @@ public class PahoEndpoint extends DefaultEndpoint {
     public void setConnectOptions(MqttConnectOptions connOpts) {
         this.connectOptions = connOpts;
     }
+
+    public String getHeaderType() {
+        return headerType;
+    }
+
+    /**
+     * Exchange header type.
+     */
+    public void setHeaderType(String headerType) {
+        this.headerType = headerType;
+    }
+    
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/camel/blob/78b7681f/components/camel-paho/src/test/java/org/apache/camel/component/paho/PahoComponentTest.java
----------------------------------------------------------------------
diff --git 
a/components/camel-paho/src/test/java/org/apache/camel/component/paho/PahoComponentTest.java
 
b/components/camel-paho/src/test/java/org/apache/camel/component/paho/PahoComponentTest.java
index 905f6fd..14e1515 100644
--- 
a/components/camel-paho/src/test/java/org/apache/camel/component/paho/PahoComponentTest.java
+++ 
b/components/camel-paho/src/test/java/org/apache/camel/component/paho/PahoComponentTest.java
@@ -16,6 +16,8 @@
  */
 package org.apache.camel.component.paho;
 
+import java.io.UnsupportedEncodingException;
+
 import org.apache.activemq.broker.BrokerService;
 import org.apache.camel.EndpointInject;
 import org.apache.camel.Exchange;
@@ -34,6 +36,9 @@ public class PahoComponentTest extends CamelTestSupport {
 
     @EndpointInject(uri = "mock:test")
     MockEndpoint mock;
+    
+    @EndpointInject(uri = "mock:test2")
+    MockEndpoint mock2;
 
     BrokerService broker;
 
@@ -65,9 +70,11 @@ public class PahoComponentTest extends CamelTestSupport {
             @Override
             public void configure() throws Exception {
                 from("direct:test").to("paho:queue?brokerUrl=tcp://localhost:" 
+ mqttPort);
-
                 from("paho:queue?brokerUrl=tcp://localhost:" + 
mqttPort).to("mock:test");
 
+                
from("direct:test2").to("paho:queue?brokerUrl=tcp://localhost:" + mqttPort);
+                
from("paho:queue?headerType=PahoOriginalMessage&brokerUrl=tcp://localhost:" + 
mqttPort).to("mock:test2");
+
                 
from("paho:persistenceTest?persistence=FILE&brokerUrl=tcp://localhost:" + 
mqttPort).to("mock:persistenceTest");
 
                 
from("direct:connectOptions").to("paho:registryConnectOptions?connectOptions=#connectOptions&brokerUrl=tcp://localhost:"
 + mqttPort);
@@ -85,6 +92,26 @@ public class PahoComponentTest extends CamelTestSupport {
     // Tests
 
     @Test
+    public void checkOptions() {
+        String uri = "paho:/test/topic"
+                + "?clientId=sampleClient"
+                + "&brokerUrl=tcp://localhost:" + mqttPort
+                + "&qos=2"
+                + "&persistence=file"
+                + "&headerType=testType";
+        
+        PahoEndpoint endpoint = getMandatoryEndpoint(uri, PahoEndpoint.class);
+
+        // Then
+        assertEquals("/test/topic", endpoint.getTopic());
+        assertEquals("sampleClient", endpoint.getClientId());
+        assertEquals("tcp://localhost:" + mqttPort, endpoint.getBrokerUrl());
+        assertEquals(2, endpoint.getQos());
+        assertEquals(PahoPersistence.FILE, endpoint.getPersistence());
+        assertEquals("testType", endpoint.getHeaderType());
+    }
+
+    @Test
     public void shouldReadMessageFromMqtt() throws InterruptedException {
         // Given
         String msg = "msg";
@@ -132,7 +159,7 @@ public class PahoComponentTest extends CamelTestSupport {
     }
 
     @Test
-    public void shouldKeepOriginalMessageInHeader() throws 
InterruptedException {
+    public void shouldKeepDefaultMessageInHeader() throws 
InterruptedException, UnsupportedEncodingException {
         // Given
         final String msg = "msg";
         mock.expectedBodiesReceived(msg);
@@ -143,6 +170,26 @@ public class PahoComponentTest extends CamelTestSupport {
         // Then
         mock.assertIsSatisfied();
         Exchange exchange = mock.getExchanges().get(0);
+        MqttProperties mqttProperties = 
exchange.getIn().getHeader(PahoConstants.HEASER_MQTT_PROPERTIES,
+                MqttProperties.class);
+        String payload = new String((byte[]) exchange.getIn().getBody(), 
"utf-8");
+
+        assertEquals("queue", new String(mqttProperties.getTopic()));
+        assertEquals(msg, new String(payload));
+    }
+
+    @Test
+    public void shouldKeepOriginalMessageInHeader() throws 
InterruptedException {
+        // Given
+        final String msg = "msg";
+        mock2.expectedBodiesReceived(msg);
+
+        // When
+        template.sendBody("direct:test2", msg);
+
+        // Then
+        mock2.assertIsSatisfied();
+        Exchange exchange = mock2.getExchanges().get(0);
         MqttMessage message = 
exchange.getIn().getHeader(PahoConstants.HEADER_ORIGINAL_MESSAGE, 
MqttMessage.class);
         assertEquals(msg, new String(message.getPayload()));
     }

Reply via email to