This is an automated email from the ASF dual-hosted git repository.

acosentino pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/camel.git

commit f8a5b7b5fb728649164280c102b63ccbd5f17539
Author: Andrea Cosentino <anco...@gmail.com>
AuthorDate: Fri Sep 20 12:31:27 2019 +0200

    CAMEL-14001 - Camel-Pulsar: Add MessageRoutingMode and MessageRouter option 
to producer, messageRouting mode option
---
 .../src/main/docs/pulsar-component.adoc            |  3 +-
 .../camel/component/pulsar/PulsarProducer.java     |  3 +-
 .../pulsar/configuration/PulsarConfiguration.java  | 16 ++++++++++
 .../endpoint/dsl/PulsarEndpointBuilderFactory.java | 36 ++++++++++++++++++++++
 4 files changed, 56 insertions(+), 2 deletions(-)

diff --git a/components/camel-pulsar/src/main/docs/pulsar-component.adoc 
b/components/camel-pulsar/src/main/docs/pulsar-component.adoc
index 605bff6..fa40b34 100644
--- a/components/camel-pulsar/src/main/docs/pulsar-component.adoc
+++ b/components/camel-pulsar/src/main/docs/pulsar-component.adoc
@@ -69,7 +69,7 @@ with the following path and query parameters:
 |===
 
 
-=== Query Parameters (25 parameters):
+=== Query Parameters (26 parameters):
 
 
 [width="100%",cols="2,5,^1,2",options="header"]
@@ -96,6 +96,7 @@ with the following path and query parameters:
 | *lazyStartProducer* (producer) | Whether the producer should be started lazy 
(on the first message). By starting lazy you can use this to allow CamelContext 
and routes to startup in situations where a producer may otherwise fail during 
starting and cause the route to fail being started. By deferring this startup 
to be lazy then the startup failure can be handled during routing messages via 
Camel's routing error handlers. Beware that when the first message is processed 
then creating and [...]
 | *maxPendingMessages* (producer) | Set the max size of the queue holding the 
messages pending to receive an acknowledgment from the broker. Default is 1000. 
| 1000 | int
 | *maxPendingMessagesAcross Partitions* (producer) | Set the number of max 
pending messages across all the partitions. Default is 50000. | 50000 | int
+| *messageRoutingMode* (producer) | Set the message routing mode for the 
producer. | RoundRobinPartition | MessageRoutingMode
 | *producerName* (producer) | Name of the producer | default-producer | String
 | *sendTimeoutMs* (producer) | Send timeout in milliseconds. Defaults to 
30,000ms (30 seconds) | 30000 | int
 | *basicPropertyBinding* (advanced) | Whether the endpoint should use basic 
property binding (Camel 2.x) or the newer property binding with additional 
capabilities | false | boolean
diff --git 
a/components/camel-pulsar/src/main/java/org/apache/camel/component/pulsar/PulsarProducer.java
 
b/components/camel-pulsar/src/main/java/org/apache/camel/component/pulsar/PulsarProducer.java
index 2b4eaf3..f4a5646 100644
--- 
a/components/camel-pulsar/src/main/java/org/apache/camel/component/pulsar/PulsarProducer.java
+++ 
b/components/camel-pulsar/src/main/java/org/apache/camel/component/pulsar/PulsarProducer.java
@@ -73,7 +73,8 @@ public class PulsarProducer extends DefaultProducer {
                     .batchingMaxMessages(configuration.getMaxPendingMessages())
                     .enableBatching(configuration.isBatchingEnabled())
                     .initialSequenceId(configuration.getInitialSequenceId())
-                    .compressionType(configuration.getCompressionType());
+                    .compressionType(configuration.getCompressionType())
+                    .messageRoutingMode(configuration.getMessageRoutingMode());
             producer = producerBuilder.create();
         }
     }
diff --git 
a/components/camel-pulsar/src/main/java/org/apache/camel/component/pulsar/configuration/PulsarConfiguration.java
 
