Fix the KafkaConsumer to put the message in the body. Right now, the consumer creates an exchange for each received message. However, it does not fill the exchange body with the received message content.
Right now, it is set as an array of bytes but in the future we could use the Consumer decoder class to convert the content in the right type. Project: http://git-wip-us.apache.org/repos/asf/camel/repo Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/a2fd504a Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/a2fd504a Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/a2fd504a Branch: refs/heads/master Commit: a2fd504ac48a9f4759f59b7d2f028df1dc7087c4 Parents: 06ffb5d Author: Fabien Chaillou <fabien.chail...@gmail.com> Authored: Wed Mar 26 15:14:10 2014 -0400 Committer: Fabien Chaillou <fabien.chail...@gmail.com> Committed: Wed Mar 26 15:14:10 2014 -0400 ---------------------------------------------------------------------- .../main/java/org/apache/camel/component/kafka/KafkaEndpoint.java | 1 + .../test/java/org/apache/camel/component/kafka/KafkaConsumerIT.java | 1 + 2 files changed, 2 insertions(+) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/camel/blob/a2fd504a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaEndpoint.java ---------------------------------------------------------------------- diff --git a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaEndpoint.java b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaEndpoint.java index b700850..f88e3d6 100644 --- a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaEndpoint.java +++ b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaEndpoint.java @@ -111,6 +111,7 @@ public class KafkaEndpoint extends DefaultEndpoint { message.setHeader(KafkaConstants.PARTITION, mm.partition()); message.setHeader(KafkaConstants.TOPIC, mm.topic()); message.setHeader(KafkaConstants.KEY, new String(mm.key())); + message.setBody(mm.message()); exchange.setIn(message); return exchange; 
http://git-wip-us.apache.org/repos/asf/camel/blob/a2fd504a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaConsumerIT.java ---------------------------------------------------------------------- diff --git a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaConsumerIT.java b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaConsumerIT.java index cb9be59..a8ca6c3 100644 --- a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaConsumerIT.java +++ b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaConsumerIT.java @@ -79,6 +79,7 @@ public class KafkaConsumerIT extends CamelTestSupport { @Test public void kaftMessageIsConsumedByCamel() throws InterruptedException, IOException { to.expectedMessageCount(5); + to.expectedBodiesReceived("message-0","message-1","message-2","message-3","message-4" ); for (int k = 0; k < 5; k++) { String msg = "message-" + k; KeyedMessage<String, String> data = new KeyedMessage<String, String>(TOPIC, "1", msg);