This is an automated email from the ASF dual-hosted git repository.

davsclaus pushed a commit to branch kaf2
in repository https://gitbox.apache.org/repos/asf/camel.git
commit cf16440efe585b389551e86ba99e3d6559379c31
Author: Claus Ibsen <claus.ib...@gmail.com>
AuthorDate: Mon Feb 3 09:03:06 2025 +0100

    CAMEL-21699: camel-kafka - Batch consumer should not require message body to keep being List<Exchange>
---
 .../camel/component/kafka/KafkaProducer.java       | 42 -------------
 .../kafka/KafkaTransactionSynchronization.java     | 63 +++++++++++++++++++
 .../batching/KafkaRecordBatchingProcessor.java     | 70 ++++++++--------------
 .../KafkaRecordBatchingProcessorFacade.java        | 10 +---
 4 files changed, 91 insertions(+), 94 deletions(-)

diff --git a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaProducer.java b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaProducer.java
index 807e550e716..572c8d1b751 100755
--- a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaProducer.java
+++ b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaProducer.java
@@ -41,18 +41,15 @@ import org.apache.camel.health.HealthCheckHelper;
 import org.apache.camel.health.WritableHealthCheckRepository;
 import org.apache.camel.spi.HeaderFilterStrategy;
 import org.apache.camel.support.DefaultAsyncProducer;
-import org.apache.camel.support.SynchronizationAdapter;
 import org.apache.camel.util.KeyValueHolder;
 import org.apache.camel.util.ObjectHelper;
 import org.apache.camel.util.ReflectionHelper;
 import org.apache.camel.util.URISupport;
 import org.apache.kafka.clients.CommonClientConfigs;
 import org.apache.kafka.clients.NetworkClient;
-import org.apache.kafka.clients.producer.Producer;
 import org.apache.kafka.clients.producer.ProducerConfig;
 import org.apache.kafka.clients.producer.ProducerRecord;
 import org.apache.kafka.clients.producer.RecordMetadata;
-import org.apache.kafka.common.KafkaException;
 import org.apache.kafka.common.header.Header;
 import org.apache.kafka.common.header.internals.RecordHeader;
 import org.slf4j.Logger;
@@ -529,42 +526,3 @@ public class KafkaProducer extends DefaultAsyncProducer {
         exchange.getUnitOfWork().addSynchronization(new KafkaTransactionSynchronization(transactionId, kafkaProducer));
     }
 }
-
-class KafkaTransactionSynchronization extends SynchronizationAdapter {
-    private static final Logger LOG = LoggerFactory.getLogger(KafkaTransactionSynchronization.class);
-    private final String transactionId;
-    private final Producer kafkaProducer;
-
-    public KafkaTransactionSynchronization(String transactionId, Producer kafkaProducer) {
-        this.transactionId = transactionId;
-        this.kafkaProducer = kafkaProducer;
-    }
-
-    @Override
-    public void onDone(Exchange exchange) {
-        try {
-            if (exchange.getException() != null || exchange.isRollbackOnly()) {
-                if (exchange.getException() instanceof KafkaException) {
-                    LOG.warn("Catch {} and will close kafka producer with transaction {} ", exchange.getException(),
-                            transactionId);
-                    kafkaProducer.close();
-                } else {
-                    LOG.warn("Abort kafka transaction {} with exchange {}", transactionId, exchange.getExchangeId());
-                    kafkaProducer.abortTransaction();
-                }
-            } else {
-                LOG.debug("Commit kafka transaction {} with exchange {}", transactionId, exchange.getExchangeId());
-                kafkaProducer.commitTransaction();
-            }
-        } catch (KafkaException e) {
-            exchange.setException(e);
-        } catch (Exception e) {
-            exchange.setException(e);
-            LOG.warn("Abort kafka transaction {} with exchange {} due to {} ", transactionId, exchange.getExchangeId(),
-                    e.getMessage(), e);
-            kafkaProducer.abortTransaction();
-        } finally {
-            exchange.getUnitOfWork().endTransactedBy(transactionId);
-        }
-    }
-}
diff --git a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaTransactionSynchronization.java b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaTransactionSynchronization.java
new file mode 100644
index 00000000000..2525211f787
--- /dev/null
+++ b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaTransactionSynchronization.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.kafka;
+
+import org.apache.camel.Exchange;
+import org.apache.camel.support.SynchronizationAdapter;
+import org.apache.kafka.clients.producer.Producer;
+import org.apache.kafka.common.KafkaException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+class KafkaTransactionSynchronization extends SynchronizationAdapter {
+    private static final Logger LOG = LoggerFactory.getLogger(KafkaTransactionSynchronization.class);
+    private final String transactionId;
+    private final Producer kafkaProducer;
+
+    public KafkaTransactionSynchronization(String transactionId, Producer kafkaProducer) {
+        this.transactionId = transactionId;
+        this.kafkaProducer = kafkaProducer;
+    }
+
+    @Override
+    public void onDone(Exchange exchange) {
+        try {
+            if (exchange.getException() != null || exchange.isRollbackOnly()) {
+                if (exchange.getException() instanceof KafkaException) {
+                    LOG.warn("Catch {} and will close kafka producer with transaction {} ", exchange.getException(),
+                            transactionId);
+                    kafkaProducer.close();
+                } else {
+                    LOG.warn("Abort kafka transaction {} with exchange {}", transactionId, exchange.getExchangeId());
+                    kafkaProducer.abortTransaction();
+                }
+            } else {
+                LOG.debug("Commit kafka transaction {} with exchange {}", transactionId, exchange.getExchangeId());
+                kafkaProducer.commitTransaction();
+            }
+        } catch (KafkaException e) {
+            exchange.setException(e);
+        } catch (Exception e) {
+            exchange.setException(e);
+            LOG.warn("Abort kafka transaction {} with exchange {} due to {} ", transactionId, exchange.getExchangeId(),
+                    e.getMessage(), e);
+            kafkaProducer.abortTransaction();
+        } finally {
+            exchange.getUnitOfWork().endTransactedBy(transactionId);
+        }
+    }
+}
diff --git a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/support/batching/KafkaRecordBatchingProcessor.java b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/support/batching/KafkaRecordBatchingProcessor.java
index 3e10a65e0ea..2568f3a202e 100644
--- a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/support/batching/KafkaRecordBatchingProcessor.java
+++ b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/support/batching/KafkaRecordBatchingProcessor.java
@@ -16,7 +16,6 @@
  */
 package org.apache.camel.component.kafka.consumer.support.batching;
 
-import java.util.List;
 import java.util.Queue;
 import java.util.concurrent.ArrayBlockingQueue;
 
@@ -40,6 +39,7 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 final class KafkaRecordBatchingProcessor extends KafkaRecordProcessor {
+
     private static final Logger LOG = LoggerFactory.getLogger(KafkaRecordBatchingProcessor.class);
 
     private final KafkaConfiguration configuration;
@@ -50,25 +50,17 @@ final class KafkaRecordBatchingProcessor extends KafkaRecordProcessor {
 
     private final class CommitSynchronization implements Synchronization {
         private final ExceptionHandler exceptionHandler;
-        private ProcessingResult result;
+        private final int size;
 
-        public CommitSynchronization(ExceptionHandler exceptionHandler) {
+        public CommitSynchronization(ExceptionHandler exceptionHandler, int size) {
             this.exceptionHandler = exceptionHandler;
+            this.size = size;
         }
 
         @Override
         public void onComplete(Exchange exchange) {
-            final List<?> exchanges = exchange.getMessage().getBody(List.class);
-
-            // Ensure we are actually receiving what we are asked for
-            if (exchanges == null || exchanges.isEmpty()) {
-                LOG.warn("The exchange is {}", exchanges == null ? "not of the expected type (null)" : "empty");
-                return;
-            }
-
-            LOG.debug("Calling commit on {} exchanges using {}", exchanges.size(), commitManager.getClass().getSimpleName());
+            LOG.debug("Calling commit on {} exchanges using {}", size, commitManager.getClass().getSimpleName());
             commitManager.commit();
-            result = new ProcessingResult(false, false);
         }
 
         @Override
@@ -81,8 +73,6 @@ final class KafkaRecordBatchingProcessor extends KafkaRecordProcessor {
                 LOG.warn(
                         "Skipping auto-commit on the batch because processing the exchanged has failed and the error was not correctly handled");
             }
-
-            result = new ProcessingResult(false, true);
         }
     }
 
@@ -90,8 +80,7 @@
         this.configuration = configuration;
         this.processor = processor;
        this.commitManager = commitManager;
-
-        this.exchangeList = new ArrayBlockingQueue<Exchange>(configuration.getMaxPollRecords());
+        this.exchangeList = new ArrayBlockingQueue<>(configuration.getMaxPollRecords());
     }
 
     public Exchange toExchange(
@@ -105,7 +94,6 @@
 
         if (configuration.isAllowManualCommit()) {
             KafkaManualCommit manual = commitManager.getManualCommit(exchange, topicPartition, consumerRecord);
-
             message.setHeader(KafkaConstants.MANUAL_COMMIT, manual);
         }
 
@@ -115,6 +103,7 @@
     public ProcessingResult processExchange(KafkaConsumer camelKafkaConsumer, ConsumerRecords<Object, Object> consumerRecords) {
         LOG.debug("There's {} records to process ... max poll is set to {}", consumerRecords.count(),
                 configuration.getMaxPollRecords());
+
         // Aggregate all consumer records in a single exchange
         if (exchangeList.isEmpty()) {
             watch.takenAndRestart();
@@ -138,7 +127,7 @@
 
             exchangeList.add(childExchange);
 
-            if (exchangeList.size() == configuration.getMaxPollRecords()) {
+            if (exchangeList.size() >= configuration.getMaxPollRecords()) {
                 processBatch(camelKafkaConsumer);
                 exchangeList.clear();
             }
@@ -153,17 +142,17 @@
         return !exchangeList.isEmpty() && consumerRecords.isEmpty() && watch.taken() >= configuration.getPollTimeoutMs();
     }
 
-    private ProcessingResult processBatch(KafkaConsumer camelKafkaConsumer) {
+    private void processBatch(KafkaConsumer camelKafkaConsumer) {
         // Create the bundle exchange
-        final Exchange exchange = camelKafkaConsumer.createExchange(false);
-        final Message message = exchange.getMessage();
-        message.setBody(exchangeList.stream().toList());
-
+        Exchange exchange = camelKafkaConsumer.createExchange(false);
+        Message message = exchange.getMessage();
+        var exchanges = exchangeList.stream().toList();
+        message.setBody(exchanges);
         try {
             if (configuration.isAllowManualCommit()) {
-                return manualCommitResultProcessing(camelKafkaConsumer, exchange);
+                manualCommitResultProcessing(camelKafkaConsumer, exchange);
             } else {
-                return autoCommitResultProcessing(camelKafkaConsumer, exchange);
+                autoCommitResultProcessing(camelKafkaConsumer, exchange, exchanges.size());
             }
         } finally {
             // Release the exchange
@@ -174,46 +163,37 @@
     /*
      * The flow to execute when using auto-commit
      */
-    private ProcessingResult autoCommitResultProcessing(KafkaConsumer camelKafkaConsumer, Exchange exchange) {
-        final ExceptionHandler exceptionHandler = camelKafkaConsumer.getExceptionHandler();
-        final CommitSynchronization commitSynchronization = new CommitSynchronization(exceptionHandler);
+    private void autoCommitResultProcessing(KafkaConsumer camelKafkaConsumer, Exchange exchange, int size) {
+        ExceptionHandler exceptionHandler = camelKafkaConsumer.getExceptionHandler();
+        CommitSynchronization commitSynchronization = new CommitSynchronization(exceptionHandler, size);
         exchange.getExchangeExtension().addOnCompletion(commitSynchronization);
-
         try {
             processor.process(exchange);
         } catch (Exception e) {
             exchange.setException(e);
         }
-
-        return commitSynchronization.result;
+        if (exchange.getException() != null) {
+            processException(exchange, exceptionHandler);
+        }
     }
 
     /*
      * The flow to execute when the integrations perform manual commit on their own
     */
-    private ProcessingResult manualCommitResultProcessing(KafkaConsumer camelKafkaConsumer, Exchange exchange) {
+    private void manualCommitResultProcessing(KafkaConsumer camelKafkaConsumer, Exchange exchange) {
         try {
             processor.process(exchange);
         } catch (Exception e) {
             exchange.setException(e);
         }
-
-        ProcessingResult result;
         if (exchange.getException() != null) {
-            LOG.debug("An exception was thrown for batch records");
-            final ExceptionHandler exceptionHandler = camelKafkaConsumer.getExceptionHandler();
-            boolean handled = processException(exchange, exceptionHandler);
-            result = new ProcessingResult(false, handled);
-        } else {
-            result = new ProcessingResult(false, false);
+            ExceptionHandler exceptionHandler = camelKafkaConsumer.getExceptionHandler();
+            processException(exchange, exceptionHandler);
         }
-
-        return result;
     }
 
-    private boolean processException(Exchange exchange, ExceptionHandler exceptionHandler) {
+    private void processException(Exchange exchange, ExceptionHandler exceptionHandler) {
         // will handle/log the exception and then continue to next
         exceptionHandler.handleException("Error during processing", exchange, exchange.getException());
-        return true;
     }
 }
diff --git a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/support/batching/KafkaRecordBatchingProcessorFacade.java b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/support/batching/KafkaRecordBatchingProcessorFacade.java
index 0347249ebe0..06745b3506f 100644
--- a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/support/batching/KafkaRecordBatchingProcessorFacade.java
+++ b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/support/batching/KafkaRecordBatchingProcessorFacade.java
@@ -14,7 +14,6 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-
 package org.apache.camel.component.kafka.consumer.support.batching;
 
 import java.util.Set;
@@ -30,16 +29,15 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 public class KafkaRecordBatchingProcessorFacade extends AbstractKafkaRecordProcessorFacade {
+
     private static final Logger LOG = LoggerFactory.getLogger(KafkaRecordBatchingProcessorFacade.class);
+
     private final KafkaRecordBatchingProcessor kafkaRecordProcessor;
 
-    public KafkaRecordBatchingProcessorFacade(
-            KafkaConsumer camelKafkaConsumer, String threadId,
+    public KafkaRecordBatchingProcessorFacade(KafkaConsumer camelKafkaConsumer, String threadId,
             CommitManager commitManager, KafkaConsumerListener consumerListener) {
         super(camelKafkaConsumer, threadId, commitManager, consumerListener);
-
         kafkaRecordProcessor = buildKafkaRecordProcessor(commitManager);
-
     }
 
     private KafkaRecordBatchingProcessor buildKafkaRecordProcessor(CommitManager commitManager) {
@@ -52,10 +50,8 @@ public class KafkaRecordBatchingProcessorFacade extends AbstractKafkaRecordProce
 
     @Override
     public ProcessingResult processPolledRecords(ConsumerRecords<Object, Object> allRecords) {
         logRecords(allRecords);
-
         Set<TopicPartition> partitions = allRecords.partitions();
         LOG.debug("Poll received records on {} partitions", partitions.size());
-
         return kafkaRecordProcessor.processExchange(camelKafkaConsumer, allRecords);
     }
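
For readers skimming the change, the behavioral difference is in CommitSynchronization: it used to re-read the exchange body and skip the auto-commit when the body was no longer a List, so routes could not replace the aggregated body; the batch size is now captured before processing and the commit always runs. Below is a minimal sketch (not part of this commit; the topic, group id and route are hypothetical, while batching, maxPollRecords and groupId are existing camel-kafka options) of a route that this now permits:

    // Hypothetical route for illustration only
    import java.util.List;

    import org.apache.camel.builder.RouteBuilder;

    public class KafkaBatchRoute extends RouteBuilder {

        @Override
        public void configure() {
            // batching=true aggregates polled records into one exchange whose
            // initial body is a List<Exchange>, bounded by maxPollRecords
            from("kafka:my-topic?batching=true&maxPollRecords=100&groupId=my-group")
                    .process(exchange -> {
                        // read the aggregated batch ...
                        List<?> batch = exchange.getMessage().getBody(List.class);
                        // ... and replace the body; after CAMEL-21699 the
                        // auto-commit no longer requires the body to still be
                        // a List<Exchange>
                        exchange.getMessage().setBody("processed " + batch.size() + " records");
                    })
                    .to("log:batch");
        }
    }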