Repository: camel Updated Branches: refs/heads/master bbe7f4c4b -> 6cb7b85f8
[CAMEL-10065] Add a test case Project: http://git-wip-us.apache.org/repos/asf/camel/repo Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/6cb7b85f Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/6cb7b85f Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/6cb7b85f Branch: refs/heads/master Commit: 6cb7b85f8d8151477f9fde06d67fec161b0dc084 Parents: bbe7f4c Author: Daniel Kulp <[email protected]> Authored: Thu Jun 16 13:23:37 2016 -0400 Committer: Daniel Kulp <[email protected]> Committed: Thu Jun 16 13:23:37 2016 -0400 ---------------------------------------------------------------------- .../component/kafka/KafkaProducerFullTest.java | 29 ++++++++++++++++++++ 1 file changed, 29 insertions(+) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/camel/blob/6cb7b85f/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaProducerFullTest.java ---------------------------------------------------------------------- diff --git a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaProducerFullTest.java b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaProducerFullTest.java index 0bb4740..d5b65fa 100644 --- a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaProducerFullTest.java +++ b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaProducerFullTest.java @@ -17,8 +17,10 @@ package org.apache.camel.component.kafka; import java.io.IOException; +import java.util.ArrayList; import java.util.Arrays; import java.util.HashMap; +import java.util.List; import java.util.Map; import java.util.Properties; import java.util.concurrent.CountDownLatch; @@ -125,6 +127,33 @@ public class KafkaProducerFullTest extends BaseEmbeddedKafkaTest { assertTrue("Not all messages were published to the kafka topics. Not received: " + messagesLatch.getCount(), allMessagesReceived); } + + @Test + public void producedStringCollectionMessageIsReceivedByKafka() throws InterruptedException, IOException { + int messageInTopic = 10; + int messageInOtherTopic = 5; + + CountDownLatch messagesLatch = new CountDownLatch(messageInTopic + messageInOtherTopic); + + List<String> msgs = new ArrayList<String>(); + for (int x = 0; x < messageInTopic; x++) { + msgs.add("Message " + x); + } + + sendMessagesInRoute(1, stringsTemplate, msgs, KafkaConstants.PARTITION_KEY, "1"); + msgs = new ArrayList<String>(); + for (int x = 0; x < messageInOtherTopic; x++) { + msgs.add("Other Message " + x); + } + sendMessagesInRoute(1, stringsTemplate, msgs, KafkaConstants.PARTITION_KEY, "1", KafkaConstants.TOPIC, TOPIC_STRINGS_IN_HEADER); + + createKafkaMessageConsumer(stringsConsumerConn, TOPIC_STRINGS, TOPIC_STRINGS_IN_HEADER, messagesLatch); + + boolean allMessagesReceived = messagesLatch.await(200, TimeUnit.MILLISECONDS); + + assertTrue("Not all messages were published to the kafka topics. Not received: " + messagesLatch.getCount(), allMessagesReceived); + } + @Test public void producedBytesMessageIsReceivedByKafka() throws InterruptedException, IOException { int messageInTopic = 10;
