This is an automated email from the ASF dual-hosted git repository. nfilotto pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/camel.git
The following commit(s) were added to refs/heads/main by this push:
     new ae2f8e250c5 CAMEL-16030: camel-pulsar - Add async send to producer (#8529)
ae2f8e250c5 is described below

commit ae2f8e250c5ab7210e1b3ff5ee06aa68e28de2c4
Author: Nicolas Filotto <essob...@users.noreply.github.com>
AuthorDate: Wed Oct 12 17:04:27 2022 +0200

    CAMEL-16030: camel-pulsar - Add async send to producer (#8529)

    ## Motivation

    The producer is synchronous today. But pulsar allows sending asynchronously, where a `CompletableFuture` is returned. We can leverage this for async send, and call `AsyncCallback` from the future.

    ## Modifications:

    * Make `PulsarProducer` extend `DefaultAsyncProducer` and implement the corresponding `process` method
    * Ensure the thread safety of the producer initialization (not directly related to the initial issue)
    * Deprecate the option `maxPendingMessagesAcrossPartitions` as it is deprecated in the pulsar client
---
 .../org/apache/camel/component/pulsar/pulsar.json | 4 +-
 .../component/pulsar/PulsarConfiguration.java | 6 +-
 .../camel/component/pulsar/PulsarProducer.java | 145 +++++++++++++--------
 3 files changed, 99 insertions(+), 56 deletions(-)

diff --git a/components/camel-pulsar/src/generated/resources/org/apache/camel/component/pulsar/pulsar.json b/components/camel-pulsar/src/generated/resources/org/apache/camel/component/pulsar/pulsar.json
index b95da9667fe..183bf83b416 100644
--- a/components/camel-pulsar/src/generated/resources/org/apache/camel/component/pulsar/pulsar.json
+++ b/components/camel-pulsar/src/generated/resources/org/apache/camel/component/pulsar/pulsar.json
@@ -57,7 +57,7 @@
 "initialSequenceId": { "kind": "property", "displayName": "Initial Sequence Id", "group": "producer", "label": "producer", "required": false, "type": "integer", "javaType": "long", "deprecated": false, "autowired": false, "secret": false, "defaultValue": -1, "configurationClass": "org.apache.camel.component.pulsar.PulsarConfiguration", "configurationField": 
"configuration", "description": "The first message published will have a sequence Id of initialSequenceId 1." }, "lazyStartProducer": { "kind": "property", "displayName": "Lazy Start Producer", "group": "producer", "label": "producer", "required": false, "type": "boolean", "javaType": "boolean", "deprecated": false, "autowired": false, "secret": false, "defaultValue": false, "description": "Whether the producer should be started lazy (on the first message). By starting lazy you can use this to allow CamelContext and routes to startup in situations where a producer may otherwise fail during star [...] "maxPendingMessages": { "kind": "property", "displayName": "Max Pending Messages", "group": "producer", "label": "producer", "required": false, "type": "integer", "javaType": "int", "deprecated": false, "autowired": false, "secret": false, "defaultValue": 1000, "configurationClass": "org.apache.camel.component.pulsar.PulsarConfiguration", "configurationField": "configuration", "description": "Size of the pending massages queue. When the queue is full, by default, any further sends wi [...] - "maxPendingMessagesAcrossPartitions": { "kind": "property", "displayName": "Max Pending Messages Across Partitions", "group": "producer", "label": "producer", "required": false, "type": "integer", "javaType": "int", "deprecated": false, "autowired": false, "secret": false, "defaultValue": 50000, "configurationClass": "org.apache.camel.component.pulsar.PulsarConfiguration", "configurationField": "configuration", "description": "The maximum number of pending messages for partitioned to [...] 
+ "maxPendingMessagesAcrossPartitions": { "kind": "property", "displayName": "Max Pending Messages Across Partitions", "group": "producer", "label": "producer", "required": false, "type": "integer", "javaType": "int", "deprecated": true, "autowired": false, "secret": false, "defaultValue": 50000, "configurationClass": "org.apache.camel.component.pulsar.PulsarConfiguration", "configurationField": "configuration", "description": "The maximum number of pending messages for partitioned top [...] "messageRouter": { "kind": "property", "displayName": "Message Router", "group": "producer", "label": "producer", "required": false, "type": "object", "javaType": "org.apache.pulsar.client.api.MessageRouter", "deprecated": false, "autowired": false, "secret": false, "configurationClass": "org.apache.camel.component.pulsar.PulsarConfiguration", "configurationField": "configuration", "description": "Custom Message Router to use" }, "messageRoutingMode": { "kind": "property", "displayName": "Message Routing Mode", "group": "producer", "label": "producer", "required": false, "type": "object", "javaType": "org.apache.pulsar.client.api.MessageRoutingMode", "enum": [ "SinglePartition", "RoundRobinPartition", "CustomPartition" ], "deprecated": false, "autowired": false, "secret": false, "defaultValue": "RoundRobinPartition", "configurationClass": "org.apache.camel.component.pulsar.PulsarConfiguration", "configuration [...] "producerName": { "kind": "property", "displayName": "Producer Name", "group": "producer", "label": "producer", "required": false, "type": "string", "javaType": "java.lang.String", "deprecated": false, "autowired": false, "secret": false, "configurationClass": "org.apache.camel.component.pulsar.PulsarConfiguration", "configurationField": "configuration", "description": "Name of the producer. If unset, lets Pulsar select a unique identifier." 
}, @@ -121,7 +121,7 @@ "compressionType": { "kind": "parameter", "displayName": "Compression Type", "group": "producer", "label": "producer", "required": false, "type": "object", "javaType": "org.apache.pulsar.client.api.CompressionType", "enum": [ "NONE", "LZ4", "ZLIB", "ZSTD", "SNAPPY" ], "deprecated": false, "autowired": false, "secret": false, "defaultValue": "NONE", "configurationClass": "org.apache.camel.component.pulsar.PulsarConfiguration", "configurationField": "pulsarConfiguration", "description" [...] "initialSequenceId": { "kind": "parameter", "displayName": "Initial Sequence Id", "group": "producer", "label": "producer", "required": false, "type": "integer", "javaType": "long", "deprecated": false, "autowired": false, "secret": false, "defaultValue": -1, "configurationClass": "org.apache.camel.component.pulsar.PulsarConfiguration", "configurationField": "pulsarConfiguration", "description": "The first message published will have a sequence Id of initialSequenceId 1." }, "maxPendingMessages": { "kind": "parameter", "displayName": "Max Pending Messages", "group": "producer", "label": "producer", "required": false, "type": "integer", "javaType": "int", "deprecated": false, "autowired": false, "secret": false, "defaultValue": 1000, "configurationClass": "org.apache.camel.component.pulsar.PulsarConfiguration", "configurationField": "pulsarConfiguration", "description": "Size of the pending massages queue. When the queue is full, by default, any further s [...] - "maxPendingMessagesAcrossPartitions": { "kind": "parameter", "displayName": "Max Pending Messages Across Partitions", "group": "producer", "label": "producer", "required": false, "type": "integer", "javaType": "int", "deprecated": false, "autowired": false, "secret": false, "defaultValue": 50000, "configurationClass": "org.apache.camel.component.pulsar.PulsarConfiguration", "configurationField": "pulsarConfiguration", "description": "The maximum number of pending messages for partiti [...] 
+ "maxPendingMessagesAcrossPartitions": { "kind": "parameter", "displayName": "Max Pending Messages Across Partitions", "group": "producer", "label": "producer", "required": false, "type": "integer", "javaType": "int", "deprecated": true, "autowired": false, "secret": false, "defaultValue": 50000, "configurationClass": "org.apache.camel.component.pulsar.PulsarConfiguration", "configurationField": "pulsarConfiguration", "description": "The maximum number of pending messages for partitio [...] "messageRouter": { "kind": "parameter", "displayName": "Message Router", "group": "producer", "label": "producer", "required": false, "type": "object", "javaType": "org.apache.pulsar.client.api.MessageRouter", "deprecated": false, "autowired": false, "secret": false, "configurationClass": "org.apache.camel.component.pulsar.PulsarConfiguration", "configurationField": "pulsarConfiguration", "description": "Custom Message Router to use" }, "messageRoutingMode": { "kind": "parameter", "displayName": "Message Routing Mode", "group": "producer", "label": "producer", "required": false, "type": "object", "javaType": "org.apache.pulsar.client.api.MessageRoutingMode", "enum": [ "SinglePartition", "RoundRobinPartition", "CustomPartition" ], "deprecated": false, "autowired": false, "secret": false, "defaultValue": "RoundRobinPartition", "configurationClass": "org.apache.camel.component.pulsar.PulsarConfiguration", "configuratio [...] "producerName": { "kind": "parameter", "displayName": "Producer Name", "group": "producer", "label": "producer", "required": false, "type": "string", "javaType": "java.lang.String", "deprecated": false, "autowired": false, "secret": false, "configurationClass": "org.apache.camel.component.pulsar.PulsarConfiguration", "configurationField": "pulsarConfiguration", "description": "Name of the producer. If unset, lets Pulsar select a unique identifier." 
}, diff --git a/components/camel-pulsar/src/main/java/org/apache/camel/component/pulsar/PulsarConfiguration.java b/components/camel-pulsar/src/main/java/org/apache/camel/component/pulsar/PulsarConfiguration.java index 1d9aa490e71..fb5695ce294 100644 --- a/components/camel-pulsar/src/main/java/org/apache/camel/component/pulsar/PulsarConfiguration.java +++ b/components/camel-pulsar/src/main/java/org/apache/camel/component/pulsar/PulsarConfiguration.java @@ -102,6 +102,7 @@ public class PulsarConfiguration implements Cloneable { description = "The maximum number of pending messages for partitioned topics. The maxPendingMessages value will be reduced if " + "(number of partitions * maxPendingMessages) exceeds this value. Partitioned topics have a pending message queue for each partition.", defaultValue = "50000") + @Deprecated private int maxPendingMessagesAcrossPartitions = 50000; @UriParam(label = "producer", description = "The maximum time period within which the messages sent will be batched if batchingEnabled is true.", @@ -312,12 +313,15 @@ public class PulsarConfiguration implements Cloneable { } /** - * Set the number of max pending messages across all the partitions. Default is 50000. + * Set the number of max pending messages across all the partitions. Default is 50000. This option is deprecated and + * will be removed in a future version. 
*/ + @Deprecated public void setMaxPendingMessagesAcrossPartitions(int maxPendingMessagesAcrossPartitions) { this.maxPendingMessagesAcrossPartitions = maxPendingMessagesAcrossPartitions; } + @Deprecated public int getMaxPendingMessagesAcrossPartitions() { return maxPendingMessagesAcrossPartitions; } diff --git a/components/camel-pulsar/src/main/java/org/apache/camel/component/pulsar/PulsarProducer.java b/components/camel-pulsar/src/main/java/org/apache/camel/component/pulsar/PulsarProducer.java index 52dcdc14dfb..100340eef2e 100644 --- a/components/camel-pulsar/src/main/java/org/apache/camel/component/pulsar/PulsarProducer.java +++ b/components/camel-pulsar/src/main/java/org/apache/camel/component/pulsar/PulsarProducer.java @@ -16,30 +16,35 @@ */ package org.apache.camel.component.pulsar; +import java.io.IOException; import java.util.Map; import java.util.concurrent.TimeUnit; +import org.apache.camel.AsyncCallback; +import org.apache.camel.CamelExchangeException; import org.apache.camel.Exchange; import org.apache.camel.Message; import org.apache.camel.NoTypeConversionAvailableException; import org.apache.camel.TypeConversionException; import org.apache.camel.component.pulsar.utils.message.PulsarMessageHeaders; import org.apache.camel.component.pulsar.utils.message.PulsarMessageUtils; -import org.apache.camel.support.DefaultProducer; +import org.apache.camel.support.DefaultAsyncProducer; import org.apache.camel.util.CastUtils; import org.apache.camel.util.ObjectHelper; import org.apache.pulsar.client.api.Producer; import org.apache.pulsar.client.api.ProducerBuilder; +import org.apache.pulsar.client.api.PulsarClientException; import org.apache.pulsar.client.api.TypedMessageBuilder; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -public class PulsarProducer extends DefaultProducer { +public class PulsarProducer extends DefaultAsyncProducer { private static final Logger LOG = LoggerFactory.getLogger(PulsarProducer.class); + private final Object mutex = new 
Object(); private final PulsarEndpoint pulsarEndpoint; - private Producer<byte[]> producer; + private volatile Producer<byte[]> producer; public PulsarProducer(PulsarEndpoint pulsarEndpoint) { super(pulsarEndpoint); @@ -47,69 +52,103 @@ public class PulsarProducer extends DefaultProducer { } @Override - public void process(final Exchange exchange) throws Exception { - final Message message = exchange.getIn(); - - TypedMessageBuilder<byte[]> messageBuilder = producer.newMessage(); - byte[] body; + public boolean process(Exchange exchange, AsyncCallback callback) { try { - body = exchange.getContext().getTypeConverter() - .mandatoryConvertTo(byte[].class, exchange, message.getBody()); - } catch (NoTypeConversionAvailableException | TypeConversionException exception) { - // fallback to try to serialize the data - body = PulsarMessageUtils.serialize(message.getBody()); - } - messageBuilder.value(body); + final Message message = exchange.getIn(); + byte[] body = serialize(exchange, message.getBody()); + TypedMessageBuilder<byte[]> messageBuilder = producer.newMessage(); + messageBuilder.value(body); - String key = exchange.getIn().getHeader(PulsarMessageHeaders.KEY_OUT, String.class); - if (ObjectHelper.isNotEmpty(key)) { - messageBuilder.key(key); - } + String key = exchange.getIn().getHeader(PulsarMessageHeaders.KEY_OUT, String.class); + if (ObjectHelper.isNotEmpty(key)) { + messageBuilder.key(key); + } - Map<String, String> properties - = CastUtils.cast(exchange.getIn().getHeader(PulsarMessageHeaders.PROPERTIES_OUT, Map.class)); - if (ObjectHelper.isNotEmpty(properties)) { - messageBuilder.properties(properties); - } + Map<String, String> properties + = CastUtils.cast(exchange.getIn().getHeader(PulsarMessageHeaders.PROPERTIES_OUT, Map.class)); + if (ObjectHelper.isNotEmpty(properties)) { + messageBuilder.properties(properties); + } - Long eventTime = exchange.getIn().getHeader(PulsarMessageHeaders.EVENT_TIME_OUT, Long.class); - if (eventTime != null) { - 
messageBuilder.eventTime(eventTime); + Long eventTime = exchange.getIn().getHeader(PulsarMessageHeaders.EVENT_TIME_OUT, Long.class); + if (eventTime != null) { + messageBuilder.eventTime(eventTime); + } + + messageBuilder.sendAsync() + .thenAccept(r -> exchange.getIn().setBody(r)) + .whenComplete( + (r, e) -> { + try { + if (e != null) { + exchange.setException(new CamelExchangeException( + "An error occurred while sending a message to pulsar", exchange, e)); + } + } finally { + callback.done(false); + } + }); + } catch (Exception e) { + exchange.setException(e); + callback.done(true); + return true; } + return false; + } - messageBuilder.send(); + /** + * Serialize the given content using the appropriate converter if any, otherwise it relies on + * {@link PulsarMessageUtils#serialize(Object)}. + * + * @param exchange the exchange used as context for the serialization process. + * @param content the content to serialize. + * @return the serialized counterpart of the given content + * @throws IOException if an error occurs while serializing the content. 
+ */ + private static byte[] serialize(Exchange exchange, Object content) throws IOException { + byte[] result; + try { + result = exchange.getContext().getTypeConverter() + .mandatoryConvertTo(byte[].class, exchange, content); + } catch (NoTypeConversionAvailableException | TypeConversionException exception) { + // fallback to try to serialize the data + result = PulsarMessageUtils.serialize(content); + } + return result; } - private synchronized void createProducer() throws org.apache.pulsar.client.api.PulsarClientException { - if (producer == null) { - final String topicUri = pulsarEndpoint.getUri(); - PulsarConfiguration configuration = pulsarEndpoint.getPulsarConfiguration(); - String producerName = configuration.getProducerName(); - final ProducerBuilder<byte[]> producerBuilder = pulsarEndpoint.getPulsarClient().newProducer().topic(topicUri) - .sendTimeout(configuration.getSendTimeoutMs(), TimeUnit.MILLISECONDS) - .blockIfQueueFull(configuration.isBlockIfQueueFull()) - .maxPendingMessages(configuration.getMaxPendingMessages()) - .maxPendingMessagesAcrossPartitions(configuration.getMaxPendingMessagesAcrossPartitions()) - .batchingMaxPublishDelay(configuration.getBatchingMaxPublishDelayMicros(), TimeUnit.MICROSECONDS) - .batchingMaxMessages(configuration.getMaxPendingMessages()) - .enableBatching(configuration.isBatchingEnabled()).batcherBuilder(configuration.getBatcherBuilder()) - .initialSequenceId(configuration.getInitialSequenceId()) - .compressionType(configuration.getCompressionType()); - if (ObjectHelper.isNotEmpty(configuration.getMessageRouter())) { - producerBuilder.messageRouter(configuration.getMessageRouter()); - } else { - producerBuilder.messageRoutingMode(configuration.getMessageRoutingMode()); - } - if (producerName != null) { - producerBuilder.producerName(producerName); + private void createProducer() throws PulsarClientException { + synchronized (mutex) { + if (producer == null) { + final String topicUri = pulsarEndpoint.getUri(); + 
PulsarConfiguration configuration = pulsarEndpoint.getPulsarConfiguration(); + String producerName = configuration.getProducerName(); + final ProducerBuilder<byte[]> producerBuilder = pulsarEndpoint.getPulsarClient().newProducer().topic(topicUri) + .sendTimeout(configuration.getSendTimeoutMs(), TimeUnit.MILLISECONDS) + .blockIfQueueFull(configuration.isBlockIfQueueFull()) + .maxPendingMessages(configuration.getMaxPendingMessages()) + .maxPendingMessagesAcrossPartitions(configuration.getMaxPendingMessagesAcrossPartitions()) + .batchingMaxPublishDelay(configuration.getBatchingMaxPublishDelayMicros(), TimeUnit.MICROSECONDS) + .batchingMaxMessages(configuration.getMaxPendingMessages()) + .enableBatching(configuration.isBatchingEnabled()).batcherBuilder(configuration.getBatcherBuilder()) + .initialSequenceId(configuration.getInitialSequenceId()) + .compressionType(configuration.getCompressionType()); + if (ObjectHelper.isNotEmpty(configuration.getMessageRouter())) { + producerBuilder.messageRouter(configuration.getMessageRouter()); + } else { + producerBuilder.messageRoutingMode(configuration.getMessageRoutingMode()); + } + if (producerName != null) { + producerBuilder.producerName(producerName); + } + producer = producerBuilder.create(); } - producer = producerBuilder.create(); } } @Override protected void doStart() throws Exception { - LOG.debug("Starting producer: {}", this); + LOG.debug("Starting the pulsar producer: {}", this); if (producer == null) { createProducer(); } @@ -117,7 +156,7 @@ public class PulsarProducer extends DefaultProducer { @Override protected void doStop() throws Exception { - LOG.debug("Stopping producer: {}", this); + LOG.debug("Stopping the pulsar producer: {}", this); if (producer != null) { producer.close(); producer = null;