This is an automated email from the ASF dual-hosted git repository.

davsclaus pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/camel.git


The following commit(s) were added to refs/heads/master by this push:
     new 0f9f1f0  CAMEL-15639: Add requeue option to camel-rabbitmq consumer to use to control requeue behaviour. Thanks to Ayyanar for patch.
0f9f1f0 is described below

commit 0f9f1f0bd9deb42c6f2a454a56684e3a8c690090
Author: Claus Ibsen <claus.ib...@gmail.com>
AuthorDate: Thu Oct 8 18:54:03 2020 +0200

    CAMEL-15639: Add requeue option to camel-rabbitmq consumer to use to control requeue behaviour. Thanks to Ayyanar for patch.
---
 .../camel/catalog/docs/rabbitmq-component.adoc     |  3 +-
 .../rabbitmq/RabbitMQEndpointConfigurer.java       |  5 +++
 .../rabbitmq/RabbitMQEndpointUriFactory.java       |  3 +-
 .../apache/camel/component/rabbitmq/rabbitmq.json  |  1 +
 .../src/main/docs/rabbitmq-component.adoc          |  3 +-
 .../camel/component/rabbitmq/RabbitConsumer.java   |  7 ++--
 .../camel/component/rabbitmq/RabbitMQEndpoint.java | 18 +++++++++-
 .../dsl/RabbitMQEndpointBuilderFactory.java        | 38 ++++++++++++++++++++++
 .../modules/ROOT/pages/rabbitmq-component.adoc     |  3 +-
 9 files changed, 73 insertions(+), 8 deletions(-)
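
The fallback rule this commit introduces (use the CamelRabbitmqRequeue header when present, otherwise the endpoint's reQueue value) can be illustrated with a minimal, self-contained sketch. This is not the code from RabbitConsumer.java: the class name RequeueDecision, the method resolveRequeue, and the plain Map standing in for Camel message headers are all hypothetical, and the string handling only approximates what Camel's type conversion would do.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical illustration class; not part of camel-rabbitmq.
public class RequeueDecision {

    // Header name as given in the option description of this commit.
    static final String REQUEUE_HEADER = "CamelRabbitmqRequeue";

    // Returns the header value if it is a usable boolean,
    // otherwise falls back to the endpoint-level reQueue setting.
    static boolean resolveRequeue(Map<String, Object> headers, boolean endpointReQueue) {
        Object value = headers.get(REQUEUE_HEADER);
        if (value instanceof Boolean) {
            return (Boolean) value;
        }
        if (value instanceof String) {
            String s = ((String) value).trim();
            if (s.equalsIgnoreCase("true")) {
                return true;
            }
            if (s.equalsIgnoreCase("false")) {
                return false;
            }
        }
        // Header missing or not a valid boolean: keep the endpoint value.
        return endpointReQueue;
    }

    public static void main(String[] args) {
        Map<String, Object> headers = new HashMap<>();
        // No header set, so the endpoint value decides.
        System.out.println(resolveRequeue(headers, true));
        // A header value takes precedence over the endpoint value.
        headers.put(REQUEUE_HEADER, false);
        System.out.println(resolveRequeue(headers, true));
        // An invalid header is ignored and the endpoint value is used.
        headers.put(REQUEUE_HEADER, "not-a-boolean");
        System.out.println(resolveRequeue(headers, true));
    }
}
```

On the endpoint itself the option would presumably be set as a query parameter, for example rabbitmq:myExchange?queue=myQueue&reQueue=true, where myExchange and myQueue are placeholder names.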

diff --git a/catalog/camel-catalog/src/generated/resources/org/apache/camel/catalog/docs/rabbitmq-component.adoc b/catalog/camel-catalog/src/generated/resources/org/apache/camel/catalog/docs/rabbitmq-component.adoc
index 48ffda6..6bf4caa 100644
--- a/catalog/camel-catalog/src/generated/resources/org/apache/camel/catalog/docs/rabbitmq-component.adoc
+++ b/catalog/camel-catalog/src/generated/resources/org/apache/camel/catalog/docs/rabbitmq-component.adoc
@@ -142,7 +142,7 @@ with the following path and query parameters:
 |===
 
 
-=== Query Parameters (66 parameters):
+=== Query Parameters (67 parameters):
 
 
 [width="100%",cols="2,5,^1,2",options="header"]
@@ -179,6 +179,7 @@ with the following path and query parameters:
 | *prefetchEnabled* (consumer) | Enables the quality of service on the RabbitMQConsumer side. You need to specify the option of prefetchSize, prefetchCount, prefetchGlobal at the same time | false | boolean
 | *prefetchGlobal* (consumer) | If the settings should be applied to the entire channel rather than each consumer You need to specify the option of prefetchSize, prefetchCount, prefetchGlobal at the same time | false | boolean
 | *prefetchSize* (consumer) | The maximum amount of content (measured in octets) that the server will deliver, 0 if unlimited. You need to specify the option of prefetchSize, prefetchCount, prefetchGlobal at the same time |  | int
+| *reQueue* (consumer) | This is used by the consumer to control rejection of the message. When the consumer is complete processing the exchange, and if the exchange failed, then the consumer is going to reject the message from the RabbitMQ broker. If the header CamelRabbitmqRequeue is present then the value of the header will be used, otherwise this endpoint value is used as fallback. If the value is false (by default) then the message is discarded/dead-lettered. If the value is true, t [...]
 | *exceptionHandler* (consumer) | To let the consumer use a custom ExceptionHandler. Notice if the option bridgeErrorHandler is enabled then this option is not in use. By default the consumer will deal with exceptions, that will be logged at WARN or ERROR level and ignored. |  | ExceptionHandler
 | *exchangePattern* (consumer) | Sets the exchange pattern when the consumer creates an exchange. There are 3 enums and the value can be one of: InOnly, InOut, InOptionalOut |  | ExchangePattern
 | *threadPoolSize* (consumer) | The consumer uses a Thread Pool Executor with a fixed number of threads. This setting allows you to set that number of threads. | 10 | int
diff --git a/components/camel-rabbitmq/src/generated/java/org/apache/camel/component/rabbitmq/RabbitMQEndpointConfigurer.java b/components/camel-rabbitmq/src/generated/java/org/apache/camel/component/rabbitmq/RabbitMQEndpointConfigurer.java
index b92cc95..513dee0 100644
--- a/components/camel-rabbitmq/src/generated/java/org/apache/camel/component/rabbitmq/RabbitMQEndpointConfigurer.java
+++ b/components/camel-rabbitmq/src/generated/java/org/apache/camel/component/rabbitmq/RabbitMQEndpointConfigurer.java
@@ -50,6 +50,7 @@ public class RabbitMQEndpointConfigurer extends PropertyConfigurerSupport implem
         map.put("prefetchEnabled", boolean.class);
         map.put("prefetchGlobal", boolean.class);
         map.put("prefetchSize", int.class);
+        map.put("reQueue", boolean.class);
         map.put("exceptionHandler", org.apache.camel.spi.ExceptionHandler.class);
         map.put("exchangePattern", org.apache.camel.ExchangePattern.class);
         map.put("threadPoolSize", int.class);
@@ -177,6 +178,8 @@ public class RabbitMQEndpointConfigurer extends PropertyConfigurerSupport implem
         case "publisheracknowledgementstimeout":
         case "publisherAcknowledgementsTimeout": target.setPublisherAcknowledgementsTimeout(property(camelContext, long.class, value)); return true;
         case "queue": target.setQueue(property(camelContext, java.lang.String.class, value)); return true;
+        case "requeue":
+        case "reQueue": target.setReQueue(property(camelContext, boolean.class, value)); return true;
         case "requesttimeout":
         case "requestTimeout": target.setRequestTimeout(property(camelContext, long.class, value)); return true;
         case "requesttimeoutcheckerinterval":
@@ -308,6 +311,8 @@ public class RabbitMQEndpointConfigurer extends PropertyConfigurerSupport implem
         case "publisheracknowledgementstimeout":
         case "publisherAcknowledgementsTimeout": return target.getPublisherAcknowledgementsTimeout();
         case "queue": return target.getQueue();
+        case "requeue":
+        case "reQueue": return target.isReQueue();
         case "requesttimeout":
         case "requestTimeout": return target.getRequestTimeout();
         case "requesttimeoutcheckerinterval":
diff --git a/components/camel-rabbitmq/src/generated/java/org/apache/camel/component/rabbitmq/RabbitMQEndpointUriFactory.java b/components/camel-rabbitmq/src/generated/java/org/apache/camel/component/rabbitmq/RabbitMQEndpointUriFactory.java
index 6751325..4b90572 100644
--- a/components/camel-rabbitmq/src/generated/java/org/apache/camel/component/rabbitmq/RabbitMQEndpointUriFactory.java
+++ b/components/camel-rabbitmq/src/generated/java/org/apache/camel/component/rabbitmq/RabbitMQEndpointUriFactory.java
@@ -18,7 +18,7 @@ public class RabbitMQEndpointUriFactory extends org.apache.camel.support.compone
 
     private static final Set<String> PROPERTY_NAMES;
     static {
-        Set<String> set = new HashSet<>(67);
+        Set<String> set = new HashSet<>(68);
         set.add("exchangeName");
         set.add("addresses");
         set.add("autoDelete");
@@ -51,6 +51,7 @@ public class RabbitMQEndpointUriFactory extends org.apache.camel.support.compone
         set.add("prefetchEnabled");
         set.add("prefetchGlobal");
         set.add("prefetchSize");
+        set.add("reQueue");
         set.add("exceptionHandler");
         set.add("exchangePattern");
         set.add("threadPoolSize");
diff --git a/components/camel-rabbitmq/src/generated/resources/org/apache/camel/component/rabbitmq/rabbitmq.json b/components/camel-rabbitmq/src/generated/resources/org/apache/camel/component/rabbitmq/rabbitmq.json
index 3a028c1..a8fdaa6 100644
--- a/components/camel-rabbitmq/src/generated/resources/org/apache/camel/component/rabbitmq/rabbitmq.json
+++ b/components/camel-rabbitmq/src/generated/resources/org/apache/camel/component/rabbitmq/rabbitmq.json
@@ -111,6 +111,7 @@
     "prefetchEnabled": { "kind": "parameter", "displayName": "Prefetch Enabled", "group": "consumer", "label": "consumer", "required": false, "type": "boolean", "javaType": "boolean", "deprecated": false, "secret": false, "defaultValue": false, "description": "Enables the quality of service on the RabbitMQConsumer side. You need to specify the option of prefetchSize, prefetchCount, prefetchGlobal at the same time" },
     "prefetchGlobal": { "kind": "parameter", "displayName": "Prefetch Global", "group": "consumer", "label": "consumer", "required": false, "type": "boolean", "javaType": "boolean", "deprecated": false, "secret": false, "defaultValue": false, "description": "If the settings should be applied to the entire channel rather than each consumer You need to specify the option of prefetchSize, prefetchCount, prefetchGlobal at the same time" },
     "prefetchSize": { "kind": "parameter", "displayName": "Prefetch Size", "group": "consumer", "label": "consumer", "required": false, "type": "integer", "javaType": "int", "deprecated": false, "secret": false, "description": "The maximum amount of content (measured in octets) that the server will deliver, 0 if unlimited. You need to specify the option of prefetchSize, prefetchCount, prefetchGlobal at the same time" },
+    "reQueue": { "kind": "parameter", "displayName": "Re Queue", "group": "consumer", "label": "consumer", "required": false, "type": "boolean", "javaType": "boolean", "deprecated": false, "secret": false, "defaultValue": false, "description": "This is used by the consumer to control rejection of the message. When the consumer is complete processing the exchange, and if the exchange failed, then the consumer is going to reject the message from the RabbitMQ broker. If the header CamelRabb [...]
     "exceptionHandler": { "kind": "parameter", "displayName": "Exception Handler", "group": "consumer (advanced)", "label": "consumer,advanced", "required": false, "type": "object", "javaType": "org.apache.camel.spi.ExceptionHandler", "optionalPrefix": "consumer.", "deprecated": false, "secret": false, "description": "To let the consumer use a custom ExceptionHandler. Notice if the option bridgeErrorHandler is enabled then this option is not in use. By default the consumer will deal with [...]
     "exchangePattern": { "kind": "parameter", "displayName": "Exchange Pattern", "group": "consumer (advanced)", "label": "consumer,advanced", "required": false, "type": "object", "javaType": "org.apache.camel.ExchangePattern", "enum": [ "InOnly", "InOut", "InOptionalOut" ], "deprecated": false, "secret": false, "description": "Sets the exchange pattern when the consumer creates an exchange." },
     "threadPoolSize": { "kind": "parameter", "displayName": "Thread Pool Size", "group": "consumer (advanced)", "label": "consumer,advanced", "required": false, "type": "integer", "javaType": "int", "deprecated": false, "secret": false, "defaultValue": "10", "description": "The consumer uses a Thread Pool Executor with a fixed number of threads. This setting allows you to set that number of threads." },
diff --git a/components/camel-rabbitmq/src/main/docs/rabbitmq-component.adoc b/components/camel-rabbitmq/src/main/docs/rabbitmq-component.adoc
index 48ffda6..6bf4caa 100644
--- a/components/camel-rabbitmq/src/main/docs/rabbitmq-component.adoc
+++ b/components/camel-rabbitmq/src/main/docs/rabbitmq-component.adoc
@@ -142,7 +142,7 @@ with the following path and query parameters:
 |===
 
 
-=== Query Parameters (66 parameters):
+=== Query Parameters (67 parameters):
 
 
 [width="100%",cols="2,5,^1,2",options="header"]
@@ -179,6 +179,7 @@ with the following path and query parameters:
 | *prefetchEnabled* (consumer) | Enables the quality of service on the RabbitMQConsumer side. You need to specify the option of prefetchSize, prefetchCount, prefetchGlobal at the same time | false | boolean
 | *prefetchGlobal* (consumer) | If the settings should be applied to the entire channel rather than each consumer You need to specify the option of prefetchSize, prefetchCount, prefetchGlobal at the same time | false | boolean
 | *prefetchSize* (consumer) | The maximum amount of content (measured in octets) that the server will deliver, 0 if unlimited. You need to specify the option of prefetchSize, prefetchCount, prefetchGlobal at the same time |  | int
+| *reQueue* (consumer) | This is used by the consumer to control rejection of the message. When the consumer is complete processing the exchange, and if the exchange failed, then the consumer is going to reject the message from the RabbitMQ broker. If the header CamelRabbitmqRequeue is present then the value of the header will be used, otherwise this endpoint value is used as fallback. If the value is false (by default) then the message is discarded/dead-lettered. If the value is true, t [...]
 | *exceptionHandler* (consumer) | To let the consumer use a custom ExceptionHandler. Notice if the option bridgeErrorHandler is enabled then this option is not in use. By default the consumer will deal with exceptions, that will be logged at WARN or ERROR level and ignored. |  | ExceptionHandler
 | *exchangePattern* (consumer) | Sets the exchange pattern when the consumer creates an exchange. There are 3 enums and the value can be one of: InOnly, InOut, InOptionalOut |  | ExchangePattern
 | *threadPoolSize* (consumer) | The consumer uses a Thread Pool Executor with a fixed number of threads. This setting allows you to set that number of threads. | 10 | int
diff --git a/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitConsumer.java b/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitConsumer.java
index ccfce12..f1bd416 100644
--- a/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitConsumer.java
+++ b/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitConsumer.java
@@ -42,7 +42,6 @@ class RabbitConsumer extends ServiceSupport implements com.rabbitmq.client.Consu
     private final RabbitMQConsumer consumer;
     private Channel channel;
     private String tag;
-    /** Consumer tag for this consumer. */
     private volatile String consumerTag;
     private volatile boolean stopping;
 
@@ -177,9 +176,11 @@ class RabbitConsumer extends ServiceSupport implements com.rabbitmq.client.Consu
                     channel.basicAck(deliveryTag, false);
                 }
             } else {
-                boolean isRequeueHeaderSet = false;
+                boolean isRequeueHeaderSet = consumer.getEndpoint().isReQueue();
                 try {
-                    isRequeueHeaderSet = msg.getHeader(RabbitMQConstants.REQUEUE, false, boolean.class);
+                    isRequeueHeaderSet = msg.getHeader(RabbitMQConstants.REQUEUE, isRequeueHeaderSet, boolean.class);
+                    LOG.trace("Consumer requeue property is overridden using the message header requeue property as: {}",
+                            isRequeueHeaderSet);
                 } catch (Exception e) {
                     // ignore as its an invalid header
                 }
diff --git a/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQEndpoint.java b/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQEndpoint.java
index 5ff131d..7f865de 100644
--- a/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQEndpoint.java
+++ b/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQEndpoint.java
@@ -184,7 +184,8 @@ public class RabbitMQEndpoint extends DefaultEndpoint implements AsyncEndpoint {
     private ExceptionHandler connectionFactoryExceptionHandler;
     @UriParam(label = "allowMessageBodySerialization", defaultValue = "false")
     private boolean allowMessageBodySerialization;
-
+    @UriParam(label = "consumer")
+    private boolean reQueue;
     // camel-jms supports this setting but it is not currently configurable in
     // camel-rabbitmq
     private boolean useMessageIDAsCorrelationID = true;
@@ -1038,4 +1039,19 @@ public class RabbitMQEndpoint extends DefaultEndpoint implements AsyncEndpoint {
     public void setConnectionFactoryExceptionHandler(ExceptionHandler connectionFactoryExceptionHandler) {
         this.connectionFactoryExceptionHandler = connectionFactoryExceptionHandler;
     }
+
+    /**
+     * This is used by the consumer to control rejection of the message. When the consumer is complete processing the
+     * exchange, and if the exchange failed, then the consumer is going to reject the message from the RabbitMQ broker.
+     * If the header CamelRabbitmqRequeue is present then the value of the header will be used, otherwise this endpoint
+     * value is used as fallback. If the value is false (by default) then the message is discarded/dead-lettered. If the
+     * value is true, then the message is re-queued.
+     */
+    public boolean isReQueue() {
+        return reQueue;
+    }
+
+    public void setReQueue(boolean reQueue) {
+        this.reQueue = reQueue;
+    }
 }
diff --git a/core/camel-endpointdsl/src/generated/java/org/apache/camel/builder/endpoint/dsl/RabbitMQEndpointBuilderFactory.java b/core/camel-endpointdsl/src/generated/java/org/apache/camel/builder/endpoint/dsl/RabbitMQEndpointBuilderFactory.java
index 088a97c..7c17f4d 100644
--- a/core/camel-endpointdsl/src/generated/java/org/apache/camel/builder/endpoint/dsl/RabbitMQEndpointBuilderFactory.java
+++ b/core/camel-endpointdsl/src/generated/java/org/apache/camel/builder/endpoint/dsl/RabbitMQEndpointBuilderFactory.java
@@ -743,6 +743,44 @@ public interface RabbitMQEndpointBuilderFactory {
             return this;
         }
         /**
+         * This is used by the consumer to control rejection of the message.
+         * When the consumer is complete processing the exchange, and if the
+         * exchange failed, then the consumer is going to reject the message
+         * from the RabbitMQ broker. If the header CamelRabbitmqRequeue is
+         * present then the value of the header will be used, otherwise this
+         * endpoint value is used as fallback. If the value is false (by
+         * default) then the message is discarded/dead-lettered. If the value is
+         * true, then the message is re-queued.
+         * 
+         * The option is a: <code>boolean</code> type.
+         * 
+         * Default: false
+         * Group: consumer
+         */
+        default RabbitMQEndpointConsumerBuilder reQueue(boolean reQueue) {
+            doSetProperty("reQueue", reQueue);
+            return this;
+        }
+        /**
+         * This is used by the consumer to control rejection of the message.
+         * When the consumer is complete processing the exchange, and if the
+         * exchange failed, then the consumer is going to reject the message
+         * from the RabbitMQ broker. If the header CamelRabbitmqRequeue is
+         * present then the value of the header will be used, otherwise this
+         * endpoint value is used as fallback. If the value is false (by
+         * default) then the message is discarded/dead-lettered. If the value is
+         * true, then the message is re-queued.
+         * 
+         * The option will be converted to a <code>boolean</code> type.
+         * 
+         * Default: false
+         * Group: consumer
+         */
+        default RabbitMQEndpointConsumerBuilder reQueue(String reQueue) {
+            doSetProperty("reQueue", reQueue);
+            return this;
+        }
+        /**
          * Whether to allow Java serialization of the message body or not. If
          * this value is true, the message body will be serialized on the
          * producer side using Java serialization, if no type converter can
diff --git a/docs/components/modules/ROOT/pages/rabbitmq-component.adoc b/docs/components/modules/ROOT/pages/rabbitmq-component.adoc
index f4d9111..1cf378a 100644
--- a/docs/components/modules/ROOT/pages/rabbitmq-component.adoc
+++ b/docs/components/modules/ROOT/pages/rabbitmq-component.adoc
@@ -144,7 +144,7 @@ with the following path and query parameters:
 |===
 
 
-=== Query Parameters (66 parameters):
+=== Query Parameters (67 parameters):
 
 
 [width="100%",cols="2,5,^1,2",options="header"]
@@ -181,6 +181,7 @@ with the following path and query parameters:
 | *prefetchEnabled* (consumer) | Enables the quality of service on the RabbitMQConsumer side. You need to specify the option of prefetchSize, prefetchCount, prefetchGlobal at the same time | false | boolean
 | *prefetchGlobal* (consumer) | If the settings should be applied to the entire channel rather than each consumer You need to specify the option of prefetchSize, prefetchCount, prefetchGlobal at the same time | false | boolean
 | *prefetchSize* (consumer) | The maximum amount of content (measured in octets) that the server will deliver, 0 if unlimited. You need to specify the option of prefetchSize, prefetchCount, prefetchGlobal at the same time |  | int
+| *reQueue* (consumer) | This is used by the consumer to control rejection of the message. When the consumer is complete processing the exchange, and if the exchange failed, then the consumer is going to reject the message from the RabbitMQ broker. If the header CamelRabbitmqRequeue is present then the value of the header will be used, otherwise this endpoint value is used as fallback. If the value is false (by default) then the message is discarded/dead-lettered. If the value is true, t [...]
 | *exceptionHandler* (consumer) | To let the consumer use a custom ExceptionHandler. Notice if the option bridgeErrorHandler is enabled then this option is not in use. By default the consumer will deal with exceptions, that will be logged at WARN or ERROR level and ignored. |  | ExceptionHandler
 | *exchangePattern* (consumer) | Sets the exchange pattern when the consumer creates an exchange. There are 3 enums and the value can be one of: InOnly, InOut, InOptionalOut |  | ExchangePattern
 | *threadPoolSize* (consumer) | The consumer uses a Thread Pool Executor with a fixed number of threads. This setting allows you to set that number of threads. | 10 | int
