Repository: camel
Updated Branches:
  refs/heads/master 5fee9dd8e -> 1eeba05d4


CAMEL-9823: Exploring Consumer groups feature in Camel-kafka consumer side. 
Thanks to Anbumani Balusamy for the patch.


Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/a107781b
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/a107781b
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/a107781b

Branch: refs/heads/master
Commit: a107781bab8c95e03e31a2c5824381d9ea78efeb
Parents: 5fee9dd
Author: Andrea Cosentino <anco...@gmail.com>
Authored: Fri Apr 29 10:40:17 2016 +0200
Committer: Andrea Cosentino <anco...@gmail.com>
Committed: Fri Apr 29 10:51:36 2016 +0200

----------------------------------------------------------------------
 .../camel/component/kafka/KafkaConsumer.java    | 54 ++++++++++++--------
 1 file changed, 33 insertions(+), 21 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/camel/blob/a107781b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConsumer.java
----------------------------------------------------------------------
diff --git a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConsumer.java b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConsumer.java
index 7733231..82600e7 100644
--- a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConsumer.java
+++ b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConsumer.java
@@ -17,6 +17,8 @@
 package org.apache.camel.component.kafka;
 
 import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
 import java.util.Properties;
 import java.util.concurrent.ExecutorService;
 
@@ -26,6 +28,8 @@ import org.apache.camel.impl.DefaultConsumer;
 import org.apache.kafka.clients.consumer.ConsumerConfig;
 import org.apache.kafka.clients.consumer.ConsumerRecord;
 import org.apache.kafka.clients.consumer.ConsumerRecords;
+import org.apache.kafka.clients.consumer.OffsetAndMetadata;
+import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.errors.InterruptException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -90,7 +94,7 @@ public class KafkaConsumer extends DefaultConsumer {
         private final String threadId;
         private final Properties kafkaProps;
 
-        KafkaFetchRecords(String topicName, String id, Properties kafkaProps) {
+        public KafkaFetchRecords(String topicName, String id, Properties 
kafkaProps) {
             this.topicName = topicName;
             this.threadId = topicName + "-" + "Thread " + id;
             this.kafkaProps = kafkaProps;
@@ -112,26 +116,34 @@ public class KafkaConsumer extends DefaultConsumer {
                     consumer.seekToBeginning();
                 }
                 while (isRunAllowed() && !isSuspendingOrSuspended()) {
-                    ConsumerRecords<Object, Object> records = 
consumer.poll(Long.MAX_VALUE);
-                    for (ConsumerRecord<Object, Object> record : records) {
-                        if (LOG.isTraceEnabled()) {
-                            LOG.trace("offset = {}, key = {}, value = {}", 
record.offset(), record.key(), record.value());
-                        }
-                        Exchange exchange = 
endpoint.createKafkaExchange(record);
-                        try {
-                            processor.process(exchange);
-                        } catch (Exception e) {
-                            getExceptionHandler().handleException("Error 
during processing", exchange, e);
-                        }
-                        processed++;
-                        // if autocommit is false
-                        if (endpoint.isAutoCommitEnable() != null && 
!endpoint.isAutoCommitEnable()) {
-                            if (processed >= endpoint.getBatchSize()) {
-                                consumer.commitSync();
-                                processed = 0;
-                            }
-                        }
-                    }
+                    ConsumerRecords<Object, Object> allRecords = 
consumer.poll(Long.MAX_VALUE);
+                    // START : CAMEL-9823
+                                       for (TopicPartition partition : 
allRecords.partitions()) {
+                                               List<ConsumerRecord<Object, 
Object>> partitionRecords = allRecords
+                                                               
.records(partition);
+                           for (ConsumerRecord<Object, Object> record : 
partitionRecords) {
+                               if (LOG.isTraceEnabled()) {
+                                   LOG.trace("partition = {}, offset = {}, key 
= {}, value = {}", record.partition(), record.offset(), record.key(), 
record.value());
+                               }
+                               Exchange exchange = 
endpoint.createKafkaExchange(record);
+                               try {
+                                   processor.process(exchange);
+                               } catch (Exception e) {
+                                   
getExceptionHandler().handleException("Error during processing", exchange, e);
+                               }
+                           }
+                                               // if autocommit is false
+                                               if 
(endpoint.isAutoCommitEnable() != null
+                                                               && 
!endpoint.isAutoCommitEnable()) {
+                                                       long 
partitionLastoffset = partitionRecords.get(
+                                                                       
partitionRecords.size() - 1).offset();
+                                                       
consumer.commitSync(Collections.singletonMap(
+                                                                       
partition, new OffsetAndMetadata(
+                                                                               
        partitionLastoffset + 1)));
+                                               }
+                           
+                                       }
+                                       // END : CAMEL-9823
                 }
                 LOG.debug("Unsubscribing {} from topic {}", threadId, 
topicName);
                 consumer.unsubscribe();

Reply via email to