Repository: camel
Updated Branches:
  refs/heads/camel-2.14.x d5fede982 -> f77a40672
  refs/heads/master ed240a0f1 -> f1e91780b


CAMEL-8190: Kafka producer: the partition key is optional; the Kafka client
API does not require it. Thanks to Ivan Vasyliev for the patch.


Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/f1e91780
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/f1e91780
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/f1e91780

Branch: refs/heads/master
Commit: f1e91780b8b22b625616325bd1ed3f46b19f2d36
Parents: ed240a0
Author: Claus Ibsen <davscl...@apache.org>
Authored: Fri Feb 6 14:23:53 2015 +0100
Committer: Claus Ibsen <davscl...@apache.org>
Committed: Fri Feb 6 14:23:53 2015 +0100

----------------------------------------------------------------------
 .../camel/component/kafka/KafkaProducer.java    | 12 ++----
 .../component/kafka/KafkaProducerTest.java      | 40 ++++++++------------
 2 files changed, 19 insertions(+), 33 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/camel/blob/f1e91780/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaProducer.java
----------------------------------------------------------------------
diff --git 
a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaProducer.java
 
b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaProducer.java
index 4d681f1..2918ffc 100644
--- 
a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaProducer.java
+++ 
b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaProducer.java
@@ -21,7 +21,6 @@ import java.util.Properties;
 import kafka.javaapi.producer.Producer;
 import kafka.producer.KeyedMessage;
 import kafka.producer.ProducerConfig;
-
 import org.apache.camel.CamelException;
 import org.apache.camel.CamelExchangeException;
 import org.apache.camel.Exchange;
