CAMEL-9232: camel-paho - Create exchange correct. Fixes #635.

Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/d2429e7a
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/d2429e7a
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/d2429e7a

Branch: refs/heads/master
Commit: d2429e7a475a6fd41bc39ee3dcaabd8af2d19362
Parents: 9e5a51c
Author: Claus Ibsen <davscl...@apache.org>
Authored: Sat Oct 17 10:36:15 2015 +0200
Committer: Claus Ibsen <davscl...@apache.org>
Committed: Sat Oct 17 10:41:19 2015 +0200

----------------------------------------------------------------------
 .../camel/component/paho/MqttProperties.java    | 71 --------------------
 .../camel/component/paho/PahoComponent.java     | 14 ----
 .../camel/component/paho/PahoConstants.java     |  3 +-
 .../camel/component/paho/PahoConsumer.java      | 24 +------
 .../camel/component/paho/PahoEndpoint.java      | 33 +++++----
 .../camel/component/paho/PahoMessage.java       | 47 +++++++++++++
 .../camel/component/paho/PahoComponentTest.java | 27 +++-----
 7 files changed, 78 insertions(+), 141 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/camel/blob/d2429e7a/components/camel-paho/src/main/java/org/apache/camel/component/paho/MqttProperties.java
----------------------------------------------------------------------
diff --git 
a/components/camel-paho/src/main/java/org/apache/camel/component/paho/MqttProperties.java
 
