This is an automated email from the ASF dual-hosted git repository. klease pushed a commit to branch backport-camel-18350 in repository https://gitbox.apache.org/repos/asf/camel.git
commit 0b7622386107ad163e2caefb5b14c0c661d8038e Author: klease <kle...@cegetel.net> AuthorDate: Thu Sep 29 16:47:32 2022 +0200 CAMEL-18350: backport the fix for Kafka "breakOnFirst" error --- .../apache/camel/component/kafka/KafkaFetchRecords.java | 17 +++++++++-------- 1 file changed, 9 insertions(+), 8 deletions(-) diff --git a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaFetchRecords.java b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaFetchRecords.java index db425700532..f0f9413880a 100644 --- a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaFetchRecords.java +++ b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaFetchRecords.java @@ -156,8 +156,7 @@ class KafkaFetchRecords implements Runnable { } protected void startPolling() { - long partitionLastOffset = -1; - + KafkaRecordProcessor.ProcessResult lastResult = null; try { /* * We lock the processing of the record to avoid raising a WakeUpException as a result to a call @@ -179,7 +178,7 @@ class KafkaFetchRecords implements Runnable { processAsyncCommits(); - partitionLastOffset = processPolledRecords(allRecords, kafkaRecordProcessor); + lastResult = processPolledRecords(allRecords, kafkaRecordProcessor, lastResult); } if (!isConnected()) { @@ -213,7 +212,7 @@ class KafkaFetchRecords implements Runnable { e.getClass().getName(), threadId, getPrintableTopic(), lastProcessedOffset, e.getMessage()); } - handleAccordingToStrategy(partitionLastOffset, e); + handleAccordingToStrategy(lastResult.getPartitionLastOffset(), e); } finally { lock.unlock(); @@ -338,16 +337,18 @@ class KafkaFetchRecords implements Runnable { return kafkaConsumer.getEndpoint().getCamelContext().isStopping() && !kafkaConsumer.isRunAllowed(); } - private long processPolledRecords(ConsumerRecords<Object, Object> allRecords, KafkaRecordProcessor kafkaRecordProcessor) { + private KafkaRecordProcessor.ProcessResult processPolledRecords( + ConsumerRecords<Object, Object> allRecords, KafkaRecordProcessor kafkaRecordProcessor, + KafkaRecordProcessor.ProcessResult resultFromPreviousPoll) { logRecords(allRecords); Set<TopicPartition> partitions = allRecords.partitions(); Iterator<TopicPartition> partitionIterator = partitions.iterator(); - KafkaRecordProcessor.ProcessResult lastResult = KafkaRecordProcessor.ProcessResult.newUnprocessed(); + KafkaRecordProcessor.ProcessResult lastResult + = resultFromPreviousPoll == null ? KafkaRecordProcessor.ProcessResult.newUnprocessed() : resultFromPreviousPoll; while (partitionIterator.hasNext() && !isStopping()) { - lastResult = KafkaRecordProcessor.ProcessResult.newUnprocessed(); TopicPartition partition = partitionIterator.next(); List<ConsumerRecord<Object, Object>> partitionRecords = allRecords.records(partition); @@ -377,7 +378,7 @@ class KafkaFetchRecords implements Runnable { setRetry(false); // to close the current consumer } - return lastResult.getPartitionLastOffset(); + return lastResult; } private void logRecordsInPartition(List<ConsumerRecord<Object, Object>> partitionRecords, TopicPartition partition) {