This is an automated email from the ASF dual-hosted git repository.

davsclaus pushed a commit to branch camel-4.4.x
in repository https://gitbox.apache.org/repos/asf/camel.git

commit 9d3b6daa8fd845be6a49688f4f1a494bb02612da
Author: Claus Ibsen <claus.ib...@gmail.com>
AuthorDate: Thu Oct 3 13:16:14 2024 +0200

    CAMEL-21309: camel-cxf - Force using sync client when using tracing/opentelemetry as otherwise spans are not working correctly. (#15816)
    
    * CAMEL-21309: camel-cxf - Force using sync client when using tracing/opentelemetry as otherwise spans are not working correctly.
---
 .../camel/component/cxf/jaxrs/CxfRsProducer.java   | 24 ++++++++++++++++++++--
 .../camel/component/cxf/jaxws/CxfProducer.java     | 17 +++++++++++++--
 2 files changed, 37 insertions(+), 4 deletions(-)

diff --git a/components/camel-cxf/camel-cxf-rest/src/main/java/org/apache/camel/component/cxf/jaxrs/CxfRsProducer.java b/components/camel-cxf/camel-cxf-rest/src/main/java/org/apache/camel/component/cxf/jaxrs/CxfRsProducer.java
index 35029ec7c6f..aa22b81a0cf 100644
--- a/components/camel-cxf/camel-cxf-rest/src/main/java/org/apache/camel/component/cxf/jaxrs/CxfRsProducer.java
+++ b/components/camel-cxf/camel-cxf-rest/src/main/java/org/apache/camel/component/cxf/jaxrs/CxfRsProducer.java
@@ -42,6 +42,7 @@ import jakarta.ws.rs.core.Response;
 import org.apache.camel.AsyncCallback;
 import org.apache.camel.CamelExchangeException;
 import org.apache.camel.Exchange;
+import org.apache.camel.ExchangePropertyKey;
 import org.apache.camel.Message;
 import org.apache.camel.component.cxf.common.CxfOperationException;
 import org.apache.camel.component.cxf.common.message.CxfConstants;
@@ -59,6 +60,8 @@ import org.apache.cxf.jaxrs.client.WebClient;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import static org.apache.camel.Exchange.ACTIVE_SPAN;
+
 /**
  * CxfRsProducer binds a Camel exchange to a CXF exchange, acts as a CXF JAXRS client, it will turn the normal Object
  * invocation to a RESTful request according to resource annotation. Any response will be bound to Camel exchange.
@@ -108,6 +111,17 @@ public class CxfRsProducer extends DefaultAsyncProducer {
 
     @Override
     public boolean process(Exchange exchange, AsyncCallback callback) {
+        // if using camel-tracer then execute this synchronously due to CXF-9063
+        if (exchange.getProperty(ExchangePropertyKey.ACTIVE_SPAN) != null) {
+            try {
+                process(exchange);
+            } catch (Exception e) {
+                exchange.setException(e);
+            }
+            callback.done(true);
+            return true;
+        }
+
         try {
             Message inMessage = exchange.getIn();
             Boolean httpClientAPI = 
inMessage.getHeader(CxfConstants.CAMEL_CXF_RS_USING_HTTP_API, Boolean.class);
@@ -122,7 +136,6 @@ public class CxfRsProducer extends DefaultAsyncProducer {
             }
             return false;
         } catch (Exception exception) {
-            LOG.error("Error invoking request", exception);
             exchange.setException(exception);
             callback.done(true);
             return true;
@@ -130,6 +143,8 @@ public class CxfRsProducer extends DefaultAsyncProducer {
     }
 
     protected void invokeAsyncHttpClient(Exchange exchange, final 
AsyncCallback callback) throws Exception {
+        LOG.trace("Process exchange: {} (asynchronously)", exchange);
+
         Message inMessage = exchange.getIn();
         JAXRSClientFactoryBean cfb = 
clientFactoryBeanCache.get(CxfRsEndpointUtils
                 .getEffectiveAddress(exchange, ((CxfRsEndpoint) 
getEndpoint()).getAddress()));
@@ -192,6 +207,8 @@ public class CxfRsProducer extends DefaultAsyncProducer {
     }
 
     protected void invokeAsyncProxyClient(Exchange exchange, final 
AsyncCallback callback) throws Exception {
+        LOG.trace("Process exchange: {} (asynchronously)", exchange);
+
         Message inMessage = exchange.getIn();
         Object[] varValues = 
inMessage.getHeader(CxfConstants.CAMEL_CXF_RS_VAR_VALUES, Object[].class);
         String methodName = inMessage.getHeader(CxfConstants.OPERATION_NAME, 
String.class);
@@ -267,7 +284,6 @@ public class CxfRsProducer extends DefaultAsyncProducer {
     }
 
     protected void setupClientMatrix(WebClient client, Exchange exchange) 
throws Exception {
-
         org.apache.cxf.message.Message cxfMessage
                 = (org.apache.cxf.message.Message) 
exchange.getIn().getHeader(CxfConstants.CAMEL_CXF_MESSAGE);
         if (cxfMessage != null) {
@@ -298,6 +314,8 @@ public class CxfRsProducer extends DefaultAsyncProducer {
     }
 
     protected void invokeHttpClient(Exchange exchange) throws Exception {
+        LOG.trace("Process exchange: {} (synchronously)", exchange);
+
         Message inMessage = exchange.getIn();
         JAXRSClientFactoryBean cfb = 
clientFactoryBeanCache.get(CxfRsEndpointUtils
                 .getEffectiveAddress(exchange, ((CxfRsEndpoint) 
getEndpoint()).getAddress()));
@@ -429,6 +447,8 @@ public class CxfRsProducer extends DefaultAsyncProducer {
     }
 
     protected void invokeProxyClient(Exchange exchange) throws Exception {
+        LOG.trace("Process exchange: {} (synchronously)", exchange);
+
         Message inMessage = exchange.getIn();
         Object[] varValues = 
inMessage.getHeader(CxfConstants.CAMEL_CXF_RS_VAR_VALUES, Object[].class);
         String methodName = inMessage.getHeader(CxfConstants.OPERATION_NAME, 
String.class);
diff --git a/components/camel-cxf/camel-cxf-soap/src/main/java/org/apache/camel/component/cxf/jaxws/CxfProducer.java b/components/camel-cxf/camel-cxf-soap/src/main/java/org/apache/camel/component/cxf/jaxws/CxfProducer.java
index fb8801cd612..036985148b6 100644
--- a/components/camel-cxf/camel-cxf-soap/src/main/java/org/apache/camel/component/cxf/jaxws/CxfProducer.java
+++ b/components/camel-cxf/camel-cxf-soap/src/main/java/org/apache/camel/component/cxf/jaxws/CxfProducer.java
@@ -32,6 +32,7 @@ import javax.xml.namespace.QName;
 
 import org.apache.camel.AsyncCallback;
 import org.apache.camel.Exchange;
+import org.apache.camel.ExchangePropertyKey;
 import org.apache.camel.RuntimeCamelException;
 import org.apache.camel.component.cxf.common.CxfPayload;
 import org.apache.camel.component.cxf.common.DataFormat;
@@ -51,6 +52,8 @@ import org.apache.cxf.service.model.BindingOperationInfo;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import static org.apache.camel.Exchange.ACTIVE_SPAN;
+
 /**
  * CxfProducer binds a Camel exchange to a CXF exchange, acts as a CXF client, and sends the request to a CXF to a
  * server. Any response will be bound to Camel exchange.
@@ -99,8 +102,18 @@ public class CxfProducer extends DefaultAsyncProducer {
     // so we don't delegate the sync process call to the async process
     @Override
     public boolean process(Exchange camelExchange, AsyncCallback callback) {
-        LOG.trace("Process exchange: {} in an async way.", camelExchange);
+        // if using camel-tracer then execute this synchronously due to CXF-9063
+        if (camelExchange.getProperty(ExchangePropertyKey.ACTIVE_SPAN) != 
null) {
+            try {
+                process(camelExchange);
+            } catch (Exception e) {
+                camelExchange.setException(e);
+            }
+            callback.done(true);
+            return true;
+        }
 
+        LOG.trace("Process exchange: {} (asynchronously)", camelExchange);
         try {
             // create CXF exchange
             ExchangeImpl cxfExchange = new ExchangeImpl();
@@ -137,7 +150,7 @@ public class CxfProducer extends DefaultAsyncProducer {
      */
     @Override
     public void process(Exchange camelExchange) throws Exception {
-        LOG.trace("Process exchange: {} in sync way.", camelExchange);
+        LOG.trace("Process exchange: {} (synchronously)", camelExchange);
 
         // create CXF exchange
         ExchangeImpl cxfExchange = new ExchangeImpl();

Reply via email to