This is an automated email from the ASF dual-hosted git repository. orpiske pushed a commit to branch camel-3.18.x in repository https://gitbox.apache.org/repos/asf/camel.git
commit d407362d0571cedcb2dfd4d04d83263b8da729d3 Author: geekr <geek.ru...@gmail.com> AuthorDate: Mon Oct 10 15:38:32 2022 -0400 [camel-18588] Added condition to only commit offset if it is not -1 --- .../component/kafka/consumer/support/KafkaRecordProcessor.java | 8 ++++++-- 1 file changed, 6 insertions(+), 2 deletions(-) diff --git a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/support/KafkaRecordProcessor.java b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/support/KafkaRecordProcessor.java index 38763385b78..1afe53cbe2b 100644 --- a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/support/KafkaRecordProcessor.java +++ b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/support/KafkaRecordProcessor.java @@ -23,6 +23,7 @@ import org.apache.camel.Message; import org.apache.camel.Processor; import org.apache.camel.component.kafka.KafkaConfiguration; import org.apache.camel.component.kafka.KafkaConstants; +import org.apache.camel.component.kafka.consumer.AbstractCommitManager; import org.apache.camel.component.kafka.consumer.CommitManager; import org.apache.camel.component.kafka.consumer.KafkaManualCommit; import org.apache.camel.component.kafka.serde.KafkaHeaderDeserializer; @@ -131,8 +132,11 @@ public class KafkaRecordProcessor { LOG.warn("Will seek consumer to offset {} and start polling again.", partitionLastOffset); } - // force commit, so we resume on next poll where we failed - commitManager.forceCommit(partition, partitionLastOffset); + // force commit, so we resume on next poll where we failed except when the failure happened + // at the first message in a poll + if (partitionLastOffset != AbstractCommitManager.START_OFFSET) { + commitManager.forceCommit(partition, partitionLastOffset); + } // continue to next partition return true;