Repository: camel Updated Branches: refs/heads/camel-2.16.x c3d4ad71b -> d83f2d67b
[CAMEL-10065] Update camel-kafka to support Iterable and Iterator Project: http://git-wip-us.apache.org/repos/asf/camel/repo Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/d83f2d67 Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/d83f2d67 Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/d83f2d67 Branch: refs/heads/camel-2.16.x Commit: d83f2d67b98dce7a793052e3984ea96bdd18b2f8 Parents: c3d4ad7 Author: Daniel Kulp <dk...@apache.org> Authored: Wed Jun 22 15:17:00 2016 -0400 Committer: Daniel Kulp <dk...@apache.org> Committed: Wed Jun 22 15:17:31 2016 -0400 ---------------------------------------------------------------------- .../camel/component/kafka/KafkaConsumer.java | 6 +-- .../camel/component/kafka/KafkaProducer.java | 50 +++++++++++++++----- .../component/kafka/KafkaProducerFullTest.java | 4 +- 3 files changed, 42 insertions(+), 18 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/camel/blob/d83f2d67/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConsumer.java ---------------------------------------------------------------------- diff --git a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConsumer.java b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConsumer.java index 94893fb..5ad4d82 100644 --- a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConsumer.java +++ b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConsumer.java @@ -131,7 +131,7 @@ public class KafkaConsumer extends DefaultConsumer { private KafkaStream<byte[], byte[]> stream; private CyclicBarrier barrier; - public BatchingConsumerTask(KafkaStream<byte[], byte[]> stream, CyclicBarrier barrier) { + BatchingConsumerTask(KafkaStream<byte[], byte[]> stream, CyclicBarrier barrier) { this.stream = stream; this.barrier = barrier; } @@ -185,7 +185,7 @@ public class KafkaConsumer extends DefaultConsumer { private final ConsumerConnector consumer; - public CommitOffsetTask(ConsumerConnector consumer) { + CommitOffsetTask(ConsumerConnector consumer) { this.consumer = consumer; } @@ -201,7 +201,7 @@ public class KafkaConsumer extends DefaultConsumer { private final ConsumerConnector consumer; private KafkaStream<byte[], byte[]> stream; - public AutoCommitConsumerTask(ConsumerConnector consumer, KafkaStream<byte[], byte[]> stream) { + AutoCommitConsumerTask(ConsumerConnector consumer, KafkaStream<byte[], byte[]> stream) { this.consumer = consumer; this.stream = stream; } http://git-wip-us.apache.org/repos/asf/camel/blob/d83f2d67/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaProducer.java ---------------------------------------------------------------------- diff --git a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaProducer.java b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaProducer.java index 06a0317..09de594 100644 --- a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaProducer.java +++ b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaProducer.java @@ -16,6 +16,9 @@ */ package org.apache.camel.component.kafka; +import java.util.Iterator; +import java.util.LinkedList; +import java.util.List; import java.util.Properties; import kafka.javaapi.producer.Producer; @@ -25,6 +28,7 @@ import org.apache.camel.CamelException; import org.apache.camel.CamelExchangeException; import org.apache.camel.Exchange; import org.apache.camel.impl.DefaultProducer; +import org.apache.camel.util.CastUtils; /** * @@ -77,20 +81,42 @@ public class KafkaProducer<K, V> extends DefaultProducer { K messageKey = (K) exchange.getIn().getHeader(KafkaConstants.KEY); boolean hasMessageKey = messageKey != null; - V msg = (V) exchange.getIn().getBody(); - KeyedMessage<K, V> data; - - if (hasPartitionKey && hasMessageKey) { - data = new KeyedMessage<K, V>(topic, messageKey, partitionKey, msg); - } else if (hasPartitionKey) { - data = new KeyedMessage<K, V>(topic, partitionKey, msg); - } else if (hasMessageKey) { - data = new KeyedMessage<K, V>(topic, messageKey, msg); + Object msg = exchange.getIn().getBody(); + + if (msg instanceof Iterable) { + msg = ((Iterable<Object>)msg).iterator(); + } + if (msg instanceof java.util.Iterator) { + List<KeyedMessage<K, V>> data = new LinkedList<KeyedMessage<K, V>>(); + Iterator<V> it = CastUtils.cast((Iterator<?>)msg); + while (it.hasNext()) { + V m = it.next(); + if (hasPartitionKey && hasMessageKey) { + data.add(new KeyedMessage<K, V>(topic, messageKey, partitionKey, m)); + } else if (hasPartitionKey) { + data.add(new KeyedMessage<K, V>(topic, partitionKey, m)); + } else if (hasMessageKey) { + data.add(new KeyedMessage<K, V>(topic, messageKey, m)); + } else { + data.add(new KeyedMessage<K, V>(topic, messageKey, partitionKey, m)); + } + } + producer.send(data); } else { - log.warn("No message key or partition key set"); - data = new KeyedMessage<K, V>(topic, messageKey, partitionKey, msg); + + KeyedMessage<K, V> data; + V m = (V)msg; + if (hasPartitionKey && hasMessageKey) { + data = new KeyedMessage<K, V>(topic, messageKey, partitionKey, m); + } else if (hasPartitionKey) { + data = new KeyedMessage<K, V>(topic, partitionKey, m); + } else if (hasMessageKey) { + data = new KeyedMessage<K, V>(topic, messageKey, m); + } else { + data = new KeyedMessage<K, V>(topic, messageKey, partitionKey, m); + } + producer.send(data); } - producer.send(data); } } http://git-wip-us.apache.org/repos/asf/camel/blob/d83f2d67/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaProducerFullTest.java ---------------------------------------------------------------------- diff --git a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaProducerFullTest.java b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaProducerFullTest.java index d76a059..80370ff 100644 --- a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaProducerFullTest.java +++ b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaProducerFullTest.java @@ -35,9 +35,7 @@ import org.apache.camel.EndpointInject; import org.apache.camel.Produce; import org.apache.camel.ProducerTemplate; import org.apache.camel.builder.RouteBuilder; -import org.junit.After; import org.junit.AfterClass; -import org.junit.Before; import org.junit.BeforeClass; import org.junit.Test; import org.slf4j.Logger; @@ -192,7 +190,7 @@ public class KafkaProducerFullTest extends BaseEmbeddedKafkaTest { private final KafkaStream<byte[], byte[]> stream; private final CountDownLatch latch; - public KakfaTopicConsumer(KafkaStream<byte[], byte[]> stream, CountDownLatch latch) { + KakfaTopicConsumer(KafkaStream<byte[], byte[]> stream, CountDownLatch latch) { this.stream = stream; this.latch = latch; }