Repository: camel
Updated Branches:
  refs/heads/master 038e1617f -> 621a704c4
http://git-wip-us.apache.org/repos/asf/camel/blob/b2aa831d/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaProducerFullTest.java
----------------------------------------------------------------------
diff --git a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaProducerFullTest.java b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaProducerFullTest.java
index 7934f41..0bb4740 100644
--- a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaProducerFullTest.java
+++ b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaProducerFullTest.java
@@ -17,24 +17,22 @@ package org.apache.camel.component.kafka;
 
 import java.io.IOException;
+import java.util.Arrays;
 import java.util.HashMap;
-import java.util.List;
 import java.util.Map;
 import java.util.Properties;
 import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
 import java.util.concurrent.TimeUnit;
 
-import kafka.consumer.ConsumerConfig;
-import kafka.consumer.ConsumerIterator;
-import kafka.consumer.KafkaStream;
-import kafka.javaapi.consumer.ConsumerConnector;
 import org.apache.camel.Endpoint;
 import org.apache.camel.EndpointInject;
 import org.apache.camel.Produce;
 import org.apache.camel.ProducerTemplate;
+import org.apache.camel.RoutesBuilder;
 import org.apache.camel.builder.RouteBuilder;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.consumer.ConsumerRecords;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
 import org.junit.AfterClass;
 import org.junit.BeforeClass;
 import org.junit.Test;
@@ -42,7 +40,7 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 public class KafkaProducerFullTest extends BaseEmbeddedKafkaTest {
-    
+
     private static final String TOPIC_STRINGS = "test";
     private static final String TOPIC_STRINGS_IN_HEADER = "testHeader";
     private static final String TOPIC_BYTES = "testBytes";
@@ -52,15 +50,16 @@ public class KafkaProducerFullTest extends BaseEmbeddedKafkaTest {
 
     private static final Logger LOG = LoggerFactory.getLogger(KafkaProducerFullTest.class);
 
-    private static ConsumerConnector stringsConsumerConn;
-    private static ConsumerConnector bytesConsumerConn;
+    private static KafkaConsumer<String, String> stringsConsumerConn;
+    private static KafkaConsumer<byte[], byte[]> bytesConsumerConn;
 
     @EndpointInject(uri = "kafka:localhost:{{karfkaPort}}?topic=" + TOPIC_STRINGS
-        + "&partitioner=org.apache.camel.component.kafka.SimplePartitioner&serializerClass=kafka.serializer.StringEncoder"
-        + "&requestRequiredAcks=-1")
+        + "&requestRequiredAcks=-1")
     private Endpoint toStrings;
 
-    @EndpointInject(uri = "kafka:localhost:{{karfkaPort}}?topic=" + TOPIC_BYTES + "&requestRequiredAcks=-1")
+    @EndpointInject(uri = "kafka:localhost:{{karfkaPort}}?topic=" + TOPIC_BYTES + "&requestRequiredAcks=-1"
+        + "&serializerClass=org.apache.kafka.common.serialization.ByteArraySerializer&"
+        + "keySerializerClass=org.apache.kafka.common.serialization.ByteArraySerializer")
     private Endpoint toBytes;
 
     @Produce(uri = "direct:startStrings")
@@ -69,46 +68,42 @@ public class KafkaProducerFullTest extends BaseEmbeddedKafkaTest {
     @Produce(uri = "direct:startBytes")
     private ProducerTemplate bytesTemplate;
-
     @BeforeClass
     public static void before() {
         Properties stringsProps = new Properties();
-
-        stringsProps.put("zookeeper.connect", "localhost:" + getZookeeperPort());
-        stringsProps.put("group.id", GROUP_STRINGS);
-        stringsProps.put("zookeeper.session.timeout.ms", "6000");
-        stringsProps.put("zookeeper.connectiontimeout.ms", "12000");
-        stringsProps.put("zookeeper.sync.time.ms", "200");
-        stringsProps.put("auto.commit.interval.ms", "1000");
-        stringsProps.put("auto.offset.reset", "smallest");
-        stringsConsumerConn = kafka.consumer.Consumer.createJavaConsumerConnector(new ConsumerConfig(stringsProps));
+
+        stringsProps.put(org.apache.kafka.clients.consumer.ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:" + getKarfkaPort());
+        stringsProps.put(org.apache.kafka.clients.consumer.ConsumerConfig.GROUP_ID_CONFIG, "DemoConsumer");
+        stringsProps.put(org.apache.kafka.clients.consumer.ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true");
+        stringsProps.put(org.apache.kafka.clients.consumer.ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "1000");
+        stringsProps.put(org.apache.kafka.clients.consumer.ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "30000");
+        stringsProps.put(org.apache.kafka.clients.consumer.ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
+        stringsProps.put(org.apache.kafka.clients.consumer.ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
+        stringsProps.put(org.apache.kafka.clients.consumer.ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
+        stringsConsumerConn = new KafkaConsumer<String, String>(stringsProps);
 
         Properties bytesProps = new Properties();
         bytesProps.putAll(stringsProps);
         bytesProps.put("group.id", GROUP_BYTES);
-        bytesConsumerConn = kafka.consumer.Consumer.createJavaConsumerConnector(new ConsumerConfig(bytesProps));
+        bytesProps.put(org.apache.kafka.clients.consumer.ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArrayDeserializer");
+        bytesProps.put(org.apache.kafka.clients.consumer.ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArrayDeserializer");
+        bytesConsumerConn = new KafkaConsumer<byte[], byte[]>(bytesProps);
     }
 
     @AfterClass
     public static void after() {
-        stringsConsumerConn.shutdown();
-        bytesConsumerConn.shutdown();
+        stringsConsumerConn.close();
+        bytesConsumerConn.close();
     }
 
     @Override
-    protected RouteBuilder[] createRouteBuilders() throws Exception {
-        return new RouteBuilder[] {
-            new RouteBuilder() {
-                @Override
-                public void configure() throws Exception {
-                    from("direct:startStrings").to(toStrings);
-                }
-            },
-            new RouteBuilder() {
-                @Override
-                public void configure() throws Exception {
-                    from("direct:startBytes").to(toBytes);
-                }
+    protected RoutesBuilder createRouteBuilder() throws Exception {
+        return new RouteBuilder() {
+            @Override
+            public void configure() throws Exception {
+                from("direct:startStrings").to(toStrings);
+
+                from("direct:startBytes").to(toBytes);
             }
         };
     }
@@ -120,14 +115,11 @@ public class KafkaProducerFullTest extends BaseEmbeddedKafkaTest {
 
         CountDownLatch messagesLatch = new CountDownLatch(messageInTopic + messageInOtherTopic);
 
-        Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
-        topicCountMap.put(TOPIC_STRINGS, 5);
-        topicCountMap.put(TOPIC_STRINGS_IN_HEADER, 5);
-        createKafkaMessageConsumer(stringsConsumerConn, TOPIC_STRINGS, TOPIC_STRINGS_IN_HEADER, messagesLatch, topicCountMap);
-
         sendMessagesInRoute(messageInTopic, stringsTemplate, "IT test message", KafkaConstants.PARTITION_KEY, "1");
         sendMessagesInRoute(messageInOtherTopic, stringsTemplate, "IT test message in other topic", KafkaConstants.PARTITION_KEY, "1", KafkaConstants.TOPIC, TOPIC_STRINGS_IN_HEADER);
 
+        createKafkaMessageConsumer(stringsConsumerConn, TOPIC_STRINGS, TOPIC_STRINGS_IN_HEADER, messagesLatch);
+
         boolean allMessagesReceived = messagesLatch.await(200, TimeUnit.MILLISECONDS);
 
         assertTrue("Not all messages were published to the kafka topics. Not received: " + messagesLatch.getCount(), allMessagesReceived);
@@ -140,11 +132,6 @@ public class KafkaProducerFullTest extends BaseEmbeddedKafkaTest {
 
         CountDownLatch messagesLatch = new CountDownLatch(messageInTopic + messageInOtherTopic);
 
-        Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
-        topicCountMap.put(TOPIC_BYTES, 5);
-        topicCountMap.put(TOPIC_BYTES_IN_HEADER, 5);
-        createKafkaMessageConsumer(bytesConsumerConn, TOPIC_BYTES, TOPIC_BYTES_IN_HEADER, messagesLatch, topicCountMap);
-
         Map<String, Object> inTopicHeaders = new HashMap<String, Object>();
         inTopicHeaders.put(KafkaConstants.PARTITION_KEY, "1".getBytes());
         sendMessagesInRoute(messageInTopic, bytesTemplate, "IT test message".getBytes(), inTopicHeaders);
@@ -154,22 +141,47 @@
         otherTopicHeaders.put(KafkaConstants.TOPIC, TOPIC_BYTES_IN_HEADER);
         sendMessagesInRoute(messageInOtherTopic, bytesTemplate, "IT test message in other topic".getBytes(), otherTopicHeaders);
 
+        createKafkaBytesMessageConsumer(bytesConsumerConn, TOPIC_BYTES, TOPIC_BYTES_IN_HEADER, messagesLatch);
+
         boolean allMessagesReceived = messagesLatch.await(200, TimeUnit.MILLISECONDS);
 
         assertTrue("Not all messages were published to the kafka topics. Not received: " + messagesLatch.getCount(), allMessagesReceived);
     }
 
-    private void createKafkaMessageConsumer(ConsumerConnector consumerConn, String topic, String topicInHeader,
-                                            CountDownLatch messagesLatch, Map<String, Integer> topicCountMap) {
-        Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap = consumerConn.createMessageStreams(topicCountMap);
+    private void createKafkaMessageConsumer(KafkaConsumer<String, String> consumerConn,
+                                            String topic, String topicInHeader, CountDownLatch messagesLatch) {
+
+        consumerConn.subscribe(Arrays.asList(topic, topicInHeader));
+        boolean run = true;
 
-        ExecutorService executor = Executors.newFixedThreadPool(10);
-        for (final KafkaStream<byte[], byte[]> stream : consumerMap.get(topic)) {
-            executor.submit(new KakfaTopicConsumer(stream, messagesLatch));
+        while (run) {
+            ConsumerRecords<String, String> records = consumerConn.poll(100);
+            for (ConsumerRecord<String, String> record : records) {
+                messagesLatch.countDown();
+                if (messagesLatch.getCount() == 0) {
+                    run = false;
+                }
+            }
         }
-        for (final KafkaStream<byte[], byte[]> stream : consumerMap.get(topicInHeader)) {
-            executor.submit(new KakfaTopicConsumer(stream, messagesLatch));
+
+    }
+
+    private void createKafkaBytesMessageConsumer(KafkaConsumer<byte[], byte[]> consumerConn, String topic,
+                                                 String topicInHeader, CountDownLatch messagesLatch) {
+
+        consumerConn.subscribe(Arrays.asList(topic, topicInHeader));
+        boolean run = true;
+
+        while (run) {
+            ConsumerRecords<byte[], byte[]> records = consumerConn.poll(100);
+            for (ConsumerRecord<byte[], byte[]> record : records) {
+                messagesLatch.countDown();
+                if (messagesLatch.getCount() == 0) {
+                    run = false;
+                }
+            }
         }
+
     }
 
     private void sendMessagesInRoute(int messages, ProducerTemplate template, Object bodyOther, String... headersWithValue) {
@@ -186,23 +198,4 @@ public class KafkaProducerFullTest extends BaseEmbeddedKafkaTest {
         }
     }
 
-    private static class KakfaTopicConsumer implements Runnable {
-        private final KafkaStream<byte[], byte[]> stream;
-        private final CountDownLatch latch;
-
-        public KakfaTopicConsumer(KafkaStream<byte[], byte[]> stream, CountDownLatch latch) {
-            this.stream = stream;
-            this.latch = latch;
-        }
-
-        @Override
-        public void run() {
-            ConsumerIterator<byte[], byte[]> it = stream.iterator();
-            while (it.hasNext()) {
-                String msg = new String(it.next().message());
-                LOG.info("Get the message" + msg);
-                latch.countDown();
-            }
-        }
-    }
 }
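A note on the consumer pattern in the diff above: the old high-level consumer (ConsumerConnector plus one thread per KafkaStream) is replaced by the 0.9 new-consumer API, where a single KafkaConsumer subscribes to the topics and is polled in a loop. A minimal standalone sketch of that pattern follows; the broker address, topic, group id and the bounded loop count are illustrative assumptions, not values from this commit.

    import java.util.Arrays;
    import java.util.Properties;

    import org.apache.kafka.clients.consumer.ConsumerConfig;
    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.apache.kafka.clients.consumer.ConsumerRecords;
    import org.apache.kafka.clients.consumer.KafkaConsumer;

    public class PollLoopSketch {
        public static void main(String[] args) {
            Properties props = new Properties();
            // broker address, group id and topic below are assumptions for illustration
            props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
            props.put(ConsumerConfig.GROUP_ID_CONFIG, "sketch-group");
            props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
                    "org.apache.kafka.common.serialization.StringDeserializer");
            props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
                    "org.apache.kafka.common.serialization.StringDeserializer");
            props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");

            KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(props);
            try {
                consumer.subscribe(Arrays.asList("test"));
                // poll(timeout) returns whatever records arrived within the timeout;
                // an empty ConsumerRecords means "nothing yet", not end of stream
                for (int i = 0; i < 10; i++) {
                    ConsumerRecords<String, String> records = consumer.poll(100);
                    for (ConsumerRecord<String, String> record : records) {
                        System.out.println(record.topic() + "/" + record.partition()
                                + " key=" + record.key() + " value=" + record.value());
                    }
                }
            } finally {
                consumer.close();
            }
        }
    }

Unlike the old ConsumerIterator, which blocked until the next message, poll() returns after its timeout even when empty; that is why the test methods above loop until the CountDownLatch reaches zero rather than iterating a stream.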
http://git-wip-us.apache.org/repos/asf/camel/blob/b2aa831d/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaProducerTest.java
----------------------------------------------------------------------
diff --git a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaProducerTest.java b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaProducerTest.java
index f2bcd6b..98f6421 100644
--- a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaProducerTest.java
+++ b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaProducerTest.java
@@ -18,12 +18,12 @@ package org.apache.camel.component.kafka;
 
 import java.util.Properties;
 
-import kafka.javaapi.producer.Producer;
-import kafka.producer.KeyedMessage;
 import org.apache.camel.CamelException;
 import org.apache.camel.Exchange;
 import org.apache.camel.Message;
 import org.apache.camel.impl.DefaultMessage;
+import org.apache.kafka.clients.producer.ProducerConfig;
+import org.apache.kafka.clients.producer.ProducerRecord;
 import org.junit.Test;
 import org.mockito.ArgumentCaptor;
 import org.mockito.Matchers;
@@ -41,18 +41,17 @@ public class KafkaProducerTest {
 
     @SuppressWarnings({"unchecked"})
     public KafkaProducerTest() throws Exception {
-        endpoint = new KafkaEndpoint("kafka:broker1:1234,broker2:4567?topic=sometopic", null);
+        endpoint = new KafkaEndpoint(
+            "kafka:broker1:1234,broker2:4567?topic=sometopic", null);
         endpoint.setBrokers("broker1:1234,broker2:4567");
         producer = new KafkaProducer(endpoint);
-        producer.producer = Mockito.mock(Producer.class);
+        producer.setKafkaProducer(Mockito.mock(org.apache.kafka.clients.producer.KafkaProducer.class));
    }
 
     @Test
     public void testPropertyBuilder() throws Exception {
-        endpoint.setPartitioner("com.sksamuel.someclass");
         Properties props = producer.getProps();
-        assertEquals("com.sksamuel.someclass", props.getProperty("partitioner.class"));
-        assertEquals("broker1:1234,broker2:4567", props.getProperty("metadata.broker.list"));
+        assertEquals("broker1:1234,broker2:4567", props.getProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG));
     }
 
     @Test
@@ -63,20 +62,18 @@ public class KafkaProducerTest {
         in.setHeader(KafkaConstants.PARTITION_KEY, "4");
 
         producer.process(exchange);
-
-        Mockito.verify(producer.producer).send(Matchers.any(KeyedMessage.class));
+        Mockito.verify(producer.getKafkaProducer()).send(Matchers.any(ProducerRecord.class));
     }
 
     @Test
     public void processSendsMessageWithTopicHeaderAndNoTopicInEndPoint() throws Exception {
         endpoint.setTopic(null);
         Mockito.when(exchange.getIn()).thenReturn(in);
-        in.setHeader(KafkaConstants.PARTITION_KEY, "4");
         in.setHeader(KafkaConstants.TOPIC, "anotherTopic");
 
         producer.process(exchange);
 
-        verifySendMessage("4", "anotherTopic", "4");
+        verifySendMessage("anotherTopic");
     }
 
     @Test
@@ -112,10 +109,9 @@ public class KafkaProducerTest {
         endpoint.setTopic("someTopic");
         Mockito.when(exchange.getIn()).thenReturn(in);
         in.setHeader(KafkaConstants.PARTITION_KEY, "4");
-
+        in.setHeader(KafkaConstants.KEY, "someKey");
         producer.process(exchange);
-
-        verifySendMessage("4", "someTopic", "4");
+        verifySendMessage("4", "someTopic", "someKey");
     }
 
     @Test
@@ -126,9 +122,9 @@ public class KafkaProducerTest {
 
         producer.process(exchange);
 
-        verifySendMessage("someKey", "someTopic", "someKey");
+        verifySendMessage("someTopic", "someKey");
     }
-    
+
     @Test
     public void processSendMessageWithBridgeEndpoint() throws Exception {
         endpoint.setTopic("someTopic");
@@ -136,19 +132,44 @@ public class KafkaProducerTest {
         Mockito.when(exchange.getIn()).thenReturn(in);
         in.setHeader(KafkaConstants.TOPIC, "anotherTopic");
         in.setHeader(KafkaConstants.KEY, "someKey");
-
+        in.setHeader(KafkaConstants.PARTITION_KEY, "4");
+
+        producer.process(exchange);
+
+        verifySendMessage("4", "someTopic", "someKey");
+    }
+
+    @Test // Message and Topic Name alone
+    public void processSendsMesssageWithMessageTopicName() throws Exception {
+        endpoint.setTopic("someTopic");
+        Mockito.when(exchange.getIn()).thenReturn(in);
+
         producer.process(exchange);
-
-        verifySendMessage("someKey", "someTopic", "someKey");
+
+        verifySendMessage("someTopic");
     }
 
     @SuppressWarnings({"unchecked", "rawtypes"})
     protected void verifySendMessage(String partitionKey, String topic, String messageKey) {
-        ArgumentCaptor<KeyedMessage> captor = ArgumentCaptor.forClass(KeyedMessage.class);
-        Mockito.verify(producer.producer).send(captor.capture());
-        assertEquals(partitionKey, captor.getValue().partitionKey());
+        ArgumentCaptor<ProducerRecord> captor = ArgumentCaptor.forClass(ProducerRecord.class);
+        Mockito.verify(producer.getKafkaProducer()).send(captor.capture());
+        assertEquals(new Integer(partitionKey), captor.getValue().partition());
+        assertEquals(messageKey, captor.getValue().key());
+        assertEquals(topic, captor.getValue().topic());
+    }
+
+    @SuppressWarnings({"unchecked", "rawtypes"})
+    protected void verifySendMessage(String topic, String messageKey) {
+        ArgumentCaptor<ProducerRecord> captor = ArgumentCaptor.forClass(ProducerRecord.class);
+        Mockito.verify(producer.getKafkaProducer()).send(captor.capture());
         assertEquals(messageKey, captor.getValue().key());
         assertEquals(topic, captor.getValue().topic());
     }
 
+    @SuppressWarnings({"unchecked", "rawtypes"})
+    protected void verifySendMessage(String topic) {
+        ArgumentCaptor<ProducerRecord> captor = ArgumentCaptor.forClass(ProducerRecord.class);
+        Mockito.verify(producer.getKafkaProducer()).send(captor.capture());
+        assertEquals(topic, captor.getValue().topic());
+    }
+
 }
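The unit tests above now capture an org.apache.kafka.clients.producer.ProducerRecord instead of a KeyedMessage, and the three verifySendMessage(...) overloads assert on its topic(), key() and partition() accessors. A small sketch of how those fields map onto the ProducerRecord constructors; the topic, partition, key and body values are illustrative only.

    import org.apache.kafka.clients.producer.ProducerRecord;

    public class ProducerRecordSketch {
        public static void main(String[] args) {
            // full form: topic + explicit partition + key + value
            ProducerRecord<String, String> full =
                new ProducerRecord<String, String>("someTopic", 4, "someKey", "the body");
            // keyed form: topic + key + value, partition left to the partitioner
            ProducerRecord<String, String> keyed =
                new ProducerRecord<String, String>("someTopic", "someKey", "the body");
            // bare form: topic + value only
            ProducerRecord<String, String> bare =
                new ProducerRecord<String, String>("someTopic", "the body");

            // the three verifySendMessage(...) overloads assert on exactly these accessors
            System.out.println(full.partition()); // 4
            System.out.println(keyed.key());      // someKey
            System.out.println(bare.topic());     // someTopic
        }
    }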
http://git-wip-us.apache.org/repos/asf/camel/blob/b2aa831d/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/SimplePartitioner.java
----------------------------------------------------------------------
diff --git a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/SimplePartitioner.java b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/SimplePartitioner.java
deleted file mode 100644
index 039a2e7..0000000
--- a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/SimplePartitioner.java
+++ /dev/null
@@ -1,39 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.camel.component.kafka;
-
-import kafka.producer.Partitioner;
-import kafka.utils.VerifiableProperties;
-
-public class SimplePartitioner implements Partitioner {
-
-    public SimplePartitioner(VerifiableProperties props) {
-    }
-
-    /**
-     * Uses the key to calculate a partition bucket id for routing
-     * the data to the appropriate broker partition
-     *
-     * @return an integer between 0 and numPartitions-1
-     */
-    @Override
-    public int partition(Object key, int numPartitions) {
-        return key.hashCode() % numPartitions;
-    }
-
-}
-
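SimplePartitioner can be deleted because the kafka.producer.Partitioner SPI it implemented does not exist in the new client. For reference only: an equivalent under the 0.9 client would implement org.apache.kafka.clients.producer.Partitioner along the lines below. The class name and the null-key handling are assumptions of this sketch, not code from the commit.

    import java.util.Map;

    import org.apache.kafka.clients.producer.Partitioner;
    import org.apache.kafka.common.Cluster;

    public class SimplePartitionerSketch implements Partitioner {

        @Override
        public void configure(Map<String, ?> configs) {
            // no configuration needed for this sketch
        }

        /**
         * Uses the key's hash to pick a partition, mirroring the deleted
         * SimplePartitioner's key.hashCode() % numPartitions logic.
         */
        @Override
        public int partition(String topic, Object key, byte[] keyBytes,
                             Object value, byte[] valueBytes, Cluster cluster) {
            int numPartitions = cluster.partitionsForTopic(topic).size();
            // Math.abs guards against negative hash codes; null keys go to partition 0
            return Math.abs(key == null ? 0 : key.hashCode()) % numPartitions;
        }

        @Override
        public void close() {
            // nothing to clean up
        }
    }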
http://git-wip-us.apache.org/repos/asf/camel/blob/b2aa831d/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/embedded/EmbeddedKafkaCluster.java
----------------------------------------------------------------------
diff --git a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/embedded/EmbeddedKafkaCluster.java b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/embedded/EmbeddedKafkaCluster.java
index ce11a47..42403c2 100644
--- a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/embedded/EmbeddedKafkaCluster.java
+++ b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/embedded/EmbeddedKafkaCluster.java
@@ -27,13 +27,9 @@ import kafka.admin.AdminUtils;
 import kafka.server.KafkaConfig;
 import kafka.server.KafkaServer;
 import kafka.utils.ZkUtils;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 import scala.Option;
 
 public class EmbeddedKafkaCluster {
-    private static final Logger LOG = LoggerFactory.getLogger(EmbeddedKafkaCluster.class);
-
     private final List<Integer> ports;
     private final String zkConnection;
     private final Properties baseProperties;
@@ -68,7 +64,7 @@ public class EmbeddedKafkaCluster {
         return null;
     }
 
-    public void createTopics(String...topics) {
+    public void createTopics(String... topics) {
         for (String topic : topics) {
             AdminUtils.createTopic(getZkUtils(), topic, 2, 1, new Properties());
         }
@@ -112,10 +108,10 @@ public class EmbeddedKafkaCluster {
         properties.setProperty("host.name", "localhost");
         properties.setProperty("port", Integer.toString(port));
         properties.setProperty("log.dir", logDir.getAbsolutePath());
-        properties.setProperty("num.partitions", String.valueOf(1));
-        properties.setProperty("auto.create.topics.enable", String.valueOf(Boolean.TRUE));
+        properties.setProperty("num.partitions", String.valueOf(1));
+        properties.setProperty("auto.create.topics.enable", String.valueOf(Boolean.TRUE));
+        System.out.println("EmbeddedKafkaCluster: local directory: " + logDir.getAbsolutePath());
         properties.setProperty("log.flush.interval.messages", String.valueOf(1));
-        LOG.info("EmbeddedKafkaCluster: local directory: " + logDir.getAbsolutePath());
 
         KafkaServer broker = startBroker(properties);


http://git-wip-us.apache.org/repos/asf/camel/blob/b2aa831d/components/camel-kafka/src/test/resources/log4j.properties
----------------------------------------------------------------------
diff --git a/components/camel-kafka/src/test/resources/log4j.properties b/components/camel-kafka/src/test/resources/log4j.properties
index 67458fd..44266d1 100644
--- a/components/camel-kafka/src/test/resources/log4j.properties
+++ b/components/camel-kafka/src/test/resources/log4j.properties
@@ -32,4 +32,4 @@ log4j.appender.out=org.apache.log4j.FileAppender
 log4j.appender.out.layout=org.apache.log4j.PatternLayout
 log4j.appender.out.layout.ConversionPattern=%d [%-15.15t] %-5p %-30.30c{1} - %m%n
 log4j.appender.out.file=target/camel-kafka-test.log
-log4j.appender.out.append=true
+log4j.appender.out.append=true
\ No newline at end of file
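For completeness, the producer-side counterpart of the consumer sketch earlier: a minimal send with the 0.9 producer client, the same API the reworked component producer now drives through ProducerRecord. The broker address, serializers and topic here are again illustrative assumptions.

    import java.util.Properties;

    import org.apache.kafka.clients.producer.KafkaProducer;
    import org.apache.kafka.clients.producer.ProducerConfig;
    import org.apache.kafka.clients.producer.ProducerRecord;

    public class SendSketch {
        public static void main(String[] args) {
            Properties props = new Properties();
            // broker address and topic are assumptions for illustration
            props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
            props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
                    "org.apache.kafka.common.serialization.StringSerializer");
            props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
                    "org.apache.kafka.common.serialization.StringSerializer");

            KafkaProducer<String, String> producer = new KafkaProducer<String, String>(props);
            try {
                // send() is asynchronous and returns a Future<RecordMetadata>
                producer.send(new ProducerRecord<String, String>("test", "key", "hello"));
                producer.flush();
            } finally {
                producer.close();
            }
        }
    }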