Updated Branches:
  refs/heads/camel-2.12.x b220ce070 -> 1c00d97bd
  refs/heads/master 3d01c27c6 -> 4a89ad969


The RabbitMQ consumer should include the AMQP message properties as headers on the Camel Exchange message.
Thanks to Daniel Williams for the patch.


Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/4a89ad96
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/4a89ad96
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/4a89ad96

Branch: refs/heads/master
Commit: 4a89ad969859d4adee09eddbb43cfd89a91efd57
Parents: 3d01c27
Author: Claus Ibsen <davscl...@apache.org>
Authored: Fri Oct 18 11:34:10 2013 +0200
Committer: Claus Ibsen <davscl...@apache.org>
Committed: Fri Oct 18 11:34:10 2013 +0200

----------------------------------------------------------------------
 .../component/rabbitmq/RabbitMQConsumer.java    | 86 +++++++++++++++-----
 1 file changed, 67 insertions(+), 19 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/camel/blob/4a89ad96/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQConsumer.java
----------------------------------------------------------------------
diff --git 
a/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQConsumer.java
 
b/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQConsumer.java
index acc95d7..4f13045 100644
--- 
a/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQConsumer.java
+++ 
b/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQConsumer.java
@@ -29,7 +29,7 @@ import org.apache.camel.Processor;
 import org.apache.camel.impl.DefaultConsumer;
 
 public class RabbitMQConsumer extends DefaultConsumer {
-    
+
     ExecutorService executor;
     Connection conn;
     Channel channel;
@@ -55,18 +55,21 @@ public class RabbitMQConsumer extends DefaultConsumer {
         channel = conn.createChannel();
         log.debug("Using channel {}", channel);
 
-        channel.exchangeDeclare(endpoint.getExchangeName(),
-                "direct",
-                endpoint.isDurable(),
-                endpoint.isAutoDelete(),
+        channel.exchangeDeclare(endpoint.getExchangeName(), "direct",
+                endpoint.isDurable(), endpoint.isAutoDelete(),
                 new HashMap<String, Object>());
-        
-        // need to make sure the queueDeclare is same with the exchange declare
-        channel.queueDeclare(endpoint.getQueue(), endpoint.isDurable(), false, 
endpoint.isAutoDelete(), null);
-        channel.queueBind(endpoint.getQueue(), endpoint.getExchangeName(),
-                endpoint.getRoutingKey() == null ? "" : 
endpoint.getRoutingKey());
 
-        channel.basicConsume(endpoint.getQueue(), endpoint.isAutoAck(), new 
RabbitConsumer(this, channel));
+        // need to make sure the queueDeclare is same with the exchange declare
+        channel.queueDeclare(endpoint.getQueue(), endpoint.isDurable(), false,
+                endpoint.isAutoDelete(), null);
+        channel.queueBind(
+                endpoint.getQueue(),
+                endpoint.getExchangeName(),
+                endpoint.getRoutingKey() == null ? "" : endpoint
+                        .getRoutingKey());
+
+        channel.basicConsume(endpoint.getQueue(), endpoint.isAutoAck(),
+                new RabbitConsumer(this, channel));
     }
 
     @Override
@@ -76,13 +79,14 @@ public class RabbitMQConsumer extends DefaultConsumer {
         if (conn != null) {
             try {
                 conn.close();
-            } catch (Exception ignored) { 
+            } catch (Exception ignored) {
                 // ignored
             }
         }
 
         channel = null;
         conn = null;
+
         if (executor != null) {
             if (getEndpoint() != null && getEndpoint().getCamelContext() != 
null) {
                 
getEndpoint().getCamelContext().getExecutorServiceManager().shutdownNow(executor);
@@ -99,7 +103,8 @@ public class RabbitMQConsumer extends DefaultConsumer {
         private final Channel channel;
 
         /**
-         * Constructs a new instance and records its association to the 
passed-in channel.
+         * Constructs a new instance and records its association to the
+         * passed-in channel.
          *
          * @param channel the channel to which this consumer is attached
          */
@@ -110,13 +115,12 @@ public class RabbitMQConsumer extends DefaultConsumer {
         }
 
         @Override
-        public void handleDelivery(String consumerTag,
-                                   Envelope envelope,
-                                   AMQP.BasicProperties properties,
-                                   byte[] body) throws IOException {
+        public void handleDelivery(String consumerTag, Envelope envelope,
+                                   AMQP.BasicProperties properties, byte[] 
body) throws IOException {
 
             Exchange exchange = 
consumer.endpoint.createRabbitExchange(envelope, body);
-            log.trace("Created exchange [exchange={}]", new 
Object[]{exchange});
+            mergeAmqpProperties(exchange, properties);
+            log.trace("Created exchange [exchange={}]", exchange);
 
             try {
                 consumer.getProcessor().process(exchange);
@@ -131,6 +135,50 @@ public class RabbitMQConsumer extends DefaultConsumer {
                 getExceptionHandler().handleException("Error processing 
exchange", exchange, e);
             }
         }
+
+        /**
+         * Will take an {@link Exchange} and add header values back to the 
{@link Exchange#getIn()}
+         */
+        private void mergeAmqpProperties(Exchange exchange, 
AMQP.BasicProperties properties) {
+
+            if (properties.getType() != null) {
+                exchange.getIn().setHeader(RabbitMQConstants.TYPE, 
properties.getType());
+            }
+            if (properties.getAppId() != null) {
+                exchange.getIn().setHeader(RabbitMQConstants.APP_ID, 
properties.getAppId());
+            }
+            if (properties.getClusterId() != null) {
+                exchange.getIn().setHeader(RabbitMQConstants.CLUSTERID, 
properties.getClusterId());
+            }
+            if (properties.getContentEncoding() != null) {
+                exchange.getIn().setHeader(RabbitMQConstants.CONTENT_ENCODING, 
properties.getContentEncoding());
+            }
+            if (properties.getContentType() != null) {
+                exchange.getIn().setHeader(RabbitMQConstants.CONTENT_TYPE, 
properties.getContentType());
+            }
+            if (properties.getCorrelationId() != null) {
+                exchange.getIn().setHeader(RabbitMQConstants.CORRELATIONID, 
properties.getCorrelationId());
+            }
+            if (properties.getExpiration() != null) {
+                exchange.getIn().setHeader(RabbitMQConstants.EXPIRATION, 
properties.getExpiration());
+            }
+            if (properties.getMessageId() != null) {
+                exchange.getIn().setHeader(RabbitMQConstants.MESSAGE_ID, 
properties.getMessageId());
+            }
+            if (properties.getPriority() != null) {
+                exchange.getIn().setHeader(RabbitMQConstants.PRIORITY, 
properties.getPriority());
+            }
+            if (properties.getReplyTo() != null) {
+                exchange.getIn().setHeader(RabbitMQConstants.REPLY_TO, 
properties.getReplyTo());
+            }
+            if (properties.getTimestamp() != null) {
+                exchange.getIn().setHeader(RabbitMQConstants.TIMESTAMP, 
properties.getTimestamp());
+            }
+            if (properties.getUserId() != null) {
+                exchange.getIn().setHeader(RabbitMQConstants.USERID, 
properties.getUserId());
+            }
+        }
+
     }
-}
 
+}

Reply via email to