This is an automated email from the ASF dual-hosted git repository. davsclaus pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/camel.git
The following commit(s) were added to refs/heads/master by this push: new c736c07 CAMEL-15772: camel-pulsar readCompacted support (#4535) c736c07 is described below commit c736c07b9f82e93f8f3ca1edb9be5ba6de5bfc24 Author: ludovic-boutros <boutr...@gmail.com> AuthorDate: Fri Oct 30 05:56:29 2020 +0100 CAMEL-15772: camel-pulsar readCompacted support (#4535) --- .../apache/camel/catalog/components/pulsar.json | 2 + .../camel/catalog/docs/pulsar-component.adoc | 6 +- .../pulsar/PulsarComponentConfigurer.java | 5 + .../component/pulsar/PulsarEndpointConfigurer.java | 5 + .../component/pulsar/PulsarEndpointUriFactory.java | 3 +- .../org/apache/camel/component/pulsar/pulsar.json | 2 + .../src/main/docs/pulsar-component.adoc | 6 +- .../component/pulsar/PulsarConfiguration.java | 13 ++ .../consumers/CommonCreationStrategyImpl.java | 3 +- .../component/pulsar/PulsarComponentTest.java | 1 + .../pulsar/PulsarConsumerReadCompactedTest.java | 155 +++++++++++++++++++++ .../dsl/PulsarComponentBuilderFactory.java | 13 ++ .../endpoint/dsl/PulsarEndpointBuilderFactory.java | 25 ++++ .../modules/ROOT/pages/pulsar-component.adoc | 6 +- 14 files changed, 237 insertions(+), 8 deletions(-) diff --git a/catalog/camel-catalog/src/generated/resources/org/apache/camel/catalog/components/pulsar.json b/catalog/camel-catalog/src/generated/resources/org/apache/camel/catalog/components/pulsar.json index ffcc9c5..879c934 100644 --- a/catalog/camel-catalog/src/generated/resources/org/apache/camel/catalog/components/pulsar.json +++ b/catalog/camel-catalog/src/generated/resources/org/apache/camel/catalog/components/pulsar.json @@ -34,6 +34,7 @@ "maxRedeliverCount": { "kind": "property", "displayName": "Max Redeliver Count", "group": "consumer", "label": "consumer", "required": false, "type": "integer", "javaType": "java.lang.Integer", "deprecated": false, "secret": false, "configurationClass": "org.apache.camel.component.pulsar.PulsarConfiguration", "configurationField": "configuration", "description": "Maximum number of times that a message will be redelivered before being sent to the dead letter queue. If this value is no [...] "negativeAckRedeliveryDelayMicros": { "kind": "property", "displayName": "Negative Ack Redelivery Delay Micros", "group": "consumer", "label": "consumer", "required": false, "type": "integer", "javaType": "long", "deprecated": false, "secret": false, "defaultValue": 60000000, "configurationClass": "org.apache.camel.component.pulsar.PulsarConfiguration", "configurationField": "configuration", "description": "Set the negative acknowledgement delay" }, "numberOfConsumers": { "kind": "property", "displayName": "Number Of Consumers", "group": "consumer", "label": "consumer", "required": false, "type": "integer", "javaType": "int", "deprecated": false, "secret": false, "defaultValue": 1, "configurationClass": "org.apache.camel.component.pulsar.PulsarConfiguration", "configurationField": "configuration", "description": "Number of consumers - defaults to 1" }, + "readCompacted": { "kind": "property", "displayName": "Read Compacted", "group": "consumer", "label": "consumer", "required": false, "type": "boolean", "javaType": "boolean", "deprecated": false, "secret": false, "defaultValue": false, "configurationClass": "org.apache.camel.component.pulsar.PulsarConfiguration", "configurationField": "configuration", "description": "Enable compacted topic reading." }, "subscriptionInitialPosition": { "kind": "property", "displayName": "Subscription Initial Position", "group": "consumer", "label": "consumer", "required": false, "type": "object", "javaType": "org.apache.camel.component.pulsar.utils.consumers.SubscriptionInitialPosition", "enum": [ "EARLIEST", "LATEST" ], "deprecated": false, "secret": false, "defaultValue": "LATEST", "configurationClass": "org.apache.camel.component.pulsar.PulsarConfiguration", "configurationField": "configuration", [...] "subscriptionName": { "kind": "property", "displayName": "Subscription Name", "group": "consumer", "label": "consumer", "required": false, "type": "string", "javaType": "java.lang.String", "deprecated": false, "secret": false, "defaultValue": "subs", "configurationClass": "org.apache.camel.component.pulsar.PulsarConfiguration", "configurationField": "configuration", "description": "Name of the subscription to use" }, "subscriptionTopicsMode": { "kind": "property", "displayName": "Subscription Topics Mode", "group": "consumer", "label": "consumer", "required": false, "type": "object", "javaType": "org.apache.pulsar.client.api.RegexSubscriptionMode", "enum": [ "PersistentOnly", "NonPersistentOnly", "AllTopics" ], "deprecated": false, "secret": false, "defaultValue": "PersistentOnly", "configurationClass": "org.apache.camel.component.pulsar.PulsarConfiguration", "configurationField": "configuration" [...] @@ -74,6 +75,7 @@ "maxRedeliverCount": { "kind": "parameter", "displayName": "Max Redeliver Count", "group": "consumer", "label": "consumer", "required": false, "type": "integer", "javaType": "java.lang.Integer", "deprecated": false, "secret": false, "configurationClass": "org.apache.camel.component.pulsar.PulsarConfiguration", "configurationField": "pulsarConfiguration", "description": "Maximum number of times that a message will be redelivered before being sent to the dead letter queue. If this valu [...] "negativeAckRedeliveryDelayMicros": { "kind": "parameter", "displayName": "Negative Ack Redelivery Delay Micros", "group": "consumer", "label": "consumer", "required": false, "type": "integer", "javaType": "long", "deprecated": false, "secret": false, "defaultValue": 60000000, "configurationClass": "org.apache.camel.component.pulsar.PulsarConfiguration", "configurationField": "pulsarConfiguration", "description": "Set the negative acknowledgement delay" }, "numberOfConsumers": { "kind": "parameter", "displayName": "Number Of Consumers", "group": "consumer", "label": "consumer", "required": false, "type": "integer", "javaType": "int", "deprecated": false, "secret": false, "defaultValue": 1, "configurationClass": "org.apache.camel.component.pulsar.PulsarConfiguration", "configurationField": "pulsarConfiguration", "description": "Number of consumers - defaults to 1" }, + "readCompacted": { "kind": "parameter", "displayName": "Read Compacted", "group": "consumer", "label": "consumer", "required": false, "type": "boolean", "javaType": "boolean", "deprecated": false, "secret": false, "defaultValue": false, "configurationClass": "org.apache.camel.component.pulsar.PulsarConfiguration", "configurationField": "pulsarConfiguration", "description": "Enable compacted topic reading." }, "subscriptionInitialPosition": { "kind": "parameter", "displayName": "Subscription Initial Position", "group": "consumer", "label": "consumer", "required": false, "type": "object", "javaType": "org.apache.camel.component.pulsar.utils.consumers.SubscriptionInitialPosition", "enum": [ "EARLIEST", "LATEST" ], "deprecated": false, "secret": false, "defaultValue": "LATEST", "configurationClass": "org.apache.camel.component.pulsar.PulsarConfiguration", "configurationField": "pulsarConfigur [...] "subscriptionName": { "kind": "parameter", "displayName": "Subscription Name", "group": "consumer", "label": "consumer", "required": false, "type": "string", "javaType": "java.lang.String", "deprecated": false, "secret": false, "defaultValue": "subs", "configurationClass": "org.apache.camel.component.pulsar.PulsarConfiguration", "configurationField": "pulsarConfiguration", "description": "Name of the subscription to use" }, "subscriptionTopicsMode": { "kind": "parameter", "displayName": "Subscription Topics Mode", "group": "consumer", "label": "consumer", "required": false, "type": "object", "javaType": "org.apache.pulsar.client.api.RegexSubscriptionMode", "enum": [ "PersistentOnly", "NonPersistentOnly", "AllTopics" ], "deprecated": false, "secret": false, "defaultValue": "PersistentOnly", "configurationClass": "org.apache.camel.component.pulsar.PulsarConfiguration", "configurationField": "pulsarConfigu [...] diff --git a/catalog/camel-catalog/src/generated/resources/org/apache/camel/catalog/docs/pulsar-component.adoc b/catalog/camel-catalog/src/generated/resources/org/apache/camel/catalog/docs/pulsar-component.adoc index db3962b..cb375e6 100644 --- a/catalog/camel-catalog/src/generated/resources/org/apache/camel/catalog/docs/pulsar-component.adoc +++ b/catalog/camel-catalog/src/generated/resources/org/apache/camel/catalog/docs/pulsar-component.adoc @@ -36,7 +36,7 @@ pulsar:[persistent|non-persistent]://tenant/namespace/topic // component options: START -The Pulsar component supports 35 options, which are listed below. +The Pulsar component supports 36 options, which are listed below. @@ -55,6 +55,7 @@ The Pulsar component supports 35 options, which are listed below. | *maxRedeliverCount* (consumer) | Maximum number of times that a message will be redelivered before being sent to the dead letter queue. If this value is not set, no Dead Letter Policy will be created | | Integer | *negativeAckRedeliveryDelay{zwsp}Micros* (consumer) | Set the negative acknowledgement delay | 60000000 | long | *numberOfConsumers* (consumer) | Number of consumers - defaults to 1 | 1 | int +| *readCompacted* (consumer) | Enable compacted topic reading. | false | boolean | *subscriptionInitialPosition* (consumer) | Control the initial position in the topic of a newly created subscription. Default is latest message. There are 2 enums and the value can be one of: EARLIEST, LATEST | LATEST | SubscriptionInitialPosition | *subscriptionName* (consumer) | Name of the subscription to use | subs | String | *subscriptionTopicsMode* (consumer) | Determines to which topics this consumer should be subscribed to - Persistent, Non-Persistent, or both. Only used with pattern subscriptions. There are 3 enums and the value can be one of: PersistentOnly, NonPersistentOnly, AllTopics | PersistentOnly | RegexSubscriptionMode @@ -107,7 +108,7 @@ with the following path and query parameters: |=== -=== Query Parameters (34 parameters): +=== Query Parameters (35 parameters): [width="100%",cols="2,5,^1,2",options="header"] @@ -124,6 +125,7 @@ with the following path and query parameters: | *maxRedeliverCount* (consumer) | Maximum number of times that a message will be redelivered before being sent to the dead letter queue. If this value is not set, no Dead Letter Policy will be created | | Integer | *negativeAckRedeliveryDelay{zwsp}Micros* (consumer) | Set the negative acknowledgement delay | 60000000 | long | *numberOfConsumers* (consumer) | Number of consumers - defaults to 1 | 1 | int +| *readCompacted* (consumer) | Enable compacted topic reading. | false | boolean | *subscriptionInitialPosition* (consumer) | Control the initial position in the topic of a newly created subscription. Default is latest message. There are 2 enums and the value can be one of: EARLIEST, LATEST | LATEST | SubscriptionInitialPosition | *subscriptionName* (consumer) | Name of the subscription to use | subs | String | *subscriptionTopicsMode* (consumer) | Determines to which topics this consumer should be subscribed to - Persistent, Non-Persistent, or both. Only used with pattern subscriptions. There are 3 enums and the value can be one of: PersistentOnly, NonPersistentOnly, AllTopics | PersistentOnly | RegexSubscriptionMode diff --git a/components/camel-pulsar/src/generated/java/org/apache/camel/component/pulsar/PulsarComponentConfigurer.java b/components/camel-pulsar/src/generated/java/org/apache/camel/component/pulsar/PulsarComponentConfigurer.java index 4480dc7..bc6689f 100644 --- a/components/camel-pulsar/src/generated/java/org/apache/camel/component/pulsar/PulsarComponentConfigurer.java +++ b/components/camel-pulsar/src/generated/java/org/apache/camel/component/pulsar/PulsarComponentConfigurer.java @@ -30,6 +30,7 @@ public class PulsarComponentConfigurer extends PropertyConfigurerSupport impleme map.put("maxRedeliverCount", java.lang.Integer.class); map.put("negativeAckRedeliveryDelayMicros", long.class); map.put("numberOfConsumers", int.class); + map.put("readCompacted", boolean.class); map.put("subscriptionInitialPosition", org.apache.camel.component.pulsar.utils.consumers.SubscriptionInitialPosition.class); map.put("subscriptionName", java.lang.String.class); map.put("subscriptionTopicsMode", org.apache.pulsar.client.api.RegexSubscriptionMode.class); @@ -124,6 +125,8 @@ public class PulsarComponentConfigurer extends PropertyConfigurerSupport impleme case "pulsarClient": target.setPulsarClient(property(camelContext, org.apache.pulsar.client.api.PulsarClient.class, value)); return true; case "pulsarmessagereceiptfactory": case "pulsarMessageReceiptFactory": target.setPulsarMessageReceiptFactory(property(camelContext, org.apache.camel.component.pulsar.PulsarMessageReceiptFactory.class, value)); return true; + case "readcompacted": + case "readCompacted": getOrCreateConfiguration(target).setReadCompacted(property(camelContext, boolean.class, value)); return true; case "sendtimeoutms": case "sendTimeoutMs": getOrCreateConfiguration(target).setSendTimeoutMs(property(camelContext, int.class, value)); return true; case "subscriptioninitialposition": @@ -206,6 +209,8 @@ public class PulsarComponentConfigurer extends PropertyConfigurerSupport impleme case "pulsarClient": return target.getPulsarClient(); case "pulsarmessagereceiptfactory": case "pulsarMessageReceiptFactory": return target.getPulsarMessageReceiptFactory(); + case "readcompacted": + case "readCompacted": return getOrCreateConfiguration(target).isReadCompacted(); case "sendtimeoutms": case "sendTimeoutMs": return getOrCreateConfiguration(target).getSendTimeoutMs(); case "subscriptioninitialposition": diff --git a/components/camel-pulsar/src/generated/java/org/apache/camel/component/pulsar/PulsarEndpointConfigurer.java b/components/camel-pulsar/src/generated/java/org/apache/camel/component/pulsar/PulsarEndpointConfigurer.java index 69bd2e8..be19279 100644 --- a/components/camel-pulsar/src/generated/java/org/apache/camel/component/pulsar/PulsarEndpointConfigurer.java +++ b/components/camel-pulsar/src/generated/java/org/apache/camel/component/pulsar/PulsarEndpointConfigurer.java @@ -33,6 +33,7 @@ public class PulsarEndpointConfigurer extends PropertyConfigurerSupport implemen map.put("maxRedeliverCount", java.lang.Integer.class); map.put("negativeAckRedeliveryDelayMicros", long.class); map.put("numberOfConsumers", int.class); + map.put("readCompacted", boolean.class); map.put("subscriptionInitialPosition", org.apache.camel.component.pulsar.utils.consumers.SubscriptionInitialPosition.class); map.put("subscriptionName", java.lang.String.class); map.put("subscriptionTopicsMode", org.apache.pulsar.client.api.RegexSubscriptionMode.class); @@ -117,6 +118,8 @@ public class PulsarEndpointConfigurer extends PropertyConfigurerSupport implemen case "numberOfConsumers": target.getPulsarConfiguration().setNumberOfConsumers(property(camelContext, int.class, value)); return true; case "producername": case "producerName": target.getPulsarConfiguration().setProducerName(property(camelContext, java.lang.String.class, value)); return true; + case "readcompacted": + case "readCompacted": target.getPulsarConfiguration().setReadCompacted(property(camelContext, boolean.class, value)); return true; case "sendtimeoutms": case "sendTimeoutMs": target.getPulsarConfiguration().setSendTimeoutMs(property(camelContext, int.class, value)); return true; case "subscriptioninitialposition": @@ -197,6 +200,8 @@ public class PulsarEndpointConfigurer extends PropertyConfigurerSupport implemen case "numberOfConsumers": return target.getPulsarConfiguration().getNumberOfConsumers(); case "producername": case "producerName": return target.getPulsarConfiguration().getProducerName(); + case "readcompacted": + case "readCompacted": return target.getPulsarConfiguration().isReadCompacted(); case "sendtimeoutms": case "sendTimeoutMs": return target.getPulsarConfiguration().getSendTimeoutMs(); case "subscriptioninitialposition": diff --git a/components/camel-pulsar/src/generated/java/org/apache/camel/component/pulsar/PulsarEndpointUriFactory.java b/components/camel-pulsar/src/generated/java/org/apache/camel/component/pulsar/PulsarEndpointUriFactory.java index 5e70ee5..34ac4a1 100644 --- a/components/camel-pulsar/src/generated/java/org/apache/camel/component/pulsar/PulsarEndpointUriFactory.java +++ b/components/camel-pulsar/src/generated/java/org/apache/camel/component/pulsar/PulsarEndpointUriFactory.java @@ -20,7 +20,7 @@ public class PulsarEndpointUriFactory extends org.apache.camel.support.component private static final Set<String> PROPERTY_NAMES; private static final Set<String> SECRET_PROPERTY_NAMES; static { - Set<String> props = new HashSet<>(38); + Set<String> props = new HashSet<>(39); props.add("basicPropertyBinding"); props.add("initialSequenceId"); props.add("maxRedeliverCount"); @@ -49,6 +49,7 @@ public class PulsarEndpointUriFactory extends org.apache.camel.support.component props.add("messageRoutingMode"); props.add("ackTimeoutMillis"); props.add("consumerNamePrefix"); + props.add("readCompacted"); props.add("lazyStartProducer"); props.add("subscriptionType"); props.add("subscriptionName"); diff --git a/components/camel-pulsar/src/generated/resources/org/apache/camel/component/pulsar/pulsar.json b/components/camel-pulsar/src/generated/resources/org/apache/camel/component/pulsar/pulsar.json index ffcc9c5..879c934 100644 --- a/components/camel-pulsar/src/generated/resources/org/apache/camel/component/pulsar/pulsar.json +++ b/components/camel-pulsar/src/generated/resources/org/apache/camel/component/pulsar/pulsar.json @@ -34,6 +34,7 @@ "maxRedeliverCount": { "kind": "property", "displayName": "Max Redeliver Count", "group": "consumer", "label": "consumer", "required": false, "type": "integer", "javaType": "java.lang.Integer", "deprecated": false, "secret": false, "configurationClass": "org.apache.camel.component.pulsar.PulsarConfiguration", "configurationField": "configuration", "description": "Maximum number of times that a message will be redelivered before being sent to the dead letter queue. If this value is no [...] "negativeAckRedeliveryDelayMicros": { "kind": "property", "displayName": "Negative Ack Redelivery Delay Micros", "group": "consumer", "label": "consumer", "required": false, "type": "integer", "javaType": "long", "deprecated": false, "secret": false, "defaultValue": 60000000, "configurationClass": "org.apache.camel.component.pulsar.PulsarConfiguration", "configurationField": "configuration", "description": "Set the negative acknowledgement delay" }, "numberOfConsumers": { "kind": "property", "displayName": "Number Of Consumers", "group": "consumer", "label": "consumer", "required": false, "type": "integer", "javaType": "int", "deprecated": false, "secret": false, "defaultValue": 1, "configurationClass": "org.apache.camel.component.pulsar.PulsarConfiguration", "configurationField": "configuration", "description": "Number of consumers - defaults to 1" }, + "readCompacted": { "kind": "property", "displayName": "Read Compacted", "group": "consumer", "label": "consumer", "required": false, "type": "boolean", "javaType": "boolean", "deprecated": false, "secret": false, "defaultValue": false, "configurationClass": "org.apache.camel.component.pulsar.PulsarConfiguration", "configurationField": "configuration", "description": "Enable compacted topic reading." }, "subscriptionInitialPosition": { "kind": "property", "displayName": "Subscription Initial Position", "group": "consumer", "label": "consumer", "required": false, "type": "object", "javaType": "org.apache.camel.component.pulsar.utils.consumers.SubscriptionInitialPosition", "enum": [ "EARLIEST", "LATEST" ], "deprecated": false, "secret": false, "defaultValue": "LATEST", "configurationClass": "org.apache.camel.component.pulsar.PulsarConfiguration", "configurationField": "configuration", [...] "subscriptionName": { "kind": "property", "displayName": "Subscription Name", "group": "consumer", "label": "consumer", "required": false, "type": "string", "javaType": "java.lang.String", "deprecated": false, "secret": false, "defaultValue": "subs", "configurationClass": "org.apache.camel.component.pulsar.PulsarConfiguration", "configurationField": "configuration", "description": "Name of the subscription to use" }, "subscriptionTopicsMode": { "kind": "property", "displayName": "Subscription Topics Mode", "group": "consumer", "label": "consumer", "required": false, "type": "object", "javaType": "org.apache.pulsar.client.api.RegexSubscriptionMode", "enum": [ "PersistentOnly", "NonPersistentOnly", "AllTopics" ], "deprecated": false, "secret": false, "defaultValue": "PersistentOnly", "configurationClass": "org.apache.camel.component.pulsar.PulsarConfiguration", "configurationField": "configuration" [...] @@ -74,6 +75,7 @@ "maxRedeliverCount": { "kind": "parameter", "displayName": "Max Redeliver Count", "group": "consumer", "label": "consumer", "required": false, "type": "integer", "javaType": "java.lang.Integer", "deprecated": false, "secret": false, "configurationClass": "org.apache.camel.component.pulsar.PulsarConfiguration", "configurationField": "pulsarConfiguration", "description": "Maximum number of times that a message will be redelivered before being sent to the dead letter queue. If this valu [...] "negativeAckRedeliveryDelayMicros": { "kind": "parameter", "displayName": "Negative Ack Redelivery Delay Micros", "group": "consumer", "label": "consumer", "required": false, "type": "integer", "javaType": "long", "deprecated": false, "secret": false, "defaultValue": 60000000, "configurationClass": "org.apache.camel.component.pulsar.PulsarConfiguration", "configurationField": "pulsarConfiguration", "description": "Set the negative acknowledgement delay" }, "numberOfConsumers": { "kind": "parameter", "displayName": "Number Of Consumers", "group": "consumer", "label": "consumer", "required": false, "type": "integer", "javaType": "int", "deprecated": false, "secret": false, "defaultValue": 1, "configurationClass": "org.apache.camel.component.pulsar.PulsarConfiguration", "configurationField": "pulsarConfiguration", "description": "Number of consumers - defaults to 1" }, + "readCompacted": { "kind": "parameter", "displayName": "Read Compacted", "group": "consumer", "label": "consumer", "required": false, "type": "boolean", "javaType": "boolean", "deprecated": false, "secret": false, "defaultValue": false, "configurationClass": "org.apache.camel.component.pulsar.PulsarConfiguration", "configurationField": "pulsarConfiguration", "description": "Enable compacted topic reading." }, "subscriptionInitialPosition": { "kind": "parameter", "displayName": "Subscription Initial Position", "group": "consumer", "label": "consumer", "required": false, "type": "object", "javaType": "org.apache.camel.component.pulsar.utils.consumers.SubscriptionInitialPosition", "enum": [ "EARLIEST", "LATEST" ], "deprecated": false, "secret": false, "defaultValue": "LATEST", "configurationClass": "org.apache.camel.component.pulsar.PulsarConfiguration", "configurationField": "pulsarConfigur [...] "subscriptionName": { "kind": "parameter", "displayName": "Subscription Name", "group": "consumer", "label": "consumer", "required": false, "type": "string", "javaType": "java.lang.String", "deprecated": false, "secret": false, "defaultValue": "subs", "configurationClass": "org.apache.camel.component.pulsar.PulsarConfiguration", "configurationField": "pulsarConfiguration", "description": "Name of the subscription to use" }, "subscriptionTopicsMode": { "kind": "parameter", "displayName": "Subscription Topics Mode", "group": "consumer", "label": "consumer", "required": false, "type": "object", "javaType": "org.apache.pulsar.client.api.RegexSubscriptionMode", "enum": [ "PersistentOnly", "NonPersistentOnly", "AllTopics" ], "deprecated": false, "secret": false, "defaultValue": "PersistentOnly", "configurationClass": "org.apache.camel.component.pulsar.PulsarConfiguration", "configurationField": "pulsarConfigu [...] diff --git a/components/camel-pulsar/src/main/docs/pulsar-component.adoc b/components/camel-pulsar/src/main/docs/pulsar-component.adoc index db3962b..cb375e6 100644 --- a/components/camel-pulsar/src/main/docs/pulsar-component.adoc +++ b/components/camel-pulsar/src/main/docs/pulsar-component.adoc @@ -36,7 +36,7 @@ pulsar:[persistent|non-persistent]://tenant/namespace/topic // component options: START -The Pulsar component supports 35 options, which are listed below. +The Pulsar component supports 36 options, which are listed below. @@ -55,6 +55,7 @@ The Pulsar component supports 35 options, which are listed below. | *maxRedeliverCount* (consumer) | Maximum number of times that a message will be redelivered before being sent to the dead letter queue. If this value is not set, no Dead Letter Policy will be created | | Integer | *negativeAckRedeliveryDelay{zwsp}Micros* (consumer) | Set the negative acknowledgement delay | 60000000 | long | *numberOfConsumers* (consumer) | Number of consumers - defaults to 1 | 1 | int +| *readCompacted* (consumer) | Enable compacted topic reading. | false | boolean | *subscriptionInitialPosition* (consumer) | Control the initial position in the topic of a newly created subscription. Default is latest message. There are 2 enums and the value can be one of: EARLIEST, LATEST | LATEST | SubscriptionInitialPosition | *subscriptionName* (consumer) | Name of the subscription to use | subs | String | *subscriptionTopicsMode* (consumer) | Determines to which topics this consumer should be subscribed to - Persistent, Non-Persistent, or both. Only used with pattern subscriptions. There are 3 enums and the value can be one of: PersistentOnly, NonPersistentOnly, AllTopics | PersistentOnly | RegexSubscriptionMode @@ -107,7 +108,7 @@ with the following path and query parameters: |=== -=== Query Parameters (34 parameters): +=== Query Parameters (35 parameters): [width="100%",cols="2,5,^1,2",options="header"] @@ -124,6 +125,7 @@ with the following path and query parameters: | *maxRedeliverCount* (consumer) | Maximum number of times that a message will be redelivered before being sent to the dead letter queue. If this value is not set, no Dead Letter Policy will be created | | Integer | *negativeAckRedeliveryDelay{zwsp}Micros* (consumer) | Set the negative acknowledgement delay | 60000000 | long | *numberOfConsumers* (consumer) | Number of consumers - defaults to 1 | 1 | int +| *readCompacted* (consumer) | Enable compacted topic reading. | false | boolean | *subscriptionInitialPosition* (consumer) | Control the initial position in the topic of a newly created subscription. Default is latest message. There are 2 enums and the value can be one of: EARLIEST, LATEST | LATEST | SubscriptionInitialPosition | *subscriptionName* (consumer) | Name of the subscription to use | subs | String | *subscriptionTopicsMode* (consumer) | Determines to which topics this consumer should be subscribed to - Persistent, Non-Persistent, or both. Only used with pattern subscriptions. There are 3 enums and the value can be one of: PersistentOnly, NonPersistentOnly, AllTopics | PersistentOnly | RegexSubscriptionMode diff --git a/components/camel-pulsar/src/main/java/org/apache/camel/component/pulsar/PulsarConfiguration.java b/components/camel-pulsar/src/main/java/org/apache/camel/component/pulsar/PulsarConfiguration.java index a10c1a1..c43b511 100644 --- a/components/camel-pulsar/src/main/java/org/apache/camel/component/pulsar/PulsarConfiguration.java +++ b/components/camel-pulsar/src/main/java/org/apache/camel/component/pulsar/PulsarConfiguration.java @@ -63,6 +63,8 @@ public class PulsarConfiguration implements Cloneable { private long ackGroupTimeMillis = 100; @UriParam(label = "consumer", defaultValue = "LATEST") private SubscriptionInitialPosition subscriptionInitialPosition = LATEST; + @UriParam(label = "consumer", defaultValue = "false") + private boolean readCompacted; @UriParam(label = "consumer", description = "Maximum number of times that a message will be redelivered before being sent to the dead letter queue. If this value is not set, no Dead Letter Policy will be created") private Integer maxRedeliverCount; @@ -367,6 +369,17 @@ public class PulsarConfiguration implements Cloneable { } /** + * Enable compacted topic reading. + */ + public boolean isReadCompacted() { + return readCompacted; + } + + public void setReadCompacted(boolean readCompacted) { + this.readCompacted = readCompacted; + } + + /** * Set the baseline for the sequence ids for messages published by the producer. First message will be using * (initialSequenceId 1) as its sequence id and subsequent messages will be assigned incremental sequence ids, if * not otherwise specified. diff --git a/components/camel-pulsar/src/main/java/org/apache/camel/component/pulsar/utils/consumers/CommonCreationStrategyImpl.java b/components/camel-pulsar/src/main/java/org/apache/camel/component/pulsar/utils/consumers/CommonCreationStrategyImpl.java index f429980..86fe75c 100644 --- a/components/camel-pulsar/src/main/java/org/apache/camel/component/pulsar/utils/consumers/CommonCreationStrategyImpl.java +++ b/components/camel-pulsar/src/main/java/org/apache/camel/component/pulsar/utils/consumers/CommonCreationStrategyImpl.java @@ -51,7 +51,8 @@ public final class CommonCreationStrategyImpl { endpointConfiguration.getSubscriptionInitialPosition().toPulsarSubscriptionInitialPosition()) .acknowledgmentGroupTime(endpointConfiguration.getAckGroupTimeMillis(), TimeUnit.MILLISECONDS) .negativeAckRedeliveryDelay(endpointConfiguration.getNegativeAckRedeliveryDelayMicros(), TimeUnit.MICROSECONDS) - .messageListener(new PulsarMessageListener(pulsarEndpoint, pulsarConsumer)); + .messageListener(new PulsarMessageListener(pulsarEndpoint, pulsarConsumer)) + .readCompacted(endpointConfiguration.isReadCompacted()); if (endpointConfiguration.getMaxRedeliverCount() != null) { DeadLetterPolicyBuilder policy = DeadLetterPolicy.builder() diff --git a/components/camel-pulsar/src/test/java/org/apache/camel/component/pulsar/PulsarComponentTest.java b/components/camel-pulsar/src/test/java/org/apache/camel/component/pulsar/PulsarComponentTest.java index 146586e..d86f981 100644 --- a/components/camel-pulsar/src/test/java/org/apache/camel/component/pulsar/PulsarComponentTest.java +++ b/components/camel-pulsar/src/test/java/org/apache/camel/component/pulsar/PulsarComponentTest.java @@ -70,6 +70,7 @@ public class PulsarComponentTest extends CamelTestSupport { assertEquals("subs", endpoint.getPulsarConfiguration().getSubscriptionName()); assertEquals(SubscriptionType.EXCLUSIVE, endpoint.getPulsarConfiguration().getSubscriptionType()); assertFalse(endpoint.getPulsarConfiguration().isAllowManualAcknowledgement()); + assertFalse(endpoint.getPulsarConfiguration().isReadCompacted()); } @Test diff --git a/components/camel-pulsar/src/test/java/org/apache/camel/component/pulsar/PulsarConsumerReadCompactedTest.java b/components/camel-pulsar/src/test/java/org/apache/camel/component/pulsar/PulsarConsumerReadCompactedTest.java new file mode 100644 index 0000000..e80e64a --- /dev/null +++ b/components/camel-pulsar/src/test/java/org/apache/camel/component/pulsar/PulsarConsumerReadCompactedTest.java @@ -0,0 +1,155 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.component.pulsar; + +import java.util.concurrent.TimeUnit; + +import com.google.common.util.concurrent.Uninterruptibles; +import org.apache.camel.Endpoint; +import org.apache.camel.EndpointInject; +import org.apache.camel.builder.RouteBuilder; +import org.apache.camel.component.mock.MockEndpoint; +import org.apache.camel.component.pulsar.utils.AutoConfiguration; +import org.apache.camel.component.pulsar.utils.message.PulsarMessageHeaders; +import org.apache.camel.spi.Registry; +import org.apache.camel.support.SimpleRegistry; +import org.apache.pulsar.client.admin.LongRunningProcessStatus; +import org.apache.pulsar.client.admin.PulsarAdmin; +import org.apache.pulsar.client.admin.PulsarAdminException; +import org.apache.pulsar.client.admin.Topics; +import org.apache.pulsar.client.admin.internal.PulsarAdminBuilderImpl; +import org.apache.pulsar.client.api.Producer; +import org.apache.pulsar.client.api.PulsarClient; +import org.apache.pulsar.client.api.PulsarClientException; +import org.apache.pulsar.client.api.Schema; +import org.apache.pulsar.client.impl.ClientBuilderImpl; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class PulsarConsumerReadCompactedTest extends PulsarTestSupport { + + private static final Logger LOGGER = LoggerFactory.getLogger(PulsarConsumerReadCompactedTest.class); + + private static final String TOPIC_URI = "persistent://public/default/camel-topic"; + private static final String PRODUCER = "camel-producer-1"; + + @EndpointInject("pulsar:" + TOPIC_URI + "?numberOfConsumers=1&subscriptionType=Exclusive" + + "&subscriptionName=camel-subscription&consumerQueueSize=1&consumerName=camel-consumer" + + "&allowManualAcknowledgement=true" + "&ackTimeoutMillis=1000" + + "&readCompacted=true" + + "&negativeAckRedeliveryDelayMicros=100000") + private Endpoint from; + + @EndpointInject("mock:result") + private MockEndpoint to; + + private Producer<String> producer; + + @BeforeEach + public void setup() throws Exception { + context.removeRoute("myRoute"); + producer = givenPulsarClient().newProducer(Schema.STRING).producerName(PRODUCER).topic(TOPIC_URI).create(); + } + + @Override + protected Registry createCamelRegistry() throws Exception { + Registry registry = new SimpleRegistry(); + + registerPulsarBeans(registry); + + return registry; + } + + private void registerPulsarBeans(final Registry registry) throws PulsarClientException { + PulsarClient pulsarClient = givenPulsarClient(); + AutoConfiguration autoConfiguration = new AutoConfiguration(null, null); + + registry.bind("pulsarClient", pulsarClient); + PulsarComponent comp = new PulsarComponent(context); + comp.setAutoConfiguration(autoConfiguration); + comp.setPulsarClient(pulsarClient); + registry.bind("pulsar", comp); + } + + private PulsarClient givenPulsarClient() throws PulsarClientException { + return new ClientBuilderImpl().serviceUrl(getPulsarBrokerUrl()).ioThreads(1).listenerThreads(1).build(); + } + + private PulsarAdmin givenPulsarAdmin() throws PulsarClientException { + return new PulsarAdminBuilderImpl().serviceHttpUrl(getPulsarAdminUrl()).build(); + } + + private void triggerCompaction() throws PulsarAdminException, PulsarClientException { + final Topics topics = givenPulsarAdmin().topics(); + + topics.triggerCompaction(TOPIC_URI); + while (!topics.compactionStatus(TOPIC_URI).status.equals(LongRunningProcessStatus.Status.RUNNING)) { + LOGGER.info("Waiting for compaction completeness..."); + Uninterruptibles.sleepUninterruptibly(1, TimeUnit.SECONDS); + } + } + + @Test + public void testReadCompacted() throws Exception { + to.expectedMessageCount(1); + triggerCompaction(); + + context.addRoutes(new RouteBuilder() { + @Override + public void configure() { + from(from).routeId("myRoute").to(to).process(exchange -> { + LOGGER.info("Processing message {}", exchange.getIn().getBody()); + + PulsarMessageReceipt receipt + = (PulsarMessageReceipt) exchange.getIn().getHeader(PulsarMessageHeaders.MESSAGE_RECEIPT); + receipt.acknowledge(); + }); + } + }); + + producer.newMessage().key("myKey").value("Hello World!").send(); + producer.newMessage().key("myKey").value("Hello World! Again!").send(); + + MockEndpoint.assertIsSatisfied(10, TimeUnit.SECONDS, to); + } + + @Test + public void testReadNotCompacted() throws Exception { + to.expectedMessageCount(2); + triggerCompaction(); + + context.addRoutes(new RouteBuilder() { + @Override + public void configure() { + from(from).routeId("myRoute").to(to).process(exchange -> { + LOGGER.info("Processing message {}", exchange.getIn().getBody()); + + PulsarMessageReceipt receipt + = (PulsarMessageReceipt) exchange.getIn().getHeader(PulsarMessageHeaders.MESSAGE_RECEIPT); + receipt.acknowledge(); + }); + } + }); + + producer.newMessage().key("mySecondKey").value("Hello World!").send(); + producer.newMessage().key("mySecondKey").value("Hello World! Again!").send(); + + MockEndpoint.assertIsSatisfied(10, TimeUnit.SECONDS, to); + } +} diff --git a/core/camel-componentdsl/src/generated/java/org/apache/camel/builder/component/dsl/PulsarComponentBuilderFactory.java b/core/camel-componentdsl/src/generated/java/org/apache/camel/builder/component/dsl/PulsarComponentBuilderFactory.java index 02fc71e..de3ac84 100644 --- a/core/camel-componentdsl/src/generated/java/org/apache/camel/builder/component/dsl/PulsarComponentBuilderFactory.java +++ b/core/camel-componentdsl/src/generated/java/org/apache/camel/builder/component/dsl/PulsarComponentBuilderFactory.java @@ -220,6 +220,18 @@ public interface PulsarComponentBuilderFactory { return this; } /** + * Enable compacted topic reading. + * + * The option is a: <code>boolean</code> type. + * + * Default: false + * Group: consumer + */ + default PulsarComponentBuilder readCompacted(boolean readCompacted) { + doSetProperty("readCompacted", readCompacted); + return this; + } + /** * Control the initial position in the topic of a newly created * subscription. Default is latest message. * @@ -578,6 +590,7 @@ public interface PulsarComponentBuilderFactory { case "maxRedeliverCount": getOrCreateConfiguration((PulsarComponent) component).setMaxRedeliverCount((java.lang.Integer) value); return true; case "negativeAckRedeliveryDelayMicros": getOrCreateConfiguration((PulsarComponent) component).setNegativeAckRedeliveryDelayMicros((long) value); return true; case "numberOfConsumers": getOrCreateConfiguration((PulsarComponent) component).setNumberOfConsumers((int) value); return true; + case "readCompacted": getOrCreateConfiguration((PulsarComponent) component).setReadCompacted((boolean) value); return true; case "subscriptionInitialPosition": getOrCreateConfiguration((PulsarComponent) component).setSubscriptionInitialPosition((org.apache.camel.component.pulsar.utils.consumers.SubscriptionInitialPosition) value); return true; case "subscriptionName": getOrCreateConfiguration((PulsarComponent) component).setSubscriptionName((java.lang.String) value); return true; case "subscriptionTopicsMode": getOrCreateConfiguration((PulsarComponent) component).setSubscriptionTopicsMode((org.apache.pulsar.client.api.RegexSubscriptionMode) value); return true; diff --git a/core/camel-endpointdsl/src/generated/java/org/apache/camel/builder/endpoint/dsl/PulsarEndpointBuilderFactory.java b/core/camel-endpointdsl/src/generated/java/org/apache/camel/builder/endpoint/dsl/PulsarEndpointBuilderFactory.java index ce57097..fa79a01 100644 --- a/core/camel-endpointdsl/src/generated/java/org/apache/camel/builder/endpoint/dsl/PulsarEndpointBuilderFactory.java +++ b/core/camel-endpointdsl/src/generated/java/org/apache/camel/builder/endpoint/dsl/PulsarEndpointBuilderFactory.java @@ -319,6 +319,31 @@ public interface PulsarEndpointBuilderFactory { return this; } /** + * Enable compacted topic reading. + * + * The option is a: <code>boolean</code> type. + * + * Default: false + * Group: consumer + */ + default PulsarEndpointConsumerBuilder readCompacted( + boolean readCompacted) { + doSetProperty("readCompacted", readCompacted); + return this; + } + /** + * Enable compacted topic reading. + * + * The option will be converted to a <code>boolean</code> type. + * + * Default: false + * Group: consumer + */ + default PulsarEndpointConsumerBuilder readCompacted(String readCompacted) { + doSetProperty("readCompacted", readCompacted); + return this; + } + /** * Control the initial position in the topic of a newly created * subscription. Default is latest message. * diff --git a/docs/components/modules/ROOT/pages/pulsar-component.adoc b/docs/components/modules/ROOT/pages/pulsar-component.adoc index dd2c35b..0076543 100644 --- a/docs/components/modules/ROOT/pages/pulsar-component.adoc +++ b/docs/components/modules/ROOT/pages/pulsar-component.adoc @@ -38,7 +38,7 @@ pulsar:[persistent|non-persistent]://tenant/namespace/topic // component options: START -The Pulsar component supports 35 options, which are listed below. +The Pulsar component supports 36 options, which are listed below. @@ -57,6 +57,7 @@ The Pulsar component supports 35 options, which are listed below. | *maxRedeliverCount* (consumer) | Maximum number of times that a message will be redelivered before being sent to the dead letter queue. If this value is not set, no Dead Letter Policy will be created | | Integer | *negativeAckRedeliveryDelay{zwsp}Micros* (consumer) | Set the negative acknowledgement delay | 60000000 | long | *numberOfConsumers* (consumer) | Number of consumers - defaults to 1 | 1 | int +| *readCompacted* (consumer) | Enable compacted topic reading. | false | boolean | *subscriptionInitialPosition* (consumer) | Control the initial position in the topic of a newly created subscription. Default is latest message. There are 2 enums and the value can be one of: EARLIEST, LATEST | LATEST | SubscriptionInitialPosition | *subscriptionName* (consumer) | Name of the subscription to use | subs | String | *subscriptionTopicsMode* (consumer) | Determines to which topics this consumer should be subscribed to - Persistent, Non-Persistent, or both. Only used with pattern subscriptions. There are 3 enums and the value can be one of: PersistentOnly, NonPersistentOnly, AllTopics | PersistentOnly | RegexSubscriptionMode @@ -109,7 +110,7 @@ with the following path and query parameters: |=== -=== Query Parameters (34 parameters): +=== Query Parameters (35 parameters): [width="100%",cols="2,5,^1,2",options="header"] @@ -126,6 +127,7 @@ with the following path and query parameters: | *maxRedeliverCount* (consumer) | Maximum number of times that a message will be redelivered before being sent to the dead letter queue. If this value is not set, no Dead Letter Policy will be created | | Integer | *negativeAckRedeliveryDelay{zwsp}Micros* (consumer) | Set the negative acknowledgement delay | 60000000 | long | *numberOfConsumers* (consumer) | Number of consumers - defaults to 1 | 1 | int +| *readCompacted* (consumer) | Enable compacted topic reading. | false | boolean | *subscriptionInitialPosition* (consumer) | Control the initial position in the topic of a newly created subscription. Default is latest message. There are 2 enums and the value can be one of: EARLIEST, LATEST | LATEST | SubscriptionInitialPosition | *subscriptionName* (consumer) | Name of the subscription to use | subs | String | *subscriptionTopicsMode* (consumer) | Determines to which topics this consumer should be subscribed to - Persistent, Non-Persistent, or both. Only used with pattern subscriptions. There are 3 enums and the value can be one of: PersistentOnly, NonPersistentOnly, AllTopics | PersistentOnly | RegexSubscriptionMode