This is an automated email from the ASF dual-hosted git repository. davsclaus pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/camel.git
commit 722298e85027de6a0e64d6d731653bc0f32b3f9c Author: Sherman Richard <sherm...@thehutgroup.com> AuthorDate: Tue Aug 13 13:33:10 2019 +0100 Increase configuration options for Pulsar --- .../camel/component/pulsar/PulsarProducer.java | 17 ++- .../pulsar/configuration/PulsarConfiguration.java | 144 +++++++++++++++++++++ 2 files changed, 159 insertions(+), 2 deletions(-) diff --git a/components/camel-pulsar/src/main/java/org/apache/camel/component/pulsar/PulsarProducer.java b/components/camel-pulsar/src/main/java/org/apache/camel/component/pulsar/PulsarProducer.java index 70f1d40..140f0c7 100644 --- a/components/camel-pulsar/src/main/java/org/apache/camel/component/pulsar/PulsarProducer.java +++ b/components/camel-pulsar/src/main/java/org/apache/camel/component/pulsar/PulsarProducer.java @@ -20,11 +20,14 @@ import org.apache.camel.Exchange; import org.apache.camel.Message; import org.apache.camel.NoTypeConversionAvailableException; import org.apache.camel.TypeConversionException; +import org.apache.camel.component.pulsar.configuration.PulsarConfiguration; import org.apache.camel.component.pulsar.utils.message.PulsarMessageUtils; import org.apache.camel.support.DefaultProducer; import org.apache.pulsar.client.api.Producer; import org.apache.pulsar.client.api.ProducerBuilder; +import java.util.concurrent.TimeUnit; + public class PulsarProducer extends DefaultProducer { private final PulsarEndpoint pulsarEndpoint; @@ -52,7 +55,8 @@ public class PulsarProducer extends DefaultProducer { private synchronized void createProducer() throws org.apache.pulsar.client.api.PulsarClientException { if (producer == null) { final String topicUri = pulsarEndpoint.getUri(); - String producerName = pulsarEndpoint.getPulsarConfiguration().getProducerName(); + PulsarConfiguration configuration = pulsarEndpoint.getPulsarConfiguration(); + String producerName = configuration.getProducerName(); if (producerName == null) { producerName = topicUri + "-" + Thread.currentThread().getId(); } @@ 
-60,7 +64,16 @@ public class PulsarProducer extends DefaultProducer { .getPulsarClient() .newProducer() .producerName(producerName) - .topic(topicUri); + .topic(topicUri) + .sendTimeout(configuration.getSendTimeoutMs(), TimeUnit.MILLISECONDS) + .blockIfQueueFull(configuration.isBlockIfQueueFull()) + .maxPendingMessages(configuration.getMaxPendingMessages()) + .maxPendingMessagesAcrossPartitions(configuration.getMaxPendingMessagesAcrossPartitions()) + .batchingMaxPublishDelay(configuration.getBatchingMaxPublishDelayMicros(), TimeUnit.MICROSECONDS) + .batchingMaxMessages(configuration.getMaxPendingMessages()) + .enableBatching(configuration.isBatchingEnabled()) + .initialSequenceId(configuration.getInitialSequenceId()) + .compressionType(configuration.getCompressionType()); producer = producerBuilder.create(); } } diff --git a/components/camel-pulsar/src/main/java/org/apache/camel/component/pulsar/configuration/PulsarConfiguration.java b/components/camel-pulsar/src/main/java/org/apache/camel/component/pulsar/configuration/PulsarConfiguration.java index 6b160fd..b2099d3 100644 --- a/components/camel-pulsar/src/main/java/org/apache/camel/component/pulsar/configuration/PulsarConfiguration.java +++ b/components/camel-pulsar/src/main/java/org/apache/camel/component/pulsar/configuration/PulsarConfiguration.java @@ -19,6 +19,9 @@ package org.apache.camel.component.pulsar.configuration; import org.apache.camel.component.pulsar.utils.consumers.SubscriptionType; import org.apache.camel.spi.UriParam; import org.apache.camel.spi.UriParams; +import org.apache.pulsar.client.api.CompressionType; + +import java.util.concurrent.TimeUnit; import static org.apache.camel.component.pulsar.utils.consumers.SubscriptionType.EXCLUSIVE; @@ -45,6 +48,24 @@ public class PulsarConfiguration { private long ackTimeoutMillis = 10000; @UriParam(label = "consumer", defaultValue = "100") private long ackGroupTimeMillis = 100; + @UriParam(label = "producer", description = "Send timeout in 
milliseconds", defaultValue = "30000") + private int sendTimeoutMs = 30000; + @UriParam(label = "producer", description = "Whether to block the producing thread if pending messages queue is full or to throw a ProducerQueueIsFullError", defaultValue = "false") + private boolean blockIfQueueFull = false; + @UriParam(label = "producer", description = "Size of the pending massages queue. When the queue is full, by default, any further sends will fail unless blockIfQueueFull=true", defaultValue = "1000") + private int maxPendingMessages = 1000; + @UriParam(label = "producer", description = "The maximum number of pending messages for partitioned topics. The maxPendingMessages value will be reduced if (number of partitions * maxPendingMessages) exceeds this value. Partitioned topics have a pending message queue for each partition.", defaultValue = "50000") + private int maxPendingMessagesAcrossPartitions = 50000; + @UriParam(label = "producer", description = "The maximum time period within which the messages sent will be batched if batchingEnabled is true.", defaultValue = "1000") + private long batchingMaxPublishDelayMicros = TimeUnit.MILLISECONDS.toMicros(1); + @UriParam(label = "producer", description = "The maximum size to batch messages.", defaultValue = "1000") + private int batchingMaxMessages = 1000; + @UriParam(label = "producer", description = "Control whether automatic batching of messages is enabled for the producer.", defaultValue = "true") + private boolean batchingEnabled = true; + @UriParam(label = "producer", description = "The first message published will have a sequence Id of initialSequenceId 1.", defaultValue = "-1") + private long initialSequenceId = -1; + @UriParam(label = "producer", description = "Compression type to use, defaults to NONE from [NONE, LZ4, ZLIB]", defaultValue = "NONE") + private CompressionType compressionType = CompressionType.NONE; public String getSubscriptionName() { return subscriptionName; @@ -159,4 +180,127 @@ public class 
PulsarConfiguration { public void setAckGroupTimeMillis(long ackGroupTimeMillis) { this.ackGroupTimeMillis = ackGroupTimeMillis; } + + /** + * Send timeout in milliseconds. + * Defaults to 30,000ms (30 seconds) + */ + public void setSendTimeoutMs(int sendTimeoutMs) { + this.sendTimeoutMs = sendTimeoutMs; + } + + public int getSendTimeoutMs() { + return sendTimeoutMs; + } + + /** + * Set whether the send and asyncSend operations should block when the outgoing message queue is full. + * If set to false, send operations will immediately fail with ProducerQueueIsFullError when there is no space left + * in the pending queue. + * Default is false. + */ + public void setBlockIfQueueFull(boolean blockIfQueueFull) { + this.blockIfQueueFull = blockIfQueueFull; + } + + public boolean isBlockIfQueueFull() { + return blockIfQueueFull; + } + + /** + * Set the max size of the queue holding the messages pending to receive an acknowledgment from the broker. + * Default is 1000. + */ + public void setMaxPendingMessages(int maxPendingMessages) { + this.maxPendingMessages = maxPendingMessages; + } + + public int getMaxPendingMessages() { + return maxPendingMessages; + } + + /** + * Set the number of max pending messages across all the partitions. + * Default is 50000. + */ + public void setMaxPendingMessagesAcrossPartitions(int maxPendingMessagesAcrossPartitions) { + this.maxPendingMessagesAcrossPartitions = maxPendingMessagesAcrossPartitions; + } + + public int getMaxPendingMessagesAcrossPartitions() { + return maxPendingMessagesAcrossPartitions; + } + + /** + * Set the time period within which the messages sent will be batched if batch messages are + * enabled. If set to a non zero value, messages will be queued until either: + * <ul> + * <li>this time interval expires</li> + * <li>the max number of messages in a batch is reached</li> + * </ul> + * Default is 1ms. 
+ */ + public void setBatchingMaxPublishDelayMicros(long batchingMaxPublishDelayMicros) { + this.batchingMaxPublishDelayMicros = batchingMaxPublishDelayMicros; + } + + public long getBatchingMaxPublishDelayMicros() { + return batchingMaxPublishDelayMicros; + } + + /** + * Set the maximum number of messages permitted in a batch. + * Default 1,000. + */ + public void setBatchingMaxMessages(int batchingMaxMessages) { + this.batchingMaxMessages = batchingMaxMessages; + } + + public int getBatchingMaxMessages() { + return batchingMaxMessages; + } + + /** + * Control whether automatic batching of messages is enabled for the producer. + * Default is true. + */ + public void setBatchingEnabled(boolean batchingEnabled) { + this.batchingEnabled = batchingEnabled; + } + + public boolean isBatchingEnabled() { + return batchingEnabled; + } + + /** + * Set the baseline for the sequence ids for messages published by the producer. + * First message will be using (initialSequenceId+1) as its sequence id and subsequent messages will be assigned + * incremental sequence ids, if not otherwise specified. + */ + public void setInitialSequenceId(long initialSequenceId) { + this.initialSequenceId = initialSequenceId; + } + + public long getInitialSequenceId() { + return initialSequenceId; + } + + /** + * + * Set the compression type for the producer. + * Supported compression types are: + * <ul> + * <li>NONE: No compression</li> + * <li>LZ4: Compress with LZ4 algorithm. Faster but lower compression than ZLib</li> + * <li>ZLIB: Standard ZLib compression</li> + * </ul> + * Default is NONE + */ + public void setCompressionType(String compressionType) { + this.compressionType = CompressionType.valueOf(compressionType.toUpperCase()); + } + + public CompressionType getCompressionType() { + return compressionType; + } }