This is an automated email from the ASF dual-hosted git repository. nfilotto pushed a commit to branch CAMEL-16030/async-producer in repository https://gitbox.apache.org/repos/asf/camel.git
commit 5bf94a8f578a867531fb2f743c053b74e338f317 Author: Nicolas Filotto <nfilo...@talend.com> AuthorDate: Wed Oct 12 15:11:53 2022 +0200 CAMEL-16030: camel-pulsar - Add async send to producer --- .../component/pulsar/PulsarConfiguration.java | 5 +- .../camel/component/pulsar/PulsarProducer.java | 145 +++++++++++++-------- 2 files changed, 96 insertions(+), 54 deletions(-) diff --git a/components/camel-pulsar/src/main/java/org/apache/camel/component/pulsar/PulsarConfiguration.java b/components/camel-pulsar/src/main/java/org/apache/camel/component/pulsar/PulsarConfiguration.java index 1d9aa490e71..5fe9db9f06f 100644 --- a/components/camel-pulsar/src/main/java/org/apache/camel/component/pulsar/PulsarConfiguration.java +++ b/components/camel-pulsar/src/main/java/org/apache/camel/component/pulsar/PulsarConfiguration.java @@ -312,12 +312,15 @@ public class PulsarConfiguration implements Cloneable { } /** - * Set the number of max pending messages across all the partitions. Default is 50000. + * Set the number of max pending messages across all the partitions. Default is 50000. This option is deprecated and + * will be removed in future version. */ + @Deprecated public void setMaxPendingMessagesAcrossPartitions(int maxPendingMessagesAcrossPartitions) { this.maxPendingMessagesAcrossPartitions = maxPendingMessagesAcrossPartitions; } + @Deprecated public int getMaxPendingMessagesAcrossPartitions() { return maxPendingMessagesAcrossPartitions; } diff --git a/components/camel-pulsar/src/main/java/org/apache/camel/component/pulsar/PulsarProducer.java b/components/camel-pulsar/src/main/java/org/apache/camel/component/pulsar/PulsarProducer.java index 52dcdc14dfb..100340eef2e 100644 --- a/components/camel-pulsar/src/main/java/org/apache/camel/component/pulsar/PulsarProducer.java +++ b/components/camel-pulsar/src/main/java/org/apache/camel/component/pulsar/PulsarProducer.java @@ -16,30 +16,35 @@ */ package org.apache.camel.component.pulsar; +import java.io.IOException; import java.util.Map; import java.util.concurrent.TimeUnit; +import org.apache.camel.AsyncCallback; +import org.apache.camel.CamelExchangeException; import org.apache.camel.Exchange; import org.apache.camel.Message; import org.apache.camel.NoTypeConversionAvailableException; import org.apache.camel.TypeConversionException; import org.apache.camel.component.pulsar.utils.message.PulsarMessageHeaders; import org.apache.camel.component.pulsar.utils.message.PulsarMessageUtils; -import org.apache.camel.support.DefaultProducer; +import org.apache.camel.support.DefaultAsyncProducer; import org.apache.camel.util.CastUtils; import org.apache.camel.util.ObjectHelper; import org.apache.pulsar.client.api.Producer; import org.apache.pulsar.client.api.ProducerBuilder; +import org.apache.pulsar.client.api.PulsarClientException; import org.apache.pulsar.client.api.TypedMessageBuilder; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -public class PulsarProducer extends DefaultProducer { +public class PulsarProducer extends DefaultAsyncProducer { private static final Logger LOG = LoggerFactory.getLogger(PulsarProducer.class); + private final Object mutex = new Object(); private final PulsarEndpoint pulsarEndpoint; - private Producer<byte[]> producer; + private volatile Producer<byte[]> producer; public PulsarProducer(PulsarEndpoint pulsarEndpoint) { super(pulsarEndpoint); @@ -47,69 +52,103 @@ public class PulsarProducer extends DefaultProducer { } @Override - public void process(final Exchange exchange) throws Exception { - final Message message = exchange.getIn(); - - TypedMessageBuilder<byte[]> messageBuilder = producer.newMessage(); - byte[] body; + public boolean process(Exchange exchange, AsyncCallback callback) { try { - body = exchange.getContext().getTypeConverter() - .mandatoryConvertTo(byte[].class, exchange, message.getBody()); - } catch (NoTypeConversionAvailableException | TypeConversionException exception) { - // fallback to try to serialize the data - body = PulsarMessageUtils.serialize(message.getBody()); - } - messageBuilder.value(body); + final Message message = exchange.getIn(); + byte[] body = serialize(exchange, message.getBody()); + TypedMessageBuilder<byte[]> messageBuilder = producer.newMessage(); + messageBuilder.value(body); - String key = exchange.getIn().getHeader(PulsarMessageHeaders.KEY_OUT, String.class); - if (ObjectHelper.isNotEmpty(key)) { - messageBuilder.key(key); - } + String key = exchange.getIn().getHeader(PulsarMessageHeaders.KEY_OUT, String.class); + if (ObjectHelper.isNotEmpty(key)) { + messageBuilder.key(key); + } - Map<String, String> properties - = CastUtils.cast(exchange.getIn().getHeader(PulsarMessageHeaders.PROPERTIES_OUT, Map.class)); - if (ObjectHelper.isNotEmpty(properties)) { - messageBuilder.properties(properties); - } + Map<String, String> properties + = CastUtils.cast(exchange.getIn().getHeader(PulsarMessageHeaders.PROPERTIES_OUT, Map.class)); + if (ObjectHelper.isNotEmpty(properties)) { + messageBuilder.properties(properties); + } - Long eventTime = exchange.getIn().getHeader(PulsarMessageHeaders.EVENT_TIME_OUT, Long.class); - if (eventTime != null) { - messageBuilder.eventTime(eventTime); + Long eventTime = exchange.getIn().getHeader(PulsarMessageHeaders.EVENT_TIME_OUT, Long.class); + if (eventTime != null) { + messageBuilder.eventTime(eventTime); + } + + messageBuilder.sendAsync() + .thenAccept(r -> exchange.getIn().setBody(r)) + .whenComplete( + (r, e) -> { + try { + if (e != null) { + exchange.setException(new CamelExchangeException( + "An error occurred while sending a message to pulsar", exchange, e)); + } + } finally { + callback.done(false); + } + }); + } catch (Exception e) { + exchange.setException(e); + callback.done(true); + return true; } + return false; + } - messageBuilder.send(); + /** + * Serialize the given content using the appropriate converter if any, otherwise it relies on + * {@link PulsarMessageUtils#serialize(Object)}. + * + * @param exchange the exchange used as context for the serialization process. + * @param content the content to serialize. + * @return the serialized counterpart of the given content + * @throws IOException if an error occurs while serializing the content. + */ + private static byte[] serialize(Exchange exchange, Object content) throws IOException { + byte[] result; + try { + result = exchange.getContext().getTypeConverter() + .mandatoryConvertTo(byte[].class, exchange, content); + } catch (NoTypeConversionAvailableException | TypeConversionException exception) { + // fallback to try to serialize the data + result = PulsarMessageUtils.serialize(content); + } + return result; } - private synchronized void createProducer() throws org.apache.pulsar.client.api.PulsarClientException { - if (producer == null) { - final String topicUri = pulsarEndpoint.getUri(); - PulsarConfiguration configuration = pulsarEndpoint.getPulsarConfiguration(); - String producerName = configuration.getProducerName(); - final ProducerBuilder<byte[]> producerBuilder = pulsarEndpoint.getPulsarClient().newProducer().topic(topicUri) - .sendTimeout(configuration.getSendTimeoutMs(), TimeUnit.MILLISECONDS) - .blockIfQueueFull(configuration.isBlockIfQueueFull()) - .maxPendingMessages(configuration.getMaxPendingMessages()) - .maxPendingMessagesAcrossPartitions(configuration.getMaxPendingMessagesAcrossPartitions()) - .batchingMaxPublishDelay(configuration.getBatchingMaxPublishDelayMicros(), TimeUnit.MICROSECONDS) - .batchingMaxMessages(configuration.getMaxPendingMessages()) - .enableBatching(configuration.isBatchingEnabled()).batcherBuilder(configuration.getBatcherBuilder()) - .initialSequenceId(configuration.getInitialSequenceId()) - .compressionType(configuration.getCompressionType()); - if (ObjectHelper.isNotEmpty(configuration.getMessageRouter())) { - producerBuilder.messageRouter(configuration.getMessageRouter()); - } else { - producerBuilder.messageRoutingMode(configuration.getMessageRoutingMode()); - } - if (producerName != null) { - producerBuilder.producerName(producerName); + private void createProducer() throws PulsarClientException { + synchronized (mutex) { + if (producer == null) { + final String topicUri = pulsarEndpoint.getUri(); + PulsarConfiguration configuration = pulsarEndpoint.getPulsarConfiguration(); + String producerName = configuration.getProducerName(); + final ProducerBuilder<byte[]> producerBuilder = pulsarEndpoint.getPulsarClient().newProducer().topic(topicUri) + .sendTimeout(configuration.getSendTimeoutMs(), TimeUnit.MILLISECONDS) + .blockIfQueueFull(configuration.isBlockIfQueueFull()) + .maxPendingMessages(configuration.getMaxPendingMessages()) + .maxPendingMessagesAcrossPartitions(configuration.getMaxPendingMessagesAcrossPartitions()) + .batchingMaxPublishDelay(configuration.getBatchingMaxPublishDelayMicros(), TimeUnit.MICROSECONDS) + .batchingMaxMessages(configuration.getMaxPendingMessages()) + .enableBatching(configuration.isBatchingEnabled()).batcherBuilder(configuration.getBatcherBuilder()) + .initialSequenceId(configuration.getInitialSequenceId()) + .compressionType(configuration.getCompressionType()); + if (ObjectHelper.isNotEmpty(configuration.getMessageRouter())) { + producerBuilder.messageRouter(configuration.getMessageRouter()); + } else { + producerBuilder.messageRoutingMode(configuration.getMessageRoutingMode()); + } + if (producerName != null) { + producerBuilder.producerName(producerName); + } + producer = producerBuilder.create(); } - producer = producerBuilder.create(); } } @Override protected void doStart() throws Exception { - LOG.debug("Starting producer: {}", this); + LOG.debug("Starting the pulsar producer: {}", this); if (producer == null) { createProducer(); } @@ -117,7 +156,7 @@ public class PulsarProducer extends DefaultProducer { @Override protected void doStop() throws Exception { - LOG.debug("Stopping producer: {}", this); + LOG.debug("Stopping the pulsar producer: {}", this); if (producer != null) { producer.close(); producer = null;