Updated Branches:
  refs/heads/master a346c1c1c -> e5f44a11a

CAMWL-7092. Add camel-kafka component. Thanks Stephen Samuel for patch


Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/e5f44a11
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/e5f44a11
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/e5f44a11

Branch: refs/heads/master
Commit: e5f44a11a1750bc2bd151bd56ffac6c714190641
Parents: a346c1c
Author: Hadrian Zbarcea <[email protected]>
Authored: Fri Feb 7 17:28:16 2014 -0500
Committer: Hadrian Zbarcea <[email protected]>
Committed: Fri Feb 7 17:28:16 2014 -0500

----------------------------------------------------------------------
 components/camel-kafka/pom.xml                  |  75 ++++++++++
 .../camel/component/kafka/KafkaComponent.java   |  44 ++++++
 .../camel/component/kafka/KafkaConstants.java   |  30 ++++
 .../camel/component/kafka/KafkaConsumer.java    | 124 ++++++++++++++++
 .../camel/component/kafka/KafkaEndpoint.java    | 142 +++++++++++++++++++
 .../camel/component/kafka/KafkaProducer.java    |  77 ++++++++++
 .../services/org/apache/camel/component/kafka   |   1 +
 .../component/kafka/KafkaComponentTest.java     |  40 ++++++
 .../camel/component/kafka/KafkaConsumerIT.java  |  78 ++++++++++
 .../component/kafka/KafkaConsumerTest.java      |  45 ++++++
 .../component/kafka/KafkaEndpointTest.java      |  45 ++++++
 .../camel/component/kafka/KafkaProducerIT.java  | 112 +++++++++++++++
 .../component/kafka/KafkaProducerTest.java      |  80 +++++++++++
 .../component/kafka/SimplePartitioner.java      |  24 ++++
 .../camel-kafka/src/test/resources/log4j.xml    |  19 +++
 components/pom.xml                              |   1 +
 16 files changed, 937 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/camel/blob/e5f44a11/components/camel-kafka/pom.xml
