This is an automated email from the ASF dual-hosted git repository. davsclaus pushed a commit to branch fix/CAMEL-23564 in repository https://gitbox.apache.org/repos/asf/camel.git
commit a3dc187cf54ff7cc6309caeb0d0d133bc52f5039 Author: Claus Ibsen <[email protected]> AuthorDate: Mon Jun 8 11:09:48 2026 +0200 CAMEL-23564: Make OTEL_BAGGAGE_* headers visible in Baggage.current() without traceProcessors The TraceProcessorsOtelInterceptStrategy only read baggage from exchange properties (CamelBaggage_*), but not from exchange headers (OTEL_BAGGAGE_*). This meant that baggage set via .setHeader("OTEL_BAGGAGE_X", ...) was only visible during span creation but not inside subsequent processors via Baggage.current(). The fix adds getBaggageFromHeaders() to also collect OTEL_BAGGAGE_* headers when setting up the processor scope, so both mechanisms now work regardless of the traceProcessors setting. Co-Authored-By: Claude Opus 4.6 <[email protected]> Signed-off-by: Claus Ibsen <[email protected]> --- .../TraceProcessorsOtelInterceptStrategy.java | 33 ++++++--- .../BaggageHeaderWithoutProcessorSpansTest.java | 80 ++++++++++++++++++++++ 2 files changed, 102 insertions(+), 11 deletions(-) diff --git a/components/camel-opentelemetry2/src/main/java/org/apache/camel/opentelemetry2/TraceProcessorsOtelInterceptStrategy.java b/components/camel-opentelemetry2/src/main/java/org/apache/camel/opentelemetry2/TraceProcessorsOtelInterceptStrategy.java index 23e0165cdb19..9748e4498cbe 100644 --- a/components/camel-opentelemetry2/src/main/java/org/apache/camel/opentelemetry2/TraceProcessorsOtelInterceptStrategy.java +++ b/components/camel-opentelemetry2/src/main/java/org/apache/camel/opentelemetry2/TraceProcessorsOtelInterceptStrategy.java @@ -16,6 +16,7 @@ */ package org.apache.camel.opentelemetry2; +import java.util.Map; import java.util.concurrent.CompletableFuture; import io.opentelemetry.api.baggage.Baggage; @@ -37,6 +38,8 @@ import org.apache.camel.telemetry.SpanStorageManagerExchange; */ public class TraceProcessorsOtelInterceptStrategy implements InterceptStrategy { + private static final String BAGGAGE_HEADER_PREFIX = "OTEL_BAGGAGE_"; + // NOTE: this is an implementation detail that the interceptor should not know. // We are temporarily using this to evaluate as a patch for a context leak problem we're suffering. // Once we are clear this is correctly fixed and no more corner cases, then, we should change the TraceProcessorsInterceptStrategy @@ -63,7 +66,7 @@ public class TraceProcessorsOtelInterceptStrategy implements InterceptStrategy { Span activeSpan = spanStorage.peek(exchange); if (activeSpan != null) { OpenTelemetrySpanAdapter otelSpan = (OpenTelemetrySpanAdapter) activeSpan; - Baggage baggage = getBaggageFromProperties(otelSpan.getBaggage(), exchange); + Baggage baggage = collectBaggage(otelSpan.getBaggage(), exchange); try (Scope scope = otelSpan.getSpan().makeCurrent(); Scope baggageScope = baggage.makeCurrent()) { processor.process(exchange); @@ -78,7 +81,7 @@ public class TraceProcessorsOtelInterceptStrategy implements InterceptStrategy { Span activeSpan = spanStorage.peek(exchange); if (activeSpan != null) { OpenTelemetrySpanAdapter otelSpan = (OpenTelemetrySpanAdapter) activeSpan; - Baggage baggage = getBaggageFromProperties(otelSpan.getBaggage(), exchange); + Baggage baggage = collectBaggage(otelSpan.getBaggage(), exchange); try (Scope scope = otelSpan.getSpan().makeCurrent(); Scope baggageScope = baggage.makeCurrent()) { return processor.process(exchange, doneSync -> { @@ -101,26 +104,34 @@ public class TraceProcessorsOtelInterceptStrategy implements InterceptStrategy { } } - // We inspect the exchange in order to find any baggage variable + private Baggage collectBaggage(Baggage baggage, Exchange exchange) { + baggage = getBaggageFromProperties(baggage, exchange); + baggage = getBaggageFromHeaders(baggage, exchange); + return baggage; + } + private Baggage getBaggageFromProperties(Baggage baggage, Exchange exchange) { for (String propertyKey : exchange.getProperties().keySet()) { - String key = getBaggageVar(propertyKey); - if (key != null) { + if (propertyKey != null && propertyKey.startsWith(org.apache.camel.telemetry.Tracer.BAGGAGE_PROPERTY)) { + String key = propertyKey.substring(org.apache.camel.telemetry.Tracer.BAGGAGE_PROPERTY.length()); String value = exchange.getProperty(propertyKey) == null ? null : exchange.getProperty(propertyKey).toString(); baggage = baggage.toBuilder().put(key, value).build(); } } - return baggage; } - private String getBaggageVar(String key) { - if (key == null || !key.startsWith(org.apache.camel.telemetry.Tracer.BAGGAGE_PROPERTY)) { - return null; + private Baggage getBaggageFromHeaders(Baggage baggage, Exchange exchange) { + for (Map.Entry<String, Object> entry : exchange.getMessage().getHeaders().entrySet()) { + String headerKey = entry.getKey(); + if (headerKey != null && headerKey.startsWith(BAGGAGE_HEADER_PREFIX)) { + String key = headerKey.substring(BAGGAGE_HEADER_PREFIX.length()); + String value = entry.getValue() == null ? null : entry.getValue().toString(); + baggage = baggage.toBuilder().put(key, value).build(); + } } - - return key.substring(org.apache.camel.telemetry.Tracer.BAGGAGE_PROPERTY.length()); + return baggage; } } diff --git a/components/camel-opentelemetry2/src/test/java/org/apache/camel/opentelemetry2/BaggageHeaderWithoutProcessorSpansTest.java b/components/camel-opentelemetry2/src/test/java/org/apache/camel/opentelemetry2/BaggageHeaderWithoutProcessorSpansTest.java new file mode 100644 index 000000000000..96dd9d67cb7e --- /dev/null +++ b/components/camel-opentelemetry2/src/test/java/org/apache/camel/opentelemetry2/BaggageHeaderWithoutProcessorSpansTest.java @@ -0,0 +1,80 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.opentelemetry2; + +import java.io.IOException; +import java.util.Map; + +import io.opentelemetry.api.baggage.Baggage; +import io.opentelemetry.api.trace.Tracer; +import org.apache.camel.CamelContext; +import org.apache.camel.CamelContextAware; +import org.apache.camel.Exchange; +import org.apache.camel.Processor; +import org.apache.camel.RoutesBuilder; +import org.apache.camel.builder.RouteBuilder; +import org.apache.camel.opentelemetry2.CamelOpenTelemetryExtension.OtelTrace; +import org.junit.jupiter.api.Test; + +import static org.junit.jupiter.api.Assertions.assertEquals; + +/** + * Verifies that OTEL_BAGGAGE_* headers set in a route are visible in Baggage.current() inside processors without + * requiring traceProcessors=true (CAMEL-23564). + */ +public class BaggageHeaderWithoutProcessorSpansTest extends OpenTelemetryTracerTestSupport { + + Tracer tracer = otelExtension.getOpenTelemetry().getTracer("baggageTest"); + + @Override + protected CamelContext createCamelContext() throws Exception { + OpenTelemetryTracer tst = new OpenTelemetryTracer(); + tst.setTracer(tracer); + tst.setContextPropagators(otelExtension.getOpenTelemetry().getPropagators()); + // traceProcessors is NOT enabled — the default + CamelContext context = super.createCamelContext(); + CamelContextAware.trySetCamelContext(tst, context); + tst.init(context); + return context; + } + + @Test + void testBaggageHeaderVisibleWithoutProcessorSpans() throws IOException { + template.sendBody("direct:start", "my-body"); + Map<String, OtelTrace> traces = otelExtension.getTraces(); + assertEquals(1, traces.size()); + } + + @Override + protected RoutesBuilder createRouteBuilder() { + return new RouteBuilder() { + @Override + public void configure() { + from("direct:start") + .routeId("start") + .setHeader("OTEL_BAGGAGE_BusinessReference", constant("REF-42")) + .process(new Processor() { + @Override + public void process(Exchange exchange) throws Exception { + assertEquals("REF-42", Baggage.current().getEntryValue("BusinessReference")); + } + }) + .to("log:info"); + } + }; + } +}
