This is an automated email from the ASF dual-hosted git repository. davsclaus pushed a commit to branch camel-4.4.x in repository https://gitbox.apache.org/repos/asf/camel.git
commit 8ea3633404e8ea900d4b8729b071f4f1a8d46aa7 Author: Adriano Machado <60320+ammach...@users.noreply.github.com> AuthorDate: Thu Oct 3 02:59:33 2024 -0400 CAMEL-21202 - Add a ThreadPoolFactory to propagate OpenTelemetry contexts (#15496) * CAMEL-21202 - Add a ThreadPoolFactory to propagate OpenTelemetry contexts Signed-off-by: Adriano Machado <60320+ammach...@users.noreply.github.com> * CAMEL-21202 - Adding missing implementation for ScheduledExecutorService context propagation * CAMEL-21202 - Adding the instrumented thread pool factory to the tests * Add strict checking for OpenTelemetry contexts * Add ThreadFactoryListener implementation for OTEL context propagation --------- Signed-off-by: Adriano Machado <60320+ammach...@users.noreply.github.com> --- components/camel-opentelemetry/pom.xml | 57 +++++++++++-- .../org/apache/camel/thread-factory-listener | 2 + .../services/org/apache/camel/thread-pool-factory | 2 + ...TelemetryInstrumentedThreadFactoryListener.java | 32 +++++++ ...OpenTelemetryInstrumentedThreadPoolFactory.java | 70 ++++++++++++++++ .../CurrentContextScheduledExecutorService.java | 98 ++++++++++++++++++++++ .../ForwardingScheduledExecutorService.java | 72 ++++++++++++++++ .../camel/opentelemetry/internal/package-info.java | 20 +++++ .../CamelOpenTelemetryTestSupport.java | 7 ++ .../org/apache/camel/spi/ThreadPoolFactory.java | 7 +- 10 files changed, 357 insertions(+), 10 deletions(-) diff --git a/components/camel-opentelemetry/pom.xml b/components/camel-opentelemetry/pom.xml index cfebf1f6551..16501b8c75b 100644 --- a/components/camel-opentelemetry/pom.xml +++ b/components/camel-opentelemetry/pom.xml @@ -45,6 +45,46 @@ <camel.surefire.fork.additional-vmargs>-Xmx1G</camel.surefire.fork.additional-vmargs> </properties> + <dependencyManagement> + <dependencies> + <dependency> + <groupId>io.opentelemetry</groupId> + <artifactId>opentelemetry-sdk</artifactId> + <version>${opentelemetry-version}</version> + </dependency> + <dependency> + 
<groupId>io.opentelemetry</groupId> + <artifactId>opentelemetry-api</artifactId> + <version>${opentelemetry-version}</version> + </dependency> + <dependency> + <groupId>io.opentelemetry</groupId> + <artifactId>opentelemetry-context</artifactId> + <version>${opentelemetry-version}</version> + </dependency> + <dependency> + <groupId>io.opentelemetry</groupId> + <artifactId>opentelemetry-semconv</artifactId> + <version>${opentelemetry-alpha-version}</version> + </dependency> + <dependency> + <groupId>io.opentelemetry</groupId> + <artifactId>opentelemetry-sdk-testing</artifactId> + <version>${opentelemetry-version}</version> + </dependency> + <dependency> + <groupId>io.opentelemetry</groupId> + <artifactId>opentelemetry-sdk-extension-incubator</artifactId> + <version>${opentelemetry-incubator-version}</version> + </dependency> + <dependency> + <groupId>io.opentelemetry.instrumentation</groupId> + <artifactId>opentelemetry-log4j-appender-2.17</artifactId> + <version>${opentelemetry-log4j2-version}</version> + </dependency> + </dependencies> + </dependencyManagement> + <dependencies> <dependency> <groupId>org.apache.camel</groupId> @@ -72,17 +112,18 @@ <dependency> <groupId>io.opentelemetry</groupId> <artifactId>opentelemetry-sdk</artifactId> - <version>${opentelemetry-version}</version> </dependency> <dependency> <groupId>io.opentelemetry</groupId> <artifactId>opentelemetry-api</artifactId> - <version>${opentelemetry-version}</version> + </dependency> + <dependency> + <groupId>io.opentelemetry</groupId> + <artifactId>opentelemetry-context</artifactId> </dependency> <dependency> <groupId>io.opentelemetry</groupId> <artifactId>opentelemetry-semconv</artifactId> - <version>${opentelemetry-alpha-version}</version> </dependency> <dependency> <groupId>org.apache.camel</groupId> @@ -92,19 +133,16 @@ <dependency> <groupId>io.opentelemetry</groupId> <artifactId>opentelemetry-sdk-testing</artifactId> - <version>${opentelemetry-version}</version> <scope>test</scope> </dependency> 
<dependency> <groupId>io.opentelemetry</groupId> <artifactId>opentelemetry-sdk-extension-incubator</artifactId> - <version>${opentelemetry-incubator-version}</version> <scope>test</scope> </dependency> <dependency> <groupId>io.opentelemetry.instrumentation</groupId> <artifactId>opentelemetry-log4j-appender-2.17</artifactId> - <version>${opentelemetry-log4j2-version}</version> <scope>test</scope> </dependency> <dependency> @@ -113,7 +151,12 @@ <version>${awaitility-version}</version> <scope>test</scope> </dependency> - + <dependency> + <groupId>org.junit-pioneer</groupId> + <artifactId>junit-pioneer</artifactId> + <version>${junit-pioneer-version}</version> + <scope>test</scope> + </dependency> </dependencies> <build> diff --git a/components/camel-opentelemetry/src/generated/resources/META-INF/services/org/apache/camel/thread-factory-listener b/components/camel-opentelemetry/src/generated/resources/META-INF/services/org/apache/camel/thread-factory-listener new file mode 100644 index 00000000000..312d74578dd --- /dev/null +++ b/components/camel-opentelemetry/src/generated/resources/META-INF/services/org/apache/camel/thread-factory-listener @@ -0,0 +1,2 @@ +# Generated by camel build tools - do NOT edit this file! +class=org.apache.camel.opentelemetry.OpenTelemetryInstrumentedThreadFactoryListener diff --git a/components/camel-opentelemetry/src/generated/resources/META-INF/services/org/apache/camel/thread-pool-factory b/components/camel-opentelemetry/src/generated/resources/META-INF/services/org/apache/camel/thread-pool-factory new file mode 100644 index 00000000000..b9b12881ac6 --- /dev/null +++ b/components/camel-opentelemetry/src/generated/resources/META-INF/services/org/apache/camel/thread-pool-factory @@ -0,0 +1,2 @@ +# Generated by camel build tools - do NOT edit this file! 
+class=org.apache.camel.opentelemetry.OpenTelemetryInstrumentedThreadPoolFactory diff --git a/components/camel-opentelemetry/src/main/java/org/apache/camel/opentelemetry/OpenTelemetryInstrumentedThreadFactoryListener.java b/components/camel-opentelemetry/src/main/java/org/apache/camel/opentelemetry/OpenTelemetryInstrumentedThreadFactoryListener.java new file mode 100644 index 00000000000..ef5bb9459cd --- /dev/null +++ b/components/camel-opentelemetry/src/main/java/org/apache/camel/opentelemetry/OpenTelemetryInstrumentedThreadFactoryListener.java @@ -0,0 +1,32 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.camel.opentelemetry; + +import java.util.concurrent.ThreadFactory; + +import io.opentelemetry.context.Context; +import org.apache.camel.spi.ExecutorServiceManager; +import org.apache.camel.spi.annotations.JdkService; + +@JdkService(ExecutorServiceManager.ThreadFactoryListener.FACTORY) +public class OpenTelemetryInstrumentedThreadFactoryListener implements ExecutorServiceManager.ThreadFactoryListener { + + @Override + public ThreadFactory onNewThreadFactory(ThreadFactory factory) { + return runnable -> factory.newThread(Context.current().wrap(runnable)); + } +} diff --git a/components/camel-opentelemetry/src/main/java/org/apache/camel/opentelemetry/OpenTelemetryInstrumentedThreadPoolFactory.java b/components/camel-opentelemetry/src/main/java/org/apache/camel/opentelemetry/OpenTelemetryInstrumentedThreadPoolFactory.java new file mode 100644 index 00000000000..a0b450f16d6 --- /dev/null +++ b/components/camel-opentelemetry/src/main/java/org/apache/camel/opentelemetry/OpenTelemetryInstrumentedThreadPoolFactory.java @@ -0,0 +1,70 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.camel.opentelemetry; + +import java.util.concurrent.ExecutorService; +import java.util.concurrent.RejectedExecutionHandler; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.ThreadFactory; +import java.util.concurrent.TimeUnit; + +import io.opentelemetry.context.Context; +import org.apache.camel.opentelemetry.internal.CurrentContextScheduledExecutorService; +import org.apache.camel.spi.ThreadPoolFactory; +import org.apache.camel.spi.ThreadPoolProfile; +import org.apache.camel.spi.annotations.JdkService; +import org.apache.camel.support.DefaultThreadPoolFactory; + +@JdkService(ThreadPoolFactory.FACTORY) +public class OpenTelemetryInstrumentedThreadPoolFactory extends DefaultThreadPoolFactory implements ThreadPoolFactory { + + @Override + public ExecutorService newCachedThreadPool(ThreadFactory threadFactory) { + return Context.taskWrapping(super.newCachedThreadPool(threadFactory)); + } + + @Override + public ExecutorService newThreadPool( + int corePoolSize, + int maxPoolSize, + long keepAliveTime, + TimeUnit timeUnit, + int maxQueueSize, + boolean allowCoreThreadTimeOut, + RejectedExecutionHandler rejectedExecutionHandler, + ThreadFactory threadFactory) + throws IllegalArgumentException { + + ExecutorService executorService = super.newThreadPool( + corePoolSize, + maxPoolSize, + keepAliveTime, + timeUnit, + maxQueueSize, + allowCoreThreadTimeOut, + rejectedExecutionHandler, + threadFactory); + + return Context.taskWrapping(executorService); + } + + @Override + public ScheduledExecutorService newScheduledThreadPool(ThreadPoolProfile profile, ThreadFactory threadFactory) { + return new CurrentContextScheduledExecutorService(super.newScheduledThreadPool(profile, threadFactory)); + } + +} diff --git a/components/camel-opentelemetry/src/main/java/org/apache/camel/opentelemetry/internal/CurrentContextScheduledExecutorService.java 
b/components/camel-opentelemetry/src/main/java/org/apache/camel/opentelemetry/internal/CurrentContextScheduledExecutorService.java new file mode 100644 index 00000000000..ad150fb8a8f --- /dev/null +++ b/components/camel-opentelemetry/src/main/java/org/apache/camel/opentelemetry/internal/CurrentContextScheduledExecutorService.java @@ -0,0 +1,98 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.camel.opentelemetry.internal; + +import java.util.Collection; +import java.util.List; +import java.util.concurrent.Callable; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.Future; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.ScheduledFuture; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; + +import io.opentelemetry.context.Context; + +public class CurrentContextScheduledExecutorService extends ForwardingScheduledExecutorService { + + public CurrentContextScheduledExecutorService(ScheduledExecutorService delegate) { + super(delegate); + } + + @Override + public <T> Future<T> submit(Callable<T> task) { + return delegate().submit(Context.current().wrap(task)); + } + + @Override + public <T> Future<T> submit(Runnable task, T result) { + return delegate().submit(Context.current().wrap(task), result); + } + + @Override + public Future<?> submit(Runnable task) { + return delegate().submit(Context.current().wrap(task)); + } + + @Override + public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks) throws InterruptedException { + return delegate().invokeAll(wrap(Context.current(), tasks)); + } + + @Override + public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit) + throws InterruptedException { + return delegate().invokeAll(wrap(Context.current(), tasks), timeout, unit); + } + + @Override + public <T> T invokeAny(Collection<? extends Callable<T>> tasks) throws InterruptedException, ExecutionException { + return delegate().invokeAny(wrap(Context.current(), tasks)); + } + + @Override + public <T> T invokeAny(Collection<? 
extends Callable<T>> tasks, long timeout, TimeUnit unit) + throws InterruptedException, ExecutionException, TimeoutException { + return delegate().invokeAny(wrap(Context.current(), tasks), timeout, unit); + } + + @Override + public void execute(Runnable command) { + delegate().execute(Context.current().wrap(command)); + } + + @Override + public ScheduledFuture<?> schedule(Runnable command, long delay, TimeUnit unit) { + return delegate().schedule(Context.current().wrap(command), delay, unit); + } + + @Override + public <V> ScheduledFuture<V> schedule(Callable<V> callable, long delay, TimeUnit unit) { + return delegate().schedule(Context.current().wrap(callable), delay, unit); + } + + @Override + public ScheduledFuture<?> scheduleAtFixedRate(Runnable command, long initialDelay, long period, TimeUnit unit) { + return delegate().scheduleAtFixedRate(Context.current().wrap(command), initialDelay, period, unit); + } + + @Override + public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command, long initialDelay, long delay, TimeUnit unit) { + return delegate().scheduleWithFixedDelay(Context.current().wrap(command), initialDelay, delay, unit); + } +} diff --git a/components/camel-opentelemetry/src/main/java/org/apache/camel/opentelemetry/internal/ForwardingScheduledExecutorService.java b/components/camel-opentelemetry/src/main/java/org/apache/camel/opentelemetry/internal/ForwardingScheduledExecutorService.java new file mode 100644 index 00000000000..d3873edc4dd --- /dev/null +++ b/components/camel-opentelemetry/src/main/java/org/apache/camel/opentelemetry/internal/ForwardingScheduledExecutorService.java @@ -0,0 +1,72 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.opentelemetry.internal; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.List; +import java.util.concurrent.Callable; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; + +import io.opentelemetry.context.Context; + +abstract class ForwardingScheduledExecutorService implements ScheduledExecutorService { + + private final ScheduledExecutorService delegate; + + protected ForwardingScheduledExecutorService(ScheduledExecutorService delegate) { + this.delegate = delegate; + } + + ScheduledExecutorService delegate() { + return delegate; + } + + @Override + public void shutdown() { + delegate.shutdown(); + } + + @Override + public List<Runnable> shutdownNow() { + return delegate.shutdownNow(); + } + + @Override + public boolean isShutdown() { + return delegate.isShutdown(); + } + + @Override + public boolean isTerminated() { + return delegate.isTerminated(); + } + + @Override + public boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException { + return delegate.awaitTermination(timeout, unit); + } + + protected static <T> Collection<? extends Callable<T>> wrap(Context context, Collection<? 
extends Callable<T>> tasks) { + List<Callable<T>> wrapped = new ArrayList<>(); + for (Callable<T> task : tasks) { + wrapped.add(context.wrap(task)); + } + return wrapped; + } +} diff --git a/components/camel-opentelemetry/src/main/java/org/apache/camel/opentelemetry/internal/package-info.java b/components/camel-opentelemetry/src/main/java/org/apache/camel/opentelemetry/internal/package-info.java new file mode 100644 index 00000000000..5c463e79ac0 --- /dev/null +++ b/components/camel-opentelemetry/src/main/java/org/apache/camel/opentelemetry/internal/package-info.java @@ -0,0 +1,20 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +/** + * Classes in this package provide implementations that are missing from OpenTelemetry. 
+ */ +package org.apache.camel.opentelemetry.internal; diff --git a/components/camel-opentelemetry/src/test/java/org/apache/camel/opentelemetry/CamelOpenTelemetryTestSupport.java b/components/camel-opentelemetry/src/test/java/org/apache/camel/opentelemetry/CamelOpenTelemetryTestSupport.java index 26977d6d8f8..6c3f39a1a07 100644 --- a/components/camel-opentelemetry/src/test/java/org/apache/camel/opentelemetry/CamelOpenTelemetryTestSupport.java +++ b/components/camel-opentelemetry/src/test/java/org/apache/camel/opentelemetry/CamelOpenTelemetryTestSupport.java @@ -33,9 +33,11 @@ import io.opentelemetry.api.trace.Tracer; import io.opentelemetry.context.Context; import io.opentelemetry.sdk.logs.data.LogRecordData; import io.opentelemetry.sdk.trace.data.SpanData; +import org.apache.camel.BindToRegistry; import org.apache.camel.CamelContext; import org.apache.camel.CamelContextAware; import org.apache.camel.spi.InterceptStrategy; +import org.apache.camel.spi.ThreadPoolFactory; import org.apache.camel.test.junit5.CamelTestSupport; import org.awaitility.Awaitility; import org.junit.jupiter.api.AfterEach; @@ -43,6 +45,7 @@ import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.Tag; import org.junit.jupiter.api.Tags; import org.junit.jupiter.api.extension.RegisterExtension; +import org.junitpioneer.jupiter.SetSystemProperty; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -50,6 +53,7 @@ import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertNotNull; @Tags({ @Tag("not-parallel") }) +@SetSystemProperty(key = "io.opentelemetry.context.enableStrictContext", value = "true") class CamelOpenTelemetryTestSupport extends CamelTestSupport { static final AttributeKey<String> CAMEL_URI_KEY = AttributeKey.stringKey("camel-uri"); static final AttributeKey<String> COMPONENT_KEY = AttributeKey.stringKey("component"); @@ -63,6 +67,9 @@ class CamelOpenTelemetryTestSupport extends CamelTestSupport { 
@RegisterExtension public final CamelOpenTelemetryExtension otelExtension = CamelOpenTelemetryExtension.create(); + @BindToRegistry + ThreadPoolFactory threadPoolFactory = new OpenTelemetryInstrumentedThreadPoolFactory(); + SpanTestData[] expected; Tracer tracer; OpenTelemetryTracer otTracer; diff --git a/core/camel-api/src/main/java/org/apache/camel/spi/ThreadPoolFactory.java b/core/camel-api/src/main/java/org/apache/camel/spi/ThreadPoolFactory.java index 3820ddd4d24..5336e984b10 100644 --- a/core/camel-api/src/main/java/org/apache/camel/spi/ThreadPoolFactory.java +++ b/core/camel-api/src/main/java/org/apache/camel/spi/ThreadPoolFactory.java @@ -24,7 +24,7 @@ import java.util.concurrent.ThreadFactory; * Creates ExecutorService and ScheduledExecutorService objects that work with a thread pool for a given * ThreadPoolProfile and ThreadFactory. * - * This interface allows to customize the creation of these objects to adapt camel for application servers and other + * This interface allows customizing the creation of these objects to adapt camel for application servers and other * environments where thread pools should not be created with the jdk methods */ public interface ThreadPoolFactory { @@ -38,8 +38,9 @@ public interface ThreadPoolFactory { * Creates a new cached thread pool * <p/> * The cached thread pool is a term from the JDK from the method - * {@link java.util.concurrent.Executors#newCachedThreadPool()}. Typically it will have no size limit (this is why - * it is handled separately + * {@link java.util.concurrent.Executors#newCachedThreadPool()}. + * <p/> + * Typically, it will have no size limit (this is why it is handled separately) * * @param threadFactory factory for creating threads * @return the created thread pool