Repository: camel
Updated Branches:
  refs/heads/master 3ce49256f -> bbe7f4c4b
[CAMEL-10065] Update camel-kafka to support Iterable and Iterator

Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/bbe7f4c4
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/bbe7f4c4
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/bbe7f4c4

Branch: refs/heads/master
Commit: bbe7f4c4b90d97dd39f2f0c2916160bea8d397dd
Parents: 3ce4925
Author: Daniel Kulp <dk...@apache.org>
Authored: Thu Jun 16 13:09:04 2016 -0400
Committer: Daniel Kulp <dk...@apache.org>
Committed: Thu Jun 16 13:14:35 2016 -0400

----------------------------------------------------------------------
 .../camel/component/kafka/KafkaProducer.java | 96 ++++++++++++++++----
 1 file changed, 76 insertions(+), 20 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/camel/blob/bbe7f4c4/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaProducer.java
----------------------------------------------------------------------
diff --git a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaProducer.java b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaProducer.java
index 1254e97..e2f25fb 100644
--- a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaProducer.java
+++ b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaProducer.java
@@ -16,8 +16,14 @@
  */
 package org.apache.camel.component.kafka;
 
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.LinkedList;
+import java.util.List;
 import java.util.Properties;
 import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Future;
+import java.util.concurrent.atomic.AtomicInteger;
 
 import org.apache.camel.AsyncCallback;
 import org.apache.camel.CamelException;
@@ -96,7 +102,7 @@ public class KafkaProducer extends DefaultAsyncProducer {
     }
 
     @SuppressWarnings("unchecked")
-    protected ProducerRecord createRecorder(Exchange exchange) throws CamelException {
+    protected Iterator<ProducerRecord> createRecorder(Exchange exchange) throws CamelException {
         String topic = endpoint.getTopic();
         if (!endpoint.isBridgeEndpoint()) {
             topic = exchange.getIn().getHeader(KafkaConstants.TOPIC, topic, String.class);
@@ -104,14 +110,39 @@ public class KafkaProducer extends DefaultAsyncProducer {
         if (topic == null) {
             throw new CamelExchangeException("No topic key set", exchange);
         }
-        Object partitionKey = exchange.getIn().getHeader(KafkaConstants.PARTITION_KEY);
-        boolean hasPartitionKey = partitionKey != null;
+        final Object partitionKey = exchange.getIn().getHeader(KafkaConstants.PARTITION_KEY);
+        final boolean hasPartitionKey = partitionKey != null;
 
-        Object messageKey = exchange.getIn().getHeader(KafkaConstants.KEY);
-        boolean hasMessageKey = messageKey != null;
+        final Object messageKey = exchange.getIn().getHeader(KafkaConstants.KEY);
+        final boolean hasMessageKey = messageKey != null;
 
         Object msg = exchange.getIn().getBody();
+        Iterator<Object> iterator = null;
+        if (msg instanceof Iterable) {
+            iterator = ((Iterable<Object>)msg).iterator();
+        } else if (msg instanceof Iterator) {
+            iterator = (Iterator<Object>)msg;
+        }
+        if (iterator != null) {
+            final Iterator<Object> msgList = iterator;
+            final String msgTopic = topic;
+            return new Iterator<ProducerRecord>() {
+                @Override
+                public boolean hasNext() {
+                    return msgList.hasNext();
+                }
+                @Override
+                public ProducerRecord next() {
+                    if (hasPartitionKey && hasMessageKey) {
+                        return new ProducerRecord(msgTopic, new Integer(partitionKey.toString()), messageKey, msgList.next());
+                    } else if (hasMessageKey) {
+                        return new ProducerRecord(msgTopic, messageKey, msgList.next());
+                    }
+                    return new ProducerRecord(msgTopic, msgList.next());
+                }
+            };
+        }
         ProducerRecord record;
 
         if (hasPartitionKey && hasMessageKey) {
             record = new ProducerRecord(topic, new Integer(partitionKey.toString()), messageKey, msg);
@@ -121,25 +152,35 @@ public class KafkaProducer extends DefaultAsyncProducer {
             log.warn("No message key or partition key set");
             record = new ProducerRecord(topic, msg);
         }
-        return record;
+        return Collections.singletonList(record).iterator();
     }
 
     @Override
     @SuppressWarnings("unchecked")
     // Camel calls this method if the endpoint isSynchronous(), as the KafkaEndpoint creates a SynchronousDelegateProducer for it
     public void process(Exchange exchange) throws Exception {
-        ProducerRecord record = createRecorder(exchange);
-        kafkaProducer.send(record).get();
+        Iterator<ProducerRecord> c = createRecorder(exchange);
+        List<Future<ProducerRecord>> futures = new LinkedList<Future<ProducerRecord>>();
+        while (c.hasNext()) {
+            futures.add(kafkaProducer.send(c.next()));
+        }
+        for (Future<ProducerRecord> f : futures) {
+            //wait for them all to be sent
+            f.get();
+        }
     }
 
     @Override
     @SuppressWarnings("unchecked")
     public boolean process(Exchange exchange, AsyncCallback callback) {
         try {
-            ProducerRecord record = createRecorder(exchange);
-            kafkaProducer.send(record, new KafkaProducerCallBack(exchange, callback));
-            // return false to process asynchronous
-            return false;
+            Iterator<ProducerRecord> c = createRecorder(exchange);
+            KafkaProducerCallBack cb = new KafkaProducerCallBack(exchange, callback);
+            while (c.hasNext()) {
+                cb.increment();
+                kafkaProducer.send(c.next(), cb);
+            }
+            return cb.allSent();
         } catch (Exception ex) {
             exchange.setException(ex);
         }
@@ -151,25 +192,40 @@ public class KafkaProducer extends DefaultAsyncProducer {
 
         private final Exchange exchange;
         private final AsyncCallback callback;
+        private final AtomicInteger count = new AtomicInteger(1);
 
         KafkaProducerCallBack(Exchange exchange, AsyncCallback callback) {
            this.exchange = exchange;
            this.callback = callback;
        }
 
+        void increment() {
+            count.incrementAndGet();
+        }
+        boolean allSent() {
+            if (count.decrementAndGet() == 0) {
+                //was able to get all the work done while queuing the requests
+                callback.done(true);
+                return true;
+            }
+            return false;
+        }
+
         @Override
         public void onCompletion(RecordMetadata recordMetadata, Exception e) {
             if (e != null) {
                 exchange.setException(e);
             }
-            // use worker pool to continue routing the exchange
-            // as this thread is from Kafka Callback and should not be used by Camel routing
-            workerPool.submit(new Runnable() {
-                @Override
-                public void run() {
-                    callback.done(false);
-                }
-            });
+            if (count.decrementAndGet() == 0) {
+                // use worker pool to continue routing the exchange
+                // as this thread is from Kafka Callback and should not be used by Camel routing
+                workerPool.submit(new Runnable() {
+                    @Override
+                    public void run() {
+                        callback.done(false);
+                    }
+                });
+            }
         }
     }
 }
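
With this change, an exchange body that is an Iterable or an Iterator is split into one ProducerRecord per element, all records sharing the exchange's KEY and PARTITION_KEY headers. A minimal usage sketch follows; it is not part of the commit, and the broker address, topic name and endpoint URI syntax are assumptions to adjust for your camel-kafka version and environment.

import java.util.Arrays;

import org.apache.camel.builder.RouteBuilder;
import org.apache.camel.component.kafka.KafkaConstants;

public class IterableToKafkaRoute extends RouteBuilder {

    @Override
    public void configure() throws Exception {
        from("direct:batch")
            // one message key shared by every record produced from the body
            .setHeader(KafkaConstants.KEY, constant("batch-key"))
            // the List body is handed to KafkaProducer, which now iterates it
            // and sends each element as its own Kafka record
            .setBody(constant(Arrays.asList("a", "b", "c")))
            .to("kafka:localhost:9092?topic=test");   // illustrative URI
    }
}

Sending any Iterable (a List here) or an Iterator through such a route results in three records on the topic rather than a single record wrapping the whole collection.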
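On the asynchronous path, outstanding sends are tracked with a counter seeded at 1: increment() is called once per queued record, each Kafka callback decrements, and allSent() performs the final decrement after queuing, so whichever decrement reaches zero completes the exchange. A standalone sketch of that counting pattern, with illustrative names rather than the commit's class:

import java.util.concurrent.atomic.AtomicInteger;

public class CompletionCounter {

    // seeded at 1 so the exchange cannot complete before queuing has finished
    private final AtomicInteger count = new AtomicInteger(1);

    /** Call once for every record handed to the producer. */
    public void increment() {
        count.incrementAndGet();
    }

    /** Call once after all records are queued; true means everything already completed. */
    public boolean allSent() {
        return count.decrementAndGet() == 0;
    }

    /** Call from each per-record completion callback; true means this was the last one. */
    public boolean onCompletion() {
        return count.decrementAndGet() == 0;
    }

    // Tiny demonstration: two records queued, completions arrive after queuing finishes.
    public static void main(String[] args) {
        CompletionCounter c = new CompletionCounter();
        c.increment();                         // record 1 queued
        c.increment();                         // record 2 queued
        System.out.println(c.allSent());       // false - callbacks still pending
        System.out.println(c.onCompletion());  // false - one callback left
        System.out.println(c.onCompletion());  // true  - exchange may be completed now
    }
}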