This is an automated email from the ASF dual-hosted git repository. davsclaus pushed a commit to branch camel-2.x in repository https://gitbox.apache.org/repos/asf/camel.git
The following commit(s) were added to refs/heads/camel-2.x by this push: new a856fe9 CAMEL-14184: (Backport to 2.x) Allow setting Pulsar Message properties, event_time, key fields (#3344) a856fe9 is described below commit a856fe9e580f89487b5c09572ea9006f1078dad7 Author: William Thompson <william.thomp...@toasttab.com> AuthorDate: Mon Nov 18 10:50:01 2019 -0500 CAMEL-14184: (Backport to 2.x) Allow setting Pulsar Message properties, event_time, key fields (#3344) * Back-port CAMEL-14184: Allow setting Pulsar Message properties, event_time, key fields * Remove adoc line for camel-pulsar feature not in 2.x * Checkstyle fixes --- .../src/main/docs/pulsar-component.adoc | 30 +++++ .../camel/component/pulsar/PulsarProducer.java | 26 ++++- .../pulsar/utils/message/PulsarMessageHeaders.java | 3 + .../pulsar/PulsarProducerHeadersInTest.java | 126 +++++++++++++++++++++ 4 files changed, 184 insertions(+), 1 deletion(-) diff --git a/components/camel-pulsar/src/main/docs/pulsar-component.adoc b/components/camel-pulsar/src/main/docs/pulsar-component.adoc index 26d5fa8..2638788 100644 --- a/components/camel-pulsar/src/main/docs/pulsar-component.adoc +++ b/components/camel-pulsar/src/main/docs/pulsar-component.adoc @@ -121,3 +121,33 @@ The component supports 3 options, which are listed below. | *camel.component.pulsar.resolve-property-placeholders* | Whether the component should resolve property placeholders on itself when starting. Only properties which are of String type can use property placeholders. 
| true | Boolean |=== // spring-boot-auto-configure options: END + +// message-headers options: START +=== Message headers evaluated by the Pulsar producer + + +[width="100%",cols="10%,10%,80%",options="header",] +|=== +| Header | Type | Description +| `CamelPulsarProducerMessageKey` | `String` | Sets the key on the message for the Pulsar routing policy +| `CamelPulsarProducerMessageProperties` | `Map<String,String>` | The properties to set on the Pulsar message +| `CamelPulsarProducerMessageEventTime` | `long` | Sets the event time on the message +|=== + +=== Message headers set by the Pulsar consumer + + +[width="100%",cols="10%,10%,80%",options="header",] +|=== +| Header | Type | Description +| `properties` | `Map<String,String>` | The properties from the Pulsar message or the empty Map if unset on the Pulsar message +| `producer_name` | `String` | The name of the producer that created the message +| `sequence_id` | `long` | Sequence identifier of the Pulsar message +| `publish_time` | `long` | Time the Pulsar message was published to the topic +| `message_id` | `MessageId` | Unique identifier of the message +| `event_time` | `long` | The event time associated with the message or 0 if unset on the Pulsar message +| `key` | `String` | The key of the Pulsar message in String form or the empty string if unset on the Pulsar message +| `key_bytes` | `byte[]` | The bytes in the key. If the key has been base64 encoded, it is decoded before being returned. Otherwise, if the key is a plain string, the UTF-8 encoded bytes of the string. 
+| `topic_name` | `String` | The topic to which the message was published +|=== +// message-headers options: END diff --git a/components/camel-pulsar/src/main/java/org/apache/camel/component/pulsar/PulsarProducer.java b/components/camel-pulsar/src/main/java/org/apache/camel/component/pulsar/PulsarProducer.java index f583597..dd1a1c5 100644 --- a/components/camel-pulsar/src/main/java/org/apache/camel/component/pulsar/PulsarProducer.java +++ b/components/camel-pulsar/src/main/java/org/apache/camel/component/pulsar/PulsarProducer.java @@ -16,6 +16,7 @@ */ package org.apache.camel.component.pulsar; +import java.util.Map; import java.util.concurrent.TimeUnit; import org.apache.camel.Exchange; @@ -23,10 +24,14 @@ import org.apache.camel.Message; import org.apache.camel.NoTypeConversionAvailableException; import org.apache.camel.TypeConversionException; import org.apache.camel.component.pulsar.configuration.PulsarConfiguration; +import org.apache.camel.component.pulsar.utils.message.PulsarMessageHeaders; import org.apache.camel.component.pulsar.utils.message.PulsarMessageUtils; import org.apache.camel.impl.DefaultProducer; +import org.apache.camel.util.CastUtils; +import org.apache.camel.util.ObjectHelper; import org.apache.pulsar.client.api.Producer; import org.apache.pulsar.client.api.ProducerBuilder; +import org.apache.pulsar.client.api.TypedMessageBuilder; public class PulsarProducer extends DefaultProducer { @@ -42,6 +47,8 @@ public class PulsarProducer extends DefaultProducer { @Override public void process(final Exchange exchange) throws Exception { final Message message = exchange.getIn(); + + TypedMessageBuilder<byte[]> messageBuilder = producer.newMessage(); byte[] body; try { body = exchange.getContext().getTypeConverter() @@ -50,7 +57,24 @@ public class PulsarProducer extends DefaultProducer { // fallback to try serialize the data body = PulsarMessageUtils.serialize(message.getBody()); } - producer.send(body); + messageBuilder.value(body); + + String key = 
exchange.getIn().getHeader(PulsarMessageHeaders.KEY_OUT, String.class); + if (ObjectHelper.isNotEmpty(key)) { + messageBuilder.key(key); + } + + Map<String, String> properties = CastUtils.cast(exchange.getIn().getHeader(PulsarMessageHeaders.PROPERTIES_OUT, Map.class)); + if (ObjectHelper.isNotEmpty(properties)) { + messageBuilder.properties(properties); + } + + Long eventTime = exchange.getIn().getHeader(PulsarMessageHeaders.EVENT_TIME_OUT, Long.class); + if (eventTime != null) { + messageBuilder.eventTime(eventTime); + } + + messageBuilder.send(); } private synchronized void createProducer() throws org.apache.pulsar.client.api.PulsarClientException { diff --git a/components/camel-pulsar/src/main/java/org/apache/camel/component/pulsar/utils/message/PulsarMessageHeaders.java b/components/camel-pulsar/src/main/java/org/apache/camel/component/pulsar/utils/message/PulsarMessageHeaders.java index 65de4c1..1cfbaae 100644 --- a/components/camel-pulsar/src/main/java/org/apache/camel/component/pulsar/utils/message/PulsarMessageHeaders.java +++ b/components/camel-pulsar/src/main/java/org/apache/camel/component/pulsar/utils/message/PulsarMessageHeaders.java @@ -27,4 +27,7 @@ public interface PulsarMessageHeaders { String KEY = "key"; String KEY_BYTES = "key_bytes"; String TOPIC_NAME = "topic_name"; + String KEY_OUT = "CamelPulsarProducerMessageKey"; + String PROPERTIES_OUT = "CamelPulsarProducerMessageProperties"; + String EVENT_TIME_OUT = "CamelPulsarProducerMessageEventTime"; } diff --git a/components/camel-pulsar/src/test/java/org/apache/camel/component/pulsar/PulsarProducerHeadersInTest.java b/components/camel-pulsar/src/test/java/org/apache/camel/component/pulsar/PulsarProducerHeadersInTest.java new file mode 100644 index 0000000..5d7df80 --- /dev/null +++ b/components/camel-pulsar/src/test/java/org/apache/camel/component/pulsar/PulsarProducerHeadersInTest.java @@ -0,0 +1,126 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor 
license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.component.pulsar; + +import java.util.HashMap; +import java.util.Map; +import java.util.concurrent.TimeUnit; +import org.apache.camel.Endpoint; +import org.apache.camel.EndpointInject; +import org.apache.camel.Produce; +import org.apache.camel.ProducerTemplate; +import org.apache.camel.builder.RouteBuilder; +import org.apache.camel.component.mock.MockEndpoint; +import org.apache.camel.component.pulsar.utils.AutoConfiguration; +import org.apache.camel.component.pulsar.utils.message.PulsarMessageHeaders; +import org.apache.camel.impl.JndiRegistry; +import org.apache.pulsar.client.api.PulsarClient; +import org.apache.pulsar.client.api.PulsarClientException; +import org.apache.pulsar.client.impl.ClientBuilderImpl; +import org.junit.Test; + +public class PulsarProducerHeadersInTest extends PulsarTestSupport { + + private static final String TOPIC_URI = "persistent://public/default/camel-producer-topic"; + private static final String PRODUCER = "camel-producer"; + + @Produce(uri = "direct:start") + private ProducerTemplate producerTemplate; + + @EndpointInject(uri = "pulsar:" + TOPIC_URI + + "?numberOfConsumers=1&subscriptionType=Exclusive" + + "&subscriptionName=camel-subscription" + + "&consumerQueueSize=1" + + 
"&consumerName=camel-consumer" + + "&producerName=" + PRODUCER + ) + private Endpoint pulsar; + + @EndpointInject(uri = "mock:result") + private MockEndpoint mock; + + @Override + protected RouteBuilder createRouteBuilder() { + return new RouteBuilder() { + + @Override + public void configure() { + from("direct:start").to(pulsar); + from(pulsar).to(mock); + } + }; + } + + @Override + protected JndiRegistry createRegistry() throws Exception { + JndiRegistry jndi = super.createRegistry(); + + registerPulsarBeans(jndi); + + return jndi; + } + + private void registerPulsarBeans(final JndiRegistry jndi) throws PulsarClientException { + PulsarClient pulsarClient = givenPulsarClient(); + AutoConfiguration autoConfiguration = new AutoConfiguration(null, null); + + jndi.bind("pulsarClient", pulsarClient); + PulsarComponent comp = new PulsarComponent(context); + comp.setAutoConfiguration(autoConfiguration); + comp.setPulsarClient(pulsarClient); + jndi.bind("pulsar", comp); + } + + private PulsarClient givenPulsarClient() throws PulsarClientException { + return new ClientBuilderImpl() + .serviceUrl(getPulsarBrokerUrl()) + .ioThreads(1) + .listenerThreads(1) + .build(); + } + + @Test + public void propertyHeaderSetsPulsarProperties() throws InterruptedException { + Map<String, String> properties = new HashMap<>(); + properties.put("testProperty", "testValue"); + mock.expectedHeaderReceived(PulsarMessageHeaders.PROPERTIES, properties); + + producerTemplate.sendBodyAndHeader("test", PulsarMessageHeaders.PROPERTIES_OUT, properties); + + MockEndpoint.assertIsSatisfied(10, TimeUnit.SECONDS, mock); + } + + @Test + public void eventTimeHeaderSetsPulsarEventTime() throws InterruptedException { + long eventTime = 10000; + mock.expectedHeaderReceived(PulsarMessageHeaders.EVENT_TIME, eventTime); + + producerTemplate.sendBodyAndHeader("test", PulsarMessageHeaders.EVENT_TIME_OUT, eventTime); + + MockEndpoint.assertIsSatisfied(10, TimeUnit.SECONDS, mock); + } + + @Test + public void 
keyHeaderSetsPulsarKey() throws InterruptedException { + String key = "testKey"; + mock.expectedHeaderReceived(PulsarMessageHeaders.KEY, key); + + producerTemplate.sendBodyAndHeader("test", PulsarMessageHeaders.KEY_OUT, key); + + MockEndpoint.assertIsSatisfied(10, TimeUnit.SECONDS, mock); + } +}