This is an automated email from the ASF dual-hosted git repository.

acosentino pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/camel.git

commit 8b9b27ff24f9934e0287b1db039022c4636f2a83
Author: connormcauliffe-toast <connor.mcauli...@toasttab.com>
AuthorDate: Fri Feb 21 16:58:09 2020 -0500

    CAMEL-14607: add configurable negativeAckRedeliveryDelayMicros property
---
 .../camel/component/pulsar/PulsarEndpointConfigurer.java     |  2 ++
 .../resources/org/apache/camel/component/pulsar/pulsar.json  |  1 +
 components/camel-pulsar/src/main/docs/pulsar-component.adoc  |  3 ++-
 .../component/pulsar/configuration/PulsarConfiguration.java  | 12 ++++++++++++
 .../pulsar/utils/consumers/CommonCreationStrategyImpl.java   |  1 +
 .../component/pulsar/PulsarConsumerAcknowledgementTest.java  |  2 +-
 6 files changed, 19 insertions(+), 2 deletions(-)

diff --git 
a/components/camel-pulsar/src/generated/java/org/apache/camel/component/pulsar/PulsarEndpointConfigurer.java
 
b/components/camel-pulsar/src/generated/java/org/apache/camel/component/pulsar/PulsarEndpointConfigurer.java
index 3e23dce..90e004b 100644
--- 
a/components/camel-pulsar/src/generated/java/org/apache/camel/component/pulsar/PulsarEndpointConfigurer.java
+++ 
b/components/camel-pulsar/src/generated/java/org/apache/camel/component/pulsar/PulsarEndpointConfigurer.java
@@ -29,6 +29,8 @@ public class PulsarEndpointConfigurer extends 
PropertyConfigurerSupport implemen
         case "consumerNamePrefix": 
target.getPulsarConfiguration().setConsumerNamePrefix(property(camelContext, 
java.lang.String.class, value)); return true;
         case "consumerqueuesize":
         case "consumerQueueSize": 
target.getPulsarConfiguration().setConsumerQueueSize(property(camelContext, 
int.class, value)); return true;
+        case "negativeackredeliverydelaymicros":
+        case "negativeAckRedeliveryDelayMicros": 
target.getPulsarConfiguration().setNegativeAckRedeliveryDelayMicros(property(camelContext,
 long.class, value)); return true;
         case "numberofconsumers":
         case "numberOfConsumers": 
target.getPulsarConfiguration().setNumberOfConsumers(property(camelContext, 
int.class, value)); return true;
         case "subscriptioninitialposition":
diff --git 
a/components/camel-pulsar/src/generated/resources/org/apache/camel/component/pulsar/pulsar.json
 
