This is an automated email from the ASF dual-hosted git repository. davsclaus pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/camel.git
The following commit(s) were added to refs/heads/master by this push: new 3f0c9af "CAMEL-16414:camel-kafka set custom timestamp" (#5312) 3f0c9af is described below commit 3f0c9afa0ee30e62f1aea8a6dd08412b8aadbb9b Author: Ramu <kramu...@gmail.com> AuthorDate: Sat Apr 10 11:30:40 2021 +0530 "CAMEL-16414:camel-kafka set custom timestamp" (#5312) Co-authored-by: Kodanda Ramu Kakarla <kkaka...@kkakarla.pnq.csb> --- .../camel-kafka/src/main/docs/kafka-component.adoc | 1 + .../camel/component/kafka/KafkaConstants.java | 1 + .../camel/component/kafka/KafkaProducer.java | 19 ++++++++++++++-- .../camel/component/kafka/KafkaProducerTest.java | 26 ++++++++++++++++++++++ 4 files changed, 45 insertions(+), 2 deletions(-) diff --git a/components/camel-kafka/src/main/docs/kafka-component.adoc b/components/camel-kafka/src/main/docs/kafka-component.adoc index 397f064..da2db71 100644 --- a/components/camel-kafka/src/main/docs/kafka-component.adoc +++ b/components/camel-kafka/src/main/docs/kafka-component.adoc @@ -310,6 +310,7 @@ Before sending a message to Kafka you can configure the following headers. | Header constant | Header value | Type | Description | KafkaConstants.KEY | "kafka.KEY" | Object | *Required* The key of the message in order to ensure that all related message goes in the same partition | KafkaConstants.OVERRIDE_TOPIC | "kafka.OVERRIDE_TOPIC" | String | The topic to which send the message (override and takes precedence), and the header is not preserved. +| KafkaConstants.OVERRIDE_TIMESTAMP | "kafka.OVERRIDE_TIMESTAMP" | Long | The ProducerRecord also has an associated timestamp. If the user did provide a timestamp, the producer will stamp the record with the provided timestamp and the header is not preserved. | KafkaConstants.PARTITION_KEY | "kafka.PARTITION_KEY" | Integer | Explicitly specify the partition |=== diff --git a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConstants.java b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConstants.java index 785fc21..edc2dcb 100644 --- a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConstants.java +++ b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConstants.java @@ -28,6 +28,7 @@ public final class KafkaConstants { public static final String LAST_RECORD_BEFORE_COMMIT = "kafka.LAST_RECORD_BEFORE_COMMIT"; public static final String LAST_POLL_RECORD = "kafka.LAST_POLL_RECORD"; public static final String TIMESTAMP = "kafka.TIMESTAMP"; + public static final String OVERRIDE_TIMESTAMP = "kafka.OVERRIDE_TIMESTAMP"; @Deprecated public static final String KAFKA_DEFAULT_ENCODER = "kafka.serializer.DefaultEncoder"; diff --git a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaProducer.java b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaProducer.java index 32aa440..f720f2b 100644 --- a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaProducer.java +++ b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaProducer.java @@ -146,6 +146,7 @@ public class KafkaProducer extends DefaultAsyncProducer { @SuppressWarnings({ "unchecked", "rawtypes" }) protected Iterator<KeyValueHolder<Object, ProducerRecord>> createRecorder(Exchange exchange) throws Exception { String topic = endpoint.getConfiguration().getTopic(); + Long timeStamp = null; // must remove header so its not propagated Object overrideTopic = exchange.getIn().removeHeader(KafkaConstants.OVERRIDE_TOPIC); @@ -160,6 +161,12 @@ public class KafkaProducer extends DefaultAsyncProducer { topic = URISupport.extractRemainderPath(new URI(endpoint.getEndpointUri()), true); } + Object overrideTimeStamp = exchange.getIn().removeHeader(KafkaConstants.OVERRIDE_TIMESTAMP); + if ((overrideTimeStamp != null) && (overrideTimeStamp instanceof Long)) { + LOG.debug("Using override TimeStamp: {}", overrideTimeStamp); + timeStamp = ((Long) overrideTimeStamp).longValue(); + } + // extracting headers which need to be propagated List<Header> propagatedHeaders = getPropagatedHeaders(exchange, endpoint.getConfiguration()); @@ -189,6 +196,7 @@ public class KafkaProducer extends DefaultAsyncProducer { String innerTopic = msgTopic; Object innerKey = null; Integer innerPartitionKey = null; + Long innerTimestamp = null; Object value = next; Exchange ex = null; @@ -223,6 +231,12 @@ public class KafkaProducer extends DefaultAsyncProducer { } } + if (innerMmessage.getHeader(KafkaConstants.OVERRIDE_TIMESTAMP) != null) { + if (innerMmessage.getHeader(KafkaConstants.OVERRIDE_TIMESTAMP) instanceof Long) { + innerTimestamp + = ((Long) innerMmessage.removeHeader(KafkaConstants.OVERRIDE_TIMESTAMP)).longValue(); + } + } ex = innerExchange == null ? exchange : innerExchange; value = tryConvertToSerializedType(ex, innerMmessage.getBody(), endpoint.getConfiguration().getValueSerializer()); @@ -231,7 +245,8 @@ public class KafkaProducer extends DefaultAsyncProducer { return new KeyValueHolder( body, - new ProducerRecord(innerTopic, innerPartitionKey, null, innerKey, value, propagatedHeaders)); + new ProducerRecord( + innerTopic, innerPartitionKey, innerTimestamp, innerKey, value, propagatedHeaders)); } @Override @@ -257,7 +272,7 @@ public class KafkaProducer extends DefaultAsyncProducer { // the serializer Object value = tryConvertToSerializedType(exchange, msg, endpoint.getConfiguration().getValueSerializer()); - ProducerRecord record = new ProducerRecord(topic, partitionKey, null, key, value, propagatedHeaders); + ProducerRecord record = new ProducerRecord(topic, partitionKey, timeStamp, key, value, propagatedHeaders); return Collections.singletonList(new KeyValueHolder<Object, ProducerRecord>((Object) exchange, record)).iterator(); } diff --git a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaProducerTest.java b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaProducerTest.java index 2b812e5..765c561 100644 --- a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaProducerTest.java +++ b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaProducerTest.java @@ -16,6 +16,8 @@ */ package org.apache.camel.component.kafka; +import java.time.LocalDateTime; +import java.time.ZoneId; import java.util.Arrays; import java.util.HashMap; import java.util.LinkedList; @@ -215,6 +217,8 @@ public class KafkaProducerTest { in.setHeader(KafkaConstants.PARTITION_KEY, 4); in.setHeader(KafkaConstants.OVERRIDE_TOPIC, "anotherTopic"); in.setHeader(KafkaConstants.KEY, "someKey"); + in.setHeader(KafkaConstants.OVERRIDE_TIMESTAMP, + LocalDateTime.now().atZone(ZoneId.systemDefault()).toInstant().toEpochMilli()); producer.process(exchange); @@ -303,6 +307,21 @@ public class KafkaProducerTest { } @Test + public void processSendsMessageWithMessageTimestampHeader() throws Exception { + endpoint.getConfiguration().setTopic("someTopic"); + Mockito.when(exchange.getIn()).thenReturn(in); + Mockito.when(exchange.getMessage()).thenReturn(out); + in.setHeader(KafkaConstants.KEY, "someKey"); + in.setHeader(KafkaConstants.OVERRIDE_TIMESTAMP, + LocalDateTime.now().atZone(ZoneId.systemDefault()).toInstant().toEpochMilli()); + + producer.process(exchange); + + verifySendMessage("someTopic", "someKey"); + assertRecordMetadataTimestampExists(); + } + + @Test public void processSendMessageWithTopicHeader() throws Exception { endpoint.getConfiguration().setTopic("someTopic"); Mockito.when(exchange.getIn()).thenReturn(in); @@ -425,6 +444,13 @@ public class KafkaProducerTest { assertEquals(expectedTopics, actualTopics); } + private void assertRecordMetadataTimestampExists() { + List<RecordMetadata> recordMetaData1 = (List<RecordMetadata>) in.getHeader(KafkaConstants.KAFKA_RECORDMETA); + assertNotNull(recordMetaData1); + assertEquals(recordMetaData1.size(), 1, "Expected one recordMetaData"); + assertNotNull(recordMetaData1.get(0).timestamp()); + } + private void assertRecordMetadataExists() { List<RecordMetadata> recordMetaData1 = (List<RecordMetadata>) in.getHeader(KafkaConstants.KAFKA_RECORDMETA); assertNotNull(recordMetaData1);