b/components/camel-pulsar/src/main/java/org/apache/camel/component/pulsar/configuration/PulsarConfiguration.java
index 223c124..4366166 100644
--- 
a/components/camel-pulsar/src/main/java/org/apache/camel/component/pulsar/configuration/PulsarConfiguration.java
+++ 
b/components/camel-pulsar/src/main/java/org/apache/camel/component/pulsar/configuration/PulsarConfiguration.java
@@ -23,6 +23,7 @@ import 
org.apache.camel.component.pulsar.utils.consumers.SubscriptionType;
 import org.apache.camel.spi.UriParam;
 import org.apache.camel.spi.UriParams;
 import org.apache.pulsar.client.api.CompressionType;
+import org.apache.pulsar.client.api.MessageRoutingMode;
 
 import static 
org.apache.camel.component.pulsar.utils.consumers.SubscriptionType.EXCLUSIVE;
 
@@ -69,6 +70,8 @@ public class PulsarConfiguration {
     private long initialSequenceId = -1;
     @UriParam(label = "producer", description = "Compression type to use", 
defaultValue = "NONE")
     private CompressionType compressionType = CompressionType.NONE;
+    @UriParam(label = "producer", description = "MessageRoutingMode", 
defaultValue = "RoundRobinPartition")
+    private MessageRoutingMode messageRoutingMode = 
MessageRoutingMode.RoundRobinPartition;
 
     public String getSubscriptionName() {
         return subscriptionName;
@@ -305,4 +308,17 @@ public class PulsarConfiguration {
     public CompressionType getCompressionType() {
         return compressionType;
     }
+
+    /**
+     * Set the message routing mode for the producer.
+     */
+       public MessageRoutingMode getMessageRoutingMode() {
+               return messageRoutingMode;
+       }
+
+       public void setMessageRoutingMode(MessageRoutingMode 
messageRoutingMode) {
+               this.messageRoutingMode = messageRoutingMode;
+       }
+    
+
 }
diff --git 
a/core/camel-endpointdsl/src/main/java/org/apache/camel/builder/endpoint/dsl/PulsarEndpointBuilderFactory.java
 
b/core/camel-endpointdsl/src/main/java/org/apache/camel/builder/endpoint/dsl/PulsarEndpointBuilderFactory.java
index 1e770ca..943e2f1 100644
--- 
a/core/camel-endpointdsl/src/main/java/org/apache/camel/builder/endpoint/dsl/PulsarEndpointBuilderFactory.java
+++ 
b/core/camel-endpointdsl/src/main/java/org/apache/camel/builder/endpoint/dsl/PulsarEndpointBuilderFactory.java
@@ -667,6 +667,32 @@ public interface PulsarEndpointBuilderFactory {
             return this;
         }
         /**
+         * Set the message routing mode for the producer.
+         * 
+         * The option is a:
+         * <code>org.apache.pulsar.client.api.MessageRoutingMode</code> type.
+         * 
+         * Group: producer
+         */
+        default PulsarEndpointProducerBuilder messageRoutingMode(
+                MessageRoutingMode messageRoutingMode) {
+            doSetProperty("messageRoutingMode", messageRoutingMode);
+            return this;
+        }
+        /**
+         * Set the message routing mode for the producer.
+         * 
+         * The option will be converted to a
+         * <code>org.apache.pulsar.client.api.MessageRoutingMode</code> type.
+         * 
+         * Group: producer
+         */
+        default PulsarEndpointProducerBuilder messageRoutingMode(
+                String messageRoutingMode) {
+            doSetProperty("messageRoutingMode", messageRoutingMode);
+            return this;
+        }
+        /**
          * Name of the producer.
          * 
          * The option is a: <code>java.lang.String</code> type.
@@ -857,6 +883,16 @@ public interface PulsarEndpointBuilderFactory {
         ZSTD,
         SNAPPY;
     }
+
+    /**
+     * Proxy enum for
+     * <code>org.apache.pulsar.client.api.MessageRoutingMode</code> enum.
+     */
+    enum MessageRoutingMode {
+        SinglePartition,
+        RoundRobinPartition,
+        CustomPartition;
+    }
     /**
      * Apache Pulsar (camel-pulsar)
      * Camel Apache Pulsar Component

Reply via email to