This is an automated email from the ASF dual-hosted git repository. davsclaus pushed a commit to branch camel-4.4.x in repository https://gitbox.apache.org/repos/asf/camel.git
commit 9d3b6daa8fd845be6a49688f4f1a494bb02612da Author: Claus Ibsen <claus.ib...@gmail.com> AuthorDate: Thu Oct 3 13:16:14 2024 +0200 CAMEL-21309: camel-cxf - Force using sync client when using tracing/opentelemetry as otherwise spans are not working correctly. (#15816) * CAMEL-21309: camel-cxf - Force using sync client when using tracing/opentelemetry as otherwise spans are not working correctly. --- .../camel/component/cxf/jaxrs/CxfRsProducer.java | 24 ++++++++++++++++++++-- .../camel/component/cxf/jaxws/CxfProducer.java | 17 +++++++++++++-- 2 files changed, 37 insertions(+), 4 deletions(-) diff --git a/components/camel-cxf/camel-cxf-rest/src/main/java/org/apache/camel/component/cxf/jaxrs/CxfRsProducer.java b/components/camel-cxf/camel-cxf-rest/src/main/java/org/apache/camel/component/cxf/jaxrs/CxfRsProducer.java index 35029ec7c6f..aa22b81a0cf 100644 --- a/components/camel-cxf/camel-cxf-rest/src/main/java/org/apache/camel/component/cxf/jaxrs/CxfRsProducer.java +++ b/components/camel-cxf/camel-cxf-rest/src/main/java/org/apache/camel/component/cxf/jaxrs/CxfRsProducer.java @@ -42,6 +42,7 @@ import jakarta.ws.rs.core.Response; import org.apache.camel.AsyncCallback; import org.apache.camel.CamelExchangeException; import org.apache.camel.Exchange; +import org.apache.camel.ExchangePropertyKey; import org.apache.camel.Message; import org.apache.camel.component.cxf.common.CxfOperationException; import org.apache.camel.component.cxf.common.message.CxfConstants; @@ -59,6 +60,8 @@ import org.apache.cxf.jaxrs.client.WebClient; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import static org.apache.camel.Exchange.ACTIVE_SPAN; + /** * CxfRsProducer binds a Camel exchange to a CXF exchange, acts as a CXF JAXRS client, it will turn the normal Object * invocation to a RESTful request according to resource annotation. Any response will be bound to Camel exchange. @@ -108,6 +111,17 @@ public class CxfRsProducer extends DefaultAsyncProducer { @Override public boolean process(Exchange exchange, AsyncCallback callback) { + // if using camel-tracer then execute this synchronously due to CXF-9063 + if (exchange.getProperty(ExchangePropertyKey.ACTIVE_SPAN) != null) { + try { + process(exchange); + } catch (Exception e) { + exchange.setException(e); + } + callback.done(true); + return true; + } + try { Message inMessage = exchange.getIn(); Boolean httpClientAPI = inMessage.getHeader(CxfConstants.CAMEL_CXF_RS_USING_HTTP_API, Boolean.class); @@ -122,7 +136,6 @@ public class CxfRsProducer extends DefaultAsyncProducer { } return false; } catch (Exception exception) { - LOG.error("Error invoking request", exception); exchange.setException(exception); callback.done(true); return true; @@ -130,6 +143,8 @@ public class CxfRsProducer extends DefaultAsyncProducer { } protected void invokeAsyncHttpClient(Exchange exchange, final AsyncCallback callback) throws Exception { + LOG.trace("Process exchange: {} (asynchronously)", exchange); + Message inMessage = exchange.getIn(); JAXRSClientFactoryBean cfb = clientFactoryBeanCache.get(CxfRsEndpointUtils .getEffectiveAddress(exchange, ((CxfRsEndpoint) getEndpoint()).getAddress())); @@ -192,6 +207,8 @@ public class CxfRsProducer extends DefaultAsyncProducer { } protected void invokeAsyncProxyClient(Exchange exchange, final AsyncCallback callback) throws Exception { + LOG.trace("Process exchange: {} (asynchronously)", exchange); + Message inMessage = exchange.getIn(); Object[] varValues = inMessage.getHeader(CxfConstants.CAMEL_CXF_RS_VAR_VALUES, Object[].class); String methodName = inMessage.getHeader(CxfConstants.OPERATION_NAME, String.class); @@ -267,7 +284,6 @@ public class CxfRsProducer extends DefaultAsyncProducer { } protected void setupClientMatrix(WebClient client, Exchange exchange) throws Exception { - org.apache.cxf.message.Message cxfMessage = (org.apache.cxf.message.Message) exchange.getIn().getHeader(CxfConstants.CAMEL_CXF_MESSAGE); if (cxfMessage != null) { @@ -298,6 +314,8 @@ public class CxfRsProducer extends DefaultAsyncProducer { } protected void invokeHttpClient(Exchange exchange) throws Exception { + LOG.trace("Process exchange: {} (synchronously)", exchange); + Message inMessage = exchange.getIn(); JAXRSClientFactoryBean cfb = clientFactoryBeanCache.get(CxfRsEndpointUtils .getEffectiveAddress(exchange, ((CxfRsEndpoint) getEndpoint()).getAddress())); @@ -429,6 +447,8 @@ public class CxfRsProducer extends DefaultAsyncProducer { } protected void invokeProxyClient(Exchange exchange) throws Exception { + LOG.trace("Process exchange: {} (synchronously)", exchange); + Message inMessage = exchange.getIn(); Object[] varValues = inMessage.getHeader(CxfConstants.CAMEL_CXF_RS_VAR_VALUES, Object[].class); String methodName = inMessage.getHeader(CxfConstants.OPERATION_NAME, String.class); diff --git a/components/camel-cxf/camel-cxf-soap/src/main/java/org/apache/camel/component/cxf/jaxws/CxfProducer.java b/components/camel-cxf/camel-cxf-soap/src/main/java/org/apache/camel/component/cxf/jaxws/CxfProducer.java index fb8801cd612..036985148b6 100644 --- a/components/camel-cxf/camel-cxf-soap/src/main/java/org/apache/camel/component/cxf/jaxws/CxfProducer.java +++ b/components/camel-cxf/camel-cxf-soap/src/main/java/org/apache/camel/component/cxf/jaxws/CxfProducer.java @@ -32,6 +32,7 @@ import javax.xml.namespace.QName; import org.apache.camel.AsyncCallback; import org.apache.camel.Exchange; +import org.apache.camel.ExchangePropertyKey; import org.apache.camel.RuntimeCamelException; import org.apache.camel.component.cxf.common.CxfPayload; import org.apache.camel.component.cxf.common.DataFormat; @@ -51,6 +52,8 @@ import org.apache.cxf.service.model.BindingOperationInfo; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import static org.apache.camel.Exchange.ACTIVE_SPAN; + /** * CxfProducer binds a Camel exchange to a CXF exchange, acts as a CXF client, and sends the request to a CXF to a * server. Any response will be bound to Camel exchange. @@ -99,8 +102,18 @@ public class CxfProducer extends DefaultAsyncProducer { // so we don't delegate the sync process call to the async process @Override public boolean process(Exchange camelExchange, AsyncCallback callback) { - LOG.trace("Process exchange: {} in an async way.", camelExchange); + // if using camel-tracer then execute this synchronously due to CXF-9063 + if (camelExchange.getProperty(ExchangePropertyKey.ACTIVE_SPAN) != null) { + try { + process(camelExchange); + } catch (Exception e) { + camelExchange.setException(e); + } + callback.done(true); + return true; + } + LOG.trace("Process exchange: {} (asynchronously)", camelExchange); try { // create CXF exchange ExchangeImpl cxfExchange = new ExchangeImpl(); @@ -137,7 +150,7 @@ public class CxfProducer extends DefaultAsyncProducer { */ @Override public void process(Exchange camelExchange) throws Exception { - LOG.trace("Process exchange: {} in sync way.", camelExchange); + LOG.trace("Process exchange: {} (synchronously)", camelExchange); // create CXF exchange ExchangeImpl cxfExchange = new ExchangeImpl();