This is an automated email from the ASF dual-hosted git repository. davsclaus pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/camel.git
The following commit(s) were added to refs/heads/master by this push: new 86e3762 CAMEL-14047: Allow unset producerName to get unique name from Pulsar (#3237) 86e3762 is described below commit 86e37622a8986b3de8efc0ad9b042aab3fd89d2d Author: William Thompson <william.thomp...@toasttab.com> AuthorDate: Fri Oct 11 06:50:16 2019 -0400 CAMEL-14047: Allow unset producerName to get unique name from Pulsar (#3237) * Allow null ProducerName to get unique name from Pulsar * Remove explicit null declaration * Update javadoc, generate adoc and PulsarEndpointBuilderFactory --- .../src/main/docs/pulsar-component.adoc | 2 +- .../camel/component/pulsar/PulsarProducer.java | 8 +- .../pulsar/configuration/PulsarConfiguration.java | 6 +- .../component/pulsar/PulsarComponentTest.java | 2 +- .../PulsarProducerUndefinedProducerNameInTest.java | 111 +++++++++++++++++++++ .../endpoint/dsl/PulsarEndpointBuilderFactory.java | 3 +- 6 files changed, 122 insertions(+), 10 deletions(-) diff --git a/components/camel-pulsar/src/main/docs/pulsar-component.adoc b/components/camel-pulsar/src/main/docs/pulsar-component.adoc index 3e19465..d2eb07a 100644 --- a/components/camel-pulsar/src/main/docs/pulsar-component.adoc +++ b/components/camel-pulsar/src/main/docs/pulsar-component.adoc @@ -98,7 +98,7 @@ with the following path and query parameters: | *maxPendingMessagesAcross Partitions* (producer) | Set the number of max pending messages across all the partitions. Default is 50000. | 50000 | int | *messageRouter* (producer) | Set a custom Message Router. | | MessageRouter | *messageRoutingMode* (producer) | Set the message routing mode for the producer. | RoundRobinPartition | MessageRoutingMode -| *producerName* (producer) | Name of the producer | default-producer | String +| *producerName* (producer) | Name of the producer. If unset, lets Pulsar select a unique identifier. | | String | *sendTimeoutMs* (producer) | Send timeout in milliseconds. Defaults to 30,000ms (30 seconds) | 30000 | int | *basicPropertyBinding* (advanced) | Whether the endpoint should use basic property binding (Camel 2.x) or the newer property binding with additional capabilities | false | boolean | *synchronous* (advanced) | Sets whether synchronous processing should be strictly used, or Camel is allowed to use asynchronous processing (if supported). | false | boolean diff --git a/components/camel-pulsar/src/main/java/org/apache/camel/component/pulsar/PulsarProducer.java b/components/camel-pulsar/src/main/java/org/apache/camel/component/pulsar/PulsarProducer.java index 5503949..efc78ba 100644 --- a/components/camel-pulsar/src/main/java/org/apache/camel/component/pulsar/PulsarProducer.java +++ b/components/camel-pulsar/src/main/java/org/apache/camel/component/pulsar/PulsarProducer.java @@ -57,10 +57,7 @@ public class PulsarProducer extends DefaultProducer { final String topicUri = pulsarEndpoint.getUri(); PulsarConfiguration configuration = pulsarEndpoint.getPulsarConfiguration(); String producerName = configuration.getProducerName(); - if (producerName == null) { - producerName = topicUri + "-" + Thread.currentThread().getId(); - } - final ProducerBuilder<byte[]> producerBuilder = pulsarEndpoint.getPulsarClient().newProducer().producerName(producerName).topic(topicUri) + final ProducerBuilder<byte[]> producerBuilder = pulsarEndpoint.getPulsarClient().newProducer().topic(topicUri) .sendTimeout(configuration.getSendTimeoutMs(), TimeUnit.MILLISECONDS).blockIfQueueFull(configuration.isBlockIfQueueFull()) .maxPendingMessages(configuration.getMaxPendingMessages()).maxPendingMessagesAcrossPartitions(configuration.getMaxPendingMessagesAcrossPartitions()) .batchingMaxPublishDelay(configuration.getBatchingMaxPublishDelayMicros(), TimeUnit.MICROSECONDS).batchingMaxMessages(configuration.getMaxPendingMessages()) @@ -70,6 +67,9 @@ public class PulsarProducer extends DefaultProducer { } else { producerBuilder.messageRoutingMode(configuration.getMessageRoutingMode()); } + if (producerName != null) { + producerBuilder.producerName(producerName); + } producer = producerBuilder.create(); } } diff --git a/components/camel-pulsar/src/main/java/org/apache/camel/component/pulsar/configuration/PulsarConfiguration.java b/components/camel-pulsar/src/main/java/org/apache/camel/component/pulsar/configuration/PulsarConfiguration.java index 50b9778..d6bdcb8 100644 --- a/components/camel-pulsar/src/main/java/org/apache/camel/component/pulsar/configuration/PulsarConfiguration.java +++ b/components/camel-pulsar/src/main/java/org/apache/camel/component/pulsar/configuration/PulsarConfiguration.java @@ -41,8 +41,8 @@ public class PulsarConfiguration { private int consumerQueueSize = 10; @UriParam(label = "consumer", defaultValue = "sole-consumer") private String consumerName = "sole-consumer"; - @UriParam(label = "producer", defaultValue = "default-producer") - private String producerName = "default-producer"; + @UriParam(label = "producer") + private String producerName; @UriParam(label = "consumer", defaultValue = "cons") private String consumerNamePrefix = "cons"; @UriParam(label = "consumer", defaultValue = "false") @@ -136,7 +136,7 @@ public class PulsarConfiguration { } /** - * Name of the producer + * Name of the producer. If unset, lets Pulsar select a unique identifier. */ public void setProducerName(String producerName) { this.producerName = producerName; diff --git a/components/camel-pulsar/src/test/java/org/apache/camel/component/pulsar/PulsarComponentTest.java b/components/camel-pulsar/src/test/java/org/apache/camel/component/pulsar/PulsarComponentTest.java index 258df10..6ef8983 100644 --- a/components/camel-pulsar/src/test/java/org/apache/camel/component/pulsar/PulsarComponentTest.java +++ b/components/camel-pulsar/src/test/java/org/apache/camel/component/pulsar/PulsarComponentTest.java @@ -60,7 +60,7 @@ public class PulsarComponentTest extends CamelTestSupport { assertEquals("cons", endpoint.getPulsarConfiguration().getConsumerNamePrefix()); assertEquals(10, endpoint.getPulsarConfiguration().getConsumerQueueSize()); assertEquals(1, endpoint.getPulsarConfiguration().getNumberOfConsumers()); - assertEquals("default-producer", endpoint.getPulsarConfiguration().getProducerName()); + assertNull(endpoint.getPulsarConfiguration().getProducerName()); assertEquals("subs", endpoint.getPulsarConfiguration().getSubscriptionName()); assertEquals(SubscriptionType.EXCLUSIVE, endpoint.getPulsarConfiguration().getSubscriptionType()); assertFalse(endpoint.getPulsarConfiguration().isAllowManualAcknowledgement()); diff --git a/components/camel-pulsar/src/test/java/org/apache/camel/component/pulsar/PulsarProducerUndefinedProducerNameInTest.java b/components/camel-pulsar/src/test/java/org/apache/camel/component/pulsar/PulsarProducerUndefinedProducerNameInTest.java new file mode 100644 index 0000000..c65d9ad --- /dev/null +++ b/components/camel-pulsar/src/test/java/org/apache/camel/component/pulsar/PulsarProducerUndefinedProducerNameInTest.java @@ -0,0 +1,111 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.component.pulsar; + +import java.util.concurrent.TimeUnit; + +import org.apache.camel.Endpoint; +import org.apache.camel.EndpointInject; +import org.apache.camel.Produce; +import org.apache.camel.ProducerTemplate; +import org.apache.camel.builder.RouteBuilder; +import org.apache.camel.component.mock.MockEndpoint; +import org.apache.camel.component.pulsar.utils.AutoConfiguration; +import org.apache.camel.spi.Registry; +import org.apache.camel.support.SimpleRegistry; +import org.apache.pulsar.client.api.PulsarClient; +import org.apache.pulsar.client.api.PulsarClientException; +import org.apache.pulsar.client.impl.ClientBuilderImpl; +import org.junit.Test; + +public class PulsarProducerUndefinedProducerNameInTest extends PulsarTestSupport { + + private static final String TOPIC_URI = "persistent://public/default/camel-producer-topic"; + + @Produce("direct:start1") + private ProducerTemplate producerTemplate1; + + @Produce("direct:start2") + private ProducerTemplate producerTemplate2; + + @EndpointInject("pulsar:" + TOPIC_URI + + "?numberOfConsumers=1" + + "&subscriptionType=Exclusive" + + "&subscriptionName=camel-subscription" + + "&consumerQueueSize=1" + + "&consumerName=camel-consumer" + ) + private Endpoint pulsarEndpoint1; + + @EndpointInject("pulsar:" + TOPIC_URI) + private Endpoint pulsarEndpoint2; + + @EndpointInject("mock:result") + private MockEndpoint to; + + @Override + protected RouteBuilder createRouteBuilder() { + return new RouteBuilder() { + + @Override + public void configure() { + from("direct:start1").to(pulsarEndpoint1); + from("direct:start2").to(pulsarEndpoint2); + + from(pulsarEndpoint1).to(to); + } + }; + } + + @Override + protected Registry createCamelRegistry() throws Exception { + Registry registry = new SimpleRegistry(); + + registerPulsarBeans(registry); + + return registry; + } + + private void registerPulsarBeans(Registry registry) throws PulsarClientException { + PulsarClient pulsarClient = givenPulsarClient(); + AutoConfiguration autoConfiguration = new AutoConfiguration(null, null); + + registry.bind("pulsarClient", pulsarClient); + PulsarComponent comp = new PulsarComponent(context); + comp.setAutoConfiguration(autoConfiguration); + comp.setPulsarClient(pulsarClient); + registry.bind("pulsar", comp); + } + + private PulsarClient givenPulsarClient() throws PulsarClientException { + return new ClientBuilderImpl() + .serviceUrl(getPulsarBrokerUrl()) + .ioThreads(1) + .listenerThreads(1) + .build(); + } + + @Test + public void testAMessageToRouteIsSentFromBothProducersAndThenConsumed() throws Exception { + to.expectedMessageCount(2); + + producerTemplate1.sendBody("Test First"); + producerTemplate2.sendBody("Test Second"); + + MockEndpoint.assertIsSatisfied(10, TimeUnit.SECONDS, to); + } +} diff --git a/core/camel-endpointdsl/src/main/java/org/apache/camel/builder/endpoint/dsl/PulsarEndpointBuilderFactory.java b/core/camel-endpointdsl/src/main/java/org/apache/camel/builder/endpoint/dsl/PulsarEndpointBuilderFactory.java index ace4f65..e5f4f00 100644 --- a/core/camel-endpointdsl/src/main/java/org/apache/camel/builder/endpoint/dsl/PulsarEndpointBuilderFactory.java +++ b/core/camel-endpointdsl/src/main/java/org/apache/camel/builder/endpoint/dsl/PulsarEndpointBuilderFactory.java @@ -717,7 +717,8 @@ public interface PulsarEndpointBuilderFactory { return this; } /** - * Name of the producer. + * Name of the producer. If unset, lets Pulsar select a unique + * identifier. * * The option is a: <code>java.lang.String</code> type. *