Repository: camel Updated Branches: refs/heads/camel-2.14.x d5fede982 -> f77a40672 refs/heads/master ed240a0f1 -> f1e91780b
CAMEL-8190: Kafka producer: partition key is optional, not required by kafka client api. Thanks to Ivan Vasyliev for the patch. Project: http://git-wip-us.apache.org/repos/asf/camel/repo Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/f1e91780 Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/f1e91780 Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/f1e91780 Branch: refs/heads/master Commit: f1e91780b8b22b625616325bd1ed3f46b19f2d36 Parents: ed240a0 Author: Claus Ibsen <davscl...@apache.org> Authored: Fri Feb 6 14:23:53 2015 +0100 Committer: Claus Ibsen <davscl...@apache.org> Committed: Fri Feb 6 14:23:53 2015 +0100 ---------------------------------------------------------------------- .../camel/component/kafka/KafkaProducer.java | 12 ++---- .../component/kafka/KafkaProducerTest.java | 40 ++++++++------------ 2 files changed, 19 insertions(+), 33 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/camel/blob/f1e91780/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaProducer.java ---------------------------------------------------------------------- diff --git a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaProducer.java b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaProducer.java index 4d681f1..2918ffc 100644 --- a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaProducer.java +++ b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaProducer.java @@ -21,7 +21,6 @@ import java.util.Properties; import kafka.javaapi.producer.Producer; import kafka.producer.KeyedMessage; import kafka.producer.ProducerConfig; - import org.apache.camel.CamelException; import org.apache.camel.CamelExchangeException; import org.apache.camel.Exchange; @@ -62,19 +61,14 @@ public class KafkaProducer extends DefaultProducer { @Override public void process(Exchange exchange) throws CamelException { - Object partitionKey = exchange.getIn().getHeader(KafkaConstants.PARTITION_KEY); - if (partitionKey == null) { - throw new CamelExchangeException("No partition key set", exchange); - } - String topic = exchange.getIn().getHeader(KafkaConstants.TOPIC, endpoint.getTopic(), String.class); if (topic == null) { throw new CamelExchangeException("No topic key set", exchange); } - + String partitionKey = exchange.getIn().getHeader(KafkaConstants.PARTITION_KEY, String.class); + String messageKey = exchange.getIn().getHeader(KafkaConstants.KEY, String.class); String msg = exchange.getIn().getBody(String.class); - - KeyedMessage<String, String> data = new KeyedMessage<String, String>(topic, partitionKey.toString(), msg); + KeyedMessage<String, String> data = new KeyedMessage<String, String>(topic, messageKey, partitionKey, msg); producer.send(data); } http://git-wip-us.apache.org/repos/asf/camel/blob/f1e91780/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaProducerTest.java ---------------------------------------------------------------------- diff --git a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaProducerTest.java b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaProducerTest.java index 3c71417..b0f516f 100644 --- a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaProducerTest.java +++ b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaProducerTest.java @@ -16,12 +16,10 @@ */ package org.apache.camel.component.kafka; -import java.net.URISyntaxException; import java.util.Properties; import kafka.javaapi.producer.Producer; import kafka.producer.KeyedMessage; - import org.apache.camel.CamelException; import org.apache.camel.Exchange; import org.apache.camel.Message; @@ -31,7 +29,6 @@ import org.mockito.ArgumentCaptor; import org.mockito.Matchers; import org.mockito.Mockito; - import static org.junit.Assert.assertEquals; public class KafkaProducerTest { @@ -43,8 +40,7 @@ public class KafkaProducerTest { private Message in = new DefaultMessage(); @SuppressWarnings({"unchecked"}) - public KafkaProducerTest() throws IllegalAccessException, InstantiationException, ClassNotFoundException, - URISyntaxException { + public KafkaProducerTest() throws Exception { endpoint = new KafkaEndpoint("kafka:broker1:1234,broker2:4567?topic=sometopic", "broker1:1234," + "broker2:4567?topic=sometopic", null); producer = new KafkaProducer(endpoint); @@ -61,8 +57,7 @@ public class KafkaProducerTest { @Test @SuppressWarnings({"unchecked"}) - public void processSendsMesssage() throws Exception { - + public void processSendsMessage() throws Exception { endpoint.setTopic("sometopic"); Mockito.when(exchange.getIn()).thenReturn(in); in.setHeader(KafkaConstants.PARTITION_KEY, "4"); @@ -73,8 +68,7 @@ public class KafkaProducerTest { } @Test - public void processSendsMesssageWithTopicHeaderAndNoTopicInEndPoint() throws Exception { - + public void processSendsMessageWithTopicHeaderAndNoTopicInEndPoint() throws Exception { endpoint.setTopic(null); Mockito.when(exchange.getIn()).thenReturn(in); in.setHeader(KafkaConstants.PARTITION_KEY, "4"); @@ -82,21 +76,20 @@ public class KafkaProducerTest { producer.process(exchange); - verifySendMessage("4", "anotherTopic"); + verifySendMessage("4", "anotherTopic", null); } @Test - public void processSendsMesssageWithTopicHeaderAndEndPoint() throws Exception { - + public void processSendsMessageWithTopicHeaderAndEndPoint() throws Exception { endpoint.setTopic("sometopic"); Mockito.when(exchange.getIn()).thenReturn(in); in.setHeader(KafkaConstants.PARTITION_KEY, "4"); in.setHeader(KafkaConstants.TOPIC, "anotherTopic"); + in.setHeader(KafkaConstants.KEY, "someKey"); producer.process(exchange); - verifySendMessage("4", "anotherTopic"); - + verifySendMessage("4", "anotherTopic", "someKey"); } @Test(expected = CamelException.class) @@ -107,32 +100,31 @@ public class KafkaProducerTest { producer.process(exchange); } - @Test(expected = CamelException.class) - public void processRequiresPartitionHeader() throws Exception { + @Test + public void processDoesNotRequirePartitionHeader() throws Exception { endpoint.setTopic("sometopic"); Mockito.when(exchange.getIn()).thenReturn(in); producer.process(exchange); } - + @Test - public void processSendsMesssageWithPartitionKeyHeader() throws Exception { - endpoint.setTopic("someTopic"); Mockito.when(exchange.getIn()).thenReturn(in); in.setHeader(KafkaConstants.PARTITION_KEY, "4"); producer.process(exchange); - verifySendMessage("4", "someTopic"); - + verifySendMessage("4", "someTopic", null); } - + @SuppressWarnings({"unchecked", "rawtypes"}) - protected void verifySendMessage(String key, String topic) { + protected void verifySendMessage(String partitionKey, String topic, String messageKey) { ArgumentCaptor<KeyedMessage> captor = ArgumentCaptor.forClass(KeyedMessage.class); Mockito.verify(producer.producer).send(captor.capture()); - assertEquals(key, captor.getValue().key()); + assertEquals(partitionKey, captor.getValue().partitionKey()); + assertEquals(messageKey, captor.getValue().key()); assertEquals(topic, captor.getValue().topic()); } + }