This is an automated email from the ASF dual-hosted git repository.

orpiske pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/camel.git


The following commit(s) were added to refs/heads/main by this push:
     new 40f51d1  (chores) camel-kafka: cleanups related to CAMEL-16949
40f51d1 is described below

commit 40f51d1cb83253407b859071085eed35464de1f1
Author: Otavio Rodolfo Piske <opi...@redhat.com>
AuthorDate: Mon Sep 20 14:41:54 2021 +0200

    (chores) camel-kafka: cleanups related to CAMEL-16949
    
    - avoid rebuilding the processor for every polled record
    - make the exception processor method private
    - remove unused variables
    - make the reconnect variable not volatile as it is unnecessary and
      local to the thread
    - make the retry variable not volatile as it is unnecessary
    - avoid recreating the poll duration variable
    - stop creating the lock in the inner loop and move it to the main loop
    - remove unnecessary parameter thread ID in some methods as it is already 
available as a member variable to the record processors
    - stop overwriting the local lastResult parameter
    - fix an incorrect point of commit
---
 .../camel/component/kafka/KafkaFetchRecords.java   | 67 +++++++++++-----------
 .../consumer/support/KafkaRecordProcessor.java     |  6 +-
 2 files changed, 35 insertions(+), 38 deletions(-)

diff --git 
a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaFetchRecords.java
 
b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaFetchRecords.java
index 720ee8d..e96dc6a 100644
--- 
a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaFetchRecords.java
+++ 
b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaFetchRecords.java
@@ -32,7 +32,6 @@ import java.util.regex.Pattern;
 import org.apache.camel.Exchange;
 import org.apache.camel.component.kafka.consumer.support.KafkaRecordProcessor;
 import 
org.apache.camel.component.kafka.consumer.support.PartitionAssignmentListener;
-import org.apache.camel.component.kafka.consumer.support.ResumeStrategy;
 import org.apache.camel.support.BridgeExceptionHandlerToErrorHandler;
 import org.apache.camel.util.IOHelper;
 import org.apache.kafka.clients.consumer.ConsumerRecord;
