This is an automated email from the ASF dual-hosted git repository. davsclaus pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/camel.git
The following commit(s) were added to refs/heads/main by this push: new 8788f722922 Camel 21302 (#16008) 8788f722922 is described below commit 8788f722922d066cc314c44ed4e60735109e9f07 Author: Freeman(Yue) Fang <freeman.f...@gmail.com> AuthorDate: Tue Oct 22 05:30:50 2024 -0400 Camel 21302 (#16008) * camel-opentelemetry: add async CXF reproducer AsyncCxfTest (cherry picked from commit 7d83a62b8e442dc9ac6fd79b153192add940301e) * [CAMEL-21302]camel-opentelemetry context leak with direct async producer * Revert "CAMEL-21309: camel-cxf - Force using sync client when using tracing/opentelemetry as otherwise spans are not working correctly. (#15816)" This reverts commit 45961c941382cbb43fe27cd861ab7c4e9fc86d8f. * Revert "CAMEL-21309: camel-cxf - Force using sync client when using tracing/opentelemetry as otherwise spans are not working correctly." This reverts commit f5e2b26fdbeb5ffd494b3e4513bacc8ee53ad354. * [CAMEL-21302]remove unnecessary package import --------- Co-authored-by: John Poth <poth.j...@gmail.com> --- .../camel/component/cxf/jaxrs/CxfRsProducer.java | 22 +------ .../camel/component/cxf/jaxws/CxfProducer.java | 15 +---- .../camel/component/direct/DirectProducer.java | 4 ++ components/camel-opentelemetry/pom.xml | 37 +++++++++++ .../apache/camel/opentelemetry/AsyncCxfTest.java | 75 ++++++++++++++++++++++ .../camel/opentelemetry/CurrentSpanTest.java | 25 ++++++++ .../apache/camel/tracing/ActiveSpanManager.java | 7 ++ .../org/apache/camel/ExchangeConstantProvider.java | 3 +- .../src/main/java/org/apache/camel/Exchange.java | 1 + .../java/org/apache/camel/ExchangePropertyKey.java | 1 + 10 files changed, 156 insertions(+), 34 deletions(-) diff --git a/components/camel-cxf/camel-cxf-rest/src/main/java/org/apache/camel/component/cxf/jaxrs/CxfRsProducer.java b/components/camel-cxf/camel-cxf-rest/src/main/java/org/apache/camel/component/cxf/jaxrs/CxfRsProducer.java index 20b407fee6d..d03dfcee08f 100644 --- a/components/camel-cxf/camel-cxf-rest/src/main/java/org/apache/camel/component/cxf/jaxrs/CxfRsProducer.java +++ b/components/camel-cxf/camel-cxf-rest/src/main/java/org/apache/camel/component/cxf/jaxrs/CxfRsProducer.java @@ -42,7 +42,6 @@ import jakarta.ws.rs.core.Response; import org.apache.camel.AsyncCallback; import org.apache.camel.CamelExchangeException; import org.apache.camel.Exchange; -import org.apache.camel.ExchangePropertyKey; import org.apache.camel.Message; import org.apache.camel.component.cxf.common.CxfOperationException; import org.apache.camel.component.cxf.common.message.CxfConstants; @@ -109,17 +108,6 @@ public class CxfRsProducer extends DefaultAsyncProducer { @Override public boolean process(Exchange exchange, AsyncCallback callback) { - // if using camel-tracer then execute this synchronously due to CXF-9063 - if (exchange.getProperty(ExchangePropertyKey.ACTIVE_SPAN) != null) { - try { - process(exchange); - } catch (Exception e) { - exchange.setException(e); - } - callback.done(true); - return true; - } - try { Message inMessage = exchange.getIn(); Boolean httpClientAPI = inMessage.getHeader(CxfConstants.CAMEL_CXF_RS_USING_HTTP_API, Boolean.class); @@ -134,6 +122,7 @@ public class CxfRsProducer extends DefaultAsyncProducer { } return false; } catch (Exception exception) { + LOG.error("Error invoking request", exception); exchange.setException(exception); callback.done(true); return true; @@ -141,8 +130,6 @@ public class CxfRsProducer extends DefaultAsyncProducer { } protected void invokeAsyncHttpClient(Exchange exchange, final AsyncCallback callback) throws Exception { - LOG.trace("Process exchange: {} (asynchronously)", exchange); - Message inMessage = exchange.getIn(); JAXRSClientFactoryBean cfb = clientFactoryBeanCache.get(CxfRsEndpointUtils .getEffectiveAddress(exchange, ((CxfRsEndpoint) getEndpoint()).getAddress())); @@ -201,8 +188,6 @@ public class CxfRsProducer extends DefaultAsyncProducer { } protected void invokeAsyncProxyClient(Exchange exchange, final AsyncCallback callback) throws Exception { - LOG.trace("Process exchange: {} (asynchronously)", exchange); - Message inMessage = exchange.getIn(); Object[] varValues = inMessage.getHeader(CxfConstants.CAMEL_CXF_RS_VAR_VALUES, Object[].class); String methodName = inMessage.getHeader(CxfConstants.OPERATION_NAME, String.class); @@ -278,6 +263,7 @@ public class CxfRsProducer extends DefaultAsyncProducer { } protected void setupClientMatrix(WebClient client, Exchange exchange) throws Exception { + org.apache.cxf.message.Message cxfMessage = (org.apache.cxf.message.Message) exchange.getIn().getHeader(CxfConstants.CAMEL_CXF_MESSAGE); if (cxfMessage != null) { @@ -308,8 +294,6 @@ public class CxfRsProducer extends DefaultAsyncProducer { } protected void invokeHttpClient(Exchange exchange) throws Exception { - LOG.trace("Process exchange: {} (synchronously)", exchange); - Message inMessage = exchange.getIn(); JAXRSClientFactoryBean cfb = clientFactoryBeanCache.get(CxfRsEndpointUtils .getEffectiveAddress(exchange, ((CxfRsEndpoint) getEndpoint()).getAddress())); @@ -457,8 +441,6 @@ public class CxfRsProducer extends DefaultAsyncProducer { } protected void invokeProxyClient(Exchange exchange) throws Exception { - LOG.trace("Process exchange: {} (synchronously)", exchange); - Message inMessage = exchange.getIn(); Object[] varValues = inMessage.getHeader(CxfConstants.CAMEL_CXF_RS_VAR_VALUES, Object[].class); String methodName = inMessage.getHeader(CxfConstants.OPERATION_NAME, String.class); diff --git a/components/camel-cxf/camel-cxf-soap/src/main/java/org/apache/camel/component/cxf/jaxws/CxfProducer.java b/components/camel-cxf/camel-cxf-soap/src/main/java/org/apache/camel/component/cxf/jaxws/CxfProducer.java index 03a499c2c36..ca713e0ebe9 100644 --- a/components/camel-cxf/camel-cxf-soap/src/main/java/org/apache/camel/component/cxf/jaxws/CxfProducer.java +++ b/components/camel-cxf/camel-cxf-soap/src/main/java/org/apache/camel/component/cxf/jaxws/CxfProducer.java @@ -32,7 +32,6 @@ import javax.xml.namespace.QName; import org.apache.camel.AsyncCallback; import org.apache.camel.Exchange; -import org.apache.camel.ExchangePropertyKey; import org.apache.camel.RuntimeCamelException; import org.apache.camel.component.cxf.common.CxfPayload; import org.apache.camel.component.cxf.common.DataFormat; @@ -100,18 +99,8 @@ public class CxfProducer extends DefaultAsyncProducer { // so we don't delegate the sync process call to the async process @Override public boolean process(Exchange camelExchange, AsyncCallback callback) { - // if using camel-tracer then execute this synchronously due to CXF-9063 - if (camelExchange.getProperty(ExchangePropertyKey.ACTIVE_SPAN) != null) { - try { - process(camelExchange); - } catch (Exception e) { - camelExchange.setException(e); - } - callback.done(true); - return true; - } + LOG.trace("Process exchange: {} in an async way.", camelExchange); - LOG.trace("Process exchange: {} (asynchronously)", camelExchange); try { // create CXF exchange ExchangeImpl cxfExchange = new ExchangeImpl(); @@ -148,7 +137,7 @@ public class CxfProducer extends DefaultAsyncProducer { */ @Override public void process(Exchange camelExchange) throws Exception { - LOG.trace("Process exchange: {} (synchronously)", camelExchange); + LOG.trace("Process exchange: {} in sync way.", camelExchange); // create CXF exchange ExchangeImpl cxfExchange = new ExchangeImpl(); diff --git a/components/camel-direct/src/main/java/org/apache/camel/component/direct/DirectProducer.java b/components/camel-direct/src/main/java/org/apache/camel/component/direct/DirectProducer.java index 714d00e673e..221cb70ef1e 100644 --- a/components/camel-direct/src/main/java/org/apache/camel/component/direct/DirectProducer.java +++ b/components/camel-direct/src/main/java/org/apache/camel/component/direct/DirectProducer.java @@ -18,6 +18,7 @@ package org.apache.camel.component.direct; import org.apache.camel.AsyncCallback; import org.apache.camel.Exchange; +import org.apache.camel.ExchangePropertyKey; import org.apache.camel.support.DefaultAsyncProducer; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -93,6 +94,9 @@ public class DirectProducer extends DefaultAsyncProducer { callback.done(true); return true; } else { + //Ensure we can close the CLIENT Scope created by this DirectProducer + //in the same thread + exchange.setProperty(ExchangePropertyKey.CLOSE_CLIENT_SCOPE, Boolean.TRUE); return consumer.getAsyncProcessor().process(exchange, callback); } } diff --git a/components/camel-opentelemetry/pom.xml b/components/camel-opentelemetry/pom.xml index b62c3901e81..403aa9fb0a9 100644 --- a/components/camel-opentelemetry/pom.xml +++ b/components/camel-opentelemetry/pom.xml @@ -135,6 +135,43 @@ <version>${junit-pioneer-version}</version> <scope>test</scope> </dependency> + <dependency> + <groupId>org.apache.camel</groupId> + <artifactId>camel-cxf-rest</artifactId> + <version>${project.version}</version> + <scope>test</scope> + </dependency> + <dependency> + <groupId>org.apache.camel</groupId> + <artifactId>camel-cxf-common</artifactId> + <type>test-jar</type> + <scope>test</scope> + </dependency> + <dependency> + <groupId>org.apache.cxf</groupId> + <artifactId>cxf-rt-transports-http-undertow</artifactId> + <version>${cxf-version}</version> + <scope>test</scope> + <exclusions> + <exclusion> + <groupId>io.undertow</groupId> + <artifactId>undertow-servlet</artifactId> + </exclusion> + <exclusion> + <groupId>io.undertow</groupId> + <artifactId>undertow-servlet-jakarta</artifactId> + </exclusion> + <exclusion> + <groupId>io.undertow</groupId> + <artifactId>undertow-core</artifactId> + </exclusion> + </exclusions> + </dependency> + <dependency> + <groupId>org.apache.camel</groupId> + <artifactId>camel-undertow</artifactId> + <scope>test</scope> + </dependency> </dependencies> <build> diff --git a/components/camel-opentelemetry/src/test/java/org/apache/camel/opentelemetry/AsyncCxfTest.java b/components/camel-opentelemetry/src/test/java/org/apache/camel/opentelemetry/AsyncCxfTest.java new file mode 100644 index 00000000000..a7d22656c96 --- /dev/null +++ b/components/camel-opentelemetry/src/test/java/org/apache/camel/opentelemetry/AsyncCxfTest.java @@ -0,0 +1,75 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.opentelemetry; + +import org.apache.camel.RoutesBuilder; +import org.apache.camel.builder.RouteBuilder; +import org.apache.camel.component.cxf.common.CXFTestSupport; +import org.apache.camel.component.mock.MockEndpoint; +import org.junit.jupiter.api.Test; + +class AsyncCxfTest extends CamelOpenTelemetryTestSupport { + + private static int port1 = CXFTestSupport.getPort1(); + + private static SpanTestData[] testdata = {}; // not used yet, fix context leak first + + AsyncCxfTest() { + super(testdata); + } + + @Test + void testRoute() throws InterruptedException { + MockEndpoint mock = getMockEndpoint("mock:end"); + mock.expectedMessageCount(4); + int num = 4; + for (int i = 0; i < num; i++) { + template.requestBody("direct:start", "foo"); + } + mock.assertIsSatisfied(5000); + verifyTraceSpanNumbers(num, 7); + } + + @Override + protected RoutesBuilder createRouteBuilder() { + return new RouteBuilder() { + @Override + public void configure() { + from("direct:start").routeId("myRoute") + .to("direct:send") + .end(); + + from("direct:send") + .log("message") + .to("cxfrs:http://localhost:" + port1 + + "/rest/helloservice/sayHello?synchronous=false"); // setting to 'true' resolves the issue + + restConfiguration() + .port(port1); + + rest("/rest/helloservice") + .post("/sayHello").routeId("rest-GET-say-hi") + .to("direct:sayHi"); + + from("direct:sayHi") + .routeId("mock-GET-say-hi") + .log("example") + .to("mock:end"); + } + }; + } +} diff --git a/components/camel-opentelemetry/src/test/java/org/apache/camel/opentelemetry/CurrentSpanTest.java b/components/camel-opentelemetry/src/test/java/org/apache/camel/opentelemetry/CurrentSpanTest.java index b2a70500593..5e7e7d2bcda 100644 --- a/components/camel-opentelemetry/src/test/java/org/apache/camel/opentelemetry/CurrentSpanTest.java +++ b/components/camel-opentelemetry/src/test/java/org/apache/camel/opentelemetry/CurrentSpanTest.java @@ -106,6 +106,27 @@ class CurrentSpanTest extends CamelOpenTelemetryTestSupport { } + @Test + void testDirectToDirectToAsync() { + SpanTestData[] expectedSpans = { + new SpanTestData().setLabel("asyncmock1:result").setUri("asyncmock1://result").setOperation("asyncmock1") + .setKind(SpanKind.CLIENT), + new SpanTestData().setLabel("direct:foo2").setUri("direct://foo2").setOperation("foo2"), + new SpanTestData().setLabel("direct:foo2").setUri("direct://foo2").setOperation("foo2") + .setKind(SpanKind.CLIENT), + new SpanTestData().setLabel("direct:foo1").setUri("direct://foo1").setOperation("foo1"), + new SpanTestData().setLabel("direct:foo1").setUri("direct://foo1").setOperation("foo1").setKind(SpanKind.CLIENT) + }; + + // direct to direct to async pipeline + template.sendBody("direct:foo1", "Hello World"); + awaitInvalidSpanContext(); + + List<SpanData> spans = verify(expectedSpans, false); + assertEquals(spans.get(0).getParentSpanId(), spans.get(1).getSpanId()); + + } + @Test void testAsyncToSync() { // direct client spans (event spans) are not created, so we saw only two spans in previous tests @@ -201,6 +222,10 @@ class CurrentSpanTest extends CamelOpenTelemetryTestSupport { return new RouteBuilder() { @Override public void configure() { + // direct to direct to async pipeline + from("direct:foo1").to("direct:foo2"); + from("direct:foo2").to("asyncmock1:result"); + // sync pipeline from("direct:bar").to("syncmock:result"); diff --git a/components/camel-tracing/src/main/java/org/apache/camel/tracing/ActiveSpanManager.java b/components/camel-tracing/src/main/java/org/apache/camel/tracing/ActiveSpanManager.java index e7288ff32e8..db4ac8b73d6 100644 --- a/components/camel-tracing/src/main/java/org/apache/camel/tracing/ActiveSpanManager.java +++ b/components/camel-tracing/src/main/java/org/apache/camel/tracing/ActiveSpanManager.java @@ -56,6 +56,13 @@ public final class ActiveSpanManager { * @param span The span */ public static void activate(Exchange exchange, SpanAdapter span) { + if (exchange.getProperty(ExchangePropertyKey.CLOSE_CLIENT_SCOPE, Boolean.FALSE, Boolean.class)) { + //Check if we need to close the CLIENT scope created by + //DirectProducer in async mode before we create a new INTERNAL scope + //for the next DirectConsumer + endScope(exchange); + exchange.setProperty(ExchangePropertyKey.CLOSE_CLIENT_SCOPE, Boolean.FALSE); + } exchange.setProperty(ExchangePropertyKey.ACTIVE_SPAN, new Holder(exchange.getProperty(ExchangePropertyKey.ACTIVE_SPAN, Holder.class), span)); if (Boolean.TRUE.equals(exchange.getContext().isUseMDCLogging())) { diff --git a/core/camel-api/src/generated/java/org/apache/camel/ExchangeConstantProvider.java b/core/camel-api/src/generated/java/org/apache/camel/ExchangeConstantProvider.java index a9f44b186c7..40b33cd2a98 100644 --- a/core/camel-api/src/generated/java/org/apache/camel/ExchangeConstantProvider.java +++ b/core/camel-api/src/generated/java/org/apache/camel/ExchangeConstantProvider.java @@ -13,7 +13,7 @@ public class ExchangeConstantProvider { private static final Map<String, String> MAP; static { - Map<String, String> map = new HashMap<>(159); + Map<String, String> map = new HashMap<>(160); map.put("ACCEPT_CONTENT_TYPE", "CamelAcceptContentType"); map.put("ACTIVE_SPAN", "OpenTracing.activeSpan"); map.put("AGGREGATED_COLLECTION_GUARD", "CamelAggregatedCollectionGuard"); @@ -37,6 +37,7 @@ public class ExchangeConstantProvider { map.put("CHARSET_NAME", "CamelCharsetName"); map.put("CIRCUIT_BREAKER_STATE", "CamelCircuitBreakerState"); map.put("CLAIM_CHECK_REPOSITORY", "CamelClaimCheckRepository"); + map.put("CLOSE_CLIENT_SCOPE", "OpenTracing.closeClientScope"); map.put("COMPILE_SCRIPT", "CamelCompileScript"); map.put("CONTENT_ENCODING", "Content-Encoding"); map.put("CONTENT_LENGTH", "Content-Length"); diff --git a/core/camel-api/src/main/java/org/apache/camel/Exchange.java b/core/camel-api/src/main/java/org/apache/camel/Exchange.java index 3d21ca663ee..028c6773a3c 100644 --- a/core/camel-api/src/main/java/org/apache/camel/Exchange.java +++ b/core/camel-api/src/main/java/org/apache/camel/Exchange.java @@ -68,6 +68,7 @@ import org.apache.camel.spi.annotations.ConstantProvider; public interface Exchange extends VariableAware { String ACTIVE_SPAN = "OpenTracing.activeSpan"; + String CLOSE_CLIENT_SCOPE = "OpenTracing.closeClientScope"; String AUTHENTICATION = "CamelAuthentication"; String AUTHENTICATION_FAILURE_POLICY_ID = "CamelAuthenticationFailurePolicyId"; @Deprecated(since = "2.20.0") diff --git a/core/camel-api/src/main/java/org/apache/camel/ExchangePropertyKey.java b/core/camel-api/src/main/java/org/apache/camel/ExchangePropertyKey.java index fb04bedf8fa..9146fd8780d 100644 --- a/core/camel-api/src/main/java/org/apache/camel/ExchangePropertyKey.java +++ b/core/camel-api/src/main/java/org/apache/camel/ExchangePropertyKey.java @@ -24,6 +24,7 @@ import org.apache.camel.spi.CircuitBreakerConstants; public enum ExchangePropertyKey { ACTIVE_SPAN(Exchange.ACTIVE_SPAN), + CLOSE_CLIENT_SCOPE(Exchange.CLOSE_CLIENT_SCOPE), AGGREGATED_COMPLETED_BY(Exchange.AGGREGATED_COMPLETED_BY), AGGREGATED_CORRELATION_KEY(Exchange.AGGREGATED_CORRELATION_KEY), AGGREGATED_SIZE(Exchange.AGGREGATED_SIZE),