Fixed CS

Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/1eeba05d
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/1eeba05d
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/1eeba05d

Branch: refs/heads/master
Commit: 1eeba05d48a0ea12cc8bb4741bb7163d73281022
Parents: a107781
Author: Andrea Cosentino <anco...@gmail.com>
Authored: Fri Apr 29 10:50:57 2016 +0200
Committer: Andrea Cosentino <anco...@gmail.com>
Committed: Fri Apr 29 10:52:24 2016 +0200

----------------------------------------------------------------------
 .../camel/component/kafka/KafkaConsumer.java    | 51 +++++++++-----------
 1 file changed, 23 insertions(+), 28 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/camel/blob/1eeba05d/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConsumer.java
----------------------------------------------------------------------
diff --git 
a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConsumer.java
 
b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConsumer.java
index 82600e7..8649a46 100644
--- 
a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConsumer.java
+++ 
b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConsumer.java
@@ -94,7 +94,7 @@ public class KafkaConsumer extends DefaultConsumer {
         private final String threadId;
         private final Properties kafkaProps;
 
-        public KafkaFetchRecords(String topicName, String id, Properties 
kafkaProps) {
+        KafkaFetchRecords(String topicName, String id, Properties kafkaProps) {
             this.topicName = topicName;
             this.threadId = topicName + "-" + "Thread " + id;
             this.kafkaProps = kafkaProps;
@@ -117,33 +117,28 @@ public class KafkaConsumer extends DefaultConsumer {
                 }
                 while (isRunAllowed() && !isSuspendingOrSuspended()) {
                     ConsumerRecords<Object, Object> allRecords = 
consumer.poll(Long.MAX_VALUE);
-                    // START : CAMEL-9823
-                                       for (TopicPartition partition : 
allRecords.partitions()) {
-                                               List<ConsumerRecord<Object, 
Object>> partitionRecords = allRecords
-                                                               
.records(partition);
-                           for (ConsumerRecord<Object, Object> record : 
partitionRecords) {
-                               if (LOG.isTraceEnabled()) {
-                                   LOG.trace("partition = {}, offset = {}, key 
= {}, value = {}", record.partition(), record.offset(), record.key(), 
record.value());
-                               }
-                               Exchange exchange = 
endpoint.createKafkaExchange(record);
-                               try {
-                                   processor.process(exchange);
-                               } catch (Exception e) {
-                                   
getExceptionHandler().handleException("Error during processing", exchange, e);
-                               }
-                           }
-                                               // if autocommit is false
-                                               if 
(endpoint.isAutoCommitEnable() != null
-                                                               && 
!endpoint.isAutoCommitEnable()) {
-                                                       long 
partitionLastoffset = partitionRecords.get(
-                                                                       
partitionRecords.size() - 1).offset();
-                                                       
consumer.commitSync(Collections.singletonMap(
-                                                                       
partition, new OffsetAndMetadata(
-                                                                               
        partitionLastoffset + 1)));
-                                               }
-                           
-                                       }
-                                       // END : CAMEL-9823
+                    for (TopicPartition partition : allRecords.partitions()) {
+                        List<ConsumerRecord<Object, Object>> partitionRecords 
= allRecords
+                            .records(partition);
+                        for (ConsumerRecord<Object, Object> record : 
partitionRecords) {
+                            if (LOG.isTraceEnabled()) {
+                                LOG.trace("partition = {}, offset = {}, key = 
{}, value = {}", record.partition(), record.offset(), record.key(), 
record.value());
+                            }
+                            Exchange exchange = 
endpoint.createKafkaExchange(record);
+                            try {
+                                processor.process(exchange);
+                            } catch (Exception e) {
+                                getExceptionHandler().handleException("Error 
during processing", exchange, e);
+                            }
+                        }
+                        // if autocommit is false
+                        if (endpoint.isAutoCommitEnable() != null
+                            && !endpoint.isAutoCommitEnable()) {
+                            long partitionLastoffset = 
partitionRecords.get(partitionRecords.size() - 1).offset();
+                            consumer.commitSync(Collections.singletonMap(
+                                partition, new 
OffsetAndMetadata(partitionLastoffset + 1)));
+                        }
+                    }
                 }
                 LOG.debug("Unsubscribing {} from topic {}", threadId, 
topicName);
                 consumer.unsubscribe();

Reply via email to