This is an automated email from the ASF dual-hosted git repository.

davsclaus pushed a commit to branch fix/CAMEL-20227
in repository https://gitbox.apache.org/repos/asf/camel.git

commit 408dab75ccb6d03d91949d63525a2feb6102ed62
Author: Claus Ibsen <[email protected]>
AuthorDate: Sat Jun 6 07:42:36 2026 +0200

    CAMEL-20227: camel-kafka - Fix Pausable EIP losing messages due to offset 
advancement
    
    The Pausable EIP with Kafka consumer was losing messages because:
    1. afterConsume() only evaluated the resume predicate when already paused,
       but never called consumer.pause() or seeked back to committed offsets
    2. The poll loop continued immediately without pausing, causing auto-commit
       to advance offsets for unprocessed records
    
    Fix: Rewrite afterConsume() to always evaluate the predicate, call
    consumer.pause() and seek to the correct offset when pausing, and
    consumer.resume() when resuming. This ensures poll() returns empty
    records during pause and offsets are not advanced for unprocessed messages.
    
    Co-Authored-By: Claude <[email protected]>
    Signed-off-by: Claus Ibsen <[email protected]>
---
 .../errorhandler/KafkaConsumerListener.java        | 47 ++++++++++------------
 1 file changed, 21 insertions(+), 26 deletions(-)

diff --git 
a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/errorhandler/KafkaConsumerListener.java
 
b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/errorhandler/KafkaConsumerListener.java
index 9d3a2fd9ccb8..9c79052121db 100644
--- 
a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/errorhandler/KafkaConsumerListener.java
+++ 
b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/errorhandler/KafkaConsumerListener.java
@@ -32,7 +32,6 @@ public class KafkaConsumerListener implements 
ConsumerListener<Object, Processin
     private SeekPolicy seekPolicy;
 
     private Predicate<?> afterConsumeEval;
-    private boolean paused;
 
     public Consumer<?, ?> getConsumer() {
         return consumer;
@@ -57,40 +56,36 @@ public class KafkaConsumerListener implements 
ConsumerListener<Object, Processin
 
     @Override
     public boolean afterConsume(@SuppressWarnings("unused") Object ignored) {
-        if (paused) {
-            if (afterConsumeEval.test(null)) {
-                LOG.warn("State changed, therefore resuming the consumer");
-                consumer.resume(consumer.assignment());
-
-                return true;
-            }
-
-            LOG.warn("The consumer is not yet resumable");
-            return false;
+        boolean resume = afterConsumeEval.test(null);
+        if (resume) {
+            LOG.debug("Resuming consumer");
+            consumer.resume(consumer.assignment());
+        } else {
+            LOG.debug("Pausing consumer");
+            consumer.pause(consumer.assignment());
+            seekConsumer();
         }
-
-        // It's not paused, so we can continue processing
-        return true;
+        return resume;
     }
 
     @Override
     public boolean afterProcess(ProcessingResult result) {
         if (result.isFailed()) {
-            LOG.warn("Pausing consumer due to error on the last processing");
+            LOG.debug("Pausing consumer due to last processing error");
             consumer.pause(consumer.assignment());
-            paused = true;
-
-            if (seekPolicy == SeekPolicy.BEGINNING) {
-                LOG.debug("Seeking from the beginning of topic");
-                consumer.seekToBeginning(consumer.assignment());
-            } else if (seekPolicy == SeekPolicy.END) {
-                LOG.debug("Seeking from the end off the topic");
-                consumer.seekToEnd(consumer.assignment());
-            }
-
+            seekConsumer();
             return false;
         }
-
         return true;
     }
+
+    protected void seekConsumer() {
+        if (seekPolicy == SeekPolicy.BEGINNING) {
+            LOG.debug("Seeking to beginning of topic");
+            consumer.seekToBeginning(consumer.assignment());
+        } else if (seekPolicy == SeekPolicy.END) {
+            LOG.debug("Seeking to end of topic");
+            consumer.seekToEnd(consumer.assignment());
+        }
+    }
 }

Reply via email to