This is an automated email from the ASF dual-hosted git repository.

davsclaus pushed a commit to branch kaf2
in repository https://gitbox.apache.org/repos/asf/camel.git

commit cf16440efe585b389551e86ba99e3d6559379c31
Author: Claus Ibsen <claus.ib...@gmail.com>
AuthorDate: Mon Feb 3 09:03:06 2025 +0100

    CAMEL-21699: camel-kafka - Batch consumer should not require message body 
to keep being List<Exchange>
---
 .../camel/component/kafka/KafkaProducer.java       | 42 -------------
 .../kafka/KafkaTransactionSynchronization.java     | 63 +++++++++++++++++++
 .../batching/KafkaRecordBatchingProcessor.java     | 70 ++++++++--------------
 .../KafkaRecordBatchingProcessorFacade.java        | 10 +---
 4 files changed, 91 insertions(+), 94 deletions(-)

diff --git 
a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaProducer.java
 
b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaProducer.java
index 807e550e716..572c8d1b751 100755
--- 
a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaProducer.java
+++ 
b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaProducer.java
@@ -41,18 +41,15 @@ import org.apache.camel.health.HealthCheckHelper;
 import org.apache.camel.health.WritableHealthCheckRepository;
 import org.apache.camel.spi.HeaderFilterStrategy;
 import org.apache.camel.support.DefaultAsyncProducer;
-import org.apache.camel.support.SynchronizationAdapter;
 import org.apache.camel.util.KeyValueHolder;
 import org.apache.camel.util.ObjectHelper;
 import org.apache.camel.util.ReflectionHelper;
 import org.apache.camel.util.URISupport;
 import org.apache.kafka.clients.CommonClientConfigs;
 import org.apache.kafka.clients.NetworkClient;
-import org.apache.kafka.clients.producer.Producer;
 import org.apache.kafka.clients.producer.ProducerConfig;
 import org.apache.kafka.clients.producer.ProducerRecord;
 import org.apache.kafka.clients.producer.RecordMetadata;
-import org.apache.kafka.common.KafkaException;
 import org.apache.kafka.common.header.Header;
 import org.apache.kafka.common.header.internals.RecordHeader;
 import org.slf4j.Logger;
@@ -529,42 +526,3 @@ public class KafkaProducer extends DefaultAsyncProducer {
         exchange.getUnitOfWork().addSynchronization(new 
KafkaTransactionSynchronization(transactionId, kafkaProducer));
     }
 }
-
-class KafkaTransactionSynchronization extends SynchronizationAdapter {
-    private static final Logger LOG = 
LoggerFactory.getLogger(KafkaTransactionSynchronization.class);
-    private final String transactionId;
-    private final Producer kafkaProducer;
-
-    public KafkaTransactionSynchronization(String transactionId, Producer 
kafkaProducer) {
-        this.transactionId = transactionId;
-        this.kafkaProducer = kafkaProducer;
-    }
-
-    @Override
-    public void onDone(Exchange exchange) {
-        try {
-            if (exchange.getException() != null || exchange.isRollbackOnly()) {
-                if (exchange.getException() instanceof KafkaException) {
-                    LOG.warn("Catch {} and will close kafka producer with 
transaction {} ", exchange.getException(),
-                            transactionId);
-                    kafkaProducer.close();
-                } else {
-                    LOG.warn("Abort kafka transaction {} with exchange {}", 
transactionId, exchange.getExchangeId());
-                    kafkaProducer.abortTransaction();
-                }
-            } else {
-                LOG.debug("Commit kafka transaction {} with exchange {}", 
transactionId, exchange.getExchangeId());
-                kafkaProducer.commitTransaction();
-            }
-        } catch (KafkaException e) {
-            exchange.setException(e);
-        } catch (Exception e) {
-            exchange.setException(e);
-            LOG.warn("Abort kafka transaction {} with exchange {} due to {} ", 
transactionId, exchange.getExchangeId(),
-                    e.getMessage(), e);
-            kafkaProducer.abortTransaction();
-        } finally {
-            exchange.getUnitOfWork().endTransactedBy(transactionId);
-        }
-    }
-}
diff --git 
a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaTransactionSynchronization.java
 
