This is an automated email from the ASF dual-hosted git repository.

davsclaus pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/camel.git


The following commit(s) were added to refs/heads/main by this push:
     new f70b7119468 CAMEL-22261 Reuse existing transaction if already transacted
f70b7119468 is described below

commit f70b7119468b3530d0f6a8896475c04b596f5a8a
Author: Claus Ibsen <claus.ib...@gmail.com>
AuthorDate: Wed Jul 23 21:30:02 2025 +0200

    CAMEL-22261 Reuse existing transaction if already transacted
---
 .../apache/camel/component/kafka/KafkaProducer.java | 21 ++++++++-------------
 1 file changed, 8 insertions(+), 13 deletions(-)

diff --git a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaProducer.java b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaProducer.java
index 1ea7c459a21..c8af6d224a9 100755
--- a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaProducer.java
+++ b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaProducer.java
@@ -530,20 +530,15 @@ public class KafkaProducer extends DefaultAsyncProducer implements RouteIdAware
     private void startKafkaTransaction(Exchange exchange) {
         UnitOfWork uow = exchange.getUnitOfWork();
 
-        if (uow.isTransactedBy(transactionId)) {
-            if (LOG.isDebugEnabled()) {
-                LOG.debug("Not starting kafka transaction {} with exchange {} (UOW hash code {}) since one is already started.",
-                        transactionId, exchange.getExchangeId(), uow.hashCode());
-            }
-            return;
-        } else if (LOG.isDebugEnabled()) {
-            LOG.debug("Starting kafka transaction {} with exchange {} (UOW hash code {})", transactionId,
-                    exchange.getExchangeId(), uow.hashCode());
+        if (!uow.isTransactedBy(transactionId)) {
+            LOG.debug("Starting kafka transaction {} with exchange {}", transactionId, exchange.getExchangeId());
+            uow.beginTransactedBy(transactionId);
+            kafkaProducer.beginTransaction();
+            uow.addSynchronization(new KafkaTransactionSynchronization(transactionId, kafkaProducer));
+        } else {
+            LOG.debug("Using existing kafka transaction {} with exchange {}.",
+                    transactionId, exchange.getExchangeId());
         }
-
-        uow.beginTransactedBy(transactionId);
-        kafkaProducer.beginTransaction();
-        uow.addSynchronization(new KafkaTransactionSynchronization(transactionId, kafkaProducer));
     }
 
     @Override

Reply via email to