Repository: camel
Updated Branches:
  refs/heads/master 038e1617f -> 621a704c4


http://git-wip-us.apache.org/repos/asf/camel/blob/b2aa831d/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaProducerFullTest.java
----------------------------------------------------------------------
diff --git a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaProducerFullTest.java b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaProducerFullTest.java
index 7934f41..0bb4740 100644
--- a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaProducerFullTest.java
+++ b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaProducerFullTest.java
@@ -17,24 +17,22 @@
 package org.apache.camel.component.kafka;
 
 import java.io.IOException;
+import java.util.Arrays;
 import java.util.HashMap;
-import java.util.List;
 import java.util.Map;
 import java.util.Properties;
 import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
 import java.util.concurrent.TimeUnit;
 
-import kafka.consumer.ConsumerConfig;
-import kafka.consumer.ConsumerIterator;
-import kafka.consumer.KafkaStream;
-import kafka.javaapi.consumer.ConsumerConnector;
 import org.apache.camel.Endpoint;
 import org.apache.camel.EndpointInject;
 import org.apache.camel.Produce;
 import org.apache.camel.ProducerTemplate;
+import org.apache.camel.RoutesBuilder;
 import org.apache.camel.builder.RouteBuilder;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.consumer.ConsumerRecords;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
 import org.junit.AfterClass;
 import org.junit.BeforeClass;
 import org.junit.Test;