b/components/camel-paho/src/main/java/org/apache/camel/component/paho/MqttProperties.java
deleted file mode 100644
index 782654a..0000000
--- 
a/components/camel-paho/src/main/java/org/apache/camel/component/paho/MqttProperties.java
+++ /dev/null
@@ -1,71 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.camel.component.paho;
-
-/**
- * MQTT message properties.
- */
-public class MqttProperties {
-
-    private String  topic;
-
-    private int     qos;
-
-    private boolean retain;
-
-    private boolean duplicate;
-
-    public MqttProperties() {}
-
-    public String getTopic() {
-        return topic;
-    }
-
-    public void setTopic(String topic) {
-        this.topic = topic;
-    }
-
-    public int getQos() {
-        return qos;
-    }
-
-    public void setQos(int qos) {
-        this.qos = qos;
-    }
-
-    public boolean isRetain() {
-        return retain;
-    }
-
-    public void setRetain(boolean retain) {
-        this.retain = retain;
-    }
-
-    public boolean isDuplicate() {
-        return duplicate;
-    }
-
-    public void setDuplicate(boolean duplicate) {
-        this.duplicate = duplicate;
-    }
-    
-    @Override
-    public String toString() {
-        return "PahoMqttProperties [topic=" + topic + ", qos=" + qos + "]";
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/camel/blob/d2429e7a/components/camel-paho/src/main/java/org/apache/camel/component/paho/PahoComponent.java
----------------------------------------------------------------------
diff --git 
a/components/camel-paho/src/main/java/org/apache/camel/component/paho/PahoComponent.java
 
b/components/camel-paho/src/main/java/org/apache/camel/component/paho/PahoComponent.java
index ea0b627..c9cbc41 100644
--- 
a/components/camel-paho/src/main/java/org/apache/camel/component/paho/PahoComponent.java
+++ 
b/components/camel-paho/src/main/java/org/apache/camel/component/paho/PahoComponent.java
@@ -27,7 +27,6 @@ public class PahoComponent extends UriEndpointComponent {
     private String brokerUrl;
     private String clientId;
     private MqttConnectOptions connectOptions;
-    private String headerType;
 
     public PahoComponent() {
         super(PahoEndpoint.class);
@@ -46,9 +45,6 @@ public class PahoComponent extends UriEndpointComponent {
         if (connectOptions != null) {
             answer.setConnectOptions(connectOptions);
         }
-        if (headerType != null) {
-            answer.setHeaderType(headerType);
-        }
 
         setProperties(answer, parameters);
         return answer;
@@ -87,14 +83,4 @@ public class PahoComponent extends UriEndpointComponent {
         this.connectOptions = connectOptions;
     }
     
-    public String getHeaderType() {
-        return headerType;
-    }
-
-    /**
-     * Exchange header type.
-     */
-    public void setHeaderType(String headerType) {
-        this.headerType = headerType;
-    }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/camel/blob/d2429e7a/components/camel-paho/src/main/java/org/apache/camel/component/paho/PahoConstants.java
----------------------------------------------------------------------
diff --git 
a/components/camel-paho/src/main/java/org/apache/camel/component/paho/PahoConstants.java
 
b/components/camel-paho/src/main/java/org/apache/camel/component/paho/PahoConstants.java
index 89d9994..4c8d1ff 100644
--- 
a/components/camel-paho/src/main/java/org/apache/camel/component/paho/PahoConstants.java
+++ 
b/components/camel-paho/src/main/java/org/apache/camel/component/paho/PahoConstants.java
@@ -18,8 +18,9 @@ package org.apache.camel.component.paho;
 
 public final class PahoConstants {
 
-    public static final String HEASER_MQTT_PROPERTIES = "MqttProperties";
+    public static final String MQTT_TOPIC = "CamelMqttTopic";
 
+    @Deprecated
     public static final String HEADER_ORIGINAL_MESSAGE = "PahoOriginalMessage";
 
     private PahoConstants() {

http://git-wip-us.apache.org/repos/asf/camel/blob/d2429e7a/components/camel-paho/src/main/java/org/apache/camel/component/paho/PahoConsumer.java
----------------------------------------------------------------------
diff --git 
a/components/camel-paho/src/main/java/org/apache/camel/component/paho/PahoConsumer.java
 
b/components/camel-paho/src/main/java/org/apache/camel/component/paho/PahoConsumer.java
index 82b6c5f..75d6092 100644
--- 
a/components/camel-paho/src/main/java/org/apache/camel/component/paho/PahoConsumer.java
+++ 
b/components/camel-paho/src/main/java/org/apache/camel/component/paho/PahoConsumer.java
@@ -48,26 +48,8 @@ public class PahoConsumer extends DefaultConsumer {
 
             @Override
             public void messageArrived(String topic, MqttMessage message) 
throws Exception {
-                String headerKey;
-                Object headerValue;
-                String headerType = getEndpoint().getHeaderType();
-                if (PahoConstants.HEADER_ORIGINAL_MESSAGE.equals(headerType)) {
-                    headerKey = PahoConstants.HEADER_ORIGINAL_MESSAGE;
-                    headerValue = message;
-                } else {
-                    MqttProperties props = new MqttProperties();
-                    props.setTopic(topic);
-                    props.setQos(message.getQos());
-                    props.setRetain(message.isRetained());
-                    props.setDuplicate(message.isDuplicate());
-                    
-                    headerKey = PahoConstants.HEASER_MQTT_PROPERTIES;
-                    headerValue = props;
-                }
-
-                Exchange exchange = getEndpoint().createExchange();
-                exchange.getIn().setBody(message.getPayload());
-                exchange.getIn().setHeader(headerKey, headerValue);
+                LOG.debug("Message arrived on topic: {} -> {}", topic, 
message);
+                Exchange exchange = getEndpoint().createExchange(message, 
topic);
 
                 getAsyncProcessor().process(exchange, new AsyncCallback() {
                     @Override
@@ -79,7 +61,7 @@ public class PahoConsumer extends DefaultConsumer {
 
             @Override
             public void deliveryComplete(IMqttDeliveryToken token) {
-                LOG.debug("Delivery complete. Token: {}.", token);
+                LOG.debug("Delivery complete. Token: {}", token);
             }
         });
     }

http://git-wip-us.apache.org/repos/asf/camel/blob/d2429e7a/components/camel-paho/src/main/java/org/apache/camel/component/paho/PahoEndpoint.java
----------------------------------------------------------------------
diff --git 
a/components/camel-paho/src/main/java/org/apache/camel/component/paho/PahoEndpoint.java
 
b/components/camel-paho/src/main/java/org/apache/camel/component/paho/PahoEndpoint.java
index 0562c9a..97dad4d 100644
--- 
a/components/camel-paho/src/main/java/org/apache/camel/component/paho/PahoEndpoint.java
+++ 
b/components/camel-paho/src/main/java/org/apache/camel/component/paho/PahoEndpoint.java
@@ -18,10 +18,11 @@ package org.apache.camel.component.paho;
 
 import java.util.Set;
 
-import static java.lang.System.nanoTime;
+import javax.xml.xpath.XPathConstants;
 
 import org.apache.camel.Component;
 import org.apache.camel.Consumer;
+import org.apache.camel.Exchange;
 import org.apache.camel.Processor;
 import org.apache.camel.Producer;
 import org.apache.camel.impl.DefaultEndpoint;
@@ -32,11 +33,13 @@ import org.apache.camel.spi.UriPath;
 import org.eclipse.paho.client.mqttv3.MqttClient;
 import org.eclipse.paho.client.mqttv3.MqttClientPersistence;
 import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
+import org.eclipse.paho.client.mqttv3.MqttMessage;
 import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
 import org.eclipse.paho.client.mqttv3.persist.MqttDefaultFilePersistence;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import static java.lang.System.nanoTime;
 import static org.apache.camel.component.paho.PahoPersistence.MEMORY;
 
 @UriEndpoint(scheme = "paho", title = "Paho", consumerClass = 
PahoConsumer.class, label = "messaging", syntax = "paho:topic")
@@ -45,15 +48,11 @@ public class PahoEndpoint extends DefaultEndpoint {
     private static final Logger LOG = 
LoggerFactory.getLogger(PahoEndpoint.class);
 
     // Constants
-
     private static final String DEFAULT_BROKER_URL = "tcp://localhost:1883";
-
     private static final int DEFAULT_QOS = 2;
-
     private static final String DEFAULT_QOS_STRING = DEFAULT_QOS + "";
 
     // Configuration members
-
     @UriPath @Metadata(required = "true")
     private String topic;
     @UriParam
@@ -64,8 +63,6 @@ public class PahoEndpoint extends DefaultEndpoint {
     private int qos = DEFAULT_QOS;
     @UriParam(defaultValue = "MEMORY")
     private PahoPersistence persistence = MEMORY;
-    @UriParam(defaultValue = PahoConstants.HEASER_MQTT_PROPERTIES)
-    private String headerType = PahoConstants.HEASER_MQTT_PROPERTIES;
 
     // Collaboration members
     @UriParam
@@ -144,6 +141,17 @@ public class PahoEndpoint extends DefaultEndpoint {
         return new MqttConnectOptions();
     }
 
+    public Exchange createExchange(MqttMessage mqttMessage, String topic) {
+        PahoMessage paho = new PahoMessage();
+        paho.setMqttMessage(mqttMessage);
+        paho.setBody(mqttMessage.getPayload());
+        paho.setHeader(PahoConstants.MQTT_TOPIC, topic);
+
+        Exchange exchange = createExchange();
+        exchange.setIn(paho);
+        return exchange;
+    }
+
     // Configuration getters & setters
 
     public String getClientId() {
@@ -225,15 +233,4 @@ public class PahoEndpoint extends DefaultEndpoint {
         this.connectOptions = connOpts;
     }
 
-    public String getHeaderType() {
-        return headerType;
-    }
-
-    /**
-     * Exchange header type.
-     */
-    public void setHeaderType(String headerType) {
-        this.headerType = headerType;
-    }
-    
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/camel/blob/d2429e7a/components/camel-paho/src/main/java/org/apache/camel/component/paho/PahoMessage.java
----------------------------------------------------------------------
diff --git 
a/components/camel-paho/src/main/java/org/apache/camel/component/paho/PahoMessage.java
 
b/components/camel-paho/src/main/java/org/apache/camel/component/paho/PahoMessage.java
new file mode 100644
index 0000000..a89e470
--- /dev/null
+++ 
b/components/camel-paho/src/main/java/org/apache/camel/component/paho/PahoMessage.java
@@ -0,0 +1,47 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.paho;
+
+import org.apache.camel.impl.DefaultMessage;
+import org.eclipse.paho.client.mqttv3.MqttMessage;
+
+public class PahoMessage extends DefaultMessage {
+
+    private transient MqttMessage mqttMessage;
+
+    public PahoMessage() {
+    }
+
+    public PahoMessage(MqttMessage mqttMessage) {
+        this.mqttMessage = mqttMessage;
+    }
+
+    public MqttMessage getMqttMessage() {
+        return mqttMessage;
+    }
+
+    public void setMqttMessage(MqttMessage mqttMessage) {
+        this.mqttMessage = mqttMessage;
+    }
+
+    @Override
+    public PahoMessage newInstance() {
+        return new PahoMessage(mqttMessage);
+    }
+
+
+}

http://git-wip-us.apache.org/repos/asf/camel/blob/d2429e7a/components/camel-paho/src/test/java/org/apache/camel/component/paho/PahoComponentTest.java
----------------------------------------------------------------------
diff --git 
a/components/camel-paho/src/test/java/org/apache/camel/component/paho/PahoComponentTest.java
 
b/components/camel-paho/src/test/java/org/apache/camel/component/paho/PahoComponentTest.java
index 14e1515..8013f8f 100644
--- 
a/components/camel-paho/src/test/java/org/apache/camel/component/paho/PahoComponentTest.java
+++ 
b/components/camel-paho/src/test/java/org/apache/camel/component/paho/PahoComponentTest.java
@@ -37,9 +37,6 @@ public class PahoComponentTest extends CamelTestSupport {
     @EndpointInject(uri = "mock:test")
     MockEndpoint mock;
     
-    @EndpointInject(uri = "mock:test2")
-    MockEndpoint mock2;
-
     BrokerService broker;
 
     int mqttPort = AvailablePortFinder.getNextAvailable();
@@ -73,7 +70,6 @@ public class PahoComponentTest extends CamelTestSupport {
                 from("paho:queue?brokerUrl=tcp://localhost:" + 
mqttPort).to("mock:test");
 
                 
from("direct:test2").to("paho:queue?brokerUrl=tcp://localhost:" + mqttPort);
-                
from("paho:queue?headerType=PahoOriginalMessage&brokerUrl=tcp://localhost:" + 
mqttPort).to("mock:test2");
 
                 
from("paho:persistenceTest?persistence=FILE&brokerUrl=tcp://localhost:" + 
mqttPort).to("mock:persistenceTest");
 
@@ -97,9 +93,8 @@ public class PahoComponentTest extends CamelTestSupport {
                 + "?clientId=sampleClient"
                 + "&brokerUrl=tcp://localhost:" + mqttPort
                 + "&qos=2"
-                + "&persistence=file"
-                + "&headerType=testType";
-        
+                + "&persistence=file";
+
         PahoEndpoint endpoint = getMandatoryEndpoint(uri, PahoEndpoint.class);
 
         // Then
@@ -108,7 +103,6 @@ public class PahoComponentTest extends CamelTestSupport {
         assertEquals("tcp://localhost:" + mqttPort, endpoint.getBrokerUrl());
         assertEquals(2, endpoint.getQos());
         assertEquals(PahoPersistence.FILE, endpoint.getPersistence());
-        assertEquals("testType", endpoint.getHeaderType());
     }
 
     @Test
@@ -169,28 +163,29 @@ public class PahoComponentTest extends CamelTestSupport {
 
         // Then
         mock.assertIsSatisfied();
+
         Exchange exchange = mock.getExchanges().get(0);
-        MqttProperties mqttProperties = 
exchange.getIn().getHeader(PahoConstants.HEASER_MQTT_PROPERTIES,
-                MqttProperties.class);
         String payload = new String((byte[]) exchange.getIn().getBody(), 
"utf-8");
 
-        assertEquals("queue", new String(mqttProperties.getTopic()));
-        assertEquals(msg, new String(payload));
+        assertEquals("queue", 
exchange.getIn().getHeader(PahoConstants.MQTT_TOPIC));
+        assertEquals(msg, payload);
     }
 
     @Test
     public void shouldKeepOriginalMessageInHeader() throws 
InterruptedException {
         // Given
         final String msg = "msg";
-        mock2.expectedBodiesReceived(msg);
+        mock.expectedBodiesReceived(msg);
 
         // When
         template.sendBody("direct:test2", msg);
 
         // Then
-        mock2.assertIsSatisfied();
-        Exchange exchange = mock2.getExchanges().get(0);
-        MqttMessage message = 
exchange.getIn().getHeader(PahoConstants.HEADER_ORIGINAL_MESSAGE, 
MqttMessage.class);
+        mock.assertIsSatisfied();
+        Exchange exchange = mock.getExchanges().get(0);
+
+        MqttMessage message = 
exchange.getIn(PahoMessage.class).getMqttMessage();
+        assertNotNull(message);
         assertEquals(msg, new String(message.getPayload()));
     }
 

Reply via email to