This is an automated email from the ASF dual-hosted git repository. orpiske pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/camel.git
The following commit(s) were added to refs/heads/main by this push:
     new 40f51d1 (chores) camel-kafka: cleanups related to CAMEL-16949
40f51d1 is described below

commit 40f51d1cb83253407b859071085eed35464de1f1
Author: Otavio Rodolfo Piske <opi...@redhat.com>
AuthorDate: Mon Sep 20 14:41:54 2021 +0200

    (chores) camel-kafka: cleanups related to CAMEL-16949

    - avoid rebuilding the processor for every polled record
    - make the exception processor method private
    - remove unused variables
    - make the reconnect variable not volatile as it is unnecessary and local to the thread
    - make the retry variable not volatile as it is unnecessary
    - avoid recreating the poll duration variable
    - stop creating the lock in the inner loop and move it to the main loop
    - remove unnecessary parameter thread ID in some methods as it is already available as a member variable to the record processors
    - stop overwriting the local lastResult parameter
    - fix an incorrect point of commit
---
 .../camel/component/kafka/KafkaFetchRecords.java | 67 +++++++++++-----------
 .../consumer/support/KafkaRecordProcessor.java | 6 +-
 2 files changed, 35 insertions(+), 38 deletions(-)

diff --git a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaFetchRecords.java b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaFetchRecords.java
index 720ee8d..e96dc6a 100644
--- a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaFetchRecords.java
+++ b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaFetchRecords.java
@@ -32,7 +32,6 @@ import java.util.regex.Pattern;
 import org.apache.camel.Exchange;
 import org.apache.camel.component.kafka.consumer.support.KafkaRecordProcessor;
 import org.apache.camel.component.kafka.consumer.support.PartitionAssignmentListener;
-import org.apache.camel.component.kafka.consumer.support.ResumeStrategy;
 import org.apache.camel.support.BridgeExceptionHandlerToErrorHandler;
 import 
