Repository: camel
Updated Branches:
  refs/heads/master fbac9200a -> 0409cf089


Changed KafkaProducer to return the RecordMetadata of the sent records in an Exchange header.


Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/8919d65a
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/8919d65a
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/8919d65a

Branch: refs/heads/master
Commit: 8919d65a48f03ede1068547e74220a4121231cb4
Parents: fbac920
Author: Leo Prince <leoprince.francisxav...@target.com>
Authored: Tue Jun 28 10:03:32 2016 -0500
Committer: Claus Ibsen <davscl...@apache.org>
Committed: Fri Jul 1 08:45:18 2016 +0200

----------------------------------------------------------------------
 .../apache/camel/component/kafka/KafkaConstants.java |  1 +
 .../apache/camel/component/kafka/KafkaProducer.java  | 15 ++++++++++++---
 .../camel/component/kafka/KafkaProducerTest.java     | 14 ++++++++++++++
 3 files changed, 27 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/camel/blob/8919d65a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConstants.java
----------------------------------------------------------------------
diff --git a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConstants.java b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConstants.java
index db99a09..1ae0759 100644
--- a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConstants.java
+++ b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConstants.java
@@ -30,6 +30,7 @@ public final class KafkaConstants {
     public static final String KAFKA_DEFAULT_DESERIALIZER  = "org.apache.kafka.common.serialization.StringDeserializer";
     public static final String KAFKA_DEFAULT_PARTITIONER = "org.apache.kafka.clients.producer.internals.DefaultPartitioner";
     public static final String PARTITIONER_RANGE_ASSIGNOR = "org.apache.kafka.clients.consumer.RangeAssignor";
+    public static final String KAFKA_RECORDMETA = "kafka.RECORDMETA";
 
     private KafkaConstants() {
         // Utility class

http://git-wip-us.apache.org/repos/asf/camel/blob/8919d65a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaProducer.java
----------------------------------------------------------------------
diff --git a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaProducer.java b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaProducer.java
index af41a24..da227c7 100644
--- a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaProducer.java
+++ b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaProducer.java
@@ -16,11 +16,13 @@
  */
 package org.apache.camel.component.kafka;
 
+import java.util.ArrayList;
 import java.util.Collections;
 import java.util.Iterator;
 import java.util.LinkedList;
 import java.util.List;
 import java.util.Properties;
+import java.util.Vector;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Future;
 import java.util.concurrent.atomic.AtomicInteger;
@@ -174,13 +176,16 @@ public class KafkaProducer extends DefaultAsyncProducer {
     // Camel calls this method if the endpoint isSynchronous(), as the KafkaEndpoint creates a SynchronousDelegateProducer for it
     public void process(Exchange exchange) throws Exception {
         Iterator<ProducerRecord> c = createRecorder(exchange);
-        List<Future<ProducerRecord>> futures = new LinkedList<Future<ProducerRecord>>();
+        List<Future<RecordMetadata>> futures = new LinkedList<Future<RecordMetadata>>();
+        List<RecordMetadata> recordMetadatas = new ArrayList<RecordMetadata>();
+        exchange.getOut().setHeader(KafkaConstants.KAFKA_RECORDMETA, recordMetadatas);
+
         while (c.hasNext()) {
             futures.add(kafkaProducer.send(c.next()));
         }
-        for (Future<ProducerRecord> f : futures) {
+        for (Future<RecordMetadata> f : futures) {
             //wait for them all to be sent
-            f.get();
+            recordMetadatas.add(f.get());
         }
     }
 
@@ -207,10 +212,12 @@ public class KafkaProducer extends DefaultAsyncProducer {
         private final Exchange exchange;
         private final AsyncCallback callback;
         private final AtomicInteger count = new AtomicInteger(1);
+        private final List<RecordMetadata> recordMetadatas = new Vector<>();
 
         KafkaProducerCallBack(Exchange exchange, AsyncCallback callback) {
             this.exchange = exchange;
             this.callback = callback;
+            exchange.getOut().setHeader(KafkaConstants.KAFKA_RECORDMETA, recordMetadatas);
         }
 
         void increment() {
@@ -230,6 +237,8 @@ public class KafkaProducer extends DefaultAsyncProducer {
             if (e != null) {
                 exchange.setException(e);
             }
+            recordMetadatas.add(recordMetadata);
+
             if (count.decrementAndGet() == 0) {
                 // use worker pool to continue routing the exchange
                 // as this thread is from Kafka Callback and should not be used by Camel routing

http://git-wip-us.apache.org/repos/asf/camel/blob/8919d65a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaProducerTest.java
----------------------------------------------------------------------
diff --git a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaProducerTest.java b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaProducerTest.java
index dcd3365..1a29c4d 100644
--- a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaProducerTest.java
+++ b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaProducerTest.java
@@ -43,6 +43,7 @@ public class KafkaProducerTest {
 
     private Exchange exchange = Mockito.mock(Exchange.class);
     private Message in = new DefaultMessage();
+    private Message out = new DefaultMessage();
     private AsyncCallback callback = Mockito.mock(AsyncCallback.class);
 
     @SuppressWarnings({"unchecked"})
@@ -72,6 +73,8 @@ public class KafkaProducerTest {
     public void processSendsMessage() throws Exception {
         endpoint.setTopic("sometopic");
         Mockito.when(exchange.getIn()).thenReturn(in);
+        Mockito.when(exchange.getOut()).thenReturn(out);
+
         in.setHeader(KafkaConstants.PARTITION_KEY, "4");
 
         producer.process(exchange);
@@ -95,6 +98,7 @@ public class KafkaProducerTest {
     public void processAsyncSendsMessage() throws Exception {
         endpoint.setTopic("sometopic");
         Mockito.when(exchange.getIn()).thenReturn(in);
+        Mockito.when(exchange.getOut()).thenReturn(out);
 
         in.setHeader(KafkaConstants.PARTITION_KEY, "4");
 
@@ -108,6 +112,7 @@ public class KafkaProducerTest {
 
         endpoint.setTopic("sometopic");
         Mockito.when(exchange.getIn()).thenReturn(in);
+        Mockito.when(exchange.getOut()).thenReturn(out);
 
         // setup the exception here
         org.apache.kafka.clients.producer.KafkaProducer kp = producer.getKafkaProducer();
@@ -127,6 +132,7 @@ public class KafkaProducerTest {
         endpoint.setTopic(null);
         Mockito.when(exchange.getIn()).thenReturn(in);
         in.setHeader(KafkaConstants.TOPIC, "anotherTopic");
+        Mockito.when(exchange.getOut()).thenReturn(out);
 
         producer.process(exchange);
 
@@ -137,6 +143,8 @@ public class KafkaProducerTest {
     public void processSendsMessageWithTopicHeaderAndEndPoint() throws Exception {
         endpoint.setTopic("sometopic");
         Mockito.when(exchange.getIn()).thenReturn(in);
+        Mockito.when(exchange.getOut()).thenReturn(out);
+
         in.setHeader(KafkaConstants.PARTITION_KEY, "4");
         in.setHeader(KafkaConstants.TOPIC, "anotherTopic");
         in.setHeader(KafkaConstants.KEY, "someKey");
@@ -158,6 +166,8 @@ public class KafkaProducerTest {
     public void processDoesNotRequirePartitionHeader() throws Exception {
         endpoint.setTopic("sometopic");
         Mockito.when(exchange.getIn()).thenReturn(in);
+        Mockito.when(exchange.getOut()).thenReturn(out);
+
         producer.process(exchange);
     }
 
@@ -165,6 +175,7 @@ public class KafkaProducerTest {
     public void processSendsMesssageWithPartitionKeyHeader() throws Exception {
         endpoint.setTopic("someTopic");
         Mockito.when(exchange.getIn()).thenReturn(in);
+        Mockito.when(exchange.getOut()).thenReturn(out);
         in.setHeader(KafkaConstants.PARTITION_KEY, "4");
         in.setHeader(KafkaConstants.KEY, "someKey");
         producer.process(exchange);
@@ -175,6 +186,7 @@ public class KafkaProducerTest {
     public void processSendsMesssageWithMessageKeyHeader() throws Exception {
         endpoint.setTopic("someTopic");
         Mockito.when(exchange.getIn()).thenReturn(in);
+        Mockito.when(exchange.getOut()).thenReturn(out);
         in.setHeader(KafkaConstants.KEY, "someKey");
 
         producer.process(exchange);
@@ -187,6 +199,7 @@ public class KafkaProducerTest {
         endpoint.setTopic("someTopic");
         endpoint.setBridgeEndpoint(true);
         Mockito.when(exchange.getIn()).thenReturn(in);
+        Mockito.when(exchange.getOut()).thenReturn(out);
         in.setHeader(KafkaConstants.TOPIC, "anotherTopic");
         in.setHeader(KafkaConstants.KEY, "someKey");
         in.setHeader(KafkaConstants.PARTITION_KEY, "4");
@@ -199,6 +212,7 @@ public class KafkaProducerTest {
     public void processSendsMesssageWithMessageTopicName() throws Exception {
         endpoint.setTopic("someTopic");
         Mockito.when(exchange.getIn()).thenReturn(in);
+        Mockito.when(exchange.getOut()).thenReturn(out);
 
         producer.process(exchange);
 

Reply via email to