CAMEL-8742 - Reconnect when connections are closed. * Cancel reply manager when sending a message in the producer fails * Change the SuspendResume test to use a different queue since it conflicts with other queue settings
Project: http://git-wip-us.apache.org/repos/asf/camel/repo Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/f3eae07c Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/f3eae07c Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/f3eae07c Branch: refs/heads/master Commit: f3eae07c562708499e6b9240d8ad21a4ed640013 Parents: 034b73c Author: Brad Reitmeyer <brrei...@cisco.com> Authored: Tue Jan 26 16:39:12 2016 -0600 Committer: Claus Ibsen <davscl...@apache.org> Committed: Thu Jan 28 09:21:20 2016 +0100 ---------------------------------------------------------------------- .../component/rabbitmq/RabbitConsumer.java | 293 +++++++++++++++++++ .../component/rabbitmq/RabbitMQConsumer.java | 184 +++--------- .../component/rabbitmq/RabbitMQEndpoint.java | 5 +- .../component/rabbitmq/RabbitMQProducer.java | 64 +++- .../component/rabbitmq/reply/ReplyManager.java | 7 + .../rabbitmq/reply/ReplyManagerSupport.java | 9 + .../rabbitmq/RabbitMQConsumerTest.java | 2 + .../rabbitmq/RabbitMQReConnectionIntTest.java | 3 +- .../rabbitmq/RabbitMQSupendResumeIntTest.java | 4 +- 9 files changed, 404 insertions(+), 167 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/camel/blob/f3eae07c/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitConsumer.java ---------------------------------------------------------------------- diff --git a/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitConsumer.java b/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitConsumer.java new file mode 100644 index 0000000..a03e7f8 --- /dev/null +++ b/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitConsumer.java @@ -0,0 +1,293 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. 
See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.component.rabbitmq; + +import java.io.IOException; +import java.util.concurrent.TimeoutException; + +import com.rabbitmq.client.AMQP; +import com.rabbitmq.client.Channel; +import com.rabbitmq.client.Connection; +import com.rabbitmq.client.Consumer; +import com.rabbitmq.client.Envelope; +import com.rabbitmq.client.ShutdownSignalException; + +import org.apache.camel.Exchange; +import org.apache.camel.ExchangePattern; +import org.apache.camel.Message; +import org.apache.camel.RuntimeCamelException; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +class RabbitConsumer implements com.rabbitmq.client.Consumer { + private final Logger log = LoggerFactory.getLogger(getClass()); + private final RabbitMQConsumer consumer; + private Channel channel; + private String tag; + /** Consumer tag for this consumer. */ + private volatile String consumerTag; + private volatile boolean stopping; + + /** + * Constructs a new instance and records its association to the passed-in + * channel. 
+ * + * @param channel + * the channel to which this consumer is attached + */ + public RabbitConsumer(RabbitMQConsumer consumer) { + // super(channel); + this.consumer = consumer; + try { + Connection conn = consumer.getConnection(); + this.channel = openChannel(conn); + } catch (IOException | TimeoutException e) { + log.warn("Unable to open channel for RabbitMQConsumer. Continuing and will try again", e); + } + } + + @Override + public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { + Exchange exchange = consumer.getEndpoint().createRabbitExchange(envelope, properties, body); + consumer.getEndpoint().getMessageConverter().mergeAmqpProperties(exchange, properties); + + boolean sendReply = properties.getReplyTo() != null; + if (sendReply && !exchange.getPattern().isOutCapable()) { + log.debug("In an inOut capable route"); + exchange.setPattern(ExchangePattern.InOut); + } + + log.trace("Created exchange [exchange={}]", exchange); + long deliveryTag = envelope.getDeliveryTag(); + try { + consumer.getProcessor().process(exchange); + } catch (Exception e) { + exchange.setException(e); + } + + // obtain the message after processing + Message msg; + if (exchange.hasOut()) { + msg = exchange.getOut(); + } else { + msg = exchange.getIn(); + } + + if (exchange.getException() != null) { + consumer.getExceptionHandler().handleException("Error processing exchange", exchange, exchange.getException()); + } + + if (!exchange.isFailed()) { + // processing success + if (sendReply && exchange.getPattern().isOutCapable()) { + try { + consumer.getEndpoint().publishExchangeToChannel(exchange, channel, properties.getReplyTo()); + } catch (RuntimeCamelException e) { + // set the exception on the exchange so it can send the + // exception back to the producer + exchange.setException(e); + consumer.getExceptionHandler().handleException("Error processing exchange", exchange, e); + } + } + if 
(!consumer.getEndpoint().isAutoAck()) { + log.trace("Acknowledging receipt [delivery_tag={}]", deliveryTag); + channel.basicAck(deliveryTag, false); + } + } + // The exchange could have failed when sending the above message + if (exchange.isFailed()) { + if (consumer.getEndpoint().isTransferException() && exchange.getPattern().isOutCapable()) { + // the inOut exchange failed so put the exception in the body + // and send back + msg.setBody(exchange.getException()); + exchange.setOut(msg); + exchange.getOut().setHeader(RabbitMQConstants.CORRELATIONID, exchange.getIn().getHeader(RabbitMQConstants.CORRELATIONID)); + try { + consumer.getEndpoint().publishExchangeToChannel(exchange, channel, properties.getReplyTo()); + } catch (RuntimeCamelException e) { + consumer.getExceptionHandler().handleException("Error processing exchange", exchange, e); + } + + if (!consumer.getEndpoint().isAutoAck()) { + log.trace("Acknowledging receipt when transferring exception [delivery_tag={}]", deliveryTag); + channel.basicAck(deliveryTag, false); + } + } else { + boolean isRequeueHeaderSet = msg.getHeader(RabbitMQConstants.REQUEUE, false, boolean.class); + // processing failed, then reject and handle the exception + if (deliveryTag != 0 && !consumer.getEndpoint().isAutoAck()) { + log.trace("Rejecting receipt [delivery_tag={}] with requeue={}", deliveryTag, isRequeueHeaderSet); + if (isRequeueHeaderSet) { + channel.basicReject(deliveryTag, true); + } else { + channel.basicReject(deliveryTag, false); + } + } + } + } + } + + /** + * Bind consumer to channel + */ + public void start() throws IOException { + if (channel == null) { + throw new IOException("The RabbitMQ channel is not open"); + } + tag = channel.basicConsume(consumer.getEndpoint().getQueue(), consumer.getEndpoint().isAutoAck(), this); + } + + /** + * Unbind consumer from channel + */ + public void stop() throws IOException, TimeoutException { + stopping = true; + if (channel == null) { + return; + } + if (tag != null) { + 
channel.basicCancel(tag); + } + try { + channel.close(); + } catch (TimeoutException e) { + log.error("Timeout occured"); + throw e; + } + } + + /** + * Stores the most recently passed-in consumerTag - semantically, there + * should be only one. + * + * @see Consumer#handleConsumeOk + */ + public void handleConsumeOk(String consumerTag) { + this.consumerTag = consumerTag; + } + + /** + * Retrieve the consumer tag. + * + * @return the most recently notified consumer tag. + */ + public String getConsumerTag() { + return consumerTag; + } + + /** + * No-op implementation of {@link Consumer#handleCancelOk}. + * + * @param consumerTag + * the defined consumer tag (client- or server-generated) + */ + public void handleCancelOk(String consumerTag) { + // no work to do + log.debug("Recieved cancelOk signal on the rabbitMQ channel"); + } + + /** + * No-op implementation of {@link Consumer#handleCancel(String)} + * + * @param consumerTag + * the defined consumer tag (client- or server-generated) + */ + public void handleCancel(String consumerTag) throws IOException { + // no work to do + log.debug("Recieved cancel signal on the rabbitMQ channel"); + } + + /** + * No-op implementation of {@link Consumer#handleShutdownSignal}. + */ + public void handleShutdownSignal(String consumerTag, ShutdownSignalException sig) { + log.info("Recieved shutdown signal on the rabbitMQ channel"); + + // Check if the consumer closed the connection or something else + if (!sig.isInitiatedByApplication()) { + // Something else closed the connection so reconnect + boolean connected = false; + while (!connected && !stopping) { + try { + reconnect(); + connected = true; + } catch (IOException | TimeoutException e) { + log.warn("Unable to obtain a RabbitMQ channel. Will try again"); + + Integer networkRecoveryInterval = consumer.getEndpoint().getNetworkRecoveryInterval(); + final long connectionRetryInterval = networkRecoveryInterval != null && networkRecoveryInterval > 0 + ? 
networkRecoveryInterval : 100L; + try { + Thread.sleep(connectionRetryInterval); + } catch (InterruptedException e1) { + Thread.currentThread().interrupt(); + } + } + } + } + } + + /** + * No-op implementation of {@link Consumer#handleRecoverOk}. + */ + public void handleRecoverOk(String consumerTag) { + // no work to do + log.debug("Recieved recover ok signal on the rabbitMQ channel"); + } + + /** + * If the RabbitMQ connection is good this returns without changing + * anything. If the connection is down it will attempt to reconnect + */ + public void reconnect() throws IOException, TimeoutException { + if (isChannelOpen()) { + // The connection is good, so nothing to do + return; + } + log.info("Attempting to open a new rabbitMQ channel"); + Connection conn = consumer.getConnection(); + channel = openChannel(conn); + // Register the channel to the tag + start(); + } + + private boolean isChannelOpen() { + return channel != null && channel.isOpen(); + } + + /** + * Open channel + */ + private Channel openChannel(Connection conn) throws IOException { + log.trace("Creating channel..."); + Channel channel = conn.createChannel(); + log.debug("Created channel: {}", channel); + // setup the basicQos + if (consumer.getEndpoint().isPrefetchEnabled()) { + channel.basicQos(consumer.getEndpoint().getPrefetchSize(), consumer.getEndpoint().getPrefetchCount(), + consumer.getEndpoint().isPrefetchGlobal()); + } + + // This really only needs to be called on the first consumer or on + // reconnections. 
+ if (consumer.getEndpoint().isDeclare()) { + consumer.getEndpoint().declareExchangeAndQueue(channel); + } + return channel; + } + +} http://git-wip-us.apache.org/repos/asf/camel/blob/f3eae07c/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQConsumer.java ---------------------------------------------------------------------- diff --git a/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQConsumer.java b/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQConsumer.java index 280ed2a..24b2856 100644 --- a/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQConsumer.java +++ b/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQConsumer.java @@ -24,16 +24,9 @@ import java.util.concurrent.ExecutorService; import java.util.concurrent.TimeoutException; import java.util.concurrent.atomic.AtomicBoolean; -import com.rabbitmq.client.AMQP; -import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; -import com.rabbitmq.client.Envelope; -import org.apache.camel.Exchange; -import org.apache.camel.ExchangePattern; -import org.apache.camel.Message; import org.apache.camel.Processor; -import org.apache.camel.RuntimeCamelException; import org.apache.camel.impl.DefaultConsumer; public class RabbitMQConsumer extends DefaultConsumer { @@ -72,42 +65,46 @@ public class RabbitMQConsumer extends DefaultConsumer { } /** - * Open channel + * Returns the exiting open connection or opens a new one + * @throws IOException + * @throws TimeoutException */ - private Channel openChannel() throws IOException { - log.trace("Creating channel..."); - Channel channel = conn.createChannel(); - log.debug("Created channel: {}", channel); - // setup the basicQos - if (endpoint.isPrefetchEnabled()) { - channel.basicQos(endpoint.getPrefetchSize(), endpoint.getPrefetchCount(), endpoint.isPrefetchGlobal()); + protected synchronized 
Connection getConnection() throws IOException, TimeoutException { + if (this.conn != null && this.conn.isOpen()) { + return this.conn; } - return channel; + log.debug("The existing connection is closed"); + openConnection(); + return this.conn; } + /** * Add a consumer thread for given channel */ private void startConsumers() throws IOException { - // First channel used to declare Exchange and Queue - Channel channel = openChannel(); - if (getEndpoint().isDeclare()) { - getEndpoint().declareExchangeAndQueue(channel); + + // Create consumers but don't start yet + for (int i = 0; i < endpoint.getConcurrentConsumers(); i++) { + createConsumer(); } - startConsumer(channel); - // Other channels - for (int i = 1; i < endpoint.getConcurrentConsumers(); i++) { - channel = openChannel(); - startConsumer(channel); + + // Try starting consumers (which will fail if RabbitMQ can't connect) + try { + for (RabbitConsumer consumer : this.consumers) { + consumer.start(); + } + } catch (Exception e) { + log.info("Connection failed, will start background thread to retry!", e); + reconnect(); } } /** * Add a consumer thread for given channel */ - private void startConsumer(Channel channel) throws IOException { - RabbitConsumer consumer = new RabbitConsumer(this, channel); - consumer.start(); + private void createConsumer() throws IOException { + RabbitConsumer consumer = new RabbitConsumer(this); this.consumers.add(consumer); } @@ -115,16 +112,13 @@ public class RabbitMQConsumer extends DefaultConsumer { protected void doStart() throws Exception { executor = endpoint.createExecutor(); log.debug("Using executor {}", executor); - try { - openConnection(); - startConsumers(); - } catch (Exception e) { - log.info("Connection failed, will start background thread to retry!", e); - reconnect(); - } + startConsumers(); } - private void reconnect() { + private synchronized void reconnect() { + if (startConsumerCallable != null) { + return; + } // Open connection, and start message listener in 
background Integer networkRecoveryInterval = getEndpoint().getNetworkRecoveryInterval(); final long connectionRetryInterval = networkRecoveryInterval != null && networkRecoveryInterval > 0 ? networkRecoveryInterval : 100L; @@ -179,119 +173,7 @@ public class RabbitMQConsumer extends DefaultConsumer { } } - class RabbitConsumer extends com.rabbitmq.client.DefaultConsumer { - - private final RabbitMQConsumer consumer; - private final Channel channel; - private String tag; - - /** - * Constructs a new instance and records its association to the - * passed-in channel. - * - * @param channel the channel to which this consumer is attached - */ - public RabbitConsumer(RabbitMQConsumer consumer, Channel channel) { - super(channel); - this.consumer = consumer; - this.channel = channel; - } - - @Override - public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { - Exchange exchange = consumer.endpoint.createRabbitExchange(envelope, properties, body); - endpoint.getMessageConverter().mergeAmqpProperties(exchange, properties); - - boolean sendReply = properties.getReplyTo() != null; - if (sendReply && !exchange.getPattern().isOutCapable()) { - log.debug("In an inOut capable route"); - exchange.setPattern(ExchangePattern.InOut); - } - - log.trace("Created exchange [exchange={}]", exchange); - long deliveryTag = envelope.getDeliveryTag(); - try { - consumer.getProcessor().process(exchange); - } catch (Exception e) { - exchange.setException(e); - } - - // obtain the message after processing - Message msg; - if (exchange.hasOut()) { - msg = exchange.getOut(); - } else { - msg = exchange.getIn(); - } - - if (exchange.getException() != null) { - getExceptionHandler().handleException("Error processing exchange", exchange, exchange.getException()); - } - - if (!exchange.isFailed()) { - // processing success - if (sendReply && exchange.getPattern().isOutCapable()) { - try { - 
endpoint.publishExchangeToChannel(exchange, channel, properties.getReplyTo()); - } catch (RuntimeCamelException e) { - getExceptionHandler().handleException("Error processing exchange", exchange, e); - } - } - if (!consumer.endpoint.isAutoAck()) { - log.trace("Acknowledging receipt [delivery_tag={}]", deliveryTag); - channel.basicAck(deliveryTag, false); - } - } else if (endpoint.isTransferException() && exchange.getPattern().isOutCapable()) { - // the inOut exchange failed so put the exception in the body - // and send back - msg.setBody(exchange.getException()); - exchange.setOut(msg); - try { - endpoint.publishExchangeToChannel(exchange, channel, properties.getReplyTo()); - } catch (RuntimeCamelException e) { - getExceptionHandler().handleException("Error processing exchange", exchange, e); - } - - if (!consumer.endpoint.isAutoAck()) { - log.trace("Acknowledging receipt when transferring exception [delivery_tag={}]", deliveryTag); - channel.basicAck(deliveryTag, false); - } - } else { - boolean isRequeueHeaderSet = msg.getHeader(RabbitMQConstants.REQUEUE, false, boolean.class); - // processing failed, then reject and handle the exception - if (deliveryTag != 0 && !consumer.endpoint.isAutoAck()) { - log.trace("Rejecting receipt [delivery_tag={}] with requeue={}", deliveryTag, isRequeueHeaderSet); - if (isRequeueHeaderSet) { - channel.basicReject(deliveryTag, true); - } else { - channel.basicReject(deliveryTag, false); - } - } - } - } - /** - * Bind consumer to channel - */ - public void start() throws IOException { - tag = channel.basicConsume(endpoint.getQueue(), endpoint.isAutoAck(), this); - } - - /** - * Unbind consumer from channel - */ - public void stop() throws IOException, TimeoutException { - if (tag != null) { - channel.basicCancel(tag); - } - try { - channel.close(); - } catch (TimeoutException e) { - log.error("Timeout occured"); - throw e; - } - } - } /** * Task in charge of opening connection and adding listener when consumer is @@ -316,10 +198,12 
@@ public class RabbitMQConsumer extends DefaultConsumer { // Reconnection loop while (running.get() && connectionFailed) { try { - openConnection(); + for (RabbitConsumer consumer : consumers) { + consumer.reconnect(); + } connectionFailed = false; } catch (Exception e) { - log.info("Connection failed, will retry in {}" + connectionRetryInterval + "ms", e); + log.info("Connection failed, will retry in " + connectionRetryInterval + "ms", e); Thread.sleep(connectionRetryInterval); } } http://git-wip-us.apache.org/repos/asf/camel/blob/f3eae07c/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQEndpoint.java ---------------------------------------------------------------------- diff --git a/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQEndpoint.java b/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQEndpoint.java index 41eab3f..3600d33 100644 --- a/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQEndpoint.java +++ b/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQEndpoint.java @@ -23,6 +23,7 @@ import java.util.UUID; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.TimeoutException; + import javax.net.ssl.TrustManager; import com.rabbitmq.client.AMQP; @@ -31,6 +32,7 @@ import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; import com.rabbitmq.client.Envelope; + import org.apache.camel.Consumer; import org.apache.camel.Exchange; import org.apache.camel.Message; @@ -41,14 +43,11 @@ import org.apache.camel.spi.Metadata; import org.apache.camel.spi.UriEndpoint; import org.apache.camel.spi.UriParam; import org.apache.camel.spi.UriPath; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; @UriEndpoint(scheme = "rabbitmq", title = "RabbitMQ", syntax = 
"rabbitmq:hostname:portNumber/exchangeName", consumerClass = RabbitMQConsumer.class, label = "messaging") public class RabbitMQEndpoint extends DefaultEndpoint { // header to indicate that the message body needs to be de-serialized public static final String SERIALIZE_HEADER = "CamelSerialize"; - private static final Logger LOG = LoggerFactory.getLogger(RabbitMQEndpoint.class); @UriPath @Metadata(required = "true") private String hostname; http://git-wip-us.apache.org/repos/asf/camel/blob/f3eae07c/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQProducer.java ---------------------------------------------------------------------- diff --git a/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQProducer.java b/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQProducer.java index a96d6fd..8c877aa 100644 --- a/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQProducer.java +++ b/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQProducer.java @@ -22,7 +22,6 @@ import java.util.concurrent.RejectedExecutionException; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.atomic.AtomicBoolean; - import com.rabbitmq.client.AMQP; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; @@ -70,7 +69,21 @@ public class RabbitMQProducer extends DefaultAsyncProducer { * Do something with a pooled channel (similar to Spring JDBC TransactionTemplate#execute) */ private <T> T execute(ChannelCallback<T> callback) throws Exception { - Channel channel = channelPool.borrowObject(); + Channel channel; + try { + channel = channelPool.borrowObject(); + } catch (IllegalStateException e) { + // Since this method is not synchronized its possible the + // channelPool has been cleared by another thread + checkConnectionAndChannelPool(); + channel = channelPool.borrowObject(); + } + 
if (!channel.isOpen()) { + log.warn("Got a closed channel from the pool"); + // Reconnect if another thread hasn't yet + checkConnectionAndChannelPool(); + channel = channelPool.borrowObject(); + } try { return callback.doWithChannel(channel); } finally { @@ -80,8 +93,9 @@ public class RabbitMQProducer extends DefaultAsyncProducer { /** * Open connection and initialize channel pool + * @throws Exception */ - private void openConnectionAndChannelPool() throws Exception { + private synchronized void openConnectionAndChannelPool() throws Exception { log.trace("Creating connection..."); this.conn = getEndpoint().connect(executorService); log.debug("Created connection: {}", conn); @@ -100,6 +114,22 @@ public class RabbitMQProducer extends DefaultAsyncProducer { } } + /** + * This will reconnect only if the connection is closed. + * @throws Exception + */ + private synchronized void checkConnectionAndChannelPool() throws Exception { + if (this.conn == null || !this.conn.isOpen()) { + log.info("Reconnecting to RabbitMQ"); + try { + closeConnectionAndChannel(); + } catch (Exception e) { + // no op + } + openConnectionAndChannelPool(); + } + } + @Override protected void doStart() throws Exception { this.executorService = getEndpoint().getCamelContext().getExecutorServiceManager().newSingleThreadExecutor(this, "CamelRabbitMQProducer[" + getEndpoint().getQueue() + "]"); @@ -107,15 +137,23 @@ public class RabbitMQProducer extends DefaultAsyncProducer { try { openConnectionAndChannelPool(); } catch (IOException e) { - log.warn("Failed to create connection", e); + log.warn("Failed to create connection. 
It will attempt to connect again when publishing a message.", e); } } /** * If needed, close Connection and Channel + * @throws IOException */ - private void closeConnectionAndChannel() throws Exception { - channelPool.close(); + private synchronized void closeConnectionAndChannel() throws IOException { + if (channelPool != null) { + try { + channelPool.close(); + channelPool = null; + } catch (Exception e) { + throw new IOException("Error closing channelPool", e); + } + } if (conn != null) { log.debug("Closing connection: {} with timeout: {} ms.", conn, closeTimeout); conn.close(closeTimeout); @@ -194,8 +232,12 @@ public class RabbitMQProducer extends DefaultAsyncProducer { log.debug("Registering reply for {}", correlationId); replyManager.registerReply(replyManager, exchange, callback, originalCorrelationId, correlationId, timeout); - - basicPublish(exchange, exchangeName, key); + try { + basicPublish(exchange, exchangeName, key); + } catch (Exception e) { + replyManager.cancelCorrelationId(correlationId); + throw e; + } // continue routing asynchronously (reply will be processed async when its received) return false; } @@ -230,8 +272,8 @@ public class RabbitMQProducer extends DefaultAsyncProducer { */ private void basicPublish(final Exchange camelExchange, final String rabbitExchange, final String routingKey) throws Exception { if (channelPool == null) { - // Open connection and channel lazily - openConnectionAndChannelPool(); + // Open connection and channel lazily if another thread hasn't + checkConnectionAndChannelPool(); } execute(new ChannelCallback<Void>() { @Override @@ -327,4 +369,4 @@ public class RabbitMQProducer extends DefaultAsyncProducer { return replyManager; } -} \ No newline at end of file +} http://git-wip-us.apache.org/repos/asf/camel/blob/f3eae07c/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/reply/ReplyManager.java ---------------------------------------------------------------------- diff --git 
a/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/reply/ReplyManager.java b/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/reply/ReplyManager.java index f6eb64a..4b6110a 100644 --- a/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/reply/ReplyManager.java +++ b/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/reply/ReplyManager.java @@ -87,4 +87,11 @@ public interface ReplyManager { * @param holder containing needed data to process the reply and continue routing */ void processReply(ReplyHolder holder); + + /** + * Unregister a correlationId when you no longer need a reply + * + * @param correlationId + */ + void cancelCorrelationId(String correlationId); } http://git-wip-us.apache.org/repos/asf/camel/blob/f3eae07c/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/reply/ReplyManagerSupport.java ---------------------------------------------------------------------- diff --git a/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/reply/ReplyManagerSupport.java b/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/reply/ReplyManagerSupport.java index b6dacfa..9897159 100644 --- a/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/reply/ReplyManagerSupport.java +++ b/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/reply/ReplyManagerSupport.java @@ -110,6 +110,15 @@ public abstract class ReplyManagerSupport extends ServiceSupport implements Repl protected abstract ReplyHandler createReplyHandler(ReplyManager replyManager, Exchange exchange, AsyncCallback callback, String originalCorrelationId, String correlationId, long requestTimeout); + + + public void cancelCorrelationId(String correlationId) { + ReplyHandler handler = correlation.get(correlationId); + if (handler != null) { + log.warn("Cancelling correlationID: {}", correlationId); 
+ correlation.remove(correlationId); + } + } public void onMessage(AMQP.BasicProperties properties, byte[] message) { String correlationID = properties.getCorrelationId(); http://git-wip-us.apache.org/repos/asf/camel/blob/f3eae07c/components/camel-rabbitmq/src/test/java/org/apache/camel/component/rabbitmq/RabbitMQConsumerTest.java ---------------------------------------------------------------------- diff --git a/components/camel-rabbitmq/src/test/java/org/apache/camel/component/rabbitmq/RabbitMQConsumerTest.java b/components/camel-rabbitmq/src/test/java/org/apache/camel/component/rabbitmq/RabbitMQConsumerTest.java index a6676b7..ef6b096 100644 --- a/components/camel-rabbitmq/src/test/java/org/apache/camel/component/rabbitmq/RabbitMQConsumerTest.java +++ b/components/camel-rabbitmq/src/test/java/org/apache/camel/component/rabbitmq/RabbitMQConsumerTest.java @@ -44,6 +44,7 @@ public class RabbitMQConsumerTest { ThreadPoolExecutor e = (ThreadPoolExecutor) Executors.newFixedThreadPool(3); Mockito.when(endpoint.createExecutor()).thenReturn(e); + Mockito.when(endpoint.getConcurrentConsumers()).thenReturn(1); Mockito.when(endpoint.connect(Matchers.any(ExecutorService.class))).thenReturn(conn); Mockito.when(conn.createChannel()).thenReturn(channel); @@ -59,6 +60,7 @@ public class RabbitMQConsumerTest { RabbitMQConsumer consumer = new RabbitMQConsumer(endpoint, processor); Mockito.when(endpoint.createExecutor()).thenReturn(Executors.newFixedThreadPool(3)); + Mockito.when(endpoint.getConcurrentConsumers()).thenReturn(1); Mockito.when(endpoint.connect(Matchers.any(ExecutorService.class))).thenReturn(conn); Mockito.when(conn.createChannel()).thenReturn(channel); http://git-wip-us.apache.org/repos/asf/camel/blob/f3eae07c/components/camel-rabbitmq/src/test/java/org/apache/camel/component/rabbitmq/RabbitMQReConnectionIntTest.java ---------------------------------------------------------------------- diff --git 
a/components/camel-rabbitmq/src/test/java/org/apache/camel/component/rabbitmq/RabbitMQReConnectionIntTest.java b/components/camel-rabbitmq/src/test/java/org/apache/camel/component/rabbitmq/RabbitMQReConnectionIntTest.java index 302440c..ec72f7b 100644 --- a/components/camel-rabbitmq/src/test/java/org/apache/camel/component/rabbitmq/RabbitMQReConnectionIntTest.java +++ b/components/camel-rabbitmq/src/test/java/org/apache/camel/component/rabbitmq/RabbitMQReConnectionIntTest.java @@ -33,13 +33,14 @@ import org.junit.Test; /** * Integration test to check that RabbitMQ Endpoint is able to reconnect to broker when broker - * is not avaibable. + * is not available. * <ul> * <li>Stop the broker</li> * <li>Run the test: the producer complains it can not send messages, the consumer is silent</li> * <li>Start the broker: the producer sends messages, and the consumer receives messages</li> * <li>Stop the broker: the producer complains it can not send messages, the consumer is silent</li> * <li>Start the broker: the producer sends messages, and the consumer receives messages</li> + * <li>Kill all connections from the broker: the producer sends messages, and the consumer receives messages</li> * </ul> */ public class RabbitMQReConnectionIntTest extends CamelTestSupport { http://git-wip-us.apache.org/repos/asf/camel/blob/f3eae07c/components/camel-rabbitmq/src/test/java/org/apache/camel/component/rabbitmq/RabbitMQSupendResumeIntTest.java ---------------------------------------------------------------------- diff --git a/components/camel-rabbitmq/src/test/java/org/apache/camel/component/rabbitmq/RabbitMQSupendResumeIntTest.java b/components/camel-rabbitmq/src/test/java/org/apache/camel/component/rabbitmq/RabbitMQSupendResumeIntTest.java index fd269a8..c5c3481 100644 --- a/components/camel-rabbitmq/src/test/java/org/apache/camel/component/rabbitmq/RabbitMQSupendResumeIntTest.java +++ 
b/components/camel-rabbitmq/src/test/java/org/apache/camel/component/rabbitmq/RabbitMQSupendResumeIntTest.java @@ -28,12 +28,12 @@ import org.apache.camel.test.junit4.CamelTestSupport; import org.junit.Test; public class RabbitMQSupendResumeIntTest extends CamelTestSupport { - private static final String EXCHANGE = "ex4"; + private static final String EXCHANGE = "ex6"; @EndpointInject(uri = "mock:result") private MockEndpoint resultEndpoint; - @EndpointInject(uri = "rabbitmq:localhost:5672/" + EXCHANGE + "?username=cameltest&password=cameltest&queue=q3&routingKey=rk3&autoDelete=false") + @EndpointInject(uri = "rabbitmq:localhost:5672/" + EXCHANGE + "?username=cameltest&password=cameltest&queue=q6&routingKey=rk3&autoDelete=false") private Endpoint rabbitMQEndpoint; @Produce(uri = "direct:start")