This is an automated email from the ASF dual-hosted git repository.

davsclaus pushed a commit to branch camel-2.x
in repository https://gitbox.apache.org/repos/asf/camel.git


The following commit(s) were added to refs/heads/camel-2.x by this push:
     new a856fe9  CAMEL-14184: (Backport to 2.x) Allow setting Pulsar Message 
properties, event_time, key fields (#3344)
a856fe9 is described below

commit a856fe9e580f89487b5c09572ea9006f1078dad7
Author: William Thompson <william.thomp...@toasttab.com>
AuthorDate: Mon Nov 18 10:50:01 2019 -0500

    CAMEL-14184: (Backport to 2.x) Allow setting Pulsar Message properties, 
event_time, key fields (#3344)
    
    * Back-port CAMEL-14184: Allow setting Pulsar Message properties, 
event_time, key fields
    
    * Remove adoc line for camel-pulsar feature not in 2.x
    
    * Checkstyle fixes
---
 .../src/main/docs/pulsar-component.adoc            |  30 +++++
 .../camel/component/pulsar/PulsarProducer.java     |  26 ++++-
 .../pulsar/utils/message/PulsarMessageHeaders.java |   3 +
 .../pulsar/PulsarProducerHeadersInTest.java        | 126 +++++++++++++++++++++
 4 files changed, 184 insertions(+), 1 deletion(-)

diff --git a/components/camel-pulsar/src/main/docs/pulsar-component.adoc 
b/components/camel-pulsar/src/main/docs/pulsar-component.adoc
index 26d5fa8..2638788 100644
--- a/components/camel-pulsar/src/main/docs/pulsar-component.adoc
+++ b/components/camel-pulsar/src/main/docs/pulsar-component.adoc
@@ -121,3 +121,33 @@ The component supports 3 options, which are listed below.
 | *camel.component.pulsar.resolve-property-placeholders* | Whether the 
component should resolve property placeholders on itself when starting. Only 
properties which are of String type can use property placeholders. | true | 
Boolean
 |===
 // spring-boot-auto-configure options: END
+
+// message-headers options: START
+=== Message headers evaluated by the Pulsar producer
+
+
+[width="100%",cols="10%,10%,80%",options="header",]
+|===
+| Header | Type | Description
+| `CamelPulsarProducerMessageKey` | `String` | Sets the key on the message for 
the Pulsar routing policy
+| `CamelPulsarProducerMessageProperties` | `Map<String,String>` | The 
properties to set on the Pulsar message
+| `CamelPulsarProducerMessageEventTime` | `long` | Sets the event time on the message
+|===
+
+=== Message headers set by the Pulsar consumer
+
+
+[width="100%",cols="10%,10%,80%",options="header",]
+|===
+| Header | Type | Description
+| `properties` | `Map<String,String>` | The properties from the Pulsar message 
or the empty Map if unset on the Pulsar message
+| `producer_name` | `String` | The name of the producer that created the 
message
+| `sequence_id` | `long` | Sequence identifier of the Pulsar message
+| `publish_time` | `long` | Time the Pulsar message was published to the topic
+| `message_id` | `MessageId` | Unique identifier of the message
+| `event_time` | `long` | The event time associated with the message or 0 if 
unset on the Pulsar message
+| `key` | `String` | The key of the Pulsar message in String form or the empty 
string if unset on the Pulsar message
+| `key_bytes` | `byte[]` | The bytes in the key. If the key has been base64 
encoded, it is decoded before being returned. Otherwise, if the key is a plain 
string, the UTF-8 encoded bytes of the string are returned.
+| `topic_name` | `String` | The topic to which the message was published
+|===
+// message-headers options: END
diff --git 
a/components/camel-pulsar/src/main/java/org/apache/camel/component/pulsar/PulsarProducer.java
 
b/components/camel-pulsar/src/main/java/org/apache/camel/component/pulsar/PulsarProducer.java
index f583597..dd1a1c5 100644
--- 
a/components/camel-pulsar/src/main/java/org/apache/camel/component/pulsar/PulsarProducer.java
+++ 
b/components/camel-pulsar/src/main/java/org/apache/camel/component/pulsar/PulsarProducer.java
@@ -16,6 +16,7 @@
  */
 package org.apache.camel.component.pulsar;
 
+import java.util.Map;
 import java.util.concurrent.TimeUnit;
 
 import org.apache.camel.Exchange;
@@ -23,10 +24,14 @@ import org.apache.camel.Message;
 import org.apache.camel.NoTypeConversionAvailableException;
 import org.apache.camel.TypeConversionException;
 import org.apache.camel.component.pulsar.configuration.PulsarConfiguration;
+import org.apache.camel.component.pulsar.utils.message.PulsarMessageHeaders;
 import org.apache.camel.component.pulsar.utils.message.PulsarMessageUtils;
 import org.apache.camel.impl.DefaultProducer;
+import org.apache.camel.util.CastUtils;
+import org.apache.camel.util.ObjectHelper;
 import org.apache.pulsar.client.api.Producer;
 import org.apache.pulsar.client.api.ProducerBuilder;
+import org.apache.pulsar.client.api.TypedMessageBuilder;
 
 public class PulsarProducer extends DefaultProducer {
 
@@ -42,6 +47,8 @@ public class PulsarProducer extends DefaultProducer {
     @Override
     public void process(final Exchange exchange) throws Exception {
         final Message message = exchange.getIn();
+
+        TypedMessageBuilder<byte[]> messageBuilder = producer.newMessage();
         byte[] body;
         try {
             body = exchange.getContext().getTypeConverter()
@@ -50,7 +57,24 @@ public class PulsarProducer extends DefaultProducer {
             // fallback to try serialize the data
             body = PulsarMessageUtils.serialize(message.getBody());
         }
-        producer.send(body);
+        messageBuilder.value(body);
+
+        String key = exchange.getIn().getHeader(PulsarMessageHeaders.KEY_OUT, 
String.class);
+        if (ObjectHelper.isNotEmpty(key)) {
+            messageBuilder.key(key);
+        }
+
+        Map<String, String> properties = 
CastUtils.cast(exchange.getIn().getHeader(PulsarMessageHeaders.PROPERTIES_OUT, 
Map.class));
+        if (ObjectHelper.isNotEmpty(properties)) {
+            messageBuilder.properties(properties);
+        }
+
+        Long eventTime = 
exchange.getIn().getHeader(PulsarMessageHeaders.EVENT_TIME_OUT, Long.class);
+        if (eventTime != null) {
+            messageBuilder.eventTime(eventTime);
+        }
+
+        messageBuilder.send();
     }
 
     private synchronized void createProducer() throws 
org.apache.pulsar.client.api.PulsarClientException {
diff --git 
a/components/camel-pulsar/src/main/java/org/apache/camel/component/pulsar/utils/message/PulsarMessageHeaders.java
 
b/components/camel-pulsar/src/main/java/org/apache/camel/component/pulsar/utils/message/PulsarMessageHeaders.java
index 65de4c1..1cfbaae 100644
--- 
a/components/camel-pulsar/src/main/java/org/apache/camel/component/pulsar/utils/message/PulsarMessageHeaders.java
+++ 
b/components/camel-pulsar/src/main/java/org/apache/camel/component/pulsar/utils/message/PulsarMessageHeaders.java
@@ -27,4 +27,7 @@ public interface PulsarMessageHeaders {
     String KEY = "key";
     String KEY_BYTES = "key_bytes";
     String TOPIC_NAME = "topic_name";
+    String KEY_OUT = "CamelPulsarProducerMessageKey";
+    String PROPERTIES_OUT = "CamelPulsarProducerMessageProperties";
+    String EVENT_TIME_OUT = "CamelPulsarProducerMessageEventTime";
 }
diff --git 
a/components/camel-pulsar/src/test/java/org/apache/camel/component/pulsar/PulsarProducerHeadersInTest.java
 
b/components/camel-pulsar/src/test/java/org/apache/camel/component/pulsar/PulsarProducerHeadersInTest.java
new file mode 100644
index 0000000..5d7df80
--- /dev/null
+++ 
b/components/camel-pulsar/src/test/java/org/apache/camel/component/pulsar/PulsarProducerHeadersInTest.java
@@ -0,0 +1,126 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.pulsar;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+import org.apache.camel.Endpoint;
+import org.apache.camel.EndpointInject;
+import org.apache.camel.Produce;
+import org.apache.camel.ProducerTemplate;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.component.mock.MockEndpoint;
+import org.apache.camel.component.pulsar.utils.AutoConfiguration;
+import org.apache.camel.component.pulsar.utils.message.PulsarMessageHeaders;
+import org.apache.camel.impl.JndiRegistry;
+import org.apache.pulsar.client.api.PulsarClient;
+import org.apache.pulsar.client.api.PulsarClientException;
+import org.apache.pulsar.client.impl.ClientBuilderImpl;
+import org.junit.Test;
+
+public class PulsarProducerHeadersInTest extends PulsarTestSupport {
+
+    private static final String TOPIC_URI = 
"persistent://public/default/camel-producer-topic";
+    private static final String PRODUCER = "camel-producer";
+
+    @Produce(uri = "direct:start")
+    private ProducerTemplate producerTemplate;
+
+    @EndpointInject(uri = "pulsar:" + TOPIC_URI
+            + "?numberOfConsumers=1&subscriptionType=Exclusive"
+            + "&subscriptionName=camel-subscription"
+            + "&consumerQueueSize=1"
+            + "&consumerName=camel-consumer"
+            + "&producerName=" + PRODUCER
+    )
+    private Endpoint pulsar;
+
+    @EndpointInject(uri = "mock:result")
+    private MockEndpoint mock;
+
+    @Override
+    protected RouteBuilder createRouteBuilder() {
+        return new RouteBuilder() {
+
+            @Override
+            public void configure() {
+                from("direct:start").to(pulsar);
+                from(pulsar).to(mock);
+            }
+        };
+    }
+
+    @Override
+    protected JndiRegistry createRegistry() throws Exception {
+        JndiRegistry jndi = super.createRegistry();
+
+        registerPulsarBeans(jndi);
+
+        return jndi;
+    }
+
+    private void registerPulsarBeans(final JndiRegistry jndi) throws 
PulsarClientException {
+        PulsarClient pulsarClient = givenPulsarClient();
+        AutoConfiguration autoConfiguration = new AutoConfiguration(null, 
null);
+
+        jndi.bind("pulsarClient", pulsarClient);
+        PulsarComponent comp = new PulsarComponent(context);
+        comp.setAutoConfiguration(autoConfiguration);
+        comp.setPulsarClient(pulsarClient);
+        jndi.bind("pulsar", comp);
+    }
+
+    private PulsarClient givenPulsarClient() throws PulsarClientException {
+        return new ClientBuilderImpl()
+                .serviceUrl(getPulsarBrokerUrl())
+                .ioThreads(1)
+                .listenerThreads(1)
+                .build();
+    }
+
+    @Test
+    public void propertyHeaderSetsPulsarProperties() throws 
InterruptedException {
+        Map<String, String> properties = new HashMap<>();
+        properties.put("testProperty", "testValue");
+        mock.expectedHeaderReceived(PulsarMessageHeaders.PROPERTIES, 
properties);
+
+        producerTemplate.sendBodyAndHeader("test", 
PulsarMessageHeaders.PROPERTIES_OUT, properties);
+
+        MockEndpoint.assertIsSatisfied(10, TimeUnit.SECONDS, mock);
+    }
+
+    @Test
+    public void eventTimeHeaderSetsPulsarEventTime() throws 
InterruptedException {
+        long eventTime = 10000;
+        mock.expectedHeaderReceived(PulsarMessageHeaders.EVENT_TIME, 
eventTime);
+
+        producerTemplate.sendBodyAndHeader("test", 
PulsarMessageHeaders.EVENT_TIME_OUT, eventTime);
+
+        MockEndpoint.assertIsSatisfied(10, TimeUnit.SECONDS, mock);
+    }
+
+    @Test
+    public void keyHeaderSetsPulsarKey() throws InterruptedException {
+        String key = "testKey";
+        mock.expectedHeaderReceived(PulsarMessageHeaders.KEY, key);
+
+        producerTemplate.sendBodyAndHeader("test", 
PulsarMessageHeaders.KEY_OUT, key);
+
+        MockEndpoint.assertIsSatisfied(10, TimeUnit.SECONDS, mock);
+    }
+}

Reply via email to