This is an automated email from the ASF dual-hosted git repository. davsclaus pushed a commit to branch camel-4.8.x in repository https://gitbox.apache.org/repos/asf/camel.git
The following commit(s) were added to refs/heads/camel-4.8.x by this push: new cc9a6af8ff7 CAMEL-21309: camel-cxf - Force using sync client when using tracing/opentelemetry as otherwise spans are not working correctly. (#15816) cc9a6af8ff7 is described below commit cc9a6af8ff7ad6d73d1e2bd97eb5e9db7ebb0d3f Author: Claus Ibsen <claus.ib...@gmail.com> AuthorDate: Thu Oct 3 13:16:14 2024 +0200 CAMEL-21309: camel-cxf - Force using sync client when using tracing/opentelemetry as otherwise spans are not working correctly. (#15816) * CAMEL-21309: camel-cxf - Force using sync client when using tracing/opentelemetry as otherwise spans are not working correctly. --- .../camel/component/cxf/jaxrs/CxfRsProducer.java | 24 ++++++++++++++++++++-- .../camel/component/cxf/jaxws/CxfProducer.java | 17 +++++++++++++-- 2 files changed, 37 insertions(+), 4 deletions(-) diff --git a/components/camel-cxf/camel-cxf-rest/src/main/java/org/apache/camel/component/cxf/jaxrs/CxfRsProducer.java b/components/camel-cxf/camel-cxf-rest/src/main/java/org/apache/camel/component/cxf/jaxrs/CxfRsProducer.java index d03dfcee08f..952f38c2632 100644 --- a/components/camel-cxf/camel-cxf-rest/src/main/java/org/apache/camel/component/cxf/jaxrs/CxfRsProducer.java +++ b/components/camel-cxf/camel-cxf-rest/src/main/java/org/apache/camel/component/cxf/jaxrs/CxfRsProducer.java @@ -42,6 +42,7 @@ import jakarta.ws.rs.core.Response; import org.apache.camel.AsyncCallback; import org.apache.camel.CamelExchangeException; import org.apache.camel.Exchange; +import org.apache.camel.ExchangePropertyKey; import org.apache.camel.Message; import org.apache.camel.component.cxf.common.CxfOperationException; import org.apache.camel.component.cxf.common.message.CxfConstants; @@ -59,6 +60,8 @@ import org.apache.cxf.jaxrs.client.WebClient; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import static org.apache.camel.Exchange.ACTIVE_SPAN; + /** * CxfRsProducer binds a Camel exchange to a CXF exchange, acts as a CXF JAXRS client, it will turn the normal Object * invocation to a RESTful request according to resource annotation. Any response will be bound to Camel exchange. @@ -108,6 +111,17 @@ public class CxfRsProducer extends DefaultAsyncProducer { @Override public boolean process(Exchange exchange, AsyncCallback callback) { + // if using camel-tracer then execute this synchronously due to CXF-9063 + if (exchange.getProperty(ExchangePropertyKey.ACTIVE_SPAN) != null) { + try { + process(exchange); + } catch (Exception e) { + exchange.setException(e); + } + callback.done(true); + return true; + } + try { Message inMessage = exchange.getIn(); Boolean httpClientAPI = inMessage.getHeader(CxfConstants.CAMEL_CXF_RS_USING_HTTP_API, Boolean.class); @@ -122,7 +136,6 @@ public class CxfRsProducer extends DefaultAsyncProducer { } return false; } catch (Exception exception) { - LOG.error("Error invoking request", exception); exchange.setException(exception); callback.done(true); return true; @@ -130,6 +143,8 @@ public class CxfRsProducer extends DefaultAsyncProducer { } protected void invokeAsyncHttpClient(Exchange exchange, final AsyncCallback callback) throws Exception { + LOG.trace("Process exchange: {} (asynchronously)", exchange); + Message inMessage = exchange.getIn(); JAXRSClientFactoryBean cfb = clientFactoryBeanCache.get(CxfRsEndpointUtils .getEffectiveAddress(exchange, ((CxfRsEndpoint) getEndpoint()).getAddress())); @@ -188,6 +203,8 @@ public class CxfRsProducer extends DefaultAsyncProducer { } protected void invokeAsyncProxyClient(Exchange exchange, final AsyncCallback callback) throws Exception { + LOG.trace("Process exchange: {} (asynchronously)", exchange); + Message inMessage = exchange.getIn(); Object[] varValues = inMessage.getHeader(CxfConstants.CAMEL_CXF_RS_VAR_VALUES, Object[].class); String methodName = inMessage.getHeader(CxfConstants.OPERATION_NAME, String.class); @@ -263,7 +280,6 @@ public class CxfRsProducer extends DefaultAsyncProducer { } protected void setupClientMatrix(WebClient client, Exchange exchange) throws Exception { - org.apache.cxf.message.Message cxfMessage = (org.apache.cxf.message.Message) exchange.getIn().getHeader(CxfConstants.CAMEL_CXF_MESSAGE); if (cxfMessage != null) { @@ -294,6 +310,8 @@ public class CxfRsProducer extends DefaultAsyncProducer { } protected void invokeHttpClient(Exchange exchange) throws Exception { + LOG.trace("Process exchange: {} (synchronously)", exchange); + Message inMessage = exchange.getIn(); JAXRSClientFactoryBean cfb = clientFactoryBeanCache.get(CxfRsEndpointUtils .getEffectiveAddress(exchange, ((CxfRsEndpoint) getEndpoint()).getAddress())); @@ -441,6 +459,8 @@ public class CxfRsProducer extends DefaultAsyncProducer { } protected void invokeProxyClient(Exchange exchange) throws Exception { + LOG.trace("Process exchange: {} (synchronously)", exchange); + Message inMessage = exchange.getIn(); Object[] varValues = inMessage.getHeader(CxfConstants.CAMEL_CXF_RS_VAR_VALUES, Object[].class); String methodName = inMessage.getHeader(CxfConstants.OPERATION_NAME, String.class); diff --git a/components/camel-cxf/camel-cxf-soap/src/main/java/org/apache/camel/component/cxf/jaxws/CxfProducer.java b/components/camel-cxf/camel-cxf-soap/src/main/java/org/apache/camel/component/cxf/jaxws/CxfProducer.java index ca713e0ebe9..a666b632923 100644 --- a/components/camel-cxf/camel-cxf-soap/src/main/java/org/apache/camel/component/cxf/jaxws/CxfProducer.java +++ b/components/camel-cxf/camel-cxf-soap/src/main/java/org/apache/camel/component/cxf/jaxws/CxfProducer.java @@ -32,6 +32,7 @@ import javax.xml.namespace.QName; import org.apache.camel.AsyncCallback; import org.apache.camel.Exchange; +import org.apache.camel.ExchangePropertyKey; import org.apache.camel.RuntimeCamelException; import org.apache.camel.component.cxf.common.CxfPayload; import org.apache.camel.component.cxf.common.DataFormat; @@ -51,6 +52,8 @@ import org.apache.cxf.service.model.BindingOperationInfo; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import static org.apache.camel.Exchange.ACTIVE_SPAN; + /** * CxfProducer binds a Camel exchange to a CXF exchange, acts as a CXF client, and sends the request to a CXF to a * server. Any response will be bound to Camel exchange. @@ -99,8 +102,18 @@ public class CxfProducer extends DefaultAsyncProducer { // so we don't delegate the sync process call to the async process @Override public boolean process(Exchange camelExchange, AsyncCallback callback) { - LOG.trace("Process exchange: {} in an async way.", camelExchange); + // if using camel-tracer then execute this synchronously due to CXF-9063 + if (camelExchange.getProperty(ExchangePropertyKey.ACTIVE_SPAN) != null) { + try { + process(camelExchange); + } catch (Exception e) { + camelExchange.setException(e); + } + callback.done(true); + return true; + } + LOG.trace("Process exchange: {} (asynchronously)", camelExchange); try { // create CXF exchange ExchangeImpl cxfExchange = new ExchangeImpl(); @@ -137,7 +150,7 @@ public class CxfProducer extends DefaultAsyncProducer { */ @Override public void process(Exchange camelExchange) throws Exception { - LOG.trace("Process exchange: {} in sync way.", camelExchange); + LOG.trace("Process exchange: {} (synchronously)", camelExchange); // create CXF exchange ExchangeImpl cxfExchange = new ExchangeImpl();