Repository: camel Updated Branches: refs/heads/master bd226cbe8 -> c34db42c8
RabbitMQEndpoint refactoring, extracted a few helper classes to avoid RabbitMQEndpoint becoming a god class Project: http://git-wip-us.apache.org/repos/asf/camel/repo Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/48a28a2c Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/48a28a2c Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/48a28a2c Branch: refs/heads/master Commit: 48a28a2c301c684e5481d27975cb1572c7201df6 Parents: bd226cb Author: MiloÅ¡ MilivojeviÄ <mmilivoje...@deployinc.com> Authored: Thu Dec 24 11:01:24 2015 +0100 Committer: Claus Ibsen <davscl...@apache.org> Committed: Wed Dec 30 09:40:00 2015 +0100 ---------------------------------------------------------------------- .../RabbitMQConnectionFactorySupport.java | 64 +++++ .../component/rabbitmq/RabbitMQConsumer.java | 2 +- .../rabbitmq/RabbitMQDeclareSupport.java | 103 ++++++++ .../component/rabbitmq/RabbitMQEndpoint.java | 245 +------------------ .../rabbitmq/RabbitMQMessageConverter.java | 115 +++++++-- .../rabbitmq/RabbitMQMessagePublisher.java | 123 ++++++++++ .../rabbitmq/reply/ReplyManagerSupport.java | 22 +- .../rabbitmq/RabbitMQProducerIntTest.java | 1 - 8 files changed, 411 insertions(+), 264 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/camel/blob/48a28a2c/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQConnectionFactorySupport.java ---------------------------------------------------------------------- diff --git a/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQConnectionFactorySupport.java b/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQConnectionFactorySupport.java new file mode 100644 index 0000000..de6cc24 --- /dev/null +++ b/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQConnectionFactorySupport.java @@ -0,0 +1,64 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.component.rabbitmq; + +import java.security.KeyManagementException; +import java.security.NoSuchAlgorithmException; + +import com.rabbitmq.client.ConnectionFactory; + +public class RabbitMQConnectionFactorySupport { + + public ConnectionFactory createFactoryFor(final RabbitMQEndpoint endpoint) { + ConnectionFactory factory = new ConnectionFactory(); + factory.setUsername(endpoint.getUsername()); + factory.setPassword(endpoint.getPassword()); + factory.setVirtualHost(endpoint.getVhost()); + factory.setHost(endpoint.getHostname()); + factory.setPort(endpoint.getPortNumber()); + if (endpoint.getClientProperties() != null) { + factory.setClientProperties(endpoint.getClientProperties()); + } + factory.setConnectionTimeout(endpoint.getConnectionTimeout()); + factory.setRequestedChannelMax(endpoint.getRequestedChannelMax()); + factory.setRequestedFrameMax(endpoint.getRequestedFrameMax()); + factory.setRequestedHeartbeat(endpoint.getRequestedHeartbeat()); + if (endpoint.getSslProtocol() != null) { + try { + if (endpoint.getSslProtocol().equals("true")) { + factory.useSslProtocol(); + } else if (endpoint.getTrustManager() == null) { + factory.useSslProtocol(endpoint.getSslProtocol()); + } else { + factory.useSslProtocol(endpoint.getSslProtocol(), endpoint.getTrustManager()); + } + } catch (NoSuchAlgorithmException | KeyManagementException e) { + throw new IllegalArgumentException("Invalid sslProtocol " + endpoint.getSslProtocol(), e); + } + } + if (endpoint.getAutomaticRecoveryEnabled() != null) { + factory.setAutomaticRecoveryEnabled(endpoint.getAutomaticRecoveryEnabled()); + } + if (endpoint.getNetworkRecoveryInterval() != null) { + factory.setNetworkRecoveryInterval(endpoint.getNetworkRecoveryInterval()); + } + if (endpoint.getTopologyRecoveryEnabled() != null) { + factory.setTopologyRecoveryEnabled(endpoint.getTopologyRecoveryEnabled()); + } + return factory; + } +} http://git-wip-us.apache.org/repos/asf/camel/blob/48a28a2c/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQConsumer.java ---------------------------------------------------------------------- diff --git a/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQConsumer.java b/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQConsumer.java index cdb23f4..a71769e 100644 --- a/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQConsumer.java +++ b/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQConsumer.java @@ -94,7 +94,7 @@ public class RabbitMQConsumer extends DefaultConsumer { // First channel used to declare Exchange and Queue Channel channel = openChannel(); if (getEndpoint().isDeclare()) { - endpoint.declareExchangeAndQueue(channel); + getEndpoint().declareExchangeAndQueue(channel); } startConsumer(channel); // Other channels http://git-wip-us.apache.org/repos/asf/camel/blob/48a28a2c/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQDeclareSupport.java ---------------------------------------------------------------------- diff --git a/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQDeclareSupport.java b/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQDeclareSupport.java new file mode 100644 index 0000000..aa4df2f --- /dev/null +++ b/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQDeclareSupport.java @@ -0,0 +1,103 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.component.rabbitmq; + +import java.io.IOException; +import java.util.Collections; +import java.util.HashMap; +import java.util.Map; + +import com.rabbitmq.client.Channel; + +public class RabbitMQDeclareSupport { + + private final RabbitMQEndpoint endpoint; + + RabbitMQDeclareSupport(final RabbitMQEndpoint endpoint) { + this.endpoint = endpoint; + } + + public void declareAndBindExchangesAndQueuesUsing(final Channel channel) throws IOException { + declareAndBindDeadLetterExchangeWithQueue(channel); + declareAndBindExchangeWithQueue(channel); + } + + private void declareAndBindDeadLetterExchangeWithQueue(final Channel channel) throws IOException { + if (endpoint.getDeadLetterExchange() != null) { + // TODO Do we need to setup the args for the DeadLetter? + declareExchange(channel, endpoint.getDeadLetterExchange(), endpoint.getDeadLetterExchangeType(), Collections.<String, Object>emptyMap()); + declareAndBindQueue(channel, endpoint.getDeadLetterQueue(), endpoint.getDeadLetterExchange(), endpoint.getDeadLetterRoutingKey(), null); + } + } + + private void declareAndBindExchangeWithQueue(final Channel channel) throws IOException { + declareExchange(channel, endpoint.getExchangeName(), endpoint.getExchangeType(), resolvedExchangeArguments()); + + if (shouldDeclareQueue()) { + // need to make sure the queueDeclare is same with the exchange declare + declareAndBindQueue(channel, endpoint.getQueue(), endpoint.getExchangeName(), endpoint.getRoutingKey(), resolvedQueueArguments()); + } + } + + private Map<String, Object> resolvedQueueArguments() { + Map<String, Object> queueArgs = new HashMap<>(); + populateQueueArgumentsFromDeadLetterExchange(queueArgs); + populateQueueArgumentsFromConfigurer(queueArgs); + return queueArgs; + } + + private Map<String, Object> populateQueueArgumentsFromDeadLetterExchange(final Map<String, Object> queueArgs) { + if (endpoint.getDeadLetterExchange() != null) { + queueArgs.put(RabbitMQConstants.RABBITMQ_DEAD_LETTER_EXCHANGE, endpoint.getDeadLetterExchange()); + queueArgs.put(RabbitMQConstants.RABBITMQ_DEAD_LETTER_ROUTING_KEY, endpoint.getDeadLetterRoutingKey()); + } + + return queueArgs; + } + + private Map<String, Object> resolvedExchangeArguments() { + Map<String, Object> exchangeArgs = new HashMap<>(); + if (endpoint.getExchangeArgsConfigurer() != null) { + endpoint.getExchangeArgsConfigurer().configurArgs(exchangeArgs); + } + return exchangeArgs; + } + + private boolean shouldDeclareQueue() { + return !endpoint.isSkipQueueDeclare() && endpoint.getQueue() != null; + } + + private void populateQueueArgumentsFromConfigurer(final Map<String, Object> queueArgs) { + if (endpoint.getQueueArgsConfigurer() != null) { + endpoint.getQueueArgsConfigurer().configurArgs(queueArgs); + } + } + + private void declareExchange(final Channel channel, final String exchange, final String exchangeType, final Map<String, Object> exchangeArgs) throws IOException { + channel.exchangeDeclare(exchange, exchangeType, endpoint.isDurable(), endpoint.isAutoDelete(), exchangeArgs); + } + + private void declareAndBindQueue(final Channel channel, final String queue, final String exchange, final String routingKey, final Map<String, Object> arguments) + throws IOException { + channel.queueDeclare(queue, endpoint.isDurable(), false, endpoint.isAutoDelete(), arguments); + channel.queueBind(queue, exchange, emptyIfNull(routingKey)); + } + + private String emptyIfNull(final String routingKey) { + return routingKey == null ? "" : routingKey; + } +} http://git-wip-us.apache.org/repos/asf/camel/blob/48a28a2c/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQEndpoint.java ---------------------------------------------------------------------- diff --git a/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQEndpoint.java b/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQEndpoint.java index 7a6e48c..41eab3f 100644 --- a/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQEndpoint.java +++ b/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQEndpoint.java @@ -16,18 +16,8 @@ */ package org.apache.camel.component.rabbitmq; -import java.io.ByteArrayInputStream; -import java.io.ByteArrayOutputStream; import java.io.IOException; -import java.io.InputStream; -import java.io.NotSerializableException; -import java.io.ObjectInputStream; -import java.io.ObjectOutputStream; -import java.io.Serializable; import java.net.URISyntaxException; -import java.security.KeyManagementException; -import java.security.NoSuchAlgorithmException; -import java.util.HashMap; import java.util.Map; import java.util.UUID; import java.util.concurrent.ExecutorService; @@ -41,17 +31,12 @@ import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; import com.rabbitmq.client.Envelope; -import com.rabbitmq.client.LongString; import org.apache.camel.Consumer; import org.apache.camel.Exchange; import org.apache.camel.Message; -import org.apache.camel.NoTypeConversionAvailableException; import org.apache.camel.Processor; import org.apache.camel.Producer; -import org.apache.camel.RuntimeCamelException; -import org.apache.camel.TypeConversionException; import org.apache.camel.impl.DefaultEndpoint; -import org.apache.camel.impl.DefaultMessage; import org.apache.camel.spi.Metadata; import org.apache.camel.spi.UriEndpoint; import org.apache.camel.spi.UriParam; @@ -59,14 +44,11 @@ import org.apache.camel.spi.UriPath; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -/** - * The rabbitmq component allows AMQP messages to be sent to (or consumed from) a RabbitMQ broker. - */ @UriEndpoint(scheme = "rabbitmq", title = "RabbitMQ", syntax = "rabbitmq:hostname:portNumber/exchangeName", consumerClass = RabbitMQConsumer.class, label = "messaging") public class RabbitMQEndpoint extends DefaultEndpoint { - private static final Logger LOG = LoggerFactory.getLogger(RabbitMQEndpoint.class); // header to indicate that the message body needs to be de-serialized - private static final String SERIALIZE_HEADER = "CamelSerialize"; + public static final String SERIALIZE_HEADER = "CamelSerialize"; + private static final Logger LOG = LoggerFactory.getLogger(RabbitMQEndpoint.class); @UriPath @Metadata(required = "true") private String hostname; @@ -172,7 +154,8 @@ public class RabbitMQEndpoint extends DefaultEndpoint { private String replyTo; private final RabbitMQMessageConverter messageConverter = new RabbitMQMessageConverter(); - + private final RabbitMQConnectionFactorySupport factoryCreator = new RabbitMQConnectionFactorySupport(); + private final RabbitMQDeclareSupport declareSupport = new RabbitMQDeclareSupport(this); public RabbitMQEndpoint() { } @@ -188,7 +171,7 @@ public class RabbitMQEndpoint extends DefaultEndpoint { public Exchange createRabbitExchange(Envelope envelope, AMQP.BasicProperties properties, byte[] body) { Exchange exchange = super.createExchange(); - setRabbitExchange(exchange, envelope, properties, body, false); + messageConverter.populateRabbitExchange(exchange, envelope, properties, body, false); return exchange; } @@ -199,132 +182,11 @@ public class RabbitMQEndpoint extends DefaultEndpoint { return messageConverter; } - public void setRabbitExchange(Exchange camelExchange, Envelope envelope, AMQP.BasicProperties properties, byte[] body, boolean out) { - Message message; - if (out) { - // use OUT message - message = camelExchange.getOut(); - } else { - if (camelExchange.getIn() != null) { - // Use the existing message so we keep the headers - message = camelExchange.getIn(); - } else { - message = new DefaultMessage(); - camelExchange.setIn(message); - } - } - - if (envelope != null) { - message.setHeader(RabbitMQConstants.ROUTING_KEY, envelope.getRoutingKey()); - message.setHeader(RabbitMQConstants.EXCHANGE_NAME, envelope.getExchange()); - message.setHeader(RabbitMQConstants.DELIVERY_TAG, envelope.getDeliveryTag()); - } - - Map<String, Object> headers = properties.getHeaders(); - if (headers != null) { - for (Map.Entry<String, Object> entry : headers.entrySet()) { - // Convert LongStrings to String. - if (entry.getValue() instanceof LongString) { - message.setHeader(entry.getKey(), entry.getValue().toString()); - } else { - message.setHeader(entry.getKey(), entry.getValue()); - } - } - } - - if (hasSerializeHeader(properties)) { - Object messageBody = null; - try (InputStream b = new ByteArrayInputStream(body); - ObjectInputStream o = new ObjectInputStream(b);) { - messageBody = o.readObject(); - } catch (IOException | ClassNotFoundException e) { - LOG.warn("Could not deserialize the object"); - } - if (messageBody instanceof Throwable) { - LOG.debug("Reply was an Exception. Setting the Exception on the Exchange"); - camelExchange.setException((Throwable) messageBody); - } else { - message.setBody(messageBody); - } - } else { - // Set the body as a byte[] and let the type converter deal with it - message.setBody(body); - } - - } - - private boolean hasSerializeHeader(AMQP.BasicProperties properties) { - if (properties == null || properties.getHeaders() == null) { - return false; - } - if (properties.getHeaders().containsKey(SERIALIZE_HEADER) && properties.getHeaders().get(SERIALIZE_HEADER).equals(true)) { - return true; - } - return false; - } - /** * Sends the body that is on the exchange */ public void publishExchangeToChannel(Exchange camelExchange, Channel channel, String routingKey) throws IOException { - Message msg; - if (camelExchange.hasOut()) { - msg = camelExchange.getOut(); - } else { - msg = camelExchange.getIn(); - } - - // Remove the SERIALIZE_HEADER in case it was previously set - if (msg.getHeaders() != null && msg.getHeaders().containsKey(SERIALIZE_HEADER)) { - LOG.debug("Removing the {} header", SERIALIZE_HEADER); - msg.getHeaders().remove(SERIALIZE_HEADER); - } - - AMQP.BasicProperties properties; - byte[] body; - try { - // To maintain backwards compatibility try the TypeConverter (The DefaultTypeConverter seems to only work on Strings) - body = camelExchange.getContext().getTypeConverter().mandatoryConvertTo(byte[].class, camelExchange, msg.getBody()); - - properties = getMessageConverter().buildProperties(camelExchange).build(); - } catch (NoTypeConversionAvailableException | TypeConversionException e) { - if (msg.getBody() instanceof Serializable) { - // Add the header so the reply processor knows to de-serialize it - msg.getHeaders().put(SERIALIZE_HEADER, true); - - properties = getMessageConverter().buildProperties(camelExchange).build(); - - try (ByteArrayOutputStream b = new ByteArrayOutputStream(); ObjectOutputStream o = new ObjectOutputStream(b);) { - o.writeObject(msg.getBody()); - body = b.toByteArray(); - } catch (NotSerializableException nse) { - LOG.warn("Can not send object " + msg.getBody().getClass() + " via RabbitMQ because it contains non-serializable objects."); - throw new RuntimeCamelException(e); - } - } else if (msg.getBody() == null) { - properties = getMessageConverter().buildProperties(camelExchange).build(); - body = null; - } else { - LOG.warn("Could not convert {} to byte[]", msg.getBody()); - throw new RuntimeCamelException(e); - } - } - String rabbitExchange = getExchangeName(msg); - - Boolean mandatory = camelExchange.getIn().getHeader(RabbitMQConstants.MANDATORY, isMandatory(), Boolean.class); - Boolean immediate = camelExchange.getIn().getHeader(RabbitMQConstants.IMMEDIATE, isImmediate(), Boolean.class); - - LOG.debug("Sending message to exchange: {} with CorrelationId = {}", rabbitExchange, properties.getCorrelationId()); - - if (isPublisherAcknowledgements()) { - channel.confirmSelect(); - } - - channel.basicPublish(rabbitExchange, routingKey, mandatory, immediate, properties, body); - - if (isPublisherAcknowledgements()) { - waitForConfirmationFor(channel, camelExchange); - } + new RabbitMQMessagePublisher(camelExchange, channel, routingKey, this).publish(); } /** @@ -337,16 +199,6 @@ public class RabbitMQEndpoint extends DefaultEndpoint { exchangeName = getExchangeName(); } return exchangeName; - } - - private void waitForConfirmationFor(final Channel channel, final Exchange camelExchange) throws IOException { - try { - LOG.debug("Waiting for publisher acknowledgements for {}ms", getPublisherAcknowledgementsTimeout()); - channel.waitForConfirmsOrDie(getPublisherAcknowledgementsTimeout()); - } catch (InterruptedException | TimeoutException e) { - LOG.warn("Acknowledgement error for {}", camelExchange); - throw new RuntimeCamelException(e); - } } @Override @@ -368,88 +220,12 @@ public class RabbitMQEndpoint extends DefaultEndpoint { * If needed, declare Exchange, declare Queue and bind them with Routing Key */ public void declareExchangeAndQueue(Channel channel) throws IOException { - Map<String, Object> queueArgs = new HashMap<String, Object>(); - Map<String, Object> exchangeArgs = new HashMap<String, Object>(); - - if (deadLetterExchange != null) { - queueArgs.put(RabbitMQConstants.RABBITMQ_DEAD_LETTER_EXCHANGE, getDeadLetterExchange()); - queueArgs.put(RabbitMQConstants.RABBITMQ_DEAD_LETTER_ROUTING_KEY, getDeadLetterRoutingKey()); - // TODO Do we need to setup the args for the DeadLetter? - channel.exchangeDeclare(getDeadLetterExchange(), - getDeadLetterExchangeType(), - isDurable(), - isAutoDelete(), - new HashMap<String, Object>()); - channel.queueDeclare(getDeadLetterQueue(), isDurable(), false, - isAutoDelete(), null); - channel.queueBind( - getDeadLetterQueue(), - getDeadLetterExchange(), - getDeadLetterRoutingKey() == null ? "" : getDeadLetterRoutingKey()); - } - - if (getQueueArgsConfigurer() != null) { - getQueueArgsConfigurer().configurArgs(queueArgs); - } - if (getExchangeArgsConfigurer() != null) { - getExchangeArgsConfigurer().configurArgs(exchangeArgs); - } - - channel.exchangeDeclare(getExchangeName(), - getExchangeType(), - isDurable(), - isAutoDelete(), exchangeArgs); - if (!isSkipQueueDeclare() && getQueue() != null) { - // need to make sure the queueDeclare is same with the exchange declare - channel.queueDeclare(getQueue(), isDurable(), false, - isAutoDelete(), queueArgs); - channel.queueBind( - getQueue(), - getExchangeName(), - getRoutingKey() == null ? "" : getRoutingKey()); - } + declareSupport.declareAndBindExchangesAndQueuesUsing(channel); } private ConnectionFactory getOrCreateConnectionFactory() { if (connectionFactory == null) { - ConnectionFactory factory = new ConnectionFactory(); - factory.setUsername(getUsername()); - factory.setPassword(getPassword()); - factory.setVirtualHost(getVhost()); - factory.setHost(getHostname()); - factory.setPort(getPortNumber()); - if (getClientProperties() != null) { - factory.setClientProperties(getClientProperties()); - } - factory.setConnectionTimeout(getConnectionTimeout()); - factory.setRequestedChannelMax(getRequestedChannelMax()); - factory.setRequestedFrameMax(getRequestedFrameMax()); - factory.setRequestedHeartbeat(getRequestedHeartbeat()); - if (getSslProtocol() != null) { - try { - if (getSslProtocol().equals("true")) { - factory.useSslProtocol(); - } else if (getTrustManager() == null) { - factory.useSslProtocol(getSslProtocol()); - } else { - factory.useSslProtocol(getSslProtocol(), getTrustManager()); - } - } catch (NoSuchAlgorithmException e) { - throw new IllegalArgumentException("Invalid sslProtocol " + sslProtocol, e); - } catch (KeyManagementException e) { - throw new IllegalArgumentException("Invalid sslProtocol " + sslProtocol, e); - } - } - if (getAutomaticRecoveryEnabled() != null) { - factory.setAutomaticRecoveryEnabled(getAutomaticRecoveryEnabled()); - } - if (getNetworkRecoveryInterval() != null) { - factory.setNetworkRecoveryInterval(getNetworkRecoveryInterval()); - } - if (getTopologyRecoveryEnabled() != null) { - factory.setTopologyRecoveryEnabled(getTopologyRecoveryEnabled()); - } - connectionFactory = factory; + connectionFactory = factoryCreator.createFactoryFor(this); } return connectionFactory; } @@ -847,7 +623,7 @@ public class RabbitMQEndpoint extends DefaultEndpoint { public void setDeclare(boolean declare) { this.declare = declare; } - + public String getDeadLetterExchange() { return deadLetterExchange; } @@ -958,7 +734,7 @@ public class RabbitMQEndpoint extends DefaultEndpoint { public ArgsConfigurer getExchangeArgsConfigurer() { return exchangeArgsConfigurer; } - + /** * Set the configurer for setting the exchange args in Channel.exchangeDeclare */ @@ -1041,4 +817,5 @@ public class RabbitMQEndpoint extends DefaultEndpoint { public String getReplyTo() { return replyTo; } + } http://git-wip-us.apache.org/repos/asf/camel/blob/48a28a2c/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQMessageConverter.java ---------------------------------------------------------------------- diff --git a/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQMessageConverter.java b/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQMessageConverter.java index 66299f6..1873674 100644 --- a/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQMessageConverter.java +++ b/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQMessageConverter.java @@ -16,17 +16,20 @@ */ package org.apache.camel.component.rabbitmq; -import java.math.BigDecimal; -import java.sql.Timestamp; +import java.io.ByteArrayInputStream; +import java.io.IOException; +import java.io.InputStream; +import java.io.ObjectInputStream; import java.util.Date; import java.util.HashMap; import java.util.Map; import com.rabbitmq.client.AMQP; +import com.rabbitmq.client.Envelope; import com.rabbitmq.client.LongString; - import org.apache.camel.Exchange; import org.apache.camel.Message; +import org.apache.camel.impl.DefaultMessage; import org.apache.camel.util.ObjectHelper; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -153,7 +156,7 @@ public class RabbitMQMessageConverter { } final Map<String, Object> headers = msg.getHeaders(); - Map<String, Object> filteredHeaders = new HashMap<String, Object>(); + Map<String, Object> filteredHeaders = new HashMap<>(); // TODO: Add support for a HeaderFilterStrategy. See: org.apache.camel.component.jms.JmsBinding#shouldOutputHeader for (Map.Entry<String, Object> header : headers.entrySet()) { @@ -166,9 +169,7 @@ public class RabbitMQMessageConverter { LOG.debug("Ignoring header: {} with null value", header.getKey()); } else { LOG.debug("Ignoring header: {} of class: {} with value: {}", - new Object[] { - header.getKey(), ObjectHelper.classCanonicalName(header.getValue()), header.getValue() - }); + header.getKey(), ObjectHelper.classCanonicalName(header.getValue()), header.getValue()); } } } @@ -191,8 +192,6 @@ public class RabbitMQMessageConverter { private Object getValidRabbitMQHeaderValue(Object headerValue) { if (headerValue instanceof String) { return headerValue; - } else if (headerValue instanceof BigDecimal) { - return headerValue; } else if (headerValue instanceof Number) { return headerValue; } else if (headerValue instanceof Boolean) { @@ -203,18 +202,96 @@ public class RabbitMQMessageConverter { return headerValue; } else if (headerValue instanceof LongString) { return headerValue; - } else if (headerValue instanceof Timestamp) { - return headerValue; - } else if (headerValue instanceof Byte) { - return headerValue; - } else if (headerValue instanceof Double) { - return headerValue; - } else if (headerValue instanceof Float) { - return headerValue; - } else if (headerValue instanceof Long) { - return headerValue; } return null; } + + public void populateRabbitExchange(Exchange camelExchange, Envelope envelope, AMQP.BasicProperties properties, byte[] body, final boolean out) { + Message message = resolveMessageFrom(camelExchange, out); + populateMessageHeaders(message, envelope, properties); + populateMessageBody(message, camelExchange, properties, body); + } + + private Message resolveMessageFrom(final Exchange camelExchange, final boolean out) { + Message message; + if (out) { + // use OUT message + message = camelExchange.getOut(); + } else { + if (camelExchange.getIn() != null) { + // Use the existing message so we keep the headers + message = camelExchange.getIn(); + } else { + message = new DefaultMessage(); + camelExchange.setIn(message); + } + } + return message; + } + + private void populateMessageHeaders(final Message message, final Envelope envelope, final AMQP.BasicProperties properties) { + populateRoutingInfoHeaders(message, envelope); + populateMessageHeadersFromRabbitMQHeaders(message, properties); + } + + private void populateRoutingInfoHeaders(final Message message, final Envelope envelope) { + if (envelope != null) { + message.setHeader(RabbitMQConstants.ROUTING_KEY, envelope.getRoutingKey()); + message.setHeader(RabbitMQConstants.EXCHANGE_NAME, envelope.getExchange()); + message.setHeader(RabbitMQConstants.DELIVERY_TAG, envelope.getDeliveryTag()); + } + } + + private void populateMessageHeadersFromRabbitMQHeaders(final Message message, final AMQP.BasicProperties properties) { + Map<String, Object> headers = properties.getHeaders(); + if (headers != null) { + for (Map.Entry<String, Object> entry : headers.entrySet()) { + // Convert LongStrings to String. + if (entry.getValue() instanceof LongString) { + message.setHeader(entry.getKey(), entry.getValue().toString()); + } else { + message.setHeader(entry.getKey(), entry.getValue()); + } + } + } + } + + private void populateMessageBody(final Message message, final Exchange camelExchange, final AMQP.BasicProperties properties, final byte[] body) { + if (hasSerializeHeader(properties)) { + deserializeBody(camelExchange, message, body); + } else { + // Set the body as a byte[] and let the type converter deal with it + message.setBody(body); + } + } + + private void deserializeBody(final Exchange camelExchange, final Message message, final byte[] body) { + Object messageBody = null; + try (InputStream b = new ByteArrayInputStream(body); + ObjectInputStream o = new ObjectInputStream(b)) { + messageBody = o.readObject(); + } catch (IOException | ClassNotFoundException e) { + LOG.warn("Could not deserialize the object"); + camelExchange.setException(e); + } + if (messageBody instanceof Throwable) { + LOG.debug("Reply was an Exception. Setting the Exception on the Exchange"); + camelExchange.setException((Throwable) messageBody); + } else { + message.setBody(messageBody); + } + } + + private boolean hasSerializeHeader(AMQP.BasicProperties properties) { + return hasHeaders(properties) && Boolean.TRUE.equals(isSerializeHeaderEnabled(properties)); + } + + private boolean hasHeaders(final AMQP.BasicProperties properties) { + return properties != null && properties.getHeaders() != null; + } + + private Object isSerializeHeaderEnabled(final AMQP.BasicProperties properties) { + return properties.getHeaders().get(RabbitMQEndpoint.SERIALIZE_HEADER); + } } http://git-wip-us.apache.org/repos/asf/camel/blob/48a28a2c/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQMessagePublisher.java ---------------------------------------------------------------------- diff --git a/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQMessagePublisher.java b/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQMessagePublisher.java new file mode 100644 index 0000000..6f50ec4 --- /dev/null +++ b/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQMessagePublisher.java @@ -0,0 +1,123 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.component.rabbitmq; + +import java.io.*; +import java.util.concurrent.TimeoutException; + +import com.rabbitmq.client.AMQP; +import com.rabbitmq.client.Channel; +import org.apache.camel.*; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * A method object for publishing to RabbitMQ + */ +public class RabbitMQMessagePublisher { + private static final Logger LOG = LoggerFactory.getLogger(RabbitMQMessagePublisher.class); + private final Exchange camelExchange; + private final Channel channel; + private final String routingKey; + private final RabbitMQEndpoint endpoint; + private final Message message; + + public RabbitMQMessagePublisher(final Exchange camelExchange, final Channel channel, final String routingKey, final RabbitMQEndpoint endpoint) { + this.camelExchange = camelExchange; + this.channel = channel; + this.routingKey = routingKey; + this.endpoint = endpoint; + this.message = resolveMessageFrom(camelExchange); + } + + private Message resolveMessageFrom(final Exchange camelExchange) { + Message message = camelExchange.hasOut() ? camelExchange.getOut() : camelExchange.getIn(); + + // Remove the SERIALIZE_HEADER in case it was previously set + if (message.getHeaders() != null && message.getHeaders().containsKey(RabbitMQEndpoint.SERIALIZE_HEADER)) { + LOG.debug("Removing the {} header", RabbitMQEndpoint.SERIALIZE_HEADER); + message.getHeaders().remove(RabbitMQEndpoint.SERIALIZE_HEADER); + } + + return message; + } + + public void publish() throws IOException { + AMQP.BasicProperties properties; + byte[] body; + try { + // To maintain backwards compatibility try the TypeConverter (The DefaultTypeConverter seems to only work on Strings) + body = camelExchange.getContext().getTypeConverter().mandatoryConvertTo(byte[].class, camelExchange, message.getBody()); + + properties = endpoint.getMessageConverter().buildProperties(camelExchange).build(); + } catch (NoTypeConversionAvailableException | TypeConversionException e) { + if (message.getBody() instanceof Serializable) { + // Add the header so the reply processor knows to de-serialize it + message.getHeaders().put(RabbitMQEndpoint.SERIALIZE_HEADER, true); + properties = endpoint.getMessageConverter().buildProperties(camelExchange).build(); + body = serializeBodyFrom(message); + } else if (message.getBody() == null) { + properties = endpoint.getMessageConverter().buildProperties(camelExchange).build(); + body = null; + } else { + LOG.warn("Could not convert {} to byte[]", message.getBody()); + throw new RuntimeCamelException(e); + } + } + + publishToRabbitMQ(properties, body); + } + + private void publishToRabbitMQ(final AMQP.BasicProperties properties, final byte[] body) throws IOException { + String rabbitExchange = endpoint.getExchangeName(message); + + Boolean mandatory = camelExchange.getIn().getHeader(RabbitMQConstants.MANDATORY, endpoint.isMandatory(), Boolean.class); + Boolean immediate = camelExchange.getIn().getHeader(RabbitMQConstants.IMMEDIATE, endpoint.isImmediate(), Boolean.class); + + LOG.debug("Sending message to exchange: {} with CorrelationId = {}", rabbitExchange, properties.getCorrelationId()); + + if (endpoint.isPublisherAcknowledgements()) { + channel.confirmSelect(); + } + + channel.basicPublish(rabbitExchange, routingKey, mandatory, immediate, properties, body); + + if (endpoint.isPublisherAcknowledgements()) { + waitForConfirmation(); + } + } + + private void waitForConfirmation() throws IOException { + try { + LOG.debug("Waiting for publisher acknowledgements for {}ms", endpoint.getPublisherAcknowledgementsTimeout()); + channel.waitForConfirmsOrDie(endpoint.getPublisherAcknowledgementsTimeout()); + } catch (InterruptedException | TimeoutException e) { + LOG.warn("Acknowledgement error for {}", camelExchange); + throw new RuntimeCamelException(e); + } + } + + private byte[] serializeBodyFrom(final Message msg) throws IOException { + try (ByteArrayOutputStream b = new ByteArrayOutputStream(); ObjectOutputStream o = new ObjectOutputStream(b)) { + o.writeObject(msg.getBody()); + return b.toByteArray(); + } catch (NotSerializableException nse) { + LOG.warn("Can not send object " + msg.getBody().getClass() + " via RabbitMQ because it contains non-serializable objects."); + throw new RuntimeCamelException(nse); + } + } +} http://git-wip-us.apache.org/repos/asf/camel/blob/48a28a2c/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/reply/ReplyManagerSupport.java ---------------------------------------------------------------------- diff --git a/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/reply/ReplyManagerSupport.java b/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/reply/ReplyManagerSupport.java index 52ccc90..b6dacfa 100644 --- a/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/reply/ReplyManagerSupport.java +++ b/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/reply/ReplyManagerSupport.java @@ -29,6 +29,7 @@ import org.apache.camel.Exchange; import org.apache.camel.ExchangeTimedOutException; import org.apache.camel.component.rabbitmq.RabbitMQConstants; import org.apache.camel.component.rabbitmq.RabbitMQEndpoint; +import org.apache.camel.component.rabbitmq.RabbitMQMessageConverter; import org.apache.camel.support.ServiceSupport; import org.apache.camel.util.ExchangeHelper; import org.apache.camel.util.ObjectHelper; @@ -37,19 +38,21 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; public abstract class ReplyManagerSupport extends ServiceSupport implements ReplyManager { + private static final int CLOSE_TIMEOUT = 30 * 1000; + protected final Logger log = LoggerFactory.getLogger(ReplyManagerSupport.class); protected final CamelContext camelContext; protected final CountDownLatch replyToLatch = new CountDownLatch(1); protected final long replyToTimeout = 1000; - + protected ScheduledExecutorService executorService; protected RabbitMQEndpoint endpoint; protected String replyTo; - protected Connection listenerContainer; + protected Connection listenerContainer; protected CorrelationTimeoutMap correlation; - - private int closeTimeout = 30 * 1000; + + private final RabbitMQMessageConverter messageConverter = new RabbitMQMessageConverter(); public ReplyManagerSupport(CamelContext camelContext) { this.camelContext = camelContext; @@ -133,14 +136,15 @@ public abstract class ReplyManagerSupport extends ServiceSupport implements Repl if (log.isWarnEnabled()) { log.warn("Timeout occurred after {} millis waiting for reply message with correlationID [{}] on destination {}." + " Setting ExchangeTimedOutException on {} and continue routing.", - new Object[]{holder.getRequestTimeout(), holder.getCorrelationId(), replyTo, ExchangeHelper.logIds(exchange)}); + holder.getRequestTimeout(), holder.getCorrelationId(), replyTo, ExchangeHelper.logIds(exchange)); } // no response, so lets set a timed out exception String msg = "reply message with correlationID: " + holder.getCorrelationId() + " not received on destination: " + replyTo; exchange.setException(new ExchangeTimedOutException(exchange, holder.getRequestTimeout(), msg)); } else { - endpoint.setRabbitExchange(exchange, null, holder.getProperties(), holder.getMessage(), true); + + messageConverter.populateRabbitExchange(exchange, null, holder.getProperties(), holder.getMessage(), true); // restore correlation id in case the remote server messed with it if (holder.getOriginalCorrelationId() != null) { @@ -198,7 +202,7 @@ public abstract class ReplyManagerSupport extends ServiceSupport implements Repl if (answer != null) { if (log.isTraceEnabled()) { log.trace("Early reply with correlationID [{}] has been matched after {} attempts and can be processed using handler: {}", - new Object[]{correlationID, counter, answer}); + correlationID, counter, answer); } } } @@ -229,8 +233,8 @@ public abstract class ReplyManagerSupport extends ServiceSupport implements Repl ServiceHelper.stopService(correlation); if (listenerContainer != null) { - log.debug("Closing connection: {} with timeout: {} ms.", listenerContainer, closeTimeout); - listenerContainer.close(closeTimeout); + log.debug("Closing connection: {} with timeout: {} ms.", listenerContainer, CLOSE_TIMEOUT); + listenerContainer.close(CLOSE_TIMEOUT); listenerContainer = null; } http://git-wip-us.apache.org/repos/asf/camel/blob/48a28a2c/components/camel-rabbitmq/src/test/java/org/apache/camel/component/rabbitmq/RabbitMQProducerIntTest.java ---------------------------------------------------------------------- diff --git a/components/camel-rabbitmq/src/test/java/org/apache/camel/component/rabbitmq/RabbitMQProducerIntTest.java b/components/camel-rabbitmq/src/test/java/org/apache/camel/component/rabbitmq/RabbitMQProducerIntTest.java index 239fb36..5f859ad 100644 --- a/components/camel-rabbitmq/src/test/java/org/apache/camel/component/rabbitmq/RabbitMQProducerIntTest.java +++ b/components/camel-rabbitmq/src/test/java/org/apache/camel/component/rabbitmq/RabbitMQProducerIntTest.java @@ -59,7 +59,6 @@ public class RabbitMQProducerIntTest extends CamelTestSupport { protected RouteBuilder createRouteBuilder() throws Exception { context().setTracing(true); return new RouteBuilder() { - @Override public void configure() throws Exception { from("direct:start").to(BASIC_URI);