@@ -42,7 +40,7 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 public class KafkaProducerFullTest extends BaseEmbeddedKafkaTest {
-    
+
     private static final String TOPIC_STRINGS = "test";
     private static final String TOPIC_STRINGS_IN_HEADER = "testHeader";
     private static final String TOPIC_BYTES = "testBytes";
@@ -52,15 +50,16 @@ public class KafkaProducerFullTest extends BaseEmbeddedKafkaTest {
 
     private static final Logger LOG = 
LoggerFactory.getLogger(KafkaProducerFullTest.class);
 
-    private static ConsumerConnector stringsConsumerConn;
-    private static ConsumerConnector bytesConsumerConn;
+    private static KafkaConsumer<String, String> stringsConsumerConn;
+    private static KafkaConsumer<byte[], byte[]> bytesConsumerConn;
 
     @EndpointInject(uri = "kafka:localhost:{{karfkaPort}}?topic=" + 
TOPIC_STRINGS
-        + 
"&partitioner=org.apache.camel.component.kafka.SimplePartitioner&serializerClass=kafka.serializer.StringEncoder"
-        + "&requestRequiredAcks=-1")
+            + "&requestRequiredAcks=-1")
     private Endpoint toStrings;
 
-    @EndpointInject(uri = "kafka:localhost:{{karfkaPort}}?topic=" + 
TOPIC_BYTES + "&requestRequiredAcks=-1")
+    @EndpointInject(uri = "kafka:localhost:{{karfkaPort}}?topic=" + 
TOPIC_BYTES + "&requestRequiredAcks=-1"
+            + 
"&serializerClass=org.apache.kafka.common.serialization.ByteArraySerializer&"
+            + 
"keySerializerClass=org.apache.kafka.common.serialization.ByteArraySerializer")
     private Endpoint toBytes;
 
     @Produce(uri = "direct:startStrings")
@@ -69,46 +68,42 @@ public class KafkaProducerFullTest extends BaseEmbeddedKafkaTest {
     @Produce(uri = "direct:startBytes")
     private ProducerTemplate bytesTemplate;
 
-
     @BeforeClass
     public static void before() {
         Properties stringsProps = new Properties();
-       
-        stringsProps.put("zookeeper.connect", "localhost:" + 
getZookeeperPort());
-        stringsProps.put("group.id", GROUP_STRINGS);
-        stringsProps.put("zookeeper.session.timeout.ms", "6000");
-        stringsProps.put("zookeeper.connectiontimeout.ms", "12000");
-        stringsProps.put("zookeeper.sync.time.ms", "200");
-        stringsProps.put("auto.commit.interval.ms", "1000");
-        stringsProps.put("auto.offset.reset", "smallest");
-        stringsConsumerConn = 
kafka.consumer.Consumer.createJavaConsumerConnector(new 
ConsumerConfig(stringsProps));
+
+        
stringsProps.put(org.apache.kafka.clients.consumer.ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,
 "localhost:" + getKarfkaPort());
+        
stringsProps.put(org.apache.kafka.clients.consumer.ConsumerConfig.GROUP_ID_CONFIG,
 "DemoConsumer");
+        
stringsProps.put(org.apache.kafka.clients.consumer.ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,
 "true");
+        
stringsProps.put(org.apache.kafka.clients.consumer.ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG,
 "1000");
+        
stringsProps.put(org.apache.kafka.clients.consumer.ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG,
 "30000");
+        
stringsProps.put(org.apache.kafka.clients.consumer.ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
 "org.apache.kafka.common.serialization.StringDeserializer");
+        
stringsProps.put(org.apache.kafka.clients.consumer.ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
 "org.apache.kafka.common.serialization.StringDeserializer");
+        
stringsProps.put(org.apache.kafka.clients.consumer.ConsumerConfig.AUTO_OFFSET_RESET_CONFIG,
 "earliest");
+        stringsConsumerConn = new KafkaConsumer<String, String>(stringsProps);
 
         Properties bytesProps = new Properties();
         bytesProps.putAll(stringsProps);
         bytesProps.put("group.id", GROUP_BYTES);
-        bytesConsumerConn = 
kafka.consumer.Consumer.createJavaConsumerConnector(new 
ConsumerConfig(bytesProps));
+        
bytesProps.put(org.apache.kafka.clients.consumer.ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
 "org.apache.kafka.common.serialization.ByteArrayDeserializer");
+        
bytesProps.put(org.apache.kafka.clients.consumer.ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
 "org.apache.kafka.common.serialization.ByteArrayDeserializer");
+        bytesConsumerConn = new KafkaConsumer<byte[], byte[]>(bytesProps);
     }
 
     @AfterClass
     public static void after() {
-        stringsConsumerConn.shutdown();
-        bytesConsumerConn.shutdown();
+        stringsConsumerConn.close();
+        bytesConsumerConn.close();
     }
 
     @Override
-    protected RouteBuilder[] createRouteBuilders() throws Exception {
-        return new RouteBuilder[] {
-            new RouteBuilder() {
-                @Override
-                public void configure() throws Exception {
-                    from("direct:startStrings").to(toStrings);
-                }
-            },
-            new RouteBuilder() {
-                @Override
-                public void configure() throws Exception {
-                    from("direct:startBytes").to(toBytes);
-                }
+    protected RoutesBuilder createRouteBuilder() throws Exception {
+        return new RouteBuilder() {
+            @Override
+            public void configure() throws Exception {
+                from("direct:startStrings").to(toStrings);
+
+                from("direct:startBytes").to(toBytes);
             }
         };
     }
@@ -120,14 +115,11 @@ public class KafkaProducerFullTest extends BaseEmbeddedKafkaTest {
 
         CountDownLatch messagesLatch = new CountDownLatch(messageInTopic + 
messageInOtherTopic);
 
-        Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
-        topicCountMap.put(TOPIC_STRINGS, 5);
-        topicCountMap.put(TOPIC_STRINGS_IN_HEADER, 5);
-        createKafkaMessageConsumer(stringsConsumerConn, TOPIC_STRINGS, 
TOPIC_STRINGS_IN_HEADER, messagesLatch, topicCountMap);
-
         sendMessagesInRoute(messageInTopic, stringsTemplate, "IT test 
message", KafkaConstants.PARTITION_KEY, "1");
         sendMessagesInRoute(messageInOtherTopic, stringsTemplate, "IT test 
message in other topic", KafkaConstants.PARTITION_KEY, "1", 
KafkaConstants.TOPIC, TOPIC_STRINGS_IN_HEADER);
 
+        createKafkaMessageConsumer(stringsConsumerConn, TOPIC_STRINGS, 
TOPIC_STRINGS_IN_HEADER, messagesLatch);
+
         boolean allMessagesReceived = messagesLatch.await(200, 
TimeUnit.MILLISECONDS);
 
         assertTrue("Not all messages were published to the kafka topics. Not 
received: " + messagesLatch.getCount(), allMessagesReceived);
@@ -140,11 +132,6 @@ public class KafkaProducerFullTest extends BaseEmbeddedKafkaTest {
 
         CountDownLatch messagesLatch = new CountDownLatch(messageInTopic + 
messageInOtherTopic);
 
-        Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
-        topicCountMap.put(TOPIC_BYTES, 5);
-        topicCountMap.put(TOPIC_BYTES_IN_HEADER, 5);
-        createKafkaMessageConsumer(bytesConsumerConn, TOPIC_BYTES, 
TOPIC_BYTES_IN_HEADER, messagesLatch, topicCountMap);
-
         Map<String, Object> inTopicHeaders = new HashMap<String, Object>();
         inTopicHeaders.put(KafkaConstants.PARTITION_KEY, "1".getBytes());
         sendMessagesInRoute(messageInTopic, bytesTemplate, "IT test 
message".getBytes(), inTopicHeaders);
@@ -154,22 +141,47 @@ public class KafkaProducerFullTest extends BaseEmbeddedKafkaTest {
         otherTopicHeaders.put(KafkaConstants.TOPIC, TOPIC_BYTES_IN_HEADER);
         sendMessagesInRoute(messageInOtherTopic, bytesTemplate, "IT test 
message in other topic".getBytes(), otherTopicHeaders);
 
+        createKafkaBytesMessageConsumer(bytesConsumerConn, TOPIC_BYTES, 
TOPIC_BYTES_IN_HEADER, messagesLatch);
+
         boolean allMessagesReceived = messagesLatch.await(200, 
TimeUnit.MILLISECONDS);
 
         assertTrue("Not all messages were published to the kafka topics. Not 
received: " + messagesLatch.getCount(), allMessagesReceived);
     }
 
-    private void createKafkaMessageConsumer(ConsumerConnector consumerConn, 
String topic, String topicInHeader,
-                                            CountDownLatch messagesLatch, 
Map<String, Integer> topicCountMap) {
-        Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap = 
consumerConn.createMessageStreams(topicCountMap);
+    private void createKafkaMessageConsumer(KafkaConsumer<String, String> 
consumerConn,
+                                            String topic, String 
topicInHeader, CountDownLatch messagesLatch) {
+
+        consumerConn.subscribe(Arrays.asList(topic, topicInHeader));
+        boolean run = true;
 
-        ExecutorService executor = Executors.newFixedThreadPool(10);
-        for (final KafkaStream<byte[], byte[]> stream : 
consumerMap.get(topic)) {
-            executor.submit(new KakfaTopicConsumer(stream, messagesLatch));
+        while (run) {
+            ConsumerRecords<String, String> records = consumerConn.poll(100);
+            for (ConsumerRecord<String, String> record : records) {
+                messagesLatch.countDown();
+                if (messagesLatch.getCount() == 0) {
+                    run = false;
+                }
+            }
         }
-        for (final KafkaStream<byte[], byte[]> stream : 
consumerMap.get(topicInHeader)) {
-            executor.submit(new KakfaTopicConsumer(stream, messagesLatch));
+
+    }
+
+    private void createKafkaBytesMessageConsumer(KafkaConsumer<byte[], byte[]> 
consumerConn, String topic,
+                                                 String topicInHeader, 
CountDownLatch messagesLatch) {
+
+        consumerConn.subscribe(Arrays.asList(topic, topicInHeader));
+        boolean run = true;
+
+        while (run) {
+            ConsumerRecords<byte[], byte[]> records = consumerConn.poll(100);
+            for (ConsumerRecord<byte[], byte[]> record : records) {
+                messagesLatch.countDown();
+                if (messagesLatch.getCount() == 0) {
+                    run = false;
+                }
+            }
         }
+
     }
 
     private void sendMessagesInRoute(int messages, ProducerTemplate template, 
Object bodyOther, String... headersWithValue) {
@@ -186,23 +198,4 @@ public class KafkaProducerFullTest extends BaseEmbeddedKafkaTest {
         }
     }
 
-    private static class KakfaTopicConsumer implements Runnable {
-        private final KafkaStream<byte[], byte[]> stream;
-        private final CountDownLatch latch;
-
-        public KakfaTopicConsumer(KafkaStream<byte[], byte[]> stream, 
CountDownLatch latch) {
-            this.stream = stream;
-            this.latch = latch;
-        }
-
-        @Override
-        public void run() {
-            ConsumerIterator<byte[], byte[]> it = stream.iterator();
-            while (it.hasNext()) {
-                String msg = new String(it.next().message());
-                LOG.info("Get the message" + msg);
-                latch.countDown();
-            }
-        }
-    }
 }

http://git-wip-us.apache.org/repos/asf/camel/blob/b2aa831d/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaProducerTest.java
----------------------------------------------------------------------
diff --git a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaProducerTest.java b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaProducerTest.java
index f2bcd6b..98f6421 100644
--- a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaProducerTest.java
+++ b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaProducerTest.java
@@ -18,12 +18,12 @@ package org.apache.camel.component.kafka;
 
 import java.util.Properties;
 
-import kafka.javaapi.producer.Producer;
-import kafka.producer.KeyedMessage;
 import org.apache.camel.CamelException;
 import org.apache.camel.Exchange;
 import org.apache.camel.Message;
 import org.apache.camel.impl.DefaultMessage;
+import org.apache.kafka.clients.producer.ProducerConfig;
+import org.apache.kafka.clients.producer.ProducerRecord;
 import org.junit.Test;
 import org.mockito.ArgumentCaptor;
 import org.mockito.Matchers;
@@ -41,18 +41,17 @@ public class KafkaProducerTest {
 
     @SuppressWarnings({"unchecked"})
     public KafkaProducerTest() throws Exception {
-        endpoint = new 
KafkaEndpoint("kafka:broker1:1234,broker2:4567?topic=sometopic", null);
+        endpoint = new KafkaEndpoint(
+                "kafka:broker1:1234,broker2:4567?topic=sometopic", null);
         endpoint.setBrokers("broker1:1234,broker2:4567");
         producer = new KafkaProducer(endpoint);
-        producer.producer = Mockito.mock(Producer.class);
+        
producer.setKafkaProducer(Mockito.mock(org.apache.kafka.clients.producer.KafkaProducer.class));
     }
 
     @Test
     public void testPropertyBuilder() throws Exception {
-        endpoint.setPartitioner("com.sksamuel.someclass");
         Properties props = producer.getProps();
-        assertEquals("com.sksamuel.someclass", 
props.getProperty("partitioner.class"));
-        assertEquals("broker1:1234,broker2:4567", 
props.getProperty("metadata.broker.list"));
+        assertEquals("broker1:1234,broker2:4567", 
props.getProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG));
     }
 
     @Test
@@ -63,20 +62,18 @@ public class KafkaProducerTest {
         in.setHeader(KafkaConstants.PARTITION_KEY, "4");
 
         producer.process(exchange);
-
-        
Mockito.verify(producer.producer).send(Matchers.any(KeyedMessage.class));
+        
Mockito.verify(producer.getKafkaProducer()).send(Matchers.any(ProducerRecord.class));
     }
 
     @Test
     public void processSendsMessageWithTopicHeaderAndNoTopicInEndPoint() 
throws Exception {
         endpoint.setTopic(null);
         Mockito.when(exchange.getIn()).thenReturn(in);
-        in.setHeader(KafkaConstants.PARTITION_KEY, "4");
         in.setHeader(KafkaConstants.TOPIC, "anotherTopic");
 
         producer.process(exchange);
 
-        verifySendMessage("4", "anotherTopic", "4");
+        verifySendMessage("anotherTopic");
     }
 
     @Test
@@ -112,10 +109,9 @@ public class KafkaProducerTest {
         endpoint.setTopic("someTopic");
         Mockito.when(exchange.getIn()).thenReturn(in);
         in.setHeader(KafkaConstants.PARTITION_KEY, "4");
-
+        in.setHeader(KafkaConstants.KEY, "someKey");
         producer.process(exchange);
-
-        verifySendMessage("4", "someTopic", "4");
+        verifySendMessage("4", "someTopic", "someKey");
     }
 
     @Test
@@ -126,9 +122,9 @@ public class KafkaProducerTest {
 
         producer.process(exchange);
 
-        verifySendMessage("someKey", "someTopic", "someKey");
+        verifySendMessage("someTopic", "someKey");
     }
-    
+
     @Test
     public void processSendMessageWithBridgeEndpoint() throws Exception {
         endpoint.setTopic("someTopic");
@@ -136,19 +132,44 @@ public class KafkaProducerTest {
         Mockito.when(exchange.getIn()).thenReturn(in);
         in.setHeader(KafkaConstants.TOPIC, "anotherTopic");
         in.setHeader(KafkaConstants.KEY, "someKey");
-        
+        in.setHeader(KafkaConstants.PARTITION_KEY, "4");
+        producer.process(exchange);
+
+        verifySendMessage("4", "someTopic", "someKey");
+    }
+
+    @Test // Message and Topic Name alone
+    public void processSendsMesssageWithMessageTopicName() throws Exception {
+        endpoint.setTopic("someTopic");
+        Mockito.when(exchange.getIn()).thenReturn(in);
+
         producer.process(exchange);
-        
-        verifySendMessage("someKey", "someTopic", "someKey");
+
+        verifySendMessage("someTopic");
     }
 
     @SuppressWarnings({"unchecked", "rawtypes"})
     protected void verifySendMessage(String partitionKey, String topic, String 
messageKey) {
-        ArgumentCaptor<KeyedMessage> captor = 
ArgumentCaptor.forClass(KeyedMessage.class);
-        Mockito.verify(producer.producer).send(captor.capture());
-        assertEquals(partitionKey, captor.getValue().partitionKey());
+        ArgumentCaptor<ProducerRecord> captor = 
ArgumentCaptor.forClass(ProducerRecord.class);
+        Mockito.verify(producer.getKafkaProducer()).send(captor.capture());
+        assertEquals(new Integer(partitionKey), captor.getValue().partition());
+        assertEquals(messageKey, captor.getValue().key());
+        assertEquals(topic, captor.getValue().topic());
+    }
+
+    @SuppressWarnings({"unchecked", "rawtypes"})
+    protected void verifySendMessage(String topic, String messageKey) {
+        ArgumentCaptor<ProducerRecord> captor = 
ArgumentCaptor.forClass(ProducerRecord.class);
+        Mockito.verify(producer.getKafkaProducer()).send(captor.capture());
         assertEquals(messageKey, captor.getValue().key());
         assertEquals(topic, captor.getValue().topic());
     }
 
+    @SuppressWarnings({"unchecked", "rawtypes"})
+    protected void verifySendMessage(String topic) {
+        ArgumentCaptor<ProducerRecord> captor = 
ArgumentCaptor.forClass(ProducerRecord.class);
+        Mockito.verify(producer.getKafkaProducer()).send(captor.capture());
+        assertEquals(topic, captor.getValue().topic());
+    }
+
 }

http://git-wip-us.apache.org/repos/asf/camel/blob/b2aa831d/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/SimplePartitioner.java
----------------------------------------------------------------------
diff --git a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/SimplePartitioner.java b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/SimplePartitioner.java
deleted file mode 100644
index 039a2e7..0000000
--- a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/SimplePartitioner.java
+++ /dev/null
@@ -1,39 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.camel.component.kafka;
-
-import kafka.producer.Partitioner;
-import kafka.utils.VerifiableProperties;
-
-public class SimplePartitioner implements Partitioner {
-
-    public SimplePartitioner(VerifiableProperties props) {
-    }
-
-    /**
-     * Uses the key to calculate a partition bucket id for routing
-     * the data to the appropriate broker partition
-     *
-     * @return an integer between 0 and numPartitions-1
-     */
-    @Override
-    public int partition(Object key, int numPartitions) {
-        return key.hashCode() % numPartitions;
-    }
-
-}
-

http://git-wip-us.apache.org/repos/asf/camel/blob/b2aa831d/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/embedded/EmbeddedKafkaCluster.java
----------------------------------------------------------------------
diff --git a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/embedded/EmbeddedKafkaCluster.java b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/embedded/EmbeddedKafkaCluster.java
index ce11a47..42403c2 100644
--- a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/embedded/EmbeddedKafkaCluster.java
+++ b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/embedded/EmbeddedKafkaCluster.java
@@ -27,13 +27,9 @@ import kafka.admin.AdminUtils;
 import kafka.server.KafkaConfig;
 import kafka.server.KafkaServer;
 import kafka.utils.ZkUtils;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 import scala.Option;
 
 public class EmbeddedKafkaCluster {
-    private static final Logger LOG = 
LoggerFactory.getLogger(EmbeddedKafkaCluster.class);
-
     private final List<Integer> ports;
     private final String zkConnection;
     private final Properties baseProperties;
@@ -68,7 +64,7 @@ public class EmbeddedKafkaCluster {
         return null;
     }
 
-    public void createTopics(String...topics) {
+    public void createTopics(String... topics) {
         for (String topic : topics) {
             AdminUtils.createTopic(getZkUtils(), topic, 2, 1, new 
Properties());
         }
@@ -112,10 +108,10 @@ public class EmbeddedKafkaCluster {
             properties.setProperty("host.name", "localhost");
             properties.setProperty("port", Integer.toString(port));
             properties.setProperty("log.dir", logDir.getAbsolutePath());
-            properties.setProperty("num.partitions",  String.valueOf(1));
-            properties.setProperty("auto.create.topics.enable",  
String.valueOf(Boolean.TRUE));
+            properties.setProperty("num.partitions", String.valueOf(1));
+            properties.setProperty("auto.create.topics.enable", 
String.valueOf(Boolean.TRUE));
+            System.out.println("EmbeddedKafkaCluster: local directory: " + 
logDir.getAbsolutePath());
             properties.setProperty("log.flush.interval.messages", 
String.valueOf(1));
-            LOG.info("EmbeddedKafkaCluster: local directory: " + 
logDir.getAbsolutePath());
 
             KafkaServer broker = startBroker(properties);
 

http://git-wip-us.apache.org/repos/asf/camel/blob/b2aa831d/components/camel-kafka/src/test/resources/log4j.properties
----------------------------------------------------------------------
diff --git a/components/camel-kafka/src/test/resources/log4j.properties b/components/camel-kafka/src/test/resources/log4j.properties
index 67458fd..44266d1 100644
--- a/components/camel-kafka/src/test/resources/log4j.properties
+++ b/components/camel-kafka/src/test/resources/log4j.properties
@@ -32,4 +32,4 @@ log4j.appender.out=org.apache.log4j.FileAppender
 log4j.appender.out.layout=org.apache.log4j.PatternLayout
 log4j.appender.out.layout.ConversionPattern=%d [%-15.15t] %-5p %-30.30c{1} - 
%m%n
 log4j.appender.out.file=target/camel-kafka-test.log
-log4j.appender.out.append=true
+log4j.appender.out.append=true
\ No newline at end of file

Reply via email to