Repository: camel
Updated Branches:
  refs/heads/camel-2.16.x c3d4ad71b -> d83f2d67b


[CAMEL-10065] Update camel-kafka to support Iterable and Iterator


Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/d83f2d67
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/d83f2d67
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/d83f2d67

Branch: refs/heads/camel-2.16.x
Commit: d83f2d67b98dce7a793052e3984ea96bdd18b2f8
Parents: c3d4ad7
Author: Daniel Kulp <dk...@apache.org>
Authored: Wed Jun 22 15:17:00 2016 -0400
Committer: Daniel Kulp <dk...@apache.org>
Committed: Wed Jun 22 15:17:31 2016 -0400

----------------------------------------------------------------------
 .../camel/component/kafka/KafkaConsumer.java    |  6 +--
 .../camel/component/kafka/KafkaProducer.java    | 50 +++++++++++++++-----
 .../component/kafka/KafkaProducerFullTest.java  |  4 +-
 3 files changed, 42 insertions(+), 18 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/camel/blob/d83f2d67/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConsumer.java
----------------------------------------------------------------------
diff --git a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConsumer.java b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConsumer.java
index 94893fb..5ad4d82 100644
--- a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConsumer.java
+++ b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConsumer.java
@@ -131,7 +131,7 @@ public class KafkaConsumer extends DefaultConsumer {
         private KafkaStream<byte[], byte[]> stream;
         private CyclicBarrier barrier;
 
-        public BatchingConsumerTask(KafkaStream<byte[], byte[]> stream, CyclicBarrier barrier) {
+        BatchingConsumerTask(KafkaStream<byte[], byte[]> stream, CyclicBarrier barrier) {
             this.stream = stream;
             this.barrier = barrier;
         }
@@ -185,7 +185,7 @@ public class KafkaConsumer extends DefaultConsumer {
 
         private final ConsumerConnector consumer;
 
-        public CommitOffsetTask(ConsumerConnector consumer) {
+        CommitOffsetTask(ConsumerConnector consumer) {
             this.consumer = consumer;
         }
 
@@ -201,7 +201,7 @@ public class KafkaConsumer extends DefaultConsumer {
         private final ConsumerConnector consumer;
         private KafkaStream<byte[], byte[]> stream;
 
-        public AutoCommitConsumerTask(ConsumerConnector consumer, KafkaStream<byte[], byte[]> stream) {
+        AutoCommitConsumerTask(ConsumerConnector consumer, KafkaStream<byte[], byte[]> stream) {
             this.consumer = consumer;
             this.stream = stream;
         }

http://git-wip-us.apache.org/repos/asf/camel/blob/d83f2d67/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaProducer.java
----------------------------------------------------------------------
diff --git a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaProducer.java b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaProducer.java
index 06a0317..09de594 100644
--- a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaProducer.java
+++ b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaProducer.java
@@ -16,6 +16,9 @@
  */
 package org.apache.camel.component.kafka;
 
+import java.util.Iterator;
+import java.util.LinkedList;
+import java.util.List;
 import java.util.Properties;
 
 import kafka.javaapi.producer.Producer;
@@ -25,6 +28,7 @@ import org.apache.camel.CamelException;
 import org.apache.camel.CamelExchangeException;
 import org.apache.camel.Exchange;
 import org.apache.camel.impl.DefaultProducer;
+import org.apache.camel.util.CastUtils;
 
 /**
  *
@@ -77,20 +81,42 @@ public class KafkaProducer<K, V> extends DefaultProducer {
         K messageKey = (K) exchange.getIn().getHeader(KafkaConstants.KEY);
         boolean hasMessageKey = messageKey != null;
 
-        V msg = (V) exchange.getIn().getBody();
-        KeyedMessage<K, V> data;
-
-        if (hasPartitionKey && hasMessageKey) {
-            data = new KeyedMessage<K, V>(topic, messageKey, partitionKey, msg);
-        } else if (hasPartitionKey) {
-            data = new KeyedMessage<K, V>(topic, partitionKey, msg);
-        } else if (hasMessageKey) {
-            data = new KeyedMessage<K, V>(topic, messageKey, msg);
+        Object msg = exchange.getIn().getBody();
+        
+        if (msg instanceof Iterable) {
+            msg = ((Iterable<Object>)msg).iterator();
+        }
+        if (msg instanceof java.util.Iterator) {
+            List<KeyedMessage<K, V>> data = new LinkedList<KeyedMessage<K, V>>();
+            Iterator<V> it = CastUtils.cast((Iterator<?>)msg);
+            while (it.hasNext()) {
+                V m = it.next();
+                if (hasPartitionKey && hasMessageKey) {
+                    data.add(new KeyedMessage<K, V>(topic, messageKey, partitionKey, m));
+                } else if (hasPartitionKey) {
+                    data.add(new KeyedMessage<K, V>(topic, partitionKey, m));
+                } else if (hasMessageKey) {
+                    data.add(new KeyedMessage<K, V>(topic, messageKey, m));
+                } else {
+                    data.add(new KeyedMessage<K, V>(topic, messageKey, partitionKey, m));
+                }
+            }
+            producer.send(data);
         } else {
-            log.warn("No message key or partition key set");
-            data = new KeyedMessage<K, V>(topic, messageKey, partitionKey, msg);
+        
+            KeyedMessage<K, V> data;
+            V m = (V)msg;
+            if (hasPartitionKey && hasMessageKey) {
+                data = new KeyedMessage<K, V>(topic, messageKey, partitionKey, m);
+            } else if (hasPartitionKey) {
+                data = new KeyedMessage<K, V>(topic, partitionKey, m);
+            } else if (hasMessageKey) {
+                data = new KeyedMessage<K, V>(topic, messageKey, m);
+            } else {
+                data = new KeyedMessage<K, V>(topic, messageKey, partitionKey, m);
+            }
+            producer.send(data);
         }
-        producer.send(data);
     }
 
 }

http://git-wip-us.apache.org/repos/asf/camel/blob/d83f2d67/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaProducerFullTest.java
----------------------------------------------------------------------
diff --git a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaProducerFullTest.java b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaProducerFullTest.java
index d76a059..80370ff 100644
--- a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaProducerFullTest.java
+++ b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaProducerFullTest.java
@@ -35,9 +35,7 @@ import org.apache.camel.EndpointInject;
 import org.apache.camel.Produce;
 import org.apache.camel.ProducerTemplate;
 import org.apache.camel.builder.RouteBuilder;
-import org.junit.After;
 import org.junit.AfterClass;
-import org.junit.Before;
 import org.junit.BeforeClass;
 import org.junit.Test;
 import org.slf4j.Logger;
@@ -192,7 +190,7 @@ public class KafkaProducerFullTest extends BaseEmbeddedKafkaTest {
         private final KafkaStream<byte[], byte[]> stream;
         private final CountDownLatch latch;
 
-        public KakfaTopicConsumer(KafkaStream<byte[], byte[]> stream, CountDownLatch latch) {
+        KakfaTopicConsumer(KafkaStream<byte[], byte[]> stream, CountDownLatch latch) {
             this.stream = stream;
             this.latch = latch;
         }

Reply via email to