This is an automated email from the ASF dual-hosted git repository.

davsclaus pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/camel.git

The following commit(s) were added to refs/heads/main by this push:
     new f70b7119468 CAMEL-22261 Reuse existing transaction if already transacted
f70b7119468 is described below

commit f70b7119468b3530d0f6a8896475c04b596f5a8a
Author: Claus Ibsen <claus.ib...@gmail.com>
AuthorDate: Wed Jul 23 21:30:02 2025 +0200

    CAMEL-22261 Reuse existing transaction if already transacted
---
 .../apache/camel/component/kafka/KafkaProducer.java | 21 ++++++++-------------
 1 file changed, 8 insertions(+), 13 deletions(-)

diff --git a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaProducer.java b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaProducer.java
index 1ea7c459a21..c8af6d224a9 100755
--- a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaProducer.java
+++ b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaProducer.java
@@ -530,20 +530,15 @@ public class KafkaProducer extends DefaultAsyncProducer implements RouteIdAware
 
     private void startKafkaTransaction(Exchange exchange) {
         UnitOfWork uow = exchange.getUnitOfWork();
-        if (uow.isTransactedBy(transactionId)) {
-            if (LOG.isDebugEnabled()) {
-                LOG.debug("Not starting kafka transaction {} with exchange {} (UOW hash code {}) since one is already started.",
-                        transactionId, exchange.getExchangeId(), uow.hashCode());
-            }
-            return;
-        } else if (LOG.isDebugEnabled()) {
-            LOG.debug("Starting kafka transaction {} with exchange {} (UOW hash code {})", transactionId,
-                    exchange.getExchangeId(), uow.hashCode());
+        if (!uow.isTransactedBy(transactionId)) {
+            LOG.debug("Starting kafka transaction {} with exchange {}", transactionId, exchange.getExchangeId());
+            uow.beginTransactedBy(transactionId);
+            kafkaProducer.beginTransaction();
+            uow.addSynchronization(new KafkaTransactionSynchronization(transactionId, kafkaProducer));
+        } else {
+            LOG.debug("Using existing kafka transaction {} with exchange {}.",
+                    transactionId, exchange.getExchangeId());
         }
-
-        uow.beginTransactedBy(transactionId);
-        kafkaProducer.beginTransaction();
-        uow.addSynchronization(new KafkaTransactionSynchronization(transactionId, kafkaProducer));
     }
 
     @Override