This is an automated email from the ASF dual-hosted git repository. davsclaus pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/camel.git
The following commit(s) were added to refs/heads/main by this push: new b2f7a3e7b82 CAMEL-21515: camel-google-pubsub - Add Support for Custom Retry Settings in Google Pub/Sub Publisher. Thanks to Sateesh Divvela for the patch. b2f7a3e7b82 is described below commit b2f7a3e7b8264f1d7846db76c97d93540a7cd574 Author: Claus Ibsen <claus.ib...@gmail.com> AuthorDate: Thu Dec 5 08:33:06 2024 +0100 CAMEL-21515: camel-google-pubsub - Add Support for Custom Retry Settings in Google Pub/Sub Publisher. Thanks to Sateesh Divvela for the patch. --- .../camel/catalog/components/google-pubsub.json | 3 +- .../pubsub/GooglePubsubEndpointConfigurer.java | 3 + .../pubsub/GooglePubsubEndpointUriFactory.java | 3 +- .../component/google/pubsub/google-pubsub.json | 3 +- .../google/pubsub/GooglePubsubComponent.java | 26 ++--- .../google/pubsub/GooglePubsubEndpoint.java | 13 +++ .../PubsubProducerWithCustomRetrySettingsTest.java | 122 +++++++++++++++++++++ .../dsl/GooglePubsubEndpointBuilderFactory.java | 32 ++++++ 8 files changed, 185 insertions(+), 20 deletions(-) diff --git a/catalog/camel-catalog/src/generated/resources/org/apache/camel/catalog/components/google-pubsub.json b/catalog/camel-catalog/src/generated/resources/org/apache/camel/catalog/components/google-pubsub.json index 579fedf6ebd..2bd1849e8fa 100644 --- a/catalog/camel-catalog/src/generated/resources/org/apache/camel/catalog/components/google-pubsub.json +++ b/catalog/camel-catalog/src/generated/resources/org/apache/camel/catalog/components/google-pubsub.json @@ -60,6 +60,7 @@ "lazyStartProducer": { "index": 13, "kind": "parameter", "displayName": "Lazy Start Producer", "group": "producer (advanced)", "label": "producer,advanced", "required": false, "type": "boolean", "javaType": "boolean", "deprecated": false, "autowired": false, "secret": false, "defaultValue": false, "description": "Whether the producer should be started lazy (on the first message). By starting lazy you can use this to allow CamelContext and routes to startup in situations where a produ [...] "messageOrderingEnabled": { "index": 14, "kind": "parameter", "displayName": "Message Ordering Enabled", "group": "producer (advanced)", "label": "producer,advanced", "required": false, "type": "boolean", "javaType": "boolean", "deprecated": false, "autowired": false, "secret": false, "defaultValue": false, "description": "Should message ordering be enabled" }, "pubsubEndpoint": { "index": 15, "kind": "parameter", "displayName": "Pubsub Endpoint", "group": "producer (advanced)", "label": "producer,advanced", "required": false, "type": "string", "javaType": "java.lang.String", "deprecated": false, "autowired": false, "secret": false, "description": "Pub\/Sub endpoint to use. Required when using message ordering, and ensures that messages are received in order even when multiple publishers are used" }, - "serializer": { "index": 16, "kind": "parameter", "displayName": "Serializer", "group": "producer (advanced)", "label": "producer,advanced", "required": false, "type": "object", "javaType": "org.apache.camel.component.google.pubsub.serializer.GooglePubsubSerializer", "deprecated": false, "deprecationNote": "", "autowired": true, "secret": false, "description": "A custom GooglePubsubSerializer to use for serializing message payloads in the producer" } + "retry": { "index": 16, "kind": "parameter", "displayName": "Retry", "group": "producer (advanced)", "label": "producer,advanced", "required": false, "type": "object", "javaType": "com.google.api.gax.retrying.RetrySettings", "deprecated": false, "autowired": false, "secret": false, "description": "A custom RetrySettings to control how the publisher handles retry-able failures" }, + "serializer": { "index": 17, "kind": "parameter", "displayName": "Serializer", "group": "producer (advanced)", "label": "producer,advanced", "required": false, "type": "object", "javaType": "org.apache.camel.component.google.pubsub.serializer.GooglePubsubSerializer", "deprecated": false, "deprecationNote": "", "autowired": true, "secret": false, "description": "A custom GooglePubsubSerializer to use for serializing message payloads in the producer" } } } diff --git a/components/camel-google/camel-google-pubsub/src/generated/java/org/apache/camel/component/google/pubsub/GooglePubsubEndpointConfigurer.java b/components/camel-google/camel-google-pubsub/src/generated/java/org/apache/camel/component/google/pubsub/GooglePubsubEndpointConfigurer.java index ea672cece6d..c6b2217cbe7 100644 --- a/components/camel-google/camel-google-pubsub/src/generated/java/org/apache/camel/component/google/pubsub/GooglePubsubEndpointConfigurer.java +++ b/components/camel-google/camel-google-pubsub/src/generated/java/org/apache/camel/component/google/pubsub/GooglePubsubEndpointConfigurer.java @@ -46,6 +46,7 @@ public class GooglePubsubEndpointConfigurer extends PropertyConfigurerSupport im case "messageOrderingEnabled": target.setMessageOrderingEnabled(property(camelContext, boolean.class, value)); return true; case "pubsubendpoint": case "pubsubEndpoint": target.setPubsubEndpoint(property(camelContext, java.lang.String.class, value)); return true; + case "retry": target.setRetry(property(camelContext, com.google.api.gax.retrying.RetrySettings.class, value)); return true; case "serializer": target.setSerializer(property(camelContext, org.apache.camel.component.google.pubsub.serializer.GooglePubsubSerializer.class, value)); return true; case "serviceaccountkey": case "serviceAccountKey": target.setServiceAccountKey(property(camelContext, java.lang.String.class, value)); return true; @@ -86,6 +87,7 @@ public class GooglePubsubEndpointConfigurer extends PropertyConfigurerSupport im case "messageOrderingEnabled": return boolean.class; case "pubsubendpoint": case "pubsubEndpoint": return java.lang.String.class; + case "retry": return com.google.api.gax.retrying.RetrySettings.class; case "serializer": return org.apache.camel.component.google.pubsub.serializer.GooglePubsubSerializer.class; case "serviceaccountkey": case "serviceAccountKey": return java.lang.String.class; @@ -122,6 +124,7 @@ public class GooglePubsubEndpointConfigurer extends PropertyConfigurerSupport im case "messageOrderingEnabled": return target.isMessageOrderingEnabled(); case "pubsubendpoint": case "pubsubEndpoint": return target.getPubsubEndpoint(); + case "retry": return target.getRetry(); case "serializer": return target.getSerializer(); case "serviceaccountkey": case "serviceAccountKey": return target.getServiceAccountKey(); diff --git a/components/camel-google/camel-google-pubsub/src/generated/java/org/apache/camel/component/google/pubsub/GooglePubsubEndpointUriFactory.java b/components/camel-google/camel-google-pubsub/src/generated/java/org/apache/camel/component/google/pubsub/GooglePubsubEndpointUriFactory.java index c463876457e..996447b78ee 100644 --- a/components/camel-google/camel-google-pubsub/src/generated/java/org/apache/camel/component/google/pubsub/GooglePubsubEndpointUriFactory.java +++ b/components/camel-google/camel-google-pubsub/src/generated/java/org/apache/camel/component/google/pubsub/GooglePubsubEndpointUriFactory.java @@ -23,7 +23,7 @@ public class GooglePubsubEndpointUriFactory extends org.apache.camel.support.com private static final Set<String> SECRET_PROPERTY_NAMES; private static final Set<String> MULTI_VALUE_PREFIXES; static { - Set<String> props = new HashSet<>(17); + Set<String> props = new HashSet<>(18); props.add("ackMode"); props.add("authenticate"); props.add("bridgeErrorHandler"); @@ -38,6 +38,7 @@ public class GooglePubsubEndpointUriFactory extends org.apache.camel.support.com props.add("messageOrderingEnabled"); props.add("projectId"); props.add("pubsubEndpoint"); + props.add("retry"); props.add("serializer"); props.add("serviceAccountKey"); props.add("synchronousPull"); diff --git a/components/camel-google/camel-google-pubsub/src/generated/resources/META-INF/org/apache/camel/component/google/pubsub/google-pubsub.json b/components/camel-google/camel-google-pubsub/src/generated/resources/META-INF/org/apache/camel/component/google/pubsub/google-pubsub.json index 579fedf6ebd..2bd1849e8fa 100644 --- a/components/camel-google/camel-google-pubsub/src/generated/resources/META-INF/org/apache/camel/component/google/pubsub/google-pubsub.json +++ b/components/camel-google/camel-google-pubsub/src/generated/resources/META-INF/org/apache/camel/component/google/pubsub/google-pubsub.json @@ -60,6 +60,7 @@ "lazyStartProducer": { "index": 13, "kind": "parameter", "displayName": "Lazy Start Producer", "group": "producer (advanced)", "label": "producer,advanced", "required": false, "type": "boolean", "javaType": "boolean", "deprecated": false, "autowired": false, "secret": false, "defaultValue": false, "description": "Whether the producer should be started lazy (on the first message). By starting lazy you can use this to allow CamelContext and routes to startup in situations where a produ [...] "messageOrderingEnabled": { "index": 14, "kind": "parameter", "displayName": "Message Ordering Enabled", "group": "producer (advanced)", "label": "producer,advanced", "required": false, "type": "boolean", "javaType": "boolean", "deprecated": false, "autowired": false, "secret": false, "defaultValue": false, "description": "Should message ordering be enabled" }, "pubsubEndpoint": { "index": 15, "kind": "parameter", "displayName": "Pubsub Endpoint", "group": "producer (advanced)", "label": "producer,advanced", "required": false, "type": "string", "javaType": "java.lang.String", "deprecated": false, "autowired": false, "secret": false, "description": "Pub\/Sub endpoint to use. Required when using message ordering, and ensures that messages are received in order even when multiple publishers are used" }, - "serializer": { "index": 16, "kind": "parameter", "displayName": "Serializer", "group": "producer (advanced)", "label": "producer,advanced", "required": false, "type": "object", "javaType": "org.apache.camel.component.google.pubsub.serializer.GooglePubsubSerializer", "deprecated": false, "deprecationNote": "", "autowired": true, "secret": false, "description": "A custom GooglePubsubSerializer to use for serializing message payloads in the producer" } + "retry": { "index": 16, "kind": "parameter", "displayName": "Retry", "group": "producer (advanced)", "label": "producer,advanced", "required": false, "type": "object", "javaType": "com.google.api.gax.retrying.RetrySettings", "deprecated": false, "autowired": false, "secret": false, "description": "A custom RetrySettings to control how the publisher handles retry-able failures" }, + "serializer": { "index": 17, "kind": "parameter", "displayName": "Serializer", "group": "producer (advanced)", "label": "producer,advanced", "required": false, "type": "object", "javaType": "org.apache.camel.component.google.pubsub.serializer.GooglePubsubSerializer", "deprecated": false, "deprecationNote": "", "autowired": true, "secret": false, "description": "A custom GooglePubsubSerializer to use for serializing message payloads in the producer" } } } diff --git a/components/camel-google/camel-google-pubsub/src/main/java/org/apache/camel/component/google/pubsub/GooglePubsubComponent.java b/components/camel-google/camel-google-pubsub/src/main/java/org/apache/camel/component/google/pubsub/GooglePubsubComponent.java index 2dcb1ddfb34..04616746ca8 100644 --- a/components/camel-google/camel-google-pubsub/src/main/java/org/apache/camel/component/google/pubsub/GooglePubsubComponent.java +++ b/components/camel-google/camel-google-pubsub/src/main/java/org/apache/camel/component/google/pubsub/GooglePubsubComponent.java @@ -64,38 +64,27 @@ import org.threeten.bp.Duration; public class GooglePubsubComponent extends DefaultComponent { private static final Logger LOG = LoggerFactory.getLogger(GooglePubsubComponent.class); - @Metadata( - label = "common", + @Metadata(label = "common", description = "Endpoint to use with local Pub/Sub emulator.") private String endpoint; - @Metadata(label = "common", description = "Use Credentials when interacting with PubSub service (no authentication is required when using emulator).", defaultValue = "true") private boolean authenticate = true; - @Metadata(label = "common", description = "The Service account key that can be used as credentials for the PubSub publisher/subscriber. It can be loaded by default from " + " classpath, but you can prefix with classpath:, file:, or http: to load the resource from different systems.") private String serviceAccountKey; - - @Metadata( - label = "producer", + @Metadata(label = "producer", description = "Maximum number of producers to cache. This could be increased if you have producers for lots of different topics.") private int publisherCacheSize = 100; - - @Metadata( - label = "producer", + @Metadata(label = "producer", description = "How many milliseconds should each producer stay alive in the cache.") private int publisherCacheTimeout = 180000; - - @Metadata( - label = "advanced", + @Metadata(label = "advanced", description = "How many milliseconds should a producer be allowed to terminate.") private int publisherTerminationTimeout = 60000; - - @Metadata( - label = "consumer", + @Metadata(label = "consumer", description = "Comma-separated list of additional retryable error codes for synchronous pull. By default the PubSub client library retries ABORTED, UNAVAILABLE, UNKNOWN") private String synchronousPullRetryableCodes; @@ -173,10 +162,13 @@ public class GooglePubsubComponent extends DefaultComponent { if (googlePubsubEndpoint.isMessageOrderingEnabled()) { builder.setEnableMessageOrdering(true); if (StringHelper.trimToNull(googlePubsubEndpoint.getPubsubEndpoint()) == null) { - LOG.warn("In conjunction with enabeling message ordering the pubsubEndpoint should be set. " + LOG.warn("In conjunction with enabling message ordering the pubsubEndpoint should be set. " + "Message ordering is only guaranteed when send to the same region."); } } + if (googlePubsubEndpoint.getRetry() != null) { + builder.setRetrySettings(googlePubsubEndpoint.getRetry()); + } return builder.build(); } diff --git a/components/camel-google/camel-google-pubsub/src/main/java/org/apache/camel/component/google/pubsub/GooglePubsubEndpoint.java b/components/camel-google/camel-google-pubsub/src/main/java/org/apache/camel/component/google/pubsub/GooglePubsubEndpoint.java index b714ab3a95b..a7300d8fdb6 100644 --- a/components/camel-google/camel-google-pubsub/src/main/java/org/apache/camel/component/google/pubsub/GooglePubsubEndpoint.java +++ b/components/camel-google/camel-google-pubsub/src/main/java/org/apache/camel/component/google/pubsub/GooglePubsubEndpoint.java @@ -19,6 +19,7 @@ package org.apache.camel.component.google.pubsub; import java.util.Map; import java.util.concurrent.ExecutorService; +import com.google.api.gax.retrying.RetrySettings; import org.apache.camel.Category; import org.apache.camel.Component; import org.apache.camel.Consumer; @@ -104,6 +105,10 @@ public class GooglePubsubEndpoint extends DefaultEndpoint implements EndpointSer @Metadata(autowired = true) private GooglePubsubSerializer serializer; + @UriParam(description = "A custom RetrySettings to control how the publisher handles retry-able failures", + label = "producer,advanced") + private RetrySettings retry; + public GooglePubsubEndpoint(String uri, Component component) { super(uri, component); @@ -252,6 +257,14 @@ public class GooglePubsubEndpoint extends DefaultEndpoint implements EndpointSer this.messageOrderingEnabled = messageOrderingEnabled; } + public RetrySettings getRetry() { + return retry; + } + + public void setRetry(RetrySettings retry) { + this.retry = retry; + } + public String getPubsubEndpoint() { return this.pubsubEndpoint; } diff --git a/components/camel-google/camel-google-pubsub/src/test/java/org/apache/camel/component/google/pubsub/unit/PubsubProducerWithCustomRetrySettingsTest.java b/components/camel-google/camel-google-pubsub/src/test/java/org/apache/camel/component/google/pubsub/unit/PubsubProducerWithCustomRetrySettingsTest.java new file mode 100644 index 00000000000..7395ba62ac0 --- /dev/null +++ b/components/camel-google/camel-google-pubsub/src/test/java/org/apache/camel/component/google/pubsub/unit/PubsubProducerWithCustomRetrySettingsTest.java @@ -0,0 +1,122 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.component.google.pubsub.unit; + +import java.time.Duration; + +import com.google.api.gax.retrying.RetrySettings; +import org.apache.camel.BindToRegistry; +import org.apache.camel.Endpoint; +import org.apache.camel.EndpointInject; +import org.apache.camel.Producer; +import org.apache.camel.builder.RouteBuilder; +import org.apache.camel.component.google.pubsub.GooglePubsubEndpoint; +import org.apache.camel.component.google.pubsub.GooglePubsubProducer; +import org.apache.camel.component.google.pubsub.PubsubTestSupport; +import org.junit.jupiter.api.Test; + +import static org.junit.jupiter.api.Assertions.assertNotNull; +import static org.junit.jupiter.api.Assertions.assertTrue; + +public class PubsubProducerWithCustomRetrySettingsTest extends PubsubTestSupport { + + private static final String TEST_TOPIC_NAME = "test-topic-name"; + + @EndpointInject("google-pubsub:{{project.id}}:" + TEST_TOPIC_NAME + "?retry=#retrySettings") + private Endpoint to; + + @BindToRegistry + private RetrySettings retrySettings = new CustomRetrySettings(); + + @EndpointInject("direct:from") + private Endpoint from; + + @Test + public void testProducerConfiguration() throws Exception { + // :1 indicates first of a component type in Camel context + Endpoint endpoint + = context.hasEndpoint(String.format("google-pubsub:%s:%s?retry=#retrySettings", PROJECT_ID, TEST_TOPIC_NAME)); + assertNotNull(endpoint, + String.format("Endpoint 'google-pubsub:%s:$s?retry=#retrySettings' is not found in Camel Context", PROJECT_ID, + TEST_TOPIC_NAME)); + + assertNotNull(((GooglePubsubEndpoint) endpoint).getRetry()); + Producer producer = endpoint.createProducer(); + assertTrue(producer instanceof GooglePubsubProducer); + } + + @Override + protected RouteBuilder createRouteBuilder() throws Exception { + return new RouteBuilder() { + public void configure() { + from(from).to(to); + } + }; + } + + private static final class CustomRetrySettings extends RetrySettings { + @Override + public Duration getTotalTimeoutDuration() { + return Duration.ZERO; + } + + @Override + public Duration getInitialRetryDelayDuration() { + return Duration.ZERO; + } + + @Override + public double getRetryDelayMultiplier() { + return 0; + } + + @Override + public Duration getMaxRetryDelayDuration() { + return Duration.ZERO; + } + + @Override + public int getMaxAttempts() { + return 0; + } + + @Override + public boolean isJittered() { + return false; + } + + @Override + public Duration getInitialRpcTimeoutDuration() { + return null; + } + + @Override + public double getRpcTimeoutMultiplier() { + return 0; + } + + @Override + public Duration getMaxRpcTimeoutDuration() { + return null; + } + + @Override + public Builder toBuilder() { + return null; + } + } +} diff --git a/dsl/camel-endpointdsl/src/generated/java/org/apache/camel/builder/endpoint/dsl/GooglePubsubEndpointBuilderFactory.java b/dsl/camel-endpointdsl/src/generated/java/org/apache/camel/builder/endpoint/dsl/GooglePubsubEndpointBuilderFactory.java index 2d62ca9d0b7..54f1c8e79b6 100644 --- a/dsl/camel-endpointdsl/src/generated/java/org/apache/camel/builder/endpoint/dsl/GooglePubsubEndpointBuilderFactory.java +++ b/dsl/camel-endpointdsl/src/generated/java/org/apache/camel/builder/endpoint/dsl/GooglePubsubEndpointBuilderFactory.java @@ -571,6 +571,38 @@ public interface GooglePubsubEndpointBuilderFactory { doSetProperty("pubsubEndpoint", pubsubEndpoint); return this; } + /** + * A custom RetrySettings to control how the publisher handles + * retry-able failures. + * + * The option is a: + * <code>com.google.api.gax.retrying.RetrySettings</code> type. + * + * Group: producer (advanced) + * + * @param retry the value to set + * @return the dsl builder + */ + default AdvancedGooglePubsubEndpointProducerBuilder retry(com.google.api.gax.retrying.RetrySettings retry) { + doSetProperty("retry", retry); + return this; + } + /** + * A custom RetrySettings to control how the publisher handles + * retry-able failures. + * + * The option will be converted to a + * <code>com.google.api.gax.retrying.RetrySettings</code> type. + * + * Group: producer (advanced) + * + * @param retry the value to set + * @return the dsl builder + */ + default AdvancedGooglePubsubEndpointProducerBuilder retry(String retry) { + doSetProperty("retry", retry); + return this; + } /** * A custom GooglePubsubSerializer to use for serializing message * payloads in the producer.