----------------------------------------------------------------------
diff --git a/components/camel-kafka/pom.xml b/components/camel-kafka/pom.xml
new file mode 100644
index 0000000..f469b30
--- /dev/null
+++ b/components/camel-kafka/pom.xml
@@ -0,0 +1,75 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+  Licensed to the Apache Software Foundation (ASF) under one or more
+  contributor license agreements.  See the NOTICE file distributed with
+  this work for additional information regarding copyright ownership.
+  The ASF licenses this file to You under the Apache License, Version 2.0
+  (the "License"); you may not use this file except in compliance with
+  the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0"; 
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance";
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 
http://maven.apache.org/maven-v4_0_0.xsd";>
+    <modelVersion>4.0.0</modelVersion>
+
+    <parent>
+        <groupId>org.apache.camel</groupId>
+        <artifactId>components</artifactId>
+        <version>2.13-SNAPSHOT</version>
+    </parent>
+
+    <artifactId>camel-kafka</artifactId>
+    <packaging>bundle</packaging>
+    <name>Camel :: Kafka</name>
+    <description>Camel kafka support</description>
+
+    <properties>
+        
<camel.osgi.export.pkg>org.apache.camel.component.kafka.*</camel.osgi.export.pkg>
+        <kafka.version>0.8.0</kafka.version>
+    </properties>
+
+    <dependencies>
+        <dependency>
+            <groupId>org.apache.camel</groupId>
+            <artifactId>camel-core</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.kafka</groupId>
+            <artifactId>kafka_2.10</artifactId>
+            <version>${kafka.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>org.scala-lang</groupId>
+            <artifactId>scala-library</artifactId>
+            <version>${scala-version}</version>
+            <scope>provided</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.camel</groupId>
+            <artifactId>camel-test</artifactId>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>junit</groupId>
+            <artifactId>junit</artifactId>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.mockito</groupId>
+            <artifactId>mockito-core</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>org.slf4j</groupId>
+            <artifactId>slf4j-log4j12</artifactId>
+            <scope>test</scope>
+        </dependency>
+    </dependencies>
+
+</project>

http://git-wip-us.apache.org/repos/asf/camel/blob/e5f44a11/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaComponent.java
----------------------------------------------------------------------
diff --git 
a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaComponent.java
 
b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaComponent.java
new file mode 100644
index 0000000..d3d4679
--- /dev/null
+++ 
b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaComponent.java
@@ -0,0 +1,44 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.kafka;
+
+import java.util.Map;
+
+import org.apache.camel.CamelContext;
+import org.apache.camel.impl.DefaultComponent;
+
+/**
+ * @author Stephen Samuel
+ */
+public class KafkaComponent extends DefaultComponent {
+
+    public KafkaComponent() {
+    }
+
+    public KafkaComponent(CamelContext context) {
+        super(context);
+    }
+
+    @Override
+    protected KafkaEndpoint createEndpoint(String uri,
+                                           String remaining,
+                                           Map<String, Object> params) throws 
Exception {
+        KafkaEndpoint endpoint = new KafkaEndpoint(uri, remaining, this);
+        setProperties(endpoint, params);
+        return endpoint;
+    }
+}

http://git-wip-us.apache.org/repos/asf/camel/blob/e5f44a11/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConstants.java
----------------------------------------------------------------------
diff --git 
a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConstants.java
 
b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConstants.java
new file mode 100644
index 0000000..b7f6bdf
--- /dev/null
+++ 
b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConstants.java
@@ -0,0 +1,30 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.kafka;
+
+/**
+ * @author Stephen Samuel
+ */
+public class KafkaConstants {
+
+    public static final String DEFAULT_GROUP = "group1";
+
+    public static final String PARTITION_KEY = "kafka.PARTITION_KEY";
+    public static final String PARTITION = "kafka.EXCHANGE_NAME";
+    public static final String KEY = "kafka.CONTENT_TYPE";
+    public static final String TOPIC = "kafka.TOPIC";
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/camel/blob/e5f44a11/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConsumer.java
----------------------------------------------------------------------
diff --git 
a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConsumer.java
 
b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConsumer.java
new file mode 100644
index 0000000..b4c605a
--- /dev/null
+++ 
b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConsumer.java
@@ -0,0 +1,124 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.kafka;
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.concurrent.ExecutorService;
+
+import org.apache.camel.Exchange;
+import org.apache.camel.Processor;
+import org.apache.camel.impl.DefaultConsumer;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import kafka.consumer.ConsumerConfig;
+import kafka.consumer.ConsumerIterator;
+import kafka.consumer.KafkaStream;
+import kafka.javaapi.consumer.ConsumerConnector;
+import kafka.message.MessageAndMetadata;
+
+/**
+ * @author Stephen Samuel
+ */
+public class KafkaConsumer extends DefaultConsumer {
+
+    private static final Logger logger = 
LoggerFactory.getLogger(KafkaConsumer.class);
+
+    private final KafkaEndpoint endpoint;
+    private final Processor processor;
+
+    ConsumerConnector consumer;
+    ExecutorService executor;
+
+    public KafkaConsumer(KafkaEndpoint endpoint, Processor processor) {
+        super(endpoint, processor);
+        this.endpoint = endpoint;
+        this.processor = processor;
+        if (endpoint.getZookeeperHost() == null)
+            throw new IllegalArgumentException("zookeeper host must be 
specified");
+        if (endpoint.getZookeeperPort() == 0)
+            throw new IllegalArgumentException("zookeeper port must be 
specified");
+        if (endpoint.getGroupId() == null)
+            throw new IllegalArgumentException("groupId must not be null");
+    }
+
+    Properties getProps() {
+        Properties props = new Properties();
+        props.put("zookeeper.connect", endpoint.getZookeeperHost() + ":" + 
endpoint.getZookeeperPort());
+        props.put("group.id", endpoint.getGroupId());
+        props.put("zookeeper.session.timeout.ms", "400");
+        props.put("zookeeper.sync.time.ms", "200");
+        props.put("auto.commit.interval.ms", "1000");
+        return props;
+    }
+
+    @Override
+    protected void doStart() throws Exception {
+        super.doStart();
+        log.info("Starting Kafka consumer");
+
+        consumer = kafka.consumer.Consumer.createJavaConsumerConnector(new 
ConsumerConfig(getProps()));
+
+        Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
+        topicCountMap.put(endpoint.getTopic(), endpoint.getConsumerStreams());
+        Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap = 
consumer.createMessageStreams(topicCountMap);
+        List<KafkaStream<byte[], byte[]>> streams = 
consumerMap.get(endpoint.getTopic());
+
+        executor = endpoint.createExecutor();
+        for (final KafkaStream stream : streams) {
+            executor.submit(new ConsumerTask(stream));
+        }
+    }
+
+    @Override
+    protected void doStop() throws Exception {
+        super.doStop();
+        log.info("Stopping Kafka consumer");
+
+        if (consumer != null)
+            consumer.shutdown();
+        if (executor != null)
+            executor.shutdown();
+        executor = null;
+    }
+
+    class ConsumerTask implements Runnable {
+
+        private KafkaStream stream;
+
+        public ConsumerTask(KafkaStream stream) {
+            this.stream = stream;
+        }
+
+        public void run() {
+            ConsumerIterator<byte[], byte[]> it = stream.iterator();
+            while ((Boolean) it.hasNext()) {
+                MessageAndMetadata<byte[], byte[]> mm = it.next();
+                Exchange exchange = endpoint.createKafkaExchange(mm);
+                try {
+                    processor.process(exchange);
+                } catch (Exception e) {
+                    e.printStackTrace();
+                }
+            }
+        }
+    }
+}
+

http://git-wip-us.apache.org/repos/asf/camel/blob/e5f44a11/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaEndpoint.java
----------------------------------------------------------------------
diff --git 
a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaEndpoint.java
 
b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaEndpoint.java
new file mode 100644
index 0000000..6ad9887
--- /dev/null
+++ 
b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaEndpoint.java
@@ -0,0 +1,142 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.kafka;
+
+import java.net.URISyntaxException;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ThreadPoolExecutor;
+
+import org.apache.camel.Consumer;
+import org.apache.camel.Exchange;
+import org.apache.camel.Message;
+import org.apache.camel.Processor;
+import org.apache.camel.Producer;
+import org.apache.camel.impl.DefaultEndpoint;
+import org.apache.camel.impl.DefaultExchange;
+import org.apache.camel.impl.DefaultMessage;
+
+import kafka.message.MessageAndMetadata;
+
+/**
+ * @author Stephen Samuel
+ */
+public class KafkaEndpoint extends DefaultEndpoint {
+
+    private String brokers;
+    private String zookeeperHost;
+    private int zookeeperPort;
+    private String groupId;
+    private int consumerStreams = 10;
+    private String partitioner;
+    private String topic;
+
+    public String getZookeeperHost() {
+        return zookeeperHost;
+    }
+
+    public void setZookeeperHost(String zookeeperHost) {
+        this.zookeeperHost = zookeeperHost;
+    }
+
+    public int getZookeeperPort() {
+        return zookeeperPort;
+    }
+
+    public void setZookeeperPort(int zookeeperPort) {
+        this.zookeeperPort = zookeeperPort;
+    }
+
+    public String getGroupId() {
+        return groupId;
+    }
+
+    public void setGroupId(String groupId) {
+        this.groupId = groupId;
+    }
+
+    public String getPartitioner() {
+        return partitioner;
+    }
+
+    public void setPartitioner(String partitioner) {
+        this.partitioner = partitioner;
+    }
+
+    public String getTopic() {
+        return topic;
+    }
+
+    public void setTopic(String topic) {
+        this.topic = topic;
+    }
+
+    public String getBrokers() {
+        return brokers;
+    }
+
+    public int getConsumerStreams() {
+        return consumerStreams;
+    }
+
+    public void setConsumerStreams(int consumerStreams) {
+        this.consumerStreams = consumerStreams;
+    }
+
+    public KafkaEndpoint() {
+    }
+
+    public KafkaEndpoint(String endpointUri,
+                         String remaining,
+                         KafkaComponent component) throws URISyntaxException {
+        super(endpointUri, component);
+        this.brokers = remaining.split("\\?")[0];
+    }
+
+    public Exchange createKafkaExchange(MessageAndMetadata<byte[], byte[]> mm) 
{
+        Exchange exchange = new DefaultExchange(getCamelContext(), 
getExchangePattern());
+
+        Message message = new DefaultMessage();
+        message.setHeader(KafkaConstants.PARTITION, mm.partition());
+        message.setHeader(KafkaConstants.TOPIC, mm.topic());
+        message.setHeader(KafkaConstants.KEY, new String(mm.key()));
+        exchange.setIn(message);
+
+        return exchange;
+    }
+
+    @Override
+    public Consumer createConsumer(Processor processor) throws Exception {
+        KafkaConsumer consumer = new KafkaConsumer(this, processor);
+        configureConsumer(consumer);
+        return consumer;
+    }
+
+    @Override
+    public Producer createProducer() throws Exception {
+        return new KafkaProducer(this);
+    }
+
+    @Override
+    public boolean isSingleton() {
+        return true;
+    }
+
+    public ThreadPoolExecutor createExecutor() {
+        return (ThreadPoolExecutor) 
Executors.newFixedThreadPool(getConsumerStreams());
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/camel/blob/e5f44a11/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaProducer.java
----------------------------------------------------------------------
diff --git 
a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaProducer.java
 
b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaProducer.java
new file mode 100644
index 0000000..d930313
--- /dev/null
+++ 
b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaProducer.java
@@ -0,0 +1,77 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.kafka;
+
+import java.util.Properties;
+
+import org.apache.camel.CamelException;
+import org.apache.camel.Exchange;
+import org.apache.camel.impl.DefaultProducer;
+
+import kafka.javaapi.producer.Producer;
+import kafka.producer.KeyedMessage;
+import kafka.producer.ProducerConfig;
+
+/**
+ * @author Stephen Samuel
+ */
+public class KafkaProducer extends DefaultProducer {
+
+    private final KafkaEndpoint endpoint;
+    Producer<String, String> producer;
+
+    public KafkaProducer(KafkaEndpoint endpoint) throws 
ClassNotFoundException, IllegalAccessException,
+            InstantiationException {
+        super(endpoint);
+        this.endpoint = endpoint;
+    }
+
+    @Override
+    protected void doStop() throws Exception {
+        if (producer != null)
+            producer.close();
+    }
+
+    Properties getProps() {
+        Properties props = new Properties();
+        props.put("metadata.broker.list", endpoint.getBrokers());
+        props.put("serializer.class", "kafka.serializer.StringEncoder");
+        props.put("partitioner.class", endpoint.getPartitioner());
+        props.put("request.required.acks", "1");
+        return props;
+    }
+
+    @Override
+    protected void doStart() throws Exception {
+        Properties props = getProps();
+        ProducerConfig config = new ProducerConfig(props);
+        producer = new Producer<String, String>(config);
+    }
+
+    @Override
+    public void process(Exchange exchange) throws CamelException {
+
+        Object partitionKey = 
exchange.getIn().getHeader(KafkaConstants.PARTITION_KEY);
+        if (partitionKey == null)
+            throw new CamelException("No partition key set");
+        String msg = exchange.getIn().getBody(String.class);
+
+        KeyedMessage<String, String> data =
+                new KeyedMessage<String, String>(endpoint.getTopic(), 
partitionKey.toString(), msg);
+        producer.send(data);
+    }
+}

http://git-wip-us.apache.org/repos/asf/camel/blob/e5f44a11/components/camel-kafka/src/main/resources/META-INF/services/org/apache/camel/component/kafka
----------------------------------------------------------------------
diff --git 
a/components/camel-kafka/src/main/resources/META-INF/services/org/apache/camel/component/kafka
 
b/components/camel-kafka/src/main/resources/META-INF/services/org/apache/camel/component/kafka
new file mode 100644
index 0000000..c0dc9d1
--- /dev/null
+++ 
b/components/camel-kafka/src/main/resources/META-INF/services/org/apache/camel/component/kafka
@@ -0,0 +1 @@
+class=org.apache.camel.component.kafka.KafkaComponent
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/camel/blob/e5f44a11/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaComponentTest.java
----------------------------------------------------------------------
diff --git 
a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaComponentTest.java
 
b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaComponentTest.java
new file mode 100644
index 0000000..b7fe4eb
--- /dev/null
+++ 
b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaComponentTest.java
@@ -0,0 +1,40 @@
+package org.apache.camel.component.kafka;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.camel.CamelContext;
+import org.junit.Test;
+import org.mockito.Mockito;
+
+import static org.junit.Assert.assertEquals;
+
+/**
+ * @author Stephen Samuel
+ */
+public class KafkaComponentTest {
+
+    private CamelContext context = Mockito.mock(CamelContext.class);
+
+    @Test
+    public void testPropertiesSet() throws Exception {
+        Map<String, Object> params = new HashMap<String, Object>();
+        params.put("zookeeperHost", "somehost");
+        params.put("zookeeperPort", 2987);
+        params.put("portNumber", 14123);
+        params.put("consumerStreams", "3");
+        params.put("topic", "mytopic");
+        params.put("partitioner", "com.class.Party");
+
+        String uri = "kafka:broker1:12345,broker2:12566";
+        String remaining = "broker1:12345,broker2:12566";
+
+        KafkaEndpoint endpoint = new 
KafkaComponent(context).createEndpoint(uri, remaining, params);
+        assertEquals("somehost", endpoint.getZookeeperHost());
+        assertEquals(2987, endpoint.getZookeeperPort());
+        assertEquals("broker1:12345,broker2:12566", endpoint.getBrokers());
+        assertEquals("mytopic", endpoint.getTopic());
+        assertEquals(3, endpoint.getConsumerStreams());
+        assertEquals("com.class.Party", endpoint.getPartitioner());
+    }
+}

http://git-wip-us.apache.org/repos/asf/camel/blob/e5f44a11/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaConsumerIT.java
----------------------------------------------------------------------
diff --git 
a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaConsumerIT.java
 
b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaConsumerIT.java
new file mode 100644
index 0000000..99b5c91
--- /dev/null
+++ 
b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaConsumerIT.java
@@ -0,0 +1,78 @@
+package org.apache.camel.component.kafka;
+
+import java.io.IOException;
+import java.util.Properties;
+
+import org.apache.camel.Endpoint;
+import org.apache.camel.EndpointInject;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.component.mock.MockEndpoint;
+import org.apache.camel.test.junit4.CamelTestSupport;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+import kafka.javaapi.producer.Producer;
+import kafka.producer.KeyedMessage;
+import kafka.producer.ProducerConfig;
+
+/**
+ * @author Stephen Samuel
+ *         <p/>
+ *         <p/>
+ *         The Producer IT tests require a Kafka broker running on 9092 and a 
zookeeper instance running on 2181.
+ *         The broker must have a topic called test created.
+ */
+public class KafkaConsumerIT extends CamelTestSupport {
+
+    public static final String TOPIC = "test";
+
+    @EndpointInject(uri = "kafka:localhost:9092?topic=" + TOPIC +
+            "&zookeeperHost=localhost&zookeeperPort=2181&groupId=group1")
+    private Endpoint from;
+
+    @EndpointInject(uri = "mock:result")
+    private MockEndpoint to;
+
+    private Producer<String, String> producer;
+
+    @Before
+    public void before() {
+        Properties props = new Properties();
+        props.put("metadata.broker.list", "localhost:9092");
+        props.put("serializer.class", "kafka.serializer.StringEncoder");
+        props.put("partitioner.class", 
"org.apache.camel.component.kafka.SimplePartitioner");
+        props.put("request.required.acks", "1");
+
+        ProducerConfig config = new ProducerConfig(props);
+        producer = new Producer<String, String>(config);
+    }
+
+    @After
+    public void after() {
+        producer.close();
+    }
+
+    @Override
+    protected RouteBuilder createRouteBuilder() throws Exception {
+        return new RouteBuilder() {
+
+            @Override
+            public void configure() throws Exception {
+                from(from).to(to);
+            }
+        };
+    }
+
+    @Test
+    public void kaftMessageIsConsumedByCamel() throws InterruptedException, 
IOException {
+        to.expectedMessageCount(5);
+        for (int k = 0; k < 5; k++) {
+            String msg = "message-" + k;
+            KeyedMessage<String, String> data = new KeyedMessage<String, 
String>(TOPIC, "1", msg);
+            producer.send(data);
+        }
+        to.assertIsSatisfied(3000);
+    }
+}
+

http://git-wip-us.apache.org/repos/asf/camel/blob/e5f44a11/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaConsumerTest.java
----------------------------------------------------------------------
diff --git 
a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaConsumerTest.java
 
b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaConsumerTest.java
new file mode 100644
index 0000000..8aa756f
--- /dev/null
+++ 
b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaConsumerTest.java
@@ -0,0 +1,45 @@
+package org.apache.camel.component.kafka;
+
+import java.util.concurrent.ThreadPoolExecutor;
+
+import org.apache.camel.Processor;
+import org.junit.Test;
+import org.mockito.Mockito;
+
+import static org.mockito.Mockito.*;
+
+/**
+ * @author Stephen Samuel
+ */
+public class KafkaConsumerTest {
+
+    private KafkaEndpoint endpoint = mock(KafkaEndpoint.class);
+    private Processor processor = mock(Processor.class);
+
+    @Test(expected = IllegalArgumentException.class)
+    public void consumerRequiresZookeeperHost() throws Exception {
+        Mockito.when(endpoint.getZookeeperPort()).thenReturn(2181);
+        new KafkaConsumer(endpoint, processor);
+    }
+
+    @Test(expected = IllegalArgumentException.class)
+    public void consumerRequiresZookeeperPort() throws Exception {
+        Mockito.when(endpoint.getZookeeperHost()).thenReturn("localhost");
+        new KafkaConsumer(endpoint, processor);
+    }
+
+    @Test
+    public void testStoppingConsumerShutsdownExecutor() throws Exception {
+
+        when(endpoint.getZookeeperHost()).thenReturn("localhost");
+        when(endpoint.getZookeeperPort()).thenReturn(2181);
+        when(endpoint.getGroupId()).thenReturn("12345");
+
+        KafkaConsumer consumer = new KafkaConsumer(endpoint, processor);
+
+        ThreadPoolExecutor e = mock(ThreadPoolExecutor.class);
+        consumer.executor = e;
+        consumer.doStop();
+        verify(e).shutdown();
+    }
+}

http://git-wip-us.apache.org/repos/asf/camel/blob/e5f44a11/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaEndpointTest.java
----------------------------------------------------------------------
diff --git 
a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaEndpointTest.java
 
b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaEndpointTest.java
new file mode 100644
index 0000000..767d4d5
--- /dev/null
+++ 
b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaEndpointTest.java
@@ -0,0 +1,45 @@
+package org.apache.camel.component.kafka;
+
+import java.net.URISyntaxException;
+import java.util.concurrent.ThreadPoolExecutor;
+
+import org.apache.camel.Exchange;
+import org.junit.Test;
+
+import kafka.message.MessageAndMetadata;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+/**
+ * @author Stephen Samuel
+ */
+public class KafkaEndpointTest {
+
+    @Test
+    public void testCreatingKafkaExchangeSetsHeaders() throws 
URISyntaxException {
+        KafkaEndpoint endpoint = new KafkaEndpoint("kafka:localhost", 
"localhost", new KafkaComponent());
+
+        MessageAndMetadata<byte[], byte[]> mm =
+                new MessageAndMetadata<byte[], byte[]>("somekey".getBytes(), 
"mymessage".getBytes(), "topic", 4, 56);
+
+        Exchange exchange = endpoint.createKafkaExchange(mm);
+        assertEquals("somekey", 
exchange.getIn().getHeader(KafkaConstants.KEY));
+        assertEquals("topic", 
exchange.getIn().getHeader(KafkaConstants.TOPIC));
+        assertEquals(4, exchange.getIn().getHeader(KafkaConstants.PARTITION));
+    }
+
+    @Test
+    public void creatingExecutorUsesThreadPoolSettings() throws Exception {
+        KafkaEndpoint endpoint = new KafkaEndpoint("kafka:localhost", 
"kafka:localhost", new KafkaComponent());
+        endpoint.setConsumerStreams(44);
+        ThreadPoolExecutor executor = endpoint.createExecutor();
+        assertEquals(44, executor.getCorePoolSize());
+    }
+
+    @Test
+    public void assertSingleton() throws URISyntaxException {
+        KafkaEndpoint endpoint = new KafkaEndpoint("kafka:localhost", 
"localhost", new KafkaComponent());
+        assertTrue(endpoint.isSingleton());
+    }
+}

http://git-wip-us.apache.org/repos/asf/camel/blob/e5f44a11/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaProducerIT.java
----------------------------------------------------------------------
diff --git 
a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaProducerIT.java
 
b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaProducerIT.java
new file mode 100644
index 0000000..2e98d9c
--- /dev/null
+++ 
b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaProducerIT.java
@@ -0,0 +1,112 @@
+package org.apache.camel.component.kafka;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+
+import org.apache.camel.Endpoint;
+import org.apache.camel.EndpointInject;
+import org.apache.camel.Produce;
+import org.apache.camel.ProducerTemplate;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.test.junit4.CamelTestSupport;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+import kafka.consumer.ConsumerConfig;
+import kafka.consumer.ConsumerIterator;
+import kafka.consumer.KafkaStream;
+import kafka.javaapi.consumer.ConsumerConnector;
+
+/**
+ * @author Stephen Samuel
+ *         <p/>
+ *         <p/>
+ *         The Producer IT tests require a Kafka broker running on 9092 and a 
zookeeper instance running on 2181.
+ *         The broker must have a topic called test created.
+ */
+public class KafkaProducerIT extends CamelTestSupport {
+
+    public static final String TOPIC = "test";
+
+    @EndpointInject(uri =
+            "kafka:localhost:9092?topic=" + TOPIC + 
"&partitioner=org.apache.camel.component.kafka.SimplePartitioner")
+    private Endpoint to;
+
+    @Produce(uri = "direct:start")
+    protected ProducerTemplate template;
+
+    private ConsumerConnector kafkaConsumer;
+
+    @Before
+    public void before() {
+        Properties props = new Properties();
+        props.put("zookeeper.connect", "localhost:2181");
+        props.put("group.id", KafkaConstants.DEFAULT_GROUP);
+        props.put("zookeeper.session.timeout.ms", "400");
+        props.put("zookeeper.sync.time.ms", "200");
+        props.put("auto.commit.interval.ms", "1000");
+
+        kafkaConsumer = 
kafka.consumer.Consumer.createJavaConsumerConnector(new ConsumerConfig(props));
+    }
+
+    @After
+    public void after() {
+        kafkaConsumer.shutdown();
+    }
+
+    @Override
+    protected RouteBuilder createRouteBuilder() throws Exception {
+        return new RouteBuilder() {
+
+            @Override
+            public void configure() throws Exception {
+                from("direct:start").to(to);
+            }
+        };
+    }
+
+    @Test
+    public void producedMessageIsReceivedByKafka() throws 
InterruptedException, IOException {
+
+        final List<String> messages = new ArrayList<String>();
+
+        Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
+        topicCountMap.put(TOPIC, 5);
+        Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap = 
kafkaConsumer.createMessageStreams(topicCountMap);
+        List<KafkaStream<byte[], byte[]>> streams = consumerMap.get(TOPIC);
+
+        ExecutorService executor = Executors.newFixedThreadPool(5);
+        for (final KafkaStream stream : streams) {
+            executor.submit(new Runnable() {
+                @Override
+                public void run() {
+                    ConsumerIterator<byte[], byte[]> it = stream.iterator();
+                    while (it.hasNext()) {
+                        String msg = new String(it.next().message());
+                        messages.add(msg);
+                    }
+                }
+            });
+        }
+
+        for (int k = 0; k < 10; k++) {
+            template.sendBodyAndHeader("IT test message", 
KafkaConstants.PARTITION_KEY, "1");
+        }
+
+        for (int k = 0; k < 20; k++) {
+            if (messages.size() == 10)
+                return;
+            Thread.sleep(200);
+        }
+
+        fail();
+    }
+}
+

http://git-wip-us.apache.org/repos/asf/camel/blob/e5f44a11/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaProducerTest.java
----------------------------------------------------------------------
diff --git 
a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaProducerTest.java
 
b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaProducerTest.java
new file mode 100644
index 0000000..6fe7010
--- /dev/null
+++ 
b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaProducerTest.java
@@ -0,0 +1,80 @@
+package org.apache.camel.component.kafka;
+
+import java.net.URISyntaxException;
+import java.util.Properties;
+
+import org.apache.camel.CamelException;
+import org.apache.camel.Exchange;
+import org.apache.camel.Message;
+import org.apache.camel.impl.DefaultMessage;
+import org.junit.Test;
+import org.mockito.ArgumentCaptor;
+import org.mockito.Matchers;
+import org.mockito.Mockito;
+
+import kafka.javaapi.producer.Producer;
+import kafka.producer.KeyedMessage;
+
+import static org.junit.Assert.assertEquals;
+
+/**
+ * @author Stephen Samuel
+ */
+public class KafkaProducerTest {
+
+    private KafkaProducer producer;
+    private KafkaEndpoint endpoint;
+
+    private Exchange exchange = Mockito.mock(Exchange.class);
+    private Message in = new DefaultMessage();
+
+    public KafkaProducerTest() throws IllegalAccessException, 
InstantiationException, ClassNotFoundException,
+            URISyntaxException {
+        endpoint = new 
KafkaEndpoint("kafka:broker1:1234,broker2:4567?topic=sometopic",
+                "broker1:1234," + "broker2:4567?topic=sometopic", null);
+        producer = new KafkaProducer(endpoint);
+        producer.producer = Mockito.mock(Producer.class);
+    }
+
+    @Test
+    public void testPropertyBuilder() throws Exception {
+        endpoint.setPartitioner("com.sksamuel.someclass");
+        Properties props = producer.getProps();
+        assertEquals("com.sksamuel.someclass", 
props.getProperty("partitioner.class"));
+        assertEquals("broker1:1234,broker2:4567", 
props.getProperty("metadata.broker.list"));
+    }
+
+    @Test
+    public void processSendsMesssage() throws Exception {
+
+        endpoint.setTopic("sometopic");
+        Mockito.when(exchange.getIn()).thenReturn(in);
+        in.setHeader(KafkaConstants.PARTITION_KEY, "4");
+
+        producer.process(exchange);
+
+        
Mockito.verify(producer.producer).send(Matchers.any(KeyedMessage.class));
+    }
+
+    @Test(expected = CamelException.class)
+    public void processRequiresPartitionHeader() throws Exception {
+        endpoint.setTopic("sometopic");
+        Mockito.when(exchange.getIn()).thenReturn(in);
+        producer.process(exchange);
+    }
+
+    @Test
+    public void processSendsMesssageWithPartitionKeyHeader() throws Exception {
+
+        endpoint.setTopic("sometopic");
+        Mockito.when(exchange.getIn()).thenReturn(in);
+        in.setHeader(KafkaConstants.PARTITION_KEY, "4");
+
+        producer.process(exchange);
+
+        ArgumentCaptor<KeyedMessage> captor = 
ArgumentCaptor.forClass(KeyedMessage.class);
+        Mockito.verify(producer.producer).send(captor.capture());
+        assertEquals("4", captor.getValue().key());
+        assertEquals("sometopic", captor.getValue().topic());
+    }
+}

http://git-wip-us.apache.org/repos/asf/camel/blob/e5f44a11/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/SimplePartitioner.java
----------------------------------------------------------------------
diff --git 
a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/SimplePartitioner.java
 
b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/SimplePartitioner.java
new file mode 100644
index 0000000..d0eb738
--- /dev/null
+++ 
b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/SimplePartitioner.java
@@ -0,0 +1,24 @@
+package org.apache.camel.component.kafka;
+
+import kafka.producer.Partitioner;
+import kafka.utils.VerifiableProperties;
+
+/**
+ * @author Stephen Samuel
+ */
+public class SimplePartitioner implements Partitioner<String> {
+
+    public SimplePartitioner(VerifiableProperties props) {
+    }
+
+    /**
+     * Uses the key to calculate a partition bucket id for routing
+     * the data to the appropriate broker partition
+     *
+     * @return an integer between 0 and numPartitions-1
+     */
+    @Override
+    public int partition(String key, int numPartitions) {
+        return key.hashCode() % numPartitions;
+    }
+}

http://git-wip-us.apache.org/repos/asf/camel/blob/e5f44a11/components/camel-kafka/src/test/resources/log4j.xml
----------------------------------------------------------------------
diff --git a/components/camel-kafka/src/test/resources/log4j.xml 
b/components/camel-kafka/src/test/resources/log4j.xml
new file mode 100644
index 0000000..ff6e863
--- /dev/null
+++ b/components/camel-kafka/src/test/resources/log4j.xml
@@ -0,0 +1,19 @@
+<!DOCTYPE log4j:configuration SYSTEM "log4j.dtd">
+<log4j:configuration threshold="all" debug="true" 
xmlns:log4j="http://jakarta.apache.org/log4j/";>
+
+    <appender name="console" class="org.apache.log4j.ConsoleAppender">
+        <param name="Target" value="System.out"/>
+        <layout class="org.apache.log4j.PatternLayout">
+            <param name="ConversionPattern" value="[%t] %-5p %c{1}.%M - %m%n"/>
+        </layout>
+    </appender>
+
+    <logger name="org.apache.camel.component.kafka">
+        <level value="debug"/>
+    </logger>
+    <root>
+        <level value="warn"/>
+        <appender-ref ref="console"/>
+    </root>
+
+</log4j:configuration>
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/camel/blob/e5f44a11/components/pom.xml
----------------------------------------------------------------------
diff --git a/components/pom.xml b/components/pom.xml
index d061a9a..f921954 100644
--- a/components/pom.xml
+++ b/components/pom.xml
@@ -118,6 +118,7 @@
     <module>camel-jt400</module>
     <module>camel-juel</module>
     <module>camel-jxpath</module>
+    <module>camel-kafka</module>
     <module>camel-kestrel</module>
     <module>camel-krati</module>
     <module>camel-ldap</module>

Reply via email to