This is an automated email from the ASF dual-hosted git repository.

davsclaus pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/camel.git


The following commit(s) were added to refs/heads/main by this push:
     new 8788f722922 Camel 21302 (#16008)
8788f722922 is described below

commit 8788f722922d066cc314c44ed4e60735109e9f07
Author: Freeman(Yue) Fang <freeman.f...@gmail.com>
AuthorDate: Tue Oct 22 05:30:50 2024 -0400

    Camel 21302 (#16008)
    
    * camel-opentelemetry: add async CXF reproducer AsyncCxfTest
    
    (cherry picked from commit 7d83a62b8e442dc9ac6fd79b153192add940301e)
    
    * [CAMEL-21302]camel-opentelemetry context leak with direct async producer
    
    * Revert "CAMEL-21309: camel-cxf - Force using sync client when using tracing/opentelemetry as otherwise spans are not working correctly. (#15816)"
    
    This reverts commit 45961c941382cbb43fe27cd861ab7c4e9fc86d8f.
    
    * Revert "CAMEL-21309: camel-cxf - Force using sync client when using tracing/opentelemetry as otherwise spans are not working correctly."
    
    This reverts commit f5e2b26fdbeb5ffd494b3e4513bacc8ee53ad354.
    
    * [CAMEL-21302]remove unnecessary package import
    
    ---------
    
    Co-authored-by: John Poth <poth.j...@gmail.com>
---
 .../camel/component/cxf/jaxrs/CxfRsProducer.java   | 22 +------
 .../camel/component/cxf/jaxws/CxfProducer.java     | 15 +----
 .../camel/component/direct/DirectProducer.java     |  4 ++
 components/camel-opentelemetry/pom.xml             | 37 +++++++++++
 .../apache/camel/opentelemetry/AsyncCxfTest.java   | 75 ++++++++++++++++++++++
 .../camel/opentelemetry/CurrentSpanTest.java       | 25 ++++++++
 .../apache/camel/tracing/ActiveSpanManager.java    |  7 ++
 .../org/apache/camel/ExchangeConstantProvider.java |  3 +-
 .../src/main/java/org/apache/camel/Exchange.java   |  1 +
 .../java/org/apache/camel/ExchangePropertyKey.java |  1 +
 10 files changed, 156 insertions(+), 34 deletions(-)

diff --git 
a/components/camel-cxf/camel-cxf-rest/src/main/java/org/apache/camel/component/cxf/jaxrs/CxfRsProducer.java
 
b/components/camel-cxf/camel-cxf-rest/src/main/java/org/apache/camel/component/cxf/jaxrs/CxfRsProducer.java
index 20b407fee6d..d03dfcee08f 100644
--- 
a/components/camel-cxf/camel-cxf-rest/src/main/java/org/apache/camel/component/cxf/jaxrs/CxfRsProducer.java
+++ 
b/components/camel-cxf/camel-cxf-rest/src/main/java/org/apache/camel/component/cxf/jaxrs/CxfRsProducer.java
@@ -42,7 +42,6 @@ import jakarta.ws.rs.core.Response;
 import org.apache.camel.AsyncCallback;
 import org.apache.camel.CamelExchangeException;
 import org.apache.camel.Exchange;
-import org.apache.camel.ExchangePropertyKey;
 import org.apache.camel.Message;
 import org.apache.camel.component.cxf.common.CxfOperationException;
 import org.apache.camel.component.cxf.common.message.CxfConstants;
@@ -109,17 +108,6 @@ public class CxfRsProducer extends DefaultAsyncProducer {
 
     @Override
     public boolean process(Exchange exchange, AsyncCallback callback) {
-        // if using camel-tracer then execute this synchronously due to CXF-9063
-        if (exchange.getProperty(ExchangePropertyKey.ACTIVE_SPAN) != null) {
-            try {
-                process(exchange);
-            } catch (Exception e) {
-                exchange.setException(e);
-            }
-            callback.done(true);
-            return true;
-        }
-
         try {
             Message inMessage = exchange.getIn();
             Boolean httpClientAPI = 
inMessage.getHeader(CxfConstants.CAMEL_CXF_RS_USING_HTTP_API, Boolean.class);
@@ -134,6 +122,7 @@ public class CxfRsProducer extends DefaultAsyncProducer {
             }
             return false;
         } catch (Exception exception) {
+            LOG.error("Error invoking request", exception);
             exchange.setException(exception);
             callback.done(true);
             return true;
@@ -141,8 +130,6 @@ public class CxfRsProducer extends DefaultAsyncProducer {
     }
 
     protected void invokeAsyncHttpClient(Exchange exchange, final 
AsyncCallback callback) throws Exception {
-        LOG.trace("Process exchange: {} (asynchronously)", exchange);
-
         Message inMessage = exchange.getIn();
         JAXRSClientFactoryBean cfb = 
clientFactoryBeanCache.get(CxfRsEndpointUtils
                 .getEffectiveAddress(exchange, ((CxfRsEndpoint) 
getEndpoint()).getAddress()));
@@ -201,8 +188,6 @@ public class CxfRsProducer extends DefaultAsyncProducer {
     }
 
     protected void invokeAsyncProxyClient(Exchange exchange, final 
AsyncCallback callback) throws Exception {
-        LOG.trace("Process exchange: {} (asynchronously)", exchange);
-
         Message inMessage = exchange.getIn();
         Object[] varValues = 
inMessage.getHeader(CxfConstants.CAMEL_CXF_RS_VAR_VALUES, Object[].class);
         String methodName = inMessage.getHeader(CxfConstants.OPERATION_NAME, 
String.class);
@@ -278,6 +263,7 @@ public class CxfRsProducer extends DefaultAsyncProducer {
     }
 
     protected void setupClientMatrix(WebClient client, Exchange exchange) 
throws Exception {
+
         org.apache.cxf.message.Message cxfMessage
                 = (org.apache.cxf.message.Message) 
exchange.getIn().getHeader(CxfConstants.CAMEL_CXF_MESSAGE);
         if (cxfMessage != null) {
@@ -308,8 +294,6 @@ public class CxfRsProducer extends DefaultAsyncProducer {
     }
 
     protected void invokeHttpClient(Exchange exchange) throws Exception {
-        LOG.trace("Process exchange: {} (synchronously)", exchange);
-
         Message inMessage = exchange.getIn();
         JAXRSClientFactoryBean cfb = 
clientFactoryBeanCache.get(CxfRsEndpointUtils
                 .getEffectiveAddress(exchange, ((CxfRsEndpoint) 
getEndpoint()).getAddress()));
@@ -457,8 +441,6 @@ public class CxfRsProducer extends DefaultAsyncProducer {
     }
 
     protected void invokeProxyClient(Exchange exchange) throws Exception {
-        LOG.trace("Process exchange: {} (synchronously)", exchange);
-
         Message inMessage = exchange.getIn();
         Object[] varValues = 
inMessage.getHeader(CxfConstants.CAMEL_CXF_RS_VAR_VALUES, Object[].class);
         String methodName = inMessage.getHeader(CxfConstants.OPERATION_NAME, 
String.class);
diff --git 
a/components/camel-cxf/camel-cxf-soap/src/main/java/org/apache/camel/component/cxf/jaxws/CxfProducer.java
 
b/components/camel-cxf/camel-cxf-soap/src/main/java/org/apache/camel/component/cxf/jaxws/CxfProducer.java
index 03a499c2c36..ca713e0ebe9 100644
--- 
a/components/camel-cxf/camel-cxf-soap/src/main/java/org/apache/camel/component/cxf/jaxws/CxfProducer.java
+++ 
b/components/camel-cxf/camel-cxf-soap/src/main/java/org/apache/camel/component/cxf/jaxws/CxfProducer.java
@@ -32,7 +32,6 @@ import javax.xml.namespace.QName;
 
 import org.apache.camel.AsyncCallback;
 import org.apache.camel.Exchange;
-import org.apache.camel.ExchangePropertyKey;
 import org.apache.camel.RuntimeCamelException;
 import org.apache.camel.component.cxf.common.CxfPayload;
 import org.apache.camel.component.cxf.common.DataFormat;
@@ -100,18 +99,8 @@ public class CxfProducer extends DefaultAsyncProducer {
     // so we don't delegate the sync process call to the async process
     @Override
     public boolean process(Exchange camelExchange, AsyncCallback callback) {
-        // if using camel-tracer then execute this synchronously due to CXF-9063
-        if (camelExchange.getProperty(ExchangePropertyKey.ACTIVE_SPAN) != null) {
-            try {
-                process(camelExchange);
-            } catch (Exception e) {
-                camelExchange.setException(e);
-            }
-            callback.done(true);
-            return true;
-        }
+        LOG.trace("Process exchange: {} in an async way.", camelExchange);
 
-        LOG.trace("Process exchange: {} (asynchronously)", camelExchange);
         try {
             // create CXF exchange
             ExchangeImpl cxfExchange = new ExchangeImpl();
@@ -148,7 +137,7 @@ public class CxfProducer extends DefaultAsyncProducer {
      */
     @Override
     public void process(Exchange camelExchange) throws Exception {
-        LOG.trace("Process exchange: {} (synchronously)", camelExchange);
+        LOG.trace("Process exchange: {} in sync way.", camelExchange);
 
         // create CXF exchange
         ExchangeImpl cxfExchange = new ExchangeImpl();
diff --git 
a/components/camel-direct/src/main/java/org/apache/camel/component/direct/DirectProducer.java
 
b/components/camel-direct/src/main/java/org/apache/camel/component/direct/DirectProducer.java
index 714d00e673e..221cb70ef1e 100644
--- 
a/components/camel-direct/src/main/java/org/apache/camel/component/direct/DirectProducer.java
+++ 
b/components/camel-direct/src/main/java/org/apache/camel/component/direct/DirectProducer.java
@@ -18,6 +18,7 @@ package org.apache.camel.component.direct;
 
 import org.apache.camel.AsyncCallback;
 import org.apache.camel.Exchange;
+import org.apache.camel.ExchangePropertyKey;
 import org.apache.camel.support.DefaultAsyncProducer;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -93,6 +94,9 @@ public class DirectProducer extends DefaultAsyncProducer {
                     callback.done(true);
                     return true;
                 } else {
+                    //Ensure we can close the CLIENT Scope created by this DirectProducer
+                    //in the same thread
+                    exchange.setProperty(ExchangePropertyKey.CLOSE_CLIENT_SCOPE, Boolean.TRUE);
                     return consumer.getAsyncProcessor().process(exchange, callback);
                 }
             }
diff --git a/components/camel-opentelemetry/pom.xml 
b/components/camel-opentelemetry/pom.xml
index b62c3901e81..403aa9fb0a9 100644
--- a/components/camel-opentelemetry/pom.xml
+++ b/components/camel-opentelemetry/pom.xml
@@ -135,6 +135,43 @@
             <version>${junit-pioneer-version}</version>
             <scope>test</scope>
         </dependency>
+        <dependency>
+            <groupId>org.apache.camel</groupId>
+            <artifactId>camel-cxf-rest</artifactId>
+            <version>${project.version}</version>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.camel</groupId>
+            <artifactId>camel-cxf-common</artifactId>
+            <type>test-jar</type>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.cxf</groupId>
+            <artifactId>cxf-rt-transports-http-undertow</artifactId>
+            <version>${cxf-version}</version>
+            <scope>test</scope>
+            <exclusions>
+                <exclusion>
+                    <groupId>io.undertow</groupId>
+                    <artifactId>undertow-servlet</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>io.undertow</groupId>
+                    <artifactId>undertow-servlet-jakarta</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>io.undertow</groupId>
+                    <artifactId>undertow-core</artifactId>
+                </exclusion>
+            </exclusions>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.camel</groupId>
+            <artifactId>camel-undertow</artifactId>
+            <scope>test</scope>
+        </dependency>
     </dependencies>
 
     <build>
diff --git 
a/components/camel-opentelemetry/src/test/java/org/apache/camel/opentelemetry/AsyncCxfTest.java
 
b/components/camel-opentelemetry/src/test/java/org/apache/camel/opentelemetry/AsyncCxfTest.java
new file mode 100644
index 00000000000..a7d22656c96
--- /dev/null
+++ 
b/components/camel-opentelemetry/src/test/java/org/apache/camel/opentelemetry/AsyncCxfTest.java
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.opentelemetry;
+
+import org.apache.camel.RoutesBuilder;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.component.cxf.common.CXFTestSupport;
+import org.apache.camel.component.mock.MockEndpoint;
+import org.junit.jupiter.api.Test;
+
+class AsyncCxfTest extends CamelOpenTelemetryTestSupport {
+
+    private static int port1 = CXFTestSupport.getPort1();
+
+    private static SpanTestData[] testdata = {}; // not used yet, fix context leak first
+
+    AsyncCxfTest() {
+        super(testdata);
+    }
+
+    @Test
+    void testRoute() throws InterruptedException {
+        MockEndpoint mock = getMockEndpoint("mock:end");
+        mock.expectedMessageCount(4);
+        int num = 4;
+        for (int i = 0; i < num; i++) {
+            template.requestBody("direct:start", "foo");
+        }
+        mock.assertIsSatisfied(5000);
+        verifyTraceSpanNumbers(num, 7);
+    }
+
+    @Override
+    protected RoutesBuilder createRouteBuilder() {
+        return new RouteBuilder() {
+            @Override
+            public void configure() {
+                from("direct:start").routeId("myRoute")
+                        .to("direct:send")
+                        .end();
+
+                from("direct:send")
+                        .log("message")
+                        .to("cxfrs:http://localhost:" + port1
+                            + "/rest/helloservice/sayHello?synchronous=false"); // setting to 'true' resolves the issue
+
+                restConfiguration()
+                        .port(port1);
+
+                rest("/rest/helloservice")
+                        .post("/sayHello").routeId("rest-GET-say-hi")
+                        .to("direct:sayHi");
+
+                from("direct:sayHi")
+                        .routeId("mock-GET-say-hi")
+                        .log("example")
+                        .to("mock:end");
+            }
+        };
+    }
+}
diff --git 
a/components/camel-opentelemetry/src/test/java/org/apache/camel/opentelemetry/CurrentSpanTest.java
 
b/components/camel-opentelemetry/src/test/java/org/apache/camel/opentelemetry/CurrentSpanTest.java
index b2a70500593..5e7e7d2bcda 100644
--- 
a/components/camel-opentelemetry/src/test/java/org/apache/camel/opentelemetry/CurrentSpanTest.java
+++ 
b/components/camel-opentelemetry/src/test/java/org/apache/camel/opentelemetry/CurrentSpanTest.java
@@ -106,6 +106,27 @@ class CurrentSpanTest extends 
CamelOpenTelemetryTestSupport {
 
     }
 
+    @Test
+    void testDirectToDirectToAsync() {
+        SpanTestData[] expectedSpans = {
+                new 
SpanTestData().setLabel("asyncmock1:result").setUri("asyncmock1://result").setOperation("asyncmock1")
+                        .setKind(SpanKind.CLIENT),
+                new 
SpanTestData().setLabel("direct:foo2").setUri("direct://foo2").setOperation("foo2"),
+                new 
SpanTestData().setLabel("direct:foo2").setUri("direct://foo2").setOperation("foo2")
+                        .setKind(SpanKind.CLIENT),
+                new 
SpanTestData().setLabel("direct:foo1").setUri("direct://foo1").setOperation("foo1"),
+                new 
SpanTestData().setLabel("direct:foo1").setUri("direct://foo1").setOperation("foo1").setKind(SpanKind.CLIENT)
+        };
+
+        // direct to direct to async pipeline
+        template.sendBody("direct:foo1", "Hello World");
+        awaitInvalidSpanContext();
+
+        List<SpanData> spans = verify(expectedSpans, false);
+        assertEquals(spans.get(0).getParentSpanId(), spans.get(1).getSpanId());
+
+    }
+
     @Test
     void testAsyncToSync() {
         // direct client spans (event spans) are not created, so we saw only two spans in previous tests
@@ -201,6 +222,10 @@ class CurrentSpanTest extends 
CamelOpenTelemetryTestSupport {
         return new RouteBuilder() {
             @Override
             public void configure() {
+                // direct to direct to async pipeline
+                from("direct:foo1").to("direct:foo2");
+                from("direct:foo2").to("asyncmock1:result");
+
                 // sync pipeline
                 from("direct:bar").to("syncmock:result");
 
diff --git 
a/components/camel-tracing/src/main/java/org/apache/camel/tracing/ActiveSpanManager.java
 
b/components/camel-tracing/src/main/java/org/apache/camel/tracing/ActiveSpanManager.java
index e7288ff32e8..db4ac8b73d6 100644
--- 
a/components/camel-tracing/src/main/java/org/apache/camel/tracing/ActiveSpanManager.java
+++ 
b/components/camel-tracing/src/main/java/org/apache/camel/tracing/ActiveSpanManager.java
@@ -56,6 +56,13 @@ public final class ActiveSpanManager {
      * @param span     The span
      */
     public static void activate(Exchange exchange, SpanAdapter span) {
+        if (exchange.getProperty(ExchangePropertyKey.CLOSE_CLIENT_SCOPE, Boolean.FALSE, Boolean.class)) {
+            //Check if we need to close the CLIENT scope created by
+            //DirectProducer in async mode before we create a new INTERNAL scope
+            //for the next DirectConsumer
+            endScope(exchange);
+            exchange.setProperty(ExchangePropertyKey.CLOSE_CLIENT_SCOPE, Boolean.FALSE);
+        }
         exchange.setProperty(ExchangePropertyKey.ACTIVE_SPAN,
                 new 
Holder(exchange.getProperty(ExchangePropertyKey.ACTIVE_SPAN, Holder.class), 
span));
         if (Boolean.TRUE.equals(exchange.getContext().isUseMDCLogging())) {
diff --git 
a/core/camel-api/src/generated/java/org/apache/camel/ExchangeConstantProvider.java
 
b/core/camel-api/src/generated/java/org/apache/camel/ExchangeConstantProvider.java
index a9f44b186c7..40b33cd2a98 100644
--- 
a/core/camel-api/src/generated/java/org/apache/camel/ExchangeConstantProvider.java
+++ 
b/core/camel-api/src/generated/java/org/apache/camel/ExchangeConstantProvider.java
@@ -13,7 +13,7 @@ public class ExchangeConstantProvider {
 
     private static final Map<String, String> MAP;
     static {
-        Map<String, String> map = new HashMap<>(159);
+        Map<String, String> map = new HashMap<>(160);
         map.put("ACCEPT_CONTENT_TYPE", "CamelAcceptContentType");
         map.put("ACTIVE_SPAN", "OpenTracing.activeSpan");
         map.put("AGGREGATED_COLLECTION_GUARD", 
"CamelAggregatedCollectionGuard");
@@ -37,6 +37,7 @@ public class ExchangeConstantProvider {
         map.put("CHARSET_NAME", "CamelCharsetName");
         map.put("CIRCUIT_BREAKER_STATE", "CamelCircuitBreakerState");
         map.put("CLAIM_CHECK_REPOSITORY", "CamelClaimCheckRepository");
+        map.put("CLOSE_CLIENT_SCOPE", "OpenTracing.closeClientScope");
         map.put("COMPILE_SCRIPT", "CamelCompileScript");
         map.put("CONTENT_ENCODING", "Content-Encoding");
         map.put("CONTENT_LENGTH", "Content-Length");
diff --git a/core/camel-api/src/main/java/org/apache/camel/Exchange.java 
b/core/camel-api/src/main/java/org/apache/camel/Exchange.java
index 3d21ca663ee..028c6773a3c 100644
--- a/core/camel-api/src/main/java/org/apache/camel/Exchange.java
+++ b/core/camel-api/src/main/java/org/apache/camel/Exchange.java
@@ -68,6 +68,7 @@ import org.apache.camel.spi.annotations.ConstantProvider;
 public interface Exchange extends VariableAware {
 
     String ACTIVE_SPAN = "OpenTracing.activeSpan";
+    String CLOSE_CLIENT_SCOPE = "OpenTracing.closeClientScope";
     String AUTHENTICATION = "CamelAuthentication";
     String AUTHENTICATION_FAILURE_POLICY_ID = 
"CamelAuthenticationFailurePolicyId";
     @Deprecated(since = "2.20.0")
diff --git 
a/core/camel-api/src/main/java/org/apache/camel/ExchangePropertyKey.java 
b/core/camel-api/src/main/java/org/apache/camel/ExchangePropertyKey.java
index fb04bedf8fa..9146fd8780d 100644
--- a/core/camel-api/src/main/java/org/apache/camel/ExchangePropertyKey.java
+++ b/core/camel-api/src/main/java/org/apache/camel/ExchangePropertyKey.java
@@ -24,6 +24,7 @@ import org.apache.camel.spi.CircuitBreakerConstants;
 public enum ExchangePropertyKey {
 
     ACTIVE_SPAN(Exchange.ACTIVE_SPAN),
+    CLOSE_CLIENT_SCOPE(Exchange.CLOSE_CLIENT_SCOPE),
     AGGREGATED_COMPLETED_BY(Exchange.AGGREGATED_COMPLETED_BY),
     AGGREGATED_CORRELATION_KEY(Exchange.AGGREGATED_CORRELATION_KEY),
     AGGREGATED_SIZE(Exchange.AGGREGATED_SIZE),

Reply via email to