This is an automated email from the ASF dual-hosted git repository. davsclaus pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/camel.git
The following commit(s) were added to refs/heads/master by this push: new 489b837 CAMEL-15355: camel-rabbitmq fix arg.queue.x-single-active-consumer to be configured as boolean type for RabbitMQ to make it work. Thanks to Devansh Arora for reporting and suggested fix. 489b837 is described below commit 489b837b590a5783c0de44505b37e32535624213 Author: Claus Ibsen <claus.ib...@gmail.com> AuthorDate: Thu Jul 30 15:16:50 2020 +0200 CAMEL-15355: camel-rabbitmq fix arg.queue.x-single-active-consumer to be configured as boolean type for RabbitMQ to make it work. Thanks to Devansh Arora for reporting and suggested fix. --- .../camel/component/rabbitmq/RabbitMQConstants.java | 1 + .../camel/component/rabbitmq/RabbitMQDeclareSupport.java | 5 +++++ .../camel/component/rabbitmq/RabbitMQEndpointTest.java | 16 ++++++++++++++++ .../rabbitmq/integration/RabbitMQConsumerIntTest.java | 2 +- 4 files changed, 23 insertions(+), 1 deletion(-) diff --git a/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQConstants.java b/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQConstants.java index 1d26f9a..493c2d4 100644 --- a/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQConstants.java +++ b/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQConstants.java @@ -49,6 +49,7 @@ public final class RabbitMQConstants { public static final String RABBITMQ_QUEUE_MAX_PRIORITY_KEY = "x-max-priority"; public static final String RABBITMQ_QUEUE_MESSAGE_TTL_KEY = "x-message-ttl"; public static final String RABBITMQ_QUEUE_TTL_KEY = "x-expires"; + public static final String RABBITMQ_QUEUE_SINGLE_ACTIVE_CONSUMER_KEY = "x-single-active-consumer"; private RabbitMQConstants() { // Constants class diff --git a/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQDeclareSupport.java b/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQDeclareSupport.java index 06c1021..4a7478c 100644 --- a/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQDeclareSupport.java +++ b/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQDeclareSupport.java @@ -86,6 +86,11 @@ public class RabbitMQDeclareSupport { if (queueExpiration instanceof String) { queueArgs.put(RabbitMQConstants.RABBITMQ_QUEUE_TTL_KEY, Long.parseLong((String)queueExpiration)); } + + Object singleConsumer = queueArgs.get(RabbitMQConstants.RABBITMQ_QUEUE_SINGLE_ACTIVE_CONSUMER_KEY); + if (singleConsumer instanceof String) { + queueArgs.put(RabbitMQConstants.RABBITMQ_QUEUE_SINGLE_ACTIVE_CONSUMER_KEY, Boolean.parseBoolean((String)singleConsumer)); + } } private void populateQueueArgumentsFromDeadLetterExchange(final Map<String, Object> queueArgs) { diff --git a/components/camel-rabbitmq/src/test/java/org/apache/camel/component/rabbitmq/RabbitMQEndpointTest.java b/components/camel-rabbitmq/src/test/java/org/apache/camel/component/rabbitmq/RabbitMQEndpointTest.java index 1d20bef..3c4e57d 100644 --- a/components/camel-rabbitmq/src/test/java/org/apache/camel/component/rabbitmq/RabbitMQEndpointTest.java +++ b/components/camel-rabbitmq/src/test/java/org/apache/camel/component/rabbitmq/RabbitMQEndpointTest.java @@ -38,6 +38,7 @@ import org.apache.camel.component.rabbitmq.integration.RabbitMQDeadLetterRouting import org.apache.camel.spi.Registry; import org.apache.camel.support.SimpleRegistry; import org.apache.camel.test.junit5.CamelTestSupport; +import org.junit.Assert; import org.junit.jupiter.api.Test; import org.mockito.Mockito; import org.slf4j.Logger; @@ -300,4 +301,19 @@ public class RabbitMQEndpointTest extends CamelTestSupport { RabbitMQEndpoint endpoint = context.getEndpoint("rabbitmq:localhost/exchange?passive=true", RabbitMQEndpoint.class); assertTrue(endpoint.isPassive()); } + + @Test + public void testEndpointArgsIssue() throws Exception { + RabbitMQEndpoint endpoint1 = context.getEndpoint("rabbitmq://localhost:5672/mydirectdelayed?queue=testQ4" + + "&routingKey=testKey&username=me&password=mypwd&threadPoolSize=1&concurrentConsumers=1&autoDelete=false" + + "&vhost=myvhost&arg.queue.x-single-active-consumer=true&arg.exchange.x-delayed-type=direct&exchangeType=x-delayed-message", RabbitMQEndpoint.class); + + Assert.assertNotNull(endpoint1.getArgs()); + Assert.assertEquals(2, endpoint1.getArgs().size()); + Assert.assertNotNull(endpoint1.getExchangeArgs()); + Assert.assertEquals(1, endpoint1.getExchangeArgs().size()); + Assert.assertNotNull(endpoint1.getQueueArgs()); + Assert.assertEquals(1, endpoint1.getQueueArgs().size()); + } + } diff --git a/components/camel-rabbitmq/src/test/java/org/apache/camel/component/rabbitmq/integration/RabbitMQConsumerIntTest.java b/components/camel-rabbitmq/src/test/java/org/apache/camel/component/rabbitmq/integration/RabbitMQConsumerIntTest.java index 6f71edd..c21b05e 100644 --- a/components/camel-rabbitmq/src/test/java/org/apache/camel/component/rabbitmq/integration/RabbitMQConsumerIntTest.java +++ b/components/camel-rabbitmq/src/test/java/org/apache/camel/component/rabbitmq/integration/RabbitMQConsumerIntTest.java @@ -43,7 +43,7 @@ public class RabbitMQConsumerIntTest extends AbstractRabbitMQIntTest { private static final String QUEUE = "q1"; private static final String MSG = "hello world"; - @EndpointInject("rabbitmq:localhost:5672/" + EXCHANGE + "?username=cameltest&password=cameltest") + @EndpointInject("rabbitmq:localhost:5672/" + EXCHANGE + "?username=cameltest&password=cameltest&arg.queue.x-single-active-consumer=true") private Endpoint from; @EndpointInject("mock:result")