http://git-wip-us.apache.org/repos/asf/camel/blob/b2aa831d/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConsumer.java
----------------------------------------------------------------------
diff --git 
a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConsumer.java
 
b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConsumer.java
index 94893fb..e65ed3b 100644
--- 
a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConsumer.java
+++ 
b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConsumer.java
@@ -16,30 +16,19 @@
  */
 package org.apache.camel.component.kafka;
 
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
+import java.util.Arrays;
 import java.util.Properties;
-import java.util.concurrent.CyclicBarrier;
 import java.util.concurrent.ExecutorService;
-import java.util.concurrent.TimeUnit;
-
-import kafka.consumer.ConsumerConfig;
-import kafka.consumer.ConsumerIterator;
-import kafka.consumer.ConsumerTimeoutException;
-import kafka.consumer.KafkaStream;
-import kafka.javaapi.consumer.ConsumerConnector;
-import kafka.message.MessageAndMetadata;
+
 import org.apache.camel.Exchange;
 import org.apache.camel.Processor;
 import org.apache.camel.impl.DefaultConsumer;
-import org.apache.camel.util.ObjectHelper;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.consumer.ConsumerRecords;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-/**
- *
- */
 public class KafkaConsumer extends DefaultConsumer {
 
     private static final Logger LOG = 
LoggerFactory.getLogger(KafkaConsumer.class);
@@ -47,15 +36,14 @@ public class KafkaConsumer extends DefaultConsumer {
     protected ExecutorService executor;
     private final KafkaEndpoint endpoint;
     private final Processor processor;
-    private Map<ConsumerConnector, CyclicBarrier> consumerBarriers;
-
+    
     public KafkaConsumer(KafkaEndpoint endpoint, Processor processor) {
         super(endpoint, processor);
         this.endpoint = endpoint;
         this.processor = processor;
-        this.consumerBarriers = new HashMap<ConsumerConnector, 
CyclicBarrier>();
-        if (endpoint.getZookeeperConnect() == null) {
-            throw new IllegalArgumentException("zookeeper host or zookeeper 
connect must be specified");
+
+        if (endpoint.getBrokers() == null) {
+            throw new IllegalArgumentException("BootStrap servers must be 
specified");
         }
         if (endpoint.getGroupId() == null) {
             throw new IllegalArgumentException("groupId must not be null");
@@ -64,57 +52,25 @@ public class KafkaConsumer extends DefaultConsumer {
 
     Properties getProps() {
         Properties props = 
endpoint.getConfiguration().createConsumerProperties();
-        props.put("zookeeper.connect", endpoint.getZookeeperConnect());
-        props.put("group.id", endpoint.getGroupId());
+        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, 
endpoint.getBrokers());
+        props.put(ConsumerConfig.GROUP_ID_CONFIG, endpoint.getGroupId());
         return props;
     }
 
     @Override
     protected void doStart() throws Exception {
         super.doStart();
-        log.info("Starting Kafka consumer");
-
+        LOG.info("Starting Kafka consumer");
         executor = endpoint.createExecutor();
         for (int i = 0; i < endpoint.getConsumersCount(); i++) {
-            ConsumerConnector consumer = 
kafka.consumer.Consumer.createJavaConsumerConnector(new 
ConsumerConfig(getProps()));
-            Map<String, Integer> topicCountMap = new HashMap<String, 
Integer>();
-            topicCountMap.put(endpoint.getTopic(), 
endpoint.getConsumerStreams());
-            Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap = 
consumer.createMessageStreams(topicCountMap);
-            List<KafkaStream<byte[], byte[]>> streams = 
consumerMap.get(endpoint.getTopic());
-
-            // commit periodically
-            if (endpoint.isAutoCommitEnable() != null && 
!endpoint.isAutoCommitEnable()) {
-                if ((endpoint.getConsumerTimeoutMs() == null || 
endpoint.getConsumerTimeoutMs() < 0)
-                        && endpoint.getConsumerStreams() > 1) {
-                    LOG.warn("consumerTimeoutMs is set to -1 (infinite) while 
requested multiple consumer streams.");
-                }
-                CyclicBarrier barrier = new 
CyclicBarrier(endpoint.getConsumerStreams(), new CommitOffsetTask(consumer));
-                for (final KafkaStream<byte[], byte[]> stream : streams) {
-                    executor.submit(new BatchingConsumerTask(stream, barrier));
-                }
-                consumerBarriers.put(consumer, barrier);
-            } else {
-                // auto commit
-                for (final KafkaStream<byte[], byte[]> stream : streams) {
-                    executor.submit(new AutoCommitConsumerTask(consumer, 
stream));
-                }
-                consumerBarriers.put(consumer, null);
-            }
+            executor.submit(new KafkaFetchRecords(endpoint.getTopic(), i + "", 
getProps()));
         }
-
     }
 
     @Override
     protected void doStop() throws Exception {
         super.doStop();
-        log.info("Stopping Kafka consumer");
-
-        for (ConsumerConnector consumer : consumerBarriers.keySet()) {
-            if (consumer != null) {
-                consumer.shutdown();
-            }
-        }
-        consumerBarriers.clear();
+        LOG.info("Stopping Kafka consumer");
 
         if (executor != null) {
             if (getEndpoint() != null && getEndpoint().getCamelContext() != 
null) {
@@ -126,102 +82,57 @@ public class KafkaConsumer extends DefaultConsumer {
         executor = null;
     }
 
-    class BatchingConsumerTask implements Runnable {
+    class KafkaFetchRecords implements Runnable {
 
-        private KafkaStream<byte[], byte[]> stream;
-        private CyclicBarrier barrier;
+        private final org.apache.kafka.clients.consumer.KafkaConsumer consumer;
+        private final String topicName;
+        private final String threadId;
+        private final Properties kafkaProps;
 
-        public BatchingConsumerTask(KafkaStream<byte[], byte[]> stream, 
CyclicBarrier barrier) {
-            this.stream = stream;
-            this.barrier = barrier;
+        public KafkaFetchRecords(String topicName, String id, Properties 
kafkaProps) {
+            this.topicName = topicName;
+            this.threadId = topicName + "-" + "Thread " + id;
+            this.kafkaProps = kafkaProps;
+            this.consumer = new 
org.apache.kafka.clients.consumer.KafkaConsumer(kafkaProps);
         }
 
+        @Override
+        @SuppressWarnings("unchecked")
         public void run() {
-
             int processed = 0;
-            boolean consumerTimeout;
-            MessageAndMetadata<byte[], byte[]> mm;
-            ConsumerIterator<byte[], byte[]> it = stream.iterator();
-            boolean hasNext = true;
-            while (hasNext) {
-                try {
-                    consumerTimeout = false;
-                    // only poll the next message if we are allowed to run and 
are not suspending
-                    if (isRunAllowed() && !isSuspendingOrSuspended() && 
it.hasNext()) {
-                        mm = it.next();
-                        Exchange exchange = endpoint.createKafkaExchange(mm);
+            try {
+                LOG.debug("Subscribing {} to topic {}", threadId, topicName);
+                consumer.subscribe(Arrays.asList(topicName));
+                while (isRunAllowed() && !isSuspendingOrSuspended()) {
+                    ConsumerRecords<Object, Object> records = 
consumer.poll(Long.MAX_VALUE);
+                    for (ConsumerRecord<Object, Object> record : records) {
+                        if (LOG.isTraceEnabled()) {
+                            LOG.trace("offset = {}, key = {}, value = {}", 
record.offset(), record.key(), record.value());
+                        }
+                        Exchange exchange = 
endpoint.createKafkaExchange(record);
                         try {
                             processor.process(exchange);
                         } catch (Exception e) {
-                            LOG.error(e.getMessage(), e);
+                            getExceptionHandler().handleException("Error 
during processing", exchange, e);
                         }
                         processed++;
-                    } else {
-                        // we don't need to process the message
-                        hasNext = false;
-                    }
-                } catch (ConsumerTimeoutException e) {
-                    LOG.debug("Consumer timeout occurred due " + 
e.getMessage(), e);
-                    consumerTimeout = true;
-                }
-
-                if (processed >= endpoint.getBatchSize() || consumerTimeout 
-                    || (processed > 0 && !hasNext)) { // Need to commit the 
offset for the last round
-                    try {
-                        barrier.await(endpoint.getBarrierAwaitTimeoutMs(), 
TimeUnit.MILLISECONDS);
-                        if (!consumerTimeout) {
-                            processed = 0;
+                        // if autocommit is false
+                        if (endpoint.isAutoCommitEnable() != null && 
!endpoint.isAutoCommitEnable()) {
+                            if (processed >= endpoint.getBatchSize()) {
+                                consumer.commitSync();
+                                processed = 0;
+                            }
                         }
-                    } catch (Exception e) {
-                        getExceptionHandler().handleException("Error waiting 
for batch to complete", e);
-                        break;
                     }
                 }
+                LOG.debug("Unsubscribing {} from topic {}", threadId, 
topicName);
+                consumer.unsubscribe();
+            } catch (Exception e) {
+                getExceptionHandler().handleException("Error consuming " + 
threadId + " from kafka topic", e);
             }
         }
-    }
-
-    class CommitOffsetTask implements Runnable {
-
-        private final ConsumerConnector consumer;
-
-        public CommitOffsetTask(ConsumerConnector consumer) {
-            this.consumer = consumer;
-        }
 
-        @Override
-        public void run() {
-            LOG.debug("Commit offsets on consumer: {}", 
ObjectHelper.getIdentityHashCode(consumer));
-            consumer.commitOffsets();
-        }
     }
 
-    class AutoCommitConsumerTask implements Runnable {
-
-        private final ConsumerConnector consumer;
-        private KafkaStream<byte[], byte[]> stream;
-
-        public AutoCommitConsumerTask(ConsumerConnector consumer, 
KafkaStream<byte[], byte[]> stream) {
-            this.consumer = consumer;
-            this.stream = stream;
-        }
-
-        public void run() {
-            ConsumerIterator<byte[], byte[]> it = stream.iterator();
-            // only poll the next message if we are allowed to run and are not 
suspending
-            while (isRunAllowed() && !isSuspendingOrSuspended() && 
it.hasNext()) {
-                MessageAndMetadata<byte[], byte[]> mm = it.next();
-                Exchange exchange = endpoint.createKafkaExchange(mm);
-                try {
-                    processor.process(exchange);
-                } catch (Exception e) {
-                    getExceptionHandler().handleException("Error during 
processing", exchange, e);
-                }
-            }
-            // no more data so commit offset
-            LOG.debug("Commit offsets on consumer: {}", 
ObjectHelper.getIdentityHashCode(consumer));
-            consumer.commitOffsets();
-        }
-    }
 }
 

http://git-wip-us.apache.org/repos/asf/camel/blob/b2aa831d/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaEndpoint.java
----------------------------------------------------------------------
diff --git 
a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaEndpoint.java
 
b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaEndpoint.java
index 760ec19a..5a56c39 100644
--- 
a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaEndpoint.java
+++ 
b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaEndpoint.java
@@ -16,10 +16,9 @@
  */
 package org.apache.camel.component.kafka;
 
+import java.util.Properties;
 import java.util.concurrent.ExecutorService;
 
-import kafka.message.MessageAndMetadata;
-
 import org.apache.camel.Consumer;
 import org.apache.camel.Exchange;
 import org.apache.camel.Message;
@@ -29,6 +28,7 @@ import org.apache.camel.Producer;
 import org.apache.camel.impl.DefaultEndpoint;
 import org.apache.camel.spi.UriEndpoint;
 import org.apache.camel.spi.UriParam;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
 
 /**
  * The kafka component allows messages to be sent to (or consumed from) Apache 
Kafka brokers.
@@ -38,8 +38,7 @@ public class KafkaEndpoint extends DefaultEndpoint implements 
MultipleConsumersS
 
     @UriParam
     private KafkaConfiguration configuration = new KafkaConfiguration();
-
-    @UriParam(description = "If the option is true, then KafkaProducer will 
ignore the KafkaConstants.TOPIC header setting of the inbound message.", 
defaultValue = "false")
+    @UriParam
     private boolean bridgeEndpoint;
 
     public KafkaEndpoint() {
@@ -73,30 +72,7 @@ public class KafkaEndpoint extends DefaultEndpoint 
implements MultipleConsumersS
 
     @Override
     public Producer createProducer() throws Exception {
-        String msgClassName = getConfiguration().getSerializerClass();
-        String keyClassName = getConfiguration().getKeySerializerClass();
-        if (msgClassName == null) {
-            msgClassName = KafkaConstants.KAFKA_DEFAULT_ENCODER;
-        }
-        if (keyClassName == null) {
-            keyClassName = msgClassName;
-        }
-
-        ClassLoader cl = getClass().getClassLoader();
-
-        Class<?> k;
-        try {
-            k = cl.loadClass(keyClassName);
-        } catch (ClassNotFoundException x) {
-            k = 
getCamelContext().getClassResolver().resolveMandatoryClass(keyClassName);
-        }
-        Class<?> v;
-        try {
-            v = cl.loadClass(msgClassName);
-        } catch (ClassNotFoundException x) {
-            v = 
getCamelContext().getClassResolver().resolveMandatoryClass(msgClassName);
-        }
-        return createProducer(k, v, this);
+        return createProducer(this);
     }
 
     @Override
@@ -104,411 +80,597 @@ public class KafkaEndpoint extends DefaultEndpoint 
implements MultipleConsumersS
         return true;
     }
 
+    @Override
+    public boolean isMultipleConsumersSupported() {
+        return true;
+    }
+
     public ExecutorService createExecutor() {
         return 
getCamelContext().getExecutorServiceManager().newFixedThreadPool(this, 
"KafkaTopic[" + configuration.getTopic() + "]", 
configuration.getConsumerStreams());
     }
 
-    public Exchange createKafkaExchange(MessageAndMetadata<byte[], byte[]> mm) 
{
+    public Exchange createKafkaExchange(ConsumerRecord record) {
         Exchange exchange = super.createExchange();
 
         Message message = exchange.getIn();
-        message.setHeader(KafkaConstants.PARTITION, mm.partition());
-        message.setHeader(KafkaConstants.TOPIC, mm.topic());
-        message.setHeader(KafkaConstants.OFFSET, mm.offset());
-        if (mm.key() != null) {
-            message.setHeader(KafkaConstants.KEY, new String(mm.key()));
+        message.setHeader(KafkaConstants.PARTITION, record.partition());
+        message.setHeader(KafkaConstants.TOPIC, record.topic());
+        message.setHeader(KafkaConstants.OFFSET, record.offset());
+        if (record.key() != null) {
+            message.setHeader(KafkaConstants.KEY, record.key());
         }
-        message.setBody(mm.message());
+        message.setBody(record.value());
 
         return exchange;
     }
 
-    protected <K, V> KafkaProducer<K, V> createProducer(Class<K> keyClass, 
Class<V> valueClass, KafkaEndpoint endpoint) {
-        return new KafkaProducer<K, V>(endpoint);
+    protected KafkaProducer createProducer(KafkaEndpoint endpoint) {
+        return new KafkaProducer(endpoint);
     }
 
     // Delegated properties from the configuration
     //-------------------------------------------------------------------------
 
-    public String getZookeeperConnect() {
-        return configuration.getZookeeperConnect();
+    public Properties createProducerProperties() {
+        return configuration.createProducerProperties();
+    }
+
+    public void setValueDeserializer(String valueDeserializer) {
+        configuration.setValueDeserializer(valueDeserializer);
     }
 
-    public void setZookeeperConnect(String zookeeperConnect) {
-        configuration.setZookeeperConnect(zookeeperConnect);
+    public void setRequestTimeoutMs(Integer requestTimeoutMs) {
+        configuration.setRequestTimeoutMs(requestTimeoutMs);
     }
 
-    public String getZookeeperHost() {
-        return configuration.getZookeeperHost();
+    public void setProducerBatchSize(Integer producerBatchSize) {
+        configuration.setProducerBatchSize(producerBatchSize);
     }
 
-    public void setZookeeperHost(String zookeeperHost) {
-        configuration.setZookeeperHost(zookeeperHost);
+    public void setRetryBackoffMs(Integer retryBackoffMs) {
+        configuration.setRetryBackoffMs(retryBackoffMs);
     }
 
-    public int getZookeeperPort() {
-        return configuration.getZookeeperPort();
+    public void setNoOfMetricsSample(Integer noOfMetricsSample) {
+        configuration.setNoOfMetricsSample(noOfMetricsSample);
     }
 
-    public void setZookeeperPort(int zookeeperPort) {
-        configuration.setZookeeperPort(zookeeperPort);
+    public String getMetricReporters() {
+        return configuration.getMetricReporters();
     }
 
-    public String getGroupId() {
-        return configuration.getGroupId();
+    public void setSslKeystoreType(String sslKeystoreType) {
+        configuration.setSslKeystoreType(sslKeystoreType);
+    }
+
+    public void setSslCipherSuites(String sslCipherSuites) {
+        configuration.setSslCipherSuites(sslCipherSuites);
+    }
+
+    public void setClientId(String clientId) {
+        configuration.setClientId(clientId);
+    }
+
+    public void setMetricsSampleWindowMs(Integer metricsSampleWindowMs) {
+        configuration.setMetricsSampleWindowMs(metricsSampleWindowMs);
+    }
+
+    public String getKeyDeserializer() {
+        return configuration.getKeyDeserializer();
+    }
+
+    public int getConsumersCount() {
+        return configuration.getConsumersCount();
+    }
+
+    public String getSslKeyPassword() {
+        return configuration.getSslKeyPassword();
+    }
+
+    public void setSendBufferBytes(Integer sendBufferBytes) {
+        configuration.setSendBufferBytes(sendBufferBytes);
+    }
+
+    public Boolean isAutoCommitEnable() {
+        return configuration.isAutoCommitEnable();
+    }
+
+    public Integer getMaxBlockMs() {
+        return configuration.getMaxBlockMs();
+    }
+
+    public String getConsumerId() {
+        return configuration.getConsumerId();
+    }
+
+    public void setSslProtocol(String sslProtocol) {
+        configuration.setSslProtocol(sslProtocol);
+    }
+
+    public void setReceiveBufferBytes(Integer receiveBufferBytes) {
+        configuration.setReceiveBufferBytes(receiveBufferBytes);
+    }
+
+    public Boolean getCheckCrcs() {
+        return configuration.getCheckCrcs();
     }
 
     public void setGroupId(String groupId) {
         configuration.setGroupId(groupId);
     }
 
-    public String getPartitioner() {
-        return configuration.getPartitioner();
+    public String getCompressionCodec() {
+        return configuration.getCompressionCodec();
     }
 
-    public void setPartitioner(String partitioner) {
-        configuration.setPartitioner(partitioner);
+    public String getGroupId() {
+        return configuration.getGroupId();
     }
 
-    public String getTopic() {
-        return configuration.getTopic();
+    public void setSslTruststoreLocation(String sslTruststoreLocation) {
+        configuration.setSslTruststoreLocation(sslTruststoreLocation);
     }
 
-    public void setTopic(String topic) {
-        configuration.setTopic(topic);
+    public String getKerberosInitCmd() {
+        return configuration.getKerberosInitCmd();
     }
 
-    public String getBrokers() {
-        return configuration.getBrokers();
+    public String getAutoOffsetReset() {
+        return configuration.getAutoOffsetReset();
+    }
+
+    public void setAutoCommitEnable(Boolean autoCommitEnable) {
+        configuration.setAutoCommitEnable(autoCommitEnable);
+    }
+
+    public void setSerializerClass(String serializerClass) {
+        configuration.setSerializerClass(serializerClass);
+    }
+
+    public Integer getQueueBufferingMaxMessages() {
+        return configuration.getQueueBufferingMaxMessages();
+    }
+
+    public void setSslEndpointAlgorithm(String sslEndpointAlgorithm) {
+        configuration.setSslEndpointAlgorithm(sslEndpointAlgorithm);
+    }
+
+    public void setRetries(Integer retries) {
+        configuration.setRetries(retries);
+    }
+
+    public void setAutoOffsetReset(String autoOffsetReset) {
+        configuration.setAutoOffsetReset(autoOffsetReset);
+    }
+
+    public Integer getSessionTimeoutMs() {
+        return configuration.getSessionTimeoutMs();
+    }
+
+    public Integer getBufferMemorySize() {
+        return configuration.getBufferMemorySize();
+    }
+
+    public String getKeySerializerClass() {
+        return configuration.getKeySerializerClass();
+    }
+
+    public void setSslProvider(String sslProvider) {
+        configuration.setSslProvider(sslProvider);
+    }
+
+    public void setFetchMinBytes(Integer fetchMinBytes) {
+        configuration.setFetchMinBytes(fetchMinBytes);
+    }
+
+    public Integer getAutoCommitIntervalMs() {
+        return configuration.getAutoCommitIntervalMs();
+    }
+
+    public void setKeySerializerClass(String keySerializerClass) {
+        configuration.setKeySerializerClass(keySerializerClass);
+    }
+
+    public Integer getConnectionMaxIdleMs() {
+        return configuration.getConnectionMaxIdleMs();
+    }
+
+    public Integer getReceiveBufferBytes() {
+        return configuration.getReceiveBufferBytes();
     }
 
     public void setBrokers(String brokers) {
         configuration.setBrokers(brokers);
     }
 
-    public int getConsumerStreams() {
-        return configuration.getConsumerStreams();
+    public void setMetadataFetchTimeoutMs(Integer metadataFetchTimeoutMs) {
+        configuration.setMetadataFetchTimeoutMs(metadataFetchTimeoutMs);
     }
 
-    public void setConsumerStreams(int consumerStreams) {
-        configuration.setConsumerStreams(consumerStreams);
+    public String getValueDeserializer() {
+        return configuration.getValueDeserializer();
     }
 
-    public int getBatchSize() {
-        return configuration.getBatchSize();
+    public String getPartitioner() {
+        return configuration.getPartitioner();
     }
 
-    public void setBatchSize(int batchSize) {
-        this.configuration.setBatchSize(batchSize);
+    public String getSslTruststoreLocation() {
+        return configuration.getSslTruststoreLocation();
+    }
+
+    public void setBarrierAwaitTimeoutMs(int barrierAwaitTimeoutMs) {
+        configuration.setBarrierAwaitTimeoutMs(barrierAwaitTimeoutMs);
+    }
+
+    public String getSslProvider() {
+        return configuration.getSslProvider();
+    }
+
+    public void setMetricReporters(String metricReporters) {
+        configuration.setMetricReporters(metricReporters);
+    }
+
+    public void setSslTruststorePassword(String sslTruststorePassword) {
+        configuration.setSslTruststorePassword(sslTruststorePassword);
+    }
+
+    public void setMaxInFlightRequest(Integer maxInFlightRequest) {
+        configuration.setMaxInFlightRequest(maxInFlightRequest);
+    }
+
+    public String getTopic() {
+        return configuration.getTopic();
     }
 
     public int getBarrierAwaitTimeoutMs() {
         return configuration.getBarrierAwaitTimeoutMs();
     }
 
-    public void setBarrierAwaitTimeoutMs(int barrierAwaitTimeoutMs) {
-        this.configuration.setBarrierAwaitTimeoutMs(barrierAwaitTimeoutMs);
+    public Integer getFetchMinBytes() {
+        return configuration.getFetchMinBytes();
     }
 
-    public int getConsumersCount() {
-        return this.configuration.getConsumersCount();
+    public Integer getHeartbeatIntervalMs() {
+        return configuration.getHeartbeatIntervalMs();
     }
 
-    public void setConsumersCount(int consumersCount) {
-        this.configuration.setConsumersCount(consumersCount);
+    public void setKeyDeserializer(String keyDeserializer) {
+        configuration.setKeyDeserializer(keyDeserializer);
     }
 
-    public void setConsumerTimeoutMs(int consumerTimeoutMs) {
-        configuration.setConsumerTimeoutMs(consumerTimeoutMs);
+    public Integer getMaxRequestSize() {
+        return configuration.getMaxRequestSize();
     }
 
-    public void setSerializerClass(String serializerClass) {
-        configuration.setSerializerClass(serializerClass);
+    public void setMetadataMaxAgeMs(Integer metadataMaxAgeMs) {
+        configuration.setMetadataMaxAgeMs(metadataMaxAgeMs);
     }
 
-    public void setQueueBufferingMaxMessages(int queueBufferingMaxMessages) {
-        configuration.setQueueBufferingMaxMessages(queueBufferingMaxMessages);
+    public String getSslKeystoreType() {
+        return configuration.getSslKeystoreType();
     }
 
-    public int getFetchWaitMaxMs() {
-        return configuration.getFetchWaitMaxMs();
+    public void setKerberosRenewWindowFactor(Double kerberosRenewWindowFactor) 
{
+        configuration.setKerberosRenewWindowFactor(kerberosRenewWindowFactor);
     }
 
-    public Integer getZookeeperConnectionTimeoutMs() {
-        return configuration.getZookeeperConnectionTimeoutMs();
+    public Integer getKerberosBeforeReloginMinTime() {
+        return configuration.getKerberosBeforeReloginMinTime();
     }
 
-    public void setZookeeperConnectionTimeoutMs(Integer 
zookeeperConnectionTimeoutMs) {
-        
configuration.setZookeeperConnectionTimeoutMs(zookeeperConnectionTimeoutMs);
+    public String getSslEnabledProtocols() {
+        return configuration.getSslEnabledProtocols();
     }
 
-    public void setMessageSendMaxRetries(int messageSendMaxRetries) {
-        configuration.setMessageSendMaxRetries(messageSendMaxRetries);
+    public Integer getMaxInFlightRequest() {
+        return configuration.getMaxInFlightRequest();
     }
 
-    public int getQueueBufferingMaxMs() {
-        return configuration.getQueueBufferingMaxMs();
+    public Integer getProducerBatchSize() {
+        return configuration.getProducerBatchSize();
     }
 
-    public void setRequestRequiredAcks(short requestRequiredAcks) {
-        configuration.setRequestRequiredAcks(requestRequiredAcks);
+    public void setSslKeystorePassword(String sslKeystorePassword) {
+        configuration.setSslKeystorePassword(sslKeystorePassword);
     }
 
-    public Integer getRebalanceBackoffMs() {
-        return configuration.getRebalanceBackoffMs();
+    public Boolean getBlockOnBufferFull() {
+        return configuration.getBlockOnBufferFull();
     }
 
-    public void setQueueEnqueueTimeoutMs(int queueEnqueueTimeoutMs) {
-        configuration.setQueueEnqueueTimeoutMs(queueEnqueueTimeoutMs);
+    public void setCheckCrcs(Boolean checkCrcs) {
+        configuration.setCheckCrcs(checkCrcs);
     }
 
-    public int getFetchMessageMaxBytes() {
-        return configuration.getFetchMessageMaxBytes();
+    public int getConsumerStreams() {
+        return configuration.getConsumerStreams();
     }
 
-    public int getQueuedMaxMessages() {
-        return configuration.getQueuedMaxMessageChunks();
+    public void setConsumersCount(int consumersCount) {
+        configuration.setConsumersCount(consumersCount);
     }
 
-    public int getAutoCommitIntervalMs() {
-        return configuration.getAutoCommitIntervalMs();
+    public int getBatchSize() {
+        return configuration.getBatchSize();
     }
 
-    public void setSocketTimeoutMs(int socketTimeoutMs) {
-        configuration.setSocketTimeoutMs(socketTimeoutMs);
+    public void setAutoCommitIntervalMs(Integer autoCommitIntervalMs) {
+        configuration.setAutoCommitIntervalMs(autoCommitIntervalMs);
     }
 
-    public void setAutoCommitIntervalMs(int autoCommitIntervalMs) {
-        configuration.setAutoCommitIntervalMs(autoCommitIntervalMs);
+    public void setSslTruststoreType(String sslTruststoreType) {
+        configuration.setSslTruststoreType(sslTruststoreType);
     }
 
-    public void setRequestTimeoutMs(int requestTimeoutMs) {
-        configuration.setRequestTimeoutMs(requestTimeoutMs);
+    public Integer getConsumerRequestTimeoutMs() {
+        return configuration.getConsumerRequestTimeoutMs();
     }
 
-    public void setCompressedTopics(String compressedTopics) {
-        configuration.setCompressedTopics(compressedTopics);
+    public String getSslKeystorePassword() {
+        return configuration.getSslKeystorePassword();
     }
 
-    public int getSocketReceiveBufferBytes() {
-        return configuration.getSocketReceiveBufferBytes();
+    public void setSslKeyPassword(String sslKeyPassword) {
+        configuration.setSslKeyPassword(sslKeyPassword);
     }
 
-    public void setSendBufferBytes(int sendBufferBytes) {
-        configuration.setSendBufferBytes(sendBufferBytes);
+    public void setBlockOnBufferFull(Boolean blockOnBufferFull) {
+        configuration.setBlockOnBufferFull(blockOnBufferFull);
     }
 
-    public void setFetchMessageMaxBytes(int fetchMessageMaxBytes) {
-        configuration.setFetchMessageMaxBytes(fetchMessageMaxBytes);
+    public Integer getRequestRequiredAcks() {
+        return configuration.getRequestRequiredAcks();
     }
 
-    public int getRefreshLeaderBackoffMs() {
-        return configuration.getRefreshLeaderBackoffMs();
+    public Double getKerberosRenewWindowFactor() {
+        return configuration.getKerberosRenewWindowFactor();
     }
 
-    public void setFetchWaitMaxMs(int fetchWaitMaxMs) {
-        configuration.setFetchWaitMaxMs(fetchWaitMaxMs);
+    public void setKerberosInitCmd(String kerberosInitCmd) {
+        configuration.setKerberosInitCmd(kerberosInitCmd);
     }
 
-    public int getTopicMetadataRefreshIntervalMs() {
-        return configuration.getTopicMetadataRefreshIntervalMs();
+    public Integer getRetryBackoffMs() {
+        return configuration.getRetryBackoffMs();
     }
 
-    public void setZookeeperSessionTimeoutMs(int zookeeperSessionTimeoutMs) {
-        configuration.setZookeeperSessionTimeoutMs(zookeeperSessionTimeoutMs);
+    public void setSslTrustmanagerAlgorithm(String sslTrustmanagerAlgorithm) {
+        configuration.setSslTrustmanagerAlgorithm(sslTrustmanagerAlgorithm);
     }
 
-    public Integer getConsumerTimeoutMs() {
-        return configuration.getConsumerTimeoutMs();
+    public void setConsumerRequestTimeoutMs(Integer consumerRequestTimeoutMs) {
+        configuration.setConsumerRequestTimeoutMs(consumerRequestTimeoutMs);
     }
 
-    public void setAutoCommitEnable(boolean autoCommitEnable) {
-        configuration.setAutoCommitEnable(autoCommitEnable);
+    public void setReconnectBackoffMs(Integer reconnectBackoffMs) {
+        configuration.setReconnectBackoffMs(reconnectBackoffMs);
     }
 
-    public String getCompressionCodec() {
-        return configuration.getCompressionCodec();
+    public void setKerberosRenewJitter(Double kerberosRenewJitter) {
+        configuration.setKerberosRenewJitter(kerberosRenewJitter);
+    }
+
+    public String getSslKeystoreLocation() {
+        return configuration.getSslKeystoreLocation();
     }
 
-    public void setProducerType(String producerType) {
-        configuration.setProducerType(producerType);
+    public Integer getNoOfMetricsSample() {
+        return configuration.getNoOfMetricsSample();
+    }
+
+    public String getSslKeymanagerAlgorithm() {
+        return configuration.getSslKeymanagerAlgorithm();
+    }
+
+    public void setConsumerId(String consumerId) {
+        configuration.setConsumerId(consumerId);
     }
 
     public String getClientId() {
         return configuration.getClientId();
     }
 
-    public int getFetchMinBytes() {
-        return configuration.getFetchMinBytes();
+    public void setFetchWaitMaxMs(Integer fetchWaitMaxMs) {
+        configuration.setFetchWaitMaxMs(fetchWaitMaxMs);
     }
 
-    public String getAutoOffsetReset() {
-        return configuration.getAutoOffsetReset();
+    public String getSslCipherSuites() {
+        return configuration.getSslCipherSuites();
     }
 
-    public void setRefreshLeaderBackoffMs(int refreshLeaderBackoffMs) {
-        configuration.setRefreshLeaderBackoffMs(refreshLeaderBackoffMs);
+    public void setRequestRequiredAcks(Integer requestRequiredAcks) {
+        configuration.setRequestRequiredAcks(requestRequiredAcks);
     }
 
-    public void setAutoOffsetReset(String autoOffsetReset) {
-        configuration.setAutoOffsetReset(autoOffsetReset);
+    public void setConnectionMaxIdleMs(Integer connectionMaxIdleMs) {
+        configuration.setConnectionMaxIdleMs(connectionMaxIdleMs);
     }
 
-    public void setConsumerId(String consumerId) {
-        configuration.setConsumerId(consumerId);
+    public String getSslTrustmanagerAlgorithm() {
+        return configuration.getSslTrustmanagerAlgorithm();
     }
 
-    public int getRetryBackoffMs() {
-        return configuration.getRetryBackoffMs();
+    public String getSslTruststorePassword() {
+        return configuration.getSslTruststorePassword();
     }
 
-    public int getRebalanceMaxRetries() {
-        return configuration.getRebalanceMaxRetries();
+    public void setTimeoutMs(Integer timeoutMs) {
+        configuration.setTimeoutMs(timeoutMs);
     }
 
-    public Boolean isAutoCommitEnable() {
-        return configuration.isAutoCommitEnable();
+    public void setConsumerStreams(int consumerStreams) {
+        configuration.setConsumerStreams(consumerStreams);
     }
 
-    public void setQueueBufferingMaxMs(int queueBufferingMaxMs) {
-        configuration.setQueueBufferingMaxMs(queueBufferingMaxMs);
+    public String getSslTruststoreType() {
+        return configuration.getSslTruststoreType();
     }
 
-    public void setRebalanceMaxRetries(int rebalanceMaxRetries) {
-        configuration.setRebalanceMaxRetries(rebalanceMaxRetries);
+    public String getSecurityProtocol() {
+        return configuration.getSecurityProtocol();
     }
 
-    public int getZookeeperSessionTimeoutMs() {
-        return configuration.getZookeeperSessionTimeoutMs();
+    public void setBufferMemorySize(Integer bufferMemorySize) {
+        configuration.setBufferMemorySize(bufferMemorySize);
     }
 
-    public void setKeySerializerClass(String keySerializerClass) {
-        configuration.setKeySerializerClass(keySerializerClass);
+    public void setSaslKerberosServiceName(String saslKerberosServiceName) {
+        configuration.setSaslKerberosServiceName(saslKerberosServiceName);
     }
 
     public void setCompressionCodec(String compressionCodec) {
         configuration.setCompressionCodec(compressionCodec);
     }
 
-    public void setClientId(String clientId) {
-        configuration.setClientId(clientId);
+    public void setKerberosBeforeReloginMinTime(Integer 
kerberosBeforeReloginMinTime) {
+        
configuration.setKerberosBeforeReloginMinTime(kerberosBeforeReloginMinTime);
     }
 
-    public int getSocketTimeoutMs() {
-        return configuration.getSocketTimeoutMs();
+    public Integer getMetadataMaxAgeMs() {
+        return configuration.getMetadataMaxAgeMs();
     }
 
-    public String getCompressedTopics() {
-        return configuration.getCompressedTopics();
+    public String getSerializerClass() {
+        return configuration.getSerializerClass();
     }
 
-    public int getZookeeperSyncTimeMs() {
-        return configuration.getZookeeperSyncTimeMs();
+    public void setSslKeymanagerAlgorithm(String sslKeymanagerAlgorithm) {
+        configuration.setSslKeymanagerAlgorithm(sslKeymanagerAlgorithm);
     }
 
-    public void setSocketReceiveBufferBytes(int socketReceiveBufferBytes) {
-        configuration.setSocketReceiveBufferBytes(socketReceiveBufferBytes);
+    public void setMaxRequestSize(Integer maxRequestSize) {
+        configuration.setMaxRequestSize(maxRequestSize);
     }
 
-    public int getQueueEnqueueTimeoutMs() {
-        return configuration.getQueueEnqueueTimeoutMs();
+    public Double getKerberosRenewJitter() {
+        return configuration.getKerberosRenewJitter();
     }
 
-    public int getQueueBufferingMaxMessages() {
-        return configuration.getQueueBufferingMaxMessages();
+    public String getPartitionAssignor() {
+        return configuration.getPartitionAssignor();
     }
 
-    public void setZookeeperSyncTimeMs(int zookeeperSyncTimeMs) {
-        configuration.setZookeeperSyncTimeMs(zookeeperSyncTimeMs);
+    public Integer getMetadataFetchTimeoutMs() {
+        return configuration.getMetadataFetchTimeoutMs();
     }
 
-    public String getKeySerializerClass() {
-        return configuration.getKeySerializerClass();
+    public void setSecurityProtocol(String securityProtocol) {
+        configuration.setSecurityProtocol(securityProtocol);
     }
 
-    public void setTopicMetadataRefreshIntervalMs(int 
topicMetadataRefreshIntervalMs) {
-        
configuration.setTopicMetadataRefreshIntervalMs(topicMetadataRefreshIntervalMs);
+    public void setQueueBufferingMaxMessages(Integer 
queueBufferingMaxMessages) {
+        configuration.setQueueBufferingMaxMessages(queueBufferingMaxMessages);
     }
 
-    public void setBatchNumMessages(int batchNumMessages) {
-        configuration.setBatchNumMessages(batchNumMessages);
+    public String getSaslKerberosServiceName() {
+        return configuration.getSaslKerberosServiceName();
     }
 
-    public int getSendBufferBytes() {
-        return configuration.getSendBufferBytes();
+    public void setBatchSize(int batchSize) {
+        configuration.setBatchSize(batchSize);
     }
 
-    public void setRebalanceBackoffMs(Integer rebalanceBackoffMs) {
-        configuration.setRebalanceBackoffMs(rebalanceBackoffMs);
+    public Integer getLingerMs() {
+        return configuration.getLingerMs();
     }
 
-    public void setQueuedMaxMessages(int queuedMaxMessages) {
-        configuration.setQueuedMaxMessageChunks(queuedMaxMessages);
+    public Integer getRetries() {
+        return configuration.getRetries();
     }
 
-    public void setRetryBackoffMs(int retryBackoffMs) {
-        configuration.setRetryBackoffMs(retryBackoffMs);
+    public Integer getMaxPartitionFetchBytes() {
+        return configuration.getMaxPartitionFetchBytes();
     }
 
-    public int getBatchNumMessages() {
-        return configuration.getBatchNumMessages();
+    public String getSslEndpointAlgorithm() {
+        return configuration.getSslEndpointAlgorithm();
     }
 
-    public short getRequestRequiredAcks() {
-        return configuration.getRequestRequiredAcks();
+    public Integer getReconnectBackoffMs() {
+        return configuration.getReconnectBackoffMs();
     }
 
-    public String getProducerType() {
-        return configuration.getProducerType();
+    public void setLingerMs(Integer lingerMs) {
+        configuration.setLingerMs(lingerMs);
     }
 
-    public String getConsumerId() {
-        return configuration.getConsumerId();
+    public void setPartitionAssignor(String partitionAssignor) {
+        configuration.setPartitionAssignor(partitionAssignor);
     }
 
-    public int getMessageSendMaxRetries() {
-        return configuration.getMessageSendMaxRetries();
+    public Integer getRequestTimeoutMs() {
+        return configuration.getRequestTimeoutMs();
     }
 
-    public void setFetchMinBytes(int fetchMinBytes) {
-        configuration.setFetchMinBytes(fetchMinBytes);
+    public Properties createConsumerProperties() {
+        return configuration.createConsumerProperties();
     }
 
-    public String getSerializerClass() {
-        return configuration.getSerializerClass();
+    public void setTopic(String topic) {
+        configuration.setTopic(topic);
     }
 
-    public int getRequestTimeoutMs() {
-        return configuration.getRequestTimeoutMs();
+    public Integer getFetchWaitMaxMs() {
+        return configuration.getFetchWaitMaxMs();
     }
 
-    @Override
-    public boolean isMultipleConsumersSupported() {
-        return true;
+    public void setSessionTimeoutMs(Integer sessionTimeoutMs) {
+        configuration.setSessionTimeoutMs(sessionTimeoutMs);
     }
 
-    public boolean isBridgeEndpoint() {
-        return bridgeEndpoint;
+    public void setSslEnabledProtocols(String sslEnabledProtocols) {
+        configuration.setSslEnabledProtocols(sslEnabledProtocols);
     }
 
-    public void setBridgeEndpoint(boolean bridgeEndpoint) {
-        this.bridgeEndpoint = bridgeEndpoint;
+    public void setHeartbeatIntervalMs(Integer heartbeatIntervalMs) {
+        configuration.setHeartbeatIntervalMs(heartbeatIntervalMs);
     }
 
-    public String getOffsetsStorage() {
-        return configuration.getOffsetsStorage();
+    public void setMaxBlockMs(Integer maxBlockMs) {
+        configuration.setMaxBlockMs(maxBlockMs);
     }
 
-    public void setOffsetsStorage(String offsetsStorage) {
-        configuration.setOffsetsStorage(offsetsStorage);
+    public void setSslKeystoreLocation(String sslKeystoreLocation) {
+        configuration.setSslKeystoreLocation(sslKeystoreLocation);
     }
 
-    public Boolean isDualCommitEnabled() {
-        return configuration.isDualCommitEnabled();
+    public void setMaxPartitionFetchBytes(Integer maxPartitionFetchBytes) {
+        configuration.setMaxPartitionFetchBytes(maxPartitionFetchBytes);
+    }
+
+    public void setPartitioner(String partitioner) {
+        configuration.setPartitioner(partitioner);
+    }
+
+    public String getBrokers() {
+        return configuration.getBrokers();
     }
 
-    public void setDualCommitEnabled(boolean dualCommitEnabled) {
-        configuration.setDualCommitEnabled(dualCommitEnabled);
+    public Integer getMetricsSampleWindowMs() {
+        return configuration.getMetricsSampleWindowMs();
     }
 
+    public Integer getSendBufferBytes() {
+        return configuration.getSendBufferBytes();
+    }
+
+    public Integer getTimeoutMs() {
+        return configuration.getTimeoutMs();
+    }
+
+    public String getSslProtocol() {
+        return configuration.getSslProtocol();
+    }
+
+    public boolean isBridgeEndpoint() {
+        return bridgeEndpoint;
+    }
+
+    /**
+     * If the option is true, then KafkaProducer will ignore the 
KafkaConstants.TOPIC header setting of the inbound message.
+     */
+    public void setBridgeEndpoint(boolean bridgeEndpoint) {
+        this.bridgeEndpoint = bridgeEndpoint;
+    }
 }

http://git-wip-us.apache.org/repos/asf/camel/blob/b2aa831d/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaProducer.java
----------------------------------------------------------------------
diff --git 
a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaProducer.java
 
b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaProducer.java
index 06a0317..6f9ea79 100644
--- 
a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaProducer.java
+++ 
b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaProducer.java
@@ -18,20 +18,16 @@ package org.apache.camel.component.kafka;
 
 import java.util.Properties;
 
-import kafka.javaapi.producer.Producer;
-import kafka.producer.KeyedMessage;
-import kafka.producer.ProducerConfig;
 import org.apache.camel.CamelException;
 import org.apache.camel.CamelExchangeException;
 import org.apache.camel.Exchange;
 import org.apache.camel.impl.DefaultProducer;
+import org.apache.kafka.clients.producer.ProducerConfig;
+import org.apache.kafka.clients.producer.ProducerRecord;
 
-/**
- *
- */
-public class KafkaProducer<K, V> extends DefaultProducer {
+public class KafkaProducer extends DefaultProducer {
 
-    protected Producer<K, V> producer;
+    private org.apache.kafka.clients.producer.KafkaProducer kafkaProducer;
     private final KafkaEndpoint endpoint;
 
     public KafkaProducer(KafkaEndpoint endpoint) {
@@ -39,26 +35,38 @@ public class KafkaProducer<K, V> extends DefaultProducer {
         this.endpoint = endpoint;
     }
 
-    @Override
-    protected void doStop() throws Exception {
-        if (producer != null) {
-            producer.close();
-        }
-    }
-
     Properties getProps() {
         Properties props = 
endpoint.getConfiguration().createProducerProperties();
         if (endpoint.getBrokers() != null) {
-            props.put("metadata.broker.list", endpoint.getBrokers());
+            props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, 
endpoint.getBrokers());
         }
         return props;
     }
 
+    public org.apache.kafka.clients.producer.KafkaProducer getKafkaProducer() {
+        return kafkaProducer;
+    }
+
+    /**
+     * To use a custom {@link org.apache.kafka.clients.producer.KafkaProducer} 
instance.
+     */
+    public void 
setKafkaProducer(org.apache.kafka.clients.producer.KafkaProducer kafkaProducer) 
{
+        this.kafkaProducer = kafkaProducer;
+    }
+
+    @Override
+    protected void doStop() throws Exception {
+        if (kafkaProducer != null) {
+            kafkaProducer.close();
+        }
+    }
+
     @Override
     protected void doStart() throws Exception {
         Properties props = getProps();
-        ProducerConfig config = new ProducerConfig(props);
-        producer = new Producer<K, V>(config);
+        if (kafkaProducer == null) {
+            kafkaProducer = new 
org.apache.kafka.clients.producer.KafkaProducer(props);
+        }
     }
 
     @Override
@@ -71,26 +79,26 @@ public class KafkaProducer<K, V> extends DefaultProducer {
         if (topic == null) {
             throw new CamelExchangeException("No topic key set", exchange);
         }
-        K partitionKey = (K) 
exchange.getIn().getHeader(KafkaConstants.PARTITION_KEY);
+        Object partitionKey = 
exchange.getIn().getHeader(KafkaConstants.PARTITION_KEY);
         boolean hasPartitionKey = partitionKey != null;
 
-        K messageKey = (K) exchange.getIn().getHeader(KafkaConstants.KEY);
+        Object messageKey = exchange.getIn().getHeader(KafkaConstants.KEY);
         boolean hasMessageKey = messageKey != null;
 
-        V msg = (V) exchange.getIn().getBody();
-        KeyedMessage<K, V> data;
+        Object msg = exchange.getIn().getBody();
 
+        ProducerRecord record;
         if (hasPartitionKey && hasMessageKey) {
-            data = new KeyedMessage<K, V>(topic, messageKey, partitionKey, 
msg);
-        } else if (hasPartitionKey) {
-            data = new KeyedMessage<K, V>(topic, partitionKey, msg);
+            record = new ProducerRecord(topic, new 
Integer(partitionKey.toString()), messageKey, msg);
         } else if (hasMessageKey) {
-            data = new KeyedMessage<K, V>(topic, messageKey, msg);
+            record = new ProducerRecord(topic, messageKey, msg);
         } else {
             log.warn("No message key or partition key set");
-            data = new KeyedMessage<K, V>(topic, messageKey, partitionKey, 
msg);
+            record = new ProducerRecord(topic, msg);
         }
-        producer.send(data);
+
+        // TODO: add support for async callback in the send
+        kafkaProducer.send(record);
     }
 
 }

http://git-wip-us.apache.org/repos/asf/camel/blob/b2aa831d/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/BaseEmbeddedKafkaTest.java
----------------------------------------------------------------------
diff --git 
a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/BaseEmbeddedKafkaTest.java
 
b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/BaseEmbeddedKafkaTest.java
index 0d2b003..d87f885 100644
--- 
a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/BaseEmbeddedKafkaTest.java
+++ 
b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/BaseEmbeddedKafkaTest.java
@@ -30,26 +30,23 @@ import org.apache.camel.test.AvailablePortFinder;
 import org.apache.camel.test.junit4.CamelTestSupport;
 import org.junit.AfterClass;
 import org.junit.BeforeClass;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
 public class BaseEmbeddedKafkaTest extends CamelTestSupport {
+
     static EmbeddedZookeeper embeddedZookeeper;
     static EmbeddedKafkaCluster embeddedKafkaCluster;
-    
-    private static final Logger LOG = 
LoggerFactory.getLogger(BaseEmbeddedKafkaTest.class);
 
     private static volatile int zookeeperPort;
-    
+
     private static volatile int karfkaPort;
-   
+
     @BeforeClass
     public static void beforeClass() {
         // start from somewhere in the 23xxx range
         zookeeperPort = AvailablePortFinder.getNextAvailable(23000);
         // find another ports for proxy route test
         karfkaPort = AvailablePortFinder.getNextAvailable(24000);
-        
+
         embeddedZookeeper = new EmbeddedZookeeper(zookeeperPort);
         List<Integer> kafkaPorts = new ArrayList<Integer>();
         // -1 for any available port
@@ -60,9 +57,9 @@ public class BaseEmbeddedKafkaTest extends CamelTestSupport {
         } catch (IOException e) {
             e.printStackTrace();
         }
-        LOG.info("Embedded Zookeeper connection: " + 
embeddedZookeeper.getConnection());
+        System.out.println("### Embedded Zookeeper connection: " + 
embeddedZookeeper.getConnection());
         embeddedKafkaCluster.startup();
-        LOG.info("Embedded Kafka cluster broker list: " + 
embeddedKafkaCluster.getBrokerList());
+        System.out.println("### Embedded Kafka cluster broker list: " + 
embeddedKafkaCluster.getBrokerList());
     }
 
     @AfterClass
@@ -70,7 +67,7 @@ public class BaseEmbeddedKafkaTest extends CamelTestSupport {
         embeddedKafkaCluster.shutdown();
         embeddedZookeeper.shutdown();
     }
-    
+
     @Override
     protected JndiRegistry createRegistry() throws Exception {
         JndiRegistry jndi = super.createRegistry();
@@ -81,7 +78,6 @@ public class BaseEmbeddedKafkaTest extends CamelTestSupport {
         jndi.bind("prop", prop);
         return jndi;
     }
-    
 
     @Override
     protected CamelContext createCamelContext() throws Exception {
@@ -89,12 +85,11 @@ public class BaseEmbeddedKafkaTest extends CamelTestSupport 
{
         context.addComponent("properties", new 
PropertiesComponent("ref:prop"));
         return context;
     }
-    
 
     protected static int getZookeeperPort() {
         return zookeeperPort;
     }
-    
+
     protected static int getKarfkaPort() {
         return karfkaPort;
     }

http://git-wip-us.apache.org/repos/asf/camel/blob/b2aa831d/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaComponentTest.java
----------------------------------------------------------------------
diff --git 
a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaComponentTest.java
 
b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaComponentTest.java
index eb6dd09..31c2dd6 100644
--- 
a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaComponentTest.java
+++ 
b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaComponentTest.java
@@ -18,14 +18,17 @@ package org.apache.camel.component.kafka;
 
 import java.util.HashMap;
 import java.util.Map;
+import java.util.Properties;
 
 import org.apache.camel.CamelContext;
-import org.apache.camel.spi.Registry;
+import org.apache.kafka.clients.CommonClientConfigs;
+import org.apache.kafka.clients.producer.ProducerConfig;
+import org.apache.kafka.common.config.SaslConfigs;
+import org.apache.kafka.common.config.SslConfigs;
 import org.junit.Test;
 import org.mockito.Mockito;
 
 import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNull;
 
 public class KafkaComponentTest {
 
@@ -34,10 +37,6 @@ public class KafkaComponentTest {
     @Test
     public void testPropertiesSet() throws Exception {
         Map<String, Object> params = new HashMap<String, Object>();
-        params.put("zookeeperHost", "somehost");
-        params.put("zookeeperPort", 2987);
-        params.put("portNumber", 14123);
-        params.put("consumerStreams", "3");
         params.put("topic", "mytopic");
         params.put("partitioner", "com.class.Party");
 
@@ -45,97 +44,166 @@ public class KafkaComponentTest {
         String remaining = "broker1:12345,broker2:12566";
 
         KafkaEndpoint endpoint = new 
KafkaComponent(context).createEndpoint(uri, remaining, params);
-        assertEquals("somehost:2987", endpoint.getZookeeperConnect());
-        assertEquals("somehost", endpoint.getZookeeperHost());
-        assertEquals(2987, endpoint.getZookeeperPort());
         assertEquals("broker1:12345,broker2:12566", endpoint.getBrokers());
         assertEquals("mytopic", endpoint.getTopic());
-        assertEquals(3, endpoint.getConsumerStreams());
         assertEquals("com.class.Party", endpoint.getPartitioner());
     }
 
     @Test
-    public void testZookeeperConnectPropertyOverride() throws Exception {
+    public void testAllProducerConfigProperty() throws Exception {
         Map<String, Object> params = new HashMap<String, Object>();
-        params.put("zookeeperConnect", "thehost:2181/chroot");
-        params.put("zookeeperHost", "somehost");
-        params.put("zookeeperPort", 2987);
-        params.put("portNumber", 14123);
-        params.put("consumerStreams", "3");
-        params.put("topic", "mytopic");
-        params.put("partitioner", "com.class.Party");
+        setProducerProperty(params);
 
-        String uri = "kafka:broker1:12345,broker2:12566";
-        String remaining = "broker1:12345,broker2:12566";
+        String uri = "kafka:dev1:12345,dev2:12566";
+        String remaining = "dev1:12345,dev2:12566";
 
         KafkaEndpoint endpoint = new 
KafkaComponent(context).createEndpoint(uri, remaining, params);
-        assertEquals("thehost:2181/chroot", endpoint.getZookeeperConnect());
-        assertNull(endpoint.getZookeeperHost());
-        assertEquals(-1, endpoint.getZookeeperPort());
-        assertEquals("broker1:12345,broker2:12566", endpoint.getBrokers());
-        assertEquals("mytopic", endpoint.getTopic());
-        assertEquals(3, endpoint.getConsumerStreams());
-        assertEquals("com.class.Party", endpoint.getPartitioner());
+
+        assertEquals(new Integer(0), endpoint.getRequestRequiredAcks());
+        assertEquals(new Integer(1), endpoint.getBufferMemorySize());
+        assertEquals(new Integer(10), endpoint.getProducerBatchSize());
+        assertEquals(new Integer(12), endpoint.getConnectionMaxIdleMs());
+        assertEquals(new Integer(1), endpoint.getMaxBlockMs());
+        assertEquals(false, endpoint.getBlockOnBufferFull());
+        assertEquals(new Integer(1), endpoint.getBufferMemorySize());
+        assertEquals("testing", endpoint.getClientId());
+        assertEquals("none", endpoint.getCompressionCodec());
+        assertEquals(new Integer(1), endpoint.getLingerMs());
+        assertEquals(new Integer(100), endpoint.getMaxRequestSize());
+        assertEquals(100, endpoint.getRequestTimeoutMs().intValue());
+        assertEquals(new Integer(9043), endpoint.getMetadataFetchTimeoutMs());
+        assertEquals(new Integer(1029), endpoint.getMetadataMaxAgeMs());
+        assertEquals(new Integer(23), endpoint.getReceiveBufferBytes());
+        assertEquals(new Integer(234), endpoint.getReconnectBackoffMs());
+        assertEquals(new Integer(0), endpoint.getRetries());
+        assertEquals(3782, endpoint.getRetryBackoffMs().intValue());
+        assertEquals(765, endpoint.getSendBufferBytes().intValue());
+        assertEquals(new Integer(2045), endpoint.getTimeoutMs());
+        assertEquals(new Integer(1), endpoint.getMaxInFlightRequest());
+        
assertEquals("org.apache.camel.reporters.TestReport,org.apache.camel.reporters.SampleReport",
 endpoint.getMetricReporters());
+        assertEquals(new Integer(3), endpoint.getNoOfMetricsSample());
+        assertEquals(new Integer(12344), endpoint.getMetricsSampleWindowMs());
+        assertEquals(KafkaConstants.KAFKA_DEFAULT_SERIALIZER, 
endpoint.getSerializerClass());
+        assertEquals(KafkaConstants.KAFKA_DEFAULT_SERIALIZER, 
endpoint.getKeySerializerClass());
+        assertEquals("testing", endpoint.getSslKeyPassword());
+        assertEquals("/abc", endpoint.getSslKeystoreLocation());
+        assertEquals("testing", endpoint.getSslKeystorePassword());
+        assertEquals("/abc", endpoint.getSslTruststoreLocation());
+        assertEquals("testing", endpoint.getSslTruststorePassword());
+        assertEquals("test", endpoint.getSaslKerberosServiceName());
+        assertEquals("PLAINTEXT", endpoint.getSecurityProtocol());
+        assertEquals("TLSv1.2", endpoint.getSslEnabledProtocols());
+        assertEquals("JKS", endpoint.getSslKeystoreType());
+        assertEquals("TLS", endpoint.getSslProtocol());
+        assertEquals("test", endpoint.getSslProvider());
+        assertEquals("JKS", endpoint.getSslTruststoreType());
+        assertEquals("/usr/bin/kinit", endpoint.getKerberosInitCmd());
+        assertEquals(new Integer(60000), 
endpoint.getKerberosBeforeReloginMinTime());
+        assertEquals(new Double(0.05), endpoint.getKerberosRenewJitter());
+        assertEquals(new Double(0.8), endpoint.getKerberosRenewWindowFactor());
+        assertEquals("MAC", endpoint.getSslCipherSuites());
+        assertEquals("test", endpoint.getSslEndpointAlgorithm());
+        assertEquals("SunX509", endpoint.getSslKeymanagerAlgorithm());
+        assertEquals("PKIX", endpoint.getSslTrustmanagerAlgorithm());
     }
 
     @Test
-    public void testPropertiesConfigrationMerge() throws Exception {
+    public void testAllProducerKeys() throws Exception {
         Map<String, Object> params = new HashMap<String, Object>();
-        params.put("portNumber", 14123);
-        params.put("consumerStreams", "3");
-        params.put("topic", "mytopic");
-        params.put("partitioner", "com.class.Party");
 
-        KafkaConfiguration kc = new KafkaConfiguration();
-        kc.setZookeeperHost("somehost");
-        kc.setZookeeperPort(2987);
-        kc.setTopic("default");
-        params.put("configuration", kc);
-
-        String uri = "kafka:broker1:12345,broker2:12566";
-        String remaining = "broker1:12345,broker2:12566";
+        String uri = "kafka:dev1:12345,dev2:12566";
+        String remaining = "dev1:12345,dev2:12566";
 
         KafkaEndpoint endpoint = new 
KafkaComponent(context).createEndpoint(uri, remaining, params);
-        assertEquals("somehost:2987", endpoint.getZookeeperConnect());
-        assertEquals("somehost", endpoint.getZookeeperHost());
-        assertEquals(2987, endpoint.getZookeeperPort());
-        assertEquals("broker1:12345,broker2:12566", endpoint.getBrokers());
-        assertEquals("mytopic", endpoint.getTopic());
-        assertEquals(3, endpoint.getConsumerStreams());
-        assertEquals("com.class.Party", endpoint.getPartitioner());
-        assertNull("dirty", kc.getBrokers());
-        assertEquals("default", kc.getTopic());
+        
assertEquals(endpoint.getConfiguration().createProducerProperties().keySet(), 
getProducerKeys().keySet());
     }
 
-    @Test
-    public void testPropertiesConfigrationRefMerge() throws Exception {
-        Map<String, Object> params = new HashMap<String, Object>();
-        params.put("portNumber", 14123);
-        params.put("consumerStreams", "3");
-        params.put("topic", "mytopic");
-        params.put("partitioner", "com.class.Party");
-
-        KafkaConfiguration kc = new KafkaConfiguration();
-        kc.setZookeeperHost("somehost");
-        kc.setZookeeperPort(2987);
-        kc.setTopic("default");
-        Registry registry = Mockito.mock(Registry.class);
-        Mockito.when(registry.lookupByName("baseconf")).thenReturn(kc);
-        Mockito.when(context.getRegistry()).thenReturn(registry);
-        params.put("configuration", "#baseconf");
-
-        String uri = "kafka:broker1:12345,broker2:12566";
-        String remaining = "broker1:12345,broker2:12566";
+    private Properties getProducerKeys() {
+        Properties props = new Properties();
+
+        props.put(ProducerConfig.ACKS_CONFIG, "1");
+        props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, "33554432");
+        props.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, "none");
+        props.put(ProducerConfig.RETRIES_CONFIG, "0");
+        props.put(ProducerConfig.BATCH_SIZE_CONFIG, "16384");
+        props.put(ProducerConfig.CONNECTIONS_MAX_IDLE_MS_CONFIG, "540000");
+        props.put(ProducerConfig.LINGER_MS_CONFIG, "0");
+        props.put(ProducerConfig.MAX_BLOCK_MS_CONFIG, "60000");
+        props.put(ProducerConfig.MAX_REQUEST_SIZE_CONFIG, "1048576");
+        props.put(ProducerConfig.PARTITIONER_CLASS_CONFIG, 
KafkaConstants.KAFKA_DEFAULT_PARTITIONER);
+        props.put(ProducerConfig.RECEIVE_BUFFER_CONFIG, "32768");
+        props.put(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG, "30000");
+        props.put(ProducerConfig.SEND_BUFFER_CONFIG, "131072");
+        props.put(ProducerConfig.TIMEOUT_CONFIG, "30000");
+        props.put(ProducerConfig.BLOCK_ON_BUFFER_FULL_CONFIG, "false");
+        props.put(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, "5");
+        props.put(ProducerConfig.METADATA_FETCH_TIMEOUT_CONFIG, "60000");
+        props.put(ProducerConfig.METADATA_MAX_AGE_CONFIG, "300000");
+        props.put(ProducerConfig.METRICS_NUM_SAMPLES_CONFIG, "2");
+        props.put(ProducerConfig.METRICS_SAMPLE_WINDOW_MS_CONFIG, "30000");
+        props.put(ProducerConfig.RECONNECT_BACKOFF_MS_CONFIG, "50");
+        props.put(ProducerConfig.RETRY_BACKOFF_MS_CONFIG, "100");
+        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, 
KafkaConstants.KAFKA_DEFAULT_SERIALIZER);
+        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, 
KafkaConstants.KAFKA_DEFAULT_SERIALIZER);
+        props.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "PLAINTEXT");
+        props.put(SslConfigs.SSL_ENABLED_PROTOCOLS_CONFIG, "TLSv1.2, TLSv1.1, 
TLSv1");
+        props.put(SslConfigs.SSL_KEYSTORE_TYPE_CONFIG, "JKS");
+        props.put(SslConfigs.SSL_PROTOCOL_CONFIG, "TLS");
+        props.put(SslConfigs.SSL_TRUSTSTORE_TYPE_CONFIG, "JKS");
+        props.put(SaslConfigs.SASL_KERBEROS_KINIT_CMD, "/usr/bin/kinit");
+        props.put(SaslConfigs.SASL_KERBEROS_MIN_TIME_BEFORE_RELOGIN, "60000");
+        props.put(SaslConfigs.SASL_KERBEROS_TICKET_RENEW_JITTER, "0.05");
+        props.put(SaslConfigs.SASL_KERBEROS_TICKET_RENEW_WINDOW_FACTOR, "0.8");
+        props.put(SslConfigs.SSL_KEYMANAGER_ALGORITHM_CONFIG, "SunX509");
+        props.put(SslConfigs.SSL_TRUSTMANAGER_ALGORITHM_CONFIG, "PKIX");
+
+        return props;
+    }
 
-        KafkaEndpoint endpoint = new 
KafkaComponent(context).createEndpoint(uri, remaining, params);
-        assertEquals("somehost:2987", endpoint.getZookeeperConnect());
-        assertEquals("somehost", endpoint.getZookeeperHost());
-        assertEquals(2987, endpoint.getZookeeperPort());
-        assertEquals("broker1:12345,broker2:12566", endpoint.getBrokers());
-        assertEquals("mytopic", endpoint.getTopic());
-        assertEquals(3, endpoint.getConsumerStreams());
-        assertEquals("com.class.Party", endpoint.getPartitioner());
-        assertNull("dirty", kc.getBrokers());
-        assertEquals("default", kc.getTopic());
+    private void setProducerProperty(Map<String, Object> params) {
+        params.put("requestRequiredAcks", 0);
+        params.put("bufferMemorySize", 1);
+        params.put("compressionCodec", "none");
+        params.put("retries", 0);
+        params.put("producerBatchSize", 10);
+        params.put("connectionMaxIdleMs", 12);
+        params.put("lingerMs", 1);
+        params.put("maxBlockMs", 1);
+        params.put("maxRequestSize", 100);
+        params.put("receiveBufferBytes", 23);
+        params.put("requestTimeoutMs", 100);
+        params.put("sendBufferBytes", 765);
+        params.put("timeoutMs", 2045);
+        params.put("blockOnBufferFull", false);
+        params.put("maxInFlightRequest", 1);
+        params.put("metadataFetchTimeoutMs", 9043);
+        params.put("metadataMaxAgeMs", 1029);
+        params.put("reconnectBackoffMs", 234);
+        params.put("retryBackoffMs", 3782);
+        params.put("noOfMetricsSample", 3);
+        params.put("metricReporters", 
"org.apache.camel.reporters.TestReport,org.apache.camel.reporters.SampleReport");
+        params.put("metricsSampleWindowMs", 12344);
+        params.put("clientId", "testing");
+        params.put("sslKeyPassword", "testing");
+        params.put("sslKeystoreLocation", "/abc");
+        params.put("sslKeystorePassword", "testing");
+        params.put("sslTruststoreLocation", "/abc");
+        params.put("sslTruststorePassword", "testing");
+        params.put("saslKerberosServiceName", "test");
+        params.put("securityProtocol", "PLAINTEXT");
+        params.put("sslEnabledProtocols", "TLSv1.2");
+        params.put("sslKeystoreType", "JKS");
+        params.put("sslProtocol", "TLS");
+        params.put("sslProvider", "test");
+        params.put("sslTruststoreType", "JKS");
+        params.put("kerberosInitCmd", "/usr/bin/kinit");
+        params.put("kerberosBeforeReloginMinTime", 60000);
+        params.put("kerberosRenewJitter", 0.05);
+        params.put("kerberosRenewWindowFactor", 0.8);
+        params.put("sslCipherSuites", "MAC");
+        params.put("sslEndpointAlgorithm", "test");
+        params.put("sslKeymanagerAlgorithm", "SunX509");
+        params.put("sslTrustmanagerAlgorithm", "PKIX");
     }
+
 }

http://git-wip-us.apache.org/repos/asf/camel/blob/b2aa831d/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaConsumerBatchSizeTest.java
----------------------------------------------------------------------
diff --git 
a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaConsumerBatchSizeTest.java
 
b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaConsumerBatchSizeTest.java
index a6f59c0..9128c62 100644
--- 
a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaConsumerBatchSizeTest.java
+++ 
b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaConsumerBatchSizeTest.java
@@ -18,13 +18,12 @@ package org.apache.camel.component.kafka;
 
 import java.util.Properties;
 
-import kafka.javaapi.producer.Producer;
-import kafka.producer.KeyedMessage;
-import kafka.producer.ProducerConfig;
 import org.apache.camel.Endpoint;
 import org.apache.camel.EndpointInject;
 import org.apache.camel.builder.RouteBuilder;
 import org.apache.camel.component.mock.MockEndpoint;
+import org.apache.kafka.clients.producer.ProducerConfig;
+import org.apache.kafka.clients.producer.ProducerRecord;
 import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;
@@ -34,39 +33,38 @@ public class KafkaConsumerBatchSizeTest extends 
BaseEmbeddedKafkaTest {
     public static final String TOPIC = "test";
 
     @EndpointInject(uri = "kafka:localhost:{{karfkaPort}}?topic=" + TOPIC
-            + "&zookeeperHost=localhost"
-            + "&zookeeperPort={{zookeeperPort}}"
             + "&groupId=group1"
-            + "&autoOffsetReset=smallest"
+            + "&autoOffsetReset=earliest"
             + "&autoCommitEnable=false"
             + "&batchSize=3"
             + "&consumerStreams=10"
-            // If set the consumerTiemout too small the test will fail in JDK7
-            + "&consumerTimeoutMs=300"
             + "&barrierAwaitTimeoutMs=1000"
-        )
+    )
     private Endpoint from;
 
     @EndpointInject(uri = "mock:result")
     private MockEndpoint to;
 
-    private Producer<String, String> producer;
+    private org.apache.kafka.clients.producer.KafkaProducer<String, String> 
producer;
 
     @Before
     public void before() {
         Properties props = new Properties();
-        props.put("metadata.broker.list", "localhost:" + getKarfkaPort());
-        props.put("serializer.class", "kafka.serializer.StringEncoder");
-        props.put("partitioner.class", 
"org.apache.camel.component.kafka.SimplePartitioner");
-        props.put("request.required.acks", "1");
 
-        ProducerConfig config = new ProducerConfig(props);
-        producer = new Producer<String, String>(config);
+        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:" + 
getKarfkaPort());
+        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, 
KafkaConstants.KAFKA_DEFAULT_SERIALIZER);
+        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, 
KafkaConstants.KAFKA_DEFAULT_SERIALIZER);
+        props.put(ProducerConfig.PARTITIONER_CLASS_CONFIG, 
KafkaConstants.KAFKA_DEFAULT_PARTITIONER);
+        props.put(ProducerConfig.ACKS_CONFIG, "1");
+
+        producer = new org.apache.kafka.clients.producer.KafkaProducer<String, 
String>(props);
     }
 
     @After
     public void after() {
-        producer.close();
+        if (producer != null) {
+            producer.close();
+        }
     }
 
     @Override
@@ -74,47 +72,39 @@ public class KafkaConsumerBatchSizeTest extends 
BaseEmbeddedKafkaTest {
         return new RouteBuilder() {
             @Override
             public void configure() throws Exception {
-                from(from).autoStartup(true).to(to).setId("First");
+                from(from).routeId("foo").to(to).setId("First");
             }
         };
     }
 
     @Test
     public void kafkaMessagesIsConsumedByCamel() throws Exception {
+
         //First 2 must not be committed since batch size is 3
         to.expectedBodiesReceivedInAnyOrder("m1", "m2");
         for (int k = 1; k <= 2; k++) {
             String msg = "m" + k;
-            KeyedMessage<String, String> data = new KeyedMessage<String, 
String>(TOPIC, "1", msg);
+            ProducerRecord<String, String> data = new ProducerRecord<String, 
String>(TOPIC, "1", msg);
             producer.send(data);
         }
-        to.assertIsSatisfied(3000);
-        
+        to.assertIsSatisfied();
+
         to.reset();
+
+        to.expectedBodiesReceivedInAnyOrder("m3", "m4", "m5", "m6", "m7", 
"m8", "m9", "m10");
+
         //Restart endpoint,
-        from.getCamelContext().stop();
-        from.getCamelContext().start();
-        
-        to.expectedBodiesReceivedInAnyOrder("m1", "m2", "m3", "m4", "m5", 
"m6", "m7", "m8", "m9", "m10");
+        context.stopRoute("foo");
+        context.startRoute("foo");
 
         //Second route must wake up and consume all from scratch and commit 9 
consumed
         for (int k = 3; k <= 10; k++) {
             String msg = "m" + k;
-            KeyedMessage<String, String> data = new KeyedMessage<String, 
String>(TOPIC, "1", msg);
+            ProducerRecord<String, String> data = new ProducerRecord<String, 
String>(TOPIC, "1", msg);
             producer.send(data);
         }
 
-        to.assertIsSatisfied(3000);
-
-        to.reset();
-        //Restart endpoint,
-        from.getCamelContext().stop();
-        from.getCamelContext().start();
-
-        
-        //Only one message should left to consume by this consumer group
-        to.expectedMessageCount(1);
-        to.assertIsSatisfied(3000);
+        to.assertIsSatisfied();
     }
 }
 

http://git-wip-us.apache.org/repos/asf/camel/blob/b2aa831d/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaConsumerFullTest.java
----------------------------------------------------------------------
diff --git 
a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaConsumerFullTest.java
 
b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaConsumerFullTest.java
index c49444b..4f8a0fd 100644
--- 
a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaConsumerFullTest.java
+++ 
b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaConsumerFullTest.java
@@ -19,46 +19,49 @@ package org.apache.camel.component.kafka;
 import java.io.IOException;
 import java.util.Properties;
 
-import kafka.javaapi.producer.Producer;
-import kafka.producer.KeyedMessage;
-import kafka.producer.ProducerConfig;
-
 import org.apache.camel.Endpoint;
 import org.apache.camel.EndpointInject;
 import org.apache.camel.builder.RouteBuilder;
 import org.apache.camel.component.mock.MockEndpoint;
+import org.apache.kafka.clients.producer.ProducerConfig;
+import org.apache.kafka.clients.producer.ProducerRecord;
 import org.junit.After;
 import org.junit.Before;
+import org.junit.Ignore;
 import org.junit.Test;
 
 public class KafkaConsumerFullTest extends BaseEmbeddedKafkaTest {
 
     public static final String TOPIC = "test";
 
-    @EndpointInject(uri = "kafka:localhost:{{karfkaPort}}?topic=" + TOPIC + 
"&zookeeperHost=localhost&zookeeperPort={{zookeeperPort}}"
-        + "&groupId=group1&autoOffsetReset=smallest")
+    @EndpointInject(uri = "kafka:localhost:{{karfkaPort}}?topic=" + TOPIC
+            + 
"&groupId=group1&autoOffsetReset=earliest&keyDeserializer=org.apache.kafka.common.serialization.StringDeserializer&"
+            + 
"valueDeserializer=org.apache.kafka.common.serialization.StringDeserializer"
+            + 
"&autoCommitIntervalMs=1000&sessionTimeoutMs=30000&autoCommitEnable=true")
     private Endpoint from;
 
     @EndpointInject(uri = "mock:result")
     private MockEndpoint to;
 
-    private Producer<String, String> producer;
+    private org.apache.kafka.clients.producer.KafkaProducer<String, String> 
producer;
 
     @Before
     public void before() {
         Properties props = new Properties();
-        props.put("metadata.broker.list", "localhost:" + getKarfkaPort());
-        props.put("serializer.class", "kafka.serializer.StringEncoder");
-        props.put("partitioner.class", 
"org.apache.camel.component.kafka.SimplePartitioner");
-        props.put("request.required.acks", "1");
+        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:" + 
getKarfkaPort());
+        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, 
KafkaConstants.KAFKA_DEFAULT_SERIALIZER);
+        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, 
KafkaConstants.KAFKA_DEFAULT_SERIALIZER);
+        props.put(ProducerConfig.PARTITIONER_CLASS_CONFIG, 
KafkaConstants.KAFKA_DEFAULT_PARTITIONER);
+        props.put(ProducerConfig.ACKS_CONFIG, "1");
 
-        ProducerConfig config = new ProducerConfig(props);
-        producer = new Producer<String, String>(config);
+        producer = new org.apache.kafka.clients.producer.KafkaProducer<String, 
String>(props);
     }
 
     @After
     public void after() {
-        producer.close();
+        if (producer != null) {
+            producer.close();
+        }
     }
 
     @Override
@@ -78,10 +81,11 @@ public class KafkaConsumerFullTest extends 
BaseEmbeddedKafkaTest {
         to.expectedBodiesReceivedInAnyOrder("message-0", "message-1", 
"message-2", "message-3", "message-4");
         for (int k = 0; k < 5; k++) {
             String msg = "message-" + k;
-            KeyedMessage<String, String> data = new KeyedMessage<String, 
String>(TOPIC, "1", msg);
+            ProducerRecord<String, String> data = new ProducerRecord<String, 
String>(TOPIC, "1", msg);
             producer.send(data);
         }
         to.assertIsSatisfied(3000);
     }
+
 }
 

http://git-wip-us.apache.org/repos/asf/camel/blob/b2aa831d/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaConsumerTest.java
----------------------------------------------------------------------
diff --git 
a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaConsumerTest.java
 
b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaConsumerTest.java
index b51c09e..86bc163 100644
--- 
a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaConsumerTest.java
+++ 
b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaConsumerTest.java
@@ -28,21 +28,21 @@ public class KafkaConsumerTest {
     private Processor processor = mock(Processor.class);
 
     @Test(expected = IllegalArgumentException.class)
-    public void consumerRequiresZookeeperConnect() throws Exception {
+    public void consumerRequiresBootstrapServers() throws Exception {
         Mockito.when(endpoint.getGroupId()).thenReturn("groupOne");
         new KafkaConsumer(endpoint, processor);
     }
 
     @Test(expected = IllegalArgumentException.class)
     public void consumerRequiresGroupId() throws Exception {
-        
Mockito.when(endpoint.getZookeeperConnect()).thenReturn("localhost:2181/chroot");
+        Mockito.when(endpoint.getBrokers()).thenReturn("localhost:1234");
         new KafkaConsumer(endpoint, processor);
     }
 
     @Test
-    public void consumerOnlyRequiresZookeeperConnectAndGroupId() throws 
Exception {
+    public void consumerOnlyRequiresBootstrapServersAndGroupId() throws 
Exception {
         Mockito.when(endpoint.getGroupId()).thenReturn("groupOne");
-        
Mockito.when(endpoint.getZookeeperConnect()).thenReturn("localhost:2181/chroot");
+        Mockito.when(endpoint.getBrokers()).thenReturn("localhost:2181");
         new KafkaConsumer(endpoint, processor);
     }
 }

http://git-wip-us.apache.org/repos/asf/camel/blob/b2aa831d/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaEndpointTest.java
----------------------------------------------------------------------
diff --git 
a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaEndpointTest.java
 
b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaEndpointTest.java
index be16766..bd25886 100644
--- 
a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaEndpointTest.java
+++ 
b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaEndpointTest.java
@@ -18,11 +18,8 @@ package org.apache.camel.component.kafka;
 
 import java.net.URISyntaxException;
 
-import kafka.message.Message;
-import kafka.message.MessageAndMetadata;
-
-import kafka.serializer.DefaultDecoder;
 import org.apache.camel.Exchange;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
 import org.junit.Test;
 
 import static org.junit.Assert.assertEquals;
@@ -35,12 +32,8 @@ public class KafkaEndpointTest {
         KafkaEndpoint endpoint = new KafkaEndpoint("kafka:localhost", new 
KafkaComponent());
         endpoint.setBrokers("localhost");
 
-        Message message = new Message("mymessage".getBytes(), 
"somekey".getBytes());
-        DefaultDecoder decoder = new DefaultDecoder(null);
-        MessageAndMetadata<byte[], byte[]> mm =
-                new MessageAndMetadata<byte[], byte[]>("topic", 4, message, 
56, decoder, decoder);
-
-        Exchange exchange = endpoint.createKafkaExchange(mm);
+        ConsumerRecord<String, String> record = new ConsumerRecord<String, 
String>("topic", 4, 56, "somekey", "");
+        Exchange exchange = endpoint.createKafkaExchange(record);
         assertEquals("somekey", 
exchange.getIn().getHeader(KafkaConstants.KEY));
         assertEquals("topic", 
exchange.getIn().getHeader(KafkaConstants.TOPIC));
         assertEquals(4, exchange.getIn().getHeader(KafkaConstants.PARTITION));

Reply via email to