#121: KafkaProducer: look up the topic in the message header. Thanks to Fabien Chaillou for the patch. Fixed CS (checkstyle) violations.
Project: http://git-wip-us.apache.org/repos/asf/camel/repo Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/02f2945c Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/02f2945c Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/02f2945c Branch: refs/heads/camel-2.13.x Commit: 02f2945cd0d290a9e9427ee7cb8d9c2be560b0f2 Parents: 6af7f21 Author: Claus Ibsen <davscl...@apache.org> Authored: Thu Mar 27 08:04:08 2014 +0100 Committer: Claus Ibsen <davscl...@apache.org> Committed: Thu Mar 27 08:05:49 2014 +0100 ---------------------------------------------------------------------- .../camel/component/kafka/KafkaProducer.java | 8 ++- .../camel/component/kafka/KafkaConsumerIT.java | 2 +- .../camel/component/kafka/KafkaProducerIT.java | 75 +++++++++++++------- .../component/kafka/KafkaProducerTest.java | 40 +++++++++++ 4 files changed, 98 insertions(+), 27 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/camel/blob/02f2945c/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaProducer.java ---------------------------------------------------------------------- diff --git a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaProducer.java b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaProducer.java index 66440f3..6c2d167 100644 --- a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaProducer.java +++ b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaProducer.java @@ -69,9 +69,15 @@ public class KafkaProducer extends DefaultProducer { if (partitionKey == null) { throw new CamelExchangeException("No partition key set", exchange); } + + String topic = exchange.getIn().getHeader(KafkaConstants.TOPIC, endpoint.getTopic(), String.class); + if (topic == null) { + throw new CamelExchangeException("No topic key set", exchange); + } + String msg = 
exchange.getIn().getBody(String.class); - KeyedMessage<String, String> data = new KeyedMessage<String, String>(endpoint.getTopic(), partitionKey.toString(), msg); + KeyedMessage<String, String> data = new KeyedMessage<String, String>(topic, partitionKey.toString(), msg); producer.send(data); } http://git-wip-us.apache.org/repos/asf/camel/blob/02f2945c/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaConsumerIT.java ---------------------------------------------------------------------- diff --git a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaConsumerIT.java b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaConsumerIT.java index a8ca6c3..5a4baf7 100644 --- a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaConsumerIT.java +++ b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaConsumerIT.java @@ -79,7 +79,7 @@ public class KafkaConsumerIT extends CamelTestSupport { @Test public void kaftMessageIsConsumedByCamel() throws InterruptedException, IOException { to.expectedMessageCount(5); - to.expectedBodiesReceived("message-0","message-1","message-2","message-3","message-4" ); + to.expectedBodiesReceived("message-0", "message-1", "message-2", "message-3", "message-4"); for (int k = 0; k < 5; k++) { String msg = "message-" + k; KeyedMessage<String, String> data = new KeyedMessage<String, String>(TOPIC, "1", msg); http://git-wip-us.apache.org/repos/asf/camel/blob/02f2945c/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaProducerIT.java ---------------------------------------------------------------------- diff --git a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaProducerIT.java b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaProducerIT.java index 5805666..85fa272 100644 --- 
a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaProducerIT.java +++ b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaProducerIT.java @@ -17,13 +17,14 @@ package org.apache.camel.component.kafka; import java.io.IOException; -import java.util.ArrayList; import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.Properties; +import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; import kafka.consumer.ConsumerConfig; import kafka.consumer.ConsumerIterator; @@ -47,6 +48,7 @@ import org.junit.Test; public class KafkaProducerIT extends CamelTestSupport { public static final String TOPIC = "test"; + public static final String TOPIC_IN_HEADER = "testHeader"; @EndpointInject(uri = "kafka:localhost:9092?topic=" + TOPIC + "&partitioner=org.apache.camel.component.kafka.SimplePartitioner") private Endpoint to; @@ -86,40 +88,63 @@ public class KafkaProducerIT extends CamelTestSupport { @Test public void producedMessageIsReceivedByKafka() throws InterruptedException, IOException { + int messageInTopic = 10; + int messageInOtherTopic = 5; - final List<String> messages = new ArrayList<String>(); + CountDownLatch messagesLatch = new CountDownLatch(messageInTopic + messageInOtherTopic); Map<String, Integer> topicCountMap = new HashMap<String, Integer>(); topicCountMap.put(TOPIC, 5); + topicCountMap.put(TOPIC_IN_HEADER, 5); + createKafkaMessageConsumer(messagesLatch, topicCountMap); + + sendMessagesInRoute(messageInTopic, "IT test message", KafkaConstants.PARTITION_KEY, "1"); + sendMessagesInRoute(messageInOtherTopic, "IT test message in other topic", KafkaConstants.PARTITION_KEY, "1", KafkaConstants.TOPIC, TOPIC_IN_HEADER); + + boolean allMessagesReceived = messagesLatch.await(200, TimeUnit.MILLISECONDS); + + assertTrue("Not all messages were published to the kafka topics", 
allMessagesReceived); + } + + private void createKafkaMessageConsumer(CountDownLatch messagesLatch, Map<String, Integer> topicCountMap) { Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap = kafkaConsumer.createMessageStreams(topicCountMap); - List<KafkaStream<byte[], byte[]>> streams = consumerMap.get(TOPIC); - - ExecutorService executor = Executors.newFixedThreadPool(5); - for (final KafkaStream stream : streams) { - executor.submit(new Runnable() { - @Override - public void run() { - ConsumerIterator<byte[], byte[]> it = stream.iterator(); - while (it.hasNext()) { - String msg = new String(it.next().message()); - messages.add(msg); - } - } - }); + + ExecutorService executor = Executors.newFixedThreadPool(10); + for (final KafkaStream stream : consumerMap.get(TOPIC)) { + executor.submit(new KakfaTopicConsumer(stream, messagesLatch)); + } + for (final KafkaStream stream : consumerMap.get(TOPIC_IN_HEADER)) { + executor.submit(new KakfaTopicConsumer(stream, messagesLatch)); + } + } + + private void sendMessagesInRoute(int messageInOtherTopic, String bodyOther, String... 
headersWithValue) { + Map<String, Object> headerMap = new HashMap<String, Object>(); + for (int i = 0; i < headersWithValue.length; i = i + 2) { + headerMap.put(headersWithValue[i], headersWithValue[i + 1]); } - for (int k = 0; k < 10; k++) { - template.sendBodyAndHeader("IT test message", KafkaConstants.PARTITION_KEY, "1"); + for (int k = 0; k < messageInOtherTopic; k++) { + template.sendBodyAndHeaders(bodyOther, headerMap); } + } - for (int k = 0; k < 20; k++) { - if (messages.size() == 10) { - return; - } - Thread.sleep(200); + private static class KakfaTopicConsumer implements Runnable { + private final KafkaStream stream; + private final CountDownLatch latch; + + public KakfaTopicConsumer(KafkaStream stream, CountDownLatch latch) { + this.stream = stream; + this.latch = latch; } - fail(); + @Override + public void run() { + ConsumerIterator<byte[], byte[]> it = stream.iterator(); + while (it.hasNext()) { + String msg = new String(it.next().message()); + latch.countDown(); + } + } } } - http://git-wip-us.apache.org/repos/asf/camel/blob/02f2945c/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaProducerTest.java ---------------------------------------------------------------------- diff --git a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaProducerTest.java b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaProducerTest.java index ccaaab5..acdfc60 100644 --- a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaProducerTest.java +++ b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaProducerTest.java @@ -70,6 +70,46 @@ public class KafkaProducerTest { Mockito.verify(producer.producer).send(Matchers.any(KeyedMessage.class)); } + @Test + public void processSendsMesssageWithTopicHeaderAndNoTopicInEndPoint() throws Exception { + + endpoint.setTopic(null); + Mockito.when(exchange.getIn()).thenReturn(in); + 
in.setHeader(KafkaConstants.PARTITION_KEY, "4"); + in.setHeader(KafkaConstants.TOPIC, "anotherTopic"); + + producer.process(exchange); + + ArgumentCaptor<KeyedMessage> captor = ArgumentCaptor.forClass(KeyedMessage.class); + Mockito.verify(producer.producer).send(captor.capture()); + assertEquals("4", captor.getValue().key()); + assertEquals("anotherTopic", captor.getValue().topic()); + } + + @Test + public void processSendsMesssageWithTopicHeaderAndEndPoint() throws Exception { + + endpoint.setTopic("sometopic"); + Mockito.when(exchange.getIn()).thenReturn(in); + in.setHeader(KafkaConstants.PARTITION_KEY, "4"); + in.setHeader(KafkaConstants.TOPIC, "anotherTopic"); + + producer.process(exchange); + + ArgumentCaptor<KeyedMessage> captor = ArgumentCaptor.forClass(KeyedMessage.class); + Mockito.verify(producer.producer).send(captor.capture()); + assertEquals("4", captor.getValue().key()); + assertEquals("anotherTopic", captor.getValue().topic()); + } + + @Test(expected = CamelException.class) + public void processRequiresTopicInEndpointOrInHeader() throws Exception { + endpoint.setTopic(null); + Mockito.when(exchange.getIn()).thenReturn(in); + in.setHeader(KafkaConstants.PARTITION_KEY, "4"); + producer.process(exchange); + } + @Test(expected = CamelException.class) public void processRequiresPartitionHeader() throws Exception { endpoint.setTopic("sometopic");