b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaTransactionSynchronization.java
new file mode 100644
index 00000000000..2525211f787
--- /dev/null
+++ 
b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaTransactionSynchronization.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.kafka;
+
+import org.apache.camel.Exchange;
+import org.apache.camel.support.SynchronizationAdapter;
+import org.apache.kafka.clients.producer.Producer;
+import org.apache.kafka.common.KafkaException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+class KafkaTransactionSynchronization extends SynchronizationAdapter {
+    private static final Logger LOG = 
LoggerFactory.getLogger(KafkaTransactionSynchronization.class);
+    private final String transactionId;
+    private final Producer kafkaProducer;
+
+    public KafkaTransactionSynchronization(String transactionId, Producer 
kafkaProducer) {
+        this.transactionId = transactionId;
+        this.kafkaProducer = kafkaProducer;
+    }
+
+    @Override
+    public void onDone(Exchange exchange) {
+        try {
+            if (exchange.getException() != null || exchange.isRollbackOnly()) {
+                if (exchange.getException() instanceof KafkaException) {
+                    LOG.warn("Catch {} and will close kafka producer with 
transaction {} ", exchange.getException(),
+                            transactionId);
+                    kafkaProducer.close();
+                } else {
+                    LOG.warn("Abort kafka transaction {} with exchange {}", 
transactionId, exchange.getExchangeId());
+                    kafkaProducer.abortTransaction();
+                }
+            } else {
+                LOG.debug("Commit kafka transaction {} with exchange {}", 
transactionId, exchange.getExchangeId());
+                kafkaProducer.commitTransaction();
+            }
+        } catch (KafkaException e) {
+            exchange.setException(e);
+        } catch (Exception e) {
+            exchange.setException(e);
+            LOG.warn("Abort kafka transaction {} with exchange {} due to {} ", 
transactionId, exchange.getExchangeId(),
+                    e.getMessage(), e);
+            kafkaProducer.abortTransaction();
+        } finally {
+            exchange.getUnitOfWork().endTransactedBy(transactionId);
+        }
+    }
+}
diff --git 
a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/support/batching/KafkaRecordBatchingProcessor.java
 
b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/support/batching/KafkaRecordBatchingProcessor.java
index 3e10a65e0ea..2568f3a202e 100644
--- 
a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/support/batching/KafkaRecordBatchingProcessor.java
+++ 
b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/support/batching/KafkaRecordBatchingProcessor.java
@@ -16,7 +16,6 @@
  */
 package org.apache.camel.component.kafka.consumer.support.batching;
 
-import java.util.List;
 import java.util.Queue;
 import java.util.concurrent.ArrayBlockingQueue;
 
