Updated Branches: refs/heads/master a346c1c1c -> e5f44a11a
CAMWL-7092. Add camel-kafka component. Thanks Stephen Samuel for patch Project: http://git-wip-us.apache.org/repos/asf/camel/repo Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/e5f44a11 Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/e5f44a11 Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/e5f44a11 Branch: refs/heads/master Commit: e5f44a11a1750bc2bd151bd56ffac6c714190641 Parents: a346c1c Author: Hadrian Zbarcea <[email protected]> Authored: Fri Feb 7 17:28:16 2014 -0500 Committer: Hadrian Zbarcea <[email protected]> Committed: Fri Feb 7 17:28:16 2014 -0500 ---------------------------------------------------------------------- components/camel-kafka/pom.xml | 75 ++++++++++ .../camel/component/kafka/KafkaComponent.java | 44 ++++++ .../camel/component/kafka/KafkaConstants.java | 30 ++++ .../camel/component/kafka/KafkaConsumer.java | 124 ++++++++++++++++ .../camel/component/kafka/KafkaEndpoint.java | 142 +++++++++++++++++++ .../camel/component/kafka/KafkaProducer.java | 77 ++++++++++ .../services/org/apache/camel/component/kafka | 1 + .../component/kafka/KafkaComponentTest.java | 40 ++++++ .../camel/component/kafka/KafkaConsumerIT.java | 78 ++++++++++ .../component/kafka/KafkaConsumerTest.java | 45 ++++++ .../component/kafka/KafkaEndpointTest.java | 45 ++++++ .../camel/component/kafka/KafkaProducerIT.java | 112 +++++++++++++++ .../component/kafka/KafkaProducerTest.java | 80 +++++++++++ .../component/kafka/SimplePartitioner.java | 24 ++++ .../camel-kafka/src/test/resources/log4j.xml | 19 +++ components/pom.xml | 1 + 16 files changed, 937 insertions(+) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/camel/blob/e5f44a11/components/camel-kafka/pom.xml ---------------------------------------------------------------------- diff --git a/components/camel-kafka/pom.xml b/components/camel-kafka/pom.xml new file mode 100644 index 0000000..f469b30 --- /dev/null +++ b/components/camel-kafka/pom.xml @@ -0,0 +1,75 @@ +<?xml version="1.0" encoding="UTF-8"?> +<!-- + Licensed to the Apache Software Foundation (ASF) under one or more + contributor license agreements. See the NOTICE file distributed with + this work for additional information regarding copyright ownership. + The ASF licenses this file to You under the Apache License, Version 2.0 + (the "License"); you may not use this file except in compliance with + the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. +--> +<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" + xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd"> + <modelVersion>4.0.0</modelVersion> + + <parent> + <groupId>org.apache.camel</groupId> + <artifactId>components</artifactId> + <version>2.13-SNAPSHOT</version> + </parent> + + <artifactId>camel-kafka</artifactId> + <packaging>bundle</packaging> + <name>Camel :: Kafka</name> + <description>Camel kafka support</description> + + <properties> + <camel.osgi.export.pkg>org.apache.camel.component.kafka.*</camel.osgi.export.pkg> + <kafka.version>0.8.0</kafka.version> + </properties> + + <dependencies> + <dependency> + <groupId>org.apache.camel</groupId> + <artifactId>camel-core</artifactId> + </dependency> + <dependency> + <groupId>org.apache.kafka</groupId> + <artifactId>kafka_2.10</artifactId> + <version>${kafka.version}</version> + </dependency> + <dependency> + <groupId>org.scala-lang</groupId> + <artifactId>scala-library</artifactId> + <version>${scala-version}</version> + <scope>provided</scope> + </dependency> + <dependency> + <groupId>org.apache.camel</groupId> + <artifactId>camel-test</artifactId> + <scope>test</scope> + </dependency> + <dependency> + <groupId>junit</groupId> + <artifactId>junit</artifactId> + <scope>test</scope> + </dependency> + <dependency> + <groupId>org.mockito</groupId> + <artifactId>mockito-core</artifactId> + </dependency> + <dependency> + <groupId>org.slf4j</groupId> + <artifactId>slf4j-log4j12</artifactId> + <scope>test</scope> + </dependency> + </dependencies> + +</project> http://git-wip-us.apache.org/repos/asf/camel/blob/e5f44a11/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaComponent.java ---------------------------------------------------------------------- diff --git a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaComponent.java b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaComponent.java new file mode 100644 index 0000000..d3d4679 --- /dev/null +++ b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaComponent.java @@ -0,0 +1,44 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.component.kafka; + +import java.util.Map; + +import org.apache.camel.CamelContext; +import org.apache.camel.impl.DefaultComponent; + +/** + * @author Stephen Samuel + */ +public class KafkaComponent extends DefaultComponent { + + public KafkaComponent() { + } + + public KafkaComponent(CamelContext context) { + super(context); + } + + @Override + protected KafkaEndpoint createEndpoint(String uri, + String remaining, + Map<String, Object> params) throws Exception { + KafkaEndpoint endpoint = new KafkaEndpoint(uri, remaining, this); + setProperties(endpoint, params); + return endpoint; + } +} http://git-wip-us.apache.org/repos/asf/camel/blob/e5f44a11/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConstants.java ---------------------------------------------------------------------- diff --git a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConstants.java b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConstants.java new file mode 100644 index 0000000..b7f6bdf --- /dev/null +++ b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConstants.java @@ -0,0 +1,30 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.component.kafka; + +/** + * @author Stephen Samuel + */ +public class KafkaConstants { + + public static final String DEFAULT_GROUP = "group1"; + + public static final String PARTITION_KEY = "kafka.PARTITION_KEY"; + public static final String PARTITION = "kafka.EXCHANGE_NAME"; + public static final String KEY = "kafka.CONTENT_TYPE"; + public static final String TOPIC = "kafka.TOPIC"; +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/camel/blob/e5f44a11/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConsumer.java ---------------------------------------------------------------------- diff --git a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConsumer.java b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConsumer.java new file mode 100644 index 0000000..b4c605a --- /dev/null +++ b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConsumer.java @@ -0,0 +1,124 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.component.kafka; + +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Properties; +import java.util.concurrent.ExecutorService; + +import org.apache.camel.Exchange; +import org.apache.camel.Processor; +import org.apache.camel.impl.DefaultConsumer; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import kafka.consumer.ConsumerConfig; +import kafka.consumer.ConsumerIterator; +import kafka.consumer.KafkaStream; +import kafka.javaapi.consumer.ConsumerConnector; +import kafka.message.MessageAndMetadata; + +/** + * @author Stephen Samuel + */ +public class KafkaConsumer extends DefaultConsumer { + + private static final Logger logger = LoggerFactory.getLogger(KafkaConsumer.class); + + private final KafkaEndpoint endpoint; + private final Processor processor; + + ConsumerConnector consumer; + ExecutorService executor; + + public KafkaConsumer(KafkaEndpoint endpoint, Processor processor) { + super(endpoint, processor); + this.endpoint = endpoint; + this.processor = processor; + if (endpoint.getZookeeperHost() == null) + throw new IllegalArgumentException("zookeeper host must be specified"); + if (endpoint.getZookeeperPort() == 0) + throw new IllegalArgumentException("zookeeper port must be specified"); + if (endpoint.getGroupId() == null) + throw new IllegalArgumentException("groupId must not be null"); + } + + Properties getProps() { + Properties props = new Properties(); + props.put("zookeeper.connect", endpoint.getZookeeperHost() + ":" + endpoint.getZookeeperPort()); + props.put("group.id", endpoint.getGroupId()); + props.put("zookeeper.session.timeout.ms", "400"); + props.put("zookeeper.sync.time.ms", "200"); + props.put("auto.commit.interval.ms", "1000"); + return props; + } + + @Override + protected void doStart() throws Exception { + super.doStart(); + log.info("Starting Kafka consumer"); + + consumer = kafka.consumer.Consumer.createJavaConsumerConnector(new ConsumerConfig(getProps())); + + Map<String, Integer> topicCountMap = new HashMap<String, Integer>(); + topicCountMap.put(endpoint.getTopic(), endpoint.getConsumerStreams()); + Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap = consumer.createMessageStreams(topicCountMap); + List<KafkaStream<byte[], byte[]>> streams = consumerMap.get(endpoint.getTopic()); + + executor = endpoint.createExecutor(); + for (final KafkaStream stream : streams) { + executor.submit(new ConsumerTask(stream)); + } + } + + @Override + protected void doStop() throws Exception { + super.doStop(); + log.info("Stopping Kafka consumer"); + + if (consumer != null) + consumer.shutdown(); + if (executor != null) + executor.shutdown(); + executor = null; + } + + class ConsumerTask implements Runnable { + + private KafkaStream stream; + + public ConsumerTask(KafkaStream stream) { + this.stream = stream; + } + + public void run() { + ConsumerIterator<byte[], byte[]> it = stream.iterator(); + while ((Boolean) it.hasNext()) { + MessageAndMetadata<byte[], byte[]> mm = it.next(); + Exchange exchange = endpoint.createKafkaExchange(mm); + try { + processor.process(exchange); + } catch (Exception e) { + e.printStackTrace(); + } + } + } + } +} + http://git-wip-us.apache.org/repos/asf/camel/blob/e5f44a11/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaEndpoint.java ---------------------------------------------------------------------- diff --git a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaEndpoint.java b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaEndpoint.java new file mode 100644 index 0000000..6ad9887 --- /dev/null +++ b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaEndpoint.java @@ -0,0 +1,142 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.component.kafka; + +import java.net.URISyntaxException; +import java.util.concurrent.Executors; +import java.util.concurrent.ThreadPoolExecutor; + +import org.apache.camel.Consumer; +import org.apache.camel.Exchange; +import org.apache.camel.Message; +import org.apache.camel.Processor; +import org.apache.camel.Producer; +import org.apache.camel.impl.DefaultEndpoint; +import org.apache.camel.impl.DefaultExchange; +import org.apache.camel.impl.DefaultMessage; + +import kafka.message.MessageAndMetadata; + +/** + * @author Stephen Samuel + */ +public class KafkaEndpoint extends DefaultEndpoint { + + private String brokers; + private String zookeeperHost; + private int zookeeperPort; + private String groupId; + private int consumerStreams = 10; + private String partitioner; + private String topic; + + public String getZookeeperHost() { + return zookeeperHost; + } + + public void setZookeeperHost(String zookeeperHost) { + this.zookeeperHost = zookeeperHost; + } + + public int getZookeeperPort() { + return zookeeperPort; + } + + public void setZookeeperPort(int zookeeperPort) { + this.zookeeperPort = zookeeperPort; + } + + public String getGroupId() { + return groupId; + } + + public void setGroupId(String groupId) { + this.groupId = groupId; + } + + public String getPartitioner() { + return partitioner; + } + + public void setPartitioner(String partitioner) { + this.partitioner = partitioner; + } + + public String getTopic() { + return topic; + } + + public void setTopic(String topic) { + this.topic = topic; + } + + public String getBrokers() { + return brokers; + } + + public int getConsumerStreams() { + return consumerStreams; + } + + public void setConsumerStreams(int consumerStreams) { + this.consumerStreams = consumerStreams; + } + + public KafkaEndpoint() { + } + + public KafkaEndpoint(String endpointUri, + String remaining, + KafkaComponent component) throws URISyntaxException { + super(endpointUri, component); + this.brokers = remaining.split("\\?")[0]; + } + + public Exchange createKafkaExchange(MessageAndMetadata<byte[], byte[]> mm) { + Exchange exchange = new DefaultExchange(getCamelContext(), getExchangePattern()); + + Message message = new DefaultMessage(); + message.setHeader(KafkaConstants.PARTITION, mm.partition()); + message.setHeader(KafkaConstants.TOPIC, mm.topic()); + message.setHeader(KafkaConstants.KEY, new String(mm.key())); + exchange.setIn(message); + + return exchange; + } + + @Override + public Consumer createConsumer(Processor processor) throws Exception { + KafkaConsumer consumer = new KafkaConsumer(this, processor); + configureConsumer(consumer); + return consumer; + } + + @Override + public Producer createProducer() throws Exception { + return new KafkaProducer(this); + } + + @Override + public boolean isSingleton() { + return true; + } + + public ThreadPoolExecutor createExecutor() { + return (ThreadPoolExecutor) Executors.newFixedThreadPool(getConsumerStreams()); + } + +} http://git-wip-us.apache.org/repos/asf/camel/blob/e5f44a11/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaProducer.java ---------------------------------------------------------------------- diff --git a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaProducer.java b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaProducer.java new file mode 100644 index 0000000..d930313 --- /dev/null +++ b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaProducer.java @@ -0,0 +1,77 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.component.kafka; + +import java.util.Properties; + +import org.apache.camel.CamelException; +import org.apache.camel.Exchange; +import org.apache.camel.impl.DefaultProducer; + +import kafka.javaapi.producer.Producer; +import kafka.producer.KeyedMessage; +import kafka.producer.ProducerConfig; + +/** + * @author Stephen Samuel + */ +public class KafkaProducer extends DefaultProducer { + + private final KafkaEndpoint endpoint; + Producer<String, String> producer; + + public KafkaProducer(KafkaEndpoint endpoint) throws ClassNotFoundException, IllegalAccessException, + InstantiationException { + super(endpoint); + this.endpoint = endpoint; + } + + @Override + protected void doStop() throws Exception { + if (producer != null) + producer.close(); + } + + Properties getProps() { + Properties props = new Properties(); + props.put("metadata.broker.list", endpoint.getBrokers()); + props.put("serializer.class", "kafka.serializer.StringEncoder"); + props.put("partitioner.class", endpoint.getPartitioner()); + props.put("request.required.acks", "1"); + return props; + } + + @Override + protected void doStart() throws Exception { + Properties props = getProps(); + ProducerConfig config = new ProducerConfig(props); + producer = new Producer<String, String>(config); + } + + @Override + public void process(Exchange exchange) throws CamelException { + + Object partitionKey = exchange.getIn().getHeader(KafkaConstants.PARTITION_KEY); + if (partitionKey == null) + throw new CamelException("No partition key set"); + String msg = exchange.getIn().getBody(String.class); + + KeyedMessage<String, String> data = + new KeyedMessage<String, String>(endpoint.getTopic(), partitionKey.toString(), msg); + producer.send(data); + } +} http://git-wip-us.apache.org/repos/asf/camel/blob/e5f44a11/components/camel-kafka/src/main/resources/META-INF/services/org/apache/camel/component/kafka ---------------------------------------------------------------------- diff --git a/components/camel-kafka/src/main/resources/META-INF/services/org/apache/camel/component/kafka b/components/camel-kafka/src/main/resources/META-INF/services/org/apache/camel/component/kafka new file mode 100644 index 0000000..c0dc9d1 --- /dev/null +++ b/components/camel-kafka/src/main/resources/META-INF/services/org/apache/camel/component/kafka @@ -0,0 +1 @@ +class=org.apache.camel.component.kafka.KafkaComponent \ No newline at end of file http://git-wip-us.apache.org/repos/asf/camel/blob/e5f44a11/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaComponentTest.java ---------------------------------------------------------------------- diff --git a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaComponentTest.java b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaComponentTest.java new file mode 100644 index 0000000..b7fe4eb --- /dev/null +++ b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaComponentTest.java @@ -0,0 +1,40 @@ +package org.apache.camel.component.kafka; + +import java.util.HashMap; +import java.util.Map; + +import org.apache.camel.CamelContext; +import org.junit.Test; +import org.mockito.Mockito; + +import static org.junit.Assert.assertEquals; + +/** + * @author Stephen Samuel + */ +public class KafkaComponentTest { + + private CamelContext context = Mockito.mock(CamelContext.class); + + @Test + public void testPropertiesSet() throws Exception { + Map<String, Object> params = new HashMap<String, Object>(); + params.put("zookeeperHost", "somehost"); + params.put("zookeeperPort", 2987); + params.put("portNumber", 14123); + params.put("consumerStreams", "3"); + params.put("topic", "mytopic"); + params.put("partitioner", "com.class.Party"); + + String uri = "kafka:broker1:12345,broker2:12566"; + String remaining = "broker1:12345,broker2:12566"; + + KafkaEndpoint endpoint = new KafkaComponent(context).createEndpoint(uri, remaining, params); + assertEquals("somehost", endpoint.getZookeeperHost()); + assertEquals(2987, endpoint.getZookeeperPort()); + assertEquals("broker1:12345,broker2:12566", endpoint.getBrokers()); + assertEquals("mytopic", endpoint.getTopic()); + assertEquals(3, endpoint.getConsumerStreams()); + assertEquals("com.class.Party", endpoint.getPartitioner()); + } +} http://git-wip-us.apache.org/repos/asf/camel/blob/e5f44a11/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaConsumerIT.java ---------------------------------------------------------------------- diff --git a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaConsumerIT.java b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaConsumerIT.java new file mode 100644 index 0000000..99b5c91 --- /dev/null +++ b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaConsumerIT.java @@ -0,0 +1,78 @@ +package org.apache.camel.component.kafka; + +import java.io.IOException; +import java.util.Properties; + +import org.apache.camel.Endpoint; +import org.apache.camel.EndpointInject; +import org.apache.camel.builder.RouteBuilder; +import org.apache.camel.component.mock.MockEndpoint; +import org.apache.camel.test.junit4.CamelTestSupport; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; + +import kafka.javaapi.producer.Producer; +import kafka.producer.KeyedMessage; +import kafka.producer.ProducerConfig; + +/** + * @author Stephen Samuel + * <p/> + * <p/> + * The Producer IT tests require a Kafka broker running on 9092 and a zookeeper instance running on 2181. + * The broker must have a topic called test created. + */ +public class KafkaConsumerIT extends CamelTestSupport { + + public static final String TOPIC = "test"; + + @EndpointInject(uri = "kafka:localhost:9092?topic=" + TOPIC + + "&zookeeperHost=localhost&zookeeperPort=2181&groupId=group1") + private Endpoint from; + + @EndpointInject(uri = "mock:result") + private MockEndpoint to; + + private Producer<String, String> producer; + + @Before + public void before() { + Properties props = new Properties(); + props.put("metadata.broker.list", "localhost:9092"); + props.put("serializer.class", "kafka.serializer.StringEncoder"); + props.put("partitioner.class", "org.apache.camel.component.kafka.SimplePartitioner"); + props.put("request.required.acks", "1"); + + ProducerConfig config = new ProducerConfig(props); + producer = new Producer<String, String>(config); + } + + @After + public void after() { + producer.close(); + } + + @Override + protected RouteBuilder createRouteBuilder() throws Exception { + return new RouteBuilder() { + + @Override + public void configure() throws Exception { + from(from).to(to); + } + }; + } + + @Test + public void kaftMessageIsConsumedByCamel() throws InterruptedException, IOException { + to.expectedMessageCount(5); + for (int k = 0; k < 5; k++) { + String msg = "message-" + k; + KeyedMessage<String, String> data = new KeyedMessage<String, String>(TOPIC, "1", msg); + producer.send(data); + } + to.assertIsSatisfied(3000); + } +} + http://git-wip-us.apache.org/repos/asf/camel/blob/e5f44a11/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaConsumerTest.java ---------------------------------------------------------------------- diff --git a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaConsumerTest.java b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaConsumerTest.java new file mode 100644 index 0000000..8aa756f --- /dev/null +++ b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaConsumerTest.java @@ -0,0 +1,45 @@ +package org.apache.camel.component.kafka; + +import java.util.concurrent.ThreadPoolExecutor; + +import org.apache.camel.Processor; +import org.junit.Test; +import org.mockito.Mockito; + +import static org.mockito.Mockito.*; + +/** + * @author Stephen Samuel + */ +public class KafkaConsumerTest { + + private KafkaEndpoint endpoint = mock(KafkaEndpoint.class); + private Processor processor = mock(Processor.class); + + @Test(expected = IllegalArgumentException.class) + public void consumerRequiresZookeeperHost() throws Exception { + Mockito.when(endpoint.getZookeeperPort()).thenReturn(2181); + new KafkaConsumer(endpoint, processor); + } + + @Test(expected = IllegalArgumentException.class) + public void consumerRequiresZookeeperPort() throws Exception { + Mockito.when(endpoint.getZookeeperHost()).thenReturn("localhost"); + new KafkaConsumer(endpoint, processor); + } + + @Test + public void testStoppingConsumerShutsdownExecutor() throws Exception { + + when(endpoint.getZookeeperHost()).thenReturn("localhost"); + when(endpoint.getZookeeperPort()).thenReturn(2181); + when(endpoint.getGroupId()).thenReturn("12345"); + + KafkaConsumer consumer = new KafkaConsumer(endpoint, processor); + + ThreadPoolExecutor e = mock(ThreadPoolExecutor.class); + consumer.executor = e; + consumer.doStop(); + verify(e).shutdown(); + } +} http://git-wip-us.apache.org/repos/asf/camel/blob/e5f44a11/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaEndpointTest.java ---------------------------------------------------------------------- diff --git a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaEndpointTest.java b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaEndpointTest.java new file mode 100644 index 0000000..767d4d5 --- /dev/null +++ b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaEndpointTest.java @@ -0,0 +1,45 @@ +package org.apache.camel.component.kafka; + +import java.net.URISyntaxException; +import java.util.concurrent.ThreadPoolExecutor; + +import org.apache.camel.Exchange; +import org.junit.Test; + +import kafka.message.MessageAndMetadata; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; + +/** + * @author Stephen Samuel + */ +public class KafkaEndpointTest { + + @Test + public void testCreatingKafkaExchangeSetsHeaders() throws URISyntaxException { + KafkaEndpoint endpoint = new KafkaEndpoint("kafka:localhost", "localhost", new KafkaComponent()); + + MessageAndMetadata<byte[], byte[]> mm = + new MessageAndMetadata<byte[], byte[]>("somekey".getBytes(), "mymessage".getBytes(), "topic", 4, 56); + + Exchange exchange = endpoint.createKafkaExchange(mm); + assertEquals("somekey", exchange.getIn().getHeader(KafkaConstants.KEY)); + assertEquals("topic", exchange.getIn().getHeader(KafkaConstants.TOPIC)); + assertEquals(4, exchange.getIn().getHeader(KafkaConstants.PARTITION)); + } + + @Test + public void creatingExecutorUsesThreadPoolSettings() throws Exception { + KafkaEndpoint endpoint = new KafkaEndpoint("kafka:localhost", "kafka:localhost", new KafkaComponent()); + endpoint.setConsumerStreams(44); + ThreadPoolExecutor executor = endpoint.createExecutor(); + assertEquals(44, executor.getCorePoolSize()); + } + + @Test + public void assertSingleton() throws URISyntaxException { + KafkaEndpoint endpoint = new KafkaEndpoint("kafka:localhost", "localhost", new KafkaComponent()); + assertTrue(endpoint.isSingleton()); + } +} http://git-wip-us.apache.org/repos/asf/camel/blob/e5f44a11/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaProducerIT.java ---------------------------------------------------------------------- diff --git a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaProducerIT.java b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaProducerIT.java new file mode 100644 index 0000000..2e98d9c --- /dev/null +++ b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaProducerIT.java @@ -0,0 +1,112 @@ +package org.apache.camel.component.kafka; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Properties; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; + +import org.apache.camel.Endpoint; +import org.apache.camel.EndpointInject; +import org.apache.camel.Produce; +import org.apache.camel.ProducerTemplate; +import org.apache.camel.builder.RouteBuilder; +import org.apache.camel.test.junit4.CamelTestSupport; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; + +import kafka.consumer.ConsumerConfig; +import kafka.consumer.ConsumerIterator; +import kafka.consumer.KafkaStream; +import kafka.javaapi.consumer.ConsumerConnector; + +/** + * @author Stephen Samuel + * <p/> + * <p/> + * The Producer IT tests require a Kafka broker running on 9092 and a zookeeper instance running on 2181. + * The broker must have a topic called test created. + */ +public class KafkaProducerIT extends CamelTestSupport { + + public static final String TOPIC = "test"; + + @EndpointInject(uri = + "kafka:localhost:9092?topic=" + TOPIC + "&partitioner=org.apache.camel.component.kafka.SimplePartitioner") + private Endpoint to; + + @Produce(uri = "direct:start") + protected ProducerTemplate template; + + private ConsumerConnector kafkaConsumer; + + @Before + public void before() { + Properties props = new Properties(); + props.put("zookeeper.connect", "localhost:2181"); + props.put("group.id", KafkaConstants.DEFAULT_GROUP); + props.put("zookeeper.session.timeout.ms", "400"); + props.put("zookeeper.sync.time.ms", "200"); + props.put("auto.commit.interval.ms", "1000"); + + kafkaConsumer = kafka.consumer.Consumer.createJavaConsumerConnector(new ConsumerConfig(props)); + } + + @After + public void after() { + kafkaConsumer.shutdown(); + } + + @Override + protected RouteBuilder createRouteBuilder() throws Exception { + return new RouteBuilder() { + + @Override + public void configure() throws Exception { + from("direct:start").to(to); + } + }; + } + + @Test + public void producedMessageIsReceivedByKafka() throws InterruptedException, IOException { + + final List<String> messages = new ArrayList<String>(); + + Map<String, Integer> topicCountMap = new HashMap<String, Integer>(); + topicCountMap.put(TOPIC, 5); + Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap = kafkaConsumer.createMessageStreams(topicCountMap); + List<KafkaStream<byte[], byte[]>> streams = consumerMap.get(TOPIC); + + ExecutorService executor = Executors.newFixedThreadPool(5); + for (final KafkaStream stream : streams) { + executor.submit(new Runnable() { + @Override + public void run() { + ConsumerIterator<byte[], byte[]> it = stream.iterator(); + while (it.hasNext()) { + String msg = new String(it.next().message()); + messages.add(msg); + } + } + }); + } + + for (int k = 0; k < 10; k++) { + template.sendBodyAndHeader("IT test message", KafkaConstants.PARTITION_KEY, "1"); + } + + for (int k = 0; k < 20; k++) { + if (messages.size() == 10) + return; + Thread.sleep(200); + } + + fail(); + } +} + http://git-wip-us.apache.org/repos/asf/camel/blob/e5f44a11/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaProducerTest.java ---------------------------------------------------------------------- diff --git a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaProducerTest.java b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaProducerTest.java new file mode 100644 index 0000000..6fe7010 --- /dev/null +++ b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaProducerTest.java @@ -0,0 +1,80 @@ +package org.apache.camel.component.kafka; + +import java.net.URISyntaxException; +import java.util.Properties; + +import org.apache.camel.CamelException; +import org.apache.camel.Exchange; +import org.apache.camel.Message; +import org.apache.camel.impl.DefaultMessage; +import org.junit.Test; +import org.mockito.ArgumentCaptor; +import org.mockito.Matchers; +import org.mockito.Mockito; + +import kafka.javaapi.producer.Producer; +import kafka.producer.KeyedMessage; + +import static org.junit.Assert.assertEquals; + +/** + * @author Stephen Samuel + */ +public class KafkaProducerTest { + + private KafkaProducer producer; + private KafkaEndpoint endpoint; + + private Exchange exchange = Mockito.mock(Exchange.class); + private Message in = new DefaultMessage(); + + public KafkaProducerTest() throws IllegalAccessException, InstantiationException, ClassNotFoundException, + URISyntaxException { + endpoint = new KafkaEndpoint("kafka:broker1:1234,broker2:4567?topic=sometopic", + "broker1:1234," + "broker2:4567?topic=sometopic", null); + producer = new KafkaProducer(endpoint); + producer.producer = Mockito.mock(Producer.class); + } + + @Test + public void testPropertyBuilder() throws Exception { + endpoint.setPartitioner("com.sksamuel.someclass"); + Properties props = producer.getProps(); + assertEquals("com.sksamuel.someclass", props.getProperty("partitioner.class")); + assertEquals("broker1:1234,broker2:4567", props.getProperty("metadata.broker.list")); + } + + @Test + public void processSendsMesssage() throws Exception { + + endpoint.setTopic("sometopic"); + Mockito.when(exchange.getIn()).thenReturn(in); + in.setHeader(KafkaConstants.PARTITION_KEY, "4"); + + producer.process(exchange); + + Mockito.verify(producer.producer).send(Matchers.any(KeyedMessage.class)); + } + + @Test(expected = CamelException.class) + public void processRequiresPartitionHeader() throws Exception { + endpoint.setTopic("sometopic"); + Mockito.when(exchange.getIn()).thenReturn(in); + producer.process(exchange); + } + + @Test + public void processSendsMesssageWithPartitionKeyHeader() throws Exception { + + endpoint.setTopic("sometopic"); + Mockito.when(exchange.getIn()).thenReturn(in); + in.setHeader(KafkaConstants.PARTITION_KEY, "4"); + + producer.process(exchange); + + ArgumentCaptor<KeyedMessage> captor = ArgumentCaptor.forClass(KeyedMessage.class); + Mockito.verify(producer.producer).send(captor.capture()); + assertEquals("4", captor.getValue().key()); + assertEquals("sometopic", captor.getValue().topic()); + } +} http://git-wip-us.apache.org/repos/asf/camel/blob/e5f44a11/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/SimplePartitioner.java ---------------------------------------------------------------------- diff --git a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/SimplePartitioner.java b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/SimplePartitioner.java new file mode 100644 index 0000000..d0eb738 --- /dev/null +++ b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/SimplePartitioner.java @@ -0,0 +1,24 @@ +package org.apache.camel.component.kafka; + +import kafka.producer.Partitioner; +import kafka.utils.VerifiableProperties; + +/** + * @author Stephen Samuel + */ +public class SimplePartitioner implements Partitioner<String> { + + public SimplePartitioner(VerifiableProperties props) { + } + + /** + * Uses the key to calculate a partition bucket id for routing + * the data to the appropriate broker partition + * + * @return an integer between 0 and numPartitions-1 + */ + @Override + public int partition(String key, int numPartitions) { + return key.hashCode() % numPartitions; + } +} http://git-wip-us.apache.org/repos/asf/camel/blob/e5f44a11/components/camel-kafka/src/test/resources/log4j.xml ---------------------------------------------------------------------- diff --git a/components/camel-kafka/src/test/resources/log4j.xml b/components/camel-kafka/src/test/resources/log4j.xml new file mode 100644 index 0000000..ff6e863 --- /dev/null +++ b/components/camel-kafka/src/test/resources/log4j.xml @@ -0,0 +1,19 @@ +<!DOCTYPE log4j:configuration SYSTEM "log4j.dtd"> +<log4j:configuration threshold="all" debug="true" xmlns:log4j="http://jakarta.apache.org/log4j/"> + + <appender name="console" class="org.apache.log4j.ConsoleAppender"> + <param name="Target" value="System.out"/> + <layout class="org.apache.log4j.PatternLayout"> + <param name="ConversionPattern" value="[%t] %-5p %c{1}.%M - %m%n"/> + </layout> + </appender> + + <logger name="org.apache.camel.component.kafka"> + <level value="debug"/> + </logger> + <root> + <level value="warn"/> + <appender-ref ref="console"/> + </root> + +</log4j:configuration> \ No newline at end of file http://git-wip-us.apache.org/repos/asf/camel/blob/e5f44a11/components/pom.xml ---------------------------------------------------------------------- diff --git a/components/pom.xml b/components/pom.xml index d061a9a..f921954 100644 --- a/components/pom.xml +++ b/components/pom.xml @@ -118,6 +118,7 @@ <module>camel-jt400</module> <module>camel-juel</module> <module>camel-jxpath</module> + <module>camel-kafka</module> <module>camel-kestrel</module> <module>camel-krati</module> <module>camel-ldap</module>
