Repository: camel Updated Branches: refs/heads/master f2dff165b -> 945b46fff
Upgrade Rabbitmq Amqp Client to version 3.5.4 Project: http://git-wip-us.apache.org/repos/asf/camel/repo Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/945b46ff Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/945b46ff Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/945b46ff Branch: refs/heads/master Commit: 945b46fffc988b86c49d637d7fb66cc2b7a66a8e Parents: f2dff16 Author: Andrea Cosentino <anco...@gmail.com> Authored: Fri Jul 31 17:30:21 2015 +0200 Committer: Andrea Cosentino <anco...@gmail.com> Committed: Fri Jul 31 17:30:51 2015 +0200 ---------------------------------------------------------------------- .../component/rabbitmq/RabbitMQConsumer.java | 25 +++++++++++++++----- .../component/rabbitmq/RabbitMQEndpoint.java | 3 ++- .../reply/TemporaryQueueReplyManager.java | 3 ++- .../rabbitmq/RabbitMQConsumerIntTest.java | 3 ++- .../rabbitmq/RabbitMQEndpointTest.java | 3 ++- .../rabbitmq/RabbitMQProducerIntTest.java | 4 ++-- .../rabbitmq/RabbitMQProducerTest.java | 3 ++- .../rabbitmq/RabbitMQSpringIntTest.java | 10 ++++---- parent/pom.xml | 2 +- 9 files changed, 38 insertions(+), 18 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/camel/blob/945b46ff/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQConsumer.java ---------------------------------------------------------------------- diff --git a/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQConsumer.java b/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQConsumer.java index 48ec60f..1fded4d 100644 --- a/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQConsumer.java +++ b/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQConsumer.java @@ -21,6 +21,7 @@ import java.util.ArrayList; import java.util.List; import java.util.concurrent.Callable; import java.util.concurrent.ExecutorService; +import java.util.concurrent.TimeoutException; import java.util.concurrent.atomic.AtomicBoolean; import com.rabbitmq.client.AMQP; @@ -64,7 +65,7 @@ public class RabbitMQConsumer extends DefaultConsumer { /** * Open connection */ - private void openConnection() throws IOException { + private void openConnection() throws IOException, TimeoutException { log.trace("Creating connection..."); this.conn = getEndpoint().connect(executor); log.debug("Created connection: {}", conn); @@ -129,14 +130,20 @@ public class RabbitMQConsumer extends DefaultConsumer { } /** - * If needed, close Connection and Channels + * If needed, close Connection and Channels */ - private void closeConnectionAndChannel() throws IOException { + private void closeConnectionAndChannel() throws IOException, TimeoutException { if (startConsumerCallable != null) { startConsumerCallable.stop(); } for (RabbitConsumer consumer : this.consumers) { - consumer.stop(); + try { + consumer.stop(); + } + catch (TimeoutException e) { + log.error("Timeout occured"); + throw e; + } } this.consumers.clear(); if (conn != null) { @@ -248,11 +255,17 @@ public class RabbitMQConsumer extends DefaultConsumer { /** * Unbind consumer from channel */ - public void stop() throws IOException { + public void stop() throws IOException, TimeoutException { if (tag != null) { channel.basicCancel(tag); } - channel.close(); + try { + channel.close(); + } + catch (TimeoutException e) { + log.error("Timeout occured"); + throw e; + } } } http://git-wip-us.apache.org/repos/asf/camel/blob/945b46ff/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQEndpoint.java ---------------------------------------------------------------------- diff --git a/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQEndpoint.java b/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQEndpoint.java index 54760db..5b49e42 100644 --- a/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQEndpoint.java +++ b/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQEndpoint.java @@ -31,6 +31,7 @@ import java.util.Map; import java.util.UUID; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; +import java.util.concurrent.TimeoutException; import javax.net.ssl.TrustManager; @@ -332,7 +333,7 @@ public class RabbitMQEndpoint extends DefaultEndpoint { return consumer; } - public Connection connect(ExecutorService executor) throws IOException { + public Connection connect(ExecutorService executor) throws IOException, TimeoutException { if (getAddresses() == null) { return getOrCreateConnectionFactory().newConnection(executor); } else { http://git-wip-us.apache.org/repos/asf/camel/blob/945b46ff/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/reply/TemporaryQueueReplyManager.java ---------------------------------------------------------------------- diff --git a/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/reply/TemporaryQueueReplyManager.java b/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/reply/TemporaryQueueReplyManager.java index 4bd1242..d3c2283 100644 --- a/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/reply/TemporaryQueueReplyManager.java +++ b/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/reply/TemporaryQueueReplyManager.java @@ -17,6 +17,7 @@ package org.apache.camel.component.rabbitmq.reply; import java.io.IOException; +import java.util.concurrent.TimeoutException; import com.rabbitmq.client.AMQP; import com.rabbitmq.client.AMQP.Queue.DeclareOk; @@ -142,7 +143,7 @@ public class TemporaryQueueReplyManager extends ReplyManagerSupport { /** * Unbind consumer from channel */ - private void stop() throws IOException { + private void stop() throws IOException, TimeoutException { if (channel.isOpen()) { if (tag != null) { channel.basicCancel(tag); http://git-wip-us.apache.org/repos/asf/camel/blob/945b46ff/components/camel-rabbitmq/src/test/java/org/apache/camel/component/rabbitmq/RabbitMQConsumerIntTest.java ---------------------------------------------------------------------- diff --git a/components/camel-rabbitmq/src/test/java/org/apache/camel/component/rabbitmq/RabbitMQConsumerIntTest.java b/components/camel-rabbitmq/src/test/java/org/apache/camel/component/rabbitmq/RabbitMQConsumerIntTest.java index da7b27a..2066380 100644 --- a/components/camel-rabbitmq/src/test/java/org/apache/camel/component/rabbitmq/RabbitMQConsumerIntTest.java +++ b/components/camel-rabbitmq/src/test/java/org/apache/camel/component/rabbitmq/RabbitMQConsumerIntTest.java @@ -17,6 +17,7 @@ package org.apache.camel.component.rabbitmq; import java.io.IOException; +import java.util.concurrent.TimeoutException; import com.rabbitmq.client.AMQP; import com.rabbitmq.client.Channel; @@ -52,7 +53,7 @@ public class RabbitMQConsumerIntTest extends CamelTestSupport { } @Test - public void sentMessageIsReceived() throws InterruptedException, IOException { + public void sentMessageIsReceived() throws InterruptedException, IOException, TimeoutException { to.expectedMessageCount(1); to.expectedHeaderReceived(RabbitMQConstants.REPLY_TO, "myReply"); http://git-wip-us.apache.org/repos/asf/camel/blob/945b46ff/components/camel-rabbitmq/src/test/java/org/apache/camel/component/rabbitmq/RabbitMQEndpointTest.java ---------------------------------------------------------------------- diff --git a/components/camel-rabbitmq/src/test/java/org/apache/camel/component/rabbitmq/RabbitMQEndpointTest.java b/components/camel-rabbitmq/src/test/java/org/apache/camel/component/rabbitmq/RabbitMQEndpointTest.java index 1fa4d17..ad9aca8 100644 --- a/components/camel-rabbitmq/src/test/java/org/apache/camel/component/rabbitmq/RabbitMQEndpointTest.java +++ b/components/camel-rabbitmq/src/test/java/org/apache/camel/component/rabbitmq/RabbitMQEndpointTest.java @@ -25,6 +25,7 @@ import java.util.Map; import java.util.UUID; import java.util.concurrent.Executors; import java.util.concurrent.ThreadPoolExecutor; +import java.util.concurrent.TimeoutException; import com.rabbitmq.client.AMQP; import com.rabbitmq.client.Address; @@ -172,7 +173,7 @@ public class RabbitMQEndpointTest extends CamelTestSupport { assertEquals("Get a wrong endpoint address.", new Address("server2", 12345), endpoint.getAddresses()[1]); } - private ConnectionFactory createConnectionFactory(String uri) { + private ConnectionFactory createConnectionFactory(String uri) throws TimeoutException { RabbitMQEndpoint endpoint = context.getEndpoint(uri, RabbitMQEndpoint.class); try { endpoint.connect(Executors.newSingleThreadExecutor()); http://git-wip-us.apache.org/repos/asf/camel/blob/945b46ff/components/camel-rabbitmq/src/test/java/org/apache/camel/component/rabbitmq/RabbitMQProducerIntTest.java ---------------------------------------------------------------------- diff --git a/components/camel-rabbitmq/src/test/java/org/apache/camel/component/rabbitmq/RabbitMQProducerIntTest.java b/components/camel-rabbitmq/src/test/java/org/apache/camel/component/rabbitmq/RabbitMQProducerIntTest.java index cf73ea7..377f8a9 100644 --- a/components/camel-rabbitmq/src/test/java/org/apache/camel/component/rabbitmq/RabbitMQProducerIntTest.java +++ b/components/camel-rabbitmq/src/test/java/org/apache/camel/component/rabbitmq/RabbitMQProducerIntTest.java @@ -19,6 +19,7 @@ package org.apache.camel.component.rabbitmq; import java.io.IOException; import java.util.ArrayList; import java.util.List; +import java.util.concurrent.TimeoutException; import com.rabbitmq.client.AMQP; import com.rabbitmq.client.Channel; @@ -27,7 +28,6 @@ import com.rabbitmq.client.ConnectionFactory; import com.rabbitmq.client.DefaultConsumer; import com.rabbitmq.client.Envelope; - import org.apache.camel.Endpoint; import org.apache.camel.EndpointInject; import org.apache.camel.Produce; @@ -60,7 +60,7 @@ public class RabbitMQProducerIntTest extends CamelTestSupport { } @Test - public void producedMessageIsReceived() throws InterruptedException, IOException { + public void producedMessageIsReceived() throws InterruptedException, IOException, TimeoutException { ConnectionFactory factory = new ConnectionFactory(); factory.setHost("localhost"); http://git-wip-us.apache.org/repos/asf/camel/blob/945b46ff/components/camel-rabbitmq/src/test/java/org/apache/camel/component/rabbitmq/RabbitMQProducerTest.java ---------------------------------------------------------------------- diff --git a/components/camel-rabbitmq/src/test/java/org/apache/camel/component/rabbitmq/RabbitMQProducerTest.java b/components/camel-rabbitmq/src/test/java/org/apache/camel/component/rabbitmq/RabbitMQProducerTest.java index cefece5..ee72d8a 100644 --- a/components/camel-rabbitmq/src/test/java/org/apache/camel/component/rabbitmq/RabbitMQProducerTest.java +++ b/components/camel-rabbitmq/src/test/java/org/apache/camel/component/rabbitmq/RabbitMQProducerTest.java @@ -23,6 +23,7 @@ import java.util.Date; import java.util.HashMap; import java.util.Map; import java.util.concurrent.ExecutorService; +import java.util.concurrent.TimeoutException; import com.rabbitmq.client.AMQP; import com.rabbitmq.client.Connection; @@ -47,7 +48,7 @@ public class RabbitMQProducerTest { private Connection conn = Mockito.mock(Connection.class); @Before - public void before() throws IOException { + public void before() throws IOException, TimeoutException { Mockito.when(exchange.getIn()).thenReturn(message); Mockito.when(endpoint.connect(Matchers.any(ExecutorService.class))).thenReturn(conn); Mockito.when(conn.createChannel()).thenReturn(null); http://git-wip-us.apache.org/repos/asf/camel/blob/945b46ff/components/camel-rabbitmq/src/test/java/org/apache/camel/component/rabbitmq/RabbitMQSpringIntTest.java ---------------------------------------------------------------------- diff --git a/components/camel-rabbitmq/src/test/java/org/apache/camel/component/rabbitmq/RabbitMQSpringIntTest.java b/components/camel-rabbitmq/src/test/java/org/apache/camel/component/rabbitmq/RabbitMQSpringIntTest.java index 6a3a3a5..5a397fe 100644 --- a/components/camel-rabbitmq/src/test/java/org/apache/camel/component/rabbitmq/RabbitMQSpringIntTest.java +++ b/components/camel-rabbitmq/src/test/java/org/apache/camel/component/rabbitmq/RabbitMQSpringIntTest.java @@ -17,6 +17,7 @@ package org.apache.camel.component.rabbitmq; import java.io.IOException; +import java.util.concurrent.TimeoutException; import com.rabbitmq.client.AMQP; import com.rabbitmq.client.Channel; @@ -24,6 +25,7 @@ import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; import com.rabbitmq.client.DefaultConsumer; import com.rabbitmq.client.Envelope; + import org.apache.camel.Produce; import org.apache.camel.ProducerTemplate; import org.junit.After; @@ -58,7 +60,7 @@ public class RabbitMQSpringIntTest { return connection != null && connection.isOpen(); } - private Connection openConnection() throws IOException { + private Connection openConnection() throws IOException, TimeoutException { if (!isConnectionOpened()) { LOGGER.info("Open connection"); connection = connectionFactory.newConnection(); @@ -70,7 +72,7 @@ public class RabbitMQSpringIntTest { return channel != null && channel.isOpen(); } - private Channel openChannel() throws IOException { + private Channel openChannel() throws IOException, TimeoutException { if (!isChannelOpened()) { LOGGER.info("Open channel"); channel = openConnection().createChannel(); @@ -79,12 +81,12 @@ public class RabbitMQSpringIntTest { } @Before - public void bindQueueExchange() throws IOException { + public void bindQueueExchange() throws IOException, TimeoutException { openChannel(); } @After - public void closeConnection() { + public void closeConnection() throws TimeoutException { if (isChannelOpened()) { try { LOGGER.info("Close channel"); http://git-wip-us.apache.org/repos/asf/camel/blob/945b46ff/parent/pom.xml ---------------------------------------------------------------------- diff --git a/parent/pom.xml b/parent/pom.xml index 8ecff90..0c1159d 100644 --- a/parent/pom.xml +++ b/parent/pom.xml @@ -404,7 +404,7 @@ <quartz-version-range>[1.8,2)</quartz-version-range> <quartz2-version>2.2.1</quartz2-version> <quickfix-bundle-version>1.6.0_1</quickfix-bundle-version> - <rabbitmq-amqp-client-version>3.3.4</rabbitmq-amqp-client-version> + <rabbitmq-amqp-client-version>3.5.4</rabbitmq-amqp-client-version> <reflections-bundle-version>0.9.8_1</reflections-bundle-version> <regexp-bundle-version>1.4_1</regexp-bundle-version> <restlet-version>2.3.1</restlet-version>