This is an automated email from the ASF dual-hosted git repository. davsclaus pushed a commit to branch camel-3.x in repository https://gitbox.apache.org/repos/asf/camel.git
The following commit(s) were added to refs/heads/camel-3.x by this push: new bb9e1d7c98d CAMEL-19023: Micrometer Observation component (#9389) bb9e1d7c98d is described below commit bb9e1d7c98d94723c47c4e6cd4592950c5ef0ac2 Author: Marcin Grzejszczak <mar...@grzejszczak.pl> AuthorDate: Wed Mar 8 12:02:51 2023 +0100 CAMEL-19023: Micrometer Observation component (#9389) * WIP * Changed sysout to logging * WIP * Low cardinality tags + default handlers Added low cardinality tags, changed the implementation to be as close as possible to the OTel one. Left the handlers as examples of how to make the behaviour equal to the OTel one. * Added missing carriers, fixed the error names * Fixed checkstyle --- bom/camel-bom/pom.xml | 5 + camel-dependencies/pom.xml | 95 +++--- catalog/camel-allcomponents/pom.xml | 5 + .../org/apache/camel/catalog/others.properties | 1 + .../apache/camel/catalog/others/observation.json | 15 + components/camel-observation/pom.xml | 88 +++++ .../services/org/apache/camel/other.properties | 7 + .../src/generated/resources/observation.json | 15 + .../src/main/docs/observation.adoc | 88 +++++ .../camel/observation/AttributeProcessor.java | 116 +++++++ .../GetCorrelationContextProcessor.java | 109 +++++++ .../MicrometerObservationSpanAdapter.java | 169 ++++++++++ .../observation/MicrometerObservationTracer.java | 168 ++++++++++ .../SetCorrelationContextProcessor.java | 115 +++++++ .../org.apache.camel.tracing.SpanDecorator | 54 ++++ .../org/apache/camel/observation/ABCRouteTest.java | 69 ++++ .../CamelMicrometerObservationTestSupport.java | 263 +++++++++++++++ .../observation/ClientRecipientListRouteTest.java | 67 ++++ .../apache/camel/observation/CurrentSpanTest.java | 356 +++++++++++++++++++++ .../observation/CustomComponentNameRouteTest.java | 73 +++++ .../observation/MulticastParallelRouteTest.java | 70 ++++ .../camel/observation/MulticastRouteTest.java | 72 +++++ .../camel/observation/RouteConcurrentTest.java | 82 +++++ 
.../camel/observation/SpanProcessorsTest.java | 85 +++++ .../org/apache/camel/observation/SpanTestData.java | 107 +++++++ .../camel/observation/TestSEDASpanDecorator.java} | 28 +- .../apache/camel/observation/TwoServiceTest.java | 59 ++++ .../observation/TwoServiceWithExcludeTest.java | 65 ++++ .../CamelDefaultTracingObservationHandler.java | 54 ++++ ...opagatingReceiverTracingObservationHandler.java | 55 ++++ ...PropagatingSenderTracingObservationHandler.java | 45 +++ .../src/test/resources/log4j2.properties | 29 ++ .../camel/opentelemetry/OpenTelemetryTracer.java | 2 +- .../camel/opentracing/OpenTracingTracer.java | 2 +- .../java/org/apache/camel/tracing/SpanAdapter.java | 20 ++ .../main/java/org/apache/camel/tracing/Tracer.java | 7 +- .../decorators/AbstractHttpSpanDecorator.java | 4 +- .../camel/tracing/decorators/CqlSpanDecorator.java | 2 +- .../decorators/ElasticsearchSpanDecorator.java | 2 +- .../tracing/decorators/JdbcSpanDecorator.java | 2 +- .../tracing/decorators/MongoDBSpanDecorator.java | 2 +- .../camel/tracing/decorators/SqlSpanDecorator.java | 2 +- components/pom.xml | 1 + .../modules/others/examples/json/observation.json | 1 + docs/components/modules/others/nav.adoc | 1 + .../modules/others/pages/observation.adoc | 1 + parent/pom.xml | 8 +- 47 files changed, 2607 insertions(+), 79 deletions(-) diff --git a/bom/camel-bom/pom.xml b/bom/camel-bom/pom.xml index a16706262b2..01f909ad6ae 100644 --- a/bom/camel-bom/pom.xml +++ b/bom/camel-bom/pom.xml @@ -1496,6 +1496,11 @@ <artifactId>camel-oaipmh</artifactId> <version>${project.version}</version> </dependency> + <dependency> + <groupId>org.apache.camel</groupId> + <artifactId>camel-observation</artifactId> + <version>${project.version}</version> + </dependency> <dependency> <groupId>org.apache.camel</groupId> <artifactId>camel-ognl</artifactId> diff --git a/camel-dependencies/pom.xml b/camel-dependencies/pom.xml index feb45c5cfae..136c1e39c65 100644 --- a/camel-dependencies/pom.xml +++ 
b/camel-dependencies/pom.xml @@ -20,21 +20,21 @@ <project xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd" xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"> - <modelVersion>4.0.0</modelVersion> - <parent> - <groupId>org.apache</groupId> - <artifactId>apache</artifactId> - <version>25</version> - <relativePath /> - </parent> + <modelVersion>4.0.0</modelVersion> + <parent> + <groupId>org.apache</groupId> + <artifactId>apache</artifactId> + <version>25</version> + <relativePath /> + </parent> - <groupId>org.apache.camel</groupId> - <artifactId>camel-dependencies</artifactId> - <version>3.21.0-SNAPSHOT</version> + <groupId>org.apache.camel</groupId> + <artifactId>camel-dependencies</artifactId> + <version>3.21.0-SNAPSHOT</version> - <packaging>pom</packaging> - <name>Camel :: Dependencies</name> - <description>Camel Dependencies POM</description> + <packaging>pom</packaging> + <name>Camel :: Dependencies</name> + <description>Camel Dependencies POM</description> <!-- sync properties here from parent/pom.xml --> <properties> @@ -397,7 +397,8 @@ <maven-wagon-version>3.5.2</maven-wagon-version> <maven-war-plugin-version>3.3.1</maven-war-plugin-version> <metrics-version>4.2.15</metrics-version> - <micrometer-version>1.10.2</micrometer-version> + <micrometer-tracing-version>1.0.2</micrometer-tracing-version> + <micrometer-version>1.10.4</micrometer-version> <microprofile-config-version>2.0.1</microprofile-config-version> <microprofile-metrics-version>3.0.1</microprofile-metrics-version> <microprofile-fault-tolerance-version>3.0</microprofile-fault-tolerance-version> @@ -569,38 +570,38 @@ <zookeeper-version>3.5.9</zookeeper-version> <zxing-version>3.5.0</zxing-version> - <!-- OSGi bundles properties --> - <camel.osgi.import.camel.version> - version="[$(version;==;${camel.osgi.version.clean}),$(version;=+;${camel.osgi.version.clean}))" - </camel.osgi.import.camel.version> - 
<camel.osgi.import.strict.version> - version="[$(version;===;${camel.osgi.version.clean}),$(version;==+;${camel.osgi.version.clean}))" - </camel.osgi.import.strict.version> - <camel.osgi.import.default.version>[$(version;==;$(@)),$(version;+;$(@)))</camel.osgi.import.default.version> - <camel.osgi.import.defaults> - </camel.osgi.import.defaults> - <camel.osgi.import.before.defaults /> - <camel.osgi.import.additional /> - <camel.osgi.import.pkg> - org.apache.camel.*;${camel.osgi.import.camel.version}, - ${camel.osgi.import.before.defaults}, - ${camel.osgi.import.defaults}, - ${camel.osgi.import.additional}, - * - </camel.osgi.import.pkg> - <camel.osgi.activator /> - <camel.osgi.failok>false</camel.osgi.failok> - <camel.osgi.private.pkg>!*</camel.osgi.private.pkg> - <camel.osgi.export.pkg>$${replace;{local-packages};;;;}</camel.osgi.export.pkg> - <camel.osgi.export>${camel.osgi.export.pkg};-noimport:=true;${camel.osgi.version}</camel.osgi.export> - <camel.osgi.version>version=${project.version}</camel.osgi.version> - <camel.osgi.import>${camel.osgi.import.pkg}</camel.osgi.import> - <camel.osgi.dynamic /> - <camel.osgi.symbolic.name>${project.groupId}.${project.artifactId}</camel.osgi.symbolic.name> - <camel.osgi.exclude.dependencies>false</camel.osgi.exclude.dependencies> - <camel.osgi.require.capability /> - <camel.osgi.provide.capability /> - <camel.osgi.manifest>${project.build.outputDirectory}/META-INF/MANIFEST.MF</camel.osgi.manifest> - </properties> + <!-- OSGi bundles properties --> + <camel.osgi.import.camel.version> + version="[$(version;==;${camel.osgi.version.clean}),$(version;=+;${camel.osgi.version.clean}))" + </camel.osgi.import.camel.version> + <camel.osgi.import.strict.version> + version="[$(version;===;${camel.osgi.version.clean}),$(version;==+;${camel.osgi.version.clean}))" + </camel.osgi.import.strict.version> + <camel.osgi.import.default.version>[$(version;==;$(@)),$(version;+;$(@)))</camel.osgi.import.default.version> + 
<camel.osgi.import.defaults> + </camel.osgi.import.defaults> + <camel.osgi.import.before.defaults /> + <camel.osgi.import.additional /> + <camel.osgi.import.pkg> + org.apache.camel.*;${camel.osgi.import.camel.version}, + ${camel.osgi.import.before.defaults}, + ${camel.osgi.import.defaults}, + ${camel.osgi.import.additional}, + * + </camel.osgi.import.pkg> + <camel.osgi.activator /> + <camel.osgi.failok>false</camel.osgi.failok> + <camel.osgi.private.pkg>!*</camel.osgi.private.pkg> + <camel.osgi.export.pkg>$${replace;{local-packages};;;;}</camel.osgi.export.pkg> + <camel.osgi.export>${camel.osgi.export.pkg};-noimport:=true;${camel.osgi.version}</camel.osgi.export> + <camel.osgi.version>version=${project.version}</camel.osgi.version> + <camel.osgi.import>${camel.osgi.import.pkg}</camel.osgi.import> + <camel.osgi.dynamic /> + <camel.osgi.symbolic.name>${project.groupId}.${project.artifactId}</camel.osgi.symbolic.name> + <camel.osgi.exclude.dependencies>false</camel.osgi.exclude.dependencies> + <camel.osgi.require.capability /> + <camel.osgi.provide.capability /> + <camel.osgi.manifest>${project.build.outputDirectory}/META-INF/MANIFEST.MF</camel.osgi.manifest> + </properties> </project> diff --git a/catalog/camel-allcomponents/pom.xml b/catalog/camel-allcomponents/pom.xml index c29e11d7811..533e18f4fa9 100644 --- a/catalog/camel-allcomponents/pom.xml +++ b/catalog/camel-allcomponents/pom.xml @@ -1288,6 +1288,11 @@ <artifactId>camel-oaipmh</artifactId> <version>${project.version}</version> </dependency> + <dependency> + <groupId>org.apache.camel</groupId> + <artifactId>camel-observation</artifactId> + <version>${project.version}</version> + </dependency> <dependency> <groupId>org.apache.camel</groupId> <artifactId>camel-ognl</artifactId> diff --git a/catalog/camel-catalog/src/generated/resources/org/apache/camel/catalog/others.properties b/catalog/camel-catalog/src/generated/resources/org/apache/camel/catalog/others.properties index a9c456619c4..1c3494f2454 100644 --- 
a/catalog/camel-catalog/src/generated/resources/org/apache/camel/catalog/others.properties +++ b/catalog/camel-catalog/src/generated/resources/org/apache/camel/catalog/others.properties @@ -35,6 +35,7 @@ management microprofile-config microprofile-fault-tolerance microprofile-health +observation openapi-java opentelemetry opentracing diff --git a/catalog/camel-catalog/src/generated/resources/org/apache/camel/catalog/others/observation.json b/catalog/camel-catalog/src/generated/resources/org/apache/camel/catalog/others/observation.json new file mode 100644 index 00000000000..eb396f8efbc --- /dev/null +++ b/catalog/camel-catalog/src/generated/resources/org/apache/camel/catalog/others/observation.json @@ -0,0 +1,15 @@ +{ + "other": { + "kind": "other", + "name": "observation", + "title": "Micrometer Observability", + "description": "Observability using Micrometer Observation", + "deprecated": false, + "firstVersion": "3.21.0", + "label": "monitoring,microservice", + "supportLevel": "Preview", + "groupId": "org.apache.camel", + "artifactId": "camel-observation", + "version": "3.20.3-SNAPSHOT" + } +} diff --git a/components/camel-observation/pom.xml b/components/camel-observation/pom.xml new file mode 100644 index 00000000000..44af91680ea --- /dev/null +++ b/components/camel-observation/pom.xml @@ -0,0 +1,88 @@ +<?xml version="1.0" encoding="UTF-8"?> +<!-- + + Licensed to the Apache Software Foundation (ASF) under one or more + contributor license agreements. See the NOTICE file distributed with + this work for additional information regarding copyright ownership. + The ASF licenses this file to You under the Apache License, Version 2.0 + (the "License"); you may not use this file except in compliance with + the License. 
You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. + +--> +<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" + xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> + <modelVersion>4.0.0</modelVersion> + + <parent> + <groupId>org.apache.camel</groupId> + <artifactId>components</artifactId> + <version>3.21.0-SNAPSHOT</version> + </parent> + + + <artifactId>camel-observation</artifactId> + <packaging>jar</packaging> + <name>Camel :: Micrometer Observation</name> + <description>Observability using Micrometer Observation</description> + + <properties> + <firstVersion>3.21.0</firstVersion> + <label>monitoring,microservice</label> + <title>Micrometer Observability</title> + </properties> + + <dependencies> + <dependency> + <groupId>org.apache.camel</groupId> + <artifactId>camel-support</artifactId> + </dependency> + <dependency> + <groupId>org.apache.camel</groupId> + <artifactId>camel-tracing</artifactId> + </dependency> + + <dependency> + <groupId>io.micrometer</groupId> + <artifactId>micrometer-core</artifactId> + <version>${micrometer-version}</version> + </dependency> + <dependency> + <groupId>io.micrometer</groupId> + <artifactId>micrometer-tracing</artifactId> + <version>${micrometer-tracing-version}</version> + </dependency> + + <dependency> + <groupId>org.apache.logging.log4j</groupId> + <artifactId>log4j-slf4j-impl</artifactId> + <scope>test</scope> + </dependency> + <dependency> + <groupId>org.apache.camel</groupId> + <artifactId>camel-test-spring-junit5</artifactId> + <scope>test</scope> + </dependency> + 
<dependency> + <groupId>io.micrometer</groupId> + <artifactId>micrometer-tracing-integration-test</artifactId> + <version>${micrometer-tracing-version}</version> + <scope>test</scope> + </dependency> + <dependency> + <groupId>io.opentelemetry</groupId> + <artifactId>opentelemetry-sdk-testing</artifactId> + <version>${opentelemetry-version}</version> + <scope>test</scope> + </dependency> + + </dependencies> + +</project> diff --git a/components/camel-observation/src/generated/resources/META-INF/services/org/apache/camel/other.properties b/components/camel-observation/src/generated/resources/META-INF/services/org/apache/camel/other.properties new file mode 100644 index 00000000000..2ef30a8f5b1 --- /dev/null +++ b/components/camel-observation/src/generated/resources/META-INF/services/org/apache/camel/other.properties @@ -0,0 +1,7 @@ +# Generated by camel build tools - do NOT edit this file! +name=observation +groupId=org.apache.camel +artifactId=camel-observation +version=3.21.0-SNAPSHOT +projectName=Camel :: Micrometer Observation +projectDescription=Observability using Micrometer Observation diff --git a/components/camel-observation/src/generated/resources/observation.json b/components/camel-observation/src/generated/resources/observation.json new file mode 100644 index 00000000000..e166365bf8a --- /dev/null +++ b/components/camel-observation/src/generated/resources/observation.json @@ -0,0 +1,15 @@ +{ + "other": { + "kind": "other", + "name": "observation", + "title": "Micrometer Observability", + "description": "Observability using Micrometer Observation", + "deprecated": false, + "firstVersion": "3.21.0", + "label": "monitoring,microservice", + "supportLevel": "Preview", + "groupId": "org.apache.camel", + "artifactId": "camel-observation", + "version": "3.21.0-SNAPSHOT" + } +} diff --git a/components/camel-observation/src/main/docs/observation.adoc b/components/camel-observation/src/main/docs/observation.adoc new file mode 100644 index 00000000000..649e6232c85 
--- /dev/null +++ b/components/camel-observation/src/main/docs/observation.adoc @@ -0,0 +1,88 @@ += Micrometer Observability Component +:doctitle: Micrometer Observability +:shortname: observation +:artifactid: camel-observation +:description: Observability using Micrometer Observation +:since: 3.21 +:supportlevel: Preview +//Manually maintained attributes +:camel-spring-boot-name: observation + +*Since Camel {since}* + +The Micrometer Observation component is used for performing observability of incoming and +outgoing Camel messages using https://micrometer.io/docs/observation[Micrometer Observation]. + +By configuring the `ObservationRegistry` you can add behaviour to your observations such as metrics (e.g. via `Micrometer`) or tracing (e.g. via `OpenTelemetry` or `Brave`) or any custom behaviour. + +Events are captured for incoming and outgoing messages being sent to/from Camel. + +== Configuration + +The configuration properties for the Micrometer Observations are: + +[width="100%",cols="10%,10%,80%",options="header",] +|======================================================================= +|Option |Default |Description + +|excludePatterns | | Sets exclude pattern(s) that will disable tracing for Camel +messages that match the pattern. The content is a Set<String> where the key is a pattern. The pattern +uses the rules from Intercept. +|encoding |false| Sets whether the header keys need to be encoded (connector specific) or not. The value is a boolean. +Dashes need, for instance, to be encoded for JMS property keys. + +|======================================================================= + + +=== Configuration + +Include the `camel-observation` component in your POM, along with any specific dependencies associated with the +chosen Micrometer Tracing compliant Tracer. + +To explicitly configure Micrometer Observation support, instantiate the `MicrometerObservationTracer` and initialize the camel +context.
You can optionally specify a `Tracer`, or alternatively it can be implicitly discovered using the +`Registry`. + +[source,java] +-------------------------------------------------------------------------------------------------- +ObservationRegistry observationRegistry = ObservationRegistry.create(); +MicrometerObservationTracer micrometerObservationTracer = new MicrometerObservationTracer(); + +// This component comes from Micrometer Core - it's used for creation of metrics +MeterRegistry meterRegistry = new SimpleMeterRegistry(); + +// This component comes from Micrometer Tracing - it's an abstraction over tracers +io.micrometer.tracing.Tracer otelTracer = otelTracer(); +// This component comes from Micrometer Tracing - example of B3 header propagation via OpenTelemetry +OtelPropagator otelPropagator = new OtelPropagator(ContextPropagators.create(B3Propagator.injectingSingleHeader()), tracer); + +// Configure the ObservationRegistry for metrics +observationRegistry.observationConfig().observationHandler(new DefaultMeterObservationHandler(meterRegistry)); + +// Configure the ObservationRegistry for tracing +observationRegistry.observationConfig().observationHandler(new ObservationHandler.FirstMatchingCompositeObservationHandler(new CamelPropagatingSenderTracingObservationHandler<>(otelTracer, otelPropagator), new CamelPropagatingReceiverTracingObservationHandler<>(otelTracer, otelPropagator), new CamelDefaultTracingObservationHandler(otelTracer))); + +// Both components ObservationRegistry and Tracer should be set manually or they will be resolved from CamelContext if present +micrometerObservationTracer.setObservationRegistry(observationRegistry); +micrometerObservationTracer.setTracer(otelTracer); + +// Initialize the MicrometerObservationTracer +micrometerObservationTracer.init(context); +-------------------------------------------------------------------------------------------------- + +== Spring Boot + +// TODO: Not done yet + +If you are using Spring
Boot then you can add +the `camel-observation-starter` dependency, and turn on Micrometer Observation by annotating +the main class with `@CamelObservation`. + +The `MicrometerObservationTracer` will be implicitly obtained from the camel context's `Registry`, unless +a `MicrometerObservationTracer` bean has been defined by the application. + +include::spring-boot:partial$starter.adoc[] + +== MDC Logging + +When MDC Logging is enabled for the active Camel context the Trace ID and Span ID will be added and removed from the MDC for each route; the keys are `trace_id` and `span_id`, respectively. diff --git a/components/camel-observation/src/main/java/org/apache/camel/observation/AttributeProcessor.java b/components/camel-observation/src/main/java/org/apache/camel/observation/AttributeProcessor.java new file mode 100644 index 00000000000..c551908aa4a --- /dev/null +++ b/components/camel-observation/src/main/java/org/apache/camel/observation/AttributeProcessor.java @@ -0,0 +1,116 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License.
+ */ +package org.apache.camel.observation; + +import io.micrometer.observation.Observation; +import org.apache.camel.AsyncCallback; +import org.apache.camel.Exchange; +import org.apache.camel.Expression; +import org.apache.camel.Traceable; +import org.apache.camel.spi.IdAware; +import org.apache.camel.spi.RouteIdAware; +import org.apache.camel.support.AsyncProcessorSupport; +import org.apache.camel.tracing.ActiveSpanManager; +import org.apache.camel.util.ObjectHelper; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * A processor which adds a attribute on the active {@link Observation} with an {@link org.apache.camel.Expression} + */ +public class AttributeProcessor extends AsyncProcessorSupport implements Traceable, IdAware, RouteIdAware { + + private static final Logger LOG = LoggerFactory.getLogger(AttributeProcessor.class); + private final String attributeName; + private final Expression expression; + private String id; + private String routeId; + + public AttributeProcessor(String tagName, Expression expression) { + this.attributeName = ObjectHelper.notNull(tagName, "tagName"); + this.expression = ObjectHelper.notNull(expression, "expression"); + } + + @Override + public boolean process(Exchange exchange, AsyncCallback callback) { + try { + MicrometerObservationSpanAdapter camelSpan = (MicrometerObservationSpanAdapter) ActiveSpanManager.getSpan(exchange); + Observation observation = camelSpan.getMicrometerObservation(); + if (observation != null) { + String tag = expression.evaluate(exchange, String.class); + observation.highCardinalityKeyValue(attributeName, tag); + } else { + LOG.warn("Micrometer Observation: could not find managed span for exchange={}", exchange); + } + } catch (Exception e) { + exchange.setException(e); + } finally { + // callback must be invoked + callback.done(true); + } + + return true; + } + + @Override + public String getTraceLabel() { + return "attribute[" + attributeName + ", " + expression + "]"; + } + + 
@Override + public String getId() { + return id; + } + + @Override + public void setId(String id) { + this.id = id; + } + + @Override + public String getRouteId() { + return routeId; + } + + @Override + public void setRouteId(String routeId) { + this.routeId = routeId; + } + + public String getAttributeName() { + return attributeName; + } + + public Expression getExpression() { + return expression; + } + + @Override + protected void doStart() throws Exception { + // noop + } + + @Override + protected void doStop() throws Exception { + // noop + } + + @Override + public String toString() { + return id; + } +} diff --git a/components/camel-observation/src/main/java/org/apache/camel/observation/GetCorrelationContextProcessor.java b/components/camel-observation/src/main/java/org/apache/camel/observation/GetCorrelationContextProcessor.java new file mode 100644 index 00000000000..44176ef1d08 --- /dev/null +++ b/components/camel-observation/src/main/java/org/apache/camel/observation/GetCorrelationContextProcessor.java @@ -0,0 +1,109 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.camel.observation; + +import org.apache.camel.AsyncCallback; +import org.apache.camel.Exchange; +import org.apache.camel.Traceable; +import org.apache.camel.spi.IdAware; +import org.apache.camel.spi.RouteIdAware; +import org.apache.camel.support.AsyncProcessorSupport; +import org.apache.camel.tracing.ActiveSpanManager; +import org.apache.camel.util.ObjectHelper; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class GetCorrelationContextProcessor extends AsyncProcessorSupport implements Traceable, IdAware, RouteIdAware { + private static final Logger LOG = LoggerFactory.getLogger(GetCorrelationContextProcessor.class); + private final String headerName; + private final String keyName; + private String id; + private String routeId; + + public GetCorrelationContextProcessor(String keyName, String headerName) { + this.keyName = ObjectHelper.notNull(keyName, "keyName"); + this.headerName = ObjectHelper.notNull(headerName, "headerName"); + } + + @Override + public boolean process(Exchange exchange, AsyncCallback callback) { + try { + MicrometerObservationSpanAdapter camelSpan = (MicrometerObservationSpanAdapter) ActiveSpanManager.getSpan(exchange); + if (camelSpan != null) { + String item = camelSpan.getContextPropagationItem(keyName); + exchange.getMessage().setHeader(headerName, item); + } else { + LOG.warn("Observation: could not find managed span for exchange={}", exchange); + } + } catch (Exception e) { + exchange.setException(e); + } finally { + // callback must be invoked + callback.done(true); + } + + return true; + } + + @Override + public String getTraceLabel() { + return "getCorrelationContext[" + keyName + ", " + headerName + "]"; + } + + @Override + public String getId() { + return id; + } + + @Override + public void setId(String id) { + this.id = id; + } + + @Override + public String getRouteId() { + return routeId; + } + + @Override + public void setRouteId(String routeId) { + this.routeId = routeId; + } + + 
public String getKeyName() { + return keyName; + } + + public String getHeaderName() { + return headerName; + } + + @Override + protected void doStart() throws Exception { + // noop + } + + @Override + protected void doStop() throws Exception { + // noop + } + + @Override + public String toString() { + return id; + } +} diff --git a/components/camel-observation/src/main/java/org/apache/camel/observation/MicrometerObservationSpanAdapter.java b/components/camel-observation/src/main/java/org/apache/camel/observation/MicrometerObservationSpanAdapter.java new file mode 100644 index 00000000000..1d85ec690e0 --- /dev/null +++ b/components/camel-observation/src/main/java/org/apache/camel/observation/MicrometerObservationSpanAdapter.java @@ -0,0 +1,169 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.camel.observation; + +import java.util.Map; + +import io.micrometer.observation.Observation; +import io.micrometer.tracing.Baggage; +import io.micrometer.tracing.Span; +import io.micrometer.tracing.Tracer; +import io.micrometer.tracing.handler.TracingObservationHandler.TracingContext; +import org.apache.camel.tracing.SpanAdapter; +import org.apache.camel.tracing.Tag; + +public class MicrometerObservationSpanAdapter implements SpanAdapter { + private static final String DEFAULT_EVENT_NAME = "log"; + + private final Observation observation; + + private final Tracer tracer; + + MicrometerObservationSpanAdapter(Observation observation, Tracer tracer) { + this.observation = observation; + this.tracer = tracer; + } + + Observation getMicrometerObservation() { + return this.observation; + } + + @Override + public void setComponent(String component) { + this.observation.lowCardinalityKeyValue("component", component); + } + + @Override + public void setError(boolean error) { + this.observation.lowCardinalityKeyValue("error", String.valueOf(error)); + } + + @Override + public void setTag(Tag key, String value) { + this.observation.highCardinalityKeyValue(key.toString(), value); + } + + @Override + public void setTag(Tag key, Number value) { + setTag(key, value.toString()); + } + + @Override + public void setTag(String key, String value) { + this.observation.highCardinalityKeyValue(key, value); + } + + @Override + public void setTag(String key, Number value) { + setTag(key, value.toString()); + } + + @Override + public void setTag(String key, Boolean value) { + setTag(key, value.toString()); + } + + @Override + public void setLowCardinalityTag(Tag key, String value) { + observation.lowCardinalityKeyValue(key.toString(), value); + } + + @Override + public void setLowCardinalityTag(Tag key, Number value) { + observation.lowCardinalityKeyValue(key.toString(), value.toString()); + } + + @Override + public void setLowCardinalityTag(String key, String 
value) { + observation.lowCardinalityKeyValue(key, value); + } + + @Override + public void setLowCardinalityTag(String key, Number value) { + observation.lowCardinalityKeyValue(key, value.toString()); + } + + @Override + public void setLowCardinalityTag(String key, Boolean value) { + observation.lowCardinalityKeyValue(key, value.toString()); + } + + @Override + public void log(Map<String, String> fields) { + String event = fields.get("event"); + if ("error".equalsIgnoreCase(event)) { + if (fields.containsKey("message")) { + observation.error(new RuntimeException(fields.get("message"))); + } else { + setError(true); + } + } else { + observation.event(() -> getMessageNameFromFields(fields)); + } + } + + @Override + public String traceId() { + TracingContext tracingContext = getTracingContext(); + return tracingContext.getSpan() != null ? tracingContext.getSpan().context().traceId() : null; + } + + private TracingContext getTracingContext() { + return observation.getContextView().getOrDefault(TracingContext.class, new TracingContext()); + } + + @Override + public String spanId() { + TracingContext tracingContext = getTracingContext(); + return tracingContext.getSpan() != null ? tracingContext.getSpan().context().spanId() : null; + } + + @Override + public AutoCloseable makeCurrent() { + return observation.openScope(); + } + + String getMessageNameFromFields(Map<String, ?> fields) { + Object eventValue = fields == null ? 
null : fields.get("message"); + if (eventValue != null) { + return eventValue.toString(); + } + + return DEFAULT_EVENT_NAME; + } + public void setCorrelationContextItem(String key, String value) { + Baggage baggage = tracer.createBaggage(key); + Span span = getTracingContext().getSpan(); + if (span == null) { + return; + } + baggage.set(span.context(), value); + } + + public String getContextPropagationItem(String key) { + Span span = getTracingContext().getSpan(); + if (span == null) { + return null; + } + Baggage baggage = tracer.getBaggage(span.context(), key); + if (baggage != null) { + return baggage.get(span.context()); + } + return null; + } + +} diff --git a/components/camel-observation/src/main/java/org/apache/camel/observation/MicrometerObservationTracer.java b/components/camel-observation/src/main/java/org/apache/camel/observation/MicrometerObservationTracer.java new file mode 100644 index 00000000000..7085760ba6d --- /dev/null +++ b/components/camel-observation/src/main/java/org/apache/camel/observation/MicrometerObservationTracer.java @@ -0,0 +1,168 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.camel.observation; + +import java.util.Set; + +import io.micrometer.observation.Observation; +import io.micrometer.observation.ObservationRegistry; +import io.micrometer.observation.transport.ReceiverContext; +import io.micrometer.observation.transport.RequestReplyReceiverContext; +import io.micrometer.observation.transport.RequestReplySenderContext; +import io.micrometer.observation.transport.SenderContext; +import io.micrometer.tracing.Tracer; +import org.apache.camel.Exchange; +import org.apache.camel.Message; +import org.apache.camel.api.management.ManagedResource; +import org.apache.camel.tracing.ExtractAdapter; +import org.apache.camel.tracing.InjectAdapter; +import org.apache.camel.tracing.SpanAdapter; +import org.apache.camel.tracing.SpanDecorator; +import org.apache.camel.tracing.SpanKind; +import org.apache.camel.tracing.decorators.AbstractInternalSpanDecorator; + +@ManagedResource(description = "MicrometerObservationTracer") +public class MicrometerObservationTracer extends org.apache.camel.tracing.Tracer { + + static final String SPAN_DECORATOR_INTERNAL = "camel.micrometer.abstract-internal"; + + private static final String CAMEL_CONTEXT_NAME = "camel.component"; + + private Tracer tracer = Tracer.NOOP; + + private ObservationRegistry observationRegistry; + + public ObservationRegistry getObservationRegistry() { + return observationRegistry; + } + + public void setObservationRegistry(ObservationRegistry observationRegistry) { + this.observationRegistry = observationRegistry; + } + + public Tracer getTracer() { + return tracer; + } + + public void setTracer(Tracer tracer) { + this.tracer = tracer; + } + + private Observation.Context spanKindToContextOnExtract(org.apache.camel.tracing.SpanKind kind, SpanDecorator sd, Exchange exchange) { + ExtractAdapter adapter = sd.getExtractAdapter(exchange.getIn().getHeaders(), encoding); + switch (kind) { + case PRODUCER: + throw new UnsupportedOperationException("You can't extract when 
sending a message"); + case SPAN_KIND_SERVER: + RequestReplyReceiverContext<Object, Message> replyReceiverContext = new RequestReplyReceiverContext<>((carrier, key) -> String.valueOf(adapter.get(key))); + replyReceiverContext.setResponse(exchange.getMessage()); + replyReceiverContext.setCarrier(exchange.getIn()); + return replyReceiverContext; + case CONSUMER: + case SPAN_KIND_CLIENT: + ReceiverContext<Message> receiverContext = new ReceiverContext<>((carrier, key) -> String.valueOf(adapter.get(key))); + receiverContext.setCarrier(exchange.getIn()); + return receiverContext; + default: + return new Observation.Context(); + } + } + + private Observation.Context spanKindToContextOnInject(org.apache.camel.tracing.SpanKind kind, InjectAdapter adapter, Exchange exchange) { + switch (kind) { + case SPAN_KIND_CLIENT: + RequestReplySenderContext<Object, Message> senderContext = new RequestReplySenderContext<>((carrier, key, value) -> adapter.put(key, value)); + senderContext.setResponse(exchange.getMessage()); + senderContext.setCarrier(exchange.getIn()); + return senderContext; + case PRODUCER: + SenderContext<Message> context = new SenderContext<>((carrier, key, value) -> adapter.put(key, value)); + context.setCarrier(exchange.getIn()); + return context; + case SPAN_KIND_SERVER: + case CONSUMER: + throw new UnsupportedOperationException("You can't inject when receiving a message"); + default: + return new Observation.Context(); + } + } + + @Override + protected void initTracer() { + if (observationRegistry == null) { + Set<ObservationRegistry> registries = getCamelContext().getRegistry().findByType(ObservationRegistry.class); + if (registries.size() == 1) { + observationRegistry = registries.iterator().next(); + } + } + + if (tracer == null) { + Set<Tracer> tracers = getCamelContext().getRegistry().findByType(Tracer.class); + if (tracers.size() == 1) { + tracer = tracers.iterator().next(); + } + } + + if (observationRegistry == null) { + // No Observation Registry is 
available, so setup Noop + observationRegistry = ObservationRegistry.NOOP; + } + } + + @Override + protected SpanAdapter startSendingEventSpan( + String operationName, SpanKind kind, SpanAdapter parentObservation, Exchange exchange, InjectAdapter injectAdapter) { + Observation.Context context = spanKindToContextOnInject(kind, injectAdapter, exchange); + Observation observation = Observation.createNotStarted(CAMEL_CONTEXT_NAME, () -> context, observationRegistry); + observation.contextualName(operationName); + if (parentObservation != null) { + observation.parentObservation(getParentObservation(parentObservation)); + } + return new MicrometerObservationSpanAdapter(observation.start(), tracer); + } + + private static Observation getParentObservation(SpanAdapter parentObservation) { + MicrometerObservationSpanAdapter observationWrapper = (MicrometerObservationSpanAdapter) parentObservation; + return observationWrapper.getMicrometerObservation(); + } + + @Override + protected SpanAdapter startExchangeBeginSpan( + Exchange exchange, SpanDecorator sd, String operationName, org.apache.camel.tracing.SpanKind kind, + SpanAdapter parent) { + boolean parentPresent = parent != null; + Observation.Context context = parentPresent ? 
new Observation.Context() : spanKindToContextOnExtract(kind, sd, exchange); + context.put(SPAN_DECORATOR_INTERNAL, sd instanceof AbstractInternalSpanDecorator); + Observation observation = Observation.createNotStarted(operationName, () -> context, observationRegistry); + if (parentPresent) { + observation.parentObservation(getParentObservation(parent)); + } + return new MicrometerObservationSpanAdapter(observation.start(), tracer); + } + + @Override + protected void finishSpan(SpanAdapter span) { + MicrometerObservationSpanAdapter observationSpanAdapter = (MicrometerObservationSpanAdapter) span; + observationSpanAdapter.getMicrometerObservation().stop(); + } + + @Override + protected void inject(SpanAdapter span, InjectAdapter adapter) { + // Inject happens on start of an observation + } + +} diff --git a/components/camel-observation/src/main/java/org/apache/camel/observation/SetCorrelationContextProcessor.java b/components/camel-observation/src/main/java/org/apache/camel/observation/SetCorrelationContextProcessor.java new file mode 100644 index 00000000000..80d329ffafa --- /dev/null +++ b/components/camel-observation/src/main/java/org/apache/camel/observation/SetCorrelationContextProcessor.java @@ -0,0 +1,115 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.observation; + +import org.apache.camel.AsyncCallback; +import org.apache.camel.Exchange; +import org.apache.camel.Expression; +import org.apache.camel.Traceable; +import org.apache.camel.spi.IdAware; +import org.apache.camel.spi.RouteIdAware; +import org.apache.camel.support.AsyncProcessorSupport; +import org.apache.camel.tracing.ActiveSpanManager; +import org.apache.camel.util.ObjectHelper; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * @author rvargasp + */ +public class SetCorrelationContextProcessor extends AsyncProcessorSupport implements Traceable, IdAware, RouteIdAware { + + private static final Logger LOG = LoggerFactory.getLogger(SetCorrelationContextProcessor.class); + + private String id; + private String routeId; + private final String baggageName; + private final Expression expression; + + public SetCorrelationContextProcessor(String baggageName, Expression expression) { + this.baggageName = ObjectHelper.notNull(baggageName, "baggageName"); + this.expression = ObjectHelper.notNull(expression, "expression"); + } + + @Override + public boolean process(Exchange exchange, AsyncCallback callback) { + try { + MicrometerObservationSpanAdapter camelSpan = (MicrometerObservationSpanAdapter) ActiveSpanManager.getSpan(exchange); + if (camelSpan != null) { + String item = expression.evaluate(exchange, String.class); + camelSpan.setCorrelationContextItem(baggageName, item); + } else { + LOG.warn("OpenTelemetry: could not find managed span for exchange={}", exchange); + } + } catch (Exception e) { + exchange.setException(e); + } finally { + // callback must be invoked + callback.done(true); + } + + return true; + } + + @Override + public String getTraceLabel() { + return "setCorrelationContext[" + baggageName + ", " + expression + "]"; + } + + @Override + public String getId() { + return id; + } + + 
@Override + public void setId(String id) { + this.id = id; + } + + @Override + public String getRouteId() { + return routeId; + } + + @Override + public void setRouteId(String routeId) { + this.routeId = routeId; + } + + public String getBaggageName() { + return baggageName; + } + + public Expression getExpression() { + return expression; + } + + @Override + protected void doStart() throws Exception { + // noop + } + + @Override + protected void doStop() throws Exception { + // noop + } + + @Override + public String toString() { + return id; + } +} diff --git a/components/camel-observation/src/main/resources/META-INF/services/org.apache.camel.tracing.SpanDecorator b/components/camel-observation/src/main/resources/META-INF/services/org.apache.camel.tracing.SpanDecorator new file mode 100644 index 00000000000..cedccb3f682 --- /dev/null +++ b/components/camel-observation/src/main/resources/META-INF/services/org.apache.camel.tracing.SpanDecorator @@ -0,0 +1,54 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+# + +org.apache.camel.tracing.decorators.AhcSpanDecorator +org.apache.camel.tracing.decorators.AmqpSpanDecorator +org.apache.camel.tracing.decorators.AzureServiceBusSpanDecorator +org.apache.camel.tracing.decorators.CometdSpanDecorator +org.apache.camel.tracing.decorators.CometdsSpanDecorator +org.apache.camel.tracing.decorators.CqlSpanDecorator +org.apache.camel.tracing.decorators.DirectSpanDecorator +org.apache.camel.tracing.decorators.DirectvmSpanDecorator +org.apache.camel.tracing.decorators.DisruptorSpanDecorator +org.apache.camel.tracing.decorators.DisruptorvmSpanDecorator +org.apache.camel.tracing.decorators.ElasticsearchSpanDecorator +org.apache.camel.tracing.decorators.HttpSpanDecorator +org.apache.camel.tracing.decorators.HttpsSpanDecorator +org.apache.camel.tracing.decorators.IronmqSpanDecorator +org.apache.camel.tracing.decorators.JdbcSpanDecorator +org.apache.camel.tracing.decorators.JettySpanDecorator +org.apache.camel.tracing.decorators.JmsSpanDecorator +org.apache.camel.tracing.decorators.KafkaSpanDecorator +org.apache.camel.tracing.decorators.LogSpanDecorator +org.apache.camel.tracing.decorators.MongoDBSpanDecorator +org.apache.camel.tracing.decorators.NettyHttpSpanDecorator +org.apache.camel.tracing.decorators.NatsSpanDecorator +org.apache.camel.tracing.decorators.NsqSpanDecorator +org.apache.camel.tracing.decorators.PahoSpanDecorator +org.apache.camel.tracing.decorators.PlatformHttpSpanDecorator +org.apache.camel.tracing.decorators.RestSpanDecorator +org.apache.camel.tracing.decorators.SedaSpanDecorator +org.apache.camel.tracing.decorators.ServletSpanDecorator +org.apache.camel.tracing.decorators.SjmsSpanDecorator +org.apache.camel.tracing.decorators.Sjms2SpanDecorator +org.apache.camel.tracing.decorators.SqlSpanDecorator +org.apache.camel.tracing.decorators.StompSpanDecorator +org.apache.camel.tracing.decorators.TimerSpanDecorator +org.apache.camel.tracing.decorators.UndertowSpanDecorator 
+org.apache.camel.tracing.decorators.VertxHttpSpanDecorator +org.apache.camel.tracing.decorators.VmSpanDecorator + diff --git a/components/camel-observation/src/test/java/org/apache/camel/observation/ABCRouteTest.java b/components/camel-observation/src/test/java/org/apache/camel/observation/ABCRouteTest.java new file mode 100644 index 00000000000..6c80d85ee6f --- /dev/null +++ b/components/camel-observation/src/test/java/org/apache/camel/observation/ABCRouteTest.java @@ -0,0 +1,69 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.camel.observation; + +import io.opentelemetry.api.trace.SpanKind; +import org.apache.camel.RoutesBuilder; +import org.apache.camel.builder.RouteBuilder; +import org.junit.jupiter.api.Test; + +class ABCRouteTest extends CamelMicrometerObservationTestSupport { + private static SpanTestData[] testdata = { + new SpanTestData().setLabel("seda:b server").setUri("seda://b").setOperation("b") + .setParentId(2).addLogMessage("routing at b"), + new SpanTestData().setLabel("seda:c server").setUri("seda://c").setOperation("c") + .setParentId(2).addLogMessage("Exchange[ExchangePattern: InOut, BodyType: String, Body: Hello]"), + new SpanTestData().setLabel("seda:a server").setUri("seda://a").setOperation("a") + .setParentId(3).addLogMessage("routing at a").addLogMessage("End of routing"), + new SpanTestData().setLabel("direct:start server").setUri("direct://start").setOperation("start").setKind(SpanKind.SERVER) + }; + + ABCRouteTest() { + super(testdata); + } + + @Test + void testRoute() { + template.requestBody("direct:start", "Hello"); + verify(); + } + + @Override + protected RoutesBuilder createRouteBuilder() { + return new RouteBuilder() { + @Override + public void configure() { + from("direct:start").to("seda:a").routeId("start"); + + from("seda:a").routeId("a") + .log("routing at ${routeId}") + .to("seda:b") + .delay(2000) + .to("seda:c") + .log("End of routing"); + + from("seda:b").routeId("b") + .log("routing at ${routeId}") + .delay(simple("${random(1000,2000)}")); + + from("seda:c").routeId("c") + .to("log:test") + .delay(simple("${random(0,100)}")); + } + }; + } +} diff --git a/components/camel-observation/src/test/java/org/apache/camel/observation/CamelMicrometerObservationTestSupport.java b/components/camel-observation/src/test/java/org/apache/camel/observation/CamelMicrometerObservationTestSupport.java new file mode 100644 index 00000000000..b4e8b7ef50f --- /dev/null +++ 
b/components/camel-observation/src/test/java/org/apache/camel/observation/CamelMicrometerObservationTestSupport.java @@ -0,0 +1,263 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.observation; + +import java.net.URI; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.stream.Collectors; + +import io.micrometer.core.instrument.MeterRegistry; +import io.micrometer.core.instrument.observation.DefaultMeterObservationHandler; +import io.micrometer.core.instrument.simple.SimpleMeterRegistry; +import io.micrometer.observation.ObservationHandler; +import io.micrometer.observation.ObservationRegistry; +import io.micrometer.observation.tck.TestObservationRegistry; +import io.micrometer.tracing.handler.DefaultTracingObservationHandler; +import io.micrometer.tracing.handler.PropagatingReceiverTracingObservationHandler; +import io.micrometer.tracing.handler.PropagatingSenderTracingObservationHandler; +import io.micrometer.tracing.otel.bridge.OtelBaggageManager; +import io.micrometer.tracing.otel.bridge.OtelCurrentTraceContext; 
+import io.micrometer.tracing.otel.bridge.OtelPropagator; +import io.micrometer.tracing.otel.bridge.OtelTracer; +import io.opentelemetry.api.common.AttributeKey; +import io.opentelemetry.api.trace.Tracer; +import io.opentelemetry.context.Context; +import io.opentelemetry.context.propagation.ContextPropagators; +import io.opentelemetry.extension.trace.propagation.B3Propagator; +import io.opentelemetry.sdk.testing.exporter.InMemorySpanExporter; +import io.opentelemetry.sdk.trace.ReadWriteSpan; +import io.opentelemetry.sdk.trace.ReadableSpan; +import io.opentelemetry.sdk.trace.SdkTracerProvider; +import io.opentelemetry.sdk.trace.SpanProcessor; +import io.opentelemetry.sdk.trace.data.SpanData; +import io.opentelemetry.sdk.trace.export.SimpleSpanProcessor; +import org.apache.camel.CamelContext; +import org.apache.camel.test.junit5.CamelTestSupport; +import org.apache.camel.tracing.SpanDecorator; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertNotNull; + +class CamelMicrometerObservationTestSupport extends CamelTestSupport { + + static final AttributeKey<String> CAMEL_URI_KEY = AttributeKey.stringKey("camel-uri"); + static final AttributeKey<String> COMPONENT_KEY = AttributeKey.stringKey("component"); + static final AttributeKey<String> PRE_KEY = AttributeKey.stringKey("pre"); + static final AttributeKey<String> POST_KEY = AttributeKey.stringKey("post"); + + private static final Logger LOG = LoggerFactory.getLogger(CamelMicrometerObservationTestSupport.class); + + private InMemorySpanExporter inMemorySpanExporter = InMemorySpanExporter.create(); + private SpanTestData[] expected; + private Tracer tracer; + private MicrometerObservationTracer micrometerObservationTracer; + private SdkTracerProvider tracerFactory; + + private MeterRegistry meterRegistry = new SimpleMeterRegistry(); + + private ObservationRegistry observationRegistry; + + 
CamelMicrometerObservationTestSupport(SpanTestData[] expected) { + this.expected = expected; + } + + @Override + protected CamelContext createCamelContext() throws Exception { + CamelContext context = super.createCamelContext(); + micrometerObservationTracer = new MicrometerObservationTracer(); + + tracerFactory = SdkTracerProvider.builder() + .addSpanProcessor(new LoggingSpanProcessor()) + .addSpanProcessor(SimpleSpanProcessor.create(inMemorySpanExporter)).build(); + + tracer = tracerFactory.get("tracerTest"); + + observationRegistry = ObservationRegistry.create(); + observationRegistry.observationConfig().observationHandler(new DefaultMeterObservationHandler(meterRegistry)); + + io.micrometer.tracing.Tracer otelTracer = otelTracer(); + OtelPropagator otelPropagator = new OtelPropagator(ContextPropagators.create(B3Propagator.injectingSingleHeader()), tracer); + observationRegistry.observationConfig().observationHandler( + new ObservationHandler.FirstMatchingCompositeObservationHandler( + new PropagatingSenderTracingObservationHandler<>(otelTracer, otelPropagator), + new PropagatingReceiverTracingObservationHandler<>(otelTracer, otelPropagator), + new DefaultTracingObservationHandler(otelTracer))); + + micrometerObservationTracer.setObservationRegistry(observationRegistry); + // if you want baggage + micrometerObservationTracer.setTracer(otelTracer); + micrometerObservationTracer.setExcludePatterns(getExcludePatterns()); + micrometerObservationTracer.addDecorator(new TestSEDASpanDecorator()); + micrometerObservationTracer.init(context); + return context; + } + + private OtelTracer otelTracer() { + OtelCurrentTraceContext otelCurrentTraceContext = new OtelCurrentTraceContext(); + OtelBaggageManager otelBaggageManager = new OtelBaggageManager(otelCurrentTraceContext, Collections.emptyList(), Collections.emptyList()); + return new OtelTracer(tracer, otelCurrentTraceContext, o -> { + + }, otelBaggageManager); + } + + protected Set<String> getExcludePatterns() { + 
return new HashSet<>(); + } + + protected void verify() { + verify(expected, false); + } + + protected void verify(boolean async) { + verify(expected, async); + } + + protected List<SpanData> verify(SpanTestData[] expected, boolean async) { + List<SpanData> spans = inMemorySpanExporter.getFinishedSpanItems(); + spans.forEach(mockSpan -> { + LOG.info("Span: " + mockSpan); + LOG.info("\tComponent: " + mockSpan.getAttributes().get(COMPONENT_KEY)); + LOG.info("\tTags: " + mockSpan.getAttributes()); + LOG.info("\tLogs: "); + + }); + assertEquals(expected.length, spans.size(), "Incorrect number of spans"); + verifySameTrace(); + + if (async) { + final List<SpanData> unsortedSpans = spans; + spans = Arrays.stream(expected) + .map(td -> findSpan(td, unsortedSpans)).distinct().collect(Collectors.toList()); + assertEquals(expected.length, spans.size(), "Incorrect number of spans after sorting"); + } + + for (int i = 0; i < expected.length; i++) { + verifySpan(i, expected, spans); + } + + return spans; + } + + protected SpanData findSpan(SpanTestData testdata, List<SpanData> spans) { + return spans.stream().filter(s -> { + boolean matched = s.getName().equals(testdata.getOperation()); + if (s.getAttributes().get(CAMEL_URI_KEY) != null) { + matched = matched && s.getAttributes().get(CAMEL_URI_KEY).equals(testdata.getUri()); + } + matched = matched && s.getKind().equals(testdata.getKind()); + return matched; + }).findFirst().orElse(null); + } + + protected Tracer getTracer() { + return tracer; + } + + protected void verifyTraceSpanNumbers(int numOfTraces, int numSpansPerTrace) { + Map<String, List<SpanData>> traces = new HashMap<>(); + + List<SpanData> finishedSpans = inMemorySpanExporter.getFinishedSpanItems(); + // Sort spans into separate traces + for (int i = 0; i < finishedSpans.size(); i++) { + List<SpanData> spans = traces.computeIfAbsent(finishedSpans.get(i).getTraceId(), k -> new ArrayList<>()); + spans.add(finishedSpans.get(i)); + } + + assertEquals(numOfTraces, 
traces.size()); + + for (Map.Entry<String, List<SpanData>> spans : traces.entrySet()) { + assertEquals(numSpansPerTrace, spans.getValue().size()); + } + } + + protected void verifySpan(int index, SpanTestData[] testdata, List<SpanData> spans) { + SpanData span = spans.get(index); + SpanTestData td = testdata[index]; + + String component = span.getAttributes().get(COMPONENT_KEY); + assertNotNull(component); + + if (td.getUri() != null) { + assertEquals(SpanDecorator.CAMEL_COMPONENT + URI.create(td.getUri()).getScheme(), component, td.getLabel()); + } + + if ("camel-seda".equals(component)) { + assertNotNull(span.getAttributes().get(PRE_KEY)); + assertNotNull(span.getAttributes().get(POST_KEY)); + } + + assertEquals(td.getOperation(), span.getName(), td.getLabel()); + + assertEquals(td.getKind(), span.getKind(), td.getLabel()); + + if (!td.getLogMessages().isEmpty()) { + assertEquals(td.getLogMessages().size(), span.getEvents().size(), td.getLabel()); + for (int i = 0; i < td.getLogMessages().size(); i++) { + assertEquals(td.getLogMessages().get(i), span.getEvents().get(i).getName()); // The difference between OTel directly and Observation is that we log with a name + } + } + + if (td.getParentId() != -1) { + assertEquals(spans.get(td.getParentId()).getSpanId(), span.getParentSpanId(), td.getLabel()); + } + if (!td.getTags().isEmpty()) { + for (Map.Entry<String, String> entry : td.getTags().entrySet()) { + assertEquals(entry.getValue(), span.getAttributes().get(AttributeKey.stringKey(entry.getKey()))); + } + } + + } + + protected void verifySameTrace() { + assertEquals(1, inMemorySpanExporter.getFinishedSpanItems().stream().map(s -> s.getTraceId()).distinct().count()); + } + + private static class LoggingSpanProcessor implements SpanProcessor { + private static final Logger LOG = LoggerFactory.getLogger(LoggingSpanProcessor.class); + + @Override + public void onStart(Context context, ReadWriteSpan readWriteSpan) { + LOG.debug("Span started: name - '{}', kind - '{}', 
id - '{}-{}", readWriteSpan.getName(), readWriteSpan.getKind(), + readWriteSpan.getSpanContext().getTraceId(), readWriteSpan.getSpanContext().getSpanId()); + } + + @Override + public boolean isStartRequired() { + return true; + } + + @Override + public void onEnd(ReadableSpan readableSpan) { + LOG.debug("Span ended: name - '{}', kind - '{}', id - '{}-{}", readableSpan.getName(), readableSpan.getKind(), + readableSpan.getSpanContext().getTraceId(), readableSpan.getSpanContext().getSpanId()); + } + + @Override + public boolean isEndRequired() { + return true; + } + } +} diff --git a/components/camel-observation/src/test/java/org/apache/camel/observation/ClientRecipientListRouteTest.java b/components/camel-observation/src/test/java/org/apache/camel/observation/ClientRecipientListRouteTest.java new file mode 100644 index 00000000000..9075ddbcc08 --- /dev/null +++ b/components/camel-observation/src/test/java/org/apache/camel/observation/ClientRecipientListRouteTest.java @@ -0,0 +1,67 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.camel.observation; + +import io.opentelemetry.api.trace.SpanKind; +import org.apache.camel.RoutesBuilder; +import org.apache.camel.builder.RouteBuilder; +import org.junit.jupiter.api.Test; + +class ClientRecipientListRouteTest extends CamelMicrometerObservationTestSupport { + + private static SpanTestData[] testdata = { + new SpanTestData().setLabel("seda:a server").setUri("seda://a").setOperation("a") + .setParentId(3), + new SpanTestData().setLabel("seda:b server").setUri("seda://b").setOperation("b") + .setParentId(3), + new SpanTestData().setLabel("seda:c server").setUri("seda://c").setOperation("c") + .setParentId(3), + new SpanTestData().setLabel("direct:start server").setUri("direct://start").setOperation("start").setKind(SpanKind.SERVER) + }; + + ClientRecipientListRouteTest() { + super(testdata); + } + + @Test + void testRoute() { + template.requestBody("direct:start", "Hello"); + + verify(); + } + + @Override + protected RoutesBuilder createRouteBuilder() { + return new RouteBuilder() { + @Override + public void configure() { + from("direct:start").recipientList(constant("seda:a,seda:b,seda:c")).routeId("start"); + + from("seda:a").routeId("a") + .log("routing at ${routeId}"); + + from("seda:b").routeId("b") + .log("routing at ${routeId}") + .delay(simple("${random(1000,2000)}")); + + from("seda:c").routeId("c") + .log("routing at ${routeId}") + .delay(simple("${random(0,100)}")); + } + }; + } +} diff --git a/components/camel-observation/src/test/java/org/apache/camel/observation/CurrentSpanTest.java b/components/camel-observation/src/test/java/org/apache/camel/observation/CurrentSpanTest.java new file mode 100644 index 00000000000..fb5353bb576 --- /dev/null +++ b/components/camel-observation/src/test/java/org/apache/camel/observation/CurrentSpanTest.java @@ -0,0 +1,356 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. 
See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.observation; + +import java.io.IOException; +import java.util.List; +import java.util.Map; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.Executor; +import java.util.concurrent.ForkJoinPool; +import java.util.concurrent.TimeUnit; + +import io.opentelemetry.api.common.AttributeKey; +import io.opentelemetry.api.trace.Span; +import io.opentelemetry.api.trace.SpanKind; +import io.opentelemetry.sdk.trace.ReadableSpan; +import io.opentelemetry.sdk.trace.data.SpanData; +import io.opentelemetry.sdk.trace.internal.data.ExceptionEventData; +import org.apache.camel.AsyncCallback; +import org.apache.camel.CamelContext; +import org.apache.camel.CamelExecutionException; +import org.apache.camel.Consumer; +import org.apache.camel.Endpoint; +import org.apache.camel.Exchange; +import org.apache.camel.Processor; +import org.apache.camel.Producer; +import org.apache.camel.RoutesBuilder; +import org.apache.camel.builder.RouteBuilder; +import org.apache.camel.component.mock.MockComponent; +import org.apache.camel.component.mock.MockEndpoint; +import org.apache.camel.support.DefaultAsyncProducer; +import org.apache.camel.support.DefaultConsumer; +import org.apache.camel.support.DefaultProducer; +import 
org.apache.camel.tracing.ActiveSpanManager; +import org.apache.camel.util.StopWatch; +import org.junit.jupiter.api.Test; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertNotNull; +import static org.junit.jupiter.api.Assertions.assertThrows; +import static org.junit.jupiter.api.Assertions.assertTrue; + +class CurrentSpanTest extends CamelMicrometerObservationTestSupport { + CurrentSpanTest() { + super(new SpanTestData[0]); + } + + @Override + protected CamelContext createCamelContext() throws Exception { + CamelContext context = super.createCamelContext(); + context.addComponent("asyncmock", new AsyncMockComponent()); + context.addComponent("asyncmock1", new AsyncMockComponent()); + context.addComponent("asyncmock2", new AsyncMockComponent()); + context.addComponent("asyncmock3", new AsyncMockComponent()); + context.addComponent("syncmock", new SyncMockComponent()); + + return context; + } + + @Test + void testSync() { + SpanTestData[] expectedSpans = { + new SpanTestData().setLabel("syncmock:result").setUri("syncmock://result").setOperation("syncmock") + .setKind(SpanKind.CLIENT), + new SpanTestData().setLabel("direct:bar").setUri("direct://bar").setOperation("bar").setKind(SpanKind.SERVER) + }; + + // sync pipeline + template.sendBody("direct:bar", "Hello World"); + + List<SpanData> spans = verify(expectedSpans, false); + assertEquals(spans.get(0).getParentSpanId(), spans.get(1).getSpanId()); + + // validates that span was active in async producer's processor + assertFalse(Span.current().getSpanContext().isValid()); + } + + @Test + void testSyncToAsync() { + SpanTestData[] expectedSpans = { + new SpanTestData().setLabel("asyncmock1:result").setUri("asyncmock1://result").setOperation("asyncmock1") + .setKind(SpanKind.CLIENT), + new SpanTestData().setLabel("direct:foo").setUri("direct://foo").setOperation("foo").setKind(SpanKind.SERVER) + 
}; + + // sync to async pipeline + template.sendBody("direct:foo", "Hello World"); + + List<SpanData> spans = verify(expectedSpans, false); + assertEquals(spans.get(0).getParentSpanId(), spans.get(1).getSpanId()); + + // context is cleaned up + assertFalse(Span.current().getSpanContext().isValid()); + } + + @Test + void testAsyncToSync() { + // direct client spans (event spans) are not created, so we saw only two spans in previous tests + SpanTestData[] expectedSpans = { + new SpanTestData().setLabel("syncmock:result").setUri("syncmock://result").setOperation("syncmock") + .setKind(SpanKind.CLIENT), + new SpanTestData().setLabel("asyncmock1:start").setUri("asyncmock1://start").setOperation("asyncmock1"), + new SpanTestData().setLabel("asyncmock1:start").setUri("asyncmock1://start").setOperation("asyncmock1") + .setKind(SpanKind.CLIENT), + }; + + // sync pipeline + template.sendBody("asyncmock1:start", "Hello World"); + + List<SpanData> spans = verify(expectedSpans, false); + assertEquals(spans.get(0).getParentSpanId(), spans.get(1).getSpanId()); + assertFalse(Span.current().getSpanContext().isValid()); + } + + @Test + void testAsyncToAsync() { + SpanTestData[] expectedSpans = { + new SpanTestData().setLabel("asyncmock2:result").setUri("asyncmock2://result").setOperation("asyncmock2") + .setKind(SpanKind.CLIENT), + new SpanTestData().setLabel("asyncmock2:start").setUri("asyncmock2://start").setOperation("asyncmock2"), + new SpanTestData().setLabel("asyncmock2:start").setUri("asyncmock2://start").setOperation("asyncmock2") + .setKind(SpanKind.CLIENT), + }; + + // async pipeline + template.sendBody("asyncmock2:start", "Hello World"); + + List<SpanData> spans = verify(expectedSpans, false); + assertEquals(spans.get(0).getParentSpanId(), spans.get(1).getSpanId()); + assertFalse(Span.current().getSpanContext().isValid()); + } + + @Test + void testAsyncFailure() { + SpanTestData[] expectedSpans = { + new 
SpanTestData().setLabel("asyncmock:fail").setUri("asyncmock://fail").setOperation("asyncmock"), + new SpanTestData().setLabel("asyncmock:fail").setUri("asyncmock://fail").setOperation("asyncmock") + .setKind(SpanKind.CLIENT), + }; + + assertThrows(CamelExecutionException.class, () -> template.sendBody("asyncmock:fail", "Hello World")); + assertFalse(Span.current().getSpanContext().isValid()); + + List<SpanData> spans = verify(expectedSpans, false); + assertEquals(spans.get(0).getParentSpanId(), spans.get(1).getSpanId()); + + assertNotNull(((ExceptionEventData) spans.get(0).getEvents().get(0)).getException()); + assertNotNull(((ExceptionEventData) spans.get(1).getEvents().get(0)).getException()); + + } + + @Test + void testMulticastAsync() { + SpanTestData[] expectedSpans = { + new SpanTestData().setLabel("asyncmock1:result").setUri("asyncmock1://result").setOperation("asyncmock1") + .setKind(SpanKind.CLIENT), + new SpanTestData().setLabel("asyncmock2:result").setUri("asyncmock2://result").setOperation("asyncmock2") + .setKind(SpanKind.CLIENT), + new SpanTestData().setLabel("syncmock:result").setUri("syncmock://result").setOperation("syncmock") + .setKind(SpanKind.CLIENT), + new SpanTestData().setLabel("direct:start").setUri("direct://start").setOperation("start").setKind(SpanKind.SERVER) + }; + + // sync pipeline + template.sendBody("direct:start", "Hello World"); + + List<SpanData> spans = verify(expectedSpans, false); + assertEquals(spans.get(0).getParentSpanId(), spans.get(3).getSpanId()); + assertEquals(spans.get(1).getParentSpanId(), spans.get(3).getSpanId()); + assertEquals(spans.get(2).getParentSpanId(), spans.get(3).getSpanId()); + assertFalse(Span.current().getSpanContext().isValid()); + } + + @Test + void testContextDoesNotLeak() { + for (int i = 0; i < 30; i++) { + template.sendBody("asyncmock3:start", String.valueOf(i)); + assertFalse(Span.current().getSpanContext().isValid()); + } + + verifyTraceSpanNumbers(30, 10); + } + + @Override + protected 
RoutesBuilder createRouteBuilder() { + return new RouteBuilder() { + @Override + public void configure() { + // sync pipeline + from("direct:bar").to("syncmock:result"); + + // sync to async pipeline + from("direct:foo").to("asyncmock1:result"); + + // async to sync pipeline + from("asyncmock1:start").to("syncmock:result"); + + // async pipeline + from("asyncmock2:start").to("asyncmock2:result"); + + // async fail + from("asyncmock:fail").process(i -> { + throw new IOException("error"); + }); + + // multicast pipeline + from("direct:start").multicast() + .to("asyncmock1:result") + .to("asyncmock2:result") + .to("syncmock:result"); + + // stress pipeline + from("asyncmock3:start").multicast() + .aggregationStrategy((oldExchange, newExchange) -> { + checkCurrentSpan(newExchange); + return newExchange; + }) + .executorService(context.getExecutorServiceManager().newFixedThreadPool(this, "CurrentSpanTest", 10)) + .streaming() + .delay(10) + .to("log:line", "asyncmock1:start") + .to("log:line", "asyncmock2:start") + .to("log:line", "direct:bar") + .process(ex -> checkCurrentSpan(ex)); + } + }; + } + + private static void checkCurrentSpan(Exchange exc) { + String errorMessage = null; + if (Span.current() instanceof ReadableSpan) { + ReadableSpan readable = (ReadableSpan) Span.current(); + errorMessage = String.format( + "Current span: name - '%s', kind - '%s', ended - `%s', id - '%s-%s', exchange id - '%s-%s', thread - '%s'\n", + readable.getName(), readable.getKind(), readable.hasEnded(), + readable.getSpanContext().getTraceId(), readable.getSpanContext().getSpanId(), + ActiveSpanManager.getSpan(exc).traceId(), ActiveSpanManager.getSpan(exc).spanId(), + Thread.currentThread().getName()); + + } + + assertFalse(Span.current().getSpanContext().isValid(), errorMessage); + } + + private static class AsyncMockComponent extends MockComponent { + + @Override + protected Endpoint createEndpoint(String uri, String key, Map<String, Object> parameters) { + return new 
AsyncMockEndpoint(this, uri, key); + } + } + + private static class AsyncMockEndpoint extends MockEndpoint { + private static final Executor DELAYED + = CompletableFuture.delayedExecutor(10L, TimeUnit.MILLISECONDS, new ForkJoinPool(3)); + + private Consumer consumer; + private final String key; + + public AsyncMockEndpoint(AsyncMockComponent component, String uri, String key) { + super(uri, component); + this.key = key; + } + + @Override + public Consumer createConsumer(Processor processor) { + consumer = new DefaultConsumer(this, exchange -> { + assertCurrentSpan(exchange); + processor.process(exchange); + }); + try { + configureConsumer(consumer); + } catch (Exception e) { + // ignore + } + return consumer; + } + + @Override + public Producer createProducer() { + return new DefaultAsyncProducer(this) { + @Override + public boolean process(Exchange exchange, AsyncCallback callback) { + assertCurrentSpan(exchange); + if (!key.equals("result")) { + try { + getConsumer(1000).getProcessor().process(exchange); + } catch (Exception e) { + fail(e); + } + } + CompletableFuture.runAsync(() -> { + }, DELAYED) + .thenRun(() -> callback.run()); + + return false; + } + }; + } + + private Consumer getConsumer(long timeout) throws InterruptedException { + StopWatch watch = new StopWatch(); + while (consumer == null) { + long rem = timeout - watch.taken(); + if (rem <= 0) { + break; + } + consumer.wait(rem); + } + return consumer; + } + } + + private static class SyncMockComponent extends MockComponent { + + @Override + protected Endpoint createEndpoint(String uri, String key, Map<String, Object> parameters) { + return new SyncMockEndpoint(this, uri, key); + } + } + + private static class SyncMockEndpoint extends MockEndpoint { + public SyncMockEndpoint(SyncMockComponent component, String uri, String key) { + super(uri, component); + } + + @Override + public Producer createProducer() { + return new DefaultProducer(this) { + @Override + public void process(Exchange exchange) { + 
assertCurrentSpan(exchange); + } + }; + } + } + + private static void assertCurrentSpan(Exchange exchange) { + assertEquals(Span.current().getSpanContext().getSpanId(), ActiveSpanManager.getSpan(exchange).spanId()); + } +} diff --git a/components/camel-observation/src/test/java/org/apache/camel/observation/CustomComponentNameRouteTest.java b/components/camel-observation/src/test/java/org/apache/camel/observation/CustomComponentNameRouteTest.java new file mode 100644 index 00000000000..5c5d3c0ed0c --- /dev/null +++ b/components/camel-observation/src/test/java/org/apache/camel/observation/CustomComponentNameRouteTest.java @@ -0,0 +1,73 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.camel.observation; + +import io.opentelemetry.api.trace.SpanKind; +import org.apache.camel.RoutesBuilder; +import org.apache.camel.builder.RouteBuilder; +import org.junit.jupiter.api.Test; + +class CustomComponentNameRouteTest extends CamelMicrometerObservationTestSupport { + + private static SpanTestData[] testdata = { + new SpanTestData().setLabel("myseda:b server").setUri("myseda://b").setOperation("b") + .setParentId(2).addLogMessage("routing at b"), + new SpanTestData().setLabel("myseda:c server").setUri("myseda://c").setOperation("c") + .setParentId(2).addLogMessage("Exchange[ExchangePattern: InOut, BodyType: String, Body: Hello]"), + new SpanTestData().setLabel("myseda:a server").setUri("myseda://a").setOperation("a") + .setParentId(3).addLogMessage("routing at a").addLogMessage("End of routing"), + new SpanTestData().setLabel("direct:start server").setUri("direct://start").setOperation("start").setKind(SpanKind.SERVER) + }; + + CustomComponentNameRouteTest() { + super(testdata); + } + + @Test + void testRoute() { + template.requestBody("direct:start", "Hello"); + + verify(); + } + + @Override + protected RoutesBuilder createRouteBuilder() { + return new RouteBuilder() { + @Override + public void configure() { + context.addComponent("myseda", context.getComponent("seda")); + + from("direct:start").to("myseda:a").routeId("start"); + + from("myseda:a").routeId("a") + .log("routing at ${routeId}") + .to("myseda:b") + .delay(2000) + .to("myseda:c") + .log("End of routing"); + + from("myseda:b").routeId("b") + .log("routing at ${routeId}") + .delay(simple("${random(1000,2000)}")); + + from("myseda:c").routeId("c") + .to("log:test") + .delay(simple("${random(0,100)}")); + } + }; + } +} diff --git a/components/camel-observation/src/test/java/org/apache/camel/observation/MulticastParallelRouteTest.java b/components/camel-observation/src/test/java/org/apache/camel/observation/MulticastParallelRouteTest.java new file mode 100644 index 
00000000000..96c98861c3d --- /dev/null +++ b/components/camel-observation/src/test/java/org/apache/camel/observation/MulticastParallelRouteTest.java @@ -0,0 +1,70 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.observation; + +import io.opentelemetry.api.trace.SpanKind; +import org.apache.camel.RoutesBuilder; +import org.apache.camel.builder.RouteBuilder; +import org.junit.jupiter.api.Test; + +class MulticastParallelRouteTest extends CamelMicrometerObservationTestSupport { + + private static SpanTestData[] testdata = { + new SpanTestData().setLabel("seda:b server").setUri("seda://b").setOperation("b") + .setParentId(2).addLogMessage("routing at b"), + new SpanTestData().setLabel("seda:c server").setUri("seda://c").setOperation("c") + .setParentId(2).addLogMessage("routing at c"), + new SpanTestData().setLabel("seda:a server").setUri("seda://a").setOperation("a") + .setParentId(3).addLogMessage("routing at a").addLogMessage("End of routing"), + new SpanTestData().setLabel("direct:start server").setUri("direct://start").setOperation("start").setKind(SpanKind.SERVER) + }; + + MulticastParallelRouteTest() { + super(testdata); + } + + @Test + void testRoute() { + 
template.requestBody("direct:start", "Hello"); + verify(true); + } + + @Override + protected RoutesBuilder createRouteBuilder() { + return new RouteBuilder() { + @Override + public void configure() { + from("direct:start").to("seda:a").routeId("start"); + + from("seda:a").routeId("a") + .log("routing at ${routeId}") + .multicast().parallelProcessing() + .to("seda:b", "seda:c") + .end() + .log("End of routing"); + + from("seda:b").routeId("b") + .log("routing at ${routeId}") + .delay(simple("${random(1000,2000)}")); + + from("seda:c").routeId("c") + .log("routing at ${routeId}") + .delay(simple("${random(0,100)}")); + } + }; + } +} diff --git a/components/camel-observation/src/test/java/org/apache/camel/observation/MulticastRouteTest.java b/components/camel-observation/src/test/java/org/apache/camel/observation/MulticastRouteTest.java new file mode 100644 index 00000000000..40433a78619 --- /dev/null +++ b/components/camel-observation/src/test/java/org/apache/camel/observation/MulticastRouteTest.java @@ -0,0 +1,72 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.camel.observation; + +import io.opentelemetry.api.trace.SpanKind; +import org.apache.camel.RoutesBuilder; +import org.apache.camel.builder.RouteBuilder; +import org.junit.jupiter.api.Test; + +class MulticastRouteTest extends CamelMicrometerObservationTestSupport { + + private static SpanTestData[] testdata = { + new SpanTestData().setLabel("seda:b server").setUri("seda://b").setOperation("b") + .setParentId(2), + new SpanTestData().setLabel("seda:c server").setUri("seda://c").setOperation("c") + .setParentId(2), + new SpanTestData().setLabel("seda:a server").setUri("seda://a").setOperation("a") + .setParentId(3), + new SpanTestData().setLabel("direct:start server").setUri("direct://start").setOperation("start").setKind(SpanKind.SERVER) + }; + + MulticastRouteTest() { + super(testdata); + } + + @Test + void testRoute() { + template.requestBody("direct:start", "Hello"); + + verify(); + } + + @Override + protected RoutesBuilder createRouteBuilder() { + return new RouteBuilder() { + @Override + public void configure() { + from("direct:start").to("seda:a").routeId("start"); + + from("seda:a").routeId("a") + .log("routing at ${routeId}") + .multicast() + .to("seda:b") + .to("seda:c") + .end() + .log("End of routing"); + + from("seda:b").routeId("b") + .log("routing at ${routeId}") + .delay(simple("${random(1000,2000)}")); + + from("seda:c").routeId("c") + .log("routing at ${routeId}") + .delay(simple("${random(0,100)}")); + } + }; + } +} diff --git a/components/camel-observation/src/test/java/org/apache/camel/observation/RouteConcurrentTest.java b/components/camel-observation/src/test/java/org/apache/camel/observation/RouteConcurrentTest.java new file mode 100644 index 00000000000..937c12fdd8d --- /dev/null +++ b/components/camel-observation/src/test/java/org/apache/camel/observation/RouteConcurrentTest.java @@ -0,0 +1,82 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. 
See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.observation; + +import java.util.concurrent.TimeUnit; + +import io.opentelemetry.api.trace.SpanKind; +import org.apache.camel.RoutesBuilder; +import org.apache.camel.builder.NotifyBuilder; +import org.apache.camel.builder.RouteBuilder; +import org.junit.jupiter.api.Test; + +import static org.junit.jupiter.api.Assertions.assertTrue; + +class RouteConcurrentTest extends CamelMicrometerObservationTestSupport { + + private static SpanTestData[] testdata = { + new SpanTestData().setLabel("seda:foo server").setUri("seda://foo?concurrentConsumers=5").setOperation("foo") + .setKind(SpanKind.SERVER), + new SpanTestData().setLabel("seda:bar server").setUri("seda://bar?concurrentConsumers=5").setOperation("bar") + .setParentId(0) + }; + + RouteConcurrentTest() { + super(testdata); + } + + @Test + void testSingleInvocationsOfRoute() { + NotifyBuilder notify = new NotifyBuilder(context).whenDone(2).create(); + + template.sendBody("seda:foo", "Hello World"); + + assertTrue(notify.matches(30, TimeUnit.SECONDS)); + + verify(); + } + + @Test + void testConcurrentInvocationsOfRoute() { + NotifyBuilder notify = new NotifyBuilder(context).whenDone(10).create(); + + for (int i = 0; i < 5; i++) { + template.sendBody("seda:foo", "Hello World"); + } + + 
assertTrue(notify.matches(30, TimeUnit.SECONDS)); + + verifyTraceSpanNumbers(5, testdata.length); + } + + @Override + protected RoutesBuilder createRouteBuilder() { + return new RouteBuilder() { + @Override + public void configure() { + from("seda:foo?concurrentConsumers=5").routeId("foo") + .log("routing at ${routeId}") + .delay(simple("${random(1000,2000)}")) + .to("seda:bar"); + + from("seda:bar?concurrentConsumers=5").routeId("bar") + .log("routing at ${routeId}") + .delay(simple("${random(0,500)}")); + } + }; + } +} diff --git a/components/camel-observation/src/test/java/org/apache/camel/observation/SpanProcessorsTest.java b/components/camel-observation/src/test/java/org/apache/camel/observation/SpanProcessorsTest.java new file mode 100644 index 00000000000..2b345f13348 --- /dev/null +++ b/components/camel-observation/src/test/java/org/apache/camel/observation/SpanProcessorsTest.java @@ -0,0 +1,85 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.camel.observation; + +import io.opentelemetry.api.trace.SpanKind; +import org.apache.camel.Exchange; +import org.apache.camel.RoutesBuilder; +import org.apache.camel.builder.RouteBuilder; +import org.junit.jupiter.api.Test; + +import static org.apache.camel.language.simple.SimpleLanguage.simple; +import static org.junit.jupiter.api.Assertions.assertEquals; + +class SpanProcessorsTest extends CamelMicrometerObservationTestSupport { + + private static final SpanTestData[] TEST_DATA = { + new SpanTestData().setLabel("seda:b server").setUri("seda://b").setOperation("b") + .setParentId(2).addLogMessage("routing at b") + .addTag("b-tag", "request-header-value"), + new SpanTestData().setLabel("seda:c server").setUri("seda://c").setOperation("c") + .setParentId(2).addLogMessage("Exchange[ExchangePattern: InOut, BodyType: String, Body: Hello]"), + new SpanTestData().setLabel("seda:a server").setUri("seda://a").setOperation("a") + .setParentId(3).addLogMessage("routing at a").addLogMessage("End of routing"), + new SpanTestData().setLabel("direct:start server").setUri("direct://start").setOperation("start").setKind(SpanKind.SERVER) + }; + + SpanProcessorsTest() { + super(TEST_DATA); + } + + @Test + void testRoute() { + Exchange result = template.request("direct:start", + exchange -> { + exchange.getIn().setBody("Hello"); + exchange.getIn().setHeader("request-header", + context.resolveLanguage("simple").createExpression("request-header-value")); + }); + + verify(); + assertEquals("request-header-value", result.getMessage().getHeader("baggage-header", String.class)); + } + + @Override + protected RoutesBuilder createRouteBuilder() { + return new RouteBuilder() { + @Override + public void configure() { + from("direct:start").to("seda:a").routeId("start"); + + from("seda:a").routeId("a") + .log("routing at ${routeId}") + .process(new SetCorrelationContextProcessor("a-baggage", simple("${header.request-header}"))) + .to("seda:b") + .delay(2000) + 
.to("seda:c") + .log("End of routing"); + + from("seda:b").routeId("b") + .log("routing at ${routeId}") + .process(new AttributeProcessor("b-tag", simple("${header.request-header}"))) + .delay(simple("${random(1000,2000)}")); + + from("seda:c").routeId("c") + .to("log:test") + .process(new GetCorrelationContextProcessor("a-baggage", "baggage-header")) + .delay(simple("${random(0,100)}")); + } + }; + } +} diff --git a/components/camel-observation/src/test/java/org/apache/camel/observation/SpanTestData.java b/components/camel-observation/src/test/java/org/apache/camel/observation/SpanTestData.java new file mode 100644 index 00000000000..aff1d7ea7ee --- /dev/null +++ b/components/camel-observation/src/test/java/org/apache/camel/observation/SpanTestData.java @@ -0,0 +1,107 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.camel.observation; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +import io.opentelemetry.api.trace.SpanKind; + +public class SpanTestData { + + private String label; + private String uri; + private String operation; + private SpanKind kind = SpanKind.INTERNAL; + private int parentId = -1; + private List<String> logMessages = new ArrayList<>(); + private Map<String, String> tags = new HashMap<>(); + private ArrayList<SpanTestData> childs = new ArrayList<>(); + private Map<String, String> baggage = new HashMap<>(); + + public String getLabel() { + return label; + } + + public SpanTestData setLabel(String label) { + this.label = label; + return this; + } + + public String getUri() { + return uri; + } + + public SpanTestData setUri(String uri) { + this.uri = uri; + return this; + } + + public String getOperation() { + return operation; + } + + public SpanTestData setOperation(String operation) { + this.operation = operation; + return this; + } + + public SpanKind getKind() { + return kind; + } + + public SpanTestData setKind(SpanKind kind) { + this.kind = kind; + return this; + } + + public int getParentId() { + return parentId; + } + + public SpanTestData setParentId(int parentId) { + this.parentId = parentId; + return this; + } + + public SpanTestData addLogMessage(String mesg) { + logMessages.add(mesg); + return this; + } + + public List<String> getLogMessages() { + return logMessages; + } + + public SpanTestData addTag(String key, String val) { + tags.put(key, val); + return this; + } + + public Map<String, String> getTags() { + return tags; + } + + public SpanTestData setChilds(SpanTestData[] childs) { + Collections.addAll(this.childs, childs); + return this; + } + +} diff --git a/components/camel-tracing/src/main/java/org/apache/camel/tracing/decorators/JdbcSpanDecorator.java 
b/components/camel-observation/src/test/java/org/apache/camel/observation/TestSEDASpanDecorator.java similarity index 67% copy from components/camel-tracing/src/main/java/org/apache/camel/tracing/decorators/JdbcSpanDecorator.java copy to components/camel-observation/src/test/java/org/apache/camel/observation/TestSEDASpanDecorator.java index 12745783cb6..a2d89df37e2 100644 --- a/components/camel-tracing/src/main/java/org/apache/camel/tracing/decorators/JdbcSpanDecorator.java +++ b/components/camel-observation/src/test/java/org/apache/camel/observation/TestSEDASpanDecorator.java @@ -14,35 +14,25 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package org.apache.camel.tracing.decorators; +package org.apache.camel.observation; import org.apache.camel.Endpoint; import org.apache.camel.Exchange; import org.apache.camel.tracing.SpanAdapter; -import org.apache.camel.tracing.Tag; +import org.apache.camel.tracing.decorators.SedaSpanDecorator; -public class JdbcSpanDecorator extends AbstractSpanDecorator { - - @Override - public String getComponent() { - return "jdbc"; - } - - @Override - public String getComponentClassName() { - return "org.apache.camel.component.jdbc.JdbcComponent"; - } +class TestSEDASpanDecorator extends SedaSpanDecorator { @Override public void pre(SpanAdapter span, Exchange exchange, Endpoint endpoint) { super.pre(span, exchange, endpoint); + span.setTag("pre", "test"); + } - span.setTag(Tag.DB_TYPE, "sql"); - - Object body = exchange.getIn().getBody(); - if (body instanceof String) { - span.setTag(Tag.DB_STATEMENT, (String) body); - } + @Override + public void post(SpanAdapter span, Exchange exchange, Endpoint endpoint) { + super.post(span, exchange, endpoint); + span.setTag("post", "test"); } } diff --git a/components/camel-observation/src/test/java/org/apache/camel/observation/TwoServiceTest.java b/components/camel-observation/src/test/java/org/apache/camel/observation/TwoServiceTest.java 
new file mode 100644 index 00000000000..d7f2f07fb55 --- /dev/null +++ b/components/camel-observation/src/test/java/org/apache/camel/observation/TwoServiceTest.java @@ -0,0 +1,59 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.observation; + +import io.opentelemetry.api.trace.SpanKind; +import org.apache.camel.RoutesBuilder; +import org.apache.camel.builder.RouteBuilder; +import org.junit.jupiter.api.Test; + +class TwoServiceTest extends CamelMicrometerObservationTestSupport { + + private static SpanTestData[] testdata = { + new SpanTestData().setLabel("ServiceB server").setUri("direct://ServiceB").setOperation("service-b") + .setParentId(1), + new SpanTestData().setLabel("ServiceA server").setUri("direct://ServiceA").setOperation("ServiceA").setKind(SpanKind.SERVER) + }; + + TwoServiceTest() { + super(testdata); + } + + @Test + void testRoute() { + template.requestBody("direct:ServiceA", "Hello"); + + verify(); + } + + @Override + protected RoutesBuilder createRouteBuilder() { + return new RouteBuilder() { + @Override + public void configure() { + from("direct:ServiceA") + .log("ServiceA has been called") + .delay(simple("${random(1000,2000)}")) + .to("direct:ServiceB"); + + 
from("direct:ServiceB") + .log("ServiceB has been called") + .delay(simple("${random(0,500)}")); + } + }; + } +} diff --git a/components/camel-observation/src/test/java/org/apache/camel/observation/TwoServiceWithExcludeTest.java b/components/camel-observation/src/test/java/org/apache/camel/observation/TwoServiceWithExcludeTest.java new file mode 100644 index 00000000000..8bf3025fc2f --- /dev/null +++ b/components/camel-observation/src/test/java/org/apache/camel/observation/TwoServiceWithExcludeTest.java @@ -0,0 +1,65 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.camel.observation; + +import java.util.Collections; +import java.util.Set; + +import io.opentelemetry.api.trace.SpanKind; +import org.apache.camel.RoutesBuilder; +import org.apache.camel.builder.RouteBuilder; +import org.junit.jupiter.api.Test; + +class TwoServiceWithExcludeTest extends CamelMicrometerObservationTestSupport { + + private static SpanTestData[] testdata = { + new SpanTestData().setLabel("ServiceA server").setUri("direct://ServiceA").setOperation("ServiceA").setKind(SpanKind.SERVER) + }; + + TwoServiceWithExcludeTest() { + super(testdata); + } + + @Override + protected Set<String> getExcludePatterns() { + return Collections.singleton("direct:ServiceB"); + } + + @Test + void testRoute() { + template.requestBody("direct:ServiceA", "Hello"); + + verify(); + } + + @Override + protected RoutesBuilder createRouteBuilder() { + return new RouteBuilder() { + @Override + public void configure() { + from("direct:ServiceA") + .log("ServiceA has been called") + .delay(simple("${random(1000,2000)}")) + .to("direct:ServiceB"); + + from("direct:ServiceB") + .log("ServiceB has been called") + .delay(simple("${random(0,500)}")); + } + }; + } +} diff --git a/components/camel-observation/src/test/java/org/apache/camel/observation/otel/CamelDefaultTracingObservationHandler.java b/components/camel-observation/src/test/java/org/apache/camel/observation/otel/CamelDefaultTracingObservationHandler.java new file mode 100644 index 00000000000..1e9d37c9531 --- /dev/null +++ b/components/camel-observation/src/test/java/org/apache/camel/observation/otel/CamelDefaultTracingObservationHandler.java @@ -0,0 +1,54 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.observation.otel; + +import io.micrometer.common.util.StringUtils; +import io.micrometer.observation.Observation; +import io.micrometer.tracing.Span; +import io.micrometer.tracing.Tracer; +import io.micrometer.tracing.handler.DefaultTracingObservationHandler; + +public class CamelDefaultTracingObservationHandler extends DefaultTracingObservationHandler { + + /** + * Creates a new instance of {@link DefaultTracingObservationHandler}. + * + * @param tracer the tracer to use to record events + */ + public CamelDefaultTracingObservationHandler(Tracer tracer) { + super(tracer); + } + + // Current implementation of OpenTelemetry is not doing hyphens + // e.g. 
ServiceB does not become service-b + @Override + public String getSpanName(Observation.Context context) { + String name = context.getName(); + if (StringUtils.isNotBlank(context.getContextualName())) { + name = context.getContextualName(); + } + return name; + } + + @Override + public void tagSpan(Observation.Context context, Span span) { + super.tagSpan(context, span); + if (context.getError() != null) { + span.tag("error", "true"); + } + } +} diff --git a/components/camel-observation/src/test/java/org/apache/camel/observation/otel/CamelPropagatingReceiverTracingObservationHandler.java b/components/camel-observation/src/test/java/org/apache/camel/observation/otel/CamelPropagatingReceiverTracingObservationHandler.java new file mode 100644 index 00000000000..30533288d07 --- /dev/null +++ b/components/camel-observation/src/test/java/org/apache/camel/observation/otel/CamelPropagatingReceiverTracingObservationHandler.java @@ -0,0 +1,55 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.camel.observation.otel; + +import io.micrometer.observation.transport.ReceiverContext; +import io.micrometer.tracing.Span; +import io.micrometer.tracing.Tracer; +import io.micrometer.tracing.handler.PropagatingReceiverTracingObservationHandler; +import io.micrometer.tracing.propagation.Propagator; + +public class CamelPropagatingReceiverTracingObservationHandler<T extends ReceiverContext> extends PropagatingReceiverTracingObservationHandler<T> { + + static final String SPAN_DECORATOR_INTERNAL = "camel.micrometer.abstract-internal"; + + /** + * Creates a new instance of {@link PropagatingReceiverTracingObservationHandler}. + * + * @param tracer the tracer to use to record events + * @param propagator the mechanism to propagate tracing information from the carrier + */ + public CamelPropagatingReceiverTracingObservationHandler(Tracer tracer, Propagator propagator) { + super(tracer, propagator); + } + + @Override + public Span.Builder customizeExtractedSpan(T context, Span.Builder builder) { + boolean internalComponent = context.getOrDefault(SPAN_DECORATOR_INTERNAL, false); + if (internalComponent && context.getParentObservation() == null) { + builder.kind(null); + } + return builder; + } + + @Override + public void tagSpan(T context, Span span) { + super.tagSpan(context, span); + if (context.getError() != null) { + span.tag("error", "true"); + } + } +} diff --git a/components/camel-observation/src/test/java/org/apache/camel/observation/otel/CamelPropagatingSenderTracingObservationHandler.java b/components/camel-observation/src/test/java/org/apache/camel/observation/otel/CamelPropagatingSenderTracingObservationHandler.java new file mode 100644 index 00000000000..78d38754aa8 --- /dev/null +++ b/components/camel-observation/src/test/java/org/apache/camel/observation/otel/CamelPropagatingSenderTracingObservationHandler.java @@ -0,0 +1,45 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license 
agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.observation.otel; + +import io.micrometer.observation.transport.SenderContext; +import io.micrometer.tracing.Span; +import io.micrometer.tracing.Tracer; +import io.micrometer.tracing.handler.PropagatingReceiverTracingObservationHandler; +import io.micrometer.tracing.handler.PropagatingSenderTracingObservationHandler; +import io.micrometer.tracing.propagation.Propagator; + +public class CamelPropagatingSenderTracingObservationHandler<T extends SenderContext> extends PropagatingSenderTracingObservationHandler<T> { + + /** + * Creates a new instance of {@link PropagatingSenderTracingObservationHandler}, the sender-side counterpart of {@link PropagatingReceiverTracingObservationHandler}.
+ * + * @param tracer the tracer to use to record events + * @param propagator the mechanism to propagate tracing information from the carrier + */ + public CamelPropagatingSenderTracingObservationHandler(Tracer tracer, Propagator propagator) { + super(tracer, propagator); + } + + @Override + public void tagSpan(T context, Span span) { + super.tagSpan(context, span); + if (context.getError() != null) { + span.tag("error", "true"); + } + } +} diff --git a/components/camel-observation/src/test/resources/log4j2.properties b/components/camel-observation/src/test/resources/log4j2.properties new file mode 100644 index 00000000000..f54d01bb554 --- /dev/null +++ b/components/camel-observation/src/test/resources/log4j2.properties @@ -0,0 +1,29 @@ +## --------------------------------------------------------------------------- +## Licensed to the Apache Software Foundation (ASF) under one or more +## contributor license agreements. See the NOTICE file distributed with +## this work for additional information regarding copyright ownership. +## The ASF licenses this file to You under the Apache License, Version 2.0 +## (the "License"); you may not use this file except in compliance with +## the License. You may obtain a copy of the License at +## +## http://www.apache.org/licenses/LICENSE-2.0 +## +## Unless required by applicable law or agreed to in writing, software +## distributed under the License is distributed on an "AS IS" BASIS, +## WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +## See the License for the specific language governing permissions and +## limitations under the License. 
+## --------------------------------------------------------------------------- +appender.file.type=File +appender.file.name=file +appender.file.fileName=target/camel-observation-test.log +appender.file.layout.type=PatternLayout +appender.file.layout.pattern=%d [%-15.15t] %-5p %-30.30c{1} - %m%n +appender.out.type=Console +appender.out.name=out +appender.out.layout.type=PatternLayout +appender.out.layout.pattern=%d [%-15.15t] %-5p %-30.30c{1} - %m%n +logger.opentelemetry.name=org.apache.camel.observation +logger.opentelemetry.level=INFO +rootLogger.level=INFO +rootLogger.appenderRef.file.ref=file diff --git a/components/camel-opentelemetry/src/main/java/org/apache/camel/opentelemetry/OpenTelemetryTracer.java b/components/camel-opentelemetry/src/main/java/org/apache/camel/opentelemetry/OpenTelemetryTracer.java index d56073a827d..d90d7092f40 100644 --- a/components/camel-opentelemetry/src/main/java/org/apache/camel/opentelemetry/OpenTelemetryTracer.java +++ b/components/camel-opentelemetry/src/main/java/org/apache/camel/opentelemetry/OpenTelemetryTracer.java @@ -90,7 +90,7 @@ public class OpenTelemetryTracer extends org.apache.camel.tracing.Tracer { @Override protected SpanAdapter startSendingEventSpan( - String operationName, org.apache.camel.tracing.SpanKind kind, SpanAdapter parent) { + String operationName, org.apache.camel.tracing.SpanKind kind, SpanAdapter parent, Exchange exchange, InjectAdapter injectAdapter) { Baggage baggage = null; SpanBuilder builder = tracer.spanBuilder(operationName).setSpanKind(mapToSpanKind(kind)); if (parent != null) { diff --git a/components/camel-opentracing/src/main/java/org/apache/camel/opentracing/OpenTracingTracer.java b/components/camel-opentracing/src/main/java/org/apache/camel/opentracing/OpenTracingTracer.java index 7214c4ad228..ad5aa1c7793 100644 --- a/components/camel-opentracing/src/main/java/org/apache/camel/opentracing/OpenTracingTracer.java +++ 
b/components/camel-opentracing/src/main/java/org/apache/camel/opentracing/OpenTracingTracer.java @@ -97,7 +97,7 @@ public class OpenTracingTracer extends org.apache.camel.tracing.Tracer { } @Override - protected SpanAdapter startSendingEventSpan(String operationName, SpanKind kind, SpanAdapter parent) { + protected SpanAdapter startSendingEventSpan(String operationName, SpanKind kind, SpanAdapter parent, Exchange exchange, InjectAdapter injectAdapter) { SpanBuilder spanBuilder = tracer.buildSpan(operationName).withTag(Tags.SPAN_KIND.getKey(), mapToSpanKind(kind)); if (parent != null) { io.opentracing.Span parentSpan = ((OpenTracingSpanAdapter) parent).getOpenTracingSpan(); diff --git a/components/camel-tracing/src/main/java/org/apache/camel/tracing/SpanAdapter.java b/components/camel-tracing/src/main/java/org/apache/camel/tracing/SpanAdapter.java index d256f4528be..8413c3760a3 100644 --- a/components/camel-tracing/src/main/java/org/apache/camel/tracing/SpanAdapter.java +++ b/components/camel-tracing/src/main/java/org/apache/camel/tracing/SpanAdapter.java @@ -33,6 +33,26 @@ public interface SpanAdapter { void setTag(String key, Boolean value); + default void setLowCardinalityTag(Tag key, String value) { + setTag(key, value); + } + + default void setLowCardinalityTag(Tag key, Number value) { + setTag(key, value); + } + + default void setLowCardinalityTag(String key, String value) { + setTag(key, value); + } + + default void setLowCardinalityTag(String key, Number value) { + setTag(key, value); + } + + default void setLowCardinalityTag(String key, Boolean value) { + setTag(key, value); + } + void log(Map<String, String> log); String traceId(); diff --git a/components/camel-tracing/src/main/java/org/apache/camel/tracing/Tracer.java b/components/camel-tracing/src/main/java/org/apache/camel/tracing/Tracer.java index 6367270a188..8e9774745fc 100644 --- a/components/camel-tracing/src/main/java/org/apache/camel/tracing/Tracer.java +++ 
b/components/camel-tracing/src/main/java/org/apache/camel/tracing/Tracer.java @@ -78,7 +78,7 @@ public abstract class Tracer extends ServiceSupport implements RoutePolicyFactor protected abstract void initTracer(); - protected abstract SpanAdapter startSendingEventSpan(String operationName, SpanKind kind, SpanAdapter parent); + protected abstract SpanAdapter startSendingEventSpan(String operationName, SpanKind kind, SpanAdapter parent, Exchange exchange, InjectAdapter injectAdapter); protected abstract SpanAdapter startExchangeBeginSpan( Exchange exchange, SpanDecorator sd, String operationName, SpanKind kind, SpanAdapter parent); @@ -252,10 +252,11 @@ public abstract class Tracer extends ServiceSupport implements RoutePolicyFactor } SpanAdapter parent = ActiveSpanManager.getSpan(ese.getExchange()); + InjectAdapter injectAdapter = sd.getInjectAdapter(ese.getExchange().getIn().getHeaders(), encoding); SpanAdapter span = startSendingEventSpan(sd.getOperationName(ese.getExchange(), ese.getEndpoint()), - sd.getInitiatorSpanKind(), parent); + sd.getInitiatorSpanKind(), parent, ese.getExchange(), injectAdapter); sd.pre(span, ese.getExchange(), ese.getEndpoint()); - inject(span, sd.getInjectAdapter(ese.getExchange().getIn().getHeaders(), encoding)); + inject(span, injectAdapter); ActiveSpanManager.activate(ese.getExchange(), span); if (LOG.isTraceEnabled()) { LOG.trace("Tracing: start client span={}", span); diff --git a/components/camel-tracing/src/main/java/org/apache/camel/tracing/decorators/AbstractHttpSpanDecorator.java b/components/camel-tracing/src/main/java/org/apache/camel/tracing/decorators/AbstractHttpSpanDecorator.java index c4919271cc7..2b7081d7d36 100644 --- a/components/camel-tracing/src/main/java/org/apache/camel/tracing/decorators/AbstractHttpSpanDecorator.java +++ b/components/camel-tracing/src/main/java/org/apache/camel/tracing/decorators/AbstractHttpSpanDecorator.java @@ -71,7 +71,7 @@ public abstract class AbstractHttpSpanDecorator extends 
AbstractSpanDecorator { if (httpUrl != null) { span.setTag(Tag.HTTP_URL, httpUrl); } - span.setTag(Tag.HTTP_METHOD, getHttpMethod(exchange, endpoint)); + span.setLowCardinalityTag(Tag.HTTP_METHOD, getHttpMethod(exchange, endpoint)); } protected String getHttpURL(Exchange exchange, Endpoint endpoint) { @@ -101,7 +101,7 @@ public abstract class AbstractHttpSpanDecorator extends AbstractSpanDecorator { if (message != null) { Integer responseCode = message.getHeader(Exchange.HTTP_RESPONSE_CODE, Integer.class); if (responseCode != null) { - span.setTag(Tag.HTTP_STATUS, responseCode); + span.setLowCardinalityTag(Tag.HTTP_STATUS, responseCode); } } } diff --git a/components/camel-tracing/src/main/java/org/apache/camel/tracing/decorators/CqlSpanDecorator.java b/components/camel-tracing/src/main/java/org/apache/camel/tracing/decorators/CqlSpanDecorator.java index f8e07caabc0..61db6f68211 100644 --- a/components/camel-tracing/src/main/java/org/apache/camel/tracing/decorators/CqlSpanDecorator.java +++ b/components/camel-tracing/src/main/java/org/apache/camel/tracing/decorators/CqlSpanDecorator.java @@ -43,7 +43,7 @@ public class CqlSpanDecorator extends AbstractSpanDecorator { @Override public void pre(SpanAdapter span, Exchange exchange, Endpoint endpoint) { super.pre(span, exchange, endpoint); - span.setTag(Tag.DB_TYPE, CASSANDRA_DB_TYPE); + span.setLowCardinalityTag(Tag.DB_TYPE, CASSANDRA_DB_TYPE); URI uri = URI.create(endpoint.getEndpointUri()); if (uri.getPath() != null && uri.getPath().length() > 0) { // Strip leading '/' from path diff --git a/components/camel-tracing/src/main/java/org/apache/camel/tracing/decorators/ElasticsearchSpanDecorator.java b/components/camel-tracing/src/main/java/org/apache/camel/tracing/decorators/ElasticsearchSpanDecorator.java index 043a9e59d6a..bb249cba2b8 100644 --- a/components/camel-tracing/src/main/java/org/apache/camel/tracing/decorators/ElasticsearchSpanDecorator.java +++ 
b/components/camel-tracing/src/main/java/org/apache/camel/tracing/decorators/ElasticsearchSpanDecorator.java @@ -50,7 +50,7 @@ public class ElasticsearchSpanDecorator extends AbstractSpanDecorator { @Override public void pre(SpanAdapter span, Exchange exchange, Endpoint endpoint) { super.pre(span, exchange, endpoint); - span.setTag(Tag.DB_TYPE, ELASTICSEARCH_DB_TYPE); + span.setLowCardinalityTag(Tag.DB_TYPE, ELASTICSEARCH_DB_TYPE); Map<String, String> queryParameters = toQueryParameters(endpoint.getEndpointUri()); if (queryParameters.containsKey("indexName")) { diff --git a/components/camel-tracing/src/main/java/org/apache/camel/tracing/decorators/JdbcSpanDecorator.java b/components/camel-tracing/src/main/java/org/apache/camel/tracing/decorators/JdbcSpanDecorator.java index 12745783cb6..0d079aef364 100644 --- a/components/camel-tracing/src/main/java/org/apache/camel/tracing/decorators/JdbcSpanDecorator.java +++ b/components/camel-tracing/src/main/java/org/apache/camel/tracing/decorators/JdbcSpanDecorator.java @@ -37,7 +37,7 @@ public class JdbcSpanDecorator extends AbstractSpanDecorator { public void pre(SpanAdapter span, Exchange exchange, Endpoint endpoint) { super.pre(span, exchange, endpoint); - span.setTag(Tag.DB_TYPE, "sql"); + span.setLowCardinalityTag(Tag.DB_TYPE, "sql"); Object body = exchange.getIn().getBody(); if (body instanceof String) { diff --git a/components/camel-tracing/src/main/java/org/apache/camel/tracing/decorators/MongoDBSpanDecorator.java b/components/camel-tracing/src/main/java/org/apache/camel/tracing/decorators/MongoDBSpanDecorator.java index 31add1d1e76..3ec32145bb2 100644 --- a/components/camel-tracing/src/main/java/org/apache/camel/tracing/decorators/MongoDBSpanDecorator.java +++ b/components/camel-tracing/src/main/java/org/apache/camel/tracing/decorators/MongoDBSpanDecorator.java @@ -49,7 +49,7 @@ public class MongoDBSpanDecorator extends AbstractSpanDecorator { public void pre(SpanAdapter span, Exchange exchange, Endpoint endpoint) { 
super.pre(span, exchange, endpoint); - span.setTag(Tag.DB_TYPE, getComponent()); + span.setLowCardinalityTag(Tag.DB_TYPE, getComponent()); Map<String, String> queryParameters = toQueryParameters(endpoint.getEndpointUri()); String database = queryParameters.get("database"); if (database != null) { diff --git a/components/camel-tracing/src/main/java/org/apache/camel/tracing/decorators/SqlSpanDecorator.java b/components/camel-tracing/src/main/java/org/apache/camel/tracing/decorators/SqlSpanDecorator.java index f25517e0e82..99409aeb397 100644 --- a/components/camel-tracing/src/main/java/org/apache/camel/tracing/decorators/SqlSpanDecorator.java +++ b/components/camel-tracing/src/main/java/org/apache/camel/tracing/decorators/SqlSpanDecorator.java @@ -38,7 +38,7 @@ public class SqlSpanDecorator extends AbstractSpanDecorator { @Override public void pre(SpanAdapter span, Exchange exchange, Endpoint endpoint) { super.pre(span, exchange, endpoint); - span.setTag(Tag.DB_TYPE, "sql"); + span.setLowCardinalityTag(Tag.DB_TYPE, "sql"); String sqlquery = exchange.getIn().getHeader(CAMEL_SQL_QUERY, String.class); if (sqlquery != null) { diff --git a/components/pom.xml b/components/pom.xml index ca126418829..ee678b98c8f 100644 --- a/components/pom.xml +++ b/components/pom.xml @@ -238,6 +238,7 @@ <module>camel-netty</module> <module>camel-nitrite</module> <module>camel-oaipmh</module> + <module>camel-observation</module> <module>camel-ognl</module> <module>camel-olingo2</module> <module>camel-olingo4</module> diff --git a/docs/components/modules/others/examples/json/observation.json b/docs/components/modules/others/examples/json/observation.json new file mode 120000 index 00000000000..c68d4ab8072 --- /dev/null +++ b/docs/components/modules/others/examples/json/observation.json @@ -0,0 +1 @@ +../../../../../../components/camel-observation/src/generated/resources/observation.json \ No newline at end of file diff --git a/docs/components/modules/others/nav.adoc 
b/docs/components/modules/others/nav.adoc index 8bbcbfdda04..2e870f96d1b 100644 --- a/docs/components/modules/others/nav.adoc +++ b/docs/components/modules/others/nav.adoc @@ -34,6 +34,7 @@ ** xref:lra.adoc[LRA] ** xref:mail-microsoft-oauth.adoc[Mail Microsoft Oauth] ** xref:main.adoc[Main] +** xref:observation.adoc[Micrometer Observability] ** xref:microprofile-config.adoc[Microprofile Config] ** xref:microprofile-fault-tolerance.adoc[Microprofile Fault Tolerance] ** xref:microprofile-health.adoc[Microprofile Health] diff --git a/docs/components/modules/others/pages/observation.adoc b/docs/components/modules/others/pages/observation.adoc new file mode 120000 index 00000000000..ba78c4f085c --- /dev/null +++ b/docs/components/modules/others/pages/observation.adoc @@ -0,0 +1 @@ +../../../../../components/camel-observation/src/main/docs/observation.adoc \ No newline at end of file diff --git a/parent/pom.xml b/parent/pom.xml index 70dc66c979f..e118e4f00f8 100644 --- a/parent/pom.xml +++ b/parent/pom.xml @@ -392,7 +392,8 @@ <maven-wagon-version>3.5.2</maven-wagon-version> <maven-war-plugin-version>3.3.1</maven-war-plugin-version> <metrics-version>4.2.15</metrics-version> - <micrometer-version>1.10.2</micrometer-version> + <micrometer-version>1.10.4</micrometer-version> + <micrometer-tracing-version>1.0.2</micrometer-tracing-version> <microprofile-config-version>2.0.1</microprofile-config-version> <microprofile-metrics-version>3.0.1</microprofile-metrics-version> <microprofile-fault-tolerance-version>3.0</microprofile-fault-tolerance-version> @@ -2000,6 +2001,11 @@ <artifactId>camel-oaipmh</artifactId> <version>${project.version}</version> </dependency> + <dependency> + <groupId>org.apache.camel</groupId> + <artifactId>camel-observation</artifactId> + <version>${project.version}</version> + </dependency> <dependency> <groupId>org.apache.camel</groupId> <artifactId>camel-ognl</artifactId>