This is an automated email from the ASF dual-hosted git repository. oalsafi pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/camel.git
The following commit(s) were added to refs/heads/master by this push: new 952b4a8 CAMEL-16481:camel-vertx-kafka set custom timestamp (#5337) 952b4a8 is described below commit 952b4a81df9c50d5bb0df63785b32283e1cdcbeb Author: Ramu <kramu...@gmail.com> AuthorDate: Tue Apr 13 13:49:48 2021 +0530 CAMEL-16481:camel-vertx-kafka set custom timestamp (#5337) Co-authored-by: Kodanda Ramu Kakarla <kkaka...@kkakarla.pnq.csb> --- .../src/main/docs/vertx-kafka-component.adoc | 1 + .../kafka/VertxKafkaConfigurationOptionsProxy.java | 11 +++++++ .../component/vertx/kafka/VertxKafkaConstants.java | 1 + .../operations/VertxKafkaProducerOperations.java | 16 +++++++++- .../VertxKafkaProducerOperationsTest.java | 35 ++++++++++++++++++++++ 5 files changed, 63 insertions(+), 1 deletion(-) diff --git a/components/camel-vertx-kafka/camel-vertx-kafka-component/src/main/docs/vertx-kafka-component.adoc b/components/camel-vertx-kafka/camel-vertx-kafka-component/src/main/docs/vertx-kafka-component.adoc index 9eaa5ed..6c86bf5 100644 --- a/components/camel-vertx-kafka/camel-vertx-kafka-component/src/main/docs/vertx-kafka-component.adoc +++ b/components/camel-vertx-kafka/camel-vertx-kafka-component/src/main/docs/vertx-kafka-component.adoc @@ -343,6 +343,7 @@ Before sending a message to Kafka you can configure the following headers. |`CamelVertxKafkaMessageKey`| `VertxKafkaConstants.MESSAGE_KEY`|`String`| Explicitly specify the message key, if partition ID is not specified, this will trigger the messages to go into the same partition. |`CamelVertxKafkaTopic`| `VertxKafkaConstants.TOPIC`|`String`| Explicitly specify the topic to where produce the messages, this will be *preserved* in case of header aggregation. |`CamelVertxKafkaOverrideTopic`| `VertxKafkaConstants.OVERRIDE_TOPIC`|`String`| Explicitly specify the topic to where produce the messages, this will *not be preserved* in case of header aggregation and it will take *precedence* over `CamelVertxKafkaTopic`. +| `CamelVertxKafkaOverrideTimestamp` | `VertxKafkaConstants.OVERRIDE_TIMESTAMP` | Long | The ProducerRecord also has an associated timestamp. If the user did provide a timestamp, the producer will stamp the record with the provided timestamp and the header is not preserved. |======================================================================= If you want to send a message to a dynamic topic then use `VertxKafkaConstants.OVERRIDE_TOPIC` as its used as a one-time header diff --git a/components/camel-vertx-kafka/camel-vertx-kafka-component/src/main/java/org/apache/camel/component/vertx/kafka/VertxKafkaConfigurationOptionsProxy.java b/components/camel-vertx-kafka/camel-vertx-kafka-component/src/main/java/org/apache/camel/component/vertx/kafka/VertxKafkaConfigurationOptionsProxy.java index 253c9f6..6ab88e9 100644 --- a/components/camel-vertx-kafka/camel-vertx-kafka-component/src/main/java/org/apache/camel/component/vertx/kafka/VertxKafkaConfigurationOptionsProxy.java +++ b/components/camel-vertx-kafka/camel-vertx-kafka-component/src/main/java/org/apache/camel/component/vertx/kafka/VertxKafkaConfigurationOptionsProxy.java @@ -46,6 +46,17 @@ public class VertxKafkaConfigurationOptionsProxy { return configuration.getValueSerializer(); } + public Object getOverrideTimestamp(final Message message) { + Object timestamp = getOption(message, VertxKafkaConstants.OVERRIDE_TIMESTAMP, () -> null, Object.class); + if (ObjectHelper.isNotEmpty(timestamp)) { + // must remove header so its not propagated + message.removeHeader(VertxKafkaConstants.OVERRIDE_TIMESTAMP); + } + + return timestamp; + + } + public String getTopic(final Message message) { return getOption(message, VertxKafkaConstants.TOPIC, configuration::getTopic, String.class); } diff --git a/components/camel-vertx-kafka/camel-vertx-kafka-component/src/main/java/org/apache/camel/component/vertx/kafka/VertxKafkaConstants.java b/components/camel-vertx-kafka/camel-vertx-kafka-component/src/main/java/org/apache/camel/component/vertx/kafka/VertxKafkaConstants.java index b822097..30a3da0 100644 --- a/components/camel-vertx-kafka/camel-vertx-kafka-component/src/main/java/org/apache/camel/component/vertx/kafka/VertxKafkaConstants.java +++ b/components/camel-vertx-kafka/camel-vertx-kafka-component/src/main/java/org/apache/camel/component/vertx/kafka/VertxKafkaConstants.java @@ -28,6 +28,7 @@ public final class VertxKafkaConstants { public static final String OFFSET = HEADER_PREFIX + "Offset"; public static final String HEADERS = HEADER_PREFIX + "Headers"; public static final String TIMESTAMP = HEADER_PREFIX + "Timestamp"; + public static final String OVERRIDE_TIMESTAMP = HEADER_PREFIX + "OverrideTimestamp"; public static final String MANUAL_COMMIT = HEADER_PREFIX + "ManualCommit"; // headers evaluated by the producer only public static final String OVERRIDE_TOPIC = HEADER_PREFIX + "OverrideTopic"; diff --git a/components/camel-vertx-kafka/camel-vertx-kafka-component/src/main/java/org/apache/camel/component/vertx/kafka/operations/VertxKafkaProducerOperations.java b/components/camel-vertx-kafka/camel-vertx-kafka-component/src/main/java/org/apache/camel/component/vertx/kafka/operations/VertxKafkaProducerOperations.java index d29460c..b27dc9e 100644 --- a/components/camel-vertx-kafka/camel-vertx-kafka-component/src/main/java/org/apache/camel/component/vertx/kafka/operations/VertxKafkaProducerOperations.java +++ b/components/camel-vertx-kafka/camel-vertx-kafka-component/src/main/java/org/apache/camel/component/vertx/kafka/operations/VertxKafkaProducerOperations.java @@ -142,11 +142,12 @@ public class VertxKafkaProducerOperations { final Object messageKey = getMessageKey(message); final Object messageValue = getMessageValue(message, inputData); final Integer partitionId = getPartitionId(message); + final Long overrideTimestamp = getOverrideTimestamp(message); final List<KafkaHeader> propagatedHeaders = new VertxKafkaHeadersPropagation(configurationOptionsProxy.getConfiguration().getHeaderFilterStrategy()) .getPropagatedHeaders(message); - return KafkaProducerRecord.create(topic, messageKey, messageValue, partitionId) + return KafkaProducerRecord.create(topic, messageKey, messageValue, overrideTimestamp, partitionId) .addHeaders(propagatedHeaders); } @@ -189,4 +190,17 @@ public class VertxKafkaProducerOperations { return VertxKafkaTypeSerializer.tryConvertToSerializedType(message, inputData, configurationOptionsProxy.getValueSerializer(message)); } + + private Long getOverrideTimestamp(final Message message) { + + Object timeStamp = configurationOptionsProxy.getOverrideTimestamp(message); + Long overrideTimestamp = null; + if (ObjectHelper.isNotEmpty(timeStamp)) { + overrideTimestamp = ((Long) timeStamp).longValue(); + } + + return overrideTimestamp; + + } + } diff --git a/components/camel-vertx-kafka/camel-vertx-kafka-component/src/test/java/org/apache/camel/component/vertx/kafka/operations/VertxKafkaProducerOperationsTest.java b/components/camel-vertx-kafka/camel-vertx-kafka-component/src/test/java/org/apache/camel/component/vertx/kafka/operations/VertxKafkaProducerOperationsTest.java index eeee1d8..bc4f5d2 100644 --- a/components/camel-vertx-kafka/camel-vertx-kafka-component/src/test/java/org/apache/camel/component/vertx/kafka/operations/VertxKafkaProducerOperationsTest.java +++ b/components/camel-vertx-kafka/camel-vertx-kafka-component/src/test/java/org/apache/camel/component/vertx/kafka/operations/VertxKafkaProducerOperationsTest.java @@ -16,6 +16,8 @@ */ package org.apache.camel.component.vertx.kafka.operations; +import java.time.LocalDateTime; +import java.time.ZoneId; import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; @@ -274,6 +276,27 @@ class VertxKafkaProducerOperationsTest extends CamelTestSupport { } @Test + void testSendEventWithOverrideTopicHeaderAndTimestamp() { + configuration.setTopic("sometopic"); + Long timestamp = LocalDateTime.now().atZone(ZoneId.systemDefault()).toInstant().toEpochMilli(); + + final Message message = createMessage(); + + message.setHeader(VertxKafkaConstants.OVERRIDE_TOPIC, "anotherTopic"); + message.setHeader(VertxKafkaConstants.MESSAGE_KEY, "someKey"); + message.setHeader(VertxKafkaConstants.OVERRIDE_TIMESTAMP, timestamp); + message.setBody("test"); + + sendEvent(message); + + // the header is now removed + assertNull(message.getHeader(VertxKafkaConstants.OVERRIDE_TOPIC)); + assertNull(message.getHeader(VertxKafkaConstants.OVERRIDE_TIMESTAMP)); + + verifySendMessage("anotherTopic", "someKey", timestamp, "test"); + } + + @Test void testSendEventWithNoTopicSet() { configuration.setTopic(null); @@ -539,6 +562,18 @@ class VertxKafkaProducerOperationsTest extends CamelTestSupport { }); } + private void verifySendMessage( + final String topic, final Object messageKey, final Long timestamp, final Object messageBody) { + assertProducedMessages(records -> { + assertEquals(1, records.size()); + assertEquals(topic, records.get(0).topic()); + assertEquals(messageKey, records.get(0).key()); + assertEquals(messageBody, records.get(0).value()); + assertEquals(timestamp, records.get(0).timestamp()); + + }); + } + private void assertProducedMessages(final Consumer<List<ProducerRecord<Object, Object>>> recordsFn) { Awaitility .await()