Repository: camel Updated Branches: refs/heads/master 18f42eaa9 -> 0c55b9c60
CAMEL-11704: Camel-RabbitMQ: Allow passive queue declaration Project: http://git-wip-us.apache.org/repos/asf/camel/repo Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/a651701e Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/a651701e Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/a651701e Branch: refs/heads/master Commit: a651701eee0f1587d69526fd47d8f96725dd0c1d Parents: 18f42ea Author: Christoph Schachinger <[email protected]> Authored: Fri Aug 25 10:56:07 2017 +0200 Committer: Andrea Cosentino <[email protected]> Committed: Fri Aug 25 13:55:27 2017 +0200 ---------------------------------------------------------------------- .../rabbitmq/RabbitMQDeclareSupport.java | 12 +++++++-- .../component/rabbitmq/RabbitMQEndpoint.java | 28 +++++++++++++++----- .../rabbitmq/RabbitMQEndpointTest.java | 18 ++++++++----- 3 files changed, 43 insertions(+), 15 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/camel/blob/a651701e/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQDeclareSupport.java ---------------------------------------------------------------------- diff --git a/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQDeclareSupport.java b/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQDeclareSupport.java index b38eae9..a68c521 100644 --- a/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQDeclareSupport.java +++ b/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQDeclareSupport.java @@ -100,7 +100,10 @@ public class RabbitMQDeclareSupport { } private void declareExchange(final Channel channel, final String exchange, final String exchangeType, final Map<String, Object> exchangeArgs) throws IOException { - channel.exchangeDeclare(exchange, exchangeType, endpoint.isDurable(), endpoint.isAutoDelete(), 
exchangeArgs); + if (endpoint.isPassive()) + channel.exchangeDeclarePassive(exchange); + else + channel.exchangeDeclare(exchange, exchangeType, endpoint.isDurable(), endpoint.isAutoDelete(), exchangeArgs); } private void declareAndBindQueue(final Channel channel, @@ -111,7 +114,12 @@ public class RabbitMQDeclareSupport { final Map<String, Object> bindingArgs) throws IOException { - channel.queueDeclare(queue, endpoint.isDurable(), endpoint.isExclusive(), endpoint.isAutoDelete(), queueArgs); + + if (endpoint.isPassive()) + channel.queueDeclarePassive(queue); + else + channel.queueDeclare(queue, endpoint.isDurable(), endpoint.isExclusive(), endpoint.isAutoDelete(), queueArgs); + if (shouldBindQueue()) { channel.queueBind(queue, exchange, emptyIfNull(routingKey), bindingArgs); } http://git-wip-us.apache.org/repos/asf/camel/blob/a651701e/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQEndpoint.java ---------------------------------------------------------------------- diff --git a/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQEndpoint.java b/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQEndpoint.java index ddfdfe8..5e2be3d 100644 --- a/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQEndpoint.java +++ b/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQEndpoint.java @@ -27,13 +27,6 @@ import java.util.concurrent.TimeoutException; import javax.net.ssl.TrustManager; -import com.rabbitmq.client.AMQP; -import com.rabbitmq.client.Address; -import com.rabbitmq.client.Channel; -import com.rabbitmq.client.Connection; -import com.rabbitmq.client.ConnectionFactory; -import com.rabbitmq.client.Envelope; - import org.apache.camel.AsyncEndpoint; import org.apache.camel.Consumer; import org.apache.camel.Exchange; @@ -46,6 +39,13 @@ import org.apache.camel.spi.UriEndpoint; import 
org.apache.camel.spi.UriParam; import org.apache.camel.spi.UriPath; +import com.rabbitmq.client.AMQP; +import com.rabbitmq.client.Address; +import com.rabbitmq.client.Channel; +import com.rabbitmq.client.Connection; +import com.rabbitmq.client.ConnectionFactory; +import com.rabbitmq.client.Envelope; + /** * The rabbitmq component allows you produce and consume messages from <a href="http://www.rabbitmq.com/">RabbitMQ</a> instances. */ @@ -77,6 +77,8 @@ public class RabbitMQEndpoint extends DefaultEndpoint implements AsyncEndpoint { private boolean durable = true; @UriParam(label = "common", defaultValue = "false") private boolean exclusive; + @UriParam(label = "common", defaultValue = "false") + private boolean passive = false; @UriParam(label = "producer") private boolean bridgeEndpoint; @UriParam(label = "common") @@ -958,5 +960,17 @@ public class RabbitMQEndpoint extends DefaultEndpoint implements AsyncEndpoint { public void setExclusive(boolean exclusive) { this.exclusive = exclusive; } + + public boolean isPassive() { + return passive; + } + + /** + * Enables passive declaration: the queue and exchange are only declared passively and must already exist on the RabbitMQ broker. 
+ */ + public void setPassive(boolean passive) { + this.passive = passive; + } + } http://git-wip-us.apache.org/repos/asf/camel/blob/a651701e/components/camel-rabbitmq/src/test/java/org/apache/camel/component/rabbitmq/RabbitMQEndpointTest.java ---------------------------------------------------------------------- diff --git a/components/camel-rabbitmq/src/test/java/org/apache/camel/component/rabbitmq/RabbitMQEndpointTest.java b/components/camel-rabbitmq/src/test/java/org/apache/camel/component/rabbitmq/RabbitMQEndpointTest.java index 6b7d737..4ad3257 100644 --- a/components/camel-rabbitmq/src/test/java/org/apache/camel/component/rabbitmq/RabbitMQEndpointTest.java +++ b/components/camel-rabbitmq/src/test/java/org/apache/camel/component/rabbitmq/RabbitMQEndpointTest.java @@ -27,18 +27,18 @@ import java.util.concurrent.Executors; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeoutException; -import com.rabbitmq.client.AMQP; -import com.rabbitmq.client.Address; -import com.rabbitmq.client.ConnectionFactory; -import com.rabbitmq.client.Envelope; -import com.rabbitmq.client.impl.LongStringHelper; - import org.apache.camel.Exchange; import org.apache.camel.impl.JndiRegistry; import org.apache.camel.test.junit4.CamelTestSupport; import org.junit.Test; import org.mockito.Mockito; +import com.rabbitmq.client.AMQP; +import com.rabbitmq.client.Address; +import com.rabbitmq.client.ConnectionFactory; +import com.rabbitmq.client.Envelope; +import com.rabbitmq.client.impl.LongStringHelper; + public class RabbitMQEndpointTest extends CamelTestSupport { private Envelope envelope = Mockito.mock(Envelope.class); @@ -355,4 +355,10 @@ public class RabbitMQEndpointTest extends CamelTestSupport { RabbitMQEndpoint endpoint = context.getEndpoint("rabbitmq:localhost/exchange?exclusive=true", RabbitMQEndpoint.class); assertTrue(endpoint.isExclusive()); } + + @Test + public void createEndpointWithPassiveEnabled() throws Exception { + RabbitMQEndpoint endpoint = 
context.getEndpoint("rabbitmq:localhost/exchange?passive=true", RabbitMQEndpoint.class); + assertTrue(endpoint.isPassive()); + } }