b/components/camel-pulsar/src/generated/resources/org/apache/camel/component/pulsar/pulsar.json
index 33692b4..029a1c2 100644
--- 
a/components/camel-pulsar/src/generated/resources/org/apache/camel/component/pulsar/pulsar.json
+++ 
b/components/camel-pulsar/src/generated/resources/org/apache/camel/component/pulsar/pulsar.json
@@ -39,6 +39,7 @@
     "consumerName": { "kind": "parameter", "displayName": "Consumer Name", 
"group": "consumer", "label": "consumer", "required": false, "type": "string", 
"javaType": "java.lang.String", "deprecated": false, "secret": false, 
"defaultValue": "sole-consumer", "configurationClass": 
"org.apache.camel.component.pulsar.configuration.PulsarConfiguration", 
"configurationField": "pulsarConfiguration", "description": "Name of the 
consumer when subscription is EXCLUSIVE" },
     "consumerNamePrefix": { "kind": "parameter", "displayName": "Consumer Name 
Prefix", "group": "consumer", "label": "consumer", "required": false, "type": 
"string", "javaType": "java.lang.String", "deprecated": false, "secret": false, 
"defaultValue": "cons", "configurationClass": 
"org.apache.camel.component.pulsar.configuration.PulsarConfiguration", 
"configurationField": "pulsarConfiguration", "description": "Prefix to add to 
consumer names when a SHARED or FAILOVER subscription is used" },
     "consumerQueueSize": { "kind": "parameter", "displayName": "Consumer Queue 
Size", "group": "consumer", "label": "consumer", "required": false, "type": 
"integer", "javaType": "int", "deprecated": false, "secret": false, 
"defaultValue": "10", "configurationClass": 
"org.apache.camel.component.pulsar.configuration.PulsarConfiguration", 
"configurationField": "pulsarConfiguration", "description": "Size of the 
consumer queue - defaults to 10" },
+    "negativeAckRedeliveryDelayMicros": { "kind": "parameter", "displayName": 
"Negative Ack Redelivery Delay Micros", "group": "consumer", "label": 
"consumer", "required": false, "type": "integer", "javaType": "long", 
"deprecated": false, "secret": false, "defaultValue": "60000000", 
"configurationClass": 
"org.apache.camel.component.pulsar.configuration.PulsarConfiguration", 
"configurationField": "pulsarConfiguration", "description": "Set the negative 
acknowledgement redelivery delay, in microseconds" },
     "numberOfConsumers": { "kind": "parameter", "displayName": "Number Of 
Consumers", "group": "consumer", "label": "consumer", "required": false, 
"type": "integer", "javaType": "int", "deprecated": false, "secret": false, 
"defaultValue": "1", "configurationClass": 
"org.apache.camel.component.pulsar.configuration.PulsarConfiguration", 
"configurationField": "pulsarConfiguration", "description": "Number of 
consumers - defaults to 1" },
     "subscriptionInitialPosition": { "kind": "parameter", "displayName": 
"Subscription Initial Position", "group": "consumer", "label": "consumer", 
"required": false, "type": "object", "javaType": 
"org.apache.camel.component.pulsar.utils.consumers.SubscriptionInitialPosition",
 "enum": [ "EARLIEST", "LATEST" ], "deprecated": false, "secret": false, 
"defaultValue": "LATEST", "configurationClass": 
"org.apache.camel.component.pulsar.configuration.PulsarConfiguration", 
"configurationField": " [...]
     "subscriptionName": { "kind": "parameter", "displayName": "Subscription 
Name", "group": "consumer", "label": "consumer", "required": false, "type": 
"string", "javaType": "java.lang.String", "deprecated": false, "secret": false, 
"defaultValue": "subs", "configurationClass": 
"org.apache.camel.component.pulsar.configuration.PulsarConfiguration", 
"configurationField": "pulsarConfiguration", "description": "Name of the 
subscription to use" },
diff --git a/components/camel-pulsar/src/main/docs/pulsar-component.adoc 
b/components/camel-pulsar/src/main/docs/pulsar-component.adoc
index 029a7e0..2be30b4 100644
--- a/components/camel-pulsar/src/main/docs/pulsar-component.adoc
+++ b/components/camel-pulsar/src/main/docs/pulsar-component.adoc
@@ -74,7 +74,7 @@ with the following path and query parameters:
 |===
 
 
-=== Query Parameters (28 parameters):
+=== Query Parameters (29 parameters):
 
 
 [width="100%",cols="2,5,^1,2",options="header"]
@@ -87,6 +87,7 @@ with the following path and query parameters:
 | *consumerName* (consumer) | Name of the consumer when subscription is 
EXCLUSIVE | sole-consumer | String
 | *consumerNamePrefix* (consumer) | Prefix to add to consumer names when a 
SHARED or FAILOVER subscription is used | cons | String
 | *consumerQueueSize* (consumer) | Size of the consumer queue - defaults to 10 
| 10 | int
+| *negativeAckRedeliveryDelayMicros* (consumer) | Set the negative 
acknowledgement redelivery delay, in microseconds | 60000000 | long
 | *numberOfConsumers* (consumer) | Number of consumers - defaults to 1 | 1 | 
int
 | *subscriptionInitialPosition* (consumer) | Control the initial position in 
the topic of a newly created subscription. Default is latest message. The value 
can be one of: EARLIEST, LATEST | LATEST | SubscriptionInitialPosition
 | *subscriptionName* (consumer) | Name of the subscription to use | subs | 
String
diff --git 
a/components/camel-pulsar/src/main/java/org/apache/camel/component/pulsar/configuration/PulsarConfiguration.java
 
b/components/camel-pulsar/src/main/java/org/apache/camel/component/pulsar/configuration/PulsarConfiguration.java
index a1d3d9a..4ecfd7d 100644
--- 
a/components/camel-pulsar/src/main/java/org/apache/camel/component/pulsar/configuration/PulsarConfiguration.java
+++ 
b/components/camel-pulsar/src/main/java/org/apache/camel/component/pulsar/configuration/PulsarConfiguration.java
@@ -51,6 +51,8 @@ public class PulsarConfiguration {
     private boolean allowManualAcknowledgement;
     @UriParam(label = "consumer", defaultValue = "10000")
     private long ackTimeoutMillis = 10000;
+    @UriParam(label = "consumer", defaultValue = "60000000")
+    private long negativeAckRedeliveryDelayMicros = 60000000;
     @UriParam(label = "consumer", defaultValue = "100")
     private long ackGroupTimeMillis = 100;
     @UriParam(label = "consumer", defaultValue = "LATEST")
@@ -354,4 +356,14 @@ public class PulsarConfiguration {
         this.messageRouter = messageRouter;
     }
 
+    public long getNegativeAckRedeliveryDelayMicros() {
+        return negativeAckRedeliveryDelayMicros;
+    }
+
+    /**
+     * Set the negative acknowledgement redelivery delay, in microseconds (default 60000000, i.e. 60 seconds)
+     */
+    public void setNegativeAckRedeliveryDelayMicros(long 
negativeAckRedeliveryDelayMicros) {
+        this.negativeAckRedeliveryDelayMicros = 
negativeAckRedeliveryDelayMicros;
+    }
 }
diff --git 
a/components/camel-pulsar/src/main/java/org/apache/camel/component/pulsar/utils/consumers/CommonCreationStrategyImpl.java
 
b/components/camel-pulsar/src/main/java/org/apache/camel/component/pulsar/utils/consumers/CommonCreationStrategyImpl.java
index e3c2c08..b5c72bc 100644
--- 
a/components/camel-pulsar/src/main/java/org/apache/camel/component/pulsar/utils/consumers/CommonCreationStrategyImpl.java
+++ 
b/components/camel-pulsar/src/main/java/org/apache/camel/component/pulsar/utils/consumers/CommonCreationStrategyImpl.java
@@ -36,6 +36,7 @@ public final class CommonCreationStrategyImpl {
             
.receiverQueueSize(endpointConfiguration.getConsumerQueueSize()).consumerName(name).ackTimeout(endpointConfiguration.getAckTimeoutMillis(),
 TimeUnit.MILLISECONDS)
             
.subscriptionInitialPosition(endpointConfiguration.getSubscriptionInitialPosition().toPulsarSubscriptionInitialPosition())
             
.acknowledgmentGroupTime(endpointConfiguration.getAckGroupTimeMillis(), 
TimeUnit.MILLISECONDS)
+            
.negativeAckRedeliveryDelay(endpointConfiguration.getNegativeAckRedeliveryDelayMicros(),
 TimeUnit.MICROSECONDS)
             .messageListener(new PulsarMessageListener(pulsarEndpoint, 
pulsarConsumer.getExceptionHandler(), pulsarConsumer.getProcessor()));
     }
 }
diff --git 
a/components/camel-pulsar/src/test/java/org/apache/camel/component/pulsar/PulsarConsumerAcknowledgementTest.java
 
b/components/camel-pulsar/src/test/java/org/apache/camel/component/pulsar/PulsarConsumerAcknowledgementTest.java
index 180ea03..d2548b0 100644
--- 
a/components/camel-pulsar/src/test/java/org/apache/camel/component/pulsar/PulsarConsumerAcknowledgementTest.java
+++ 
b/components/camel-pulsar/src/test/java/org/apache/camel/component/pulsar/PulsarConsumerAcknowledgementTest.java
@@ -45,7 +45,7 @@ public class PulsarConsumerAcknowledgementTest extends 
PulsarTestSupport {
     private static final String PRODUCER = "camel-producer-1";
 
     @EndpointInject("pulsar:" + TOPIC_URI + 
"?numberOfConsumers=1&subscriptionType=Exclusive"
-                          + 
"&subscriptionName=camel-subscription&consumerQueueSize=1&consumerName=camel-consumer"
 + "&allowManualAcknowledgement=true" + "&ackTimeoutMillis=1000")
+                          + 
"&subscriptionName=camel-subscription&consumerQueueSize=1&consumerName=camel-consumer"
 + "&allowManualAcknowledgement=true" + "&ackTimeoutMillis=1000" + 
"&negativeAckRedeliveryDelayMicros=100000")
     private Endpoint from;
 
     @EndpointInject("mock:result")

Reply via email to