The Kafka offset of the message is now included as a Camel Exchange header. This can also be helpful for logging purposes. In the test, I reused an offset value that was already present in the fixture but was not previously asserted.
Project: http://git-wip-us.apache.org/repos/asf/camel/repo Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/c81a0516 Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/c81a0516 Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/c81a0516 Branch: refs/heads/master Commit: c81a051616ecf97139c2ec3a8b972320b570167b Parents: 3fc9de7 Author: tarilabs <matteo.mort...@gmail.com> Authored: Fri Aug 7 13:43:07 2015 +0200 Committer: Claus Ibsen <davscl...@apache.org> Committed: Fri Aug 7 14:00:27 2015 +0200 ---------------------------------------------------------------------- .../main/java/org/apache/camel/component/kafka/KafkaConstants.java | 1 + .../main/java/org/apache/camel/component/kafka/KafkaEndpoint.java | 1 + .../java/org/apache/camel/component/kafka/KafkaEndpointTest.java | 1 + 3 files changed, 3 insertions(+) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/camel/blob/c81a0516/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConstants.java ---------------------------------------------------------------------- diff --git a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConstants.java b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConstants.java index 6c31b65..3397060 100644 --- a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConstants.java +++ b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConstants.java @@ -27,6 +27,7 @@ public final class KafkaConstants { public static final String PARTITION = "kafka.EXCHANGE_NAME"; public static final String KEY = "kafka.CONTENT_TYPE"; public static final String TOPIC = "kafka.TOPIC"; + public static final String OFFSET = "kafka.OFFSET"; public static final String KAFKA_DEFAULT_ENCODER = "kafka.serializer.DefaultEncoder"; public static final String KAFKA_STRING_ENCODER = "kafka.serializer.StringEncoder"; 
http://git-wip-us.apache.org/repos/asf/camel/blob/c81a0516/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaEndpoint.java ---------------------------------------------------------------------- diff --git a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaEndpoint.java b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaEndpoint.java index 78863f8..165c984 100644 --- a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaEndpoint.java +++ b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaEndpoint.java @@ -113,6 +113,7 @@ public class KafkaEndpoint extends DefaultEndpoint implements MultipleConsumersS Message message = new DefaultMessage(); message.setHeader(KafkaConstants.PARTITION, mm.partition()); message.setHeader(KafkaConstants.TOPIC, mm.topic()); + message.setHeader(KafkaConstants.OFFSET, mm.offset()); if (mm.key() != null) { message.setHeader(KafkaConstants.KEY, new String(mm.key())); } http://git-wip-us.apache.org/repos/asf/camel/blob/c81a0516/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaEndpointTest.java ---------------------------------------------------------------------- diff --git a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaEndpointTest.java b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaEndpointTest.java index ed4a6d1..be16766 100644 --- a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaEndpointTest.java +++ b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaEndpointTest.java @@ -44,6 +44,7 @@ public class KafkaEndpointTest { assertEquals("somekey", exchange.getIn().getHeader(KafkaConstants.KEY)); assertEquals("topic", exchange.getIn().getHeader(KafkaConstants.TOPIC)); assertEquals(4, exchange.getIn().getHeader(KafkaConstants.PARTITION)); + assertEquals(56L, 
exchange.getIn().getHeader(KafkaConstants.OFFSET)); } @Test