This is an automated email from the ASF dual-hosted git repository.

davsclaus pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/camel.git


The following commit(s) were added to refs/heads/master by this push:
     new c736c07  CAMEL-15772: camel-pulsar readCompacted support (#4535)
c736c07 is described below

commit c736c07b9f82e93f8f3ca1edb9be5ba6de5bfc24
Author: ludovic-boutros <boutr...@gmail.com>
AuthorDate: Fri Oct 30 05:56:29 2020 +0100

    CAMEL-15772: camel-pulsar readCompacted support (#4535)
---
 .../apache/camel/catalog/components/pulsar.json    |   2 +
 .../camel/catalog/docs/pulsar-component.adoc       |   6 +-
 .../pulsar/PulsarComponentConfigurer.java          |   5 +
 .../component/pulsar/PulsarEndpointConfigurer.java |   5 +
 .../component/pulsar/PulsarEndpointUriFactory.java |   3 +-
 .../org/apache/camel/component/pulsar/pulsar.json  |   2 +
 .../src/main/docs/pulsar-component.adoc            |   6 +-
 .../component/pulsar/PulsarConfiguration.java      |  13 ++
 .../consumers/CommonCreationStrategyImpl.java      |   3 +-
 .../component/pulsar/PulsarComponentTest.java      |   1 +
 .../pulsar/PulsarConsumerReadCompactedTest.java    | 155 +++++++++++++++++++++
 .../dsl/PulsarComponentBuilderFactory.java         |  13 ++
 .../endpoint/dsl/PulsarEndpointBuilderFactory.java |  25 ++++
 .../modules/ROOT/pages/pulsar-component.adoc       |   6 +-
 14 files changed, 237 insertions(+), 8 deletions(-)

diff --git 
a/catalog/camel-catalog/src/generated/resources/org/apache/camel/catalog/components/pulsar.json
 
b/catalog/camel-catalog/src/generated/resources/org/apache/camel/catalog/components/pulsar.json
index ffcc9c5..879c934 100644
--- 
a/catalog/camel-catalog/src/generated/resources/org/apache/camel/catalog/components/pulsar.json
+++ 
b/catalog/camel-catalog/src/generated/resources/org/apache/camel/catalog/components/pulsar.json
@@ -34,6 +34,7 @@
     "maxRedeliverCount": { "kind": "property", "displayName": "Max Redeliver 
Count", "group": "consumer", "label": "consumer", "required": false, "type": 
"integer", "javaType": "java.lang.Integer", "deprecated": false, "secret": 
false, "configurationClass": 
"org.apache.camel.component.pulsar.PulsarConfiguration", "configurationField": 
"configuration", "description": "Maximum number of times that a message will be 
redelivered before being sent to the dead letter queue. If this value is no 
[...]
     "negativeAckRedeliveryDelayMicros": { "kind": "property", "displayName": 
"Negative Ack Redelivery Delay Micros", "group": "consumer", "label": 
"consumer", "required": false, "type": "integer", "javaType": "long", 
"deprecated": false, "secret": false, "defaultValue": 60000000, 
"configurationClass": "org.apache.camel.component.pulsar.PulsarConfiguration", 
"configurationField": "configuration", "description": "Set the negative 
acknowledgement delay" },
     "numberOfConsumers": { "kind": "property", "displayName": "Number Of 
Consumers", "group": "consumer", "label": "consumer", "required": false, 
"type": "integer", "javaType": "int", "deprecated": false, "secret": false, 
"defaultValue": 1, "configurationClass": 
"org.apache.camel.component.pulsar.PulsarConfiguration", "configurationField": 
"configuration", "description": "Number of consumers - defaults to 1" },
+    "readCompacted": { "kind": "property", "displayName": "Read Compacted", 
"group": "consumer", "label": "consumer", "required": false, "type": "boolean", 
"javaType": "boolean", "deprecated": false, "secret": false, "defaultValue": 
false, "configurationClass": 
"org.apache.camel.component.pulsar.PulsarConfiguration", "configurationField": 
"configuration", "description": "Enable compacted topic reading." },
     "subscriptionInitialPosition": { "kind": "property", "displayName": 
"Subscription Initial Position", "group": "consumer", "label": "consumer", 
"required": false, "type": "object", "javaType": 
"org.apache.camel.component.pulsar.utils.consumers.SubscriptionInitialPosition",
 "enum": [ "EARLIEST", "LATEST" ], "deprecated": false, "secret": false, 
"defaultValue": "LATEST", "configurationClass": 
"org.apache.camel.component.pulsar.PulsarConfiguration", "configurationField": 
"configuration", [...]
     "subscriptionName": { "kind": "property", "displayName": "Subscription 
Name", "group": "consumer", "label": "consumer", "required": false, "type": 
"string", "javaType": "java.lang.String", "deprecated": false, "secret": false, 
"defaultValue": "subs", "configurationClass": 
"org.apache.camel.component.pulsar.PulsarConfiguration", "configurationField": 
"configuration", "description": "Name of the subscription to use" },
     "subscriptionTopicsMode": { "kind": "property", "displayName": 
"Subscription Topics Mode", "group": "consumer", "label": "consumer", 
"required": false, "type": "object", "javaType": 
"org.apache.pulsar.client.api.RegexSubscriptionMode", "enum": [ 
"PersistentOnly", "NonPersistentOnly", "AllTopics" ], "deprecated": false, 
"secret": false, "defaultValue": "PersistentOnly", "configurationClass": 
"org.apache.camel.component.pulsar.PulsarConfiguration", "configurationField": 
"configuration" [...]
@@ -74,6 +75,7 @@
     "maxRedeliverCount": { "kind": "parameter", "displayName": "Max Redeliver 
Count", "group": "consumer", "label": "consumer", "required": false, "type": 
"integer", "javaType": "java.lang.Integer", "deprecated": false, "secret": 
false, "configurationClass": 
"org.apache.camel.component.pulsar.PulsarConfiguration", "configurationField": 
"pulsarConfiguration", "description": "Maximum number of times that a message 
will be redelivered before being sent to the dead letter queue. If this valu 
[...]
     "negativeAckRedeliveryDelayMicros": { "kind": "parameter", "displayName": 
"Negative Ack Redelivery Delay Micros", "group": "consumer", "label": 
"consumer", "required": false, "type": "integer", "javaType": "long", 
"deprecated": false, "secret": false, "defaultValue": 60000000, 
"configurationClass": "org.apache.camel.component.pulsar.PulsarConfiguration", 
"configurationField": "pulsarConfiguration", "description": "Set the negative 
acknowledgement delay" },
     "numberOfConsumers": { "kind": "parameter", "displayName": "Number Of 
Consumers", "group": "consumer", "label": "consumer", "required": false, 
"type": "integer", "javaType": "int", "deprecated": false, "secret": false, 
"defaultValue": 1, "configurationClass": 
"org.apache.camel.component.pulsar.PulsarConfiguration", "configurationField": 
"pulsarConfiguration", "description": "Number of consumers - defaults to 1" },
+    "readCompacted": { "kind": "parameter", "displayName": "Read Compacted", 
"group": "consumer", "label": "consumer", "required": false, "type": "boolean", 
"javaType": "boolean", "deprecated": false, "secret": false, "defaultValue": 
false, "configurationClass": 
"org.apache.camel.component.pulsar.PulsarConfiguration", "configurationField": 
"pulsarConfiguration", "description": "Enable compacted topic reading." },
     "subscriptionInitialPosition": { "kind": "parameter", "displayName": 
"Subscription Initial Position", "group": "consumer", "label": "consumer", 
"required": false, "type": "object", "javaType": 
"org.apache.camel.component.pulsar.utils.consumers.SubscriptionInitialPosition",
 "enum": [ "EARLIEST", "LATEST" ], "deprecated": false, "secret": false, 
"defaultValue": "LATEST", "configurationClass": 
"org.apache.camel.component.pulsar.PulsarConfiguration", "configurationField": 
"pulsarConfigur [...]
     "subscriptionName": { "kind": "parameter", "displayName": "Subscription 
Name", "group": "consumer", "label": "consumer", "required": false, "type": 
"string", "javaType": "java.lang.String", "deprecated": false, "secret": false, 
"defaultValue": "subs", "configurationClass": 
"org.apache.camel.component.pulsar.PulsarConfiguration", "configurationField": 
"pulsarConfiguration", "description": "Name of the subscription to use" },
     "subscriptionTopicsMode": { "kind": "parameter", "displayName": 
"Subscription Topics Mode", "group": "consumer", "label": "consumer", 
"required": false, "type": "object", "javaType": 
"org.apache.pulsar.client.api.RegexSubscriptionMode", "enum": [ 
"PersistentOnly", "NonPersistentOnly", "AllTopics" ], "deprecated": false, 
"secret": false, "defaultValue": "PersistentOnly", "configurationClass": 
"org.apache.camel.component.pulsar.PulsarConfiguration", "configurationField": 
"pulsarConfigu [...]
diff --git 
a/catalog/camel-catalog/src/generated/resources/org/apache/camel/catalog/docs/pulsar-component.adoc
 
b/catalog/camel-catalog/src/generated/resources/org/apache/camel/catalog/docs/pulsar-component.adoc
index db3962b..cb375e6 100644
--- 
a/catalog/camel-catalog/src/generated/resources/org/apache/camel/catalog/docs/pulsar-component.adoc
+++ 
b/catalog/camel-catalog/src/generated/resources/org/apache/camel/catalog/docs/pulsar-component.adoc
@@ -36,7 +36,7 @@ pulsar:[persistent|non-persistent]://tenant/namespace/topic
 
 
 // component options: START
-The Pulsar component supports 35 options, which are listed below.
+The Pulsar component supports 36 options, which are listed below.
 
 
 
@@ -55,6 +55,7 @@ The Pulsar component supports 35 options, which are listed 
below.
 | *maxRedeliverCount* (consumer) | Maximum number of times that a message will 
be redelivered before being sent to the dead letter queue. If this value is not 
set, no Dead Letter Policy will be created |  | Integer
 | *negativeAckRedeliveryDelay{zwsp}Micros* (consumer) | Set the negative 
acknowledgement delay | 60000000 | long
 | *numberOfConsumers* (consumer) | Number of consumers - defaults to 1 | 1 | 
int
+| *readCompacted* (consumer) | Enable compacted topic reading. | false | 
boolean
 | *subscriptionInitialPosition* (consumer) | Control the initial position in 
the topic of a newly created subscription. Default is latest message. There are 
2 enums and the value can be one of: EARLIEST, LATEST | LATEST | 
SubscriptionInitialPosition
 | *subscriptionName* (consumer) | Name of the subscription to use | subs | 
String
 | *subscriptionTopicsMode* (consumer) | Determines to which topics this 
consumer should be subscribed to - Persistent, Non-Persistent, or both. Only 
used with pattern subscriptions. There are 3 enums and the value can be one of: 
PersistentOnly, NonPersistentOnly, AllTopics | PersistentOnly | 
RegexSubscriptionMode
@@ -107,7 +108,7 @@ with the following path and query parameters:
 |===
 
 
-=== Query Parameters (34 parameters):
+=== Query Parameters (35 parameters):
 
 
 [width="100%",cols="2,5,^1,2",options="header"]
@@ -124,6 +125,7 @@ with the following path and query parameters:
 | *maxRedeliverCount* (consumer) | Maximum number of times that a message will 
be redelivered before being sent to the dead letter queue. If this value is not 
set, no Dead Letter Policy will be created |  | Integer
 | *negativeAckRedeliveryDelay{zwsp}Micros* (consumer) | Set the negative 
acknowledgement delay | 60000000 | long
 | *numberOfConsumers* (consumer) | Number of consumers - defaults to 1 | 1 | 
int
+| *readCompacted* (consumer) | Enable compacted topic reading. | false | 
boolean
 | *subscriptionInitialPosition* (consumer) | Control the initial position in 
the topic of a newly created subscription. Default is latest message. There are 
2 enums and the value can be one of: EARLIEST, LATEST | LATEST | 
SubscriptionInitialPosition
 | *subscriptionName* (consumer) | Name of the subscription to use | subs | 
String
 | *subscriptionTopicsMode* (consumer) | Determines to which topics this 
consumer should be subscribed to - Persistent, Non-Persistent, or both. Only 
used with pattern subscriptions. There are 3 enums and the value can be one of: 
PersistentOnly, NonPersistentOnly, AllTopics | PersistentOnly | 
RegexSubscriptionMode
diff --git 
a/components/camel-pulsar/src/generated/java/org/apache/camel/component/pulsar/PulsarComponentConfigurer.java
 
b/components/camel-pulsar/src/generated/java/org/apache/camel/component/pulsar/PulsarComponentConfigurer.java
index 4480dc7..bc6689f 100644
--- 
a/components/camel-pulsar/src/generated/java/org/apache/camel/component/pulsar/PulsarComponentConfigurer.java
+++ 
b/components/camel-pulsar/src/generated/java/org/apache/camel/component/pulsar/PulsarComponentConfigurer.java
@@ -30,6 +30,7 @@ public class PulsarComponentConfigurer extends 
PropertyConfigurerSupport impleme
         map.put("maxRedeliverCount", java.lang.Integer.class);
         map.put("negativeAckRedeliveryDelayMicros", long.class);
         map.put("numberOfConsumers", int.class);
+        map.put("readCompacted", boolean.class);
         map.put("subscriptionInitialPosition", 
org.apache.camel.component.pulsar.utils.consumers.SubscriptionInitialPosition.class);
         map.put("subscriptionName", java.lang.String.class);
         map.put("subscriptionTopicsMode", 
org.apache.pulsar.client.api.RegexSubscriptionMode.class);
@@ -124,6 +125,8 @@ public class PulsarComponentConfigurer extends 
PropertyConfigurerSupport impleme
         case "pulsarClient": target.setPulsarClient(property(camelContext, 
org.apache.pulsar.client.api.PulsarClient.class, value)); return true;
         case "pulsarmessagereceiptfactory":
         case "pulsarMessageReceiptFactory": 
target.setPulsarMessageReceiptFactory(property(camelContext, 
org.apache.camel.component.pulsar.PulsarMessageReceiptFactory.class, value)); 
return true;
+        case "readcompacted":
+        case "readCompacted": 
getOrCreateConfiguration(target).setReadCompacted(property(camelContext, 
boolean.class, value)); return true;
         case "sendtimeoutms":
         case "sendTimeoutMs": 
getOrCreateConfiguration(target).setSendTimeoutMs(property(camelContext, 
int.class, value)); return true;
         case "subscriptioninitialposition":
@@ -206,6 +209,8 @@ public class PulsarComponentConfigurer extends 
PropertyConfigurerSupport impleme
         case "pulsarClient": return target.getPulsarClient();
         case "pulsarmessagereceiptfactory":
         case "pulsarMessageReceiptFactory": return 
target.getPulsarMessageReceiptFactory();
+        case "readcompacted":
+        case "readCompacted": return 
getOrCreateConfiguration(target).isReadCompacted();
         case "sendtimeoutms":
         case "sendTimeoutMs": return 
getOrCreateConfiguration(target).getSendTimeoutMs();
         case "subscriptioninitialposition":
diff --git 
a/components/camel-pulsar/src/generated/java/org/apache/camel/component/pulsar/PulsarEndpointConfigurer.java
 
b/components/camel-pulsar/src/generated/java/org/apache/camel/component/pulsar/PulsarEndpointConfigurer.java
index 69bd2e8..be19279 100644
--- 
a/components/camel-pulsar/src/generated/java/org/apache/camel/component/pulsar/PulsarEndpointConfigurer.java
+++ 
b/components/camel-pulsar/src/generated/java/org/apache/camel/component/pulsar/PulsarEndpointConfigurer.java
@@ -33,6 +33,7 @@ public class PulsarEndpointConfigurer extends 
PropertyConfigurerSupport implemen
         map.put("maxRedeliverCount", java.lang.Integer.class);
         map.put("negativeAckRedeliveryDelayMicros", long.class);
         map.put("numberOfConsumers", int.class);
+        map.put("readCompacted", boolean.class);
         map.put("subscriptionInitialPosition", 
org.apache.camel.component.pulsar.utils.consumers.SubscriptionInitialPosition.class);
         map.put("subscriptionName", java.lang.String.class);
         map.put("subscriptionTopicsMode", 
org.apache.pulsar.client.api.RegexSubscriptionMode.class);
@@ -117,6 +118,8 @@ public class PulsarEndpointConfigurer extends 
PropertyConfigurerSupport implemen
         case "numberOfConsumers": 
target.getPulsarConfiguration().setNumberOfConsumers(property(camelContext, 
int.class, value)); return true;
         case "producername":
         case "producerName": 
target.getPulsarConfiguration().setProducerName(property(camelContext, 
java.lang.String.class, value)); return true;
+        case "readcompacted":
+        case "readCompacted": 
target.getPulsarConfiguration().setReadCompacted(property(camelContext, 
boolean.class, value)); return true;
         case "sendtimeoutms":
         case "sendTimeoutMs": 
target.getPulsarConfiguration().setSendTimeoutMs(property(camelContext, 
int.class, value)); return true;
         case "subscriptioninitialposition":
@@ -197,6 +200,8 @@ public class PulsarEndpointConfigurer extends 
PropertyConfigurerSupport implemen
         case "numberOfConsumers": return 
target.getPulsarConfiguration().getNumberOfConsumers();
         case "producername":
         case "producerName": return 
target.getPulsarConfiguration().getProducerName();
+        case "readcompacted":
+        case "readCompacted": return 
target.getPulsarConfiguration().isReadCompacted();
         case "sendtimeoutms":
         case "sendTimeoutMs": return 
target.getPulsarConfiguration().getSendTimeoutMs();
         case "subscriptioninitialposition":
diff --git 
a/components/camel-pulsar/src/generated/java/org/apache/camel/component/pulsar/PulsarEndpointUriFactory.java
 
b/components/camel-pulsar/src/generated/java/org/apache/camel/component/pulsar/PulsarEndpointUriFactory.java
index 5e70ee5..34ac4a1 100644
--- 
a/components/camel-pulsar/src/generated/java/org/apache/camel/component/pulsar/PulsarEndpointUriFactory.java
+++ 
b/components/camel-pulsar/src/generated/java/org/apache/camel/component/pulsar/PulsarEndpointUriFactory.java
@@ -20,7 +20,7 @@ public class PulsarEndpointUriFactory extends 
org.apache.camel.support.component
     private static final Set<String> PROPERTY_NAMES;
     private static final Set<String> SECRET_PROPERTY_NAMES;
     static {
-        Set<String> props = new HashSet<>(38);
+        Set<String> props = new HashSet<>(39);
         props.add("basicPropertyBinding");
         props.add("initialSequenceId");
         props.add("maxRedeliverCount");
@@ -49,6 +49,7 @@ public class PulsarEndpointUriFactory extends 
org.apache.camel.support.component
         props.add("messageRoutingMode");
         props.add("ackTimeoutMillis");
         props.add("consumerNamePrefix");
+        props.add("readCompacted");
         props.add("lazyStartProducer");
         props.add("subscriptionType");
         props.add("subscriptionName");
diff --git 
a/components/camel-pulsar/src/generated/resources/org/apache/camel/component/pulsar/pulsar.json
 
b/components/camel-pulsar/src/generated/resources/org/apache/camel/component/pulsar/pulsar.json
index ffcc9c5..879c934 100644
--- 
a/components/camel-pulsar/src/generated/resources/org/apache/camel/component/pulsar/pulsar.json
+++ 
b/components/camel-pulsar/src/generated/resources/org/apache/camel/component/pulsar/pulsar.json
@@ -34,6 +34,7 @@
     "maxRedeliverCount": { "kind": "property", "displayName": "Max Redeliver 
Count", "group": "consumer", "label": "consumer", "required": false, "type": 
"integer", "javaType": "java.lang.Integer", "deprecated": false, "secret": 
false, "configurationClass": 
"org.apache.camel.component.pulsar.PulsarConfiguration", "configurationField": 
"configuration", "description": "Maximum number of times that a message will be 
redelivered before being sent to the dead letter queue. If this value is no 
[...]
     "negativeAckRedeliveryDelayMicros": { "kind": "property", "displayName": 
"Negative Ack Redelivery Delay Micros", "group": "consumer", "label": 
"consumer", "required": false, "type": "integer", "javaType": "long", 
"deprecated": false, "secret": false, "defaultValue": 60000000, 
"configurationClass": "org.apache.camel.component.pulsar.PulsarConfiguration", 
"configurationField": "configuration", "description": "Set the negative 
acknowledgement delay" },
     "numberOfConsumers": { "kind": "property", "displayName": "Number Of 
Consumers", "group": "consumer", "label": "consumer", "required": false, 
"type": "integer", "javaType": "int", "deprecated": false, "secret": false, 
"defaultValue": 1, "configurationClass": 
"org.apache.camel.component.pulsar.PulsarConfiguration", "configurationField": 
"configuration", "description": "Number of consumers - defaults to 1" },
+    "readCompacted": { "kind": "property", "displayName": "Read Compacted", 
"group": "consumer", "label": "consumer", "required": false, "type": "boolean", 
"javaType": "boolean", "deprecated": false, "secret": false, "defaultValue": 
false, "configurationClass": 
"org.apache.camel.component.pulsar.PulsarConfiguration", "configurationField": 
"configuration", "description": "Enable compacted topic reading." },
     "subscriptionInitialPosition": { "kind": "property", "displayName": 
"Subscription Initial Position", "group": "consumer", "label": "consumer", 
"required": false, "type": "object", "javaType": 
"org.apache.camel.component.pulsar.utils.consumers.SubscriptionInitialPosition",
 "enum": [ "EARLIEST", "LATEST" ], "deprecated": false, "secret": false, 
"defaultValue": "LATEST", "configurationClass": 
"org.apache.camel.component.pulsar.PulsarConfiguration", "configurationField": 
"configuration", [...]
     "subscriptionName": { "kind": "property", "displayName": "Subscription 
Name", "group": "consumer", "label": "consumer", "required": false, "type": 
"string", "javaType": "java.lang.String", "deprecated": false, "secret": false, 
"defaultValue": "subs", "configurationClass": 
"org.apache.camel.component.pulsar.PulsarConfiguration", "configurationField": 
"configuration", "description": "Name of the subscription to use" },
     "subscriptionTopicsMode": { "kind": "property", "displayName": 
"Subscription Topics Mode", "group": "consumer", "label": "consumer", 
"required": false, "type": "object", "javaType": 
"org.apache.pulsar.client.api.RegexSubscriptionMode", "enum": [ 
"PersistentOnly", "NonPersistentOnly", "AllTopics" ], "deprecated": false, 
"secret": false, "defaultValue": "PersistentOnly", "configurationClass": 
"org.apache.camel.component.pulsar.PulsarConfiguration", "configurationField": 
"configuration" [...]
@@ -74,6 +75,7 @@
     "maxRedeliverCount": { "kind": "parameter", "displayName": "Max Redeliver 
Count", "group": "consumer", "label": "consumer", "required": false, "type": 
"integer", "javaType": "java.lang.Integer", "deprecated": false, "secret": 
false, "configurationClass": 
"org.apache.camel.component.pulsar.PulsarConfiguration", "configurationField": 
"pulsarConfiguration", "description": "Maximum number of times that a message 
will be redelivered before being sent to the dead letter queue. If this valu 
[...]
     "negativeAckRedeliveryDelayMicros": { "kind": "parameter", "displayName": 
"Negative Ack Redelivery Delay Micros", "group": "consumer", "label": 
"consumer", "required": false, "type": "integer", "javaType": "long", 
"deprecated": false, "secret": false, "defaultValue": 60000000, 
"configurationClass": "org.apache.camel.component.pulsar.PulsarConfiguration", 
"configurationField": "pulsarConfiguration", "description": "Set the negative 
acknowledgement delay" },
     "numberOfConsumers": { "kind": "parameter", "displayName": "Number Of 
Consumers", "group": "consumer", "label": "consumer", "required": false, 
"type": "integer", "javaType": "int", "deprecated": false, "secret": false, 
"defaultValue": 1, "configurationClass": 
"org.apache.camel.component.pulsar.PulsarConfiguration", "configurationField": 
"pulsarConfiguration", "description": "Number of consumers - defaults to 1" },
+    "readCompacted": { "kind": "parameter", "displayName": "Read Compacted", 
"group": "consumer", "label": "consumer", "required": false, "type": "boolean", 
"javaType": "boolean", "deprecated": false, "secret": false, "defaultValue": 
false, "configurationClass": 
"org.apache.camel.component.pulsar.PulsarConfiguration", "configurationField": 
"pulsarConfiguration", "description": "Enable compacted topic reading." },
     "subscriptionInitialPosition": { "kind": "parameter", "displayName": 
"Subscription Initial Position", "group": "consumer", "label": "consumer", 
"required": false, "type": "object", "javaType": 
"org.apache.camel.component.pulsar.utils.consumers.SubscriptionInitialPosition",
 "enum": [ "EARLIEST", "LATEST" ], "deprecated": false, "secret": false, 
"defaultValue": "LATEST", "configurationClass": 
"org.apache.camel.component.pulsar.PulsarConfiguration", "configurationField": 
"pulsarConfigur [...]
     "subscriptionName": { "kind": "parameter", "displayName": "Subscription 
Name", "group": "consumer", "label": "consumer", "required": false, "type": 
"string", "javaType": "java.lang.String", "deprecated": false, "secret": false, 
"defaultValue": "subs", "configurationClass": 
"org.apache.camel.component.pulsar.PulsarConfiguration", "configurationField": 
"pulsarConfiguration", "description": "Name of the subscription to use" },
     "subscriptionTopicsMode": { "kind": "parameter", "displayName": 
"Subscription Topics Mode", "group": "consumer", "label": "consumer", 
"required": false, "type": "object", "javaType": 
"org.apache.pulsar.client.api.RegexSubscriptionMode", "enum": [ 
"PersistentOnly", "NonPersistentOnly", "AllTopics" ], "deprecated": false, 
"secret": false, "defaultValue": "PersistentOnly", "configurationClass": 
"org.apache.camel.component.pulsar.PulsarConfiguration", "configurationField": 
"pulsarConfigu [...]
diff --git a/components/camel-pulsar/src/main/docs/pulsar-component.adoc 
b/components/camel-pulsar/src/main/docs/pulsar-component.adoc
index db3962b..cb375e6 100644
--- a/components/camel-pulsar/src/main/docs/pulsar-component.adoc
+++ b/components/camel-pulsar/src/main/docs/pulsar-component.adoc
@@ -36,7 +36,7 @@ pulsar:[persistent|non-persistent]://tenant/namespace/topic
 
 
 // component options: START
-The Pulsar component supports 35 options, which are listed below.
+The Pulsar component supports 36 options, which are listed below.
 
 
 
@@ -55,6 +55,7 @@ The Pulsar component supports 35 options, which are listed 
below.
 | *maxRedeliverCount* (consumer) | Maximum number of times that a message will 
be redelivered before being sent to the dead letter queue. If this value is not 
set, no Dead Letter Policy will be created |  | Integer
 | *negativeAckRedeliveryDelay{zwsp}Micros* (consumer) | Set the negative 
acknowledgement delay | 60000000 | long
 | *numberOfConsumers* (consumer) | Number of consumers - defaults to 1 | 1 | 
int
+| *readCompacted* (consumer) | Enable compacted topic reading. | false | 
boolean
 | *subscriptionInitialPosition* (consumer) | Control the initial position in 
the topic of a newly created subscription. Default is latest message. There are 
2 enums and the value can be one of: EARLIEST, LATEST | LATEST | 
SubscriptionInitialPosition
 | *subscriptionName* (consumer) | Name of the subscription to use | subs | 
String
 | *subscriptionTopicsMode* (consumer) | Determines to which topics this 
consumer should be subscribed to - Persistent, Non-Persistent, or both. Only 
used with pattern subscriptions. There are 3 enums and the value can be one of: 
PersistentOnly, NonPersistentOnly, AllTopics | PersistentOnly | 
RegexSubscriptionMode
@@ -107,7 +108,7 @@ with the following path and query parameters:
 |===
 
 
-=== Query Parameters (34 parameters):
+=== Query Parameters (35 parameters):
 
 
 [width="100%",cols="2,5,^1,2",options="header"]
@@ -124,6 +125,7 @@ with the following path and query parameters:
 | *maxRedeliverCount* (consumer) | Maximum number of times that a message will 
be redelivered before being sent to the dead letter queue. If this value is not 
set, no Dead Letter Policy will be created |  | Integer
 | *negativeAckRedeliveryDelay{zwsp}Micros* (consumer) | Set the negative 
acknowledgement delay | 60000000 | long
 | *numberOfConsumers* (consumer) | Number of consumers - defaults to 1 | 1 | 
int
+| *readCompacted* (consumer) | Enable compacted topic reading. | false | 
boolean
 | *subscriptionInitialPosition* (consumer) | Control the initial position in 
the topic of a newly created subscription. Default is latest message. There are 
2 enums and the value can be one of: EARLIEST, LATEST | LATEST | 
SubscriptionInitialPosition
 | *subscriptionName* (consumer) | Name of the subscription to use | subs | 
String
 | *subscriptionTopicsMode* (consumer) | Determines to which topics this 
consumer should be subscribed to - Persistent, Non-Persistent, or both. Only 
used with pattern subscriptions. There are 3 enums and the value can be one of: 
PersistentOnly, NonPersistentOnly, AllTopics | PersistentOnly | 
RegexSubscriptionMode
diff --git 
a/components/camel-pulsar/src/main/java/org/apache/camel/component/pulsar/PulsarConfiguration.java
 
b/components/camel-pulsar/src/main/java/org/apache/camel/component/pulsar/PulsarConfiguration.java
index a10c1a1..c43b511 100644
--- 
a/components/camel-pulsar/src/main/java/org/apache/camel/component/pulsar/PulsarConfiguration.java
+++ 
b/components/camel-pulsar/src/main/java/org/apache/camel/component/pulsar/PulsarConfiguration.java
@@ -63,6 +63,8 @@ public class PulsarConfiguration implements Cloneable {
     private long ackGroupTimeMillis = 100;
     @UriParam(label = "consumer", defaultValue = "LATEST")
     private SubscriptionInitialPosition subscriptionInitialPosition = LATEST;
+    @UriParam(label = "consumer", defaultValue = "false")
+    private boolean readCompacted;
     @UriParam(label = "consumer",
               description = "Maximum number of times that a message will be 
redelivered before being sent to the dead letter queue. If this value is not 
set, no Dead Letter Policy will be created")
     private Integer maxRedeliverCount;
@@ -367,6 +369,17 @@ public class PulsarConfiguration implements Cloneable {
     }
 
     /**
+     * Enable compacted topic reading.
+     */
+    public boolean isReadCompacted() {
+        return readCompacted;
+    }
+
+    public void setReadCompacted(boolean readCompacted) {
+        this.readCompacted = readCompacted;
+    }
+
+    /**
      * Set the baseline for the sequence ids for messages published by the 
producer. First message will be using
      * (initialSequenceId 1) as its sequence id and subsequent messages will 
be assigned incremental sequence ids, if
      * not otherwise specified.
diff --git 
a/components/camel-pulsar/src/main/java/org/apache/camel/component/pulsar/utils/consumers/CommonCreationStrategyImpl.java
 
b/components/camel-pulsar/src/main/java/org/apache/camel/component/pulsar/utils/consumers/CommonCreationStrategyImpl.java
index f429980..86fe75c 100644
--- 
a/components/camel-pulsar/src/main/java/org/apache/camel/component/pulsar/utils/consumers/CommonCreationStrategyImpl.java
+++ 
b/components/camel-pulsar/src/main/java/org/apache/camel/component/pulsar/utils/consumers/CommonCreationStrategyImpl.java
@@ -51,7 +51,8 @@ public final class CommonCreationStrategyImpl {
                         
endpointConfiguration.getSubscriptionInitialPosition().toPulsarSubscriptionInitialPosition())
                 
.acknowledgmentGroupTime(endpointConfiguration.getAckGroupTimeMillis(), 
TimeUnit.MILLISECONDS)
                 
.negativeAckRedeliveryDelay(endpointConfiguration.getNegativeAckRedeliveryDelayMicros(),
 TimeUnit.MICROSECONDS)
-                .messageListener(new PulsarMessageListener(pulsarEndpoint, 
pulsarConsumer));
+                .messageListener(new PulsarMessageListener(pulsarEndpoint, 
pulsarConsumer))
+                .readCompacted(endpointConfiguration.isReadCompacted());
 
         if (endpointConfiguration.getMaxRedeliverCount() != null) {
             DeadLetterPolicyBuilder policy = DeadLetterPolicy.builder()
diff --git 
a/components/camel-pulsar/src/test/java/org/apache/camel/component/pulsar/PulsarComponentTest.java
 
b/components/camel-pulsar/src/test/java/org/apache/camel/component/pulsar/PulsarComponentTest.java
index 146586e..d86f981 100644
--- 
a/components/camel-pulsar/src/test/java/org/apache/camel/component/pulsar/PulsarComponentTest.java
+++ 
b/components/camel-pulsar/src/test/java/org/apache/camel/component/pulsar/PulsarComponentTest.java
@@ -70,6 +70,7 @@ public class PulsarComponentTest extends CamelTestSupport {
         assertEquals("subs", 
endpoint.getPulsarConfiguration().getSubscriptionName());
         assertEquals(SubscriptionType.EXCLUSIVE, 
endpoint.getPulsarConfiguration().getSubscriptionType());
         
assertFalse(endpoint.getPulsarConfiguration().isAllowManualAcknowledgement());
+        assertFalse(endpoint.getPulsarConfiguration().isReadCompacted());
     }
 
     @Test
diff --git 
a/components/camel-pulsar/src/test/java/org/apache/camel/component/pulsar/PulsarConsumerReadCompactedTest.java
 
b/components/camel-pulsar/src/test/java/org/apache/camel/component/pulsar/PulsarConsumerReadCompactedTest.java
new file mode 100644
index 0000000..e80e64a
--- /dev/null
+++ 
b/components/camel-pulsar/src/test/java/org/apache/camel/component/pulsar/PulsarConsumerReadCompactedTest.java
@@ -0,0 +1,155 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.pulsar;
+
+import java.util.concurrent.TimeUnit;
+
+import com.google.common.util.concurrent.Uninterruptibles;
+import org.apache.camel.Endpoint;
+import org.apache.camel.EndpointInject;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.component.mock.MockEndpoint;
+import org.apache.camel.component.pulsar.utils.AutoConfiguration;
+import org.apache.camel.component.pulsar.utils.message.PulsarMessageHeaders;
+import org.apache.camel.spi.Registry;
+import org.apache.camel.support.SimpleRegistry;
+import org.apache.pulsar.client.admin.LongRunningProcessStatus;
+import org.apache.pulsar.client.admin.PulsarAdmin;
+import org.apache.pulsar.client.admin.PulsarAdminException;
+import org.apache.pulsar.client.admin.Topics;
+import org.apache.pulsar.client.admin.internal.PulsarAdminBuilderImpl;
+import org.apache.pulsar.client.api.Producer;
+import org.apache.pulsar.client.api.PulsarClient;
+import org.apache.pulsar.client.api.PulsarClientException;
+import org.apache.pulsar.client.api.Schema;
+import org.apache.pulsar.client.impl.ClientBuilderImpl;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class PulsarConsumerReadCompactedTest extends PulsarTestSupport {
+
+    private static final Logger LOGGER = 
LoggerFactory.getLogger(PulsarConsumerReadCompactedTest.class);
+
+    private static final String TOPIC_URI = 
"persistent://public/default/camel-topic";
+    private static final String PRODUCER = "camel-producer-1";
+
+    @EndpointInject("pulsar:" + TOPIC_URI + 
"?numberOfConsumers=1&subscriptionType=Exclusive"
+                    + 
"&subscriptionName=camel-subscription&consumerQueueSize=1&consumerName=camel-consumer"
+                    + "&allowManualAcknowledgement=true" + 
"&ackTimeoutMillis=1000"
+                    + "&readCompacted=true"
+                    + "&negativeAckRedeliveryDelayMicros=100000")
+    private Endpoint from;
+
+    @EndpointInject("mock:result")
+    private MockEndpoint to;
+
+    private Producer<String> producer;
+
+    @BeforeEach
+    public void setup() throws Exception {
+        context.removeRoute("myRoute");
+        producer = 
givenPulsarClient().newProducer(Schema.STRING).producerName(PRODUCER).topic(TOPIC_URI).create();
+    }
+
+    @Override
+    protected Registry createCamelRegistry() throws Exception {
+        Registry registry = new SimpleRegistry();
+
+        registerPulsarBeans(registry);
+
+        return registry;
+    }
+
+    private void registerPulsarBeans(final Registry registry) throws 
PulsarClientException {
+        PulsarClient pulsarClient = givenPulsarClient();
+        AutoConfiguration autoConfiguration = new AutoConfiguration(null, 
null);
+
+        registry.bind("pulsarClient", pulsarClient);
+        PulsarComponent comp = new PulsarComponent(context);
+        comp.setAutoConfiguration(autoConfiguration);
+        comp.setPulsarClient(pulsarClient);
+        registry.bind("pulsar", comp);
+    }
+
+    private PulsarClient givenPulsarClient() throws PulsarClientException {
+        return new 
ClientBuilderImpl().serviceUrl(getPulsarBrokerUrl()).ioThreads(1).listenerThreads(1).build();
+    }
+
+    private PulsarAdmin givenPulsarAdmin() throws PulsarClientException {
+        return new 
PulsarAdminBuilderImpl().serviceHttpUrl(getPulsarAdminUrl()).build();
+    }
+
+    private void triggerCompaction() throws PulsarAdminException, 
PulsarClientException {
+        final Topics topics = givenPulsarAdmin().topics();
+
+        topics.triggerCompaction(TOPIC_URI);
+        while 
(!topics.compactionStatus(TOPIC_URI).status.equals(LongRunningProcessStatus.Status.RUNNING))
 {
+            LOGGER.info("Waiting for compaction completeness...");
+            Uninterruptibles.sleepUninterruptibly(1, TimeUnit.SECONDS);
+        }
+    }
+
+    @Test
+    public void testReadCompacted() throws Exception {
+        to.expectedMessageCount(1);
+        triggerCompaction();
+
+        context.addRoutes(new RouteBuilder() {
+            @Override
+            public void configure() {
+                from(from).routeId("myRoute").to(to).process(exchange -> {
+                    LOGGER.info("Processing message {}", 
exchange.getIn().getBody());
+
+                    PulsarMessageReceipt receipt
+                            = (PulsarMessageReceipt) 
exchange.getIn().getHeader(PulsarMessageHeaders.MESSAGE_RECEIPT);
+                    receipt.acknowledge();
+                });
+            }
+        });
+
+        producer.newMessage().key("myKey").value("Hello World!").send();
+        producer.newMessage().key("myKey").value("Hello World! Again!").send();
+
+        MockEndpoint.assertIsSatisfied(10, TimeUnit.SECONDS, to);
+    }
+
+    @Test
+    public void testReadNotCompacted() throws Exception {
+        to.expectedMessageCount(2);
+        triggerCompaction();
+
+        context.addRoutes(new RouteBuilder() {
+            @Override
+            public void configure() {
+                from(from).routeId("myRoute").to(to).process(exchange -> {
+                    LOGGER.info("Processing message {}", 
exchange.getIn().getBody());
+
+                    PulsarMessageReceipt receipt
+                            = (PulsarMessageReceipt) 
exchange.getIn().getHeader(PulsarMessageHeaders.MESSAGE_RECEIPT);
+                    receipt.acknowledge();
+                });
+            }
+        });
+
+        producer.newMessage().key("mySecondKey").value("Hello World!").send();
+        producer.newMessage().key("mySecondKey").value("Hello World! 
Again!").send();
+
+        MockEndpoint.assertIsSatisfied(10, TimeUnit.SECONDS, to);
+    }
+}
diff --git 
a/core/camel-componentdsl/src/generated/java/org/apache/camel/builder/component/dsl/PulsarComponentBuilderFactory.java
 
b/core/camel-componentdsl/src/generated/java/org/apache/camel/builder/component/dsl/PulsarComponentBuilderFactory.java
index 02fc71e..de3ac84 100644
--- 
a/core/camel-componentdsl/src/generated/java/org/apache/camel/builder/component/dsl/PulsarComponentBuilderFactory.java
+++ 
b/core/camel-componentdsl/src/generated/java/org/apache/camel/builder/component/dsl/PulsarComponentBuilderFactory.java
@@ -220,6 +220,18 @@ public interface PulsarComponentBuilderFactory {
             return this;
         }
         /**
+         * Enable compacted topic reading.
+         * 
+         * The option is a: <code>boolean</code> type.
+         * 
+         * Default: false
+         * Group: consumer
+         */
+        default PulsarComponentBuilder readCompacted(boolean readCompacted) {
+            doSetProperty("readCompacted", readCompacted);
+            return this;
+        }
+        /**
          * Control the initial position in the topic of a newly created
          * subscription. Default is latest message.
          * 
@@ -578,6 +590,7 @@ public interface PulsarComponentBuilderFactory {
             case "maxRedeliverCount": 
getOrCreateConfiguration((PulsarComponent) 
component).setMaxRedeliverCount((java.lang.Integer) value); return true;
             case "negativeAckRedeliveryDelayMicros": 
getOrCreateConfiguration((PulsarComponent) 
component).setNegativeAckRedeliveryDelayMicros((long) value); return true;
             case "numberOfConsumers": 
getOrCreateConfiguration((PulsarComponent) 
component).setNumberOfConsumers((int) value); return true;
+            case "readCompacted": getOrCreateConfiguration((PulsarComponent) 
component).setReadCompacted((boolean) value); return true;
             case "subscriptionInitialPosition": 
getOrCreateConfiguration((PulsarComponent) 
component).setSubscriptionInitialPosition((org.apache.camel.component.pulsar.utils.consumers.SubscriptionInitialPosition)
 value); return true;
             case "subscriptionName": 
getOrCreateConfiguration((PulsarComponent) 
component).setSubscriptionName((java.lang.String) value); return true;
             case "subscriptionTopicsMode": 
getOrCreateConfiguration((PulsarComponent) 
component).setSubscriptionTopicsMode((org.apache.pulsar.client.api.RegexSubscriptionMode)
 value); return true;
diff --git 
a/core/camel-endpointdsl/src/generated/java/org/apache/camel/builder/endpoint/dsl/PulsarEndpointBuilderFactory.java
 
b/core/camel-endpointdsl/src/generated/java/org/apache/camel/builder/endpoint/dsl/PulsarEndpointBuilderFactory.java
index ce57097..fa79a01 100644
--- 
a/core/camel-endpointdsl/src/generated/java/org/apache/camel/builder/endpoint/dsl/PulsarEndpointBuilderFactory.java
+++ 
b/core/camel-endpointdsl/src/generated/java/org/apache/camel/builder/endpoint/dsl/PulsarEndpointBuilderFactory.java
@@ -319,6 +319,31 @@ public interface PulsarEndpointBuilderFactory {
             return this;
         }
         /**
+         * Enable compacted topic reading.
+         * 
+         * The option is a: <code>boolean</code> type.
+         * 
+         * Default: false
+         * Group: consumer
+         */
+        default PulsarEndpointConsumerBuilder readCompacted(
+                boolean readCompacted) {
+            doSetProperty("readCompacted", readCompacted);
+            return this;
+        }
+        /**
+         * Enable compacted topic reading.
+         * 
+         * The option will be converted to a <code>boolean</code> type.
+         * 
+         * Default: false
+         * Group: consumer
+         */
+        default PulsarEndpointConsumerBuilder readCompacted(String 
readCompacted) {
+            doSetProperty("readCompacted", readCompacted);
+            return this;
+        }
+        /**
          * Control the initial position in the topic of a newly created
          * subscription. Default is latest message.
          * 
diff --git a/docs/components/modules/ROOT/pages/pulsar-component.adoc 
b/docs/components/modules/ROOT/pages/pulsar-component.adoc
index dd2c35b..0076543 100644
--- a/docs/components/modules/ROOT/pages/pulsar-component.adoc
+++ b/docs/components/modules/ROOT/pages/pulsar-component.adoc
@@ -38,7 +38,7 @@ pulsar:[persistent|non-persistent]://tenant/namespace/topic
 
 
 // component options: START
-The Pulsar component supports 35 options, which are listed below.
+The Pulsar component supports 36 options, which are listed below.
 
 
 
@@ -57,6 +57,7 @@ The Pulsar component supports 35 options, which are listed 
below.
 | *maxRedeliverCount* (consumer) | Maximum number of times that a message will 
be redelivered before being sent to the dead letter queue. If this value is not 
set, no Dead Letter Policy will be created |  | Integer
 | *negativeAckRedeliveryDelay{zwsp}Micros* (consumer) | Set the negative 
acknowledgement delay | 60000000 | long
 | *numberOfConsumers* (consumer) | Number of consumers - defaults to 1 | 1 | 
int
+| *readCompacted* (consumer) | Enable compacted topic reading. | false | 
boolean
 | *subscriptionInitialPosition* (consumer) | Control the initial position in 
the topic of a newly created subscription. Default is latest message. There are 
2 enums and the value can be one of: EARLIEST, LATEST | LATEST | 
SubscriptionInitialPosition
 | *subscriptionName* (consumer) | Name of the subscription to use | subs | 
String
 | *subscriptionTopicsMode* (consumer) | Determines to which topics this 
consumer should be subscribed to - Persistent, Non-Persistent, or both. Only 
used with pattern subscriptions. There are 3 enums and the value can be one of: 
PersistentOnly, NonPersistentOnly, AllTopics | PersistentOnly | 
RegexSubscriptionMode
@@ -109,7 +110,7 @@ with the following path and query parameters:
 |===
 
 
-=== Query Parameters (34 parameters):
+=== Query Parameters (35 parameters):
 
 
 [width="100%",cols="2,5,^1,2",options="header"]
@@ -126,6 +127,7 @@ with the following path and query parameters:
 | *maxRedeliverCount* (consumer) | Maximum number of times that a message will 
be redelivered before being sent to the dead letter queue. If this value is not 
set, no Dead Letter Policy will be created |  | Integer
 | *negativeAckRedeliveryDelay{zwsp}Micros* (consumer) | Set the negative 
acknowledgement delay | 60000000 | long
 | *numberOfConsumers* (consumer) | Number of consumers - defaults to 1 | 1 | 
int
+| *readCompacted* (consumer) | Enable compacted topic reading. | false | 
boolean
 | *subscriptionInitialPosition* (consumer) | Control the initial position in 
the topic of a newly created subscription. Default is latest message. There are 
2 enums and the value can be one of: EARLIEST, LATEST | LATEST | 
SubscriptionInitialPosition
 | *subscriptionName* (consumer) | Name of the subscription to use | subs | 
String
 | *subscriptionTopicsMode* (consumer) | Determines to which topics this 
consumer should be subscribed to - Persistent, Non-Persistent, or both. Only 
used with pattern subscriptions. There are 3 enums and the value can be one of: 
PersistentOnly, NonPersistentOnly, AllTopics | PersistentOnly | 
RegexSubscriptionMode

Reply via email to