This is an automated email from the ASF dual-hosted git repository.

jpoth pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/camel.git


The following commit(s) were added to refs/heads/main by this push:
     new 26dd138bac5 CAMEL-20643: Add option to propagate OpenTelemetry Context 
even when tracing is disabled for a Camel Processor
26dd138bac5 is described below

commit 26dd138bac51761fb4e82684dbffb3e9c6195587
Author: John Poth <poth.j...@gmail.com>
AuthorDate: Wed Apr 3 11:11:56 2024 +0200

    CAMEL-20643: Add option to propagate OpenTelemetry Context even when tracing 
is disabled for a Camel Processor
---
 .../OpenTelemetryTracingStrategy.java              |  72 +++++++++++-
 .../CamelOpenTelemetryTestSupport.java             |   4 +
 ...lemetryTracingStrategyPropagateContextTest.java | 128 +++++++++++++++++++++
 3 files changed, 201 insertions(+), 3 deletions(-)

diff --git 
a/components/camel-opentelemetry/src/main/java/org/apache/camel/opentelemetry/OpenTelemetryTracingStrategy.java
 
b/components/camel-opentelemetry/src/main/java/org/apache/camel/opentelemetry/OpenTelemetryTracingStrategy.java
index 7208d53dd8b..c827ca06c5a 100644
--- 
a/components/camel-opentelemetry/src/main/java/org/apache/camel/opentelemetry/OpenTelemetryTracingStrategy.java
+++ 
b/components/camel-opentelemetry/src/main/java/org/apache/camel/opentelemetry/OpenTelemetryTracingStrategy.java
@@ -36,6 +36,8 @@ public class OpenTelemetryTracingStrategy implements 
InterceptStrategy {
 
     private final OpenTelemetryTracer tracer;
 
+    private boolean propagateContext;
+
     public OpenTelemetryTracingStrategy(OpenTelemetryTracer tracer) {
         this.tracer = tracer;
     }
@@ -45,11 +47,34 @@ public class OpenTelemetryTracingStrategy implements 
InterceptStrategy {
             CamelContext camelContext,
             NamedNode processorDefinition, Processor target, Processor 
nextTarget)
             throws Exception {
-        if (!shouldTrace(processorDefinition)) {
+        if (shouldTrace(processorDefinition)) {
+            return new PropagateContextAndCreateSpan(processorDefinition, 
target);
+        } else if (isPropagateContext()) {
+            return new PropagateContext(target);
+        } else {
             return new DelegateAsyncProcessor(target);
         }
+    }
+
+    public boolean isPropagateContext() {
+        return propagateContext;
+    }
+
+    public void setPropagateContext(boolean propagateContext) {
+        this.propagateContext = propagateContext;
+    }
 
-        return new DelegateAsyncProcessor((Exchange exchange) -> {
+    private class PropagateContextAndCreateSpan implements Processor {
+        private final NamedNode processorDefinition;
+        private final Processor target;
+
+        public PropagateContextAndCreateSpan(NamedNode processorDefinition, 
Processor target) {
+            this.processorDefinition = processorDefinition;
+            this.target = target;
+        }
+
+        @Override
+        public void process(Exchange exchange) throws Exception {
             Span span = null;
             OpenTelemetrySpanAdapter spanWrapper = (OpenTelemetrySpanAdapter) 
ActiveSpanManager.getSpan(exchange);
             if (spanWrapper != null) {
@@ -86,7 +111,48 @@ public class OpenTelemetryTracingStrategy implements 
InterceptStrategy {
 
                 processorSpan.end();
             }
-        });
+        }
+    }
+
+    private class PropagateContext implements Processor {
+        private final Processor target;
+
+        public PropagateContext(Processor target) {
+            this.target = target;
+        }
+
+        @Override
+        public void process(Exchange exchange) throws Exception {
+            Span span = null;
+            OpenTelemetrySpanAdapter spanWrapper = (OpenTelemetrySpanAdapter) 
ActiveSpanManager.getSpan(exchange);
+            if (spanWrapper != null) {
+                span = spanWrapper.getOpenTelemetrySpan();
+            }
+
+            if (span == null) {
+                target.process(exchange);
+                return;
+            }
+
+            boolean activateExchange = !(target instanceof 
GetCorrelationContextProcessor
+                    || target instanceof SetCorrelationContextProcessor);
+
+            if (activateExchange) {
+                ActiveSpanManager.activate(exchange, new 
OpenTelemetrySpanAdapter(span));
+            }
+
+            try {
+                target.process(exchange);
+            } catch (Exception ex) {
+                span.setStatus(StatusCode.ERROR);
+                span.recordException(ex);
+                throw ex;
+            } finally {
+                if (activateExchange) {
+                    ActiveSpanManager.deactivate(exchange);
+                }
+            }
+        }
     }
 
     private static String getComponentName(NamedNode processorDefinition) {
diff --git 
a/components/camel-opentelemetry/src/test/java/org/apache/camel/opentelemetry/CamelOpenTelemetryTestSupport.java
 
b/components/camel-opentelemetry/src/test/java/org/apache/camel/opentelemetry/CamelOpenTelemetryTestSupport.java
index 1bc2bfd1ea4..029a9afdf27 100644
--- 
a/components/camel-opentelemetry/src/test/java/org/apache/camel/opentelemetry/CamelOpenTelemetryTestSupport.java
+++ 
b/components/camel-opentelemetry/src/test/java/org/apache/camel/opentelemetry/CamelOpenTelemetryTestSupport.java
@@ -94,6 +94,10 @@ class CamelOpenTelemetryTestSupport extends CamelTestSupport 
{
         return null;
     }
 
+    protected OpenTelemetryTracer getOttracer() {
+        return ottracer;
+    }
+
     protected void verify() {
         verify(expected, false);
     }
diff --git 
a/components/camel-opentelemetry/src/test/java/org/apache/camel/opentelemetry/OpenTelemetryTracingStrategyPropagateContextTest.java
 
b/components/camel-opentelemetry/src/test/java/org/apache/camel/opentelemetry/OpenTelemetryTracingStrategyPropagateContextTest.java
new file mode 100644
index 00000000000..bcf27705ab4
--- /dev/null
+++ 
b/components/camel-opentelemetry/src/test/java/org/apache/camel/opentelemetry/OpenTelemetryTracingStrategyPropagateContextTest.java
@@ -0,0 +1,128 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.opentelemetry;
+
+import java.io.IOException;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.util.concurrent.TimeUnit;
+import java.util.function.Function;
+
+import io.opentelemetry.api.trace.Span;
+import io.opentelemetry.api.trace.SpanBuilder;
+import io.opentelemetry.api.trace.SpanKind;
+import io.opentelemetry.context.Context;
+import io.opentelemetry.context.Scope;
+import org.apache.camel.RoutesBuilder;
+import org.apache.camel.builder.NotifyBuilder;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.spi.InterceptStrategy;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.io.TempDir;
+
+import static org.apache.camel.test.junit5.TestSupport.fileUri;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+public class OpenTelemetryTracingStrategyPropagateContextTest extends 
CamelOpenTelemetryTestSupport {
+
+    @TempDir
+    private static Path tempDirectory;
+
+    private final static SpanTestData[] testdata = {
+            new 
SpanTestData().setLabel("camel-process").setOperation("delayed")
+                    .setParentId(2),
+            new 
SpanTestData().setLabel("camel-process").setOperation("WithSpan.secondMethod")
+                    .setParentId(2),
+            new 
SpanTestData().setLabel("camel-process").setOperation("file").setKind(SpanKind.SERVER)
+    };
+
+    OpenTelemetryTracingStrategyPropagateContextTest() {
+        super(testdata);
+    }
+
+    @BeforeAll
+    public static void createFile() throws IOException {
+        Files.createFile(tempDirectory.resolve("file.txt"));
+    }
+
+    @Test
+    void testTracingOfProcessors() throws IOException, InterruptedException {
+        NotifyBuilder notify = new NotifyBuilder(context).whenDone(1).create();
+        assertTrue(notify.matches(30, TimeUnit.SECONDS));
+        verify(true);
+    }
+
+    @Override
+    protected String getExcludePatterns() {
+        return "longRunningProcess";
+    }
+
+    @Override
+    protected RoutesBuilder createRouteBuilder() {
+        return new RouteBuilder() {
+            @Override
+            public void configure() {
+                from(fileUri(tempDirectory)).routeId("serviceA")
+                        .process(exchange -> {
+                            longRunningProcess();
+                        }).id("longRunningProcess")
+                        .delay(simple("${random(0,500)}")).id("delayed");
+            }
+
+            private void longRunningProcess() {
+                firstMethod();
+                secondMethod();
+            }
+
+            private void firstMethod() {
+                // no Span created by Camel
+            }
+
+            // Simulate io.opentelemetry.instrumentation.annotations.@WithSpan
+            // in order to avoid having to start an HTTP server just to collect 
the Spans
+            // see 
https://github.com/open-telemetry/opentelemetry-java-examples/tree/main/telemetry-testing
+            //@WithSpan
+            public void secondMethod() {
+                // The Context should be propagated
+                Assertions.assertNotSame(Context.root(), Context.current(), 
"OpenTelemetry was not propagated !");
+                // build and start a custom Span similar to what @WithSpan 
would do
+                SpanBuilder builder = 
getOttracer().getTracer().spanBuilder("WithSpan.secondMethod");
+                Span span = builder.setParent(Context.current())
+                        .setAttribute(COMPONENT_KEY, "custom")
+                        .startSpan();
+                //noinspection EmptyTryBlock
+                try (Scope ignored = span.makeCurrent()) {
+                    // do work
+                } finally {
+                    span.end();
+                }
+
+            }
+        };
+    }
+
+    @Override
+    protected Function<OpenTelemetryTracer, InterceptStrategy> 
getTracingStrategy() {
+        return (tracer) -> {
+            OpenTelemetryTracingStrategy strategy = new 
OpenTelemetryTracingStrategy(tracer);
+            strategy.setPropagateContext(true);
+            return strategy;
+        };
+    }
+}

Reply via email to