This is an automated email from the ASF dual-hosted git repository. acosentino pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/camel.git
commit e119b98ee928d23ddc790f1f4c50a218c03d6418 Author: Andrea Cosentino <anco...@gmail.com> AuthorDate: Mon Aug 23 16:37:44 2021 +0200 CAMEL-16392 - Camel-Pulsar: Make the client configurable from component parameters --- .../component/pulsar/PulsarConfiguration.java | 39 ++++++++++++++++++++++ .../camel/component/pulsar/PulsarEndpoint.java | 14 +++++++- 2 files changed, 52 insertions(+), 1 deletion(-) diff --git a/components/camel-pulsar/src/main/java/org/apache/camel/component/pulsar/PulsarConfiguration.java b/components/camel-pulsar/src/main/java/org/apache/camel/component/pulsar/PulsarConfiguration.java index cceb811..5f22892 100644 --- a/components/camel-pulsar/src/main/java/org/apache/camel/component/pulsar/PulsarConfiguration.java +++ b/components/camel-pulsar/src/main/java/org/apache/camel/component/pulsar/PulsarConfiguration.java @@ -35,6 +35,12 @@ import static org.apache.camel.component.pulsar.utils.consumers.SubscriptionType @UriParams public class PulsarConfiguration implements Cloneable { + @UriParam(label = "common") + private String serviceUrl; + @UriParam(label = "common") + private String authenticationClass; + @UriParam(label = "common") + private String authenticationParams; @UriParam(label = "consumer") private boolean topicsPattern; @UriParam(label = "consumer", defaultValue = "PersistentOnly") @@ -483,4 +489,37 @@ public class PulsarConfiguration implements Cloneable { public void setNumberOfConsumerThreads(int numberOfConsumerThreads) { this.numberOfConsumerThreads = numberOfConsumerThreads; } + + public String getServiceUrl() { + return serviceUrl; + } + + /** + * The Pulsar Service URL to point while creating the client from URI + */ + public void setServiceUrl(String serviceUrl) { + this.serviceUrl = serviceUrl; + } + + public String getAuthenticationClass() { + return authenticationClass; + } + + /** + * The Authentication FQCN to be used while creating the client from URI + */ + public void setAuthenticationClass(String authenticationClass) { + this.authenticationClass = authenticationClass; + } + + public String getAuthenticationParams() { + return authenticationParams; + } + + /** + * The Authentication Parameters to be used while creating the client from URI + */ + public void setAuthenticationParams(String authenticationParams) { + this.authenticationParams = authenticationParams; + } } diff --git a/components/camel-pulsar/src/main/java/org/apache/camel/component/pulsar/PulsarEndpoint.java b/components/camel-pulsar/src/main/java/org/apache/camel/component/pulsar/PulsarEndpoint.java index e8f48f5..ceba2ef 100644 --- a/components/camel-pulsar/src/main/java/org/apache/camel/component/pulsar/PulsarEndpoint.java +++ b/components/camel-pulsar/src/main/java/org/apache/camel/component/pulsar/PulsarEndpoint.java @@ -26,6 +26,7 @@ import org.apache.camel.spi.UriParam; import org.apache.camel.spi.UriPath; import org.apache.camel.support.DefaultEndpoint; import org.apache.camel.util.ObjectHelper; +import org.apache.pulsar.client.api.ClientBuilder; import org.apache.pulsar.client.api.PulsarClient; /** @@ -151,7 +152,18 @@ public class PulsarEndpoint extends DefaultEndpoint { @Override protected void doStart() throws Exception { - ObjectHelper.notNull(pulsarClient, "pulsarClient", this); + if (ObjectHelper.isEmpty(pulsarClient)) { + ClientBuilder builder = PulsarClient.builder(); + if (ObjectHelper.isNotEmpty(pulsarConfiguration.getServiceUrl())) { + builder = builder.serviceUrl(pulsarConfiguration.getServiceUrl()); + } + if (ObjectHelper.isNotEmpty(pulsarConfiguration.getAuthenticationClass()) + && ObjectHelper.isNotEmpty(pulsarConfiguration.getAuthenticationParams())) { + builder = builder.authentication(pulsarConfiguration.getAuthenticationClass(), + pulsarConfiguration.getAuthenticationParams()); + } + pulsarClient = builder.build(); + } } @Override