This is an automated email from the ASF dual-hosted git repository. valdar pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/camel-kafka-connector.git
The following commit(s) were added to refs/heads/master by this push: new f1b58ec fix #738 : added timestamp information to source records. f1b58ec is described below commit f1b58ecac6e97eee22166893f02bc6347fd39387 Author: Andrea Tarocchi <andrea.taroc...@gmail.com> AuthorDate: Thu Nov 26 17:25:53 2020 +0100 fix #738 : added timestamp information to source records. --- .../kafkaconnector/timer/CamelTimerSourceTask.java | 22 ++++++++++++++++++---- .../camel/kafkaconnector/CamelSourceTask.java | 10 ++++++++-- .../camel/kafkaconnector/CamelSourceTaskTest.java | 2 ++ 3 files changed, 28 insertions(+), 6 deletions(-) diff --git a/connectors/camel-timer-kafka-connector/src/main/java/org/apache/camel/kafkaconnector/timer/CamelTimerSourceTask.java b/connectors/camel-timer-kafka-connector/src/main/java/org/apache/camel/kafkaconnector/timer/CamelTimerSourceTask.java index 3dadb5a..b07fdfe 100644 --- a/connectors/camel-timer-kafka-connector/src/main/java/org/apache/camel/kafkaconnector/timer/CamelTimerSourceTask.java +++ b/connectors/camel-timer-kafka-connector/src/main/java/org/apache/camel/kafkaconnector/timer/CamelTimerSourceTask.java @@ -16,13 +16,14 @@ */ package org.apache.camel.kafkaconnector.timer; -import java.util.HashMap; -import java.util.Map; -import javax.annotation.Generated; +import org.apache.camel.Exchange; import org.apache.camel.kafkaconnector.CamelSourceConnectorConfig; import org.apache.camel.kafkaconnector.CamelSourceTask; -@Generated("This class has been generated by camel-kafka-connector-generator-maven-plugin, remove this annotation to prevent it from being generated.") +import java.util.Date; +import java.util.HashMap; +import java.util.Map; + public class CamelTimerSourceTask extends CamelSourceTask { @Override @@ -36,4 +37,17 @@ public class CamelTimerSourceTask extends CamelSourceTask { put(CamelSourceConnectorConfig.CAMEL_SOURCE_COMPONENT_CONF, "timer"); }}; } + + //XXX: this method override is the only difference from how the class was initially generated by camel-kafka-connector-generator-maven-plugin + @Override + protected long calculateTimestamp(Exchange exchange) { + if (exchange != null) { + Date fireDate = exchange.getProperty(Exchange.TIMER_FIRED_TIME, Date.class); + if (fireDate != null) { + return fireDate.getTime(); + } + } + + return super.calculateTimestamp(exchange); + } } \ No newline at end of file diff --git a/core/src/main/java/org/apache/camel/kafkaconnector/CamelSourceTask.java b/core/src/main/java/org/apache/camel/kafkaconnector/CamelSourceTask.java index dd2ddc9..f196c25 100644 --- a/core/src/main/java/org/apache/camel/kafkaconnector/CamelSourceTask.java +++ b/core/src/main/java/org/apache/camel/kafkaconnector/CamelSourceTask.java @@ -173,9 +173,11 @@ public class CamelSourceTask extends SourceTask { final Schema messageKeySchema = messageHeaderKey != null ? SchemaHelper.buildSchemaBuilderForType(messageHeaderKey) : null; final Schema messageBodySchema = messageBodyValue != null ? SchemaHelper.buildSchemaBuilderForType(messageBodyValue) : null; + final long timestamp = calculateTimestamp(exchange); + for (String singleTopic : topics) { - SourceRecord record = new SourceRecord(sourcePartition, sourceOffset, singleTopic, messageKeySchema, - messageHeaderKey, messageBodySchema, messageBodyValue); + SourceRecord record = new SourceRecord(sourcePartition, sourceOffset, singleTopic, null, messageKeySchema, + messageHeaderKey, messageBodySchema, messageBodyValue, timestamp); if (exchange.getMessage().hasHeaders()) { setAdditionalHeaders(record, exchange.getMessage().getHeaders(), HEADER_CAMEL_PREFIX); @@ -240,6 +242,10 @@ public class CamelSourceTask extends SourceTask { return CAMEL_SOURCE_PATH_PROPERTIES_PREFIX; } + protected long calculateTimestamp(Exchange exchange) { + return System.currentTimeMillis(); + } + private void setAdditionalHeaders(SourceRecord record, Map<String, Object> map, String prefix) { for (Map.Entry<String, Object> entry : map.entrySet()) { diff --git a/core/src/test/java/org/apache/camel/kafkaconnector/CamelSourceTaskTest.java b/core/src/test/java/org/apache/camel/kafkaconnector/CamelSourceTaskTest.java index 49bf878..2a85664 100644 --- a/core/src/test/java/org/apache/camel/kafkaconnector/CamelSourceTaskTest.java +++ b/core/src/test/java/org/apache/camel/kafkaconnector/CamelSourceTaskTest.java @@ -37,6 +37,7 @@ import org.junit.jupiter.api.Test; import static org.apache.camel.util.CollectionHelper.mapOf; import static org.assertj.core.api.Assertions.assertThat; import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertNotNull; import static org.junit.jupiter.api.Assertions.assertNull; import static org.junit.jupiter.api.Assertions.assertTrue; @@ -67,6 +68,7 @@ public class CamelSourceTaskTest { assertEquals(size, poll.size()); assertEquals(TOPIC_NAME, poll.get(0).topic()); + assertNotNull(poll.get(0).timestamp()); assertEquals(LoggingLevel.OFF.toString(), sourceTask.getCamelSourceConnectorConfig(props) .getString(CamelSourceConnectorConfig.CAMEL_SOURCE_CONTENT_LOG_LEVEL_CONF));