This is an automated email from the ASF dual-hosted git repository. jpoth pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/camel.git
The following commit(s) were added to refs/heads/main by this push: new 26dd138bac5 CAMEL-20643: Add option to propagate OpenTelemetry Context even when tracing is disabled for a Camel Processor 26dd138bac5 is described below commit 26dd138bac51761fb4e82684dbffb3e9c6195587 Author: John Poth <poth.j...@gmail.com> AuthorDate: Wed Apr 3 11:11:56 2024 +0200 CAMEL-20643: Add option to propagate OpenTelemetry Context even when tracing is disabled for a Camel Processor --- .../OpenTelemetryTracingStrategy.java | 72 +++++++++++- .../CamelOpenTelemetryTestSupport.java | 4 + ...lemetryTracingStrategyPropagateContextTest.java | 128 +++++++++++++++++++++ 3 files changed, 201 insertions(+), 3 deletions(-) diff --git a/components/camel-opentelemetry/src/main/java/org/apache/camel/opentelemetry/OpenTelemetryTracingStrategy.java b/components/camel-opentelemetry/src/main/java/org/apache/camel/opentelemetry/OpenTelemetryTracingStrategy.java index 7208d53dd8b..c827ca06c5a 100644 --- a/components/camel-opentelemetry/src/main/java/org/apache/camel/opentelemetry/OpenTelemetryTracingStrategy.java +++ b/components/camel-opentelemetry/src/main/java/org/apache/camel/opentelemetry/OpenTelemetryTracingStrategy.java @@ -36,6 +36,8 @@ public class OpenTelemetryTracingStrategy implements InterceptStrategy { private final OpenTelemetryTracer tracer; + private boolean propagateContext; + public OpenTelemetryTracingStrategy(OpenTelemetryTracer tracer) { this.tracer = tracer; } @@ -45,11 +47,34 @@ public class OpenTelemetryTracingStrategy implements InterceptStrategy { CamelContext camelContext, NamedNode processorDefinition, Processor target, Processor nextTarget) throws Exception { - if (!shouldTrace(processorDefinition)) { + if (shouldTrace(processorDefinition)) { + return new PropagateContextAndCreateSpan(processorDefinition, target); + } else if (isPropagateContext()) { + return new PropagateContext(target); + } else { return new DelegateAsyncProcessor(target); } + } + + public boolean 
isPropagateContext() { + return propagateContext; + } + + public void setPropagateContext(boolean propagateContext) { + this.propagateContext = propagateContext; + } - return new DelegateAsyncProcessor((Exchange exchange) -> { + private class PropagateContextAndCreateSpan implements Processor { + private final NamedNode processorDefinition; + private final Processor target; + + public PropagateContextAndCreateSpan(NamedNode processorDefinition, Processor target) { + this.processorDefinition = processorDefinition; + this.target = target; + } + + @Override + public void process(Exchange exchange) throws Exception { Span span = null; OpenTelemetrySpanAdapter spanWrapper = (OpenTelemetrySpanAdapter) ActiveSpanManager.getSpan(exchange); if (spanWrapper != null) { @@ -86,7 +111,48 @@ public class OpenTelemetryTracingStrategy implements InterceptStrategy { processorSpan.end(); } - }); + } + } + + private class PropagateContext implements Processor { + private final Processor target; + + public PropagateContext(Processor target) { + this.target = target; + } + + @Override + public void process(Exchange exchange) throws Exception { + Span span = null; + OpenTelemetrySpanAdapter spanWrapper = (OpenTelemetrySpanAdapter) ActiveSpanManager.getSpan(exchange); + if (spanWrapper != null) { + span = spanWrapper.getOpenTelemetrySpan(); + } + + if (span == null) { + target.process(exchange); + return; + } + + boolean activateExchange = !(target instanceof GetCorrelationContextProcessor + || target instanceof SetCorrelationContextProcessor); + + if (activateExchange) { + ActiveSpanManager.activate(exchange, new OpenTelemetrySpanAdapter(span)); + } + + try { + target.process(exchange); + } catch (Exception ex) { + span.setStatus(StatusCode.ERROR); + span.recordException(ex); + throw ex; + } finally { + if (activateExchange) { + ActiveSpanManager.deactivate(exchange); + } + } + } } private static String getComponentName(NamedNode processorDefinition) { diff --git 
a/components/camel-opentelemetry/src/test/java/org/apache/camel/opentelemetry/CamelOpenTelemetryTestSupport.java b/components/camel-opentelemetry/src/test/java/org/apache/camel/opentelemetry/CamelOpenTelemetryTestSupport.java index 1bc2bfd1ea4..029a9afdf27 100644 --- a/components/camel-opentelemetry/src/test/java/org/apache/camel/opentelemetry/CamelOpenTelemetryTestSupport.java +++ b/components/camel-opentelemetry/src/test/java/org/apache/camel/opentelemetry/CamelOpenTelemetryTestSupport.java @@ -94,6 +94,10 @@ class CamelOpenTelemetryTestSupport extends CamelTestSupport { return null; } + protected OpenTelemetryTracer getOttracer() { + return ottracer; + } + protected void verify() { verify(expected, false); } diff --git a/components/camel-opentelemetry/src/test/java/org/apache/camel/opentelemetry/OpenTelemetryTracingStrategyPropagateContextTest.java b/components/camel-opentelemetry/src/test/java/org/apache/camel/opentelemetry/OpenTelemetryTracingStrategyPropagateContextTest.java new file mode 100644 index 00000000000..bcf27705ab4 --- /dev/null +++ b/components/camel-opentelemetry/src/test/java/org/apache/camel/opentelemetry/OpenTelemetryTracingStrategyPropagateContextTest.java @@ -0,0 +1,128 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.opentelemetry; + +import java.io.IOException; +import java.nio.file.Files; +import java.nio.file.Path; +import java.util.concurrent.TimeUnit; +import java.util.function.Function; + +import io.opentelemetry.api.trace.Span; +import io.opentelemetry.api.trace.SpanBuilder; +import io.opentelemetry.api.trace.SpanKind; +import io.opentelemetry.context.Context; +import io.opentelemetry.context.Scope; +import org.apache.camel.RoutesBuilder; +import org.apache.camel.builder.NotifyBuilder; +import org.apache.camel.builder.RouteBuilder; +import org.apache.camel.spi.InterceptStrategy; +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.io.TempDir; + +import static org.apache.camel.test.junit5.TestSupport.fileUri; +import static org.junit.jupiter.api.Assertions.assertTrue; + +public class OpenTelemetryTracingStrategyPropagateContextTest extends CamelOpenTelemetryTestSupport { + + @TempDir + private static Path tempDirectory; + + private final static SpanTestData[] testdata = { + new SpanTestData().setLabel("camel-process").setOperation("delayed") + .setParentId(2), + new SpanTestData().setLabel("camel-process").setOperation("WithSpan.secondMethod") + .setParentId(2), + new SpanTestData().setLabel("camel-process").setOperation("file").setKind(SpanKind.SERVER) + }; + + OpenTelemetryTracingStrategyPropagateContextTest() { + super(testdata); + } + + @BeforeAll + public static void createFile() throws IOException { + Files.createFile(tempDirectory.resolve("file.txt")); + } + + @Test + void testTracingOfProcessors() throws IOException, InterruptedException { + NotifyBuilder notify = new NotifyBuilder(context).whenDone(1).create(); + assertTrue(notify.matches(30, TimeUnit.SECONDS)); + verify(true); + } + + @Override + protected String 
getExcludePatterns() { + return "longRunningProcess"; + } + + @Override + protected RoutesBuilder createRouteBuilder() { + return new RouteBuilder() { + @Override + public void configure() { + from(fileUri(tempDirectory)).routeId("serviceA") + .process(exchange -> { + longRunningProcess(); + }).id("longRunningProcess") + .delay(simple("${random(0,500)}")).id("delayed"); + } + + private void longRunningProcess() { + firstMethod(); + secondMethod(); + } + + private void firstMethod() { + // no Span created by Camel + } + + // Simulate io.opentelemetry.instrumentation.annotations.@WithSpan + // in order to avoid having to start an HTTP server just to collect the Spans + // see https://github.com/open-telemetry/opentelemetry-java-examples/tree/main/telemetry-testing + //@WithSpan + public void secondMethod() { + // The Context should be propagated + Assertions.assertNotSame(Context.root(), Context.current(), "OpenTelemetry was not propagated !"); + // build and start a custom Span similar to what @WithSpan would do + SpanBuilder builder = getOttracer().getTracer().spanBuilder("WithSpan.secondMethod"); + Span span = builder.setParent(Context.current()) + .setAttribute(COMPONENT_KEY, "custom") + .startSpan(); + //noinspection EmptyTryBlock + try (Scope ignored = span.makeCurrent()) { + // do work + } finally { + span.end(); + } + + } + }; + } + + @Override + protected Function<OpenTelemetryTracer, InterceptStrategy> getTracingStrategy() { + return (tracer) -> { + OpenTelemetryTracingStrategy strategy = new OpenTelemetryTracingStrategy(tracer); + strategy.setPropagateContext(true); + return strategy; + }; + } +}