This is an automated email from the ASF dual-hosted git repository.

davsclaus pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/camel.git


The following commit(s) were added to refs/heads/master by this push:
     new 60fb3bf  CAMEL-14608: add support for dead letter policies to 
camel-pulsar (#3595)
60fb3bf is described below

commit 60fb3bfd056e7aab4545b122ae1bc4034ccc9a9c
Author: Connor McAuliffe <connor.mcauli...@toasttab.com>
AuthorDate: Tue Feb 25 02:15:03 2020 -0500

    CAMEL-14608: add support for dead letter policies to camel-pulsar (#3595)
    
    * CAMEL-14608: add support for dead letter policies to camel-pulsar
    
    * CAMEL-14608: address documentation issues
---
 .../component/pulsar/PulsarEndpointConfigurer.java |   4 +
 .../org/apache/camel/component/pulsar/pulsar.json  |   2 +
 .../src/main/docs/pulsar-component.adoc            |   4 +-
 .../pulsar/configuration/PulsarConfiguration.java  |  19 +++
 .../consumers/CommonCreationStrategyImpl.java      |  15 +-
 .../pulsar/PulsarConsumerDeadLetterPolicyTest.java | 169 +++++++++++++++++++++
 6 files changed, 211 insertions(+), 2 deletions(-)

diff --git 
a/components/camel-pulsar/src/generated/java/org/apache/camel/component/pulsar/PulsarEndpointConfigurer.java
 
b/components/camel-pulsar/src/generated/java/org/apache/camel/component/pulsar/PulsarEndpointConfigurer.java
index 90e004b..3f6dd0c 100644
--- 
a/components/camel-pulsar/src/generated/java/org/apache/camel/component/pulsar/PulsarEndpointConfigurer.java
+++ 
b/components/camel-pulsar/src/generated/java/org/apache/camel/component/pulsar/PulsarEndpointConfigurer.java
@@ -31,6 +31,10 @@ public class PulsarEndpointConfigurer extends 
PropertyConfigurerSupport implemen
         case "consumerQueueSize": 
target.getPulsarConfiguration().setConsumerQueueSize(property(camelContext, 
int.class, value)); return true;
         case "negativeackredeliverydelaymicros":
         case "negativeAckRedeliveryDelayMicros": 
target.getPulsarConfiguration().setNegativeAckRedeliveryDelayMicros(property(camelContext,
 long.class, value)); return true;
+        case "deadlettertopic":
+        case "deadLetterTopic": 
target.getPulsarConfiguration().setDeadLetterTopic(property(camelContext, 
java.lang.String.class, value)); return true;
+        case "maxredelivercount":
+        case "maxRedeliverCount": 
target.getPulsarConfiguration().setMaxRedeliverCount(property(camelContext, 
java.lang.Integer.class, value)); return true;
         case "numberofconsumers":
         case "numberOfConsumers": 
target.getPulsarConfiguration().setNumberOfConsumers(property(camelContext, 
int.class, value)); return true;
         case "subscriptioninitialposition":
diff --git 
a/components/camel-pulsar/src/generated/resources/org/apache/camel/component/pulsar/pulsar.json
 
b/components/camel-pulsar/src/generated/resources/org/apache/camel/component/pulsar/pulsar.json
index 029a1c2..04f8db9 100644
--- 
a/components/camel-pulsar/src/generated/resources/org/apache/camel/component/pulsar/pulsar.json
+++ 
b/components/camel-pulsar/src/generated/resources/org/apache/camel/component/pulsar/pulsar.json
@@ -40,6 +40,8 @@
     "consumerNamePrefix": { "kind": "parameter", "displayName": "Consumer Name 
Prefix", "group": "consumer", "label": "consumer", "required": false, "type": 
"string", "javaType": "java.lang.String", "deprecated": false, "secret": false, 
"defaultValue": "cons", "configurationClass": 
"org.apache.camel.component.pulsar.configuration.PulsarConfiguration", 
"configurationField": "pulsarConfiguration", "description": "Prefix to add to 
consumer names when a SHARED or FAILOVER subscription is used" },
     "consumerQueueSize": { "kind": "parameter", "displayName": "Consumer Queue 
Size", "group": "consumer", "label": "consumer", "required": false, "type": 
"integer", "javaType": "int", "deprecated": false, "secret": false, 
"defaultValue": "10", "configurationClass": 
"org.apache.camel.component.pulsar.configuration.PulsarConfiguration", 
"configurationField": "pulsarConfiguration", "description": "Size of the 
consumer queue - defaults to 10" },
     "negativeAckRedeliveryDelayMicros": { "kind": "parameter", "displayName": 
"Negative Ack Redelivery Delay Micros", "group": "consumer", "label": 
"consumer", "required": false, "type": "integer", "javaType": "long", 
"deprecated": false, "secret": false, "defaultValue": "60000000", 
"configurationClass": 
"org.apache.camel.component.pulsar.configuration.PulsarConfiguration", 
"configurationField": "pulsarConfiguration", "description": "Set the negative 
acknowledgement delay" },
+    "deadLetterTopic": { "kind": "parameter", "displayName": "Dead Letter 
Topic", "group": "consumer", "label": "consumer", "required": false, "type": 
"string", "javaType": "java.lang.String", "deprecated": false, "secret": false, 
"configurationClass": 
"org.apache.camel.component.pulsar.configuration.PulsarConfiguration", 
"configurationField": "pulsarConfiguration", "description": "Name of the topic 
where the messages which fail maxRedeliverCount times will be sent. Note: if 
not set, default topic name will be topicName-subscriptionName-DLQ" },
+    "maxRedeliverCount": { "kind": "parameter", "displayName": "Max Redeliver 
Count", "group": "consumer", "label": "consumer", "required": false, "type": 
"integer", "javaType": "java.lang.Integer", "deprecated": false, "secret": 
false, "configurationClass": 
"org.apache.camel.component.pulsar.configuration.PulsarConfiguration", 
"configurationField": "pulsarConfiguration", "description": "Maximum number of 
times that a message will be redelivered before being sent to the dead letter 
queue. If this value is not set, no Dead Letter Policy will be created" },
     "numberOfConsumers": { "kind": "parameter", "displayName": "Number Of 
Consumers", "group": "consumer", "label": "consumer", "required": false, 
"type": "integer", "javaType": "int", "deprecated": false, "secret": false, 
"defaultValue": "1", "configurationClass": 
"org.apache.camel.component.pulsar.configuration.PulsarConfiguration", 
"configurationField": "pulsarConfiguration", "description": "Number of 
consumers - defaults to 1" },
     "subscriptionInitialPosition": { "kind": "parameter", "displayName": 
"Subscription Initial Position", "group": "consumer", "label": "consumer", 
"required": false, "type": "object", "javaType": 
"org.apache.camel.component.pulsar.utils.consumers.SubscriptionInitialPosition",
 "enum": [ "EARLIEST", "LATEST" ], "deprecated": false, "secret": false, 
"defaultValue": "LATEST", "configurationClass": 
"org.apache.camel.component.pulsar.configuration.PulsarConfiguration", 
"configurationField": " [...]
     "subscriptionName": { "kind": "parameter", "displayName": "Subscription 
Name", "group": "consumer", "label": "consumer", "required": false, "type": 
"string", "javaType": "java.lang.String", "deprecated": false, "secret": false, 
"defaultValue": "subs", "configurationClass": 
"org.apache.camel.component.pulsar.configuration.PulsarConfiguration", 
"configurationField": "pulsarConfiguration", "description": "Name of the 
subscription to use" },
diff --git a/components/camel-pulsar/src/main/docs/pulsar-component.adoc 
b/components/camel-pulsar/src/main/docs/pulsar-component.adoc
index 2be30b4..517af8e 100644
--- a/components/camel-pulsar/src/main/docs/pulsar-component.adoc
+++ b/components/camel-pulsar/src/main/docs/pulsar-component.adoc
@@ -74,7 +74,7 @@ with the following path and query parameters:
 |===
 
 
-=== Query Parameters (29 parameters):
+=== Query Parameters (31 parameters):
 
 
 [width="100%",cols="2,5,^1,2",options="header"]
@@ -88,6 +88,8 @@ with the following path and query parameters:
 | *consumerNamePrefix* (consumer) | Prefix to add to consumer names when a 
SHARED or FAILOVER subscription is used | cons | String
 | *consumerQueueSize* (consumer) | Size of the consumer queue - defaults to 10 
| 10 | int
 | *negativeAckRedeliveryDelay Micros* (consumer) | Set the negative 
acknowledgement delay | 60000000 | long
+| *deadLetterTopic* (consumer) | Name of the topic where the messages which 
fail maxRedeliverCount times will be sent. Note: if not set, default topic name 
will be topicName-subscriptionName-DLQ |  | String
+| *maxRedeliverCount* (consumer) | Maximum number of times that a message will 
be redelivered before being sent to the dead letter queue. If this value is not 
set, no Dead Letter Policy will be created |  | Integer
 | *numberOfConsumers* (consumer) | Number of consumers - defaults to 1 | 1 | 
int
 | *subscriptionInitialPosition* (consumer) | Control the initial position in 
the topic of a newly created subscription. Default is latest message. The value 
can be one of: EARLIEST, LATEST | LATEST | SubscriptionInitialPosition
 | *subscriptionName* (consumer) | Name of the subscription to use | subs | 
String
diff --git 
a/components/camel-pulsar/src/main/java/org/apache/camel/component/pulsar/configuration/PulsarConfiguration.java
 
b/components/camel-pulsar/src/main/java/org/apache/camel/component/pulsar/configuration/PulsarConfiguration.java
index 4ecfd7d..c73fd17 100644
--- 
a/components/camel-pulsar/src/main/java/org/apache/camel/component/pulsar/configuration/PulsarConfiguration.java
+++ 
b/components/camel-pulsar/src/main/java/org/apache/camel/component/pulsar/configuration/PulsarConfiguration.java
@@ -57,6 +57,10 @@ public class PulsarConfiguration {
     private long ackGroupTimeMillis = 100;
     @UriParam(label = "consumer", defaultValue = "LATEST")
     private SubscriptionInitialPosition subscriptionInitialPosition = LATEST;
+    @UriParam(label = "consumer", description = "Maximum number of times that 
a message will be redelivered before being sent to the dead letter queue. If 
this value is not set, no Dead Letter Policy will be created")
+    private Integer maxRedeliverCount;
+    @UriParam(label = "consumer", description = "Name of the topic where the 
messages which fail maxRedeliverCount times will be sent. Note: if not set, 
default topic name will be topicName-subscriptionName-DLQ")
+    private String deadLetterTopic;
     @UriParam(label = "producer", description = "Send timeout in 
milliseconds", defaultValue = "30000")
     private int sendTimeoutMs = 30000;
     @UriParam(label = "producer", description = "Whether to block the 
producing thread if pending messages queue is full or to throw a 
ProducerQueueIsFullError", defaultValue = "false")
@@ -366,4 +370,19 @@ public class PulsarConfiguration {
     public void setNegativeAckRedeliveryDelayMicros(long 
negativeAckRedeliveryDelayMicros) {
         this.negativeAckRedeliveryDelayMicros = 
negativeAckRedeliveryDelayMicros;
     }
+    public Integer getMaxRedeliverCount() {
+        return maxRedeliverCount;
+    }
+
+    public void setMaxRedeliverCount(Integer maxRedeliverCount) {
+        this.maxRedeliverCount = maxRedeliverCount;
+    }
+
+    public String getDeadLetterTopic() {
+        return deadLetterTopic;
+    }
+
+    public void setDeadLetterTopic(String deadLetterTopic) {
+        this.deadLetterTopic = deadLetterTopic;
+    }
 }
diff --git 
a/components/camel-pulsar/src/main/java/org/apache/camel/component/pulsar/utils/consumers/CommonCreationStrategyImpl.java
 
b/components/camel-pulsar/src/main/java/org/apache/camel/component/pulsar/utils/consumers/CommonCreationStrategyImpl.java
index b5c72bc..3d4913e 100644
--- 
a/components/camel-pulsar/src/main/java/org/apache/camel/component/pulsar/utils/consumers/CommonCreationStrategyImpl.java
+++ 
b/components/camel-pulsar/src/main/java/org/apache/camel/component/pulsar/utils/consumers/CommonCreationStrategyImpl.java
@@ -23,6 +23,8 @@ import org.apache.camel.component.pulsar.PulsarEndpoint;
 import org.apache.camel.component.pulsar.PulsarMessageListener;
 import org.apache.camel.component.pulsar.configuration.PulsarConfiguration;
 import org.apache.pulsar.client.api.ConsumerBuilder;
+import org.apache.pulsar.client.api.DeadLetterPolicy;
+import org.apache.pulsar.client.api.DeadLetterPolicy.DeadLetterPolicyBuilder;
 
 public final class CommonCreationStrategyImpl {
 
@@ -32,11 +34,22 @@ public final class CommonCreationStrategyImpl {
     public static ConsumerBuilder<byte[]> create(final String name, final 
PulsarEndpoint pulsarEndpoint, final PulsarConsumer pulsarConsumer) {
         final PulsarConfiguration endpointConfiguration = 
pulsarEndpoint.getPulsarConfiguration();
 
-        return 
pulsarEndpoint.getPulsarClient().newConsumer().topic(pulsarEndpoint.getUri()).subscriptionName(endpointConfiguration.getSubscriptionName())
+        ConsumerBuilder<byte[]> builder = 
pulsarEndpoint.getPulsarClient().newConsumer().topic(pulsarEndpoint.getUri()).subscriptionName(endpointConfiguration.getSubscriptionName())
             
.receiverQueueSize(endpointConfiguration.getConsumerQueueSize()).consumerName(name).ackTimeout(endpointConfiguration.getAckTimeoutMillis(),
 TimeUnit.MILLISECONDS)
             
.subscriptionInitialPosition(endpointConfiguration.getSubscriptionInitialPosition().toPulsarSubscriptionInitialPosition())
             
.acknowledgmentGroupTime(endpointConfiguration.getAckGroupTimeMillis(), 
TimeUnit.MILLISECONDS)
             
.negativeAckRedeliveryDelay(endpointConfiguration.getNegativeAckRedeliveryDelayMicros(),
 TimeUnit.MICROSECONDS)
             .messageListener(new PulsarMessageListener(pulsarEndpoint, 
pulsarConsumer.getExceptionHandler(), pulsarConsumer.getProcessor()));
+
+        if (endpointConfiguration.getMaxRedeliverCount() != null) {
+            DeadLetterPolicyBuilder policy = DeadLetterPolicy.builder()
+                    
.maxRedeliverCount(endpointConfiguration.getMaxRedeliverCount());
+            if (endpointConfiguration.getDeadLetterTopic() != null) {
+                
policy.deadLetterTopic(endpointConfiguration.getDeadLetterTopic());
+            }
+
+            builder.deadLetterPolicy(policy.build());
+        }
+        return builder;
     }
 }
diff --git 
a/components/camel-pulsar/src/test/java/org/apache/camel/component/pulsar/PulsarConsumerDeadLetterPolicyTest.java
 
b/components/camel-pulsar/src/test/java/org/apache/camel/component/pulsar/PulsarConsumerDeadLetterPolicyTest.java
new file mode 100644
index 0000000..9c171a0
--- /dev/null
+++ 
b/components/camel-pulsar/src/test/java/org/apache/camel/component/pulsar/PulsarConsumerDeadLetterPolicyTest.java
@@ -0,0 +1,169 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.pulsar;
+
+
+import java.util.concurrent.TimeUnit;
+
+import org.apache.camel.EndpointInject;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.component.mock.MockEndpoint;
+import org.apache.camel.component.pulsar.utils.AutoConfiguration;
+import org.apache.camel.component.pulsar.utils.message.PulsarMessageHeaders;
+import org.apache.camel.spi.Registry;
+import org.apache.camel.support.SimpleRegistry;
+import org.apache.pulsar.client.api.Producer;
+import org.apache.pulsar.client.api.PulsarClient;
+import org.apache.pulsar.client.api.PulsarClientException;
+import org.apache.pulsar.client.api.Schema;
+import org.apache.pulsar.client.impl.ClientBuilderImpl;
+import org.junit.Before;
+import org.junit.Test;
+
+public class PulsarConsumerDeadLetterPolicyTest extends PulsarTestSupport {
+
+    private static final String TOPIC_URI = 
"persistent://public/default/camel-topic";
+    private static final String PRODUCER = "camel-producer-1";
+
+    @EndpointInject("mock:result")
+    private MockEndpoint to;
+
+    @EndpointInject("mock:deadLetter")
+    private MockEndpoint deadLetter;
+
+    private Producer<String> producer;
+
+    @Override
+    protected Registry createCamelRegistry() throws Exception {
+        Registry registry = new SimpleRegistry();
+
+        registerPulsarBeans(registry);
+
+        return registry;
+    }
+
+    private void registerPulsarBeans(final Registry registry) throws 
PulsarClientException {
+        PulsarClient pulsarClient = givenPulsarClient();
+        AutoConfiguration autoConfiguration = new AutoConfiguration(null, 
null);
+
+        registry.bind("pulsarClient", pulsarClient);
+        PulsarComponent comp = new PulsarComponent(context);
+        comp.setAutoConfiguration(autoConfiguration);
+        comp.setPulsarClient(pulsarClient);
+        registry.bind("pulsar", comp);
+    }
+
+    @Before
+    public void buildProducer() throws PulsarClientException {
+        try {
+            context.removeRoute("myRoute");
+            context.removeRoute("myDeadLetterRoute");
+        } catch (Exception ignored) {
+
+        }
+        producer = 
givenPulsarClient().newProducer(Schema.STRING).producerName(PRODUCER).topic(TOPIC_URI).create();
+
+    }
+
+    @Test
+    public void 
givenNoMaxRedeliverCountAndDeadLetterTopicverifyValuesAreNull() throws 
Exception {
+        PulsarComponent component = context.getComponent("pulsar", 
PulsarComponent.class);
+
+        PulsarEndpoint endpoint = (PulsarEndpoint) 
component.createEndpoint("pulsar:" + TOPIC_URI);
+
+        assertNull(endpoint.getPulsarConfiguration().getMaxRedeliverCount());
+        assertNull(endpoint.getPulsarConfiguration().getDeadLetterTopic());
+    }
+
+    @Test
+    public void 
givenMaxRedeliverCountverifyMessageGetsSentToDefaultDeadLetterTopicAfterCountExceeded()
+            throws Exception {
+        PulsarComponent component = context.getComponent("pulsar", 
PulsarComponent.class);
+
+        PulsarEndpoint from = (PulsarEndpoint) 
component.createEndpoint("pulsar:" + TOPIC_URI + 
"?maxRedeliverCount=5&subscriptionType=Shared&allowManualAcknowledgement=true&ackTimeoutMillis=1000");
+        PulsarEndpoint deadLetterFrom = (PulsarEndpoint) 
component.createEndpoint("pulsar:" + TOPIC_URI + "-subs-DLQ");
+
+        to.expectedMessageCount(5);
+        deadLetter.expectedMessageCount(1);
+        context.addRoutes(new RouteBuilder() {
+            @Override
+            public void configure() {
+                from(from).routeId("myRoute").to(to);
+
+                
from(deadLetterFrom).routeId("myDeadLetterRoute").to(deadLetter);
+            }
+        });
+        producer.send("Hello World!");
+
+        assertMockEndpointsSatisfied(10, TimeUnit.SECONDS);
+    }
+
+    @Test
+    public void 
givenMaxRedeliverCountAndDeadLetterTopicverifyMessageGetsSentToSpecifiedDeadLetterTopicAfterCountExceeded()
 throws Exception {
+        PulsarComponent component = context.getComponent("pulsar", 
PulsarComponent.class);
+
+        PulsarEndpoint from = (PulsarEndpoint) 
component.createEndpoint("pulsar:" + TOPIC_URI + 
"?maxRedeliverCount=5&deadLetterTopic=customTopic&subscriptionType=Shared&allowManualAcknowledgement=true&ackTimeoutMillis=1000");
+        PulsarEndpoint deadLetterFrom = (PulsarEndpoint) 
component.createEndpoint("pulsar:persistent://public/default/customTopic");
+
+        to.expectedMessageCount(5);
+        deadLetter.expectedMessageCount(1);
+        context.addRoutes(new RouteBuilder() {
+            @Override
+            public void configure() {
+                from(from).routeId("myRoute").to(to);
+
+                
from(deadLetterFrom).routeId("myDeadLetterRoute").to(deadLetter);
+            }
+        });
+
+        producer.send("Hello World!");
+        assertMockEndpointsSatisfied(10, TimeUnit.SECONDS);
+    }
+
+    @Test
+    public void 
givenOnlyDeadLetterTopicverifyMessageDoesNotGetSentToSpecifiedTopic() throws 
Exception {
+        PulsarComponent component = context.getComponent("pulsar", 
PulsarComponent.class);
+
+        PulsarEndpoint from = (PulsarEndpoint) 
component.createEndpoint("pulsar:" + TOPIC_URI + 
"?maxRedeliverCount=5&deadLetterTopic=customTopic&subscriptionType=Shared&allowManualAcknowledgement=true&ackTimeoutMillis=1000");
+        PulsarEndpoint deadLetterFrom = (PulsarEndpoint) 
component.createEndpoint("pulsar:persistent://public/default/customTopic");
+
+        to.expectedMessageCount(6);
+        deadLetter.expectedMessageCount(0);
+        context.addRoutes(new RouteBuilder() {
+            @Override
+            public void configure() {
+                from(from).routeId("myRoute").to(to).process(exchange -> {
+                    Integer tries = exchange.getProperty("retryCount", 1, 
Integer.class);
+                    if (tries >= 6) {
+                        PulsarMessageReceipt receipt = (PulsarMessageReceipt) 
exchange.getIn().getHeader(PulsarMessageHeaders.MESSAGE_RECEIPT);
+                        receipt.acknowledge();
+                    }
+                    exchange.setProperty("retryCount", tries + 1);
+                });
+
+                
from(deadLetterFrom).routeId("myDeadLetterRoute").to(deadLetter);
+            }
+        });
+
+        producer.send("Hello World!");
+        assertMockEndpointsSatisfied(10, TimeUnit.SECONDS);
+    }
+
+    private PulsarClient givenPulsarClient() throws PulsarClientException {
+        return new 
ClientBuilderImpl().serviceUrl(getPulsarBrokerUrl()).ioThreads(1).listenerThreads(1).build();
+    }
+}

Reply via email to