Fixed CS
Project: http://git-wip-us.apache.org/repos/asf/camel/repo Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/1eeba05d Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/1eeba05d Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/1eeba05d Branch: refs/heads/master Commit: 1eeba05d48a0ea12cc8bb4741bb7163d73281022 Parents: a107781 Author: Andrea Cosentino <anco...@gmail.com> Authored: Fri Apr 29 10:50:57 2016 +0200 Committer: Andrea Cosentino <anco...@gmail.com> Committed: Fri Apr 29 10:52:24 2016 +0200 ---------------------------------------------------------------------- .../camel/component/kafka/KafkaConsumer.java | 51 +++++++++----------- 1 file changed, 23 insertions(+), 28 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/camel/blob/1eeba05d/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConsumer.java ---------------------------------------------------------------------- diff --git a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConsumer.java b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConsumer.java index 82600e7..8649a46 100644 --- a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConsumer.java +++ b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConsumer.java @@ -94,7 +94,7 @@ public class KafkaConsumer extends DefaultConsumer { private final String threadId; private final Properties kafkaProps; - public KafkaFetchRecords(String topicName, String id, Properties kafkaProps) { + KafkaFetchRecords(String topicName, String id, Properties kafkaProps) { this.topicName = topicName; this.threadId = topicName + "-" + "Thread " + id; this.kafkaProps = kafkaProps; @@ -117,33 +117,28 @@ public class KafkaConsumer extends DefaultConsumer { } while (isRunAllowed() && !isSuspendingOrSuspended()) { ConsumerRecords<Object, Object> allRecords = consumer.poll(Long.MAX_VALUE); - // START : CAMEL-9823 - for (TopicPartition partition : allRecords.partitions()) { - List<ConsumerRecord<Object, Object>> partitionRecords = allRecords - .records(partition); - for (ConsumerRecord<Object, Object> record : partitionRecords) { - if (LOG.isTraceEnabled()) { - LOG.trace("partition = {}, offset = {}, key = {}, value = {}", record.partition(), record.offset(), record.key(), record.value()); - } - Exchange exchange = endpoint.createKafkaExchange(record); - try { - processor.process(exchange); - } catch (Exception e) { - getExceptionHandler().handleException("Error during processing", exchange, e); - } - } - // if autocommit is false - if (endpoint.isAutoCommitEnable() != null - && !endpoint.isAutoCommitEnable()) { - long partitionLastoffset = partitionRecords.get( - partitionRecords.size() - 1).offset(); - consumer.commitSync(Collections.singletonMap( - partition, new OffsetAndMetadata( - partitionLastoffset + 1))); - } - - } - // END : CAMEL-9823 + for (TopicPartition partition : allRecords.partitions()) { + List<ConsumerRecord<Object, Object>> partitionRecords = allRecords + .records(partition); + for (ConsumerRecord<Object, Object> record : partitionRecords) { + if (LOG.isTraceEnabled()) { + LOG.trace("partition = {}, offset = {}, key = {}, value = {}", record.partition(), record.offset(), record.key(), record.value()); + } + Exchange exchange = endpoint.createKafkaExchange(record); + try { + processor.process(exchange); + } catch (Exception e) { + getExceptionHandler().handleException("Error during processing", exchange, e); + } + } + // if autocommit is false + if (endpoint.isAutoCommitEnable() != null + && !endpoint.isAutoCommitEnable()) { + long partitionLastoffset = partitionRecords.get(partitionRecords.size() - 1).offset(); + consumer.commitSync(Collections.singletonMap( + partition, new OffsetAndMetadata(partitionLastoffset + 1))); + } + } } LOG.debug("Unsubscribing {} from topic {}", threadId, topicName); consumer.unsubscribe();