This is an automated email from the ASF dual-hosted git repository.

acosentino pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/camel.git


The following commit(s) were added to refs/heads/master by this push:
     new b7ff890  Camel-Kafka: Fixed CS
b7ff890 is described below

commit b7ff890a4fb07b1c1ffd9fd299caa16048c9f88f
Author: Andrea Cosentino <anco...@gmail.com>
AuthorDate: Mon Mar 25 08:16:57 2019 +0100

    Camel-Kafka: Fixed CS
---
 .../camel/component/kafka/KafkaConsumer.java       | 97 +++++++++++++---------
 .../KafkaConsumerRebalancePartitionRevokeTest.java |  8 +-
 .../kafka/KafkaConsumerRebalanceTest.java          |  6 +-
 3 files changed, 66 insertions(+), 45 deletions(-)

diff --git a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConsumer.java b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConsumer.java
index 9417dc7..e644a10 100644
--- a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConsumer.java
+++ b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConsumer.java
@@ -106,14 +106,13 @@ public class KafkaConsumer extends DefaultConsumer {
 
     @Override
     protected void doStart() throws Exception {
-        log.info("Starting Kafka consumer on topic: {} with breakOnFirstError: 
{}",
-                endpoint.getConfiguration().getTopic(), 
endpoint.getConfiguration().isBreakOnFirstError());
+        log.info("Starting Kafka consumer on topic: {} with breakOnFirstError: 
{}", endpoint.getConfiguration().getTopic(), 
endpoint.getConfiguration().isBreakOnFirstError());
         super.doStart();
 
         // is the offset repository already started?
         StateRepository repo = endpoint.getConfiguration().getOffsetRepository();
         if (repo instanceof ServiceSupport) {
-            boolean started = ((ServiceSupport) repo).isStarted();
+            boolean started = ((ServiceSupport)repo).isStarted();
             // if not already started then we would do that and also stop it
             if (!started) {
                 stopOffsetRepo = true;
@@ -132,7 +131,8 @@ public class KafkaConsumer extends DefaultConsumer {
 
         for (int i = 0; i < endpoint.getConfiguration().getConsumersCount(); i++) {
             KafkaFetchRecords task = new KafkaFetchRecords(topic, pattern, i + "", getProps());
-            // pre-initialize task during startup so if there is any error we have it thrown asap
+            // pre-initialize task during startup so if there is any error we
+            // have it thrown asap
             task.preInit();
             executor.submit(task);
             tasks.add(task);
@@ -190,7 +190,8 @@ public class KafkaConsumer extends DefaultConsumer {
             while (reConnect) {
                 try {
                     if (!first) {
-                        // re-initialize on re-connect so we have a fresh consumer
+                        // re-initialize on re-connect so we have a fresh
+                        // consumer
                         doInit();
                     }
                 } catch (Throwable e) {
@@ -211,7 +212,8 @@ public class KafkaConsumer extends DefaultConsumer {
 
                 first = false;
 
-                // doRun keeps running until we either shutdown or is told to re-connect
+                // doRun keeps running until we either shutdown or is told to
+                // re-connect
                 reConnect = doRun();
             }
         }
@@ -224,9 +226,11 @@ public class KafkaConsumer extends DefaultConsumer {
             // create consumer
             ClassLoader threadClassLoader = Thread.currentThread().getContextClassLoader();
             try {
-                // Kafka uses reflection for loading authentication settings, use its classloader
+                // Kafka uses reflection for loading authentication settings,
+                // use its classloader
                 Thread.currentThread().setContextClassLoader(org.apache.kafka.clients.consumer.KafkaConsumer.class.getClassLoader());
-                // this may throw an exception if something is wrong with kafka consumer
+                // this may throw an exception if something is wrong with kafka
+                // consumer
                 this.consumer = new org.apache.kafka.clients.consumer.KafkaConsumer(kafkaProps);
             } finally {
                 Thread.currentThread().setContextClassLoader(threadClassLoader);
@@ -235,7 +239,8 @@ public class KafkaConsumer extends DefaultConsumer {
 
         @SuppressWarnings("unchecked")
         protected boolean doRun() {
-            // allow to re-connect thread in case we use that to retry failed messages
+            // allow to re-connect thread in case we use that to retry failed
+            // messages
             boolean reConnect = false;
             boolean unsubscribing = false;
 
@@ -250,19 +255,23 @@ public class KafkaConsumer extends DefaultConsumer {
 
                 StateRepository<String, String> offsetRepository = endpoint.getConfiguration().getOffsetRepository();
                 if (offsetRepository != null) {
-                    // This poll to ensures we have an assigned partition otherwise seek won't work
+                    // This poll to ensures we have an assigned partition
+                    // otherwise seek won't work
                     ConsumerRecords poll = consumer.poll(100);
 
-                    for (TopicPartition topicPartition : (Set<TopicPartition>) consumer.assignment()) {
+                    for (TopicPartition topicPartition : (Set<TopicPartition>)consumer.assignment()) {
                         String offsetState = offsetRepository.getState(serializeOffsetKey(topicPartition));
                         if (offsetState != null && !offsetState.isEmpty()) {
-                            // The state contains the last read offset so you need to seek from the next one
+                            // The state contains the last read offset so you
+                            // need to seek from the next one
                             long offset = deserializeOffsetValue(offsetState) + 1;
                             log.debug("Resuming partition {} from offset {} from state", topicPartition.partition(), offset);
                             consumer.seek(topicPartition, offset);
                         } else {
-                            // If the init poll has returned some data of a currently unknown topic/partition in the state
-                            // then resume from their offset in order to avoid losing data
+                            // If the init poll has returned some data of a
+                            // currently unknown topic/partition in the state
+                            // then resume from their offset in order to avoid
+                            // losing data
                             List<ConsumerRecord<Object, Object>> partitionRecords = poll.records(topicPartition);
                             if (!partitionRecords.isEmpty()) {
                                 long offset = partitionRecords.get(0).offset();
@@ -274,12 +283,14 @@ public class KafkaConsumer extends DefaultConsumer {
                 } else if (endpoint.getConfiguration().getSeekTo() != null) {
                     if (endpoint.getConfiguration().getSeekTo().equals("beginning")) {
                         log.debug("{} is seeking to the beginning on topic {}", threadId, topicName);
-                        // This poll to ensures we have an assigned partition otherwise seek won't work
+                        // This poll to ensures we have an assigned partition
+                        // otherwise seek won't work
                         consumer.poll(100);
                         consumer.seekToBeginning(consumer.assignment());
                     } else if (endpoint.getConfiguration().getSeekTo().equals("end")) {
                         log.debug("{} is seeking to the end on topic {}", threadId, topicName);
-                        // This poll to ensures we have an assigned partition otherwise seek won't work
+                        // This poll to ensures we have an assigned partition
+                        // otherwise seek won't work
                         consumer.poll(100);
                         consumer.seekToEnd(consumer.assignment());
                     }
@@ -304,21 +315,23 @@ public class KafkaConsumer extends DefaultConsumer {
                             while (!breakOnErrorHit && recordIterator.hasNext()) {
                                 record = recordIterator.next();
                                 if (log.isTraceEnabled()) {
-                                    log.trace("Partition = {}, offset = {}, key = {}, value = {}", record.partition(), record.offset(), record.key(),
-                                            record.value());
+                                    log.trace("Partition = {}, offset = {}, key = {}, value = {}", record.partition(), record.offset(), record.key(), record.value());
                                 }
                                 Exchange exchange = endpoint.createKafkaExchange(record);
 
                                 propagateHeaders(record, exchange, endpoint.getConfiguration());
 
-                                // if not auto commit then we have additional information on the exchange
+                                // if not auto commit then we have additional
+                                // information on the exchange
                                 if (!isAutoCommitEnabled()) {
                                     exchange.getIn().setHeader(KafkaConstants.LAST_RECORD_BEFORE_COMMIT, !recordIterator.hasNext());
                                 }
                                 if (endpoint.getConfiguration().isAllowManualCommit()) {
-                                    // allow Camel users to access the Kafka consumer API to be able to do for example manual commits
+                                    // allow Camel users to access the Kafka
+                                    // consumer API to be able to do for example
+                                    // manual commits
                                     KafkaManualCommit manual = endpoint.getComponent().getKafkaManualCommitFactory().newInstance(exchange, consumer, topicName, threadId,
-                                            offsetRepository, partition, record.offset());
+                                                                                                 offsetRepository, partition, record.offset());
                                     exchange.getIn().setHeader(KafkaConstants.MANUAL_COMMIT, manual);
                                 }
 
@@ -329,29 +342,36 @@ public class KafkaConsumer extends DefaultConsumer {
                                 }
 
                                 if (exchange.getException() != null) {
-                                    // processing failed due to an unhandled exception, what should we do
+                                    // processing failed due to an unhandled
+                                    // exception, what should we do
                                     if (endpoint.getConfiguration().isBreakOnFirstError()) {
-                                        // we are failing and we should break out
-                                        log.warn("Error during processing {} from topic: {}. Will seek consumer to offset: {} and re-connect and start polling again.",
-                                                exchange, topicName, partitionLastOffset);
-                                        // force commit so we resume on next poll where we failed
+                                        // we are failing and we should break
+                                        // out
+                                        log.warn("Error during processing {} from topic: {}. Will seek consumer to offset: {} and re-connect and start polling again.", exchange,
+                                                 topicName, partitionLastOffset);
+                                        // force commit so we resume on next
+                                        // poll where we failed
                                         commitOffset(offsetRepository, partition, partitionLastOffset, true);
                                         // continue to next partition
                                         breakOnErrorHit = true;
                                     } else {
-                                        // will handle/log the exception and then continue to next
+                                        // will handle/log the exception and
+                                        // then continue to next
                                         getExceptionHandler().handleException("Error during processing", exchange, exchange.getException());
                                     }
                                 } else {
                                     // record was success so remember its offset
                                     partitionLastOffset = record.offset();
-                                    //lastOffsetProcessed would be used by Consumer re-balance listener to preserve offset state upon partition revoke
+                                    // lastOffsetProcessed would be used by
+                                    // Consumer re-balance listener to preserve
+                                    // offset state upon partition revoke
                                     lastProcessedOffset.put(serializeOffsetKey(partition), partitionLastOffset);
                                 }
                             }
 
                             if (!breakOnErrorHit) {
-                                // all records processed from partition so commit them
+                                // all records processed from partition so
+                                // commit them
                                 commitOffset(offsetRepository, partition, partitionLastOffset, false);
                             }
                         }
@@ -385,7 +405,8 @@ public class KafkaConsumer extends DefaultConsumer {
                 consumer.unsubscribe();
                 Thread.currentThread().interrupt();
             } catch (KafkaException e) {
-                // some kind of error in kafka, it may happen during unsubscribing or during normal processing
+                // some kind of error in kafka, it may happen during
+                // unsubscribing or during normal processing
                 if (unsubscribing) {
                     getExceptionHandler().handleException("Error unsubscribing " + threadId + " from kafka topic " + topicName, e);
                 } else {
@@ -415,7 +436,8 @@ public class KafkaConsumer extends DefaultConsumer {
         }
 
         private void shutdown() {
-            // As advised in the KAFKA-1894 ticket, calling this wakeup method breaks the infinite loop
+            // As advised in the KAFKA-1894 ticket, calling this wakeup method
+            // breaks the infinite loop
             consumer.wakeup();
         }
 
@@ -424,11 +446,11 @@ public class KafkaConsumer extends DefaultConsumer {
             log.debug("onPartitionsRevoked: {} from topic {}", threadId, 
topicName);
 
             StateRepository<String, String> offsetRepository = 
endpoint.getConfiguration().getOffsetRepository();
-                for (TopicPartition partition : partitions) {
+            for (TopicPartition partition : partitions) {
                 String offsetKey = serializeOffsetKey(partition);
                 Long offset = lastProcessedOffset.get(offsetKey);
                 if (offset == null) {
-                    offset = -1l;
+                    offset = -1L;
                 }
                 log.debug("Saving offset repository state {} from offsetKey {} 
with offset: {}", threadId, offsetKey, offset);
                 commitOffset(offsetRepository, partition, offset, true);
@@ -445,7 +467,8 @@ public class KafkaConsumer extends DefaultConsumer {
                 for (TopicPartition partition : partitions) {
                     String offsetState = offsetRepository.getState(serializeOffsetKey(partition));
                     if (offsetState != null && !offsetState.isEmpty()) {
-                        // The state contains the last read offset so you need to seek from the next one
+                        // The state contains the last read offset so you need
+                        // to seek from the next one
                         long offset = deserializeOffsetValue(offsetState) + 1;
                         log.debug("Resuming partition {} from offset {} from state", partition.partition(), offset);
                         consumer.seek(partition, offset);
@@ -458,9 +481,8 @@ public class KafkaConsumer extends DefaultConsumer {
     private void propagateHeaders(ConsumerRecord<Object, Object> record, Exchange exchange, KafkaConfiguration kafkaConfiguration) {
         HeaderFilterStrategy headerFilterStrategy = kafkaConfiguration.getHeaderFilterStrategy();
         KafkaHeaderDeserializer headerDeserializer = kafkaConfiguration.getKafkaHeaderDeserializer();
-        StreamSupport.stream(record.headers().spliterator(), false)
-                .filter(header -> shouldBeFiltered(header, exchange, headerFilterStrategy))
-                .forEach(header -> exchange.getIn().setHeader(header.key(), headerDeserializer.deserialize(header.key(), header.value())));
+        StreamSupport.stream(record.headers().spliterator(), false).filter(header -> shouldBeFiltered(header, exchange, headerFilterStrategy))
+            .forEach(header -> exchange.getIn().setHeader(header.key(), headerDeserializer.deserialize(header.key(), header.value())));
     }
 
     private boolean shouldBeFiltered(Header header, Exchange exchange, HeaderFilterStrategy headerFilterStrategy) {
@@ -483,4 +505,3 @@ public class KafkaConsumer extends DefaultConsumer {
         return Long.parseLong(offset);
     }
 }
-
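For context on the comments rewrapped above, the hunks touch two Kafka client idioms. First, seek() only works once poll() has given the consumer a partition assignment, which is why the code issues a throwaway poll before seeking. Second, as noted against the KAFKA-1894 ticket, wakeup() is the supported way to break a thread out of a blocking poll loop. Below is a minimal standalone sketch of both idioms; it is not Camel code, and the broker address, group id, topic name, and stored-offset map are illustrative assumptions, not values from this commit:

    import java.time.Duration;
    import java.util.Collections;
    import java.util.HashMap;
    import java.util.Map;
    import java.util.Properties;
    import org.apache.kafka.clients.consumer.KafkaConsumer;
    import org.apache.kafka.common.TopicPartition;
    import org.apache.kafka.common.errors.WakeupException;

    public class SeekAndWakeupSketch {
        public static void main(String[] args) {
            Properties props = new Properties();
            props.put("bootstrap.servers", "localhost:9092"); // assumed broker address
            props.put("group.id", "demo-group");              // illustrative group id
            props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
            props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

            KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
            consumer.subscribe(Collections.singletonList("demo-topic")); // illustrative topic

            // hypothetical offsets previously persisted by some state store
            Map<TopicPartition, Long> stored = new HashMap<>();

            try {
                // an initial poll is required so the group coordinator assigns
                // partitions; without an assignment, seek() has nothing to act on
                consumer.poll(Duration.ofMillis(100));
                for (TopicPartition tp : consumer.assignment()) {
                    Long lastRead = stored.get(tp);
                    if (lastRead != null) {
                        // the stored value is the last *read* offset, so resume
                        // from the next one
                        consumer.seek(tp, lastRead + 1);
                    }
                }
                while (true) {
                    consumer.poll(Duration.ofMillis(500)).forEach(
                        r -> System.out.printf("%s-%d@%d%n", r.topic(), r.partition(), r.offset()));
                }
            } catch (WakeupException e) {
                // thrown when another thread calls consumer.wakeup(); per
                // KAFKA-1894 this is the supported way to exit the poll loop
            } finally {
                consumer.close();
            }
        }
    }

The "resume from stored offset + 1" step mirrors the "seek from the next one" comments in the diff above.
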
diff --git a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaConsumerRebalancePartitionRevokeTest.java b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaConsumerRebalancePartitionRevokeTest.java
index 385e658..4556f13 100644
--- a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaConsumerRebalancePartitionRevokeTest.java
+++ b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaConsumerRebalancePartitionRevokeTest.java
@@ -63,8 +63,8 @@ public class KafkaConsumerRebalancePartitionRevokeTest extends BaseEmbeddedKafka
     @Test
     public void ensurePartitionRevokeCallsWithLastProcessedOffset() throws Exception {
         boolean partitionRevokeCalled = messagesLatch.await(30000, TimeUnit.MILLISECONDS);
-        assertTrue("StateRepository.setState should have been called with offset >= 0 for topic" + TOPIC + 
-                ". Remaining count : " + messagesLatch.getCount(), partitionRevokeCalled);
+        assertTrue("StateRepository.setState should have been called with offset >= 0 for topic" + TOPIC
+                + ". Remaining count : " + messagesLatch.getCount(), partitionRevokeCalled);
     }
 
     @Override
@@ -92,7 +92,7 @@ public class KafkaConsumerRebalancePartitionRevokeTest extends BaseEmbeddedKafka
     }
 
     public class OffsetStateRepository extends MemoryStateRepository {
-        CountDownLatch messagesLatch = null;
+        CountDownLatch messagesLatch;
         
         public OffsetStateRepository(CountDownLatch messagesLatch) {
             this.messagesLatch = messagesLatch;
@@ -114,7 +114,7 @@ public class KafkaConsumerRebalancePartitionRevokeTest extends BaseEmbeddedKafka
         @Override
         public void setState(String key, String value) {
             if (key.contains(TOPIC) && messagesLatch.getCount() > 0
-                       && Long.parseLong(value) >= 0) {
+                && Long.parseLong(value) >= 0) {
                 messagesLatch.countDown();
             }
             super.setState(key, value);
diff --git a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaConsumerRebalanceTest.java b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaConsumerRebalanceTest.java
index 7deef47..077d689 100644
--- a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaConsumerRebalanceTest.java
+++ b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaConsumerRebalanceTest.java
@@ -44,8 +44,8 @@ public class KafkaConsumerRebalanceTest extends BaseEmbeddedKafkaTest {
     @Test
    public void offsetGetStateMustHaveBeenCalledTwice() throws Exception {
         boolean offsetGetStateCalled = messagesLatch.await(30000, TimeUnit.MILLISECONDS);
-        assertTrue("StateRepository.getState should have been called twice for topic " + TOPIC + 
-                ". Remaining count : " + messagesLatch.getCount(), offsetGetStateCalled);
+        assertTrue("StateRepository.getState should have been called twice for topic " + TOPIC
+                + ". Remaining count : " + messagesLatch.getCount(), offsetGetStateCalled);
     }
 
     @Override
@@ -73,7 +73,7 @@ public class KafkaConsumerRebalanceTest extends BaseEmbeddedKafkaTest {
     }
 
     public class OffsetStateRepository implements StateRepository<String, String> {
-        CountDownLatch messagesLatch = null;
+        CountDownLatch messagesLatch;
         
         public OffsetStateRepository(CountDownLatch messagesLatch) {
             this.messagesLatch = messagesLatch;
