CAMEL-8409: Kafka producer: when no message key is specified, use the partition key as the message key.
Thanks to Mark Mindenhall for the patch.


Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/108dbfca
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/108dbfca
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/108dbfca

Branch: refs/heads/camel-2.14.x
Commit: 108dbfca9ed86dff8207dbee71b6058c0a416fb0
Parents: a219cc3
Author: Claus Ibsen <davscl...@apache.org>
Authored: Thu Feb 26 10:12:39 2015 +0100
Committer: Claus Ibsen <davscl...@apache.org>
Committed: Thu Feb 26 10:13:38 2015 +0100

----------------------------------------------------------------------
 .../apache/camel/component/kafka/KafkaProducer.java  | 14 +++++++++++++-
 .../camel/component/kafka/KafkaProducerTest.java     | 15 +++++++++++++--
 2 files changed, 26 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/camel/blob/108dbfca/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaProducer.java
----------------------------------------------------------------------
diff --git a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaProducer.java b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaProducer.java
index 2918ffc..c598cf9 100644
--- a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaProducer.java
+++ b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaProducer.java
@@ -66,9 +66,21 @@ public class KafkaProducer extends DefaultProducer {
             throw new CamelExchangeException("No topic key set", exchange);
         }
         String partitionKey = 
exchange.getIn().getHeader(KafkaConstants.PARTITION_KEY, String.class);
+        boolean hasPartitionKey = partitionKey != null;
         String messageKey = exchange.getIn().getHeader(KafkaConstants.KEY, 
String.class);
+        boolean hasMessageKey = messageKey != null;
         String msg = exchange.getIn().getBody(String.class);
-        KeyedMessage<String, String> data = new KeyedMessage<String, 
String>(topic, messageKey, partitionKey, msg);
+        KeyedMessage<String, String> data;
+        if (hasPartitionKey && hasMessageKey) {
+            data = new KeyedMessage<String, String>(topic, messageKey, 
partitionKey, msg);
+        } else if (hasPartitionKey) {
+            data = new KeyedMessage<String, String>(topic, partitionKey, msg);
+        } else if (hasMessageKey) {
+            data = new KeyedMessage<String, String>(topic, messageKey, msg);
+        } else {
+            log.warn("No message key or partition key set");
+            data = new KeyedMessage<String, String>(topic, messageKey, 
partitionKey, msg);
+        }
         producer.send(data);
     }
 

http://git-wip-us.apache.org/repos/asf/camel/blob/108dbfca/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaProducerTest.java
----------------------------------------------------------------------
diff --git a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaProducerTest.java b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaProducerTest.java
index b0f516f..d989c96 100644
--- a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaProducerTest.java
+++ b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaProducerTest.java
@@ -76,7 +76,7 @@ public class KafkaProducerTest {
 
         producer.process(exchange);
 
-        verifySendMessage("4", "anotherTopic", null);
+        verifySendMessage("4", "anotherTopic", "4");
     }
 
     @Test
@@ -115,7 +115,18 @@ public class KafkaProducerTest {
 
         producer.process(exchange);
 
-        verifySendMessage("4", "someTopic", null);
+        verifySendMessage("4", "someTopic", "4");
+    }
+
+    @Test
+    public void processSendsMesssageWithMessageKeyHeader() throws Exception {
+        endpoint.setTopic("someTopic");
+        Mockito.when(exchange.getIn()).thenReturn(in);
+        in.setHeader(KafkaConstants.KEY, "someKey");
+
+        producer.process(exchange);
+
+        verifySendMessage("someKey", "someTopic", "someKey");
     }
 
     @SuppressWarnings({"unchecked", "rawtypes"})

Reply via email to