@@ -60,9 +59,8 @@ class KafkaFetchRecords implements Runnable {
     private final ReentrantLock lock = new ReentrantLock();
     private final AtomicBoolean stopping = new AtomicBoolean(false);
 
-    private volatile boolean retry = true;
-    private volatile boolean reconnect = true;
-    private ResumeStrategy resumeStrategy;
+    private boolean retry = true;
+    private boolean reconnect = true;
 
     KafkaFetchRecords(KafkaConsumer kafkaConsumer, PollExceptionStrategy 
pollExceptionStrategy,
                       BridgeExceptionHandlerToErrorHandler bridge, String 
topicName, Pattern topicPattern, String id,
@@ -157,13 +155,22 @@ class KafkaFetchRecords implements Runnable {
         long partitionLastOffset = -1;
 
         try {
+            /*
+             * We lock the processing of the record to avoid raising a 
WakeUpException as a result to a call
+             * to stop() or shutdown().
+             */
+            lock.lock();
+
             long pollTimeoutMs = 
kafkaConsumer.getEndpoint().getConfiguration().getPollTimeoutMs();
             LOG.trace("Polling {} from topic: {} with timeout: {}", threadId, 
topicName, pollTimeoutMs);
 
+            KafkaRecordProcessor kafkaRecordProcessor = 
buildKafkaRecordProcessor();
+
+            Duration pollDuration = Duration.ofMillis(pollTimeoutMs);
             while (isKafkaConsumerRunnable() && isRetrying() && 
!isReconnecting()) {
-                ConsumerRecords<Object, Object> allRecords = 
consumer.poll(Duration.ofMillis(pollTimeoutMs));
+                ConsumerRecords<Object, Object> allRecords = 
consumer.poll(pollDuration);
 
-                partitionLastOffset = processPolledRecords(allRecords);
+                partitionLastOffset = processPolledRecords(allRecords, 
kafkaRecordProcessor);
             }
 
             if (!isReconnecting()) {
@@ -195,6 +202,8 @@ class KafkaFetchRecords implements Runnable {
 
             handleAccordingToStrategy(partitionLastOffset, e);
         } finally {
+            lock.unlock();
+
             // only close if not retry
             if (!isRetrying()) {
                 LOG.debug("Closing consumer {}", threadId);
@@ -292,7 +301,7 @@ class KafkaFetchRecords implements Runnable {
         return kafkaConsumer.getEndpoint().getCamelContext().isStopping() && 
!kafkaConsumer.isRunAllowed();
     }
 
-    private long processPolledRecords(ConsumerRecords<Object, Object> 
allRecords) {
+    private long processPolledRecords(ConsumerRecords<Object, Object> 
allRecords, KafkaRecordProcessor kafkaRecordProcessor) {
         logRecords(allRecords);
 
         Set<TopicPartition> partitions = allRecords.partitions();
@@ -309,24 +318,17 @@ class KafkaFetchRecords implements Runnable {
 
             logRecordsInPartition(partitionRecords, partition);
 
-            KafkaRecordProcessor kafkaRecordProcessor = 
buildKafkaRecordProcessor();
-
-            try {
-                /*
-                 * We lock the processing of the record to avoid raising a 
WakeUpException as a result to a call
-                 * to stop() or shutdown().
-                 */
-                lock.lock();
-
-                while (!lastResult.isBreakOnErrorHit() && 
recordIterator.hasNext() && !isStopping()) {
-                    ConsumerRecord<Object, Object> record = 
recordIterator.next();
+            while (!lastResult.isBreakOnErrorHit() && recordIterator.hasNext() 
&& !isStopping()) {
+                ConsumerRecord<Object, Object> record = recordIterator.next();
 
-                    lastResult = processRecord(partition, 
partitionIterator.hasNext(), recordIterator.hasNext(), lastResult,
-                            kafkaRecordProcessor, record);
+                lastResult = processRecord(partition, 
partitionIterator.hasNext(), recordIterator.hasNext(), lastResult,
+                        kafkaRecordProcessor, record);
+            }
 
-                }
-            } finally {
-                lock.unlock();
+            if (!lastResult.isBreakOnErrorHit()) {
+                LOG.debug("Committing offset on successful execution");
+                // all records processed from partition so commit them
+                kafkaRecordProcessor.commitOffset(partition, 
lastResult.getPartitionLastOffset(), false, false);
             }
         }
 
@@ -357,7 +359,7 @@ class KafkaFetchRecords implements Runnable {
             TopicPartition partition,
             boolean partitionHasNext,
             boolean recordHasNext,
-            KafkaRecordProcessor.ProcessResult lastResult,
+            final KafkaRecordProcessor.ProcessResult lastResult,
             KafkaRecordProcessor kafkaRecordProcessor,
             ConsumerRecord<Object, Object> record) {
 
@@ -365,23 +367,18 @@ class KafkaFetchRecords implements Runnable {
 
         Exchange exchange = kafkaConsumer.createExchange(false);
 
-        lastResult = kafkaRecordProcessor.processExchange(exchange, partition, 
partitionHasNext,
-                recordHasNext, record, lastResult, 
kafkaConsumer.getExceptionHandler());
+        KafkaRecordProcessor.ProcessResult currentResult
+                = kafkaRecordProcessor.processExchange(exchange, partition, 
partitionHasNext,
+                        recordHasNext, record, lastResult, 
kafkaConsumer.getExceptionHandler());
 
-        if (!lastResult.isBreakOnErrorHit()) {
-            lastProcessedOffset.put(serializeOffsetKey(partition), 
lastResult.getPartitionLastOffset());
+        if (!currentResult.isBreakOnErrorHit()) {
+            lastProcessedOffset.put(serializeOffsetKey(partition), 
currentResult.getPartitionLastOffset());
         }
 
         // success so release the exchange
         kafkaConsumer.releaseExchange(exchange, false);
 
-        if (!lastResult.isBreakOnErrorHit()) {
-            LOG.debug("Committing offset on successful execution");
-            // all records processed from partition so commit them
-            kafkaRecordProcessor.commitOffset(partition, 
lastResult.getPartitionLastOffset(), false, false,
-                    threadId);
-        }
-        return lastResult;
+        return currentResult;
     }
 
     private void logRecord(ConsumerRecord<Object, Object> record) {
diff --git 
a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/support/KafkaRecordProcessor.java
 
b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/support/KafkaRecordProcessor.java
index f1a70b0..734861e 100644
--- 
a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/support/KafkaRecordProcessor.java
+++ 
b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/support/KafkaRecordProcessor.java
@@ -159,7 +159,7 @@ public class KafkaRecordProcessor {
         return new ProcessResult(false, record.offset());
     }
 
-    public boolean processException(
+    private boolean processException(
             Exchange exchange, TopicPartition partition, long 
partitionLastOffset,
             ExceptionHandler exceptionHandler) {
 
@@ -170,7 +170,7 @@ public class KafkaRecordProcessor {
             LOG.warn("Will seek consumer to offset {} and start polling 
again.", partitionLastOffset);
 
             // force commit, so we resume on next poll where we failed
-            commitOffset(partition, partitionLastOffset, false, true, 
threadId);
+            commitOffset(partition, partitionLastOffset, false, true);
 
             // continue to next partition
             return true;
@@ -183,7 +183,7 @@ public class KafkaRecordProcessor {
     }
 
     public void commitOffset(
-            TopicPartition partition, long partitionLastOffset, boolean 
stopping, boolean forceCommit, String threadId) {
+            TopicPartition partition, long partitionLastOffset, boolean 
stopping, boolean forceCommit) {
         commitOffset(configuration, consumer, partition, partitionLastOffset, 
stopping, forceCommit, threadId);
     }
 

Reply via email to