Repository: camel Updated Branches: refs/heads/camel-2.13.x 2249559a0 -> 02f2945cd
Fix the KafkaConsumer to put the message in the body Right now, the consumer would create an exchange for each received message. However, it didn't fill the exchange body with the received message content. Right now, it is set as an array of bytes but in the future we could use the Consumer decoder class to convert the content to the right type. Project: http://git-wip-us.apache.org/repos/asf/camel/repo Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/6af7f21b Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/6af7f21b Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/6af7f21b Branch: refs/heads/camel-2.13.x Commit: 6af7f21b51298c7cda8a263ae74a8f75c4b5af02 Parents: e2d549d Author: Fabien Chaillou <fabien.chail...@gmail.com> Authored: Wed Mar 26 15:14:10 2014 -0400 Committer: Claus Ibsen <davscl...@apache.org> Committed: Thu Mar 27 08:05:48 2014 +0100 ---------------------------------------------------------------------- .../main/java/org/apache/camel/component/kafka/KafkaEndpoint.java | 1 + .../test/java/org/apache/camel/component/kafka/KafkaConsumerIT.java | 1 + 2 files changed, 2 insertions(+) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/camel/blob/6af7f21b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaEndpoint.java ---------------------------------------------------------------------- diff --git a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaEndpoint.java b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaEndpoint.java index b700850..f88e3d6 100644 --- a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaEndpoint.java +++ b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaEndpoint.java @@ -111,6 +111,7 @@ public class KafkaEndpoint extends DefaultEndpoint { message.setHeader(KafkaConstants.PARTITION, mm.partition()); 
message.setHeader(KafkaConstants.TOPIC, mm.topic()); message.setHeader(KafkaConstants.KEY, new String(mm.key())); + message.setBody(mm.message()); exchange.setIn(message); return exchange; http://git-wip-us.apache.org/repos/asf/camel/blob/6af7f21b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaConsumerIT.java ---------------------------------------------------------------------- diff --git a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaConsumerIT.java b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaConsumerIT.java index cb9be59..a8ca6c3 100644 --- a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaConsumerIT.java +++ b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaConsumerIT.java @@ -79,6 +79,7 @@ public class KafkaConsumerIT extends CamelTestSupport { @Test public void kaftMessageIsConsumedByCamel() throws InterruptedException, IOException { to.expectedMessageCount(5); + to.expectedBodiesReceived("message-0","message-1","message-2","message-3","message-4" ); for (int k = 0; k < 5; k++) { String msg = "message-" + k; KeyedMessage<String, String> data = new KeyedMessage<String, String>(TOPIC, "1", msg);