Repository: camel Updated Branches: refs/heads/master fbac9200a -> 0409cf089
Changed KafkaProducer to return RecordMetaData in Exchange. Project: http://git-wip-us.apache.org/repos/asf/camel/repo Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/8919d65a Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/8919d65a Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/8919d65a Branch: refs/heads/master Commit: 8919d65a48f03ede1068547e74220a4121231cb4 Parents: fbac920 Author: Leo Prince <leoprince.francisxav...@target.com> Authored: Tue Jun 28 10:03:32 2016 -0500 Committer: Claus Ibsen <davscl...@apache.org> Committed: Fri Jul 1 08:45:18 2016 +0200 ---------------------------------------------------------------------- .../apache/camel/component/kafka/KafkaConstants.java | 1 + .../apache/camel/component/kafka/KafkaProducer.java | 15 ++++++++++++--- .../camel/component/kafka/KafkaProducerTest.java | 14 ++++++++++++++ 3 files changed, 27 insertions(+), 3 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/camel/blob/8919d65a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConstants.java ---------------------------------------------------------------------- diff --git a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConstants.java b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConstants.java index db99a09..1ae0759 100644 --- a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConstants.java +++ b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConstants.java @@ -30,6 +30,7 @@ public final class KafkaConstants { public static final String KAFKA_DEFAULT_DESERIALIZER = "org.apache.kafka.common.serialization.StringDeserializer"; public static final String KAFKA_DEFAULT_PARTITIONER = "org.apache.kafka.clients.producer.internals.DefaultPartitioner"; public static final String PARTITIONER_RANGE_ASSIGNOR = "org.apache.kafka.clients.consumer.RangeAssignor"; + public static final String KAFKA_RECORDMETA = "kafka.RECORDMETA"; private KafkaConstants() { // Utility class http://git-wip-us.apache.org/repos/asf/camel/blob/8919d65a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaProducer.java ---------------------------------------------------------------------- diff --git a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaProducer.java b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaProducer.java index af41a24..da227c7 100644 --- a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaProducer.java +++ b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaProducer.java @@ -16,11 +16,13 @@ */ package org.apache.camel.component.kafka; +import java.util.ArrayList; import java.util.Collections; import java.util.Iterator; import java.util.LinkedList; import java.util.List; import java.util.Properties; +import java.util.Vector; import java.util.concurrent.ExecutorService; import java.util.concurrent.Future; import java.util.concurrent.atomic.AtomicInteger; @@ -174,13 +176,16 @@ public class KafkaProducer extends DefaultAsyncProducer { // Camel calls this method if the endpoint isSynchronous(), as the KafkaEndpoint creates a SynchronousDelegateProducer for it public void process(Exchange exchange) throws Exception { Iterator<ProducerRecord> c = createRecorder(exchange); - List<Future<ProducerRecord>> futures = new LinkedList<Future<ProducerRecord>>(); + List<Future<RecordMetadata>> futures = new LinkedList<Future<RecordMetadata>>(); + List<RecordMetadata> recordMetadatas = new ArrayList<RecordMetadata>(); + exchange.getOut().setHeader(KafkaConstants.KAFKA_RECORDMETA, recordMetadatas); + while (c.hasNext()) { futures.add(kafkaProducer.send(c.next())); } - for (Future<ProducerRecord> f : futures) { + for (Future<RecordMetadata> f : futures) { //wait for them all to be sent - f.get(); + recordMetadatas.add(f.get()); } } @@ -207,10 +212,12 @@ public class KafkaProducer extends DefaultAsyncProducer { private final Exchange exchange; private final AsyncCallback callback; private final AtomicInteger count = new AtomicInteger(1); + private final List<RecordMetadata> recordMetadatas = new Vector<>(); KafkaProducerCallBack(Exchange exchange, AsyncCallback callback) { this.exchange = exchange; this.callback = callback; + exchange.getOut().setHeader(KafkaConstants.KAFKA_RECORDMETA, recordMetadatas); } void increment() { @@ -230,6 +237,8 @@ public class KafkaProducer extends DefaultAsyncProducer { if (e != null) { exchange.setException(e); } + recordMetadatas.add(recordMetadata); + if (count.decrementAndGet() == 0) { // use worker pool to continue routing the exchange // as this thread is from Kafka Callback and should not be used by Camel routing http://git-wip-us.apache.org/repos/asf/camel/blob/8919d65a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaProducerTest.java ---------------------------------------------------------------------- diff --git a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaProducerTest.java b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaProducerTest.java index dcd3365..1a29c4d 100644 --- a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaProducerTest.java +++ b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaProducerTest.java @@ -43,6 +43,7 @@ public class KafkaProducerTest { private Exchange exchange = Mockito.mock(Exchange.class); private Message in = new DefaultMessage(); + private Message out = new DefaultMessage(); private AsyncCallback callback = Mockito.mock(AsyncCallback.class); @SuppressWarnings({"unchecked"}) @@ -72,6 +73,8 @@ public class KafkaProducerTest { public void processSendsMessage() throws Exception { endpoint.setTopic("sometopic"); Mockito.when(exchange.getIn()).thenReturn(in); + Mockito.when(exchange.getOut()).thenReturn(out); + in.setHeader(KafkaConstants.PARTITION_KEY, "4"); producer.process(exchange); @@ -95,6 +98,7 @@ public class KafkaProducerTest { public void processAsyncSendsMessage() throws Exception { endpoint.setTopic("sometopic"); Mockito.when(exchange.getIn()).thenReturn(in); + Mockito.when(exchange.getOut()).thenReturn(out); in.setHeader(KafkaConstants.PARTITION_KEY, "4"); @@ -108,6 +112,7 @@ public class KafkaProducerTest { endpoint.setTopic("sometopic"); Mockito.when(exchange.getIn()).thenReturn(in); + Mockito.when(exchange.getOut()).thenReturn(out); // setup the exception here org.apache.kafka.clients.producer.KafkaProducer kp = producer.getKafkaProducer(); @@ -127,6 +132,7 @@ public class KafkaProducerTest { endpoint.setTopic(null); Mockito.when(exchange.getIn()).thenReturn(in); in.setHeader(KafkaConstants.TOPIC, "anotherTopic"); + Mockito.when(exchange.getOut()).thenReturn(out); producer.process(exchange); @@ -137,6 +143,8 @@ public class KafkaProducerTest { public void processSendsMessageWithTopicHeaderAndEndPoint() throws Exception { endpoint.setTopic("sometopic"); Mockito.when(exchange.getIn()).thenReturn(in); + Mockito.when(exchange.getOut()).thenReturn(out); + in.setHeader(KafkaConstants.PARTITION_KEY, "4"); in.setHeader(KafkaConstants.TOPIC, "anotherTopic"); in.setHeader(KafkaConstants.KEY, "someKey"); @@ -158,6 +166,8 @@ public class KafkaProducerTest { public void processDoesNotRequirePartitionHeader() throws Exception { endpoint.setTopic("sometopic"); Mockito.when(exchange.getIn()).thenReturn(in); + Mockito.when(exchange.getOut()).thenReturn(out); + producer.process(exchange); } @@ -165,6 +175,7 @@ public class KafkaProducerTest { public void processSendsMesssageWithPartitionKeyHeader() throws Exception { endpoint.setTopic("someTopic"); Mockito.when(exchange.getIn()).thenReturn(in); + Mockito.when(exchange.getOut()).thenReturn(out); in.setHeader(KafkaConstants.PARTITION_KEY, "4"); in.setHeader(KafkaConstants.KEY, "someKey"); producer.process(exchange); @@ -175,6 +186,7 @@ public class KafkaProducerTest { public void processSendsMesssageWithMessageKeyHeader() throws Exception { endpoint.setTopic("someTopic"); Mockito.when(exchange.getIn()).thenReturn(in); + Mockito.when(exchange.getOut()).thenReturn(out); in.setHeader(KafkaConstants.KEY, "someKey"); producer.process(exchange); @@ -187,6 +199,7 @@ public class KafkaProducerTest { endpoint.setTopic("someTopic"); endpoint.setBridgeEndpoint(true); Mockito.when(exchange.getIn()).thenReturn(in); + Mockito.when(exchange.getOut()).thenReturn(out); in.setHeader(KafkaConstants.TOPIC, "anotherTopic"); in.setHeader(KafkaConstants.KEY, "someKey"); in.setHeader(KafkaConstants.PARTITION_KEY, "4"); @@ -199,6 +212,7 @@ public class KafkaProducerTest { public void processSendsMesssageWithMessageTopicName() throws Exception { endpoint.setTopic("someTopic"); Mockito.when(exchange.getIn()).thenReturn(in); + Mockito.when(exchange.getOut()).thenReturn(out); producer.process(exchange);