@@ -40,6 +39,7 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 final class KafkaRecordBatchingProcessor extends KafkaRecordProcessor {
+
     private static final Logger LOG = 
LoggerFactory.getLogger(KafkaRecordBatchingProcessor.class);
 
     private final KafkaConfiguration configuration;
@@ -50,25 +50,17 @@ final class KafkaRecordBatchingProcessor extends 
KafkaRecordProcessor {
 
     private final class CommitSynchronization implements Synchronization {
         private final ExceptionHandler exceptionHandler;
-        private ProcessingResult result;
+        private final int size;
 
-        public CommitSynchronization(ExceptionHandler exceptionHandler) {
+        public CommitSynchronization(ExceptionHandler exceptionHandler, int 
size) {
             this.exceptionHandler = exceptionHandler;
+            this.size = size;
         }
 
         @Override
         public void onComplete(Exchange exchange) {
-            final List<?> exchanges = 
exchange.getMessage().getBody(List.class);
-
-            // Ensure we are actually receiving what we are asked for
-            if (exchanges == null || exchanges.isEmpty()) {
-                LOG.warn("The exchange is {}", exchanges == null ? "not of the 
expected type (null)" : "empty");
-                return;
-            }
-
-            LOG.debug("Calling commit on {} exchanges using {}", 
exchanges.size(), commitManager.getClass().getSimpleName());
+            LOG.debug("Calling commit on {} exchanges using {}", size, 
commitManager.getClass().getSimpleName());
             commitManager.commit();
-            result = new ProcessingResult(false, false);
         }
 
         @Override
@@ -81,8 +73,6 @@ final class KafkaRecordBatchingProcessor extends 
KafkaRecordProcessor {
                 LOG.warn(
                         "Skipping auto-commit on the batch because processing 
the exchanged has failed and the error was not correctly handled");
             }
-
-            result = new ProcessingResult(false, true);
         }
     }
 
@@ -90,8 +80,7 @@ final class KafkaRecordBatchingProcessor extends 
KafkaRecordProcessor {
         this.configuration = configuration;
         this.processor = processor;
         this.commitManager = commitManager;
-
-        this.exchangeList = new 
ArrayBlockingQueue<Exchange>(configuration.getMaxPollRecords());
+        this.exchangeList = new 
ArrayBlockingQueue<>(configuration.getMaxPollRecords());
     }
 
     public Exchange toExchange(
@@ -105,7 +94,6 @@ final class KafkaRecordBatchingProcessor extends 
KafkaRecordProcessor {
 
         if (configuration.isAllowManualCommit()) {
             KafkaManualCommit manual = commitManager.getManualCommit(exchange, 
topicPartition, consumerRecord);
-
             message.setHeader(KafkaConstants.MANUAL_COMMIT, manual);
         }
 
@@ -115,6 +103,7 @@ final class KafkaRecordBatchingProcessor extends 
KafkaRecordProcessor {
     public ProcessingResult processExchange(KafkaConsumer camelKafkaConsumer, 
ConsumerRecords<Object, Object> consumerRecords) {
         LOG.debug("There's {} records to process ... max poll is set to {}", 
consumerRecords.count(),
                 configuration.getMaxPollRecords());
+
         // Aggregate all consumer records in a single exchange
         if (exchangeList.isEmpty()) {
             watch.takenAndRestart();
@@ -138,7 +127,7 @@ final class KafkaRecordBatchingProcessor extends 
KafkaRecordProcessor {
 
             exchangeList.add(childExchange);
 
-            if (exchangeList.size() == configuration.getMaxPollRecords()) {
+            if (exchangeList.size() >= configuration.getMaxPollRecords()) {
                 processBatch(camelKafkaConsumer);
                 exchangeList.clear();
             }
@@ -153,17 +142,17 @@ final class KafkaRecordBatchingProcessor extends 
KafkaRecordProcessor {
         return !exchangeList.isEmpty() && consumerRecords.isEmpty() && 
watch.taken() >= configuration.getPollTimeoutMs();
     }
 
-    private ProcessingResult processBatch(KafkaConsumer camelKafkaConsumer) {
+    private void processBatch(KafkaConsumer camelKafkaConsumer) {
         // Create the bundle exchange
-        final Exchange exchange = camelKafkaConsumer.createExchange(false);
-        final Message message = exchange.getMessage();
-        message.setBody(exchangeList.stream().toList());
-
+        Exchange exchange = camelKafkaConsumer.createExchange(false);
+        Message message = exchange.getMessage();
+        var exchanges = exchangeList.stream().toList();
+        message.setBody(exchanges);
         try {
             if (configuration.isAllowManualCommit()) {
-                return manualCommitResultProcessing(camelKafkaConsumer, 
exchange);
+                manualCommitResultProcessing(camelKafkaConsumer, exchange);
             } else {
-                return autoCommitResultProcessing(camelKafkaConsumer, 
exchange);
+                autoCommitResultProcessing(camelKafkaConsumer, exchange, 
exchanges.size());
             }
         } finally {
             // Release the exchange
@@ -174,46 +163,37 @@ final class KafkaRecordBatchingProcessor extends 
KafkaRecordProcessor {
     /*
      * The flow to execute when using auto-commit
      */
-    private ProcessingResult autoCommitResultProcessing(KafkaConsumer 
camelKafkaConsumer, Exchange exchange) {
-        final ExceptionHandler exceptionHandler = 
camelKafkaConsumer.getExceptionHandler();
-        final CommitSynchronization commitSynchronization = new 
CommitSynchronization(exceptionHandler);
+    private void autoCommitResultProcessing(KafkaConsumer camelKafkaConsumer, 
Exchange exchange, int size) {
+        ExceptionHandler exceptionHandler = 
camelKafkaConsumer.getExceptionHandler();
+        CommitSynchronization commitSynchronization = new 
CommitSynchronization(exceptionHandler, size);
         exchange.getExchangeExtension().addOnCompletion(commitSynchronization);
-
         try {
             processor.process(exchange);
         } catch (Exception e) {
             exchange.setException(e);
         }
-
-        return commitSynchronization.result;
+        if (exchange.getException() != null) {
+            processException(exchange, exceptionHandler);
+        }
     }
 
     /*
      * The flow to execute when the integrations perform manual commit on 
their own
      */
-    private ProcessingResult manualCommitResultProcessing(KafkaConsumer 
camelKafkaConsumer, Exchange exchange) {
+    private void manualCommitResultProcessing(KafkaConsumer 
camelKafkaConsumer, Exchange exchange) {
         try {
             processor.process(exchange);
         } catch (Exception e) {
             exchange.setException(e);
         }
-
-        ProcessingResult result;
         if (exchange.getException() != null) {
-            LOG.debug("An exception was thrown for batch records");
-            final ExceptionHandler exceptionHandler = 
camelKafkaConsumer.getExceptionHandler();
-            boolean handled = processException(exchange, exceptionHandler);
-            result = new ProcessingResult(false, handled);
-        } else {
-            result = new ProcessingResult(false, false);
+            ExceptionHandler exceptionHandler = 
camelKafkaConsumer.getExceptionHandler();
+            processException(exchange, exceptionHandler);
         }
-
-        return result;
     }
 
-    private boolean processException(Exchange exchange, ExceptionHandler 
exceptionHandler) {
+    private void processException(Exchange exchange, ExceptionHandler 
exceptionHandler) {
         // will handle/log the exception and then continue to next
         exceptionHandler.handleException("Error during processing", exchange, 
exchange.getException());
-        return true;
     }
 }
diff --git 
a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/support/batching/KafkaRecordBatchingProcessorFacade.java
 
b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/support/batching/KafkaRecordBatchingProcessorFacade.java
index 0347249ebe0..06745b3506f 100644
--- 
a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/support/batching/KafkaRecordBatchingProcessorFacade.java
+++ 
b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/support/batching/KafkaRecordBatchingProcessorFacade.java
@@ -14,7 +14,6 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-
 package org.apache.camel.component.kafka.consumer.support.batching;
 
 import java.util.Set;
@@ -30,16 +29,15 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 public class KafkaRecordBatchingProcessorFacade extends 
AbstractKafkaRecordProcessorFacade {
+
     private static final Logger LOG = 
LoggerFactory.getLogger(KafkaRecordBatchingProcessorFacade.class);
+
     private final KafkaRecordBatchingProcessor kafkaRecordProcessor;
 
-    public KafkaRecordBatchingProcessorFacade(
-                                              KafkaConsumer 
camelKafkaConsumer, String threadId,
+    public KafkaRecordBatchingProcessorFacade(KafkaConsumer 
camelKafkaConsumer, String threadId,
                                               CommitManager commitManager, 
KafkaConsumerListener consumerListener) {
         super(camelKafkaConsumer, threadId, commitManager, consumerListener);
-
         kafkaRecordProcessor = buildKafkaRecordProcessor(commitManager);
-
     }
 
     private KafkaRecordBatchingProcessor 
buildKafkaRecordProcessor(CommitManager commitManager) {
@@ -52,10 +50,8 @@ public class KafkaRecordBatchingProcessorFacade extends 
AbstractKafkaRecordProce
     @Override
     public ProcessingResult processPolledRecords(ConsumerRecords<Object, 
Object> allRecords) {
         logRecords(allRecords);
-
         Set<TopicPartition> partitions = allRecords.partitions();
         LOG.debug("Poll received records on {} partitions", partitions.size());
-
         return kafkaRecordProcessor.processExchange(camelKafkaConsumer, 
allRecords);
     }
 

Reply via email to