This is an automated email from the ASF dual-hosted git repository. acosentino pushed a commit to branch CAMEL-23142-4.18.x in repository https://gitbox.apache.org/repos/asf/camel.git
commit 7f8bcf8e0956c67a4616e210d6d427c3ac531f28 Author: Andrea Cosentino <[email protected]> AuthorDate: Fri Mar 6 17:05:29 2026 +0100 CAMEL-23142 - camel-google-pubsub: Add maxDeliveryAttempts enforcement to prevent infinite redelivery loops Signed-off-by: Andrea Cosentino <[email protected]> --- .../camel/catalog/components/google-pubsub.json | 25 ++--- .../pubsub/GooglePubsubEndpointConfigurer.java | 6 ++ .../pubsub/GooglePubsubEndpointUriFactory.java | 3 +- .../component/google/pubsub/google-pubsub.json | 25 ++--- .../src/main/docs/google-pubsub-component.adoc | 27 +++++ .../google/pubsub/GooglePubsubComponent.java | 15 +++ .../google/pubsub/GooglePubsubConsumer.java | 56 ++++++++++ .../google/pubsub/GooglePubsubEndpoint.java | 24 +++++ .../pubsub/consumer/CamelMessageReceiver.java | 9 ++ .../pubsub/integration/MaxDeliveryAttemptsIT.java | 115 +++++++++++++++++++++ .../PubsubEndpointMaxDeliveryAttemptsTest.java | 77 ++++++++++++++ .../dsl/GooglePubsubEndpointBuilderFactory.java | 46 +++++++++ 12 files changed, 403 insertions(+), 25 deletions(-) diff --git a/catalog/camel-catalog/src/generated/resources/org/apache/camel/catalog/components/google-pubsub.json b/catalog/camel-catalog/src/generated/resources/org/apache/camel/catalog/components/google-pubsub.json index 00b807df40db..ac37c22f8741 100644 --- a/catalog/camel-catalog/src/generated/resources/org/apache/camel/catalog/components/google-pubsub.json +++ b/catalog/camel-catalog/src/generated/resources/org/apache/camel/catalog/components/google-pubsub.json @@ -54,17 +54,18 @@ "exceptionHandler": { "index": 5, "kind": "parameter", "displayName": "Exception Handler", "group": "consumer (advanced)", "label": "consumer,advanced", "required": false, "type": "object", "javaType": "org.apache.camel.spi.ExceptionHandler", "optionalPrefix": "consumer.", "deprecated": false, "autowired": false, "secret": false, "description": "To let the consumer use a custom ExceptionHandler. 
Notice if the option bridgeErrorHandler is enabled then this option is not in use. By def [...] "exchangePattern": { "index": 6, "kind": "parameter", "displayName": "Exchange Pattern", "group": "consumer (advanced)", "label": "consumer,advanced", "required": false, "type": "enum", "javaType": "org.apache.camel.ExchangePattern", "enum": [ "InOnly", "InOut" ], "deprecated": false, "autowired": false, "secret": false, "description": "Sets the exchange pattern when the consumer creates an exchange." }, "maxAckExtensionPeriod": { "index": 7, "kind": "parameter", "displayName": "Max Ack Extension Period", "group": "consumer (advanced)", "label": "consumer,advanced", "required": false, "type": "integer", "javaType": "int", "deprecated": false, "autowired": false, "secret": false, "defaultValue": 3600, "description": "Set the maximum period a message ack deadline will be extended. Value in seconds" }, - "maxMessagesPerPoll": { "index": 8, "kind": "parameter", "displayName": "Max Messages Per Poll", "group": "consumer (advanced)", "label": "consumer,advanced", "required": false, "type": "integer", "javaType": "java.lang.Integer", "deprecated": false, "autowired": false, "secret": false, "defaultValue": 1, "description": "The max number of messages to receive from the server in a single API call" }, - "synchronousPull": { "index": 9, "kind": "parameter", "displayName": "Synchronous Pull", "group": "consumer (advanced)", "label": "consumer,advanced", "required": false, "type": "boolean", "javaType": "boolean", "deprecated": false, "autowired": false, "secret": false, "defaultValue": false, "description": "Synchronously pull batches of messages" }, - "lazyStartProducer": { "index": 10, "kind": "parameter", "displayName": "Lazy Start Producer", "group": "producer (advanced)", "label": "producer,advanced", "required": false, "type": "boolean", "javaType": "boolean", "deprecated": false, "autowired": false, "secret": false, "defaultValue": false, "description": "Whether the 
producer should be started lazy (on the first message). By starting lazy you can use this to allow CamelContext and routes to startup in situations where a produ [...] - "messageOrderingEnabled": { "index": 11, "kind": "parameter", "displayName": "Message Ordering Enabled", "group": "producer (advanced)", "label": "producer,advanced", "required": false, "type": "boolean", "javaType": "boolean", "deprecated": false, "autowired": false, "secret": false, "defaultValue": false, "description": "Should message ordering be enabled" }, - "pubsubEndpoint": { "index": 12, "kind": "parameter", "displayName": "Pubsub Endpoint", "group": "producer (advanced)", "label": "producer,advanced", "required": false, "type": "string", "javaType": "java.lang.String", "deprecated": false, "autowired": false, "secret": false, "description": "Pub\/Sub endpoint to use. Required when using message ordering, and ensures that messages are received in order even when multiple publishers are used" }, - "retry": { "index": 13, "kind": "parameter", "displayName": "Retry", "group": "producer (advanced)", "label": "producer,advanced", "required": false, "type": "object", "javaType": "com.google.api.gax.retrying.RetrySettings", "deprecated": false, "autowired": false, "secret": false, "description": "A custom RetrySettings to control how the publisher handles retry-able failures" }, - "serializer": { "index": 14, "kind": "parameter", "displayName": "Serializer", "group": "producer (advanced)", "label": "producer,advanced", "required": false, "type": "object", "javaType": "org.apache.camel.component.google.pubsub.serializer.GooglePubsubSerializer", "deprecated": false, "deprecationNote": "", "autowired": true, "secret": false, "description": "A custom GooglePubsubSerializer to use for serializing message payloads in the producer" }, - "headerFilterStrategy": { "index": 15, "kind": "parameter", "displayName": "Header Filter Strategy", "group": "advanced", "label": "advanced", "required": false, "type": 
"object", "javaType": "org.apache.camel.spi.HeaderFilterStrategy", "deprecated": false, "autowired": false, "secret": false, "description": "To use a custom HeaderFilterStrategy to filter headers to and from Camel message." }, - "includeAllGoogleProperties": { "index": 16, "kind": "parameter", "displayName": "Include All Google Properties", "group": "advanced", "label": "advanced", "required": false, "type": "boolean", "javaType": "boolean", "deprecated": false, "autowired": false, "secret": false, "defaultValue": false, "description": "Whether to include all Google headers when mapping from Pubsub to Camel Message. Setting this to true will include properties such as x-goog etc." }, - "loggerId": { "index": 17, "kind": "parameter", "displayName": "Logger Id", "group": "advanced", "label": "advanced", "required": false, "type": "string", "javaType": "java.lang.String", "deprecated": true, "autowired": false, "secret": false, "description": "To use a custom logger name" }, - "authenticate": { "index": 18, "kind": "parameter", "displayName": "Authenticate", "group": "security", "label": "security", "required": false, "type": "boolean", "javaType": "boolean", "deprecated": false, "autowired": false, "secret": false, "defaultValue": true, "description": "Use Credentials when interacting with PubSub service (no authentication is required when using emulator)." }, - "serviceAccountKey": { "index": 19, "kind": "parameter", "displayName": "Service Account Key", "group": "security", "label": "security", "required": false, "type": "string", "javaType": "java.lang.String", "deprecated": false, "autowired": false, "secret": false, "description": "The Service account key that can be used as credentials for the PubSub publisher\/subscriber. It can be loaded by default from classpath, but you can prefix with classpath:, file:, or http: to load the resour [...] 
+ "maxDeliveryAttempts": { "index": 8, "kind": "parameter", "displayName": "Max Delivery Attempts", "group": "consumer (advanced)", "label": "consumer,advanced", "required": false, "type": "integer", "javaType": "int", "deprecated": false, "autowired": false, "secret": false, "defaultValue": 0, "description": "The maximum number of delivery attempts for each message. When set to a positive value, the consumer will automatically nack any message whose delivery attempt count is greater t [...] + "maxMessagesPerPoll": { "index": 9, "kind": "parameter", "displayName": "Max Messages Per Poll", "group": "consumer (advanced)", "label": "consumer,advanced", "required": false, "type": "integer", "javaType": "java.lang.Integer", "deprecated": false, "autowired": false, "secret": false, "defaultValue": 1, "description": "The max number of messages to receive from the server in a single API call" }, + "synchronousPull": { "index": 10, "kind": "parameter", "displayName": "Synchronous Pull", "group": "consumer (advanced)", "label": "consumer,advanced", "required": false, "type": "boolean", "javaType": "boolean", "deprecated": false, "autowired": false, "secret": false, "defaultValue": false, "description": "Synchronously pull batches of messages" }, + "lazyStartProducer": { "index": 11, "kind": "parameter", "displayName": "Lazy Start Producer", "group": "producer (advanced)", "label": "producer,advanced", "required": false, "type": "boolean", "javaType": "boolean", "deprecated": false, "autowired": false, "secret": false, "defaultValue": false, "description": "Whether the producer should be started lazy (on the first message). By starting lazy you can use this to allow CamelContext and routes to startup in situations where a produ [...] 
+ "messageOrderingEnabled": { "index": 12, "kind": "parameter", "displayName": "Message Ordering Enabled", "group": "producer (advanced)", "label": "producer,advanced", "required": false, "type": "boolean", "javaType": "boolean", "deprecated": false, "autowired": false, "secret": false, "defaultValue": false, "description": "Should message ordering be enabled" }, + "pubsubEndpoint": { "index": 13, "kind": "parameter", "displayName": "Pubsub Endpoint", "group": "producer (advanced)", "label": "producer,advanced", "required": false, "type": "string", "javaType": "java.lang.String", "deprecated": false, "autowired": false, "secret": false, "description": "Pub\/Sub endpoint to use. Required when using message ordering, and ensures that messages are received in order even when multiple publishers are used" }, + "retry": { "index": 14, "kind": "parameter", "displayName": "Retry", "group": "producer (advanced)", "label": "producer,advanced", "required": false, "type": "object", "javaType": "com.google.api.gax.retrying.RetrySettings", "deprecated": false, "autowired": false, "secret": false, "description": "A custom RetrySettings to control how the publisher handles retry-able failures" }, + "serializer": { "index": 15, "kind": "parameter", "displayName": "Serializer", "group": "producer (advanced)", "label": "producer,advanced", "required": false, "type": "object", "javaType": "org.apache.camel.component.google.pubsub.serializer.GooglePubsubSerializer", "deprecated": false, "deprecationNote": "", "autowired": true, "secret": false, "description": "A custom GooglePubsubSerializer to use for serializing message payloads in the producer" }, + "headerFilterStrategy": { "index": 16, "kind": "parameter", "displayName": "Header Filter Strategy", "group": "advanced", "label": "advanced", "required": false, "type": "object", "javaType": "org.apache.camel.spi.HeaderFilterStrategy", "deprecated": false, "autowired": false, "secret": false, "description": "To use a custom 
HeaderFilterStrategy to filter headers to and from Camel message." }, + "includeAllGoogleProperties": { "index": 17, "kind": "parameter", "displayName": "Include All Google Properties", "group": "advanced", "label": "advanced", "required": false, "type": "boolean", "javaType": "boolean", "deprecated": false, "autowired": false, "secret": false, "defaultValue": false, "description": "Whether to include all Google headers when mapping from Pubsub to Camel Message. Setting this to true will include properties such as x-goog etc." }, + "loggerId": { "index": 18, "kind": "parameter", "displayName": "Logger Id", "group": "advanced", "label": "advanced", "required": false, "type": "string", "javaType": "java.lang.String", "deprecated": true, "autowired": false, "secret": false, "description": "To use a custom logger name" }, + "authenticate": { "index": 19, "kind": "parameter", "displayName": "Authenticate", "group": "security", "label": "security", "required": false, "type": "boolean", "javaType": "boolean", "deprecated": false, "autowired": false, "secret": false, "defaultValue": true, "description": "Use Credentials when interacting with PubSub service (no authentication is required when using emulator)." }, + "serviceAccountKey": { "index": 20, "kind": "parameter", "displayName": "Service Account Key", "group": "security", "label": "security", "required": false, "type": "string", "javaType": "java.lang.String", "deprecated": false, "autowired": false, "secret": false, "description": "The Service account key that can be used as credentials for the PubSub publisher\/subscriber. It can be loaded by default from classpath, but you can prefix with classpath:, file:, or http: to load the resour [...] 
} } diff --git a/components/camel-google/camel-google-pubsub/src/generated/java/org/apache/camel/component/google/pubsub/GooglePubsubEndpointConfigurer.java b/components/camel-google/camel-google-pubsub/src/generated/java/org/apache/camel/component/google/pubsub/GooglePubsubEndpointConfigurer.java index 6f60fab7267e..131534da3d08 100644 --- a/components/camel-google/camel-google-pubsub/src/generated/java/org/apache/camel/component/google/pubsub/GooglePubsubEndpointConfigurer.java +++ b/components/camel-google/camel-google-pubsub/src/generated/java/org/apache/camel/component/google/pubsub/GooglePubsubEndpointConfigurer.java @@ -44,6 +44,8 @@ public class GooglePubsubEndpointConfigurer extends PropertyConfigurerSupport im case "loggerId": target.setLoggerId(property(camelContext, java.lang.String.class, value)); return true; case "maxackextensionperiod": case "maxAckExtensionPeriod": target.setMaxAckExtensionPeriod(property(camelContext, int.class, value)); return true; + case "maxdeliveryattempts": + case "maxDeliveryAttempts": target.setMaxDeliveryAttempts(property(camelContext, int.class, value)); return true; case "maxmessagesperpoll": case "maxMessagesPerPoll": target.setMaxMessagesPerPoll(property(camelContext, java.lang.Integer.class, value)); return true; case "messageorderingenabled": @@ -89,6 +91,8 @@ public class GooglePubsubEndpointConfigurer extends PropertyConfigurerSupport im case "loggerId": return java.lang.String.class; case "maxackextensionperiod": case "maxAckExtensionPeriod": return int.class; + case "maxdeliveryattempts": + case "maxDeliveryAttempts": return int.class; case "maxmessagesperpoll": case "maxMessagesPerPoll": return java.lang.Integer.class; case "messageorderingenabled": @@ -130,6 +134,8 @@ public class GooglePubsubEndpointConfigurer extends PropertyConfigurerSupport im case "loggerId": return target.getLoggerId(); case "maxackextensionperiod": case "maxAckExtensionPeriod": return target.getMaxAckExtensionPeriod(); + case 
"maxdeliveryattempts": + case "maxDeliveryAttempts": return target.getMaxDeliveryAttempts(); case "maxmessagesperpoll": case "maxMessagesPerPoll": return target.getMaxMessagesPerPoll(); case "messageorderingenabled": diff --git a/components/camel-google/camel-google-pubsub/src/generated/java/org/apache/camel/component/google/pubsub/GooglePubsubEndpointUriFactory.java b/components/camel-google/camel-google-pubsub/src/generated/java/org/apache/camel/component/google/pubsub/GooglePubsubEndpointUriFactory.java index 2649a9a6eef3..61b8d3cbb091 100644 --- a/components/camel-google/camel-google-pubsub/src/generated/java/org/apache/camel/component/google/pubsub/GooglePubsubEndpointUriFactory.java +++ b/components/camel-google/camel-google-pubsub/src/generated/java/org/apache/camel/component/google/pubsub/GooglePubsubEndpointUriFactory.java @@ -23,7 +23,7 @@ public class GooglePubsubEndpointUriFactory extends org.apache.camel.support.com private static final Set<String> SECRET_PROPERTY_NAMES; private static final Map<String, String> MULTI_VALUE_PREFIXES; static { - Set<String> props = new HashSet<>(20); + Set<String> props = new HashSet<>(21); props.add("ackMode"); props.add("authenticate"); props.add("bridgeErrorHandler"); @@ -36,6 +36,7 @@ public class GooglePubsubEndpointUriFactory extends org.apache.camel.support.com props.add("lazyStartProducer"); props.add("loggerId"); props.add("maxAckExtensionPeriod"); + props.add("maxDeliveryAttempts"); props.add("maxMessagesPerPoll"); props.add("messageOrderingEnabled"); props.add("projectId"); diff --git a/components/camel-google/camel-google-pubsub/src/generated/resources/META-INF/org/apache/camel/component/google/pubsub/google-pubsub.json b/components/camel-google/camel-google-pubsub/src/generated/resources/META-INF/org/apache/camel/component/google/pubsub/google-pubsub.json index 00b807df40db..ac37c22f8741 100644 --- 
a/components/camel-google/camel-google-pubsub/src/generated/resources/META-INF/org/apache/camel/component/google/pubsub/google-pubsub.json +++ b/components/camel-google/camel-google-pubsub/src/generated/resources/META-INF/org/apache/camel/component/google/pubsub/google-pubsub.json @@ -54,17 +54,18 @@ "exceptionHandler": { "index": 5, "kind": "parameter", "displayName": "Exception Handler", "group": "consumer (advanced)", "label": "consumer,advanced", "required": false, "type": "object", "javaType": "org.apache.camel.spi.ExceptionHandler", "optionalPrefix": "consumer.", "deprecated": false, "autowired": false, "secret": false, "description": "To let the consumer use a custom ExceptionHandler. Notice if the option bridgeErrorHandler is enabled then this option is not in use. By def [...] "exchangePattern": { "index": 6, "kind": "parameter", "displayName": "Exchange Pattern", "group": "consumer (advanced)", "label": "consumer,advanced", "required": false, "type": "enum", "javaType": "org.apache.camel.ExchangePattern", "enum": [ "InOnly", "InOut" ], "deprecated": false, "autowired": false, "secret": false, "description": "Sets the exchange pattern when the consumer creates an exchange." }, "maxAckExtensionPeriod": { "index": 7, "kind": "parameter", "displayName": "Max Ack Extension Period", "group": "consumer (advanced)", "label": "consumer,advanced", "required": false, "type": "integer", "javaType": "int", "deprecated": false, "autowired": false, "secret": false, "defaultValue": 3600, "description": "Set the maximum period a message ack deadline will be extended. 
Value in seconds" }, - "maxMessagesPerPoll": { "index": 8, "kind": "parameter", "displayName": "Max Messages Per Poll", "group": "consumer (advanced)", "label": "consumer,advanced", "required": false, "type": "integer", "javaType": "java.lang.Integer", "deprecated": false, "autowired": false, "secret": false, "defaultValue": 1, "description": "The max number of messages to receive from the server in a single API call" }, - "synchronousPull": { "index": 9, "kind": "parameter", "displayName": "Synchronous Pull", "group": "consumer (advanced)", "label": "consumer,advanced", "required": false, "type": "boolean", "javaType": "boolean", "deprecated": false, "autowired": false, "secret": false, "defaultValue": false, "description": "Synchronously pull batches of messages" }, - "lazyStartProducer": { "index": 10, "kind": "parameter", "displayName": "Lazy Start Producer", "group": "producer (advanced)", "label": "producer,advanced", "required": false, "type": "boolean", "javaType": "boolean", "deprecated": false, "autowired": false, "secret": false, "defaultValue": false, "description": "Whether the producer should be started lazy (on the first message). By starting lazy you can use this to allow CamelContext and routes to startup in situations where a produ [...] - "messageOrderingEnabled": { "index": 11, "kind": "parameter", "displayName": "Message Ordering Enabled", "group": "producer (advanced)", "label": "producer,advanced", "required": false, "type": "boolean", "javaType": "boolean", "deprecated": false, "autowired": false, "secret": false, "defaultValue": false, "description": "Should message ordering be enabled" }, - "pubsubEndpoint": { "index": 12, "kind": "parameter", "displayName": "Pubsub Endpoint", "group": "producer (advanced)", "label": "producer,advanced", "required": false, "type": "string", "javaType": "java.lang.String", "deprecated": false, "autowired": false, "secret": false, "description": "Pub\/Sub endpoint to use. 
Required when using message ordering, and ensures that messages are received in order even when multiple publishers are used" }, - "retry": { "index": 13, "kind": "parameter", "displayName": "Retry", "group": "producer (advanced)", "label": "producer,advanced", "required": false, "type": "object", "javaType": "com.google.api.gax.retrying.RetrySettings", "deprecated": false, "autowired": false, "secret": false, "description": "A custom RetrySettings to control how the publisher handles retry-able failures" }, - "serializer": { "index": 14, "kind": "parameter", "displayName": "Serializer", "group": "producer (advanced)", "label": "producer,advanced", "required": false, "type": "object", "javaType": "org.apache.camel.component.google.pubsub.serializer.GooglePubsubSerializer", "deprecated": false, "deprecationNote": "", "autowired": true, "secret": false, "description": "A custom GooglePubsubSerializer to use for serializing message payloads in the producer" }, - "headerFilterStrategy": { "index": 15, "kind": "parameter", "displayName": "Header Filter Strategy", "group": "advanced", "label": "advanced", "required": false, "type": "object", "javaType": "org.apache.camel.spi.HeaderFilterStrategy", "deprecated": false, "autowired": false, "secret": false, "description": "To use a custom HeaderFilterStrategy to filter headers to and from Camel message." }, - "includeAllGoogleProperties": { "index": 16, "kind": "parameter", "displayName": "Include All Google Properties", "group": "advanced", "label": "advanced", "required": false, "type": "boolean", "javaType": "boolean", "deprecated": false, "autowired": false, "secret": false, "defaultValue": false, "description": "Whether to include all Google headers when mapping from Pubsub to Camel Message. Setting this to true will include properties such as x-goog etc." 
}, - "loggerId": { "index": 17, "kind": "parameter", "displayName": "Logger Id", "group": "advanced", "label": "advanced", "required": false, "type": "string", "javaType": "java.lang.String", "deprecated": true, "autowired": false, "secret": false, "description": "To use a custom logger name" }, - "authenticate": { "index": 18, "kind": "parameter", "displayName": "Authenticate", "group": "security", "label": "security", "required": false, "type": "boolean", "javaType": "boolean", "deprecated": false, "autowired": false, "secret": false, "defaultValue": true, "description": "Use Credentials when interacting with PubSub service (no authentication is required when using emulator)." }, - "serviceAccountKey": { "index": 19, "kind": "parameter", "displayName": "Service Account Key", "group": "security", "label": "security", "required": false, "type": "string", "javaType": "java.lang.String", "deprecated": false, "autowired": false, "secret": false, "description": "The Service account key that can be used as credentials for the PubSub publisher\/subscriber. It can be loaded by default from classpath, but you can prefix with classpath:, file:, or http: to load the resour [...] + "maxDeliveryAttempts": { "index": 8, "kind": "parameter", "displayName": "Max Delivery Attempts", "group": "consumer (advanced)", "label": "consumer,advanced", "required": false, "type": "integer", "javaType": "int", "deprecated": false, "autowired": false, "secret": false, "defaultValue": 0, "description": "The maximum number of delivery attempts for each message. When set to a positive value, the consumer will automatically nack any message whose delivery attempt count is greater t [...] 
+ "maxMessagesPerPoll": { "index": 9, "kind": "parameter", "displayName": "Max Messages Per Poll", "group": "consumer (advanced)", "label": "consumer,advanced", "required": false, "type": "integer", "javaType": "java.lang.Integer", "deprecated": false, "autowired": false, "secret": false, "defaultValue": 1, "description": "The max number of messages to receive from the server in a single API call" }, + "synchronousPull": { "index": 10, "kind": "parameter", "displayName": "Synchronous Pull", "group": "consumer (advanced)", "label": "consumer,advanced", "required": false, "type": "boolean", "javaType": "boolean", "deprecated": false, "autowired": false, "secret": false, "defaultValue": false, "description": "Synchronously pull batches of messages" }, + "lazyStartProducer": { "index": 11, "kind": "parameter", "displayName": "Lazy Start Producer", "group": "producer (advanced)", "label": "producer,advanced", "required": false, "type": "boolean", "javaType": "boolean", "deprecated": false, "autowired": false, "secret": false, "defaultValue": false, "description": "Whether the producer should be started lazy (on the first message). By starting lazy you can use this to allow CamelContext and routes to startup in situations where a produ [...] + "messageOrderingEnabled": { "index": 12, "kind": "parameter", "displayName": "Message Ordering Enabled", "group": "producer (advanced)", "label": "producer,advanced", "required": false, "type": "boolean", "javaType": "boolean", "deprecated": false, "autowired": false, "secret": false, "defaultValue": false, "description": "Should message ordering be enabled" }, + "pubsubEndpoint": { "index": 13, "kind": "parameter", "displayName": "Pubsub Endpoint", "group": "producer (advanced)", "label": "producer,advanced", "required": false, "type": "string", "javaType": "java.lang.String", "deprecated": false, "autowired": false, "secret": false, "description": "Pub\/Sub endpoint to use. 
Required when using message ordering, and ensures that messages are received in order even when multiple publishers are used" }, + "retry": { "index": 14, "kind": "parameter", "displayName": "Retry", "group": "producer (advanced)", "label": "producer,advanced", "required": false, "type": "object", "javaType": "com.google.api.gax.retrying.RetrySettings", "deprecated": false, "autowired": false, "secret": false, "description": "A custom RetrySettings to control how the publisher handles retry-able failures" }, + "serializer": { "index": 15, "kind": "parameter", "displayName": "Serializer", "group": "producer (advanced)", "label": "producer,advanced", "required": false, "type": "object", "javaType": "org.apache.camel.component.google.pubsub.serializer.GooglePubsubSerializer", "deprecated": false, "deprecationNote": "", "autowired": true, "secret": false, "description": "A custom GooglePubsubSerializer to use for serializing message payloads in the producer" }, + "headerFilterStrategy": { "index": 16, "kind": "parameter", "displayName": "Header Filter Strategy", "group": "advanced", "label": "advanced", "required": false, "type": "object", "javaType": "org.apache.camel.spi.HeaderFilterStrategy", "deprecated": false, "autowired": false, "secret": false, "description": "To use a custom HeaderFilterStrategy to filter headers to and from Camel message." }, + "includeAllGoogleProperties": { "index": 17, "kind": "parameter", "displayName": "Include All Google Properties", "group": "advanced", "label": "advanced", "required": false, "type": "boolean", "javaType": "boolean", "deprecated": false, "autowired": false, "secret": false, "defaultValue": false, "description": "Whether to include all Google headers when mapping from Pubsub to Camel Message. Setting this to true will include properties such as x-goog etc." 
}, + "loggerId": { "index": 18, "kind": "parameter", "displayName": "Logger Id", "group": "advanced", "label": "advanced", "required": false, "type": "string", "javaType": "java.lang.String", "deprecated": true, "autowired": false, "secret": false, "description": "To use a custom logger name" }, + "authenticate": { "index": 19, "kind": "parameter", "displayName": "Authenticate", "group": "security", "label": "security", "required": false, "type": "boolean", "javaType": "boolean", "deprecated": false, "autowired": false, "secret": false, "defaultValue": true, "description": "Use Credentials when interacting with PubSub service (no authentication is required when using emulator)." }, + "serviceAccountKey": { "index": 20, "kind": "parameter", "displayName": "Service Account Key", "group": "security", "label": "security", "required": false, "type": "string", "javaType": "java.lang.String", "deprecated": false, "autowired": false, "secret": false, "description": "The Service account key that can be used as credentials for the PubSub publisher\/subscriber. It can be loaded by default from classpath, but you can prefix with classpath:, file:, or http: to load the resour [...] } } diff --git a/components/camel-google/camel-google-pubsub/src/main/docs/google-pubsub-component.adoc b/components/camel-google/camel-google-pubsub/src/main/docs/google-pubsub-component.adoc index 68ab8eb44d91..d795cf6c2101 100644 --- a/components/camel-google/camel-google-pubsub/src/main/docs/google-pubsub-component.adoc +++ b/components/camel-google/camel-google-pubsub/src/main/docs/google-pubsub-component.adoc @@ -116,6 +116,33 @@ from("google-pubsub:{{project.name}}:{{subscription.name}}") With a dead-letter policy, after the configured maximum delivery attempts are exceeded, the message will automatically be forwarded to the dead-letter topic by Google PubSub. 
+==== Automatic Max Delivery Attempts Enforcement + +The component can enforce the subscription's `maxDeliveryAttempts` setting at the consumer level. +When enabled, messages whose delivery attempt count is greater than or equal to the configured maximum +will be automatically nacked without processing, allowing Pub/Sub to route them to the dead-letter topic. +This prevents infinite redelivery loops that can occur with short retry delays. + +The `maxDeliveryAttempts` value is resolved as follows: + +1. If explicitly set via the endpoint option, that value is used. +2. If not explicitly set, the component attempts to auto-fetch the value from the subscription's + dead-letter policy at consumer startup. +3. If the subscription has no dead-letter policy, or the auto-fetch fails (e.g., insufficient permissions), + enforcement is disabled; a warning is logged only when the fetch itself fails. +4. A value of `0` disables enforcement. + +[source,java] +---- +// Explicit configuration +from("google-pubsub:{{project.name}}:{{subscription.name}}?maxDeliveryAttempts=5") + .to("direct:process"); + +// Auto-fetch from subscription dead-letter policy (default behavior when not set) +from("google-pubsub:{{project.name}}:{{subscription.name}}") + .to("direct:process"); +---- + === Message Body The consumer endpoint returns the content of the message as `byte[]`. Exactly as the underlying system sends it. 
diff --git a/components/camel-google/camel-google-pubsub/src/main/java/org/apache/camel/component/google/pubsub/GooglePubsubComponent.java b/components/camel-google/camel-google-pubsub/src/main/java/org/apache/camel/component/google/pubsub/GooglePubsubComponent.java index fd2fe9c00dff..9a98f73413e2 100644 --- a/components/camel-google/camel-google-pubsub/src/main/java/org/apache/camel/component/google/pubsub/GooglePubsubComponent.java +++ b/components/camel-google/camel-google-pubsub/src/main/java/org/apache/camel/component/google/pubsub/GooglePubsubComponent.java @@ -37,6 +37,8 @@ import com.google.auth.oauth2.ServiceAccountCredentials; import com.google.cloud.pubsub.v1.MessageReceiver; import com.google.cloud.pubsub.v1.Publisher; import com.google.cloud.pubsub.v1.Subscriber; +import com.google.cloud.pubsub.v1.SubscriptionAdminClient; +import com.google.cloud.pubsub.v1.SubscriptionAdminSettings; import com.google.cloud.pubsub.v1.stub.PublisherStubSettings; import com.google.cloud.pubsub.v1.stub.SubscriberStub; import com.google.cloud.pubsub.v1.stub.SubscriberStubSettings; @@ -215,6 +217,19 @@ public class GooglePubsubComponent extends HeaderFilterStrategyComponent { return builder.build().createStub(); } + public SubscriptionAdminClient getSubscriptionAdminClient(GooglePubsubEndpoint googlePubsubEndpoint) throws IOException { + SubscriptionAdminSettings.Builder builder = SubscriptionAdminSettings.newBuilder(); + + if (StringHelper.trimToNull(endpoint) != null) { + ManagedChannel channel = ManagedChannelBuilder.forTarget(endpoint).usePlaintext().build(); + TransportChannelProvider channelProvider + = FixedTransportChannelProvider.create(GrpcTransportChannel.create(channel)); + builder.setTransportChannelProvider(channelProvider); + } + builder.setCredentialsProvider(getCredentialsProvider(googlePubsubEndpoint)); + return SubscriptionAdminClient.create(builder.build()); + } + private CredentialsProvider getCredentialsProvider(GooglePubsubEndpoint endpoint) throws 
IOException { CredentialsProvider credentialsProvider; diff --git a/components/camel-google/camel-google-pubsub/src/main/java/org/apache/camel/component/google/pubsub/GooglePubsubConsumer.java b/components/camel-google/camel-google-pubsub/src/main/java/org/apache/camel/component/google/pubsub/GooglePubsubConsumer.java index ef40e7605580..ab01edd20f21 100644 --- a/components/camel-google/camel-google-pubsub/src/main/java/org/apache/camel/component/google/pubsub/GooglePubsubConsumer.java +++ b/components/camel-google/camel-google-pubsub/src/main/java/org/apache/camel/component/google/pubsub/GooglePubsubConsumer.java @@ -32,13 +32,16 @@ import com.google.api.core.ApiFuture; import com.google.api.gax.rpc.ApiException; import com.google.cloud.pubsub.v1.MessageReceiver; import com.google.cloud.pubsub.v1.Subscriber; +import com.google.cloud.pubsub.v1.SubscriptionAdminClient; import com.google.cloud.pubsub.v1.stub.SubscriberStub; import com.google.common.base.Strings; +import com.google.pubsub.v1.DeadLetterPolicy; import com.google.pubsub.v1.ProjectSubscriptionName; import com.google.pubsub.v1.PubsubMessage; import com.google.pubsub.v1.PullRequest; import com.google.pubsub.v1.PullResponse; import com.google.pubsub.v1.ReceivedMessage; +import com.google.pubsub.v1.Subscription; import org.apache.camel.Exchange; import org.apache.camel.Processor; import org.apache.camel.component.google.pubsub.consumer.AcknowledgeCompletion; @@ -64,6 +67,7 @@ public class GooglePubsubConsumer extends DefaultConsumer { private final List<Subscriber> subscribers; private final Set<ApiFuture<PullResponse>> pendingSynchronousPullResponses; private final HeaderFilterStrategy headerFilterStrategy; + private int resolvedMaxDeliveryAttempts; GooglePubsubConsumer(GooglePubsubEndpoint endpoint, Processor processor) { super(endpoint, processor); @@ -86,6 +90,13 @@ public class GooglePubsubConsumer extends DefaultConsumer { super.doStart(); localLog.info("Starting Google PubSub consumer for {}/{}", 
endpoint.getProjectId(), endpoint.getDestinationName()); + + resolvedMaxDeliveryAttempts = resolveMaxDeliveryAttempts(); + if (resolvedMaxDeliveryAttempts > 0) { + localLog.info("Max delivery attempts enforcement enabled: {} for subscription {}", + resolvedMaxDeliveryAttempts, endpoint.getDestinationName()); + } + executor = endpoint.createExecutor(this); for (int i = 0; i < endpoint.getConcurrentConsumers(); i++) { executor.submit(new SubscriberWrapper()); @@ -128,6 +139,40 @@ public class GooglePubsubConsumer extends DefaultConsumer { pendingSynchronousPullResponses.clear(); } + private int resolveMaxDeliveryAttempts() { + if (endpoint.isMaxDeliveryAttemptsExplicitlySet()) { + localLog.debug("Using explicitly configured maxDeliveryAttempts: {}", endpoint.getMaxDeliveryAttempts()); + return endpoint.getMaxDeliveryAttempts(); + } + + String subscriptionName = ProjectSubscriptionName.format(endpoint.getProjectId(), endpoint.getDestinationName()); + try (SubscriptionAdminClient adminClient = endpoint.getComponent().getSubscriptionAdminClient(endpoint)) { + Subscription subscription = adminClient.getSubscription(subscriptionName); + if (subscription.hasDeadLetterPolicy()) { + DeadLetterPolicy dlp = subscription.getDeadLetterPolicy(); + int maxAttempts = dlp.getMaxDeliveryAttempts(); + if (maxAttempts > 0) { + localLog.info("Auto-fetched maxDeliveryAttempts={} from subscription dead-letter policy for {}", + maxAttempts, endpoint.getDestinationName()); + return maxAttempts; + } + } + localLog.debug("No dead-letter policy found on subscription {}, maxDeliveryAttempts enforcement disabled", + endpoint.getDestinationName()); + } catch (Exception e) { + localLog.warn("Failed to auto-fetch maxDeliveryAttempts from subscription {}: {}. " + + "Max delivery attempts enforcement will be disabled. 
" + + "Set the maxDeliveryAttempts endpoint option explicitly to enable enforcement.", + endpoint.getDestinationName(), e.getMessage()); + localLog.debug("Auto-fetch failure details", e); + } + return 0; + } + + public int getResolvedMaxDeliveryAttempts() { + return resolvedMaxDeliveryAttempts; + } + private class SubscriberWrapper implements Runnable { private final String subscriptionName; @@ -259,6 +304,17 @@ public class GooglePubsubConsumer extends DefaultConsumer { exchange.getIn().setHeader(GooglePubsubConstants.DELIVERY_ATTEMPT, deliveryAttempt); } + // Enforce maxDeliveryAttempts: nack without processing if limit reached + if (resolvedMaxDeliveryAttempts > 0 && deliveryAttempt >= resolvedMaxDeliveryAttempts) { + localLog.info( + "Message {} has reached max delivery attempts ({}/{}), nacking to route to dead-letter topic", + pubsubMessage.getMessageId(), deliveryAttempt, resolvedMaxDeliveryAttempts); + GooglePubsubAcknowledge ack = new AcknowledgeSync( + () -> endpoint.getComponent().getSubscriberStub(endpoint), subscriptionName); + ack.nack(exchange); + continue; + } + //existing subscriber can not be propagated, because it will be closed at the end of this block //subscriber will be created at the moment of use // (see https://issues.apache.org/jira/browse/CAMEL-18447) diff --git a/components/camel-google/camel-google-pubsub/src/main/java/org/apache/camel/component/google/pubsub/GooglePubsubEndpoint.java b/components/camel-google/camel-google-pubsub/src/main/java/org/apache/camel/component/google/pubsub/GooglePubsubEndpoint.java index 2fdec64f2e7c..f0000a0eb7c8 100644 --- a/components/camel-google/camel-google-pubsub/src/main/java/org/apache/camel/component/google/pubsub/GooglePubsubEndpoint.java +++ b/components/camel-google/camel-google-pubsub/src/main/java/org/apache/camel/component/google/pubsub/GooglePubsubEndpoint.java @@ -81,6 +81,17 @@ public class GooglePubsubEndpoint extends DefaultEndpoint implements EndpointSer description = "Set the maximum 
period a message ack deadline will be extended. Value in seconds", defaultValue = "3600") private int maxAckExtensionPeriod = 3600; + @UriParam(label = "consumer,advanced", name = "maxDeliveryAttempts", + description = "The maximum number of delivery attempts for each message. " + + "When set to a positive value, the consumer will automatically nack any message whose delivery attempt count " + + "is greater than or equal to this value, allowing Pub/Sub to route it to the dead-letter topic " + + "without processing it. This prevents infinite redelivery loops when short retry delays are configured. " + + "If not explicitly set and the subscription has a dead-letter policy, " + + "the value is automatically fetched from the subscription configuration at consumer startup. " + + "Set to 0 to disable enforcement.", + defaultValue = "0") + private int maxDeliveryAttempts; + private boolean maxDeliveryAttemptsExplicitlySet; @UriParam(label = "producer,advanced", description = "Should message ordering be enabled") private boolean messageOrderingEnabled; @@ -238,6 +249,19 @@ public class GooglePubsubEndpoint extends DefaultEndpoint implements EndpointSer this.maxAckExtensionPeriod = maxAckExtensionPeriod; } + public int getMaxDeliveryAttempts() { + return maxDeliveryAttempts; + } + + public void setMaxDeliveryAttempts(int maxDeliveryAttempts) { + this.maxDeliveryAttempts = maxDeliveryAttempts; + this.maxDeliveryAttemptsExplicitlySet = true; + } + + public boolean isMaxDeliveryAttemptsExplicitlySet() { + return maxDeliveryAttemptsExplicitlySet; + } + public GooglePubsubSerializer getSerializer() { return serializer; } diff --git a/components/camel-google/camel-google-pubsub/src/main/java/org/apache/camel/component/google/pubsub/consumer/CamelMessageReceiver.java b/components/camel-google/camel-google-pubsub/src/main/java/org/apache/camel/component/google/pubsub/consumer/CamelMessageReceiver.java index 0967baa5a164..c2240b31c24b 100644 --- 
a/components/camel-google/camel-google-pubsub/src/main/java/org/apache/camel/component/google/pubsub/consumer/CamelMessageReceiver.java +++ b/components/camel-google/camel-google-pubsub/src/main/java/org/apache/camel/component/google/pubsub/consumer/CamelMessageReceiver.java @@ -75,6 +75,15 @@ public class CamelMessageReceiver implements MessageReceiver { exchange.getIn().setHeader(GooglePubsubConstants.DELIVERY_ATTEMPT, deliveryAttempt); } + // Enforce maxDeliveryAttempts: nack without processing if limit reached + int maxDeliveryAttempts = consumer.getResolvedMaxDeliveryAttempts(); + if (maxDeliveryAttempts > 0 && deliveryAttempt != null && deliveryAttempt >= maxDeliveryAttempts) { + localLog.info("Message {} has reached max delivery attempts ({}/{}), nacking to route to dead-letter topic", + pubsubMessage.getMessageId(), deliveryAttempt, maxDeliveryAttempts); + ackReplyConsumer.nack(); + return; + } + GooglePubsubAcknowledge acknowledge = new AcknowledgeAsync(ackReplyConsumer); if (endpoint.getAckMode() != GooglePubsubConstants.AckMode.NONE) { exchange.getExchangeExtension().addOnCompletion(new AcknowledgeCompletion(acknowledge)); diff --git a/components/camel-google/camel-google-pubsub/src/test/java/org/apache/camel/component/google/pubsub/integration/MaxDeliveryAttemptsIT.java b/components/camel-google/camel-google-pubsub/src/test/java/org/apache/camel/component/google/pubsub/integration/MaxDeliveryAttemptsIT.java new file mode 100644 index 000000000000..3bfcedb3718b --- /dev/null +++ b/components/camel-google/camel-google-pubsub/src/test/java/org/apache/camel/component/google/pubsub/integration/MaxDeliveryAttemptsIT.java @@ -0,0 +1,115 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.component.google.pubsub.integration; + +import com.google.pubsub.v1.DeadLetterPolicy; +import com.google.pubsub.v1.ProjectSubscriptionName; +import com.google.pubsub.v1.Subscription; +import com.google.pubsub.v1.Topic; +import com.google.pubsub.v1.TopicName; +import org.apache.camel.Endpoint; +import org.apache.camel.EndpointInject; +import org.apache.camel.Exchange; +import org.apache.camel.Produce; +import org.apache.camel.ProducerTemplate; +import org.apache.camel.builder.RouteBuilder; +import org.apache.camel.component.google.pubsub.GooglePubsubConsumer; +import org.apache.camel.component.google.pubsub.PubsubTestSupport; +import org.apache.camel.component.mock.MockEndpoint; +import org.apache.camel.support.DefaultExchange; +import org.junit.jupiter.api.Test; + +import static org.junit.jupiter.api.Assertions.assertEquals; + +public class MaxDeliveryAttemptsIT extends PubsubTestSupport { + + private static final String TOPIC_NAME = "camel.max-delivery-topic"; + private static final String SUBSCRIPTION_NAME = "camel.max-delivery-subscription"; + private static final String DLQ_TOPIC_NAME = "camel.max-delivery-dlq-topic"; + private static final String DLQ_SUBSCRIPTION_NAME = "camel.max-delivery-dlq-subscription"; + private static final int MAX_DELIVERY_ATTEMPTS = 5; + + // Consumer without explicit maxDeliveryAttempts - should auto-fetch from subscription + 
@EndpointInject("google-pubsub:{{project.id}}:" + SUBSCRIPTION_NAME) + private Endpoint autoFetchSubscription; + + @EndpointInject("mock:auto-fetch-result") + private MockEndpoint autoFetchResult; + + @Produce("google-pubsub:{{project.id}}:" + TOPIC_NAME) + private ProducerTemplate producer; + + @Override + protected RouteBuilder createRouteBuilder() { + return new RouteBuilder() { + public void configure() { + from(autoFetchSubscription) + .routeId("auto-fetch-consumer") + .to(autoFetchResult); + } + }; + } + + @Override + public void createTopicSubscription() { + TopicName projectTopicName = TopicName.of(PROJECT_ID, TOPIC_NAME); + TopicName projectDlqTopicName = TopicName.of(PROJECT_ID, DLQ_TOPIC_NAME); + ProjectSubscriptionName projectSubscriptionName = ProjectSubscriptionName.of(PROJECT_ID, SUBSCRIPTION_NAME); + ProjectSubscriptionName projectDlqSubscriptionName = ProjectSubscriptionName.of(PROJECT_ID, DLQ_SUBSCRIPTION_NAME); + + Topic topic = Topic.newBuilder().setName(projectTopicName.toString()).build(); + Topic dlqTopic = Topic.newBuilder().setName(projectDlqTopicName.toString()).build(); + + Subscription subscription = Subscription.newBuilder() + .setName(projectSubscriptionName.toString()) + .setTopic(topic.getName()) + .setDeadLetterPolicy(DeadLetterPolicy.newBuilder() + .setDeadLetterTopic(dlqTopic.getName()) + .setMaxDeliveryAttempts(MAX_DELIVERY_ATTEMPTS).build()) + .build(); + Subscription dlqSubscription = Subscription.newBuilder() + .setName(projectDlqSubscriptionName.toString()) + .setTopic(dlqTopic.getName()) + .build(); + + createTopicSubscriptionPair(dlqTopic, dlqSubscription); + createTopicSubscriptionPair(topic, subscription); + } + + /** + * Tests that the consumer auto-fetches maxDeliveryAttempts from the subscription's dead-letter policy at startup + * and that messages are processed normally when the delivery attempt is below the threshold. 
On first delivery the + * delivery attempt is either not set or 1, which is below maxDeliveryAttempts=5. + */ + @Test + public void testAutoFetchMaxDeliveryAttemptsAndProcessBelowThreshold() throws Exception { + // Verify auto-fetch resolved the correct value + GooglePubsubConsumer consumer + = (GooglePubsubConsumer) context.getRoute("auto-fetch-consumer").getConsumer(); + assertEquals(MAX_DELIVERY_ATTEMPTS, consumer.getResolvedMaxDeliveryAttempts(), + "Consumer should auto-fetch maxDeliveryAttempts from subscription dead-letter policy"); + + // Verify messages are processed normally when below the threshold + autoFetchResult.expectedMessageCount(1); + + Exchange exchange = new DefaultExchange(context); + exchange.getIn().setBody("test message"); + producer.send(exchange); + + autoFetchResult.assertIsSatisfied(5000); + } +} diff --git a/components/camel-google/camel-google-pubsub/src/test/java/org/apache/camel/component/google/pubsub/unit/PubsubEndpointMaxDeliveryAttemptsTest.java b/components/camel-google/camel-google-pubsub/src/test/java/org/apache/camel/component/google/pubsub/unit/PubsubEndpointMaxDeliveryAttemptsTest.java new file mode 100644 index 000000000000..78fae89145f5 --- /dev/null +++ b/components/camel-google/camel-google-pubsub/src/test/java/org/apache/camel/component/google/pubsub/unit/PubsubEndpointMaxDeliveryAttemptsTest.java @@ -0,0 +1,77 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.component.google.pubsub.unit; + +import org.apache.camel.Endpoint; +import org.apache.camel.EndpointInject; +import org.apache.camel.builder.RouteBuilder; +import org.apache.camel.component.google.pubsub.GooglePubsubEndpoint; +import org.apache.camel.component.google.pubsub.PubsubTestSupport; +import org.junit.jupiter.api.Test; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertNotNull; +import static org.junit.jupiter.api.Assertions.assertTrue; + +public class PubsubEndpointMaxDeliveryAttemptsTest extends PubsubTestSupport { + + private static final String SUBSCRIPTION_WITH_MAX = "test-max-delivery?maxDeliveryAttempts=5"; + private static final String SUBSCRIPTION_WITHOUT_MAX = "test-no-max-delivery"; + + @EndpointInject("google-pubsub://{{project.id}}:" + SUBSCRIPTION_WITH_MAX) + private Endpoint endpointWithMax; + + @EndpointInject("google-pubsub://{{project.id}}:" + SUBSCRIPTION_WITHOUT_MAX) + private Endpoint endpointWithoutMax; + + @Test + public void testMaxDeliveryAttemptsExplicitlySet() { + Endpoint endpoint + = context.hasEndpoint(String.format("google-pubsub:%s:%s", PROJECT_ID, SUBSCRIPTION_WITH_MAX)); + assertNotNull(endpoint); + + assertTrue(endpoint instanceof GooglePubsubEndpoint); + GooglePubsubEndpoint pubsubEndpoint = (GooglePubsubEndpoint) endpoint; + + assertEquals(5, pubsubEndpoint.getMaxDeliveryAttempts()); + assertTrue(pubsubEndpoint.isMaxDeliveryAttemptsExplicitlySet()); + 
} + + @Test + public void testMaxDeliveryAttemptsDefaultValue() { + Endpoint endpoint + = context.hasEndpoint(String.format("google-pubsub:%s:%s", PROJECT_ID, SUBSCRIPTION_WITHOUT_MAX)); + assertNotNull(endpoint); + + assertTrue(endpoint instanceof GooglePubsubEndpoint); + GooglePubsubEndpoint pubsubEndpoint = (GooglePubsubEndpoint) endpoint; + + assertEquals(0, pubsubEndpoint.getMaxDeliveryAttempts()); + assertFalse(pubsubEndpoint.isMaxDeliveryAttemptsExplicitlySet()); + } + + @Override + protected RouteBuilder createRouteBuilder() { + return new RouteBuilder() { + public void configure() { + from(endpointWithMax).to("direct:to1"); + from(endpointWithoutMax).to("direct:to2"); + } + }; + } +} diff --git a/dsl/camel-endpointdsl/src/generated/java/org/apache/camel/builder/endpoint/dsl/GooglePubsubEndpointBuilderFactory.java b/dsl/camel-endpointdsl/src/generated/java/org/apache/camel/builder/endpoint/dsl/GooglePubsubEndpointBuilderFactory.java index 3763dcbeb773..0a54df2f5c16 100644 --- a/dsl/camel-endpointdsl/src/generated/java/org/apache/camel/builder/endpoint/dsl/GooglePubsubEndpointBuilderFactory.java +++ b/dsl/camel-endpointdsl/src/generated/java/org/apache/camel/builder/endpoint/dsl/GooglePubsubEndpointBuilderFactory.java @@ -317,6 +317,52 @@ public interface GooglePubsubEndpointBuilderFactory { doSetProperty("maxAckExtensionPeriod", maxAckExtensionPeriod); return this; } + /** + * The maximum number of delivery attempts for each message. When set to + * a positive value, the consumer will automatically nack any message + * whose delivery attempt count is greater than or equal to this value, + * allowing Pub/Sub to route it to the dead-letter topic without + * processing it. This prevents infinite redelivery loops when short + * retry delays are configured. If not explicitly set and the + * subscription has a dead-letter policy, the value is automatically + * fetched from the subscription configuration at consumer startup. Set + * to 0 to disable enforcement. 
+ * + * The option is a: <code>int</code> type. + * + * Default: 0 + * Group: consumer (advanced) + * + * @param maxDeliveryAttempts the value to set + * @return the dsl builder + */ + default AdvancedGooglePubsubEndpointConsumerBuilder maxDeliveryAttempts(int maxDeliveryAttempts) { + doSetProperty("maxDeliveryAttempts", maxDeliveryAttempts); + return this; + } + /** + * The maximum number of delivery attempts for each message. When set to + * a positive value, the consumer will automatically nack any message + * whose delivery attempt count is greater than or equal to this value, + * allowing Pub/Sub to route it to the dead-letter topic without + * processing it. This prevents infinite redelivery loops when short + * retry delays are configured. If not explicitly set and the + * subscription has a dead-letter policy, the value is automatically + * fetched from the subscription configuration at consumer startup. Set + * to 0 to disable enforcement. + * + * The option will be converted to a <code>int</code> type. + * + * Default: 0 + * Group: consumer (advanced) + * + * @param maxDeliveryAttempts the value to set + * @return the dsl builder + */ + default AdvancedGooglePubsubEndpointConsumerBuilder maxDeliveryAttempts(String maxDeliveryAttempts) { + doSetProperty("maxDeliveryAttempts", maxDeliveryAttempts); + return this; + } /** * The max number of messages to receive from the server in a single API * call.