org.apache.camel.util.IOHelper; import org.apache.kafka.clients.consumer.ConsumerRecord; @@ -60,9 +59,8 @@ class KafkaFetchRecords implements Runnable { private final ReentrantLock lock = new ReentrantLock(); private final AtomicBoolean stopping = new AtomicBoolean(false); - private volatile boolean retry = true; - private volatile boolean reconnect = true; - private ResumeStrategy resumeStrategy; + private boolean retry = true; + private boolean reconnect = true; KafkaFetchRecords(KafkaConsumer kafkaConsumer, PollExceptionStrategy pollExceptionStrategy, BridgeExceptionHandlerToErrorHandler bridge, String topicName, Pattern topicPattern, String id, @@ -157,13 +155,22 @@ class KafkaFetchRecords implements Runnable { long partitionLastOffset = -1; try { + /* + * We lock the processing of the record to avoid raising a WakeUpException as a result to a call + * to stop() or shutdown(). + */ + lock.lock(); + long pollTimeoutMs = kafkaConsumer.getEndpoint().getConfiguration().getPollTimeoutMs(); LOG.trace("Polling {} from topic: {} with timeout: {}", threadId, topicName, pollTimeoutMs); + KafkaRecordProcessor kafkaRecordProcessor = buildKafkaRecordProcessor(); + + Duration pollDuration = Duration.ofMillis(pollTimeoutMs); while (isKafkaConsumerRunnable() && isRetrying() && !isReconnecting()) { - ConsumerRecords<Object, Object> allRecords = consumer.poll(Duration.ofMillis(pollTimeoutMs)); + ConsumerRecords<Object, Object> allRecords = consumer.poll(pollDuration); - partitionLastOffset = processPolledRecords(allRecords); + partitionLastOffset = processPolledRecords(allRecords, kafkaRecordProcessor); } if (!isReconnecting()) { @@ -195,6 +202,8 @@ class KafkaFetchRecords implements Runnable { handleAccordingToStrategy(partitionLastOffset, e); } finally { + lock.unlock(); + // only close if not retry if (!isRetrying()) { LOG.debug("Closing consumer {}", threadId); @@ -292,7 +301,7 @@ class KafkaFetchRecords implements Runnable { return 
kafkaConsumer.getEndpoint().getCamelContext().isStopping() && !kafkaConsumer.isRunAllowed(); } - private long processPolledRecords(ConsumerRecords<Object, Object> allRecords) { + private long processPolledRecords(ConsumerRecords<Object, Object> allRecords, KafkaRecordProcessor kafkaRecordProcessor) { logRecords(allRecords); Set<TopicPartition> partitions = allRecords.partitions(); @@ -309,24 +318,17 @@ class KafkaFetchRecords implements Runnable { logRecordsInPartition(partitionRecords, partition); - KafkaRecordProcessor kafkaRecordProcessor = buildKafkaRecordProcessor(); - - try { - /* - * We lock the processing of the record to avoid raising a WakeUpException as a result to a call - * to stop() or shutdown(). - */ - lock.lock(); - - while (!lastResult.isBreakOnErrorHit() && recordIterator.hasNext() && !isStopping()) { - ConsumerRecord<Object, Object> record = recordIterator.next(); + while (!lastResult.isBreakOnErrorHit() && recordIterator.hasNext() && !isStopping()) { + ConsumerRecord<Object, Object> record = recordIterator.next(); - lastResult = processRecord(partition, partitionIterator.hasNext(), recordIterator.hasNext(), lastResult, - kafkaRecordProcessor, record); + lastResult = processRecord(partition, partitionIterator.hasNext(), recordIterator.hasNext(), lastResult, + kafkaRecordProcessor, record); + } - } - } finally { - lock.unlock(); + if (!lastResult.isBreakOnErrorHit()) { + LOG.debug("Committing offset on successful execution"); + // all records processed from partition so commit them + kafkaRecordProcessor.commitOffset(partition, lastResult.getPartitionLastOffset(), false, false); } } @@ -357,7 +359,7 @@ class KafkaFetchRecords implements Runnable { TopicPartition partition, boolean partitionHasNext, boolean recordHasNext, - KafkaRecordProcessor.ProcessResult lastResult, + final KafkaRecordProcessor.ProcessResult lastResult, KafkaRecordProcessor kafkaRecordProcessor, ConsumerRecord<Object, Object> record) { @@ -365,23 +367,18 @@ class 
KafkaFetchRecords implements Runnable { Exchange exchange = kafkaConsumer.createExchange(false); - lastResult = kafkaRecordProcessor.processExchange(exchange, partition, partitionHasNext, - recordHasNext, record, lastResult, kafkaConsumer.getExceptionHandler()); + KafkaRecordProcessor.ProcessResult currentResult + = kafkaRecordProcessor.processExchange(exchange, partition, partitionHasNext, + recordHasNext, record, lastResult, kafkaConsumer.getExceptionHandler()); - if (!lastResult.isBreakOnErrorHit()) { - lastProcessedOffset.put(serializeOffsetKey(partition), lastResult.getPartitionLastOffset()); + if (!currentResult.isBreakOnErrorHit()) { + lastProcessedOffset.put(serializeOffsetKey(partition), currentResult.getPartitionLastOffset()); } // success so release the exchange kafkaConsumer.releaseExchange(exchange, false); - if (!lastResult.isBreakOnErrorHit()) { - LOG.debug("Committing offset on successful execution"); - // all records processed from partition so commit them - kafkaRecordProcessor.commitOffset(partition, lastResult.getPartitionLastOffset(), false, false, - threadId); - } - return lastResult; + return currentResult; } private void logRecord(ConsumerRecord<Object, Object> record) { diff --git a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/support/KafkaRecordProcessor.java b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/support/KafkaRecordProcessor.java index f1a70b0..734861e 100644 --- a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/support/KafkaRecordProcessor.java +++ b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/support/KafkaRecordProcessor.java @@ -159,7 +159,7 @@ public class KafkaRecordProcessor { return new ProcessResult(false, record.offset()); } - public boolean processException( + private boolean processException( Exchange exchange, TopicPartition partition, long partitionLastOffset, ExceptionHandler 
exceptionHandler) { @@ -170,7 +170,7 @@ public class KafkaRecordProcessor { LOG.warn("Will seek consumer to offset {} and start polling again.", partitionLastOffset); // force commit, so we resume on next poll where we failed - commitOffset(partition, partitionLastOffset, false, true, threadId); + commitOffset(partition, partitionLastOffset, false, true); // continue to next partition return true; @@ -183,7 +183,7 @@ public class KafkaRecordProcessor { } public void commitOffset( - TopicPartition partition, long partitionLastOffset, boolean stopping, boolean forceCommit, String threadId) { + TopicPartition partition, long partitionLastOffset, boolean stopping, boolean forceCommit) { commitOffset(configuration, consumer, partition, partitionLastOffset, stopping, forceCommit, threadId); }