Rabbit consumer should include rabbit properties in Camel Exchange message. Thanks to Daniel Williams for the patch.
Project: http://git-wip-us.apache.org/repos/asf/camel/repo Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/1c00d97b Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/1c00d97b Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/1c00d97b Branch: refs/heads/camel-2.12.x Commit: 1c00d97bdabff5bdd0f624b8c2e451f2da4dec34 Parents: b220ce0 Author: Claus Ibsen <davscl...@apache.org> Authored: Fri Oct 18 11:34:10 2013 +0200 Committer: Claus Ibsen <davscl...@apache.org> Committed: Fri Oct 18 11:34:32 2013 +0200 ---------------------------------------------------------------------- .../component/rabbitmq/RabbitMQConsumer.java | 86 +++++++++++++++----- 1 file changed, 67 insertions(+), 19 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/camel/blob/1c00d97b/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQConsumer.java ---------------------------------------------------------------------- diff --git a/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQConsumer.java b/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQConsumer.java index acc95d7..4f13045 100644 --- a/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQConsumer.java +++ b/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQConsumer.java @@ -29,7 +29,7 @@ import org.apache.camel.Processor; import org.apache.camel.impl.DefaultConsumer; public class RabbitMQConsumer extends DefaultConsumer { - + ExecutorService executor; Connection conn; Channel channel; @@ -55,18 +55,21 @@ public class RabbitMQConsumer extends DefaultConsumer { channel = conn.createChannel(); log.debug("Using channel {}", channel); - channel.exchangeDeclare(endpoint.getExchangeName(), - "direct", - endpoint.isDurable(), - endpoint.isAutoDelete(), + channel.exchangeDeclare(endpoint.getExchangeName(), "direct", + endpoint.isDurable(), endpoint.isAutoDelete(), new HashMap<String, Object>()); - - // need to make sure the queueDeclare is same with the exchange declare - channel.queueDeclare(endpoint.getQueue(), endpoint.isDurable(), false, endpoint.isAutoDelete(), null); - channel.queueBind(endpoint.getQueue(), endpoint.getExchangeName(), - endpoint.getRoutingKey() == null ? "" : endpoint.getRoutingKey()); - channel.basicConsume(endpoint.getQueue(), endpoint.isAutoAck(), new RabbitConsumer(this, channel)); + // need to make sure the queueDeclare is same with the exchange declare + channel.queueDeclare(endpoint.getQueue(), endpoint.isDurable(), false, + endpoint.isAutoDelete(), null); + channel.queueBind( + endpoint.getQueue(), + endpoint.getExchangeName(), + endpoint.getRoutingKey() == null ? "" : endpoint + .getRoutingKey()); + + channel.basicConsume(endpoint.getQueue(), endpoint.isAutoAck(), + new RabbitConsumer(this, channel)); } @Override @@ -76,13 +79,14 @@ public class RabbitMQConsumer extends DefaultConsumer { if (conn != null) { try { conn.close(); - } catch (Exception ignored) { + } catch (Exception ignored) { // ignored } } channel = null; conn = null; + if (executor != null) { if (getEndpoint() != null && getEndpoint().getCamelContext() != null) { getEndpoint().getCamelContext().getExecutorServiceManager().shutdownNow(executor); @@ -99,7 +103,8 @@ public class RabbitMQConsumer extends DefaultConsumer { private final Channel channel; /** - * Constructs a new instance and records its association to the passed-in channel. + * Constructs a new instance and records its association to the + * passed-in channel. * * @param channel the channel to which this consumer is attached */ @@ -110,13 +115,12 @@ public class RabbitMQConsumer extends DefaultConsumer { } @Override - public void handleDelivery(String consumerTag, - Envelope envelope, - AMQP.BasicProperties properties, - byte[] body) throws IOException { + public void handleDelivery(String consumerTag, Envelope envelope, + AMQP.BasicProperties properties, byte[] body) throws IOException { Exchange exchange = consumer.endpoint.createRabbitExchange(envelope, body); - log.trace("Created exchange [exchange={}]", new Object[]{exchange}); + mergeAmqpProperties(exchange, properties); + log.trace("Created exchange [exchange={}]", exchange); try { consumer.getProcessor().process(exchange); @@ -131,6 +135,50 @@ public class RabbitMQConsumer extends DefaultConsumer { getExceptionHandler().handleException("Error processing exchange", exchange, e); } } + + /** + * Will take an {@link Exchange} and add header values back to the {@link Exchange#getIn()} + */ + private void mergeAmqpProperties(Exchange exchange, AMQP.BasicProperties properties) { + + if (properties.getType() != null) { + exchange.getIn().setHeader(RabbitMQConstants.TYPE, properties.getType()); + } + if (properties.getAppId() != null) { + exchange.getIn().setHeader(RabbitMQConstants.APP_ID, properties.getAppId()); + } + if (properties.getClusterId() != null) { + exchange.getIn().setHeader(RabbitMQConstants.CLUSTERID, properties.getClusterId()); + } + if (properties.getContentEncoding() != null) { + exchange.getIn().setHeader(RabbitMQConstants.CONTENT_ENCODING, properties.getContentEncoding()); + } + if (properties.getContentType() != null) { + exchange.getIn().setHeader(RabbitMQConstants.CONTENT_TYPE, properties.getContentType()); + } + if (properties.getCorrelationId() != null) { + exchange.getIn().setHeader(RabbitMQConstants.CORRELATIONID, properties.getCorrelationId()); + } + if (properties.getExpiration() != null) { + exchange.getIn().setHeader(RabbitMQConstants.EXPIRATION, properties.getExpiration()); + } + if (properties.getMessageId() != null) { + exchange.getIn().setHeader(RabbitMQConstants.MESSAGE_ID, properties.getMessageId()); + } + if (properties.getPriority() != null) { + exchange.getIn().setHeader(RabbitMQConstants.PRIORITY, properties.getPriority()); + } + if (properties.getReplyTo() != null) { + exchange.getIn().setHeader(RabbitMQConstants.REPLY_TO, properties.getReplyTo()); + } + if (properties.getTimestamp() != null) { + exchange.getIn().setHeader(RabbitMQConstants.TIMESTAMP, properties.getTimestamp()); + } + if (properties.getUserId() != null) { + exchange.getIn().setHeader(RabbitMQConstants.USERID, properties.getUserId()); + } + } + } -} +}