This is an automated email from the ASF dual-hosted git repository. davsclaus pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/camel.git
The following commit(s) were added to refs/heads/master by this push: new 0f9f1f0 CAMEL-15639: Add requeue option to camel-rabbitmq consumer to use to control requeue behaviour. Thanks to Ayyanar for patch. 0f9f1f0 is described below commit 0f9f1f0bd9deb42c6f2a454a56684e3a8c690090 Author: Claus Ibsen <claus.ib...@gmail.com> AuthorDate: Thu Oct 8 18:54:03 2020 +0200 CAMEL-15639: Add requeue option to camel-rabbitmq consumer to use to control requeue behaviour. Thanks to Ayyanar for patch. --- .../camel/catalog/docs/rabbitmq-component.adoc | 3 +- .../rabbitmq/RabbitMQEndpointConfigurer.java | 5 +++ .../rabbitmq/RabbitMQEndpointUriFactory.java | 3 +- .../apache/camel/component/rabbitmq/rabbitmq.json | 1 + .../src/main/docs/rabbitmq-component.adoc | 3 +- .../camel/component/rabbitmq/RabbitConsumer.java | 7 ++-- .../camel/component/rabbitmq/RabbitMQEndpoint.java | 18 +++++++++- .../dsl/RabbitMQEndpointBuilderFactory.java | 38 ++++++++++++++++++++++ .../modules/ROOT/pages/rabbitmq-component.adoc | 3 +- 9 files changed, 73 insertions(+), 8 deletions(-) diff --git a/catalog/camel-catalog/src/generated/resources/org/apache/camel/catalog/docs/rabbitmq-component.adoc b/catalog/camel-catalog/src/generated/resources/org/apache/camel/catalog/docs/rabbitmq-component.adoc index 48ffda6..6bf4caa 100644 --- a/catalog/camel-catalog/src/generated/resources/org/apache/camel/catalog/docs/rabbitmq-component.adoc +++ b/catalog/camel-catalog/src/generated/resources/org/apache/camel/catalog/docs/rabbitmq-component.adoc @@ -142,7 +142,7 @@ with the following path and query parameters: |=== -=== Query Parameters (66 parameters): +=== Query Parameters (67 parameters): [width="100%",cols="2,5,^1,2",options="header"] @@ -179,6 +179,7 @@ with the following path and query parameters: | *prefetchEnabled* (consumer) | Enables the quality of service on the RabbitMQConsumer side. You need to specify the option of prefetchSize, prefetchCount, prefetchGlobal at the same time | false | boolean | *prefetchGlobal* (consumer) | If the settings should be applied to the entire channel rather than each consumer You need to specify the option of prefetchSize, prefetchCount, prefetchGlobal at the same time | false | boolean | *prefetchSize* (consumer) | The maximum amount of content (measured in octets) that the server will deliver, 0 if unlimited. You need to specify the option of prefetchSize, prefetchCount, prefetchGlobal at the same time | | int +| *reQueue* (consumer) | This is used by the consumer to control rejection of the message. When the consumer is complete processing the exchange, and if the exchange failed, then the consumer is going to reject the message from the RabbitMQ broker. If the header CamelRabbitmqRequeue is present then the value of the header will be used, otherwise this endpoint value is used as fallback. If the value is false (by default) then the message is discarded/dead-lettered. If the value is true, t [...] | *exceptionHandler* (consumer) | To let the consumer use a custom ExceptionHandler. Notice if the option bridgeErrorHandler is enabled then this option is not in use. By default the consumer will deal with exceptions, that will be logged at WARN or ERROR level and ignored. | | ExceptionHandler | *exchangePattern* (consumer) | Sets the exchange pattern when the consumer creates an exchange. There are 3 enums and the value can be one of: InOnly, InOut, InOptionalOut | | ExchangePattern | *threadPoolSize* (consumer) | The consumer uses a Thread Pool Executor with a fixed number of threads. This setting allows you to set that number of threads. | 10 | int diff --git a/components/camel-rabbitmq/src/generated/java/org/apache/camel/component/rabbitmq/RabbitMQEndpointConfigurer.java b/components/camel-rabbitmq/src/generated/java/org/apache/camel/component/rabbitmq/RabbitMQEndpointConfigurer.java index b92cc95..513dee0 100644 --- a/components/camel-rabbitmq/src/generated/java/org/apache/camel/component/rabbitmq/RabbitMQEndpointConfigurer.java +++ b/components/camel-rabbitmq/src/generated/java/org/apache/camel/component/rabbitmq/RabbitMQEndpointConfigurer.java @@ -50,6 +50,7 @@ public class RabbitMQEndpointConfigurer extends PropertyConfigurerSupport implem map.put("prefetchEnabled", boolean.class); map.put("prefetchGlobal", boolean.class); map.put("prefetchSize", int.class); + map.put("reQueue", boolean.class); map.put("exceptionHandler", org.apache.camel.spi.ExceptionHandler.class); map.put("exchangePattern", org.apache.camel.ExchangePattern.class); map.put("threadPoolSize", int.class); @@ -177,6 +178,8 @@ public class RabbitMQEndpointConfigurer extends PropertyConfigurerSupport implem case "publisheracknowledgementstimeout": case "publisherAcknowledgementsTimeout": target.setPublisherAcknowledgementsTimeout(property(camelContext, long.class, value)); return true; case "queue": target.setQueue(property(camelContext, java.lang.String.class, value)); return true; + case "requeue": + case "reQueue": target.setReQueue(property(camelContext, boolean.class, value)); return true; case "requesttimeout": case "requestTimeout": target.setRequestTimeout(property(camelContext, long.class, value)); return true; case "requesttimeoutcheckerinterval": @@ -308,6 +311,8 @@ public class RabbitMQEndpointConfigurer extends PropertyConfigurerSupport implem case "publisheracknowledgementstimeout": case "publisherAcknowledgementsTimeout": return target.getPublisherAcknowledgementsTimeout(); case "queue": return target.getQueue(); + case "requeue": + case "reQueue": return target.isReQueue(); case "requesttimeout": case "requestTimeout": return target.getRequestTimeout(); case "requesttimeoutcheckerinterval": diff --git a/components/camel-rabbitmq/src/generated/java/org/apache/camel/component/rabbitmq/RabbitMQEndpointUriFactory.java b/components/camel-rabbitmq/src/generated/java/org/apache/camel/component/rabbitmq/RabbitMQEndpointUriFactory.java index 6751325..4b90572 100644 --- a/components/camel-rabbitmq/src/generated/java/org/apache/camel/component/rabbitmq/RabbitMQEndpointUriFactory.java +++ b/components/camel-rabbitmq/src/generated/java/org/apache/camel/component/rabbitmq/RabbitMQEndpointUriFactory.java @@ -18,7 +18,7 @@ public class RabbitMQEndpointUriFactory extends org.apache.camel.support.compone private static final Set<String> PROPERTY_NAMES; static { - Set<String> set = new HashSet<>(67); + Set<String> set = new HashSet<>(68); set.add("exchangeName"); set.add("addresses"); set.add("autoDelete"); @@ -51,6 +51,7 @@ public class RabbitMQEndpointUriFactory extends org.apache.camel.support.compone set.add("prefetchEnabled"); set.add("prefetchGlobal"); set.add("prefetchSize"); + set.add("reQueue"); set.add("exceptionHandler"); set.add("exchangePattern"); set.add("threadPoolSize"); diff --git a/components/camel-rabbitmq/src/generated/resources/org/apache/camel/component/rabbitmq/rabbitmq.json b/components/camel-rabbitmq/src/generated/resources/org/apache/camel/component/rabbitmq/rabbitmq.json index 3a028c1..a8fdaa6 100644 --- a/components/camel-rabbitmq/src/generated/resources/org/apache/camel/component/rabbitmq/rabbitmq.json +++ b/components/camel-rabbitmq/src/generated/resources/org/apache/camel/component/rabbitmq/rabbitmq.json @@ -111,6 +111,7 @@ "prefetchEnabled": { "kind": "parameter", "displayName": "Prefetch Enabled", "group": "consumer", "label": "consumer", "required": false, "type": "boolean", "javaType": "boolean", "deprecated": false, "secret": false, "defaultValue": false, "description": "Enables the quality of service on the RabbitMQConsumer side. You need to specify the option of prefetchSize, prefetchCount, prefetchGlobal at the same time" }, "prefetchGlobal": { "kind": "parameter", "displayName": "Prefetch Global", "group": "consumer", "label": "consumer", "required": false, "type": "boolean", "javaType": "boolean", "deprecated": false, "secret": false, "defaultValue": false, "description": "If the settings should be applied to the entire channel rather than each consumer You need to specify the option of prefetchSize, prefetchCount, prefetchGlobal at the same time" }, "prefetchSize": { "kind": "parameter", "displayName": "Prefetch Size", "group": "consumer", "label": "consumer", "required": false, "type": "integer", "javaType": "int", "deprecated": false, "secret": false, "description": "The maximum amount of content (measured in octets) that the server will deliver, 0 if unlimited. You need to specify the option of prefetchSize, prefetchCount, prefetchGlobal at the same time" }, + "reQueue": { "kind": "parameter", "displayName": "Re Queue", "group": "consumer", "label": "consumer", "required": false, "type": "boolean", "javaType": "boolean", "deprecated": false, "secret": false, "defaultValue": false, "description": "This is used by the consumer to control rejection of the message. When the consumer is complete processing the exchange, and if the exchange failed, then the consumer is going to reject the message from the RabbitMQ broker. If the header CamelRabb [...] "exceptionHandler": { "kind": "parameter", "displayName": "Exception Handler", "group": "consumer (advanced)", "label": "consumer,advanced", "required": false, "type": "object", "javaType": "org.apache.camel.spi.ExceptionHandler", "optionalPrefix": "consumer.", "deprecated": false, "secret": false, "description": "To let the consumer use a custom ExceptionHandler. Notice if the option bridgeErrorHandler is enabled then this option is not in use. By default the consumer will deal with [...] "exchangePattern": { "kind": "parameter", "displayName": "Exchange Pattern", "group": "consumer (advanced)", "label": "consumer,advanced", "required": false, "type": "object", "javaType": "org.apache.camel.ExchangePattern", "enum": [ "InOnly", "InOut", "InOptionalOut" ], "deprecated": false, "secret": false, "description": "Sets the exchange pattern when the consumer creates an exchange." }, "threadPoolSize": { "kind": "parameter", "displayName": "Thread Pool Size", "group": "consumer (advanced)", "label": "consumer,advanced", "required": false, "type": "integer", "javaType": "int", "deprecated": false, "secret": false, "defaultValue": "10", "description": "The consumer uses a Thread Pool Executor with a fixed number of threads. This setting allows you to set that number of threads." }, diff --git a/components/camel-rabbitmq/src/main/docs/rabbitmq-component.adoc b/components/camel-rabbitmq/src/main/docs/rabbitmq-component.adoc index 48ffda6..6bf4caa 100644 --- a/components/camel-rabbitmq/src/main/docs/rabbitmq-component.adoc +++ b/components/camel-rabbitmq/src/main/docs/rabbitmq-component.adoc @@ -142,7 +142,7 @@ with the following path and query parameters: |=== -=== Query Parameters (66 parameters): +=== Query Parameters (67 parameters): [width="100%",cols="2,5,^1,2",options="header"] @@ -179,6 +179,7 @@ with the following path and query parameters: | *prefetchEnabled* (consumer) | Enables the quality of service on the RabbitMQConsumer side. You need to specify the option of prefetchSize, prefetchCount, prefetchGlobal at the same time | false | boolean | *prefetchGlobal* (consumer) | If the settings should be applied to the entire channel rather than each consumer You need to specify the option of prefetchSize, prefetchCount, prefetchGlobal at the same time | false | boolean | *prefetchSize* (consumer) | The maximum amount of content (measured in octets) that the server will deliver, 0 if unlimited. You need to specify the option of prefetchSize, prefetchCount, prefetchGlobal at the same time | | int +| *reQueue* (consumer) | This is used by the consumer to control rejection of the message. When the consumer is complete processing the exchange, and if the exchange failed, then the consumer is going to reject the message from the RabbitMQ broker. If the header CamelRabbitmqRequeue is present then the value of the header will be used, otherwise this endpoint value is used as fallback. If the value is false (by default) then the message is discarded/dead-lettered. If the value is true, t [...] | *exceptionHandler* (consumer) | To let the consumer use a custom ExceptionHandler. Notice if the option bridgeErrorHandler is enabled then this option is not in use. By default the consumer will deal with exceptions, that will be logged at WARN or ERROR level and ignored. | | ExceptionHandler | *exchangePattern* (consumer) | Sets the exchange pattern when the consumer creates an exchange. There are 3 enums and the value can be one of: InOnly, InOut, InOptionalOut | | ExchangePattern | *threadPoolSize* (consumer) | The consumer uses a Thread Pool Executor with a fixed number of threads. This setting allows you to set that number of threads. | 10 | int diff --git a/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitConsumer.java b/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitConsumer.java index ccfce12..f1bd416 100644 --- a/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitConsumer.java +++ b/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitConsumer.java @@ -42,7 +42,6 @@ class RabbitConsumer extends ServiceSupport implements com.rabbitmq.client.Consu private final RabbitMQConsumer consumer; private Channel channel; private String tag; - /** Consumer tag for this consumer. */ private volatile String consumerTag; private volatile boolean stopping; @@ -177,9 +176,11 @@ class RabbitConsumer extends ServiceSupport implements com.rabbitmq.client.Consu channel.basicAck(deliveryTag, false); } } else { - boolean isRequeueHeaderSet = false; + boolean isRequeueHeaderSet = consumer.getEndpoint().isReQueue(); try { - isRequeueHeaderSet = msg.getHeader(RabbitMQConstants.REQUEUE, false, boolean.class); + isRequeueHeaderSet = msg.getHeader(RabbitMQConstants.REQUEUE, isRequeueHeaderSet, boolean.class); + LOG.trace("Consumer requeue property is overridden using the message header requeue property as: {}", + isRequeueHeaderSet); } catch (Exception e) { // ignore as its an invalid header } diff --git a/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQEndpoint.java b/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQEndpoint.java index 5ff131d..7f865de 100644 --- a/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQEndpoint.java +++ b/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQEndpoint.java @@ -184,7 +184,8 @@ public class RabbitMQEndpoint extends DefaultEndpoint implements AsyncEndpoint { private ExceptionHandler connectionFactoryExceptionHandler; @UriParam(label = "allowMessageBodySerialization", defaultValue = "false") private boolean allowMessageBodySerialization; - + @UriParam(label = "consumer") + private boolean reQueue; // camel-jms supports this setting but it is not currently configurable in // camel-rabbitmq private boolean useMessageIDAsCorrelationID = true; @@ -1038,4 +1039,19 @@ public class RabbitMQEndpoint extends DefaultEndpoint implements AsyncEndpoint { public void setConnectionFactoryExceptionHandler(ExceptionHandler connectionFactoryExceptionHandler) { this.connectionFactoryExceptionHandler = connectionFactoryExceptionHandler; } + + /** + * This is used by the consumer to control rejection of the message. When the consumer is complete processing the + * exchange, and if the exchange failed, then the consumer is going to reject the message from the RabbitMQ broker. + * If the header CamelRabbitmqRequeue is present then the value of the header will be used, otherwise this endpoint + * value is used as fallback. If the value is false (by default) then the message is discarded/dead-lettered. If the + * value is true, then the message is re-queued. + */ + public boolean isReQueue() { + return reQueue; + } + + public void setReQueue(boolean reQueue) { + this.reQueue = reQueue; + } } diff --git a/core/camel-endpointdsl/src/generated/java/org/apache/camel/builder/endpoint/dsl/RabbitMQEndpointBuilderFactory.java b/core/camel-endpointdsl/src/generated/java/org/apache/camel/builder/endpoint/dsl/RabbitMQEndpointBuilderFactory.java index 088a97c..7c17f4d 100644 --- a/core/camel-endpointdsl/src/generated/java/org/apache/camel/builder/endpoint/dsl/RabbitMQEndpointBuilderFactory.java +++ b/core/camel-endpointdsl/src/generated/java/org/apache/camel/builder/endpoint/dsl/RabbitMQEndpointBuilderFactory.java @@ -743,6 +743,44 @@ public interface RabbitMQEndpointBuilderFactory { return this; } /** + * This is used by the consumer to control rejection of the message. + * When the consumer is complete processing the exchange, and if the + * exchange failed, then the consumer is going to reject the message + * from the RabbitMQ broker. If the header CamelRabbitmqRequeue is + * present then the value of the header will be used, otherwise this + * endpoint value is used as fallback. If the value is false (by + * default) then the message is discarded/dead-lettered. If the value is + * true, then the message is re-queued. + * + * The option is a: <code>boolean</code> type. + * + * Default: false + * Group: consumer + */ + default RabbitMQEndpointConsumerBuilder reQueue(boolean reQueue) { + doSetProperty("reQueue", reQueue); + return this; + } + /** + * This is used by the consumer to control rejection of the message. + * When the consumer is complete processing the exchange, and if the + * exchange failed, then the consumer is going to reject the message + * from the RabbitMQ broker. If the header CamelRabbitmqRequeue is + * present then the value of the header will be used, otherwise this + * endpoint value is used as fallback. If the value is false (by + * default) then the message is discarded/dead-lettered. If the value is + * true, then the message is re-queued. + * + * The option will be converted to a <code>boolean</code> type. + * + * Default: false + * Group: consumer + */ + default RabbitMQEndpointConsumerBuilder reQueue(String reQueue) { + doSetProperty("reQueue", reQueue); + return this; + } + /** * Whether to allow Java serialization of the message body or not. If * this value is true, the message body will be serialized on the * producer side using Java serialization, if no type converter can diff --git a/docs/components/modules/ROOT/pages/rabbitmq-component.adoc b/docs/components/modules/ROOT/pages/rabbitmq-component.adoc index f4d9111..1cf378a 100644 --- a/docs/components/modules/ROOT/pages/rabbitmq-component.adoc +++ b/docs/components/modules/ROOT/pages/rabbitmq-component.adoc @@ -144,7 +144,7 @@ with the following path and query parameters: |=== -=== Query Parameters (66 parameters): +=== Query Parameters (67 parameters): [width="100%",cols="2,5,^1,2",options="header"] @@ -181,6 +181,7 @@ with the following path and query parameters: | *prefetchEnabled* (consumer) | Enables the quality of service on the RabbitMQConsumer side. You need to specify the option of prefetchSize, prefetchCount, prefetchGlobal at the same time | false | boolean | *prefetchGlobal* (consumer) | If the settings should be applied to the entire channel rather than each consumer You need to specify the option of prefetchSize, prefetchCount, prefetchGlobal at the same time | false | boolean | *prefetchSize* (consumer) | The maximum amount of content (measured in octets) that the server will deliver, 0 if unlimited. You need to specify the option of prefetchSize, prefetchCount, prefetchGlobal at the same time | | int +| *reQueue* (consumer) | This is used by the consumer to control rejection of the message. When the consumer is complete processing the exchange, and if the exchange failed, then the consumer is going to reject the message from the RabbitMQ broker. If the header CamelRabbitmqRequeue is present then the value of the header will be used, otherwise this endpoint value is used as fallback. If the value is false (by default) then the message is discarded/dead-lettered. If the value is true, t [...] | *exceptionHandler* (consumer) | To let the consumer use a custom ExceptionHandler. Notice if the option bridgeErrorHandler is enabled then this option is not in use. By default the consumer will deal with exceptions, that will be logged at WARN or ERROR level and ignored. | | ExceptionHandler | *exchangePattern* (consumer) | Sets the exchange pattern when the consumer creates an exchange. There are 3 enums and the value can be one of: InOnly, InOut, InOptionalOut | | ExchangePattern | *threadPoolSize* (consumer) | The consumer uses a Thread Pool Executor with a fixed number of threads. This setting allows you to set that number of threads. | 10 | int