This is an automated email from the ASF dual-hosted git repository. davsclaus pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/camel.git
The following commit(s) were added to refs/heads/master by this push: new 1752055 CAMEL-13718: camel-pulsar - Split @UriPath into individual parts and polised the code a bit. 1752055 is described below commit 175205551a0ab61c93647de440dcecfb947a5c93 Author: Claus Ibsen <claus.ib...@gmail.com> AuthorDate: Fri Jul 5 10:08:22 2019 +0200 CAMEL-13718: camel-pulsar - Split @UriPath into individual parts and polised the code a bit. --- components/camel-pulsar/pom.xml | 7 +- .../src/main/docs/pulsar-component.adoc | 16 +-- .../camel/component/pulsar/PulsarComponent.java | 20 +++- .../camel/component/pulsar/PulsarEndpoint.java | 107 +++++++++++++++------ .../component/pulsar/PulsarMessageListener.java | 5 +- .../camel/component/pulsar/PulsarProducer.java | 4 +- .../camel/component/pulsar/utils/PulsarPath.java | 19 +++- .../consumers/CommonCreationStrategyImpl.java | 2 +- .../camel/component/pulsar/PulsarEndpointTest.java | 18 +++- .../pulsar/utils/AutoConfigurationTest.java | 105 -------------------- .../endpoint/dsl/PulsarEndpointBuilderFactory.java | 107 ++++++++++++++++++++- 11 files changed, 246 insertions(+), 164 deletions(-) diff --git a/components/camel-pulsar/pom.xml b/components/camel-pulsar/pom.xml index 66312d4..2e0820f 100644 --- a/components/camel-pulsar/pom.xml +++ b/components/camel-pulsar/pom.xml @@ -20,21 +20,20 @@ <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> + + <modelVersion>4.0.0</modelVersion> + <parent> <artifactId>components</artifactId> <groupId>org.apache.camel</groupId> <version>3.0.0-SNAPSHOT</version> </parent> - <modelVersion>4.0.0</modelVersion> <artifactId>camel-pulsar</artifactId> <packaging>jar</packaging> <name>Camel :: Pulsar</name> <description>Camel Apache Pulsar Component</description> - <properties> - </properties> - <dependencies> <dependency> diff --git a/components/camel-pulsar/src/main/docs/pulsar-component.adoc b/components/camel-pulsar/src/main/docs/pulsar-component.adoc index ded6ccd..6cd4f99 100644 --- a/components/camel-pulsar/src/main/docs/pulsar-component.adoc +++ b/components/camel-pulsar/src/main/docs/pulsar-component.adoc @@ -17,14 +17,14 @@ their `pom.xml` for this component. </dependency> ------------------------------------------------------------ -### URI format +=== URI format -[source,java] +[source,text] ---------------------- pulsar:[persistent|non-persistent]://tenant/namespace/topic ---------------------- -### Options +=== Options // component options: START @@ -50,18 +50,21 @@ The Apache Pulsar component supports 4 options, which are listed below. The Apache Pulsar endpoint is configured using URI syntax: ---- -pulsar:uri +pulsar:persistence://tenant/namespace/topic ---- with the following path and query parameters: -==== Path Parameters (1 parameters): +==== Path Parameters (4 parameters): [width="100%",cols="2,5,^1,2",options="header"] |=== | Name | Description | Default | Type -| *topicUri* | The Topic's full URI path including type, tenant and namespace | | String +| *persistence* | *Required* Whether the topic is persistent or non-persistent | | String +| *tenant* | *Required* The tenant | | String +| *namespace* | *Required* The namespace | | String +| *topic* | *Required* The topic | | String |=== @@ -86,6 +89,7 @@ with the following path and query parameters: | *synchronous* (advanced) | Sets whether synchronous processing should be strictly used, or Camel is allowed to use asynchronous processing (if supported). | false | boolean |=== // endpoint options: END + // spring-boot-auto-configure options: START === Spring Boot Auto-Configuration diff --git a/components/camel-pulsar/src/main/java/org/apache/camel/component/pulsar/PulsarComponent.java b/components/camel-pulsar/src/main/java/org/apache/camel/component/pulsar/PulsarComponent.java index 7244fdb..67f34ff 100644 --- a/components/camel-pulsar/src/main/java/org/apache/camel/component/pulsar/PulsarComponent.java +++ b/components/camel-pulsar/src/main/java/org/apache/camel/component/pulsar/PulsarComponent.java @@ -22,6 +22,7 @@ import org.apache.camel.CamelContext; import org.apache.camel.Endpoint; import org.apache.camel.component.pulsar.configuration.PulsarConfiguration; import org.apache.camel.component.pulsar.utils.AutoConfiguration; +import org.apache.camel.component.pulsar.utils.PulsarPath; import org.apache.camel.spi.Metadata; import org.apache.camel.spi.annotations.Component; import org.apache.camel.support.DefaultComponent; @@ -45,17 +46,30 @@ public class PulsarComponent extends DefaultComponent { @Override protected Endpoint createEndpoint(final String uri, final String path, final Map<String, Object> parameters) throws Exception { final PulsarConfiguration configuration = new PulsarConfiguration(); - setProperties(configuration, parameters); if (autoConfiguration != null) { setProperties(autoConfiguration, parameters); - if (autoConfiguration.isAutoConfigurable()) { autoConfiguration.ensureNameSpaceAndTenant(path); } } - return PulsarEndpoint.create(uri, path, configuration, this, pulsarClient); + PulsarEndpoint answer = new PulsarEndpoint(uri, this); + answer.setPulsarConfiguration(configuration); + answer.setPulsarClient(pulsarClient); + setProperties(answer, parameters); + + PulsarPath pp = new PulsarPath(path); + if (pp.isAutoConfigurable()) { + answer.setPersistence(pp.getPersistence()); + answer.setTenant(pp.getTenant()); + answer.setNamespace(pp.getNamespace()); + answer.setTopic(pp.getTopic()); + } else { + throw new IllegalArgumentException("Pulsar name structure is invalid: was " + path); + } + + return answer; } public AutoConfiguration getAutoConfiguration() { diff --git a/components/camel-pulsar/src/main/java/org/apache/camel/component/pulsar/PulsarEndpoint.java b/components/camel-pulsar/src/main/java/org/apache/camel/component/pulsar/PulsarEndpoint.java index d46bc8b..917bcf8 100644 --- a/components/camel-pulsar/src/main/java/org/apache/camel/component/pulsar/PulsarEndpoint.java +++ b/components/camel-pulsar/src/main/java/org/apache/camel/component/pulsar/PulsarEndpoint.java @@ -17,42 +17,36 @@ package org.apache.camel.component.pulsar; import org.apache.camel.Consumer; -import org.apache.camel.Exchange; import org.apache.camel.Processor; import org.apache.camel.Producer; import org.apache.camel.component.pulsar.configuration.PulsarConfiguration; +import org.apache.camel.spi.Metadata; import org.apache.camel.spi.UriEndpoint; import org.apache.camel.spi.UriParam; import org.apache.camel.spi.UriPath; import org.apache.camel.support.DefaultEndpoint; +import org.apache.camel.util.ObjectHelper; import org.apache.pulsar.client.api.PulsarClient; -import org.apache.pulsar.client.api.PulsarClientException; -@UriEndpoint(scheme = "pulsar", firstVersion = "2.24.0", title = "Apache Pulsar", syntax = "pulsar:uri", label = "messaging") +@UriEndpoint(scheme = "pulsar", firstVersion = "2.24.0", title = "Apache Pulsar", syntax = "pulsar:persistence://tenant/namespace/topic", label = "messaging") public class PulsarEndpoint extends DefaultEndpoint { private PulsarClient pulsarClient; + private String uri; + + @UriPath(enums = "persistent,non-persistent") @Metadata(required = true) + private String persistence; + @UriPath @Metadata(required = true) + private String tenant; + @UriPath @Metadata(required = true) + private String namespace; + @UriPath @Metadata(required = true) + private String topic; @UriParam private PulsarConfiguration pulsarConfiguration; - @UriPath(label = "consumer,producer", description = "The Topic's full URI path including type, tenant and namespace") - private final String topicUri; - public PulsarEndpoint(String uri, String path, PulsarConfiguration pulsarConfiguration, PulsarComponent component, PulsarClient pulsarClient) throws PulsarClientException { + public PulsarEndpoint(String uri, PulsarComponent component) { super(uri, component); - this.topicUri = path; - this.pulsarConfiguration = pulsarConfiguration; - this.pulsarClient = pulsarClient; - } - - public static PulsarEndpoint create(final String uri, final String path, final PulsarConfiguration pulsarConfiguration, final PulsarComponent component, - final PulsarClient pulsarClient) - throws PulsarClientException, IllegalArgumentException { - - if (null == pulsarConfiguration) { - throw new IllegalArgumentException("PulsarEndpointConfiguration cannot be null"); - } - - return new PulsarEndpoint(uri, path, pulsarConfiguration, component, pulsarClient); } @Override @@ -67,21 +61,80 @@ public class PulsarEndpoint extends DefaultEndpoint { return consumer; } - @Override - public Exchange createExchange() { - return super.createExchange(); - } - public PulsarClient getPulsarClient() { return pulsarClient; } + /** + * To use a custom pulsar client + */ + public void setPulsarClient(PulsarClient pulsarClient) { + this.pulsarClient = pulsarClient; + } + + public String getPersistence() { + return persistence; + } + + /** + * Whether the topic is persistent or non-persistent + */ + public void setPersistence(String persistence) { + this.persistence = persistence; + } + + public String getTenant() { + return tenant; + } + + /** + * The tenant + */ + public void setTenant(String tenant) { + this.tenant = tenant; + } + + public String getNamespace() { + return namespace; + } + + /** + * The namespace + */ + public void setNamespace(String namespace) { + this.namespace = namespace; + } + + public String getTopic() { + return topic; + } + + /** + * The topic + */ + public void setTopic(String topic) { + this.topic = topic; + } + public PulsarConfiguration getPulsarConfiguration() { return pulsarConfiguration; } - public String getTopicUri() { - return topicUri; + public void setPulsarConfiguration(PulsarConfiguration pulsarConfiguration) { + this.pulsarConfiguration = pulsarConfiguration; + } + + public String getUri() { + return persistence + "://" + tenant + "/" + namespace + "/" + topic; } + @Override + protected void doStart() throws Exception { + ObjectHelper.notNull(persistence, "persistence", this); + ObjectHelper.notNull(tenant, "tenant", this); + ObjectHelper.notNull(namespace, "namespace", this); + ObjectHelper.notNull(topic, "topic", this); + + uri = persistence + "://" + tenant + "/" + namespace + "/" + topic; + } } diff --git a/components/camel-pulsar/src/main/java/org/apache/camel/component/pulsar/PulsarMessageListener.java b/components/camel-pulsar/src/main/java/org/apache/camel/component/pulsar/PulsarMessageListener.java index 1063774..9e0398f 100644 --- a/components/camel-pulsar/src/main/java/org/apache/camel/component/pulsar/PulsarMessageListener.java +++ b/components/camel-pulsar/src/main/java/org/apache/camel/component/pulsar/PulsarMessageListener.java @@ -56,9 +56,6 @@ public class PulsarMessageListener implements MessageListener<byte[]> { final Exchange exchangeWithException = PulsarMessageUtils .updateExchangeWithException(exception, exchange); - exceptionHandler - .handleException("An error occurred", exchangeWithException, exception); - - LOGGER.error("An error occurred while processing this exchange :: {}", exception); + exceptionHandler.handleException("An error occurred", exchangeWithException, exception); } } diff --git a/components/camel-pulsar/src/main/java/org/apache/camel/component/pulsar/PulsarProducer.java b/components/camel-pulsar/src/main/java/org/apache/camel/component/pulsar/PulsarProducer.java index 1f38005..70f1d40 100644 --- a/components/camel-pulsar/src/main/java/org/apache/camel/component/pulsar/PulsarProducer.java +++ b/components/camel-pulsar/src/main/java/org/apache/camel/component/pulsar/PulsarProducer.java @@ -32,7 +32,6 @@ public class PulsarProducer extends DefaultProducer { public PulsarProducer(PulsarEndpoint pulsarEndpoint) { super(pulsarEndpoint); - this.pulsarEndpoint = pulsarEndpoint; } @@ -52,7 +51,7 @@ public class PulsarProducer extends DefaultProducer { private synchronized void createProducer() throws org.apache.pulsar.client.api.PulsarClientException { if (producer == null) { - final String topicUri = pulsarEndpoint.getTopicUri(); + final String topicUri = pulsarEndpoint.getUri(); String producerName = pulsarEndpoint.getPulsarConfiguration().getProducerName(); if (producerName == null) { producerName = topicUri + "-" + Thread.currentThread().getId(); @@ -79,6 +78,7 @@ public class PulsarProducer extends DefaultProducer { log.debug("Stopping producer: {}", this); if (producer != null) { producer.close(); + producer = null; } } } diff --git a/components/camel-pulsar/src/main/java/org/apache/camel/component/pulsar/utils/PulsarPath.java b/components/camel-pulsar/src/main/java/org/apache/camel/component/pulsar/utils/PulsarPath.java index 01e6141..1af739f 100644 --- a/components/camel-pulsar/src/main/java/org/apache/camel/component/pulsar/utils/PulsarPath.java +++ b/components/camel-pulsar/src/main/java/org/apache/camel/component/pulsar/utils/PulsarPath.java @@ -20,20 +20,29 @@ import java.util.regex.Matcher; import java.util.regex.Pattern; public class PulsarPath { - private static final Pattern PATTERN = Pattern.compile("^((persistent|non-persistent)://)?(?<namespace>(?<tenant>.+)/.+)/.+$"); + private static final Pattern PATTERN = Pattern.compile("^(persistent|non-persistent):?/?/(.+)/(.+)/(.+)$"); + private String persistence; private String tenant; private String namespace; + private String topic; private boolean autoConfigurable; public PulsarPath(String path) { Matcher matcher = PATTERN.matcher(path); autoConfigurable = matcher.matches(); if (autoConfigurable) { - tenant = matcher.group("tenant"); - namespace = matcher.group("namespace"); + persistence = matcher.group(1); + tenant = matcher.group(2); + namespace = matcher.group(3); + topic = matcher.group(4); } } + + public String getPersistence() { + return persistence; + } + public String getTenant() { return tenant; } @@ -42,6 +51,10 @@ public class PulsarPath { return namespace; } + public String getTopic() { + return topic; + } + public boolean isAutoConfigurable() { return autoConfigurable; } diff --git a/components/camel-pulsar/src/main/java/org/apache/camel/component/pulsar/utils/consumers/CommonCreationStrategyImpl.java b/components/camel-pulsar/src/main/java/org/apache/camel/component/pulsar/utils/consumers/CommonCreationStrategyImpl.java index 9432e13..740ee76 100644 --- a/components/camel-pulsar/src/main/java/org/apache/camel/component/pulsar/utils/consumers/CommonCreationStrategyImpl.java +++ b/components/camel-pulsar/src/main/java/org/apache/camel/component/pulsar/utils/consumers/CommonCreationStrategyImpl.java @@ -30,7 +30,7 @@ public final class CommonCreationStrategyImpl { public static ConsumerBuilder<byte[]> create(final String name, final PulsarEndpoint pulsarEndpoint, final PulsarConsumer pulsarConsumer) { final PulsarConfiguration endpointConfiguration = pulsarEndpoint.getPulsarConfiguration(); - return pulsarEndpoint.getPulsarClient().newConsumer().topic(pulsarEndpoint.getTopicUri()).subscriptionName(endpointConfiguration.getSubscriptionName()) + return pulsarEndpoint.getPulsarClient().newConsumer().topic(pulsarEndpoint.getUri()).subscriptionName(endpointConfiguration.getSubscriptionName()) .receiverQueueSize(endpointConfiguration.getConsumerQueueSize()).consumerName(name) .messageListener(new PulsarMessageListener(pulsarEndpoint, pulsarConsumer.getExceptionHandler(), pulsarConsumer.getProcessor())); } diff --git a/components/camel-pulsar/src/test/java/org/apache/camel/component/pulsar/PulsarEndpointTest.java b/components/camel-pulsar/src/test/java/org/apache/camel/component/pulsar/PulsarEndpointTest.java index 73be970..ea836de 100644 --- a/components/camel-pulsar/src/test/java/org/apache/camel/component/pulsar/PulsarEndpointTest.java +++ b/components/camel-pulsar/src/test/java/org/apache/camel/component/pulsar/PulsarEndpointTest.java @@ -16,13 +16,21 @@ */ package org.apache.camel.component.pulsar; -import org.apache.pulsar.client.api.PulsarClientException; +import org.apache.camel.ResolveEndpointFailedException; +import org.apache.camel.test.junit4.CamelTestSupport; import org.junit.Test; -public class PulsarEndpointTest { +public class PulsarEndpointTest extends CamelTestSupport { - @Test(expected = IllegalArgumentException.class) - public void givenPulsarEndpointConfigurationIsNullthrowIllegalArgumentExceptionOnCreation() throws PulsarClientException { - PulsarEndpoint.create("", "", null, null, null); + @Test + public void testInvalidPulsarNameStructure() { + try { + // the topic is missing + context.getEndpoint("pulsar:persistent://myteant/mynamespace"); + fail("Should throw exception"); + } catch (ResolveEndpointFailedException e) { + IllegalArgumentException iae = assertIsInstanceOf(IllegalArgumentException.class, e.getCause()); + assertEquals("Pulsar name structure is invalid: was persistent://myteant/mynamespace", iae.getMessage()); + } } } diff --git a/components/camel-pulsar/src/test/java/org/apache/camel/component/pulsar/utils/AutoConfigurationTest.java b/components/camel-pulsar/src/test/java/org/apache/camel/component/pulsar/utils/AutoConfigurationTest.java deleted file mode 100644 index 677b245..0000000 --- a/components/camel-pulsar/src/test/java/org/apache/camel/component/pulsar/utils/AutoConfigurationTest.java +++ /dev/null @@ -1,105 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.camel.component.pulsar.utils; - -import java.util.Collections; -import java.util.Set; - -import org.apache.pulsar.client.admin.Namespaces; -import org.apache.pulsar.client.admin.PulsarAdmin; -import org.apache.pulsar.client.admin.PulsarAdminException; -import org.apache.pulsar.client.admin.Tenants; -import org.apache.pulsar.common.policies.data.TenantInfo; -import org.junit.Before; -import org.junit.Test; -import org.mockito.Matchers; - -import static org.mockito.Mockito.*; - -public class AutoConfigurationTest { - - private PulsarAdmin pulsarAdmin; - private Tenants tenants; - private Namespaces namespaces; - private Set<String> clusters = Collections.singleton("standalone"); - - @Before - public void setUp() { - pulsarAdmin = mock(PulsarAdmin.class); - tenants = mock(Tenants.class); - namespaces = mock(Namespaces.class); - - when(pulsarAdmin.tenants()).thenReturn(tenants); - when(pulsarAdmin.namespaces()).thenReturn(namespaces); - } - - @Test - public void noAdminConfiguration() { - when(pulsarAdmin.getClientConfigData()).thenReturn(null); - - AutoConfiguration autoConfiguration = new AutoConfiguration(null, clusters); - autoConfiguration.ensureNameSpaceAndTenant("tn1/ns1/topic"); - - verify(pulsarAdmin, never()).tenants(); - } - - @Test - public void autoConfigurationDisabled() { - - AutoConfiguration autoConfiguration = new AutoConfiguration(null, clusters); - autoConfiguration.ensureNameSpaceAndTenant("tn1/ns1/topic"); - - verify(pulsarAdmin, never()).tenants(); - } - - @Test - public void defaultTopic() { - - AutoConfiguration autoConfiguration = new AutoConfiguration(pulsarAdmin, clusters); - autoConfiguration.ensureNameSpaceAndTenant("topic"); - - verify(pulsarAdmin, never()).tenants(); - } - - @Test - public void newTenantAndNamespace() throws PulsarAdminException { - when(pulsarAdmin.tenants()).thenReturn(tenants); - when(tenants.getTenants()).thenReturn(Collections.<String> emptyList()); - when(pulsarAdmin.namespaces()).thenReturn(namespaces); - when(namespaces.getNamespaces("tn1")).thenReturn(Collections.<String> emptyList()); - - AutoConfiguration autoConfiguration = new AutoConfiguration(pulsarAdmin, clusters); - autoConfiguration.ensureNameSpaceAndTenant("tn1/ns1/topic"); - - verify(tenants).createTenant(eq("tn1"), Matchers.<TenantInfo> any()); - verify(namespaces).createNamespace("tn1/ns1", Collections.singleton("standalone")); - } - - @Test - public void existingTenantAndNamespace() throws PulsarAdminException { - when(pulsarAdmin.tenants()).thenReturn(tenants); - when(tenants.getTenants()).thenReturn(Collections.<String> singletonList("tn1")); - when(pulsarAdmin.namespaces()).thenReturn(namespaces); - when(namespaces.getNamespaces("tn1")).thenReturn(Collections.<String> singletonList("tn1/ns1")); - - AutoConfiguration autoConfiguration = new AutoConfiguration(pulsarAdmin, clusters); - autoConfiguration.ensureNameSpaceAndTenant("tn1/ns1/topic"); - - verify(tenants, never()).createTenant(Matchers.<String> any(), Matchers.<TenantInfo> any()); - verify(namespaces, never()).createNamespace(Matchers.<String> any(), anySet()); - } -} diff --git a/core/camel-endpointdsl/src/main/java/org/apache/camel/builder/endpoint/dsl/PulsarEndpointBuilderFactory.java b/core/camel-endpointdsl/src/main/java/org/apache/camel/builder/endpoint/dsl/PulsarEndpointBuilderFactory.java index 71767d8..b57b512 100644 --- a/core/camel-endpointdsl/src/main/java/org/apache/camel/builder/endpoint/dsl/PulsarEndpointBuilderFactory.java +++ b/core/camel-endpointdsl/src/main/java/org/apache/camel/builder/endpoint/dsl/PulsarEndpointBuilderFactory.java @@ -42,6 +42,42 @@ public interface PulsarEndpointBuilderFactory { return (AdvancedPulsarEndpointConsumerBuilder) this; } /** + * Whether the topic is persistent or non-persistent. + * The option is a <code>java.lang.String</code> type. + * @group common + */ + default PulsarEndpointConsumerBuilder persistence(String persistence) { + setProperty("persistence", persistence); + return this; + } + /** + * The tenant. + * The option is a <code>java.lang.String</code> type. + * @group common + */ + default PulsarEndpointConsumerBuilder tenant(String tenant) { + setProperty("tenant", tenant); + return this; + } + /** + * The namespace. + * The option is a <code>java.lang.String</code> type. + * @group common + */ + default PulsarEndpointConsumerBuilder namespace(String namespace) { + setProperty("namespace", namespace); + return this; + } + /** + * The topic. + * The option is a <code>java.lang.String</code> type. + * @group common + */ + default PulsarEndpointConsumerBuilder topic(String topic) { + setProperty("topic", topic); + return this; + } + /** * Allows for bridging the consumer to the Camel routing Error Handler, * which mean any exceptions occurred while the consumer is trying to * pickup incoming messages, or the likes, will now be processed as a @@ -283,12 +319,39 @@ public interface PulsarEndpointBuilderFactory { return (AdvancedPulsarEndpointProducerBuilder) this; } /** - * The Topic's full URI path including type, tenant and namespace. + * Whether the topic is persistent or non-persistent. * The option is a <code>java.lang.String</code> type. - * @group producer + * @group common + */ + default PulsarEndpointProducerBuilder persistence(String persistence) { + setProperty("persistence", persistence); + return this; + } + /** + * The tenant. + * The option is a <code>java.lang.String</code> type. + * @group common + */ + default PulsarEndpointProducerBuilder tenant(String tenant) { + setProperty("tenant", tenant); + return this; + } + /** + * The namespace. + * The option is a <code>java.lang.String</code> type. + * @group common + */ + default PulsarEndpointProducerBuilder namespace(String namespace) { + setProperty("namespace", namespace); + return this; + } + /** + * The topic. + * The option is a <code>java.lang.String</code> type. + * @group common */ - default PulsarEndpointProducerBuilder topicUri(String topicUri) { - setProperty("topicUri", topicUri); + default PulsarEndpointProducerBuilder topic(String topic) { + setProperty("topic", topic); return this; } /** @@ -402,6 +465,42 @@ public interface PulsarEndpointBuilderFactory { default AdvancedPulsarEndpointBuilder advanced() { return (AdvancedPulsarEndpointBuilder) this; } + /** + * Whether the topic is persistent or non-persistent. + * The option is a <code>java.lang.String</code> type. + * @group common + */ + default PulsarEndpointBuilder persistence(String persistence) { + setProperty("persistence", persistence); + return this; + } + /** + * The tenant. + * The option is a <code>java.lang.String</code> type. + * @group common + */ + default PulsarEndpointBuilder tenant(String tenant) { + setProperty("tenant", tenant); + return this; + } + /** + * The namespace. + * The option is a <code>java.lang.String</code> type. + * @group common + */ + default PulsarEndpointBuilder namespace(String namespace) { + setProperty("namespace", namespace); + return this; + } + /** + * The topic. + * The option is a <code>java.lang.String</code> type. + * @group common + */ + default PulsarEndpointBuilder topic(String topic) { + setProperty("topic", topic); + return this; + } } /**