This is an automated email from the ASF dual-hosted git repository. klease pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/camel.git
The following commit(s) were added to refs/heads/main by this push: new 853477a2d0e CAMEL-18350: fix bug causing all messages to be reconsumed (#8447) 853477a2d0e is described below commit 853477a2d0e38dcbc0d399baa3c3e4a56c973b42 Author: klease <38634989+kle...@users.noreply.github.com> AuthorDate: Thu Sep 29 09:47:19 2022 +0200 CAMEL-18350: fix bug causing all messages to be reconsumed (#8447) When "breakOnFirstError" = "true", if an error occurs on the first message in a set of polled messages, then the offset is set to -1, causing all messages to be refetched and not only the one which failed. Modify KafkaFetchRecords to remember the result returned from the processing of the previous batch when processing a new batch of polled messages. --- .../java/org/apache/camel/component/kafka/KafkaFetchRecords.java | 5 ++++- .../kafka/consumer/support/KafkaRecordProcessorFacade.java | 7 ++++--- 2 files changed, 8 insertions(+), 4 deletions(-) diff --git a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaFetchRecords.java b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaFetchRecords.java index 8cab5bcd76a..5684375bbdd 100644 --- a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaFetchRecords.java +++ b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaFetchRecords.java @@ -307,6 +307,7 @@ public class KafkaFetchRecords implements Runnable { kafkaConsumer, threadId, commitManager, consumerListener); Duration pollDuration = Duration.ofMillis(pollTimeoutMs); + ProcessingResult lastResult = null; while (isKafkaConsumerRunnableAndNotStopped() && isConnected() && pollExceptionStrategy.canContinue()) { ConsumerRecords<Object, Object> allRecords = consumer.poll(pollDuration); if (consumerListener != null) { } } - ProcessingResult result = recordProcessorFacade.processPolledRecords(allRecords); + ProcessingResult result = recordProcessorFacade.processPolledRecords(allRecords, lastResult); if (result.isBreakOnErrorHit()) { LOG.debug("We hit an error ... setting flags to force reconnect"); // force re-connect setReconnect(true); setConnected(false); + } else { + lastResult = result; } updateTaskState(); diff --git a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/support/KafkaRecordProcessorFacade.java b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/support/KafkaRecordProcessorFacade.java index 53e519523e7..fbf6f3d09a8 100644 --- a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/support/KafkaRecordProcessorFacade.java +++ b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/support/KafkaRecordProcessorFacade.java @@ -54,16 +54,17 @@ public class KafkaRecordProcessorFacade { return camelKafkaConsumer.isStopping(); } - public ProcessingResult processPolledRecords(ConsumerRecords<Object, Object> allRecords) { + public ProcessingResult processPolledRecords( + ConsumerRecords<Object, Object> allRecords, ProcessingResult resultFromPreviousPoll) { logRecords(allRecords); Set<TopicPartition> partitions = allRecords.partitions(); Iterator<TopicPartition> partitionIterator = partitions.iterator(); - ProcessingResult lastResult = ProcessingResult.newUnprocessed(); + ProcessingResult lastResult + = resultFromPreviousPoll == null ? ProcessingResult.newUnprocessed() : resultFromPreviousPoll; while (partitionIterator.hasNext() && !isStopping()) { - lastResult = ProcessingResult.newUnprocessed(); TopicPartition partition = partitionIterator.next(); List<ConsumerRecord<Object, Object>> partitionRecords = allRecords.records(partition);