This is an automated email from the ASF dual-hosted git repository. davsclaus pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/camel.git
The following commit(s) were added to refs/heads/master by this push: new 60fb3bf CAMEL-14608: add support for dead letter policies to camel-pulsar (#3595) 60fb3bf is described below commit 60fb3bfd056e7aab4545b122ae1bc4034ccc9a9c Author: Connor McAuliffe <connor.mcauli...@toasttab.com> AuthorDate: Tue Feb 25 02:15:03 2020 -0500 CAMEL-14608: add support for dead letter policies to camel-pulsar (#3595) * CAMEL-14608: add support for dead letter policies to camel-pulsar * CAMEL-14608: address documentation issues --- .../component/pulsar/PulsarEndpointConfigurer.java | 4 + .../org/apache/camel/component/pulsar/pulsar.json | 2 + .../src/main/docs/pulsar-component.adoc | 4 +- .../pulsar/configuration/PulsarConfiguration.java | 19 +++ .../consumers/CommonCreationStrategyImpl.java | 15 +- .../pulsar/PulsarConsumerDeadLetterPolicyTest.java | 169 +++++++++++++++++++++ 6 files changed, 211 insertions(+), 2 deletions(-) diff --git a/components/camel-pulsar/src/generated/java/org/apache/camel/component/pulsar/PulsarEndpointConfigurer.java b/components/camel-pulsar/src/generated/java/org/apache/camel/component/pulsar/PulsarEndpointConfigurer.java index 90e004b..3f6dd0c 100644 --- a/components/camel-pulsar/src/generated/java/org/apache/camel/component/pulsar/PulsarEndpointConfigurer.java +++ b/components/camel-pulsar/src/generated/java/org/apache/camel/component/pulsar/PulsarEndpointConfigurer.java @@ -31,6 +31,10 @@ public class PulsarEndpointConfigurer extends PropertyConfigurerSupport implemen case "consumerQueueSize": target.getPulsarConfiguration().setConsumerQueueSize(property(camelContext, int.class, value)); return true; case "negativeackredeliverydelaymicros": case "negativeAckRedeliveryDelayMicros": target.getPulsarConfiguration().setNegativeAckRedeliveryDelayMicros(property(camelContext, long.class, value)); return true; + case "deadlettertopic": + case "deadLetterTopic": target.getPulsarConfiguration().setDeadLetterTopic(property(camelContext, java.lang.String.class, value)); return true; + case "maxredelivercount": + case "maxRedeliverCount": target.getPulsarConfiguration().setMaxRedeliverCount(property(camelContext, java.lang.Integer.class, value)); return true; case "numberofconsumers": case "numberOfConsumers": target.getPulsarConfiguration().setNumberOfConsumers(property(camelContext, int.class, value)); return true; case "subscriptioninitialposition": diff --git a/components/camel-pulsar/src/generated/resources/org/apache/camel/component/pulsar/pulsar.json b/components/camel-pulsar/src/generated/resources/org/apache/camel/component/pulsar/pulsar.json index 029a1c2..04f8db9 100644 --- a/components/camel-pulsar/src/generated/resources/org/apache/camel/component/pulsar/pulsar.json +++ b/components/camel-pulsar/src/generated/resources/org/apache/camel/component/pulsar/pulsar.json @@ -40,6 +40,8 @@ "consumerNamePrefix": { "kind": "parameter", "displayName": "Consumer Name Prefix", "group": "consumer", "label": "consumer", "required": false, "type": "string", "javaType": "java.lang.String", "deprecated": false, "secret": false, "defaultValue": "cons", "configurationClass": "org.apache.camel.component.pulsar.configuration.PulsarConfiguration", "configurationField": "pulsarConfiguration", "description": "Prefix to add to consumer names when a SHARED or FAILOVER subscription is used" }, "consumerQueueSize": { "kind": "parameter", "displayName": "Consumer Queue Size", "group": "consumer", "label": "consumer", "required": false, "type": "integer", "javaType": "int", "deprecated": false, "secret": false, "defaultValue": "10", "configurationClass": "org.apache.camel.component.pulsar.configuration.PulsarConfiguration", "configurationField": "pulsarConfiguration", "description": "Size of the consumer queue - defaults to 10" }, "negativeAckRedeliveryDelayMicros": { "kind": "parameter", "displayName": "Negative Ack Redelivery Delay Micros", "group": "consumer", "label": "consumer", "required": false, "type": "integer", "javaType": "long", "deprecated": false, "secret": false, "defaultValue": "60000000", "configurationClass": "org.apache.camel.component.pulsar.configuration.PulsarConfiguration", "configurationField": "pulsarConfiguration", "description": "Set the negative acknowledgement delay" }, + "deadLetterTopic": { "kind": "parameter", "displayName": "Dead Letter Topic", "group": "consumer", "label": "consumer", "required": false, "type": "string", "javaType": "java.lang.String", "deprecated": false, "secret": false, "configurationClass": "org.apache.camel.component.pulsar.configuration.PulsarConfiguration", "configurationField": "pulsarConfiguration", "description": "Name of the topic where the messages which fail maxRedeliverCount times will be sent. Note: if not set, def [...] + "maxRedeliverCount": { "kind": "parameter", "displayName": "Max Redeliver Count", "group": "consumer", "label": "consumer", "required": false, "type": "integer", "javaType": "java.lang.Integer", "deprecated": false, "secret": false, "configurationClass": "org.apache.camel.component.pulsar.configuration.PulsarConfiguration", "configurationField": "pulsarConfiguration", "description": "Maximum number of times that a message will be redelivered before being sent to the dead letter queue [...] "numberOfConsumers": { "kind": "parameter", "displayName": "Number Of Consumers", "group": "consumer", "label": "consumer", "required": false, "type": "integer", "javaType": "int", "deprecated": false, "secret": false, "defaultValue": "1", "configurationClass": "org.apache.camel.component.pulsar.configuration.PulsarConfiguration", "configurationField": "pulsarConfiguration", "description": "Number of consumers - defaults to 1" }, "subscriptionInitialPosition": { "kind": "parameter", "displayName": "Subscription Initial Position", "group": "consumer", "label": "consumer", "required": false, "type": "object", "javaType": "org.apache.camel.component.pulsar.utils.consumers.SubscriptionInitialPosition", "enum": [ "EARLIEST", "LATEST" ], "deprecated": false, "secret": false, "defaultValue": "LATEST", "configurationClass": "org.apache.camel.component.pulsar.configuration.PulsarConfiguration", "configurationField": " [...] "subscriptionName": { "kind": "parameter", "displayName": "Subscription Name", "group": "consumer", "label": "consumer", "required": false, "type": "string", "javaType": "java.lang.String", "deprecated": false, "secret": false, "defaultValue": "subs", "configurationClass": "org.apache.camel.component.pulsar.configuration.PulsarConfiguration", "configurationField": "pulsarConfiguration", "description": "Name of the subscription to use" }, diff --git a/components/camel-pulsar/src/main/docs/pulsar-component.adoc b/components/camel-pulsar/src/main/docs/pulsar-component.adoc index 2be30b4..517af8e 100644 --- a/components/camel-pulsar/src/main/docs/pulsar-component.adoc +++ b/components/camel-pulsar/src/main/docs/pulsar-component.adoc @@ -74,7 +74,7 @@ with the following path and query parameters: |=== -=== Query Parameters (29 parameters): +=== Query Parameters (31 parameters): [width="100%",cols="2,5,^1,2",options="header"] @@ -88,6 +88,8 @@ with the following path and query parameters: | *consumerNamePrefix* (consumer) | Prefix to add to consumer names when a SHARED or FAILOVER subscription is used | cons | String | *consumerQueueSize* (consumer) | Size of the consumer queue - defaults to 10 | 10 | int | *negativeAckRedeliveryDelay Micros* (consumer) | Set the negative acknowledgement delay | 60000000 | long +| *deadLetterTopic* (consumer) | Name of the topic where the messages which fail maxRedeliverCount times will be sent. Note: if not set, default topic name will be topicName-subscriptionName-DLQ | | String +| *maxRedeliverCount* (consumer) | Maximum number of times that a message will be redelivered before being sent to the dead letter queue. If this value is not set, no Dead Letter Policy will be created | | Integer | *numberOfConsumers* (consumer) | Number of consumers - defaults to 1 | 1 | int | *subscriptionInitialPosition* (consumer) | Control the initial position in the topic of a newly created subscription. Default is latest message. The value can be one of: EARLIEST, LATEST | LATEST | SubscriptionInitialPosition | *subscriptionName* (consumer) | Name of the subscription to use | subs | String diff --git a/components/camel-pulsar/src/main/java/org/apache/camel/component/pulsar/configuration/PulsarConfiguration.java b/components/camel-pulsar/src/main/java/org/apache/camel/component/pulsar/configuration/PulsarConfiguration.java index 4ecfd7d..c73fd17 100644 --- a/components/camel-pulsar/src/main/java/org/apache/camel/component/pulsar/configuration/PulsarConfiguration.java +++ b/components/camel-pulsar/src/main/java/org/apache/camel/component/pulsar/configuration/PulsarConfiguration.java @@ -57,6 +57,10 @@ public class PulsarConfiguration { private long ackGroupTimeMillis = 100; @UriParam(label = "consumer", defaultValue = "LATEST") private SubscriptionInitialPosition subscriptionInitialPosition = LATEST; + @UriParam(label = "consumer", description = "Maximum number of times that a message will be redelivered before being sent to the dead letter queue. If this value is not set, no Dead Letter Policy will be created") + private Integer maxRedeliverCount; + @UriParam(label = "consumer", description = "Name of the topic where the messages which fail maxRedeliverCount times will be sent. Note: if not set, default topic name will be topicName-subscriptionName-DLQ") + private String deadLetterTopic; @UriParam(label = "producer", description = "Send timeout in milliseconds", defaultValue = "30000") private int sendTimeoutMs = 30000; @UriParam(label = "producer", description = "Whether to block the producing thread if pending messages queue is full or to throw a ProducerQueueIsFullError", defaultValue = "false") @@ -366,4 +370,19 @@ public class PulsarConfiguration { public void setNegativeAckRedeliveryDelayMicros(long negativeAckRedeliveryDelayMicros) { this.negativeAckRedeliveryDelayMicros = negativeAckRedeliveryDelayMicros; } + public Integer getMaxRedeliverCount() { + return maxRedeliverCount; + } + + public void setMaxRedeliverCount(Integer maxRedeliverCount) { + this.maxRedeliverCount = maxRedeliverCount; + } + + public String getDeadLetterTopic() { + return deadLetterTopic; + } + + public void setDeadLetterTopic(String deadLetterTopic) { + this.deadLetterTopic = deadLetterTopic; + } } diff --git a/components/camel-pulsar/src/main/java/org/apache/camel/component/pulsar/utils/consumers/CommonCreationStrategyImpl.java b/components/camel-pulsar/src/main/java/org/apache/camel/component/pulsar/utils/consumers/CommonCreationStrategyImpl.java index b5c72bc..3d4913e 100644 --- a/components/camel-pulsar/src/main/java/org/apache/camel/component/pulsar/utils/consumers/CommonCreationStrategyImpl.java +++ b/components/camel-pulsar/src/main/java/org/apache/camel/component/pulsar/utils/consumers/CommonCreationStrategyImpl.java @@ -23,6 +23,8 @@ import org.apache.camel.component.pulsar.PulsarEndpoint; import org.apache.camel.component.pulsar.PulsarMessageListener; import org.apache.camel.component.pulsar.configuration.PulsarConfiguration; import org.apache.pulsar.client.api.ConsumerBuilder; +import org.apache.pulsar.client.api.DeadLetterPolicy; +import org.apache.pulsar.client.api.DeadLetterPolicy.DeadLetterPolicyBuilder; public final class CommonCreationStrategyImpl { @@ -32,11 +34,22 @@ public final class CommonCreationStrategyImpl { public static ConsumerBuilder<byte[]> create(final String name, final PulsarEndpoint pulsarEndpoint, final PulsarConsumer pulsarConsumer) { final PulsarConfiguration endpointConfiguration = pulsarEndpoint.getPulsarConfiguration(); - return pulsarEndpoint.getPulsarClient().newConsumer().topic(pulsarEndpoint.getUri()).subscriptionName(endpointConfiguration.getSubscriptionName()) + ConsumerBuilder<byte[]> builder = pulsarEndpoint.getPulsarClient().newConsumer().topic(pulsarEndpoint.getUri()).subscriptionName(endpointConfiguration.getSubscriptionName()) .receiverQueueSize(endpointConfiguration.getConsumerQueueSize()).consumerName(name).ackTimeout(endpointConfiguration.getAckTimeoutMillis(), TimeUnit.MILLISECONDS) .subscriptionInitialPosition(endpointConfiguration.getSubscriptionInitialPosition().toPulsarSubscriptionInitialPosition()) .acknowledgmentGroupTime(endpointConfiguration.getAckGroupTimeMillis(), TimeUnit.MILLISECONDS) .negativeAckRedeliveryDelay(endpointConfiguration.getNegativeAckRedeliveryDelayMicros(), TimeUnit.MICROSECONDS) .messageListener(new PulsarMessageListener(pulsarEndpoint, pulsarConsumer.getExceptionHandler(), pulsarConsumer.getProcessor())); + + if (endpointConfiguration.getMaxRedeliverCount() != null) { + DeadLetterPolicyBuilder policy = DeadLetterPolicy.builder() + .maxRedeliverCount(endpointConfiguration.getMaxRedeliverCount()); + if (endpointConfiguration.getDeadLetterTopic() != null) { + policy.deadLetterTopic(endpointConfiguration.getDeadLetterTopic()); + } + + builder.deadLetterPolicy(policy.build()); + } + return builder; } } diff --git a/components/camel-pulsar/src/test/java/org/apache/camel/component/pulsar/PulsarConsumerDeadLetterPolicyTest.java b/components/camel-pulsar/src/test/java/org/apache/camel/component/pulsar/PulsarConsumerDeadLetterPolicyTest.java new file mode 100644 index 0000000..9c171a0 --- /dev/null +++ b/components/camel-pulsar/src/test/java/org/apache/camel/component/pulsar/PulsarConsumerDeadLetterPolicyTest.java @@ -0,0 +1,169 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.component.pulsar; + + +import java.util.concurrent.TimeUnit; + +import org.apache.camel.EndpointInject; +import org.apache.camel.builder.RouteBuilder; +import org.apache.camel.component.mock.MockEndpoint; +import org.apache.camel.component.pulsar.utils.AutoConfiguration; +import org.apache.camel.component.pulsar.utils.message.PulsarMessageHeaders; +import org.apache.camel.spi.Registry; +import org.apache.camel.support.SimpleRegistry; +import org.apache.pulsar.client.api.Producer; +import org.apache.pulsar.client.api.PulsarClient; +import org.apache.pulsar.client.api.PulsarClientException; +import org.apache.pulsar.client.api.Schema; +import org.apache.pulsar.client.impl.ClientBuilderImpl; +import org.junit.Before; +import org.junit.Test; + +public class PulsarConsumerDeadLetterPolicyTest extends PulsarTestSupport { + + private static final String TOPIC_URI = "persistent://public/default/camel-topic"; + private static final String PRODUCER = "camel-producer-1"; + + @EndpointInject("mock:result") + private MockEndpoint to; + + @EndpointInject("mock:deadLetter") + private MockEndpoint deadLetter; + + private Producer<String> producer; + + @Override + protected Registry createCamelRegistry() throws Exception { + Registry registry = new SimpleRegistry(); + + registerPulsarBeans(registry); + + return registry; + } + + private void registerPulsarBeans(final Registry registry) throws PulsarClientException { + PulsarClient pulsarClient = givenPulsarClient(); + AutoConfiguration autoConfiguration = new AutoConfiguration(null, null); + + registry.bind("pulsarClient", pulsarClient); + PulsarComponent comp = new PulsarComponent(context); + comp.setAutoConfiguration(autoConfiguration); + comp.setPulsarClient(pulsarClient); + registry.bind("pulsar", comp); + } + + @Before + public void buildProducer() throws PulsarClientException { + try { + context.removeRoute("myRoute"); + context.removeRoute("myDeadLetterRoute"); + } catch (Exception ignored) { + + } + producer = givenPulsarClient().newProducer(Schema.STRING).producerName(PRODUCER).topic(TOPIC_URI).create(); + + } + + @Test + public void givenNoMaxRedeliverCountAndDeadLetterTopicverifyValuesAreNull() throws Exception { + PulsarComponent component = context.getComponent("pulsar", PulsarComponent.class); + + PulsarEndpoint endpoint = (PulsarEndpoint) component.createEndpoint("pulsar:" + TOPIC_URI); + + assertNull(endpoint.getPulsarConfiguration().getMaxRedeliverCount()); + assertNull(endpoint.getPulsarConfiguration().getDeadLetterTopic()); + } + + @Test + public void givenMaxRedeliverCountverifyMessageGetsSentToDefaultDeadLetterTopicAfterCountExceeded() + throws Exception { + PulsarComponent component = context.getComponent("pulsar", PulsarComponent.class); + + PulsarEndpoint from = (PulsarEndpoint) component.createEndpoint("pulsar:" + TOPIC_URI + "?maxRedeliverCount=5&subscriptionType=Shared&allowManualAcknowledgement=true&ackTimeoutMillis=1000"); + PulsarEndpoint deadLetterFrom = (PulsarEndpoint) component.createEndpoint("pulsar:" + TOPIC_URI + "-subs-DLQ"); + + to.expectedMessageCount(5); + deadLetter.expectedMessageCount(1); + context.addRoutes(new RouteBuilder() { + @Override + public void configure() { + from(from).routeId("myRoute").to(to); + + from(deadLetterFrom).routeId("myDeadLetterRoute").to(deadLetter); + } + }); + producer.send("Hello World!"); + + assertMockEndpointsSatisfied(10, TimeUnit.SECONDS); + } + + @Test + public void givenMaxRedeliverCountAndDeadLetterTopicverifyMessageGetsSentToSpecifiedDeadLetterTopicAfterCountExceeded() throws Exception { + PulsarComponent component = context.getComponent("pulsar", PulsarComponent.class); + + PulsarEndpoint from = (PulsarEndpoint) component.createEndpoint("pulsar:" + TOPIC_URI + "?maxRedeliverCount=5&deadLetterTopic=customTopic&subscriptionType=Shared&allowManualAcknowledgement=true&ackTimeoutMillis=1000"); + PulsarEndpoint deadLetterFrom = (PulsarEndpoint) component.createEndpoint("pulsar:persistent://public/default/customTopic"); + + to.expectedMessageCount(5); + deadLetter.expectedMessageCount(1); + context.addRoutes(new RouteBuilder() { + @Override + public void configure() { + from(from).routeId("myRoute").to(to); + + from(deadLetterFrom).routeId("myDeadLetterRoute").to(deadLetter); + } + }); + + producer.send("Hello World!"); + assertMockEndpointsSatisfied(10, TimeUnit.SECONDS); + } + + @Test + public void givenOnlyDeadLetterTopicverifyMessageDoesNotGetSentToSpecifiedTopic() throws Exception { + PulsarComponent component = context.getComponent("pulsar", PulsarComponent.class); + + PulsarEndpoint from = (PulsarEndpoint) component.createEndpoint("pulsar:" + TOPIC_URI + "?maxRedeliverCount=5&deadLetterTopic=customTopic&subscriptionType=Shared&allowManualAcknowledgement=true&ackTimeoutMillis=1000"); + PulsarEndpoint deadLetterFrom = (PulsarEndpoint) component.createEndpoint("pulsar:persistent://public/default/customTopic"); + + to.expectedMessageCount(6); + deadLetter.expectedMessageCount(0); + context.addRoutes(new RouteBuilder() { + @Override + public void configure() { + from(from).routeId("myRoute").to(to).process(exchange -> { + Integer tries = exchange.getProperty("retryCount", 1, Integer.class); + if (tries >= 6) { + PulsarMessageReceipt receipt = (PulsarMessageReceipt) exchange.getIn().getHeader(PulsarMessageHeaders.MESSAGE_RECEIPT); + receipt.acknowledge(); + } + exchange.setProperty("retryCount", tries + 1); + }); + + from(deadLetterFrom).routeId("myDeadLetterRoute").to(deadLetter); + } + }); + + producer.send("Hello World!"); + assertMockEndpointsSatisfied(10, TimeUnit.SECONDS); + } + + private PulsarClient givenPulsarClient() throws PulsarClientException { + return new ClientBuilderImpl().serviceUrl(getPulsarBrokerUrl()).ioThreads(1).listenerThreads(1).build(); + } +}