This is an automated email from the ASF dual-hosted git repository. davsclaus pushed a commit to branch camel-4.10.x in repository https://gitbox.apache.org/repos/asf/camel.git
The following commit(s) were added to refs/heads/camel-4.10.x by this push: new 0b7654b0841 fix(KafkaRecordBatchingProcessor): improve handling of expired records and batch processing (#18343) 0b7654b0841 is described below commit 0b7654b08410e377ebe754da250f9441789c31ef Author: Adithya Kashyap H M <adithyahm...@gmail.com> AuthorDate: Thu Jun 12 11:19:45 2025 +0530 fix(KafkaRecordBatchingProcessor): improve handling of expired records and batch processing (#18343) Co-authored-by: Adithya Kashyap H M <adithya.kashyap....@walmart.com> --- .../consumer/support/batching/KafkaRecordBatchingProcessor.java | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/support/batching/KafkaRecordBatchingProcessor.java b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/support/batching/KafkaRecordBatchingProcessor.java index 09247cddc39..ccf4b5478c2 100644 --- a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/support/batching/KafkaRecordBatchingProcessor.java +++ b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/support/batching/KafkaRecordBatchingProcessor.java @@ -110,6 +110,7 @@ final class KafkaRecordBatchingProcessor extends KafkaRecordProcessor { timeoutWatch.takenAndRestart(); } + // If timeout has expired, process current batch but continue to handle new records if (hasExpiredRecords(consumerRecords)) { LOG.debug( "The polling timeout has expired with {} records in cache. Dispatching the incomplete batch for processing", @@ -118,9 +119,10 @@ final class KafkaRecordBatchingProcessor extends KafkaRecordProcessor { // poll timeout has elapsed, so check for expired records processBatch(camelKafkaConsumer); exchangeList.clear(); - return ProcessingResult.newUnprocessed(); + timeoutWatch.takenAndRestart(); // restart timer after processing expired batch } + // Always add new records after handling any expiration for (ConsumerRecord<Object, Object> consumerRecord : consumerRecords) { TopicPartition tp = new TopicPartition(consumerRecord.topic(), consumerRecord.partition()); Exchange childExchange = toExchange(camelKafkaConsumer, tp, consumerRecord); @@ -130,6 +132,7 @@ final class KafkaRecordBatchingProcessor extends KafkaRecordProcessor { if (exchangeList.size() >= configuration.getMaxPollRecords()) { processBatch(camelKafkaConsumer); exchangeList.clear(); + timeoutWatch.takenAndRestart(); // restart timer after batch processed } }