This is an automated email from the ASF dual-hosted git repository. davsclaus pushed a commit to branch fix/CAMEL-23709 in repository https://gitbox.apache.org/repos/asf/camel.git
commit 8c9f4a54ca486999fd04a295b1c7430935bdaf0d Author: Claus Ibsen <[email protected]> AuthorDate: Mon Jun 8 10:44:51 2026 +0200 CAMEL-23709: Reduce span verbosity for endpoint-sending processors Introduce EndpointSending marker interface in camel-api. Processors that send to endpoints (to, toD, wireTap, enrich) implement it to signal they produce their own EVENT_SENT span via ExchangeSendingEvent. The telemetry intercept strategy skips creating redundant EVENT_PROCESS processor spans for these processors, reducing span tree depth. Co-Authored-By: Claude <[email protected]> Signed-off-by: Claus Ibsen <[email protected]> --- .../camel/opentelemetry2/BaggageInjectionTest.java | 5 +-- .../camel/opentelemetry2/BaggageSettingTest.java | 5 +-- .../camel/opentelemetry2/EnableProcessorsTest.java | 7 +-- .../apache/camel/opentelemetry2/SpanBeanTest.java | 9 ++-- .../opentelemetry2/SpanCustomizationTest.java | 7 +-- .../camel/opentelemetry2/SpanInjectionTest.java | 7 +-- .../camel/opentelemetry2/SpanToBeanTest.java | 23 ++++------ .../TraceProcessorsInterceptStrategy.java | 52 +++++++++++++++------- .../camel/telemetry/EnableCoreProcessorsTest.java | 9 +--- .../org/apache/camel/telemetry/SpanBeanTest.java | 15 +++---- .../org/apache/camel/telemetry/SpanToBeanTest.java | 27 ++++------- .../java/org/apache/camel/EndpointSending.java | 27 +++++++++++ .../java/org/apache/camel/processor/Enricher.java | 4 +- .../camel/processor/SendDynamicProcessor.java | 3 +- .../org/apache/camel/processor/SendProcessor.java | 3 +- .../apache/camel/processor/WireTapProcessor.java | 3 +- 16 files changed, 109 insertions(+), 97 deletions(-) diff --git a/components/camel-opentelemetry2/src/test/java/org/apache/camel/opentelemetry2/BaggageInjectionTest.java b/components/camel-opentelemetry2/src/test/java/org/apache/camel/opentelemetry2/BaggageInjectionTest.java index 23a20c2f12a7..d7be5cba8de1 100644 --- a/components/camel-opentelemetry2/src/test/java/org/apache/camel/opentelemetry2/BaggageInjectionTest.java +++ b/components/camel-opentelemetry2/src/test/java/org/apache/camel/opentelemetry2/BaggageInjectionTest.java @@ -74,7 +74,8 @@ public class BaggageInjectionTest extends OpenTelemetryTracerTestSupport { private void checkTrace(OtelTrace trace) { List<SpanData> spans = trace.getSpans(); - assertEquals(8, spans.size()); + // to("log:info") no longer produces a processor span (SendProcessor implements EndpointSending) + assertEquals(7, spans.size()); SpanData testProducer = spans.get(0); SpanData direct = spans.get(1); SpanData innerProcessor1 = spans.get(2); @@ -82,7 +83,6 @@ public class BaggageInjectionTest extends OpenTelemetryTracerTestSupport { SpanData innerLog = spans.get(4); SpanData innerProcessor2 = spans.get(5); SpanData log = spans.get(6); - SpanData innerToLog = spans.get(7); // Validate span completion assertTrue(testProducer.hasEnded()); @@ -92,7 +92,6 @@ public class BaggageInjectionTest extends OpenTelemetryTracerTestSupport { assertTrue(innerLog.hasEnded()); assertTrue(innerProcessor2.hasEnded()); assertTrue(log.hasEnded()); - assertTrue(innerToLog.hasEnded()); } @Override diff --git a/components/camel-opentelemetry2/src/test/java/org/apache/camel/opentelemetry2/BaggageSettingTest.java b/components/camel-opentelemetry2/src/test/java/org/apache/camel/opentelemetry2/BaggageSettingTest.java index f953f0098564..e161a6dc3630 100644 --- a/components/camel-opentelemetry2/src/test/java/org/apache/camel/opentelemetry2/BaggageSettingTest.java +++ b/components/camel-opentelemetry2/src/test/java/org/apache/camel/opentelemetry2/BaggageSettingTest.java @@ -61,14 +61,14 @@ public class BaggageSettingTest extends OpenTelemetryTracerTestSupport { private void checkTrace(OtelTrace trace) { List<SpanData> spans = trace.getSpans(); - assertEquals(7, spans.size()); + // to("log:info") no longer produces a processor span (SendProcessor implements EndpointSending) + assertEquals(6, spans.size()); SpanData testProducer = spans.get(0); SpanData direct = spans.get(1); SpanData setHeaders = spans.get(2); SpanData innerLog = spans.get(3); SpanData innerProcessor = spans.get(4); SpanData log = spans.get(5); - SpanData innerToLog = spans.get(6); // Validate span completion assertTrue(testProducer.hasEnded()); @@ -77,7 +77,6 @@ public class BaggageSettingTest extends OpenTelemetryTracerTestSupport { assertTrue(innerLog.hasEnded()); assertTrue(innerProcessor.hasEnded()); assertTrue(log.hasEnded()); - assertTrue(innerToLog.hasEnded()); } @Override diff --git a/components/camel-opentelemetry2/src/test/java/org/apache/camel/opentelemetry2/EnableProcessorsTest.java b/components/camel-opentelemetry2/src/test/java/org/apache/camel/opentelemetry2/EnableProcessorsTest.java index 76f4abe6868f..e1470e581b5d 100644 --- a/components/camel-opentelemetry2/src/test/java/org/apache/camel/opentelemetry2/EnableProcessorsTest.java +++ b/components/camel-opentelemetry2/src/test/java/org/apache/camel/opentelemetry2/EnableProcessorsTest.java @@ -60,13 +60,13 @@ public class EnableProcessorsTest extends OpenTelemetryTracerTestSupport { private void checkTrace(OtelTrace trace) { List<SpanData> spans = trace.getSpans(); - assertEquals(6, spans.size()); + // to("log:info") no longer produces a processor span (SendProcessor implements EndpointSending) + assertEquals(5, spans.size()); SpanData testProducer = spans.get(0); SpanData direct = spans.get(1); SpanData innerLog = spans.get(2); SpanData innerProcessor = spans.get(3); SpanData log = spans.get(4); - SpanData innerToLog = spans.get(5); // Validate span completion assertTrue(testProducer.hasEnded()); @@ -74,14 +74,12 @@ public class EnableProcessorsTest extends OpenTelemetryTracerTestSupport { assertTrue(innerLog.hasEnded()); assertTrue(innerProcessor.hasEnded()); assertTrue(log.hasEnded()); - assertTrue(innerToLog.hasEnded()); // Validate same trace assertEquals(testProducer.getSpanContext().getTraceId(), direct.getSpanContext().getTraceId()); assertEquals(testProducer.getSpanContext().getTraceId(), innerLog.getSpanContext().getTraceId()); assertEquals(testProducer.getSpanContext().getTraceId(), innerProcessor.getSpanContext().getTraceId()); assertEquals(testProducer.getSpanContext().getTraceId(), log.getSpanContext().getTraceId()); - assertEquals(testProducer.getSpanContext().getTraceId(), innerToLog.getSpanContext().getTraceId()); // Validate operations assertEquals(Op.EVENT_RECEIVED.toString(), direct.getAttributes().get(AttributeKey.stringKey("op"))); @@ -93,7 +91,6 @@ public class EnableProcessorsTest extends OpenTelemetryTracerTestSupport { assertEquals(direct.getSpanContext().getSpanId(), innerLog.getParentSpanContext().getSpanId()); assertEquals(direct.getSpanContext().getSpanId(), innerProcessor.getParentSpanContext().getSpanId()); assertEquals(direct.getSpanContext().getSpanId(), log.getParentSpanContext().getSpanId()); - assertEquals(log.getSpanContext().getSpanId(), innerToLog.getParentSpanContext().getSpanId()); } @Override diff --git a/components/camel-opentelemetry2/src/test/java/org/apache/camel/opentelemetry2/SpanBeanTest.java b/components/camel-opentelemetry2/src/test/java/org/apache/camel/opentelemetry2/SpanBeanTest.java index a785de244f28..5952d2aca42c 100644 --- a/components/camel-opentelemetry2/src/test/java/org/apache/camel/opentelemetry2/SpanBeanTest.java +++ b/components/camel-opentelemetry2/src/test/java/org/apache/camel/opentelemetry2/SpanBeanTest.java @@ -63,14 +63,14 @@ public class SpanBeanTest extends OpenTelemetryTracerTestSupport { private void checkTrace(OtelTrace trace) { List<SpanData> spans = trace.getSpans(); - assertEquals(7, spans.size()); + // to("log:info") no longer produces a processor span (SendProcessor implements EndpointSending) + assertEquals(6, spans.size()); SpanData testProducer = spans.get(0); SpanData direct = spans.get(1); SpanData innerLog = spans.get(2); SpanData beanProcessor = spans.get(3); SpanData beanMethod = spans.get(4); SpanData log = spans.get(5); - SpanData innerToLog = spans.get(6); // Validate span completion assertTrue(testProducer.hasEnded()); @@ -79,7 +79,6 @@ public class SpanBeanTest extends OpenTelemetryTracerTestSupport { assertTrue(beanProcessor.hasEnded()); assertTrue(beanMethod.hasEnded()); assertTrue(log.hasEnded()); - assertTrue(innerToLog.hasEnded()); // Validate same trace assertEquals(testProducer.getSpanContext().getTraceId(), direct.getSpanContext().getTraceId()); @@ -87,7 +86,6 @@ public class SpanBeanTest extends OpenTelemetryTracerTestSupport { assertEquals(testProducer.getSpanContext().getTraceId(), beanProcessor.getSpanContext().getTraceId()); assertEquals(testProducer.getSpanContext().getTraceId(), beanMethod.getSpanContext().getTraceId()); assertEquals(testProducer.getSpanContext().getTraceId(), log.getSpanContext().getTraceId()); - assertEquals(testProducer.getSpanContext().getTraceId(), innerToLog.getSpanContext().getTraceId()); // Validate operations assertEquals(Op.EVENT_RECEIVED.toString(), direct.getAttributes().get(AttributeKey.stringKey("op"))); @@ -101,13 +99,12 @@ public class SpanBeanTest extends OpenTelemetryTracerTestSupport { assertEquals(direct.getSpanContext().getSpanId(), beanProcessor.getParentSpanContext().getSpanId()); assertEquals(beanProcessor.getSpanContext().getSpanId(), beanMethod.getParentSpanContext().getSpanId()); assertEquals(direct.getSpanContext().getSpanId(), log.getParentSpanContext().getSpanId()); - assertEquals(log.getSpanContext().getSpanId(), innerToLog.getParentSpanContext().getSpanId()); // Validate message logging assertEquals("A message", innerLog.getEvents().get(0).getAttributes().get(AttributeKey.stringKey("message"))); assertEquals( "Exchange[ExchangePattern: InOnly, BodyType: String, Body: my-body]", - innerToLog.getEvents().get(0).getAttributes().get(AttributeKey.stringKey("message"))); + log.getEvents().get(0).getAttributes().get(AttributeKey.stringKey("message"))); } @Override diff --git a/components/camel-opentelemetry2/src/test/java/org/apache/camel/opentelemetry2/SpanCustomizationTest.java b/components/camel-opentelemetry2/src/test/java/org/apache/camel/opentelemetry2/SpanCustomizationTest.java index 2b1d6c4d91d3..cc4d61938125 100644 --- a/components/camel-opentelemetry2/src/test/java/org/apache/camel/opentelemetry2/SpanCustomizationTest.java +++ b/components/camel-opentelemetry2/src/test/java/org/apache/camel/opentelemetry2/SpanCustomizationTest.java @@ -64,14 +64,14 @@ public class SpanCustomizationTest extends OpenTelemetryTracerTestSupport { private void checkTrace(OtelTrace trace) { List<SpanData> spans = trace.getSpans(); - assertEquals(7, spans.size()); + // to("log:info") no longer produces a processor span (SendProcessor implements EndpointSending) + assertEquals(6, spans.size()); SpanData testProducer = spans.get(0); SpanData direct = spans.get(1); SpanData innerLog = spans.get(2); SpanData innerProcessor = spans.get(3); SpanData customSpan = spans.get(4); SpanData log = spans.get(5); - SpanData innerToLog = spans.get(6); // Validate span completion assertTrue(testProducer.hasEnded()); @@ -80,16 +80,13 @@ public class SpanCustomizationTest extends OpenTelemetryTracerTestSupport { assertTrue(innerProcessor.hasEnded()); assertTrue(customSpan.hasEnded()); assertTrue(log.hasEnded()); - assertTrue(innerToLog.hasEnded()); // Validate same trace assertEquals(testProducer.getSpanContext().getTraceId(), direct.getSpanContext().getTraceId()); - assertEquals(testProducer.getSpanContext().getTraceId(), direct.getSpanContext().getTraceId()); assertEquals(testProducer.getSpanContext().getTraceId(), innerLog.getSpanContext().getTraceId()); assertEquals(testProducer.getSpanContext().getTraceId(), innerProcessor.getSpanContext().getTraceId()); assertEquals(testProducer.getSpanContext().getTraceId(), customSpan.getSpanContext().getTraceId()); assertEquals(testProducer.getSpanContext().getTraceId(), log.getSpanContext().getTraceId()); - assertEquals(testProducer.getSpanContext().getTraceId(), innerToLog.getSpanContext().getTraceId()); // Validate operations assertEquals(Op.EVENT_RECEIVED.toString(), direct.getAttributes().get(AttributeKey.stringKey("op"))); diff --git a/components/camel-opentelemetry2/src/test/java/org/apache/camel/opentelemetry2/SpanInjectionTest.java b/components/camel-opentelemetry2/src/test/java/org/apache/camel/opentelemetry2/SpanInjectionTest.java index fb72cd03684d..80182ad54569 100644 --- a/components/camel-opentelemetry2/src/test/java/org/apache/camel/opentelemetry2/SpanInjectionTest.java +++ b/components/camel-opentelemetry2/src/test/java/org/apache/camel/opentelemetry2/SpanInjectionTest.java @@ -102,13 +102,13 @@ public class SpanInjectionTest extends OpenTelemetryTracerTestSupport { private void checkTrace(OtelTrace trace, String parentTrace, String parentSpan) { List<SpanData> spans = trace.getSpans(); - assertEquals(6, spans.size()); + // to("log:info") no longer produces a processor span (SendProcessor implements EndpointSending) + assertEquals(5, spans.size()); SpanData testProducer = spans.get(0); SpanData direct = spans.get(1); SpanData innerLog = spans.get(2); SpanData innerProcessor = spans.get(3); SpanData log = spans.get(4); - SpanData innerToLog = spans.get(5); // Validate span completion assertTrue(testProducer.hasEnded()); @@ -116,7 +116,6 @@ public class SpanInjectionTest extends OpenTelemetryTracerTestSupport { assertTrue(innerLog.hasEnded()); assertTrue(innerProcessor.hasEnded()); assertTrue(log.hasEnded()); - assertTrue(innerToLog.hasEnded()); // Validate same trace assertEquals(parentTrace, testProducer.getSpanContext().getTraceId()); @@ -124,7 +123,6 @@ public class SpanInjectionTest extends OpenTelemetryTracerTestSupport { assertEquals(testProducer.getSpanContext().getTraceId(), innerLog.getSpanContext().getTraceId()); assertEquals(testProducer.getSpanContext().getTraceId(), innerProcessor.getSpanContext().getTraceId()); assertEquals(testProducer.getSpanContext().getTraceId(), log.getSpanContext().getTraceId()); - assertEquals(testProducer.getSpanContext().getTraceId(), innerToLog.getSpanContext().getTraceId()); // Validate operations assertEquals(Op.EVENT_RECEIVED.toString(), direct.getAttributes().get(AttributeKey.stringKey("op"))); @@ -139,7 +137,6 @@ public class SpanInjectionTest extends OpenTelemetryTracerTestSupport { assertEquals(direct.getSpanContext().getSpanId(), innerLog.getParentSpanContext().getSpanId()); assertEquals(direct.getSpanContext().getSpanId(), innerProcessor.getParentSpanContext().getSpanId()); assertEquals(direct.getSpanContext().getSpanId(), log.getParentSpanContext().getSpanId()); - assertEquals(log.getSpanContext().getSpanId(), innerToLog.getParentSpanContext().getSpanId()); } @Override diff --git a/components/camel-opentelemetry2/src/test/java/org/apache/camel/opentelemetry2/SpanToBeanTest.java b/components/camel-opentelemetry2/src/test/java/org/apache/camel/opentelemetry2/SpanToBeanTest.java index 64f0eb2d8c1c..d138c89fd8c9 100644 --- a/components/camel-opentelemetry2/src/test/java/org/apache/camel/opentelemetry2/SpanToBeanTest.java +++ b/components/camel-opentelemetry2/src/test/java/org/apache/camel/opentelemetry2/SpanToBeanTest.java @@ -61,46 +61,39 @@ public class SpanToBeanTest extends OpenTelemetryTracerTestSupport { private void checkTrace(OtelTrace trace) { List<SpanData> spans = trace.getSpans(); - assertEquals(8, spans.size()); + // to("bean:myBean") and to("log:info") no longer produce processor spans (SendProcessor implements EndpointSending) + assertEquals(6, spans.size()); SpanData testProducer = spans.get(0); SpanData direct = spans.get(1); SpanData innerLog = spans.get(2); - SpanData beanTo = spans.get(3); - SpanData beanProcessor = spans.get(4); - SpanData beanMethod = spans.get(5); - SpanData log = spans.get(6); - SpanData innerToLog = spans.get(7); + SpanData beanProcessor = spans.get(3); + SpanData beanMethod = spans.get(4); + SpanData log = spans.get(5); // Validate span completion assertTrue(testProducer.hasEnded()); assertTrue(direct.hasEnded()); assertTrue(innerLog.hasEnded()); - assertTrue(beanTo.hasEnded()); assertTrue(beanProcessor.hasEnded()); assertTrue(beanMethod.hasEnded()); assertTrue(log.hasEnded()); - assertTrue(innerToLog.hasEnded()); // Validate same trace assertEquals(testProducer.getSpanContext().getTraceId(), direct.getSpanContext().getTraceId()); assertEquals(testProducer.getSpanContext().getTraceId(), innerLog.getSpanContext().getTraceId()); - assertEquals(testProducer.getSpanContext().getTraceId(), beanTo.getSpanContext().getTraceId()); assertEquals(testProducer.getSpanContext().getTraceId(), beanProcessor.getSpanContext().getTraceId()); assertEquals(testProducer.getSpanContext().getTraceId(), beanMethod.getSpanContext().getTraceId()); assertEquals(testProducer.getSpanContext().getTraceId(), log.getSpanContext().getTraceId()); - assertEquals(testProducer.getSpanContext().getTraceId(), innerToLog.getSpanContext().getTraceId()); // Validate hierarchy assertFalse(testProducer.getParentSpanContext().isValid()); assertEquals(testProducer.getSpanContext().getSpanId(), direct.getParentSpanContext().getSpanId()); assertEquals(direct.getSpanContext().getSpanId(), innerLog.getParentSpanContext().getSpanId()); - assertEquals(direct.getSpanContext().getSpanId(), beanTo.getParentSpanContext().getSpanId()); - assertEquals(beanTo.getSpanContext().getSpanId(), beanProcessor.getParentSpanContext().getSpanId()); - // NOTE: the bean method belongs to the component ("to") node. - assertEquals(beanTo.getSpanContext().getSpanId(), beanMethod.getParentSpanContext().getSpanId()); + assertEquals(direct.getSpanContext().getSpanId(), beanProcessor.getParentSpanContext().getSpanId()); + // beanMethod's parent is direct (OTel context set by scope wrapper, not the event span) + assertEquals(direct.getSpanContext().getSpanId(), beanMethod.getParentSpanContext().getSpanId()); assertEquals(direct.getSpanContext().getSpanId(), log.getParentSpanContext().getSpanId()); - assertEquals(log.getSpanContext().getSpanId(), innerToLog.getParentSpanContext().getSpanId()); } @Override diff --git a/components/camel-telemetry/src/main/java/org/apache/camel/telemetry/TraceProcessorsInterceptStrategy.java b/components/camel-telemetry/src/main/java/org/apache/camel/telemetry/TraceProcessorsInterceptStrategy.java index 5369cba95192..2185881e7077 100644 --- a/components/camel-telemetry/src/main/java/org/apache/camel/telemetry/TraceProcessorsInterceptStrategy.java +++ b/components/camel-telemetry/src/main/java/org/apache/camel/telemetry/TraceProcessorsInterceptStrategy.java @@ -20,6 +20,8 @@ import java.util.concurrent.CompletableFuture; import org.apache.camel.AsyncCallback; import org.apache.camel.CamelContext; +import org.apache.camel.DelegateProcessor; +import org.apache.camel.EndpointSending; import org.apache.camel.Exchange; import org.apache.camel.NamedNode; import org.apache.camel.Processor; @@ -57,9 +59,7 @@ public class TraceProcessorsInterceptStrategy implements InterceptStrategy { @Override public void process(Exchange exchange) throws Exception { String processorName = processorDefinition.getId() + "-" + processorDefinition.getShortName(); - if ((isCoreProcessEnabled(tracer.isDisableCoreProcessors(), processorDefinition.getShortName()) || - isCustomProcessEnabled(tracer.isTraceProcessors(), processorDefinition.getShortName())) - && tracer.match(processorName, exchange.getContext())) { + if (shouldTrace(processorName, exchange)) { tracer.beginProcessorSpan(exchange, processorName); try { processor.process(exchange); @@ -67,7 +67,6 @@ public class TraceProcessorsInterceptStrategy implements InterceptStrategy { tracer.endProcessorSpan(exchange, processorName); } } else { - // We must always execute this processor.process(exchange); } } @@ -75,11 +74,8 @@ public class TraceProcessorsInterceptStrategy implements InterceptStrategy { @Override public boolean process(Exchange exchange, AsyncCallback callback) { String processorName = processorDefinition.getId() + "-" + processorDefinition.getShortName(); - boolean isTraceProcessor - = (isCoreProcessEnabled(tracer.isDisableCoreProcessors(), processorDefinition.getShortName()) || - isCustomProcessEnabled(tracer.isTraceProcessors(), processorDefinition.getShortName())) - && tracer.match(processorName, exchange.getContext()); - if (isTraceProcessor) { + boolean trace = shouldTrace(processorName, exchange); + if (trace) { try { tracer.beginProcessorSpan(exchange, processorName); } catch (Exception e) { @@ -90,7 +86,7 @@ public class TraceProcessorsInterceptStrategy implements InterceptStrategy { try { callback.done(doneSync); } finally { - if (isTraceProcessor) { + if (trace) { try { tracer.endProcessorSpan(exchange, processorName); } catch (Exception e) { @@ -108,15 +104,39 @@ public class TraceProcessorsInterceptStrategy implements InterceptStrategy { process(exchange, callback); return callback.getFuture(); } + + private boolean shouldTrace(String processorName, Exchange exchange) { + // skip processors that send to an endpoint (to, toD, wireTap, enrich) + // unwrap through any delegate chain (e.g. otel scope wrapper) to find the real processor + if (isEndpointSending(processor)) { + return false; + } + String shortName = processorDefinition.getShortName(); + boolean enabled = isCoreProcessEnabled(shortName) || isCustomProcessEnabled(shortName); + return enabled && tracer.match(processorName, exchange.getContext()); + } + + private boolean isEndpointSending(Processor target) { + Processor p = target; + while (p != null) { + if (p instanceof EndpointSending) { + return true; + } + if (p instanceof DelegateProcessor dp) { + p = dp.getProcessor(); + } else { + break; + } + } + return false; + } } - private boolean isCoreProcessEnabled(boolean isDisableCoreProcessors, String processDefinitionShortName) { - // Any enabled core process which is not a "custom" processor. - return !isDisableCoreProcessors && !processDefinitionShortName.equals("process"); + private boolean isCoreProcessEnabled(String shortName) { + return !tracer.isDisableCoreProcessors() && !"process".equals(shortName); } - private boolean isCustomProcessEnabled(boolean isTraceProcessors, String processDefinitionShortName) { - // Any custom core process. - return isTraceProcessors && processDefinitionShortName.equals("process"); + private boolean isCustomProcessEnabled(String shortName) { + return tracer.isTraceProcessors() && "process".equals(shortName); } } diff --git a/components/camel-telemetry/src/test/java/org/apache/camel/telemetry/EnableCoreProcessorsTest.java b/components/camel-telemetry/src/test/java/org/apache/camel/telemetry/EnableCoreProcessorsTest.java index 62c50a27899d..04b873507a30 100644 --- a/components/camel-telemetry/src/test/java/org/apache/camel/telemetry/EnableCoreProcessorsTest.java +++ b/components/camel-telemetry/src/test/java/org/apache/camel/telemetry/EnableCoreProcessorsTest.java @@ -57,27 +57,23 @@ public class EnableCoreProcessorsTest extends ExchangeTestSupport { private void checkTrace(MockTrace trace) { List<Span> spans = trace.spans(); - assertEquals(5, spans.size()); - // Cast to implementation object to be able to - // inspect the status of the Span. + // to("log:info") no longer produces a processor span (SendProcessor implements EndpointSending) + assertEquals(4, spans.size()); MockSpanAdapter testProducer = (MockSpanAdapter) spans.get(0); MockSpanAdapter direct = (MockSpanAdapter) spans.get(1); MockSpanAdapter innerLog = (MockSpanAdapter) spans.get(2); MockSpanAdapter log = (MockSpanAdapter) spans.get(3); - MockSpanAdapter innerToLog = (MockSpanAdapter) spans.get(4); // Validate span completion assertEquals("true", testProducer.getTag("isDone")); assertEquals("true", direct.getTag("isDone")); assertEquals("true", innerLog.getTag("isDone")); assertEquals("true", log.getTag("isDone")); - assertEquals("true", innerToLog.getTag("isDone")); // Validate same trace assertEquals(testProducer.getTag("traceid"), direct.getTag("traceid")); assertEquals(testProducer.getTag("traceid"), innerLog.getTag("traceid")); assertEquals(testProducer.getTag("traceid"), log.getTag("traceid")); - assertEquals(testProducer.getTag("traceid"), innerToLog.getTag("traceid")); // Validate op assertEquals(Op.EVENT_RECEIVED.toString(), direct.getTag("op")); @@ -87,7 +83,6 @@ public class EnableCoreProcessorsTest extends ExchangeTestSupport { assertEquals(testProducer.getTag("spanid"), direct.getTag("parentSpan")); assertEquals(direct.getTag("spanid"), innerLog.getTag("parentSpan")); assertEquals(direct.getTag("spanid"), log.getTag("parentSpan")); - assertEquals(log.getTag("spanid"), innerToLog.getTag("parentSpan")); } @Override diff --git a/components/camel-telemetry/src/test/java/org/apache/camel/telemetry/SpanBeanTest.java b/components/camel-telemetry/src/test/java/org/apache/camel/telemetry/SpanBeanTest.java index ed2cba75226d..0adafe69ce01 100644 --- a/components/camel-telemetry/src/test/java/org/apache/camel/telemetry/SpanBeanTest.java +++ b/components/camel-telemetry/src/test/java/org/apache/camel/telemetry/SpanBeanTest.java @@ -57,16 +57,14 @@ public class SpanBeanTest extends ExchangeTestSupport { private void checkTrace(MockTrace trace) { List<Span> spans = trace.spans(); - assertEquals(7, spans.size()); - // Cast to implementation object to be able to - // inspect the status of the Span. + // to("log:info") no longer produces a processor span (SendProcessor implements EndpointSending) + assertEquals(6, spans.size()); MockSpanAdapter testProducer = (MockSpanAdapter) spans.get(0); MockSpanAdapter direct = (MockSpanAdapter) spans.get(1); MockSpanAdapter innerLog = (MockSpanAdapter) spans.get(2); MockSpanAdapter bean = (MockSpanAdapter) spans.get(3); MockSpanAdapter beanMethod = (MockSpanAdapter) spans.get(4); MockSpanAdapter log = (MockSpanAdapter) spans.get(5); - MockSpanAdapter innerToLog = (MockSpanAdapter) spans.get(6); // Validate span completion assertEquals("true", testProducer.getTag("isDone")); @@ -75,15 +73,13 @@ public class SpanBeanTest extends ExchangeTestSupport { assertEquals("true", bean.getTag("isDone")); assertEquals("true", beanMethod.getTag("isDone")); assertEquals("true", log.getTag("isDone")); - assertEquals("true", innerToLog.getTag("isDone")); // Validate same trace assertEquals(testProducer.getTag("traceid"), direct.getTag("traceid")); assertEquals(testProducer.getTag("traceid"), innerLog.getTag("traceid")); - assertEquals(testProducer.getTag("traceid"), log.getTag("traceid")); assertEquals(testProducer.getTag("traceid"), bean.getTag("traceid")); assertEquals(testProducer.getTag("traceid"), beanMethod.getTag("traceid")); - assertEquals(testProducer.getTag("traceid"), innerToLog.getTag("traceid")); + assertEquals(testProducer.getTag("traceid"), log.getTag("traceid")); // Validate op assertEquals(Op.EVENT_RECEIVED.toString(), direct.getTag("op")); @@ -92,10 +88,9 @@ public class SpanBeanTest extends ExchangeTestSupport { assertNull(testProducer.getTag("parentSpan")); assertEquals(testProducer.getTag("spanid"), direct.getTag("parentSpan")); assertEquals(direct.getTag("spanid"), innerLog.getTag("parentSpan")); - assertEquals(direct.getTag("spanid"), log.getTag("parentSpan")); assertEquals(direct.getTag("spanid"), bean.getTag("parentSpan")); assertEquals(bean.getTag("spanid"), beanMethod.getTag("parentSpan")); - assertEquals(log.getTag("spanid"), innerToLog.getTag("parentSpan")); + assertEquals(direct.getTag("spanid"), log.getTag("parentSpan")); // Validate operations assertEquals(Op.EVENT_SENT.toString(), testProducer.getTag("op")); @@ -105,7 +100,7 @@ public class SpanBeanTest extends ExchangeTestSupport { assertEquals("A message", innerLog.logEntries().get(0).fields().get("message")); assertEquals( "Exchange[ExchangePattern: InOnly, BodyType: String, Body: my-body]", - innerToLog.logEntries().get(0).fields().get("message")); + log.logEntries().get(0).fields().get("message")); } @Override diff --git a/components/camel-telemetry/src/test/java/org/apache/camel/telemetry/SpanToBeanTest.java b/components/camel-telemetry/src/test/java/org/apache/camel/telemetry/SpanToBeanTest.java index 0e4433ea4654..8debbdfb4632 100644 --- a/components/camel-telemetry/src/test/java/org/apache/camel/telemetry/SpanToBeanTest.java +++ b/components/camel-telemetry/src/test/java/org/apache/camel/telemetry/SpanToBeanTest.java @@ -57,36 +57,29 @@ public class SpanToBeanTest extends ExchangeTestSupport { private void checkTrace(MockTrace trace) { List<Span> spans = trace.spans(); - assertEquals(8, spans.size()); - // Cast to implementation object to be able to - // inspect the status of the Span. + // to("bean:myBean") and to("log:info") no longer produce processor spans (SendProcessor implements EndpointSending) + assertEquals(6, spans.size()); MockSpanAdapter testProducer = (MockSpanAdapter) spans.get(0); MockSpanAdapter direct = (MockSpanAdapter) spans.get(1); MockSpanAdapter innerLog = (MockSpanAdapter) spans.get(2); - MockSpanAdapter toBean = (MockSpanAdapter) spans.get(3); - MockSpanAdapter bean = (MockSpanAdapter) spans.get(4); - MockSpanAdapter beanMethod = (MockSpanAdapter) spans.get(5); - MockSpanAdapter log = (MockSpanAdapter) spans.get(6); - MockSpanAdapter innerToLog = (MockSpanAdapter) spans.get(7); + MockSpanAdapter bean = (MockSpanAdapter) spans.get(3); + MockSpanAdapter beanMethod = (MockSpanAdapter) spans.get(4); + MockSpanAdapter log = (MockSpanAdapter) spans.get(5); // Validate span completion assertEquals("true", testProducer.getTag("isDone")); assertEquals("true", direct.getTag("isDone")); assertEquals("true", innerLog.getTag("isDone")); - assertEquals("true", toBean.getTag("isDone")); assertEquals("true", bean.getTag("isDone")); assertEquals("true", beanMethod.getTag("isDone")); assertEquals("true", log.getTag("isDone")); - assertEquals("true", innerToLog.getTag("isDone")); // Validate same trace assertEquals(testProducer.getTag("traceid"), direct.getTag("traceid")); assertEquals(testProducer.getTag("traceid"), innerLog.getTag("traceid")); - assertEquals(testProducer.getTag("traceid"), log.getTag("traceid")); - assertEquals(testProducer.getTag("traceid"), toBean.getTag("traceid")); assertEquals(testProducer.getTag("traceid"), bean.getTag("traceid")); assertEquals(testProducer.getTag("traceid"), beanMethod.getTag("traceid")); - assertEquals(testProducer.getTag("traceid"), innerToLog.getTag("traceid")); + assertEquals(testProducer.getTag("traceid"), log.getTag("traceid")); // Validate op assertEquals(Op.EVENT_RECEIVED.toString(), direct.getTag("op")); @@ -95,11 +88,9 @@ public class SpanToBeanTest extends ExchangeTestSupport { assertNull(testProducer.getTag("parentSpan")); assertEquals(testProducer.getTag("spanid"), direct.getTag("parentSpan")); assertEquals(direct.getTag("spanid"), innerLog.getTag("parentSpan")); - assertEquals(direct.getTag("spanid"), log.getTag("parentSpan")); - assertEquals(direct.getTag("spanid"), toBean.getTag("parentSpan")); - assertEquals(toBean.getTag("spanid"), bean.getTag("parentSpan")); + assertEquals(direct.getTag("spanid"), bean.getTag("parentSpan")); assertEquals(bean.getTag("spanid"), beanMethod.getTag("parentSpan")); - assertEquals(log.getTag("spanid"), innerToLog.getTag("parentSpan")); + assertEquals(direct.getTag("spanid"), log.getTag("parentSpan")); // Validate operations assertEquals(Op.EVENT_SENT.toString(), testProducer.getTag("op")); @@ -109,7 +100,7 @@ public class SpanToBeanTest extends ExchangeTestSupport { assertEquals("A message", innerLog.logEntries().get(0).fields().get("message")); assertEquals( "Exchange[ExchangePattern: InOnly, BodyType: String, Body: my-body]", - innerToLog.logEntries().get(0).fields().get("message")); + log.logEntries().get(0).fields().get("message")); } @Override diff --git a/core/camel-api/src/main/java/org/apache/camel/EndpointSending.java b/core/camel-api/src/main/java/org/apache/camel/EndpointSending.java new file mode 100644 index 000000000000..e56002ed660e --- /dev/null +++ b/core/camel-api/src/main/java/org/apache/camel/EndpointSending.java @@ -0,0 +1,27 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel; + +/** + * Marker interface for {@link Processor} implementations that send an {@link org.apache.camel.Exchange} to an + * {@link Endpoint} as part of their processing. This applies to both static endpoints (resolved at route build time) + * and dynamic endpoints (computed at runtime from an expression). + * + * @since 4.21 + */ +public interface EndpointSending { +} diff --git a/core/camel-core-processor/src/main/java/org/apache/camel/processor/Enricher.java b/core/camel-core-processor/src/main/java/org/apache/camel/processor/Enricher.java index 54dd1e7ca154..fd8a0053061f 100644 --- a/core/camel-core-processor/src/main/java/org/apache/camel/processor/Enricher.java +++ b/core/camel-core-processor/src/main/java/org/apache/camel/processor/Enricher.java @@ -23,6 +23,7 @@ import org.apache.camel.AsyncCallback; import org.apache.camel.CamelContext; import org.apache.camel.CamelContextAware; import org.apache.camel.CamelExchangeException; +import org.apache.camel.EndpointSending; import org.apache.camel.Exchange; import org.apache.camel.ExchangePattern; import org.apache.camel.ExchangePropertyKey; @@ -50,7 +51,8 @@ import static org.apache.camel.support.ExchangeHelper.copyResultsPreservePattern * * @see PollEnricher */ -public class Enricher extends BaseProcessorSupport implements IdAware, RouteIdAware, StepIdAware, CamelContextAware { +public class Enricher extends BaseProcessorSupport + implements EndpointSending, IdAware, RouteIdAware, StepIdAware, CamelContextAware { private CamelContext camelContext; private String id; diff --git a/core/camel-core-processor/src/main/java/org/apache/camel/processor/SendDynamicProcessor.java b/core/camel-core-processor/src/main/java/org/apache/camel/processor/SendDynamicProcessor.java index 86ff6459d115..543c8f04c730 100644 --- a/core/camel-core-processor/src/main/java/org/apache/camel/processor/SendDynamicProcessor.java +++ b/core/camel-core-processor/src/main/java/org/apache/camel/processor/SendDynamicProcessor.java @@ -23,6 +23,7 @@ import org.apache.camel.CamelContext; import org.apache.camel.CamelContextAware; import org.apache.camel.Component; import org.apache.camel.Endpoint; +import org.apache.camel.EndpointSending; import org.apache.camel.Exchange; import org.apache.camel.ExchangePattern; import org.apache.camel.ExchangePropertyKey; @@ -54,7 +55,7 @@ import org.slf4j.LoggerFactory; * @see org.apache.camel.processor.SendProcessor */ public class SendDynamicProcessor extends BaseProcessorSupport - implements IdAware, RouteIdAware, StepIdAware, CamelContextAware { + implements EndpointSending, IdAware, RouteIdAware, StepIdAware, CamelContextAware { private static final Logger LOG = LoggerFactory.getLogger(SendDynamicProcessor.class); diff --git a/core/camel-core-processor/src/main/java/org/apache/camel/processor/SendProcessor.java b/core/camel-core-processor/src/main/java/org/apache/camel/processor/SendProcessor.java index 5745d5ec39ff..48f135653234 100644 --- a/core/camel-core-processor/src/main/java/org/apache/camel/processor/SendProcessor.java +++ b/core/camel-core-processor/src/main/java/org/apache/camel/processor/SendProcessor.java @@ -24,6 +24,7 @@ import org.apache.camel.AsyncProducer; import org.apache.camel.CamelContext; import org.apache.camel.Endpoint; import org.apache.camel.EndpointAware; +import org.apache.camel.EndpointSending; import org.apache.camel.Exchange; import org.apache.camel.ExchangePattern; import org.apache.camel.ExchangePropertyKey; @@ -52,7 +53,7 @@ import org.slf4j.LoggerFactory; * @see SendDynamicProcessor */ public class SendProcessor extends BaseProcessorSupport - implements Traceable, EndpointAware, IdAware, RouteIdAware, StepIdAware { + implements Traceable, EndpointAware, EndpointSending, IdAware, RouteIdAware, StepIdAware { private static final Logger LOG = LoggerFactory.getLogger(SendProcessor.class); diff --git a/core/camel-core-processor/src/main/java/org/apache/camel/processor/WireTapProcessor.java b/core/camel-core-processor/src/main/java/org/apache/camel/processor/WireTapProcessor.java index bda30103fb71..67a6132fa8a9 100644 --- a/core/camel-core-processor/src/main/java/org/apache/camel/processor/WireTapProcessor.java +++ b/core/camel-core-processor/src/main/java/org/apache/camel/processor/WireTapProcessor.java @@ -24,6 +24,7 @@ import org.apache.camel.AsyncCallback; import org.apache.camel.AsyncProcessor; import org.apache.camel.CamelContext; import org.apache.camel.CamelContextAware; +import org.apache.camel.EndpointSending; import org.apache.camel.Exchange; import org.apache.camel.ExchangePattern; import org.apache.camel.ExchangePropertyKey; @@ -49,7 +50,7 @@ import org.slf4j.LoggerFactory; * Processor for wire tapping exchanges to an endpoint destination. */ public class WireTapProcessor extends BaseProcessorSupport - implements Traceable, ShutdownAware, IdAware, RouteIdAware, StepIdAware, CamelContextAware { + implements Traceable, ShutdownAware, EndpointSending, IdAware, RouteIdAware, StepIdAware, CamelContextAware { private static final Logger LOG = LoggerFactory.getLogger(WireTapProcessor.class);
