Polished
Project: http://git-wip-us.apache.org/repos/asf/camel/repo Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/04f3d11c Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/04f3d11c Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/04f3d11c Branch: refs/heads/master Commit: 04f3d11cabc4b06d392da0a9d1dfec8112802317 Parents: bd3251c Author: Claus Ibsen <davscl...@apache.org> Authored: Sat Feb 20 09:39:47 2016 +0100 Committer: Claus Ibsen <davscl...@apache.org> Committed: Sat Feb 20 09:39:47 2016 +0100 ---------------------------------------------------------------------- .../camel/component/rabbitmq/RabbitMQProducer.java | 14 +++----------- .../camel/component/rabbitmq/reply/ReplyHolder.java | 1 - .../camel/component/rabbitmq/reply/ReplyManager.java | 7 ++----- .../component/rabbitmq/reply/ReplyManagerSupport.java | 3 +-- .../rabbitmq/reply/TemporaryQueueReplyHandler.java | 1 - .../rabbitmq/reply/TemporaryQueueReplyManager.java | 6 +++--- ...seMessageIdAsCorrelationIdMessageSentCallback.java | 2 -- 7 files changed, 9 insertions(+), 25 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/camel/blob/04f3d11c/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQProducer.java ---------------------------------------------------------------------- diff --git a/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQProducer.java b/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQProducer.java index 8c877aa..edefd6e 100644 --- a/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQProducer.java +++ b/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQProducer.java @@ -133,7 +133,6 @@ public class RabbitMQProducer extends DefaultAsyncProducer { @Override protected void doStart() throws Exception { this.executorService = getEndpoint().getCamelContext().getExecutorServiceManager().newSingleThreadExecutor(this, "CamelRabbitMQProducer[" + getEndpoint().getQueue() + "]"); - try { openConnectionAndChannelPool(); } catch (IOException e) { @@ -236,7 +235,8 @@ public class RabbitMQProducer extends DefaultAsyncProducer { basicPublish(exchange, exchangeName, key); } catch (Exception e) { replyManager.cancelCorrelationId(correlationId); - throw e; + exchange.setException(e); + return true; } // continue routing asynchronously (reply will be processed async when its received) return false; @@ -255,20 +255,12 @@ public class RabbitMQProducer extends DefaultAsyncProducer { } basicPublish(exchange, exchangeName, key); - if (callback != null) { - // we are synchronous so return true - callback.done(true); - } + callback.done(true); return true; } /** * Send a message borrowing a channel from the pool. - * - * @param exchange Target exchange - * @param routingKey Routing key - * @param properties Header properties - * @param body Body content */ private void basicPublish(final Exchange camelExchange, final String rabbitExchange, final String routingKey) throws Exception { if (channelPool == null) { http://git-wip-us.apache.org/repos/asf/camel/blob/04f3d11c/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/reply/ReplyHolder.java ---------------------------------------------------------------------- diff --git a/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/reply/ReplyHolder.java b/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/reply/ReplyHolder.java index b6fe68d..7ed0975 100644 --- a/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/reply/ReplyHolder.java +++ b/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/reply/ReplyHolder.java @@ -115,7 +115,6 @@ public class ReplyHolder { /** * The message properties - * @return */ public AMQP.BasicProperties getProperties() { return properties; http://git-wip-us.apache.org/repos/asf/camel/blob/04f3d11c/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/reply/ReplyManager.java ---------------------------------------------------------------------- diff --git a/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/reply/ReplyManager.java b/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/reply/ReplyManager.java index 4b6110a..9e6386b 100644 --- a/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/reply/ReplyManager.java +++ b/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/reply/ReplyManager.java @@ -22,17 +22,16 @@ import org.apache.camel.AsyncCallback; import org.apache.camel.Exchange; import org.apache.camel.component.rabbitmq.RabbitMQEndpoint; - /** * The {@link ReplyManager} is responsible for handling <a href="http://camel.apache.org/request-reply.html">request-reply</a> * over RabbitMQ. * - * @version + * @version */ public interface ReplyManager { /** - * Sets the belonging {@link org.apache.camel.component.jms.JmsEndpoint}. + * Sets the belonging {@link RabbitMQEndpoint} */ void setEndpoint(RabbitMQEndpoint endpoint); @@ -90,8 +89,6 @@ public interface ReplyManager { /** * Unregister a correlationId when you no longer need a reply - * - * @param correlationId */ void cancelCorrelationId(String correlationId); } http://git-wip-us.apache.org/repos/asf/camel/blob/04f3d11c/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/reply/ReplyManagerSupport.java ---------------------------------------------------------------------- diff --git a/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/reply/ReplyManagerSupport.java b/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/reply/ReplyManagerSupport.java index 4b4f75a..15b990a 100644 --- a/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/reply/ReplyManagerSupport.java +++ b/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/reply/ReplyManagerSupport.java @@ -230,8 +230,7 @@ public abstract class ReplyManagerSupport extends ServiceSupport implements Repl correlation = new CorrelationTimeoutMap(executorService, endpoint.getRequestTimeoutCheckerInterval()); ServiceHelper.startService(correlation); - // create JMS listener and start it - + // create listener and start it listenerContainer = createListenerContainer(); log.debug("Using executor {}", executorService); http://git-wip-us.apache.org/repos/asf/camel/blob/04f3d11c/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/reply/TemporaryQueueReplyHandler.java ---------------------------------------------------------------------- diff --git a/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/reply/TemporaryQueueReplyHandler.java b/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/reply/TemporaryQueueReplyHandler.java index bb0a102..542e8c0 100644 --- a/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/reply/TemporaryQueueReplyHandler.java +++ b/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/reply/TemporaryQueueReplyHandler.java @@ -23,7 +23,6 @@ import org.apache.camel.Exchange; import org.slf4j.Logger; import org.slf4j.LoggerFactory; - /** * {@link ReplyHandler} to handle processing replies when using temporary queues. * http://git-wip-us.apache.org/repos/asf/camel/blob/04f3d11c/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/reply/TemporaryQueueReplyManager.java ---------------------------------------------------------------------- diff --git a/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/reply/TemporaryQueueReplyManager.java b/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/reply/TemporaryQueueReplyManager.java index d3c2283..05cea1a 100644 --- a/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/reply/TemporaryQueueReplyManager.java +++ b/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/reply/TemporaryQueueReplyManager.java @@ -89,10 +89,10 @@ public class TemporaryQueueReplyManager extends ReplyManagerSupport { //Let the server pick a random name for us DeclareOk result = channel.queueDeclare(); - log.debug("Temporary queue name {}", result.getQueue()); + log.info("Using temporary queue name: {}", result.getQueue()); setReplyTo(result.getQueue()); - //TODO check for the RabbitMQConstants.EXCHANGE_NAME header + //TODO check for the RabbitMQConstants.EXCHANGE_NAME header channel.queueBind(getReplyTo(), endpoint.getExchangeName(), getReplyTo()); consumer = new RabbitConsumer(this, channel); @@ -129,7 +129,6 @@ public class TemporaryQueueReplyManager extends ReplyManagerSupport { @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { - consumer.onMessage(properties, body); } @@ -152,4 +151,5 @@ public class TemporaryQueueReplyManager extends ReplyManagerSupport { } } } + } http://git-wip-us.apache.org/repos/asf/camel/blob/04f3d11c/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/reply/UseMessageIdAsCorrelationIdMessageSentCallback.java ---------------------------------------------------------------------- diff --git a/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/reply/UseMessageIdAsCorrelationIdMessageSentCallback.java b/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/reply/UseMessageIdAsCorrelationIdMessageSentCallback.java index 3521bec..7e3b111 100644 --- a/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/reply/UseMessageIdAsCorrelationIdMessageSentCallback.java +++ b/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/reply/UseMessageIdAsCorrelationIdMessageSentCallback.java @@ -20,8 +20,6 @@ import java.util.UUID; import com.rabbitmq.client.Connection; - - /** * Callback to be used when using the option <tt>useMessageIDAsCorrelationID</tt>. * <p/>