This is an automated email from the ASF dual-hosted git repository.

orpiske pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/camel.git


The following commit(s) were added to refs/heads/main by this push:
     new 942dc647681 CAMEL-20790: prevent an NPE under high concurrency
942dc647681 is described below

commit 942dc647681693aefc9a3554200d4e00a0c3c89d
Author: Otavio Rodolfo Piske <angusyo...@gmail.com>
AuthorDate: Tue Jun 11 17:23:36 2024 +0200

    CAMEL-20790: prevent an NPE under high concurrency
---
 .../support/batching/KafkaRecordBatchingProcessor.java   | 16 +++++++++-------
 1 file changed, 9 insertions(+), 7 deletions(-)

diff --git a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/support/batching/KafkaRecordBatchingProcessor.java b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/support/batching/KafkaRecordBatchingProcessor.java
index b6bf0a6e2e8..ed924dc78dd 100644
--- a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/support/batching/KafkaRecordBatchingProcessor.java
+++ b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/support/batching/KafkaRecordBatchingProcessor.java
@@ -16,8 +16,9 @@
  */
 package org.apache.camel.component.kafka.consumer.support.batching;
 
-import java.util.ArrayList;
 import java.util.List;
+import java.util.Queue;
+import java.util.concurrent.ArrayBlockingQueue;
 
 import org.apache.camel.Exchange;
 import org.apache.camel.Message;
@@ -45,7 +46,7 @@ final class KafkaRecordBatchingProcessor extends KafkaRecordProcessor {
     private final Processor processor;
     private final CommitManager commitManager;
     private final StopWatch watch = new StopWatch();
-    private List<Exchange> exchangeList;
+    private final Queue<Exchange> exchangeList;
 
     private final class CommitSynchronization implements Synchronization {
         private final ExceptionHandler exceptionHandler;
@@ -89,6 +90,8 @@ final class KafkaRecordBatchingProcessor extends KafkaRecordProcessor {
         this.configuration = configuration;
         this.processor = processor;
         this.commitManager = commitManager;
+
+        this.exchangeList = new ArrayBlockingQueue<Exchange>(configuration.getMaxPollRecords());
     }
 
     public Exchange toExchange(
@@ -113,8 +116,7 @@ final class KafkaRecordBatchingProcessor extends KafkaRecordProcessor {
         LOG.debug("There's {} records to process ... max poll is set to {}", consumerRecords.count(),
                 configuration.getMaxPollRecords());
         // Aggregate all consumer records in a single exchange
-        if (exchangeList == null) {
-            exchangeList = new ArrayList<>(configuration.getMaxPollRecords());
+        if (exchangeList.isEmpty()) {
             watch.takenAndRestart();
         }
 
@@ -125,7 +127,7 @@ final class KafkaRecordBatchingProcessor extends KafkaRecordProcessor {
 
             // poll timeout has elapsed, so check for expired records
             processBatch(camelKafkaConsumer);
-            exchangeList = null;
+            exchangeList.clear();
 
             return ProcessingResult.newUnprocessed();
         }
@@ -138,7 +140,7 @@ final class KafkaRecordBatchingProcessor extends KafkaRecordProcessor {
 
             if (exchangeList.size() == configuration.getMaxPollRecords()) {
                 processBatch(camelKafkaConsumer);
-                exchangeList = null;
+                exchangeList.clear();
             }
         }
 
@@ -156,7 +158,7 @@ final class KafkaRecordBatchingProcessor extends KafkaRecordProcessor {
         // Create the bundle exchange
         final Exchange exchange = camelKafkaConsumer.createExchange(false);
         final Message message = exchange.getMessage();
-        message.setBody(exchangeList);
+        message.setBody(exchangeList.stream().toList());
 
         try {
             if (configuration.isAllowManualCommit()) {

Reply via email to