This is an automated email from the ASF dual-hosted git repository. orpiske pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/camel.git
The following commit(s) were added to refs/heads/main by this push: new b38966f1d02 CAMEL-20380: fixed incomplete batching on poll timeout b38966f1d02 is described below commit b38966f1d027c62cfe54f06ff67ba643f1863ea1 Author: Otavio Rodolfo Piske <angusyo...@gmail.com> AuthorDate: Fri Feb 2 15:28:21 2024 +0100 CAMEL-20380: fixed incomplete batching on poll timeout --- .../batching/KafkaRecordBatchingProcessor.java | 42 +++++++++++++++++++--- .../batching/BatchingProcessingITSupport.java | 5 +-- 2 files changed, 41 insertions(+), 6 deletions(-) diff --git a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/support/batching/KafkaRecordBatchingProcessor.java b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/support/batching/KafkaRecordBatchingProcessor.java index 082b80d7db9..d9f00d86db4 100644 --- a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/support/batching/KafkaRecordBatchingProcessor.java +++ b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/support/batching/KafkaRecordBatchingProcessor.java @@ -31,6 +31,7 @@ import org.apache.camel.component.kafka.consumer.support.KafkaRecordProcessor; import org.apache.camel.component.kafka.consumer.support.ProcessingResult; import org.apache.camel.spi.ExceptionHandler; import org.apache.camel.spi.Synchronization; +import org.apache.camel.util.StopWatch; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.clients.consumer.ConsumerRecords; import org.apache.kafka.common.TopicPartition; @@ -43,6 +44,9 @@ final class KafkaRecordBatchingProcessor extends KafkaRecordProcessor { private final KafkaConfiguration configuration; private final Processor processor; private final CommitManager commitManager; + private final StopWatch watch = new StopWatch(); + private List<Exchange> exchangeList; + private final class CommitSynchronization implements Synchronization { private final ExceptionHandler exceptionHandler; @@ -107,17 +111,47 @@ final class KafkaRecordBatchingProcessor extends KafkaRecordProcessor { } public ProcessingResult processExchange(KafkaConsumer camelKafkaConsumer, ConsumerRecords<Object, Object> consumerRecords) { + LOG.debug("There's {} records to process ... max poll is set to {}", consumerRecords.count(), configuration.getMaxPollRecords()); // Aggregate all consumer records in a single exchange - List<Exchange> exchangeList = new ArrayList<>(consumerRecords.count()); + if (exchangeList == null) { + exchangeList = new ArrayList<>(configuration.getMaxPollRecords()); + watch.takenAndRestart(); + } + + if (hasExpiredRecords(consumerRecords)) { + LOG.debug("The polling timeout has expired with {} records in cache. Dispatching the incomplete batch for processing", + exchangeList.size()); + + // poll timeout has elapsed, so check for expired records + processBatch(camelKafkaConsumer); + exchangeList = null; + + return ProcessingResult.newUnprocessed(); + } - // Create an inner exchange for every consumer record retrieved for (ConsumerRecord<Object, Object> consumerRecord : consumerRecords) { TopicPartition tp = new TopicPartition(consumerRecord.topic(), consumerRecord.partition()); - Exchange exchange = toExchange(camelKafkaConsumer, tp, consumerRecord); + Exchange childExchange = toExchange(camelKafkaConsumer, tp, consumerRecord); + + exchangeList.add(childExchange); - exchangeList.add(exchange); + if (exchangeList.size() == configuration.getMaxPollRecords()) { + processBatch(camelKafkaConsumer); + exchangeList = null; + } } + // None of the states provided by the processing result are relevant for batch processing. We can simply return the + // default state + return ProcessingResult.newUnprocessed(); + + } + + private boolean hasExpiredRecords(ConsumerRecords<Object, Object> consumerRecords) { + return !exchangeList.isEmpty() && consumerRecords.isEmpty() && watch.taken() >= configuration.getPollTimeoutMs(); + } + + private ProcessingResult processBatch(KafkaConsumer camelKafkaConsumer) { // Create the bundle exchange final Exchange exchange = camelKafkaConsumer.createExchange(false); final Message message = exchange.getMessage(); diff --git a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/integration/batching/BatchingProcessingITSupport.java b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/integration/batching/BatchingProcessingITSupport.java index a54ccb5464f..58d002d4eec 100644 --- a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/integration/batching/BatchingProcessingITSupport.java +++ b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/integration/batching/BatchingProcessingITSupport.java @@ -81,9 +81,10 @@ abstract class BatchingProcessingITSupport extends BaseEmbeddedKafkaTestSupport // Second step: We shut down our route, we expect nothing will be recovered by our route contextExtension.getContext().getRouteController().stopRoute("batching"); - // Third step: While our route is stopped, we send 3 records more to a Kafka test topic + // Third step: While our route is stopped, we send 3 records more to a Kafka test topic. + // We should receive NO messages LOG.debug("Starting the third step"); - to.expectedMessageCount(1); + to.expectedMessageCount(0); sendRecords(5, 8, topic); to.assertIsSatisfied(3000);