This is an automated email from the ASF dual-hosted git repository. davsclaus pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/camel.git
The following commit(s) were added to refs/heads/main by this push: new 92df5d20cae CAMEL-16360: camel-pulsar - Add header to control sending message after a given time period. 92df5d20cae is described below commit 92df5d20caee856d3ca5231c7421b5b8c30da213 Author: Claus Ibsen <claus.ib...@gmail.com> AuthorDate: Fri May 16 16:43:23 2025 +0200 CAMEL-16360: camel-pulsar - Add header to control sending message after a given time period. --- .../org/apache/camel/catalog/components/pulsar.json | 3 ++- .../META-INF/org/apache/camel/component/pulsar/pulsar.json | 3 ++- .../org/apache/camel/component/pulsar/PulsarProducer.java | 5 +++++ .../pulsar/utils/message/PulsarMessageHeaders.java | 3 +++ .../builder/endpoint/dsl/PulsarEndpointBuilderFactory.java | 13 +++++++++++++ 5 files changed, 25 insertions(+), 2 deletions(-) diff --git a/catalog/camel-catalog/src/generated/resources/org/apache/camel/catalog/components/pulsar.json b/catalog/camel-catalog/src/generated/resources/org/apache/camel/catalog/components/pulsar.json index 5ef0f762841..e213c335e55 100644 --- a/catalog/camel-catalog/src/generated/resources/org/apache/camel/catalog/components/pulsar.json +++ b/catalog/camel-catalog/src/generated/resources/org/apache/camel/catalog/components/pulsar.json @@ -88,7 +88,8 @@ "CamelPulsarProducerMessageProperties": { "index": 11, "kind": "header", "displayName": "", "group": "producer", "label": "producer", "required": false, "javaType": "Map<String, String>", "deprecated": false, "deprecationNote": "", "autowired": false, "secret": false, "description": "The properties of the message to add.", "constantName": "org.apache.camel.component.pulsar.utils.message.PulsarMessageHeaders#PROPERTIES_OUT" }, "CamelPulsarProducerMessageEventTime": { "index": 12, "kind": "header", "displayName": "", "group": "producer", "label": "producer", "required": false, "javaType": "Long", "deprecated": false, "deprecationNote": "", "autowired": false, "secret": false, "description": "The event time of the message message.", "constantName": "org.apache.camel.component.pulsar.utils.message.PulsarMessageHeaders#EVENT_TIME_OUT" }, "CamelPulsarProducerMessageDeliverAt": { "index": 13, "kind": "header", "displayName": "", "group": "producer", "label": "producer", "required": false, "javaType": "Long", "deprecated": false, "deprecationNote": "", "autowired": false, "secret": false, "description": "Deliver the message only at or after the specified absolute timestamp. The timestamp is milliseconds and based on UTC (eg: System.currentTimeMillis) Note: messages are only delivered with delay when a consumer is consum [...] - "CamelPulsarRedeliveryCount": { "index": 14, "kind": "header", "displayName": "", "group": "consumer", "label": "consumer", "required": false, "javaType": "int", "deprecated": false, "deprecationNote": "", "autowired": false, "secret": false, "description": "The message redelivery count, redelivery count maintain in pulsar broker.", "constantName": "org.apache.camel.component.pulsar.utils.message.PulsarMessageHeaders#PULSAR_REDELIVERY_COUNT" } + "CamelPulsarRedeliveryCount": { "index": 14, "kind": "header", "displayName": "", "group": "consumer", "label": "consumer", "required": false, "javaType": "int", "deprecated": false, "deprecationNote": "", "autowired": false, "secret": false, "description": "The message redelivery count, redelivery count maintain in pulsar broker.", "constantName": "org.apache.camel.component.pulsar.utils.message.PulsarMessageHeaders#PULSAR_REDELIVERY_COUNT" }, + "CamelPulsarProducerMessageDeliverAfter": { "index": 15, "kind": "header", "displayName": "", "group": "producer", "label": "producer", "required": false, "javaType": "Long", "deprecated": false, "deprecationNote": "", "autowired": false, "secret": false, "description": "Deliver the message after a given delayed time (millis).", "constantName": "org.apache.camel.component.pulsar.utils.message.PulsarMessageHeaders#DELIVER_AFTER" } }, "properties": { "persistence": { "index": 0, "kind": "path", "displayName": "Persistence", "group": "common", "label": "", "required": true, "type": "string", "javaType": "java.lang.String", "enum": [ "persistent", "non-persistent" ], "deprecated": false, "deprecationNote": "", "autowired": false, "secret": false, "description": "Whether the topic is persistent or non-persistent" }, diff --git a/components/camel-pulsar/src/generated/resources/META-INF/org/apache/camel/component/pulsar/pulsar.json b/components/camel-pulsar/src/generated/resources/META-INF/org/apache/camel/component/pulsar/pulsar.json index 5ef0f762841..e213c335e55 100644 --- a/components/camel-pulsar/src/generated/resources/META-INF/org/apache/camel/component/pulsar/pulsar.json +++ b/components/camel-pulsar/src/generated/resources/META-INF/org/apache/camel/component/pulsar/pulsar.json @@ -88,7 +88,8 @@ "CamelPulsarProducerMessageProperties": { "index": 11, "kind": "header", "displayName": "", "group": "producer", "label": "producer", "required": false, "javaType": "Map<String, String>", "deprecated": false, "deprecationNote": "", "autowired": false, "secret": false, "description": "The properties of the message to add.", "constantName": "org.apache.camel.component.pulsar.utils.message.PulsarMessageHeaders#PROPERTIES_OUT" }, "CamelPulsarProducerMessageEventTime": { "index": 12, "kind": "header", "displayName": "", "group": "producer", "label": "producer", "required": false, "javaType": "Long", "deprecated": false, "deprecationNote": "", "autowired": false, "secret": false, "description": "The event time of the message message.", "constantName": "org.apache.camel.component.pulsar.utils.message.PulsarMessageHeaders#EVENT_TIME_OUT" }, "CamelPulsarProducerMessageDeliverAt": { "index": 13, "kind": "header", "displayName": "", "group": "producer", "label": "producer", "required": false, "javaType": "Long", "deprecated": false, "deprecationNote": "", "autowired": false, "secret": false, "description": "Deliver the message only at or after the specified absolute timestamp. The timestamp is milliseconds and based on UTC (eg: System.currentTimeMillis) Note: messages are only delivered with delay when a consumer is consum [...] - "CamelPulsarRedeliveryCount": { "index": 14, "kind": "header", "displayName": "", "group": "consumer", "label": "consumer", "required": false, "javaType": "int", "deprecated": false, "deprecationNote": "", "autowired": false, "secret": false, "description": "The message redelivery count, redelivery count maintain in pulsar broker.", "constantName": "org.apache.camel.component.pulsar.utils.message.PulsarMessageHeaders#PULSAR_REDELIVERY_COUNT" } + "CamelPulsarRedeliveryCount": { "index": 14, "kind": "header", "displayName": "", "group": "consumer", "label": "consumer", "required": false, "javaType": "int", "deprecated": false, "deprecationNote": "", "autowired": false, "secret": false, "description": "The message redelivery count, redelivery count maintain in pulsar broker.", "constantName": "org.apache.camel.component.pulsar.utils.message.PulsarMessageHeaders#PULSAR_REDELIVERY_COUNT" }, + "CamelPulsarProducerMessageDeliverAfter": { "index": 15, "kind": "header", "displayName": "", "group": "producer", "label": "producer", "required": false, "javaType": "Long", "deprecated": false, "deprecationNote": "", "autowired": false, "secret": false, "description": "Deliver the message after a given delayed time (millis).", "constantName": "org.apache.camel.component.pulsar.utils.message.PulsarMessageHeaders#DELIVER_AFTER" } }, "properties": { "persistence": { "index": 0, "kind": "path", "displayName": "Persistence", "group": "common", "label": "", "required": true, "type": "string", "javaType": "java.lang.String", "enum": [ "persistent", "non-persistent" ], "deprecated": false, "deprecationNote": "", "autowired": false, "secret": false, "description": "Whether the topic is persistent or non-persistent" }, diff --git a/components/camel-pulsar/src/main/java/org/apache/camel/component/pulsar/PulsarProducer.java b/components/camel-pulsar/src/main/java/org/apache/camel/component/pulsar/PulsarProducer.java index 00b095266c0..b83505d3b09 100644 --- a/components/camel-pulsar/src/main/java/org/apache/camel/component/pulsar/PulsarProducer.java +++ b/components/camel-pulsar/src/main/java/org/apache/camel/component/pulsar/PulsarProducer.java @@ -76,6 +76,11 @@ public class PulsarProducer extends DefaultAsyncProducer { messageBuilder.deliverAt(deliverAt); } + Long deliverAfter = exchange.getIn().getHeader(PulsarMessageHeaders.DELIVER_AFTER, Long.class); + if (deliverAfter != null) { + messageBuilder.deliverAfter(deliverAfter, TimeUnit.MILLISECONDS); + } + messageBuilder.sendAsync() .thenAccept(r -> exchange.getIn().setBody(r)) .whenComplete( diff --git a/components/camel-pulsar/src/main/java/org/apache/camel/component/pulsar/utils/message/PulsarMessageHeaders.java b/components/camel-pulsar/src/main/java/org/apache/camel/component/pulsar/utils/message/PulsarMessageHeaders.java index cdbbc5ed417..9b5dbe29858 100644 --- a/components/camel-pulsar/src/main/java/org/apache/camel/component/pulsar/utils/message/PulsarMessageHeaders.java +++ b/components/camel-pulsar/src/main/java/org/apache/camel/component/pulsar/utils/message/PulsarMessageHeaders.java @@ -57,4 +57,7 @@ public interface PulsarMessageHeaders { @Metadata(label = "consumer", description = "The message redelivery count, redelivery count maintain in pulsar broker.", javaType = "int") String PULSAR_REDELIVERY_COUNT = "CamelPulsarRedeliveryCount"; + @Metadata(label = "producer", description = "Deliver the message after a given delayed time (millis).", + javaType = "Long") + String DELIVER_AFTER = "CamelPulsarProducerMessageDeliverAfter"; } diff --git a/dsl/camel-endpointdsl/src/generated/java/org/apache/camel/builder/endpoint/dsl/PulsarEndpointBuilderFactory.java b/dsl/camel-endpointdsl/src/generated/java/org/apache/camel/builder/endpoint/dsl/PulsarEndpointBuilderFactory.java index 8feb096e24c..4228f641149 100644 --- a/dsl/camel-endpointdsl/src/generated/java/org/apache/camel/builder/endpoint/dsl/PulsarEndpointBuilderFactory.java +++ b/dsl/camel-endpointdsl/src/generated/java/org/apache/camel/builder/endpoint/dsl/PulsarEndpointBuilderFactory.java @@ -1762,6 +1762,19 @@ public interface PulsarEndpointBuilderFactory { public String pulsarRedeliveryCount() { return "CamelPulsarRedeliveryCount"; } + /** + * Deliver the message after a given delayed time (millis). + * + * The option is a: {@code Long} type. + * + * Group: producer + * + * @return the name of the header {@code + * PulsarProducerMessageDeliverAfter}. + */ + public String pulsarProducerMessageDeliverAfter() { + return "CamelPulsarProducerMessageDeliverAfter"; + } } static PulsarEndpointBuilder endpointBuilder(String componentName, String path) { class PulsarEndpointBuilderImpl extends AbstractEndpointBuilder implements PulsarEndpointBuilder, AdvancedPulsarEndpointBuilder {