@@ -62,19 +61,14 @@ public class KafkaProducer extends DefaultProducer {
 
     @Override
     public void process(Exchange exchange) throws CamelException {
-        Object partitionKey = 
exchange.getIn().getHeader(KafkaConstants.PARTITION_KEY);
-        if (partitionKey == null) {
-            throw new CamelExchangeException("No partition key set", exchange);
-        }
-
         String topic = exchange.getIn().getHeader(KafkaConstants.TOPIC, 
endpoint.getTopic(), String.class);
         if (topic == null) {
             throw new CamelExchangeException("No topic key set", exchange);
         }
-
+        String partitionKey = 
exchange.getIn().getHeader(KafkaConstants.PARTITION_KEY, String.class);
+        String messageKey = exchange.getIn().getHeader(KafkaConstants.KEY, 
String.class);
         String msg = exchange.getIn().getBody(String.class);
-
-        KeyedMessage<String, String> data = new KeyedMessage<String, 
String>(topic, partitionKey.toString(), msg);
+        KeyedMessage<String, String> data = new KeyedMessage<String, 
String>(topic, messageKey, partitionKey, msg);
         producer.send(data);
     }
 

http://git-wip-us.apache.org/repos/asf/camel/blob/f1e91780/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaProducerTest.java
----------------------------------------------------------------------
diff --git 
a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaProducerTest.java
 
b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaProducerTest.java
index 3c71417..b0f516f 100644
--- 
a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaProducerTest.java
+++ 
b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaProducerTest.java
@@ -16,12 +16,10 @@
  */
 package org.apache.camel.component.kafka;
 
-import java.net.URISyntaxException;
 import java.util.Properties;
 
 import kafka.javaapi.producer.Producer;
 import kafka.producer.KeyedMessage;
-
 import org.apache.camel.CamelException;
 import org.apache.camel.Exchange;
 import org.apache.camel.Message;
@@ -31,7 +29,6 @@ import org.mockito.ArgumentCaptor;
 import org.mockito.Matchers;
 import org.mockito.Mockito;
 
-
 import static org.junit.Assert.assertEquals;
 
 public class KafkaProducerTest {
@@ -43,8 +40,7 @@ public class KafkaProducerTest {
     private Message in = new DefaultMessage();
 
     @SuppressWarnings({"unchecked"})
-    public KafkaProducerTest() throws IllegalAccessException, 
InstantiationException, ClassNotFoundException,
-            URISyntaxException {
+    public KafkaProducerTest() throws Exception {
         endpoint = new 
KafkaEndpoint("kafka:broker1:1234,broker2:4567?topic=sometopic",
                 "broker1:1234," + "broker2:4567?topic=sometopic", null);
         producer = new KafkaProducer(endpoint);
@@ -61,8 +57,7 @@ public class KafkaProducerTest {
 
     @Test
     @SuppressWarnings({"unchecked"})
-    public void processSendsMesssage() throws Exception {
-
+    public void processSendsMessage() throws Exception {
         endpoint.setTopic("sometopic");
         Mockito.when(exchange.getIn()).thenReturn(in);
         in.setHeader(KafkaConstants.PARTITION_KEY, "4");
@@ -73,8 +68,7 @@ public class KafkaProducerTest {
     }
 
     @Test
-    public void processSendsMesssageWithTopicHeaderAndNoTopicInEndPoint() 
throws Exception {
-
+    public void processSendsMessageWithTopicHeaderAndNoTopicInEndPoint() 
throws Exception {
         endpoint.setTopic(null);
         Mockito.when(exchange.getIn()).thenReturn(in);
         in.setHeader(KafkaConstants.PARTITION_KEY, "4");
@@ -82,21 +76,20 @@ public class KafkaProducerTest {
 
         producer.process(exchange);
 
-        verifySendMessage("4", "anotherTopic");
+        verifySendMessage("4", "anotherTopic", null);
     }
 
     @Test
-    public void processSendsMesssageWithTopicHeaderAndEndPoint() throws 
Exception {
-
+    public void processSendsMessageWithTopicHeaderAndEndPoint() throws 
Exception {
         endpoint.setTopic("sometopic");
         Mockito.when(exchange.getIn()).thenReturn(in);
         in.setHeader(KafkaConstants.PARTITION_KEY, "4");
         in.setHeader(KafkaConstants.TOPIC, "anotherTopic");
+        in.setHeader(KafkaConstants.KEY, "someKey");
 
         producer.process(exchange);
 
-        verifySendMessage("4", "anotherTopic");
-      
+        verifySendMessage("4", "anotherTopic", "someKey");
     }
 
     @Test(expected = CamelException.class)
@@ -107,32 +100,31 @@ public class KafkaProducerTest {
         producer.process(exchange);
     }
 
-    @Test(expected = CamelException.class)
-    public void processRequiresPartitionHeader() throws Exception {
+    @Test
+    public void processDoesNotRequirePartitionHeader() throws Exception {
         endpoint.setTopic("sometopic");
         Mockito.when(exchange.getIn()).thenReturn(in);
         producer.process(exchange);
     }
-    
+
     @Test
-    
     public void processSendsMesssageWithPartitionKeyHeader() throws Exception {
-
         endpoint.setTopic("someTopic");
         Mockito.when(exchange.getIn()).thenReturn(in);
         in.setHeader(KafkaConstants.PARTITION_KEY, "4");
 
         producer.process(exchange);
 
-        verifySendMessage("4", "someTopic");
-        
+        verifySendMessage("4", "someTopic", null);
     }
-    
+
     @SuppressWarnings({"unchecked", "rawtypes"})
-    protected void verifySendMessage(String key, String topic) {
+    protected void verifySendMessage(String partitionKey, String topic, String 
messageKey) {
         ArgumentCaptor<KeyedMessage> captor = 
ArgumentCaptor.forClass(KeyedMessage.class);
         Mockito.verify(producer.producer).send(captor.capture());
-        assertEquals(key, captor.getValue().key());
+        assertEquals(partitionKey, captor.getValue().partitionKey());
+        assertEquals(messageKey, captor.getValue().key());
         assertEquals(topic, captor.getValue().topic());
     }
+
 }

Reply via email to