This is an automated email from the ASF dual-hosted git repository.

jamesnetherton pushed a commit to branch 3.8.x
in repository https://gitbox.apache.org/repos/asf/camel-quarkus.git

commit 876c9a2ede126a4ef9dc3d929d53b1b835a7d7ff
Author: James Netherton <jamesnether...@gmail.com>
AuthorDate: Mon Oct 21 14:53:17 2024 +0100

    Work around Camel OpenTelemetry thread factory services not being 
compatible with Quarkus OpenTelemetry
---
 .../deployment/OpenTelemetryProcessor.java         |  29 ++++++
 ...TelemetryInstrumentedThreadFactoryListener.java |  35 +++++++
 ...OpenTelemetryInstrumentedThreadPoolFactory.java |  73 +++++++++++++++
 .../CurrentContextScheduledExecutorService.java    | 101 +++++++++++++++++++++
 .../ForwardingScheduledExecutorService.java        |  75 +++++++++++++++
 5 files changed, 313 insertions(+)

diff --git 
a/extensions/opentelemetry/deployment/src/main/java/org/apache/camel/quarkus/component/opentelemetry/deployment/OpenTelemetryProcessor.java
 
b/extensions/opentelemetry/deployment/src/main/java/org/apache/camel/quarkus/component/opentelemetry/deployment/OpenTelemetryProcessor.java
index 1040841541..3c16a81894 100644
--- 
a/extensions/opentelemetry/deployment/src/main/java/org/apache/camel/quarkus/component/opentelemetry/deployment/OpenTelemetryProcessor.java
+++ 
b/extensions/opentelemetry/deployment/src/main/java/org/apache/camel/quarkus/component/opentelemetry/deployment/OpenTelemetryProcessor.java
@@ -16,11 +16,19 @@
  */
 package org.apache.camel.quarkus.component.opentelemetry.deployment;
 
+import java.nio.file.Paths;
+import java.util.Map;
+
 import io.quarkus.arc.deployment.AdditionalBeanBuildItem;
+import io.quarkus.deployment.annotations.BuildProducer;
 import io.quarkus.deployment.annotations.BuildStep;
 import io.quarkus.deployment.builditem.FeatureBuildItem;
 import io.quarkus.opentelemetry.deployment.tracing.TracerEnabled;
 import 
org.apache.camel.quarkus.component.opentelemetry.OpenTelemetryTracerProducer;
+import org.apache.camel.quarkus.core.deployment.spi.CamelServiceBuildItem;
+import org.apache.camel.quarkus.core.deployment.spi.CamelServiceDestination;
+import 
org.apache.camel.quarkus.core.deployment.spi.CamelServicePatternBuildItem;
+import org.apache.camel.spi.FactoryFinder;
 
 class OpenTelemetryProcessor {
 
@@ -38,4 +46,25 @@ class OpenTelemetryProcessor {
                 .addBeanClass(OpenTelemetryTracerProducer.class)
                 .build();
     }
+
+    // TODO: Remove this: https://github.com/apache/camel-quarkus/issues/6669
+    @BuildStep
+    void overrideCamelOpenTelemetryThreadPoolServices(
+            BuildProducer<CamelServicePatternBuildItem> camelServicePattern,
+            BuildProducer<CamelServiceBuildItem> camelService) {
+
+        Map.of("thread-pool-factory", 
"OpenTelemetryInstrumentedThreadPoolFactory",
+                "thread-factory-listener", 
"OpenTelemetryInstrumentedThreadFactoryListener")
+                .forEach((serviceName, type) -> {
+                    String servicePath = FactoryFinder.DEFAULT_PATH + 
serviceName;
+                    // Disable broken original service
+                    camelServicePattern
+                            .produce(new 
CamelServicePatternBuildItem(CamelServiceDestination.DISCOVERY, false, 
servicePath));
+
+                    // Replace with working
+                    camelService.produce(new 
CamelServiceBuildItem(Paths.get(servicePath),
+                            
"org.apache.camel.quarkus.component.opentelemetry.patch.%s".formatted(type)));
+                });
+
+    }
 }
diff --git 
a/extensions/opentelemetry/runtime/src/main/java/org/apache/camel/quarkus/component/opentelemetry/patch/OpenTelemetryInstrumentedThreadFactoryListener.java
 
