This is an automated email from the ASF dual-hosted git repository. orpiske pushed a commit to branch camel-3.18.x in repository https://gitbox.apache.org/repos/asf/camel.git
commit 8aebc9d7ad659b4238aefa215c63d6b438bb9af5 Author: geekr <geek.ru...@gmail.com> AuthorDate: Tue Oct 11 15:01:11 2022 -0400 [camel-18588] Added condition to only commit offset if it is not already paused --- .../java/org/apache/camel/component/kafka/KafkaFetchRecords.java | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaFetchRecords.java b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaFetchRecords.java index 5684375bbdd..edf39671436 100644 --- a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaFetchRecords.java +++ b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaFetchRecords.java @@ -317,8 +317,8 @@ public class KafkaFetchRecords implements Runnable { } ProcessingResult result = recordProcessorFacade.processPolledRecords(allRecords, lastResult); - - if (result.isBreakOnErrorHit()) { + updateTaskState(); + if (result.isBreakOnErrorHit() && !this.state.equals(State.PAUSED)) { LOG.debug("We hit an error ... setting flags to force reconnect"); // force re-connect setReconnect(true); @@ -327,7 +327,6 @@ public class KafkaFetchRecords implements Runnable { lastResult = result; } - updateTaskState(); } if (!isConnected()) {