Repository: camel
Updated Branches:
  refs/heads/master 5fee9dd8e -> 1eeba05d4
CAMEL-9823: Exploring Consumer groups feature in Camel-kafka consumer side. Thanks to Anbumani Balusamy for the patch.

Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/a107781b
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/a107781b
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/a107781b

Branch: refs/heads/master
Commit: a107781bab8c95e03e31a2c5824381d9ea78efeb
Parents: 5fee9dd
Author: Andrea Cosentino <anco...@gmail.com>
Authored: Fri Apr 29 10:40:17 2016 +0200
Committer: Andrea Cosentino <anco...@gmail.com>
Committed: Fri Apr 29 10:51:36 2016 +0200

----------------------------------------------------------------------
 .../camel/component/kafka/KafkaConsumer.java   | 54 ++++++++++++--------
 1 file changed, 33 insertions(+), 21 deletions(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/camel/blob/a107781b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConsumer.java
----------------------------------------------------------------------
diff --git a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConsumer.java b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConsumer.java
index 7733231..82600e7 100644
--- a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConsumer.java
+++ b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConsumer.java
@@ -17,6 +17,8 @@ package org.apache.camel.component.kafka;
 
 import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
 import java.util.Properties;
 import java.util.concurrent.ExecutorService;
 
@@ -26,6 +28,8 @@ import org.apache.camel.impl.DefaultConsumer;
 import org.apache.kafka.clients.consumer.ConsumerConfig;
 import org.apache.kafka.clients.consumer.ConsumerRecord;
 import org.apache.kafka.clients.consumer.ConsumerRecords;
+import org.apache.kafka.clients.consumer.OffsetAndMetadata;
+import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.errors.InterruptException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -90,7 +94,7 @@ public class KafkaConsumer extends DefaultConsumer {
         private final String threadId;
         private final Properties kafkaProps;
 
-        KafkaFetchRecords(String topicName, String id, Properties kafkaProps) {
+        public KafkaFetchRecords(String topicName, String id, Properties kafkaProps) {
             this.topicName = topicName;
             this.threadId = topicName + "-" + "Thread " + id;
             this.kafkaProps = kafkaProps;
@@ -112,26 +116,34 @@ public class KafkaConsumer extends DefaultConsumer {
                     consumer.seekToBeginning();
                 }
                 while (isRunAllowed() && !isSuspendingOrSuspended()) {
-                    ConsumerRecords<Object, Object> records = consumer.poll(Long.MAX_VALUE);
-                    for (ConsumerRecord<Object, Object> record : records) {
-                        if (LOG.isTraceEnabled()) {
-                            LOG.trace("offset = {}, key = {}, value = {}", record.offset(), record.key(), record.value());
-                        }
-                        Exchange exchange = endpoint.createKafkaExchange(record);
-                        try {
-                            processor.process(exchange);
-                        } catch (Exception e) {
-                            getExceptionHandler().handleException("Error during processing", exchange, e);
-                        }
-                        processed++;
-                        // if autocommit is false
-                        if (endpoint.isAutoCommitEnable() != null && !endpoint.isAutoCommitEnable()) {
-                            if (processed >= endpoint.getBatchSize()) {
-                                consumer.commitSync();
-                                processed = 0;
-                            }
-                        }
-                    }
+                    ConsumerRecords<Object, Object> allRecords = consumer.poll(Long.MAX_VALUE);
+                    // START : CAMEL-9823
+                    for (TopicPartition partition : allRecords.partitions()) {
+                        List<ConsumerRecord<Object, Object>> partitionRecords = allRecords
+                            .records(partition);
+                        for (ConsumerRecord<Object, Object> record : partitionRecords) {
+                            if (LOG.isTraceEnabled()) {
+                                LOG.trace("partition = {}, offset = {}, key = {}, value = {}", record.partition(), record.offset(), record.key(), record.value());
+                            }
+                            Exchange exchange = endpoint.createKafkaExchange(record);
+                            try {
+                                processor.process(exchange);
+                            } catch (Exception e) {
+                                getExceptionHandler().handleException("Error during processing", exchange, e);
+                            }
+                        }
+                        // if autocommit is false
+                        if (endpoint.isAutoCommitEnable() != null
+                            && !endpoint.isAutoCommitEnable()) {
+                            long partitionLastoffset = partitionRecords.get(
+                                partitionRecords.size() - 1).offset();
+                            consumer.commitSync(Collections.singletonMap(
+                                partition, new OffsetAndMetadata(
+                                    partitionLastoffset + 1)));
+                        }
+
+                    }
+                    // END : CAMEL-9823
                 }
                 LOG.debug("Unsubscribing {} from topic {}", threadId, topicName);
                 consumer.unsubscribe();
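
For context, the change above replaces the previous whole-batch commitSync() with a per-partition commit: records returned by a poll are walked partition by partition, and when auto-commit is disabled the offset after the last record processed for that partition is committed explicitly. Below is a minimal, self-contained sketch of the same pattern against the plain kafka-clients consumer API, independent of Camel; the bootstrap server, group id, topic name and the process(...) helper are placeholder values, not part of the patch.

----------------------------------------------------------------------
import java.util.Collections;
import java.util.List;
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;

public class PerPartitionCommitExample {

    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");   // placeholder broker
        props.put("group.id", "example-group");             // placeholder consumer group
        props.put("enable.auto.commit", "false");           // manual commits, as in the patch
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(Collections.singletonList("example-topic")); // placeholder topic

            while (true) {
                ConsumerRecords<String, String> allRecords = consumer.poll(1000);

                // Walk the polled records partition by partition so each commit
                // covers only offsets that were actually processed for that partition.
                for (TopicPartition partition : allRecords.partitions()) {
                    List<ConsumerRecord<String, String>> partitionRecords = allRecords.records(partition);
                    for (ConsumerRecord<String, String> record : partitionRecords) {
                        process(record);
                    }
                    // Commit the offset following the last processed record of this
                    // partition; other partitions in the same batch are untouched.
                    long lastOffset = partitionRecords.get(partitionRecords.size() - 1).offset();
                    consumer.commitSync(Collections.singletonMap(
                            partition, new OffsetAndMetadata(lastOffset + 1)));
                }
            }
        }
    }

    // Stand-in for the Camel exchange processing done by processor.process(exchange).
    private static void process(ConsumerRecord<String, String> record) {
        System.out.printf("partition=%d offset=%d key=%s value=%s%n",
                record.partition(), record.offset(), record.key(), record.value());
    }
}
----------------------------------------------------------------------

Committing per partition this way keeps the committed position of each partition consistent with what was processed in that partition, which matters when a consumer group rebalances and partitions move between members.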