This is an automated email from the ASF dual-hosted git repository.
davsclaus pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/camel.git
The following commit(s) were added to refs/heads/main by this push: new 9e05482d845 CAMEL-21940: camel-kafka - KafkaProducerCallBack must use atomic decrement and compare pattern to avoid executing the continuation twice from the onCompletion and allSent methods as those run in different threads in async mode. (#17693) 9e05482d845 is described below commit 9e05482d84506b5c6aadc94bdf6915786756314a Author: Pavel Bořík <32341334+pavel-bo...@users.noreply.github.com> AuthorDate: Tue Apr 8 11:41:43 2025 +0200 CAMEL-21940: camel-kafka - KafkaProducerCallBack must use atomic decrement and compare pattern to avoid executing the continuation twice from the onCompletion and allSent methods as those run in different threads in async mode. (#17693) --- .../kafka/producer/support/KafkaProducerCallBack.java | 14 +++++--------- 1 file changed, 5 insertions(+), 9 deletions(-) diff --git a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/producer/support/KafkaProducerCallBack.java b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/producer/support/KafkaProducerCallBack.java index 2b7de255f49..c97ada3e8b9 100644 --- a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/producer/support/KafkaProducerCallBack.java +++ b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/producer/support/KafkaProducerCallBack.java @@ -19,7 +19,7 @@ package org.apache.camel.component.kafka.producer.support; import java.util.ArrayList; import java.util.List; import java.util.concurrent.ExecutorService; -import java.util.concurrent.atomic.LongAdder; +import java.util.concurrent.atomic.AtomicInteger; import org.apache.camel.AsyncCallback; import org.apache.camel.util.ObjectHelper; @@ -36,7 +36,7 @@ public final class KafkaProducerCallBack implements Callback { private final Object body; private final AsyncCallback callback; - private final LongAdder count = new LongAdder(); + private final AtomicInteger count = new 
AtomicInteger(1); private final ExecutorService workerPool; private final boolean record; private final List<RecordMetadata> recordMetadataList = new ArrayList<>(); @@ -49,7 +49,6 @@ public final class KafkaProducerCallBack implements Callback { // is merely a safeguard this.workerPool = ObjectHelper.notNull(workerPool, "workerPool"); this.record = record; - count.increment(); if (record) { setRecordMetadata(body, recordMetadataList); @@ -57,12 +56,11 @@ public final class KafkaProducerCallBack implements Callback { } public void increment() { - count.increment(); + count.incrementAndGet(); } public boolean allSent() { - count.decrement(); - if (count.intValue() == 0) { + if (count.decrementAndGet() == 0) { LOG.trace("All messages sent, continue routing."); // was able to get all the work done while queuing the requests callback.done(true); @@ -81,8 +79,7 @@ public final class KafkaProducerCallBack implements Callback { recordMetadataList.add(recordMetadata); } - count.decrement(); - if (count.intValue() == 0) { + if (count.decrementAndGet() == 0) { // use worker pool to continue routing the exchange // as this thread is from Kafka Callback and should not be used // by Camel routing @@ -94,5 +91,4 @@ public final class KafkaProducerCallBack implements Callback { LOG.trace("All messages sent, continue routing (within thread)."); callback.done(false); } - }