b/extensions/opentelemetry/runtime/src/main/java/org/apache/camel/quarkus/component/opentelemetry/patch/OpenTelemetryInstrumentedThreadFactoryListener.java
new file mode 100644
index 0000000000..655d5a39be
--- /dev/null
+++ 
b/extensions/opentelemetry/runtime/src/main/java/org/apache/camel/quarkus/component/opentelemetry/patch/OpenTelemetryInstrumentedThreadFactoryListener.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.quarkus.component.opentelemetry.patch;
+
+import java.util.concurrent.ThreadFactory;
+
+import io.opentelemetry.context.Context;
+import org.apache.camel.spi.ExecutorServiceManager;
+import org.apache.camel.spi.annotations.JdkService;
+
+/**
+ * TODO: Remove this: https://github.com/apache/camel-quarkus/issues/6669
+ */
+@JdkService(ExecutorServiceManager.ThreadFactoryListener.FACTORY)
+public class OpenTelemetryInstrumentedThreadFactoryListener implements 
ExecutorServiceManager.ThreadFactoryListener {
+
+    @Override
+    public ThreadFactory onNewThreadFactory(ThreadFactory factory) {
+        return runnable -> factory.newThread(Context.current().wrap(runnable));
+    }
+}
diff --git 
a/extensions/opentelemetry/runtime/src/main/java/org/apache/camel/quarkus/component/opentelemetry/patch/OpenTelemetryInstrumentedThreadPoolFactory.java
 
b/extensions/opentelemetry/runtime/src/main/java/org/apache/camel/quarkus/component/opentelemetry/patch/OpenTelemetryInstrumentedThreadPoolFactory.java
new file mode 100644
index 0000000000..f379046b66
--- /dev/null
+++ 
b/extensions/opentelemetry/runtime/src/main/java/org/apache/camel/quarkus/component/opentelemetry/patch/OpenTelemetryInstrumentedThreadPoolFactory.java
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.quarkus.component.opentelemetry.patch;
+
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.RejectedExecutionHandler;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.TimeUnit;
+
+import io.opentelemetry.context.Context;
+import 
org.apache.camel.quarkus.component.opentelemetry.patch.internal.CurrentContextScheduledExecutorService;
+import org.apache.camel.spi.ThreadPoolFactory;
+import org.apache.camel.spi.ThreadPoolProfile;
+import org.apache.camel.spi.annotations.JdkService;
+import org.apache.camel.support.DefaultThreadPoolFactory;
+
+/**
+ * TODO: Remove this: https://github.com/apache/camel-quarkus/issues/6669
+ */
+@JdkService(ThreadPoolFactory.FACTORY)
+public class OpenTelemetryInstrumentedThreadPoolFactory extends 
DefaultThreadPoolFactory implements ThreadPoolFactory {
+
+    @Override
+    public ExecutorService newCachedThreadPool(ThreadFactory threadFactory) {
+        return Context.taskWrapping(super.newCachedThreadPool(threadFactory));
+    }
+
+    @Override
+    public ExecutorService newThreadPool(
+            int corePoolSize,
+            int maxPoolSize,
+            long keepAliveTime,
+            TimeUnit timeUnit,
+            int maxQueueSize,
+            boolean allowCoreThreadTimeOut,
+            RejectedExecutionHandler rejectedExecutionHandler,
+            ThreadFactory threadFactory)
+            throws IllegalArgumentException {
+
+        ExecutorService executorService = super.newThreadPool(
+                corePoolSize,
+                maxPoolSize,
+                keepAliveTime,
+                timeUnit,
+                maxQueueSize,
+                allowCoreThreadTimeOut,
+                rejectedExecutionHandler,
+                threadFactory);
+
+        return Context.taskWrapping(executorService);
+    }
+
+    @Override
+    public ScheduledExecutorService newScheduledThreadPool(ThreadPoolProfile 
profile, ThreadFactory threadFactory) {
+        return new 
CurrentContextScheduledExecutorService(super.newScheduledThreadPool(profile, 
threadFactory));
+    }
+
+}
diff --git 
a/extensions/opentelemetry/runtime/src/main/java/org/apache/camel/quarkus/component/opentelemetry/patch/internal/CurrentContextScheduledExecutorService.java
 
