This is an automated email from the ASF dual-hosted git repository. acosentino pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/camel.git
commit 8b9b27ff24f9934e0287b1db039022c4636f2a83 Author: connormcauliffe-toast <connor.mcauli...@toasttab.com> AuthorDate: Fri Feb 21 16:58:09 2020 -0500 CAMEL-14607: add configurable negativeAckRedeliveryDelayMicros property --- .../camel/component/pulsar/PulsarEndpointConfigurer.java | 2 ++ .../resources/org/apache/camel/component/pulsar/pulsar.json | 1 + components/camel-pulsar/src/main/docs/pulsar-component.adoc | 3 ++- .../component/pulsar/configuration/PulsarConfiguration.java | 12 ++++++++++++ .../pulsar/utils/consumers/CommonCreationStrategyImpl.java | 1 + .../component/pulsar/PulsarConsumerAcknowledgementTest.java | 2 +- 6 files changed, 19 insertions(+), 2 deletions(-) diff --git a/components/camel-pulsar/src/generated/java/org/apache/camel/component/pulsar/PulsarEndpointConfigurer.java b/components/camel-pulsar/src/generated/java/org/apache/camel/component/pulsar/PulsarEndpointConfigurer.java index 3e23dce..90e004b 100644 --- a/components/camel-pulsar/src/generated/java/org/apache/camel/component/pulsar/PulsarEndpointConfigurer.java +++ b/components/camel-pulsar/src/generated/java/org/apache/camel/component/pulsar/PulsarEndpointConfigurer.java @@ -29,6 +29,8 @@ public class PulsarEndpointConfigurer extends PropertyConfigurerSupport implemen case "consumerNamePrefix": target.getPulsarConfiguration().setConsumerNamePrefix(property(camelContext, java.lang.String.class, value)); return true; case "consumerqueuesize": case "consumerQueueSize": target.getPulsarConfiguration().setConsumerQueueSize(property(camelContext, int.class, value)); return true; + case "negativeackredeliverydelaymicros": + case "negativeAckRedeliveryDelayMicros": target.getPulsarConfiguration().setNegativeAckRedeliveryDelayMicros(property(camelContext, long.class, value)); return true; case "numberofconsumers": case "numberOfConsumers": target.getPulsarConfiguration().setNumberOfConsumers(property(camelContext, int.class, value)); return true; case "subscriptioninitialposition": diff --git a/components/camel-pulsar/src/generated/resources/org/apache/camel/component/pulsar/pulsar.json b/components/camel-pulsar/src/generated/resources/org/apache/camel/component/pulsar/pulsar.json index 33692b4..029a1c2 100644 --- a/components/camel-pulsar/src/generated/resources/org/apache/camel/component/pulsar/pulsar.json +++ b/components/camel-pulsar/src/generated/resources/org/apache/camel/component/pulsar/pulsar.json @@ -39,6 +39,7 @@ "consumerName": { "kind": "parameter", "displayName": "Consumer Name", "group": "consumer", "label": "consumer", "required": false, "type": "string", "javaType": "java.lang.String", "deprecated": false, "secret": false, "defaultValue": "sole-consumer", "configurationClass": "org.apache.camel.component.pulsar.configuration.PulsarConfiguration", "configurationField": "pulsarConfiguration", "description": "Name of the consumer when subscription is EXCLUSIVE" }, "consumerNamePrefix": { "kind": "parameter", "displayName": "Consumer Name Prefix", "group": "consumer", "label": "consumer", "required": false, "type": "string", "javaType": "java.lang.String", "deprecated": false, "secret": false, "defaultValue": "cons", "configurationClass": "org.apache.camel.component.pulsar.configuration.PulsarConfiguration", "configurationField": "pulsarConfiguration", "description": "Prefix to add to consumer names when a SHARED or FAILOVER subscription is used" }, "consumerQueueSize": { "kind": "parameter", "displayName": "Consumer Queue Size", "group": "consumer", "label": "consumer", "required": false, "type": "integer", "javaType": "int", "deprecated": false, "secret": false, "defaultValue": "10", "configurationClass": "org.apache.camel.component.pulsar.configuration.PulsarConfiguration", "configurationField": "pulsarConfiguration", "description": "Size of the consumer queue - defaults to 10" }, + "negativeAckRedeliveryDelayMicros": { "kind": "parameter", "displayName": "Negative Ack Redelivery Delay Micros", "group": "consumer", "label": "consumer", "required": false, "type": "integer", "javaType": "long", "deprecated": false, "secret": false, "defaultValue": "60000000", "configurationClass": "org.apache.camel.component.pulsar.configuration.PulsarConfiguration", "configurationField": "pulsarConfiguration", "description": "Set the negative acknowledgement delay" }, "numberOfConsumers": { "kind": "parameter", "displayName": "Number Of Consumers", "group": "consumer", "label": "consumer", "required": false, "type": "integer", "javaType": "int", "deprecated": false, "secret": false, "defaultValue": "1", "configurationClass": "org.apache.camel.component.pulsar.configuration.PulsarConfiguration", "configurationField": "pulsarConfiguration", "description": "Number of consumers - defaults to 1" }, "subscriptionInitialPosition": { "kind": "parameter", "displayName": "Subscription Initial Position", "group": "consumer", "label": "consumer", "required": false, "type": "object", "javaType": "org.apache.camel.component.pulsar.utils.consumers.SubscriptionInitialPosition", "enum": [ "EARLIEST", "LATEST" ], "deprecated": false, "secret": false, "defaultValue": "LATEST", "configurationClass": "org.apache.camel.component.pulsar.configuration.PulsarConfiguration", "configurationField": " [...] "subscriptionName": { "kind": "parameter", "displayName": "Subscription Name", "group": "consumer", "label": "consumer", "required": false, "type": "string", "javaType": "java.lang.String", "deprecated": false, "secret": false, "defaultValue": "subs", "configurationClass": "org.apache.camel.component.pulsar.configuration.PulsarConfiguration", "configurationField": "pulsarConfiguration", "description": "Name of the subscription to use" }, diff --git a/components/camel-pulsar/src/main/docs/pulsar-component.adoc b/components/camel-pulsar/src/main/docs/pulsar-component.adoc index 029a7e0..2be30b4 100644 --- a/components/camel-pulsar/src/main/docs/pulsar-component.adoc +++ b/components/camel-pulsar/src/main/docs/pulsar-component.adoc @@ -74,7 +74,7 @@ with the following path and query parameters: |=== -=== Query Parameters (28 parameters): +=== Query Parameters (29 parameters): [width="100%",cols="2,5,^1,2",options="header"] @@ -87,6 +87,7 @@ with the following path and query parameters: | *consumerName* (consumer) | Name of the consumer when subscription is EXCLUSIVE | sole-consumer | String | *consumerNamePrefix* (consumer) | Prefix to add to consumer names when a SHARED or FAILOVER subscription is used | cons | String | *consumerQueueSize* (consumer) | Size of the consumer queue - defaults to 10 | 10 | int +| *negativeAckRedeliveryDelay Micros* (consumer) | Set the negative acknowledgement delay | 60000000 | long | *numberOfConsumers* (consumer) | Number of consumers - defaults to 1 | 1 | int | *subscriptionInitialPosition* (consumer) | Control the initial position in the topic of a newly created subscription. Default is latest message. The value can be one of: EARLIEST, LATEST | LATEST | SubscriptionInitialPosition | *subscriptionName* (consumer) | Name of the subscription to use | subs | String diff --git a/components/camel-pulsar/src/main/java/org/apache/camel/component/pulsar/configuration/PulsarConfiguration.java b/components/camel-pulsar/src/main/java/org/apache/camel/component/pulsar/configuration/PulsarConfiguration.java index a1d3d9a..4ecfd7d 100644 --- a/components/camel-pulsar/src/main/java/org/apache/camel/component/pulsar/configuration/PulsarConfiguration.java +++ b/components/camel-pulsar/src/main/java/org/apache/camel/component/pulsar/configuration/PulsarConfiguration.java @@ -51,6 +51,8 @@ public class PulsarConfiguration { private boolean allowManualAcknowledgement; @UriParam(label = "consumer", defaultValue = "10000") private long ackTimeoutMillis = 10000; + @UriParam(label = "consumer", defaultValue = "60000000") + private long negativeAckRedeliveryDelayMicros = 60000000; @UriParam(label = "consumer", defaultValue = "100") private long ackGroupTimeMillis = 100; @UriParam(label = "consumer", defaultValue = "LATEST") @@ -354,4 +356,14 @@ public class PulsarConfiguration { this.messageRouter = messageRouter; } + public long getNegativeAckRedeliveryDelayMicros() { + return negativeAckRedeliveryDelayMicros; + } + + /** + * Set the negative acknowledgement delay + */ + public void setNegativeAckRedeliveryDelayMicros(long negativeAckRedeliveryDelayMicros) { + this.negativeAckRedeliveryDelayMicros = negativeAckRedeliveryDelayMicros; + } } diff --git a/components/camel-pulsar/src/main/java/org/apache/camel/component/pulsar/utils/consumers/CommonCreationStrategyImpl.java b/components/camel-pulsar/src/main/java/org/apache/camel/component/pulsar/utils/consumers/CommonCreationStrategyImpl.java index e3c2c08..b5c72bc 100644 --- a/components/camel-pulsar/src/main/java/org/apache/camel/component/pulsar/utils/consumers/CommonCreationStrategyImpl.java +++ b/components/camel-pulsar/src/main/java/org/apache/camel/component/pulsar/utils/consumers/CommonCreationStrategyImpl.java @@ -36,6 +36,7 @@ public final class CommonCreationStrategyImpl { .receiverQueueSize(endpointConfiguration.getConsumerQueueSize()).consumerName(name).ackTimeout(endpointConfiguration.getAckTimeoutMillis(), TimeUnit.MILLISECONDS) .subscriptionInitialPosition(endpointConfiguration.getSubscriptionInitialPosition().toPulsarSubscriptionInitialPosition()) .acknowledgmentGroupTime(endpointConfiguration.getAckGroupTimeMillis(), TimeUnit.MILLISECONDS) + .negativeAckRedeliveryDelay(endpointConfiguration.getNegativeAckRedeliveryDelayMicros(), TimeUnit.MICROSECONDS) .messageListener(new PulsarMessageListener(pulsarEndpoint, pulsarConsumer.getExceptionHandler(), pulsarConsumer.getProcessor())); } } diff --git a/components/camel-pulsar/src/test/java/org/apache/camel/component/pulsar/PulsarConsumerAcknowledgementTest.java b/components/camel-pulsar/src/test/java/org/apache/camel/component/pulsar/PulsarConsumerAcknowledgementTest.java index 180ea03..d2548b0 100644 --- a/components/camel-pulsar/src/test/java/org/apache/camel/component/pulsar/PulsarConsumerAcknowledgementTest.java +++ b/components/camel-pulsar/src/test/java/org/apache/camel/component/pulsar/PulsarConsumerAcknowledgementTest.java @@ -45,7 +45,7 @@ public class PulsarConsumerAcknowledgementTest extends PulsarTestSupport { private static final String PRODUCER = "camel-producer-1"; @EndpointInject("pulsar:" + TOPIC_URI + "?numberOfConsumers=1&subscriptionType=Exclusive" - + "&subscriptionName=camel-subscription&consumerQueueSize=1&consumerName=camel-consumer" + "&allowManualAcknowledgement=true" + "&ackTimeoutMillis=1000") + + "&subscriptionName=camel-subscription&consumerQueueSize=1&consumerName=camel-consumer" + "&allowManualAcknowledgement=true" + "&ackTimeoutMillis=1000" + "&negativeAckRedeliveryDelayMicros=100000") private Endpoint from; @EndpointInject("mock:result")