This is an automated email from the ASF dual-hosted git repository.

orpiske pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/camel.git


The following commit(s) were added to refs/heads/main by this push:
     new b38966f1d02 CAMEL-20380: fixed incomplete batching on poll timeout
b38966f1d02 is described below

commit b38966f1d027c62cfe54f06ff67ba643f1863ea1
Author: Otavio Rodolfo Piske <angusyo...@gmail.com>
AuthorDate: Fri Feb 2 15:28:21 2024 +0100

    CAMEL-20380: fixed incomplete batching on poll timeout
---
 .../batching/KafkaRecordBatchingProcessor.java     | 42 +++++++++++++++++++---
 .../batching/BatchingProcessingITSupport.java      |  5 +--
 2 files changed, 41 insertions(+), 6 deletions(-)

diff --git a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/support/batching/KafkaRecordBatchingProcessor.java b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/support/batching/KafkaRecordBatchingProcessor.java
index 082b80d7db9..d9f00d86db4 100644
--- a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/support/batching/KafkaRecordBatchingProcessor.java
+++ b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/support/batching/KafkaRecordBatchingProcessor.java
@@ -31,6 +31,7 @@ import org.apache.camel.component.kafka.consumer.support.KafkaRecordProcessor;
 import org.apache.camel.component.kafka.consumer.support.ProcessingResult;
 import org.apache.camel.spi.ExceptionHandler;
 import org.apache.camel.spi.Synchronization;
+import org.apache.camel.util.StopWatch;
 import org.apache.kafka.clients.consumer.ConsumerRecord;
 import org.apache.kafka.clients.consumer.ConsumerRecords;
 import org.apache.kafka.common.TopicPartition;
@@ -43,6 +44,9 @@ final class KafkaRecordBatchingProcessor extends KafkaRecordProcessor {
     private final KafkaConfiguration configuration;
     private final Processor processor;
     private final CommitManager commitManager;
+    private final StopWatch watch = new StopWatch();
+    private List<Exchange> exchangeList;
+
 
     private final class CommitSynchronization implements Synchronization {
         private final ExceptionHandler exceptionHandler;
@@ -107,17 +111,47 @@ final class KafkaRecordBatchingProcessor extends KafkaRecordProcessor {
     }
 
     public ProcessingResult processExchange(KafkaConsumer camelKafkaConsumer, ConsumerRecords<Object, Object> consumerRecords) {
+        LOG.debug("There's {} records to process ... max poll is set to {}", consumerRecords.count(), configuration.getMaxPollRecords());
         // Aggregate all consumer records in a single exchange
-        List<Exchange> exchangeList = new ArrayList<>(consumerRecords.count());
+        if (exchangeList == null) {
+            exchangeList = new ArrayList<>(configuration.getMaxPollRecords());
+            watch.takenAndRestart();
+        }
+
+        if (hasExpiredRecords(consumerRecords)) {
+            LOG.debug("The polling timeout has expired with {} records in cache. Dispatching the incomplete batch for processing",
+                    exchangeList.size());
+
+            // poll timeout has elapsed, so check for expired records
+            processBatch(camelKafkaConsumer);
+            exchangeList = null;
+
+            return ProcessingResult.newUnprocessed();
+        }
 
-        // Create an inner exchange for every consumer record retrieved
         for (ConsumerRecord<Object, Object> consumerRecord : consumerRecords) {
             TopicPartition tp = new TopicPartition(consumerRecord.topic(), consumerRecord.partition());
-            Exchange exchange = toExchange(camelKafkaConsumer, tp, consumerRecord);
+            Exchange childExchange = toExchange(camelKafkaConsumer, tp, consumerRecord);
+
+            exchangeList.add(childExchange);
 
-            exchangeList.add(exchange);
+            if (exchangeList.size() == configuration.getMaxPollRecords()) {
+                processBatch(camelKafkaConsumer);
+                exchangeList = null;
+            }
         }
 
+        // None of the states provided by the processing result are relevant for batch processing. We can simply return the
+        // default state
+        return ProcessingResult.newUnprocessed();
+
+    }
+
+    private boolean hasExpiredRecords(ConsumerRecords<Object, Object> consumerRecords) {
+        return !exchangeList.isEmpty() && consumerRecords.isEmpty() && watch.taken() >= configuration.getPollTimeoutMs();
+    }
+
+    private ProcessingResult processBatch(KafkaConsumer camelKafkaConsumer) {
         // Create the bundle exchange
         final Exchange exchange = camelKafkaConsumer.createExchange(false);
         final Message message = exchange.getMessage();
diff --git a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/integration/batching/BatchingProcessingITSupport.java b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/integration/batching/BatchingProcessingITSupport.java
index a54ccb5464f..58d002d4eec 100644
--- a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/integration/batching/BatchingProcessingITSupport.java
+++ b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/integration/batching/BatchingProcessingITSupport.java
@@ -81,9 +81,10 @@ abstract class BatchingProcessingITSupport extends BaseEmbeddedKafkaTestSupport
         // Second step: We shut down our route, we expect nothing will be recovered by our route
         contextExtension.getContext().getRouteController().stopRoute("batching");
 
-        // Third step: While our route is stopped, we send 3 records more to a Kafka test topic
+        // Third step: While our route is stopped, we send 3 records more to a Kafka test topic.
+        // We should receive NO messages
         LOG.debug("Starting the third step");
-        to.expectedMessageCount(1);
+        to.expectedMessageCount(0);
         sendRecords(5, 8, topic);
 
         to.assertIsSatisfied(3000);

Reply via email to