CAMEL-8409: Kafka producer: when no message key is specified, use the partition key. Thanks to Mark Mindenhall for the patch.
Project: http://git-wip-us.apache.org/repos/asf/camel/repo Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/108dbfca Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/108dbfca Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/108dbfca Branch: refs/heads/camel-2.14.x Commit: 108dbfca9ed86dff8207dbee71b6058c0a416fb0 Parents: a219cc3 Author: Claus Ibsen <davscl...@apache.org> Authored: Thu Feb 26 10:12:39 2015 +0100 Committer: Claus Ibsen <davscl...@apache.org> Committed: Thu Feb 26 10:13:38 2015 +0100 ---------------------------------------------------------------------- .../apache/camel/component/kafka/KafkaProducer.java | 14 +++++++++++++- .../camel/component/kafka/KafkaProducerTest.java | 15 +++++++++++++-- 2 files changed, 26 insertions(+), 3 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/camel/blob/108dbfca/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaProducer.java ---------------------------------------------------------------------- diff --git a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaProducer.java b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaProducer.java index 2918ffc..c598cf9 100644 --- a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaProducer.java +++ b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaProducer.java @@ -66,9 +66,21 @@ public class KafkaProducer extends DefaultProducer { throw new CamelExchangeException("No topic key set", exchange); } String partitionKey = exchange.getIn().getHeader(KafkaConstants.PARTITION_KEY, String.class); + boolean hasPartitionKey = partitionKey != null; String messageKey = exchange.getIn().getHeader(KafkaConstants.KEY, String.class); + boolean hasMessageKey = messageKey != null; String msg = exchange.getIn().getBody(String.class); - KeyedMessage<String, String> data = new 
KeyedMessage<String, String>(topic, messageKey, partitionKey, msg); + KeyedMessage<String, String> data; + if (hasPartitionKey && hasMessageKey) { + data = new KeyedMessage<String, String>(topic, messageKey, partitionKey, msg); + } else if (hasPartitionKey) { + data = new KeyedMessage<String, String>(topic, partitionKey, msg); + } else if (hasMessageKey) { + data = new KeyedMessage<String, String>(topic, messageKey, msg); + } else { + log.warn("No message key or partition key set"); + data = new KeyedMessage<String, String>(topic, messageKey, partitionKey, msg); + } producer.send(data); } http://git-wip-us.apache.org/repos/asf/camel/blob/108dbfca/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaProducerTest.java ---------------------------------------------------------------------- diff --git a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaProducerTest.java b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaProducerTest.java index b0f516f..d989c96 100644 --- a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaProducerTest.java +++ b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaProducerTest.java @@ -76,7 +76,7 @@ public class KafkaProducerTest { producer.process(exchange); - verifySendMessage("4", "anotherTopic", null); + verifySendMessage("4", "anotherTopic", "4"); } @Test @@ -115,7 +115,18 @@ public class KafkaProducerTest { producer.process(exchange); - verifySendMessage("4", "someTopic", null); + verifySendMessage("4", "someTopic", "4"); + } + + @Test + public void processSendsMesssageWithMessageKeyHeader() throws Exception { + endpoint.setTopic("someTopic"); + Mockito.when(exchange.getIn()).thenReturn(in); + in.setHeader(KafkaConstants.KEY, "someKey"); + + producer.process(exchange); + + verifySendMessage("someKey", "someTopic", "someKey"); } @SuppressWarnings({"unchecked", "rawtypes"})