This is an automated email from the ASF dual-hosted git repository.

davsclaus pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/camel.git


The following commit(s) were added to refs/heads/main by this push:
     new 9e05482d845 CAMEL-21940: camel-kafka - KafkaProducerCallBack must use atomic decrement and compare pattern to avoid executing the continuation twice from the onCompletion and allSent methods as those run in different threads in async mode. (#17693)
9e05482d845 is described below

commit 9e05482d84506b5c6aadc94bdf6915786756314a
Author: Pavel Bořík <32341334+pavel-bo...@users.noreply.github.com>
AuthorDate: Tue Apr 8 11:41:43 2025 +0200

    CAMEL-21940: camel-kafka - KafkaProducerCallBack must use atomic decrement and compare pattern to avoid executing the continuation twice from the onCompletion and allSent methods as those run in different threads in async mode. (#17693)
---
 .../kafka/producer/support/KafkaProducerCallBack.java      | 14 +++++---------
 1 file changed, 5 insertions(+), 9 deletions(-)

diff --git a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/producer/support/KafkaProducerCallBack.java b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/producer/support/KafkaProducerCallBack.java
index 2b7de255f49..c97ada3e8b9 100644
--- a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/producer/support/KafkaProducerCallBack.java
+++ b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/producer/support/KafkaProducerCallBack.java
@@ -19,7 +19,7 @@ package org.apache.camel.component.kafka.producer.support;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.concurrent.ExecutorService;
-import java.util.concurrent.atomic.LongAdder;
+import java.util.concurrent.atomic.AtomicInteger;
 
 import org.apache.camel.AsyncCallback;
 import org.apache.camel.util.ObjectHelper;
@@ -36,7 +36,7 @@ public final class KafkaProducerCallBack implements Callback {
 
     private final Object body;
     private final AsyncCallback callback;
-    private final LongAdder count = new LongAdder();
+    private final AtomicInteger count = new AtomicInteger(1);
     private final ExecutorService workerPool;
     private final boolean record;
     private final List<RecordMetadata> recordMetadataList = new ArrayList<>();
@@ -49,7 +49,6 @@ public final class KafkaProducerCallBack implements Callback {
         // is merely a safeguard
         this.workerPool = ObjectHelper.notNull(workerPool, "workerPool");
         this.record = record;
-        count.increment();
 
         if (record) {
             setRecordMetadata(body, recordMetadataList);
@@ -57,12 +56,11 @@ public final class KafkaProducerCallBack implements Callback {
     }
 
     public void increment() {
-        count.increment();
+        count.incrementAndGet();
     }
 
     public boolean allSent() {
-        count.decrement();
-        if (count.intValue() == 0) {
+        if (count.decrementAndGet() == 0) {
             LOG.trace("All messages sent, continue routing.");
             // was able to get all the work done while queuing the requests
             callback.done(true);
@@ -81,8 +79,7 @@ public final class KafkaProducerCallBack implements Callback {
             recordMetadataList.add(recordMetadata);
         }
 
-        count.decrement();
-        if (count.intValue() == 0) {
+        if (count.decrementAndGet() == 0) {
             // use worker pool to continue routing the exchange
             // as this thread is from Kafka Callback and should not be used
             // by Camel routing
@@ -94,5 +91,4 @@ public final class KafkaProducerCallBack implements Callback {
         LOG.trace("All messages sent, continue routing (within thread).");
         callback.done(false);
     }
-
 }

Reply via email to