[CAMEL-10065] Add a test case
Project: http://git-wip-us.apache.org/repos/asf/camel/repo Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/c4236142 Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/c4236142 Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/c4236142 Branch: refs/heads/camel-2.17.x Commit: c4236142c8c4e8778343a73db9331e34569a69a5 Parents: b7eed97 Author: Daniel Kulp <dk...@apache.org> Authored: Thu Jun 16 13:23:37 2016 -0400 Committer: Daniel Kulp <dk...@apache.org> Committed: Thu Jun 16 15:19:38 2016 -0400 ---------------------------------------------------------------------- .../component/kafka/KafkaProducerFullTest.java | 29 ++++++++++++++++++++ 1 file changed, 29 insertions(+) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/camel/blob/c4236142/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaProducerFullTest.java ---------------------------------------------------------------------- diff --git a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaProducerFullTest.java b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaProducerFullTest.java index 0bb4740..d5b65fa 100644 --- a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaProducerFullTest.java +++ b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaProducerFullTest.java @@ -17,8 +17,10 @@ package org.apache.camel.component.kafka; import java.io.IOException; +import java.util.ArrayList; import java.util.Arrays; import java.util.HashMap; +import java.util.List; import java.util.Map; import java.util.Properties; import java.util.concurrent.CountDownLatch; @@ -125,6 +127,33 @@ public class KafkaProducerFullTest extends BaseEmbeddedKafkaTest { assertTrue("Not all messages were published to the kafka topics. Not received: " + messagesLatch.getCount(), allMessagesReceived); } + + @Test + public void producedStringCollectionMessageIsReceivedByKafka() throws InterruptedException, IOException { + int messageInTopic = 10; + int messageInOtherTopic = 5; + + CountDownLatch messagesLatch = new CountDownLatch(messageInTopic + messageInOtherTopic); + + List<String> msgs = new ArrayList<String>(); + for (int x = 0; x < messageInTopic; x++) { + msgs.add("Message " + x); + } + + sendMessagesInRoute(1, stringsTemplate, msgs, KafkaConstants.PARTITION_KEY, "1"); + msgs = new ArrayList<String>(); + for (int x = 0; x < messageInOtherTopic; x++) { + msgs.add("Other Message " + x); + } + sendMessagesInRoute(1, stringsTemplate, msgs, KafkaConstants.PARTITION_KEY, "1", KafkaConstants.TOPIC, TOPIC_STRINGS_IN_HEADER); + + createKafkaMessageConsumer(stringsConsumerConn, TOPIC_STRINGS, TOPIC_STRINGS_IN_HEADER, messagesLatch); + + boolean allMessagesReceived = messagesLatch.await(200, TimeUnit.MILLISECONDS); + + assertTrue("Not all messages were published to the kafka topics. Not received: " + messagesLatch.getCount(), allMessagesReceived); + } + @Test public void producedBytesMessageIsReceivedByKafka() throws InterruptedException, IOException { int messageInTopic = 10;