This is an automated email from the ASF dual-hosted git repository.

davsclaus pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/camel.git


The following commit(s) were added to refs/heads/main by this push:
     new 76cd0fdc1b0 fix(KafkaRecordBatchingProcessor): improve handling of 
expired records and batch processing (#18343)
76cd0fdc1b0 is described below

commit 76cd0fdc1b04db00c58a67c1221a9dab6cd09605
Author: Adithya Kashyap H M <adithyahm...@gmail.com>
AuthorDate: Thu Jun 12 11:19:45 2025 +0530

    fix(KafkaRecordBatchingProcessor): improve handling of expired records and 
batch processing (#18343)
    
    Co-authored-by: Adithya Kashyap H M <adithya.kashyap....@walmart.com>
---
 .../consumer/support/batching/KafkaRecordBatchingProcessor.java      | 5 ++++-
 1 file changed, 4 insertions(+), 1 deletion(-)

diff --git 
a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/support/batching/KafkaRecordBatchingProcessor.java
 
b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/support/batching/KafkaRecordBatchingProcessor.java
index 44193f89c71..90e4ac25998 100644
--- 
a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/support/batching/KafkaRecordBatchingProcessor.java
+++ 
b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/support/batching/KafkaRecordBatchingProcessor.java
@@ -110,6 +110,7 @@ final class KafkaRecordBatchingProcessor extends 
KafkaRecordProcessor {
             timeoutWatch.takenAndRestart();
         }
 
+        // If timeout has expired, process current batch but continue to 
handle new records
         if (hasExpiredRecords(consumerRecords)) {
             LOG.debug(
                     "The polling timeout has expired with {} records in cache. 
Dispatching the incomplete batch for processing",
@@ -118,9 +119,10 @@ final class KafkaRecordBatchingProcessor extends 
KafkaRecordProcessor {
             // poll timeout has elapsed, so check for expired records
             processBatch(camelKafkaConsumer);
             exchangeList.clear();
-            return ProcessingResult.newUnprocessed();
+            timeoutWatch.takenAndRestart(); // restart timer after processing 
expired batch
         }
 
+        // Always add new records after handling any expiration
         for (ConsumerRecord<Object, Object> consumerRecord : consumerRecords) {
             TopicPartition tp = new TopicPartition(consumerRecord.topic(), 
consumerRecord.partition());
             Exchange childExchange = toExchange(camelKafkaConsumer, tp, 
consumerRecord);
@@ -130,6 +132,7 @@ final class KafkaRecordBatchingProcessor extends 
KafkaRecordProcessor {
             if (exchangeList.size() >= configuration.getMaxPollRecords()) {
                 processBatch(camelKafkaConsumer);
                 exchangeList.clear();
+                timeoutWatch.takenAndRestart(); // restart timer after batch 
processed
             }
         }
 

Reply via email to