This is an automated email from the ASF dual-hosted git repository.

davsclaus pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/camel.git
The following commit(s) were added to refs/heads/main by this push: new 76cd0fdc1b0 fix(KafkaRecordBatchingProcessor): improve handling of expired records and batch processing (#18343) 76cd0fdc1b0 is described below commit 76cd0fdc1b04db00c58a67c1221a9dab6cd09605 Author: Adithya Kashyap H M <adithyahm...@gmail.com> AuthorDate: Thu Jun 12 11:19:45 2025 +0530 fix(KafkaRecordBatchingProcessor): improve handling of expired records and batch processing (#18343) Co-authored-by: Adithya Kashyap H M <adithya.kashyap....@walmart.com> --- .../consumer/support/batching/KafkaRecordBatchingProcessor.java | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/support/batching/KafkaRecordBatchingProcessor.java b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/support/batching/KafkaRecordBatchingProcessor.java index 44193f89c71..90e4ac25998 100644 --- a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/support/batching/KafkaRecordBatchingProcessor.java +++ b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/support/batching/KafkaRecordBatchingProcessor.java @@ -110,6 +110,7 @@ final class KafkaRecordBatchingProcessor extends KafkaRecordProcessor { timeoutWatch.takenAndRestart(); } + // If timeout has expired, process current batch but continue to handle new records if (hasExpiredRecords(consumerRecords)) { LOG.debug( "The polling timeout has expired with {} records in cache. 
Dispatching the incomplete batch for processing", @@ -118,9 +119,10 @@ final class KafkaRecordBatchingProcessor extends KafkaRecordProcessor { // poll timeout has elapsed, so check for expired records processBatch(camelKafkaConsumer); exchangeList.clear(); - return ProcessingResult.newUnprocessed(); + timeoutWatch.takenAndRestart(); // restart timer after processing expired batch } + // Always add new records after handling any expiration for (ConsumerRecord<Object, Object> consumerRecord : consumerRecords) { TopicPartition tp = new TopicPartition(consumerRecord.topic(), consumerRecord.partition()); Exchange childExchange = toExchange(camelKafkaConsumer, tp, consumerRecord); @@ -130,6 +132,7 @@ final class KafkaRecordBatchingProcessor extends KafkaRecordProcessor { if (exchangeList.size() >= configuration.getMaxPollRecords()) { processBatch(camelKafkaConsumer); exchangeList.clear(); + timeoutWatch.takenAndRestart(); // restart timer after batch processed } }