b/extensions/opentelemetry/runtime/src/main/java/org/apache/camel/quarkus/component/opentelemetry/patch/internal/CurrentContextScheduledExecutorService.java
new file mode 100644
index 0000000000..7e499657db
--- /dev/null
+++ 
b/extensions/opentelemetry/runtime/src/main/java/org/apache/camel/quarkus/component/opentelemetry/patch/internal/CurrentContextScheduledExecutorService.java
@@ -0,0 +1,101 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.quarkus.component.opentelemetry.patch.internal;
+
+import java.util.Collection;
+import java.util.List;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+
+import io.opentelemetry.context.Context;
+
+/**
+ * TODO: Remove this: https://github.com/apache/camel-quarkus/issues/6669
+ */
+public class CurrentContextScheduledExecutorService extends 
ForwardingScheduledExecutorService {
+
+    public CurrentContextScheduledExecutorService(ScheduledExecutorService 
delegate) {
+        super(delegate);
+    }
+
+    @Override
+    public <T> Future<T> submit(Callable<T> task) {
+        return delegate().submit(Context.current().wrap(task));
+    }
+
+    @Override
+    public <T> Future<T> submit(Runnable task, T result) {
+        return delegate().submit(Context.current().wrap(task), result);
+    }
+
+    @Override
+    public Future<?> submit(Runnable task) {
+        return delegate().submit(Context.current().wrap(task));
+    }
+
+    @Override
+    public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> 
tasks) throws InterruptedException {
+        return delegate().invokeAll(wrap(Context.current(), tasks));
+    }
+
+    @Override
+    public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> 
tasks, long timeout, TimeUnit unit)
+            throws InterruptedException {
+        return delegate().invokeAll(wrap(Context.current(), tasks), timeout, 
unit);
+    }
+
+    @Override
+    public <T> T invokeAny(Collection<? extends Callable<T>> tasks) throws 
InterruptedException, ExecutionException {
+        return delegate().invokeAny(wrap(Context.current(), tasks));
+    }
+
+    @Override
+    public <T> T invokeAny(Collection<? extends Callable<T>> tasks, long 
timeout, TimeUnit unit)
+            throws InterruptedException, ExecutionException, TimeoutException {
+        return delegate().invokeAny(wrap(Context.current(), tasks), timeout, 
unit);
+    }
+
+    @Override
+    public void execute(Runnable command) {
+        delegate().execute(Context.current().wrap(command));
+    }
+
+    @Override
+    public ScheduledFuture<?> schedule(Runnable command, long delay, TimeUnit 
unit) {
+        return delegate().schedule(Context.current().wrap(command), delay, 
unit);
+    }
+
+    @Override
+    public <V> ScheduledFuture<V> schedule(Callable<V> callable, long delay, 
TimeUnit unit) {
+        return delegate().schedule(Context.current().wrap(callable), delay, 
unit);
+    }
+
+    @Override
+    public ScheduledFuture<?> scheduleAtFixedRate(Runnable command, long 
initialDelay, long period, TimeUnit unit) {
+        return delegate().scheduleAtFixedRate(Context.current().wrap(command), 
initialDelay, period, unit);
+    }
+
+    @Override
+    public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command, long 
initialDelay, long delay, TimeUnit unit) {
+        return 
delegate().scheduleWithFixedDelay(Context.current().wrap(command), 
initialDelay, delay, unit);
+    }
+}
diff --git 
a/extensions/opentelemetry/runtime/src/main/java/org/apache/camel/quarkus/component/opentelemetry/patch/internal/ForwardingScheduledExecutorService.java
 
b/extensions/opentelemetry/runtime/src/main/java/org/apache/camel/quarkus/component/opentelemetry/patch/internal/ForwardingScheduledExecutorService.java
new file mode 100644
index 0000000000..e6f3d05fe6
--- /dev/null
+++ 
b/extensions/opentelemetry/runtime/src/main/java/org/apache/camel/quarkus/component/opentelemetry/patch/internal/ForwardingScheduledExecutorService.java
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.quarkus.component.opentelemetry.patch.internal;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+
+import io.opentelemetry.context.Context;
+
+/**
+ * TODO: Remove this: https://github.com/apache/camel-quarkus/issues/6669
+ */
+abstract class ForwardingScheduledExecutorService implements 
ScheduledExecutorService {
+
+    private final ScheduledExecutorService delegate;
+
+    protected ForwardingScheduledExecutorService(ScheduledExecutorService 
delegate) {
+        this.delegate = delegate;
+    }
+
+    ScheduledExecutorService delegate() {
+        return delegate;
+    }
+
+    @Override
+    public void shutdown() {
+        delegate.shutdown();
+    }
+
+    @Override
+    public List<Runnable> shutdownNow() {
+        return delegate.shutdownNow();
+    }
+
+    @Override
+    public boolean isShutdown() {
+        return delegate.isShutdown();
+    }
+
+    @Override
+    public boolean isTerminated() {
+        return delegate.isTerminated();
+    }
+
+    @Override
+    public boolean awaitTermination(long timeout, TimeUnit unit) throws 
InterruptedException {
+        return delegate.awaitTermination(timeout, unit);
+    }
+
+    protected static <T> Collection<? extends Callable<T>> wrap(Context 
context, Collection<? extends Callable<T>> tasks) {
+        List<Callable<T>> wrapped = new ArrayList<>();
+        for (Callable<T> task : tasks) {
+            wrapped.add(context.wrap(task));
+        }
+        return wrapped;
+    }
+}

Reply via email to