This is an automated email from the ASF dual-hosted git repository. davsclaus pushed a commit to branch camel-4.4.x in repository https://gitbox.apache.org/repos/asf/camel.git
The following commit(s) were added to refs/heads/camel-4.4.x by this push: new 68face50bcf CAMEL-21255: camel-core - Add listener for creating ThreadFactory in … (#15679) 68face50bcf is described below commit 68face50bcf9397961033d1e851cac3a3caae954 Author: Claus Ibsen <claus.ib...@gmail.com> AuthorDate: Wed Sep 25 13:34:44 2024 +0200 CAMEL-21255: camel-core - Add listener for creating ThreadFactory in … (#15679) * CAMEL-21255: camel-core - Add listener for creating ThreadFactory in ExecutorServiceManager --- .../apache/camel/spi/ExecutorServiceManager.java | 29 ++++++++++++++ .../impl/engine/BaseExecutorServiceManager.java | 31 ++++++++++++++- .../impl/DefaultExecutorServiceManagerTest.java | 45 ++++++++++++++++++++++ 3 files changed, 103 insertions(+), 2 deletions(-) diff --git a/core/camel-api/src/main/java/org/apache/camel/spi/ExecutorServiceManager.java b/core/camel-api/src/main/java/org/apache/camel/spi/ExecutorServiceManager.java index 5a4726914f7..f2e2b1a1c83 100644 --- a/core/camel-api/src/main/java/org/apache/camel/spi/ExecutorServiceManager.java +++ b/core/camel-api/src/main/java/org/apache/camel/spi/ExecutorServiceManager.java @@ -19,6 +19,7 @@ package org.apache.camel.spi; import java.util.List; import java.util.concurrent.ExecutorService; import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.ThreadFactory; import org.apache.camel.ShutdownableService; import org.apache.camel.StaticService; @@ -52,6 +53,34 @@ import org.apache.camel.StaticService; */ public interface ExecutorServiceManager extends ShutdownableService, StaticService { + /** + * Listener when a new {@link ThreadFactory} is created, which allows to plugin custom behaviour. + */ + @FunctionalInterface + interface ThreadFactoryListener { + + /** + * Service factory key. + */ + String FACTORY = "thread-factory-listener"; + + /** + * Listener when Camel has created a new {@link ThreadFactory} to be used by this + * {@link ExecutorServiceManager}. + * + * @param factory the created factory + * @return the factory to use by this {@link ExecutorServiceManager}. + */ + ThreadFactory onNewThreadFactory(ThreadFactory factory); + } + + /** + * Adds a custom {@link ThreadFactoryListener} to use + * + * @param threadFactoryListener the thread factory listener + */ + void addThreadFactoryListener(ThreadFactoryListener threadFactoryListener); + /** * Gets the {@link ThreadPoolFactory} to use for creating the thread pools. * diff --git a/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/BaseExecutorServiceManager.java b/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/BaseExecutorServiceManager.java index 47a7d1885f4..886938e911f 100644 --- a/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/BaseExecutorServiceManager.java +++ b/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/BaseExecutorServiceManager.java @@ -39,6 +39,7 @@ import org.apache.camel.spi.ThreadPoolFactory; import org.apache.camel.spi.ThreadPoolProfile; import org.apache.camel.support.CamelContextHelper; import org.apache.camel.support.DefaultThreadPoolFactory; +import org.apache.camel.support.OrderedComparator; import org.apache.camel.support.ResolverHelper; import org.apache.camel.support.service.ServiceHelper; import org.apache.camel.support.service.ServiceSupport; @@ -63,6 +64,7 @@ public class BaseExecutorServiceManager extends ServiceSupport implements Execut private final CamelContext camelContext; private final List<ExecutorService> executorServices = new CopyOnWriteArrayList<>(); private final Map<String, ThreadPoolProfile> threadPoolProfiles = new ConcurrentHashMap<>(); + private final List<ThreadFactoryListener> threadFactoryListeners = new CopyOnWriteArrayList<>(); private ThreadPoolFactory threadPoolFactory; private String threadNamePattern; private long shutdownAwaitTermination = 10000; @@ -89,6 +91,11 @@ public class BaseExecutorServiceManager extends ServiceSupport implements Execut return camelContext; } + @Override + public void addThreadFactoryListener(ThreadFactoryListener threadFactoryListener) { + threadFactoryListeners.add(threadFactoryListener); + } + @Override public ThreadPoolFactory getThreadPoolFactory() { return threadPoolFactory; @@ -445,11 +452,26 @@ public class BaseExecutorServiceManager extends ServiceSupport implements Execut } CamelContextAware.trySetCamelContext(threadPoolFactory, camelContext); ServiceHelper.initService(threadPoolFactory); + + // discover custom thread factory listener via Camel factory finder + ResolverHelper.resolveService( + camelContext, + camelContext.getCamelContextExtension().getBootstrapFactoryFinder(), + ThreadFactoryListener.FACTORY, + ThreadFactoryListener.class).ifPresent(this::addThreadFactoryListener); } @Override protected void doStart() throws Exception { super.doStart(); + + Set<ThreadFactoryListener> listeners = camelContext.getRegistry().findByType(ThreadFactoryListener.class); + if (listeners != null && !listeners.isEmpty()) { + threadFactoryListeners.addAll(listeners); + } + if (!threadFactoryListeners.isEmpty()) { + threadFactoryListeners.sort(OrderedComparator.get()); + } ServiceHelper.startService(threadPoolFactory); } @@ -502,6 +524,7 @@ public class BaseExecutorServiceManager extends ServiceSupport implements Execut } ServiceHelper.stopAndShutdownServices(threadPoolFactory); + threadFactoryListeners.clear(); } /** @@ -566,8 +589,12 @@ public class BaseExecutorServiceManager extends ServiceSupport implements Execut onNewExecutorService(executorService); } - protected ThreadFactory createThreadFactory(String name, boolean isDaemon) { - return new CamelThreadFactory(threadNamePattern, name, isDaemon); + protected ThreadFactory createThreadFactory(String name, boolean daemon) { + ThreadFactory factory = new CamelThreadFactory(threadNamePattern, name, daemon); + for (ThreadFactoryListener listener : threadFactoryListeners) { + factory = listener.onNewThreadFactory(factory); + } + return factory; } } diff --git a/core/camel-core/src/test/java/org/apache/camel/impl/DefaultExecutorServiceManagerTest.java b/core/camel-core/src/test/java/org/apache/camel/impl/DefaultExecutorServiceManagerTest.java index 8d3f98ef00b..f440d6311a2 100644 --- a/core/camel-core/src/test/java/org/apache/camel/impl/DefaultExecutorServiceManagerTest.java +++ b/core/camel-core/src/test/java/org/apache/camel/impl/DefaultExecutorServiceManagerTest.java @@ -18,10 +18,13 @@ package org.apache.camel.impl; import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutorService; +import java.util.concurrent.ThreadFactory; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; +import org.apache.camel.CamelContext; import org.apache.camel.ContextTestSupport; +import org.apache.camel.spi.ExecutorServiceManager; import org.apache.camel.spi.ThreadPoolProfile; import org.apache.camel.util.concurrent.SizedScheduledExecutorService; import org.apache.camel.util.concurrent.ThreadPoolRejectedPolicy; @@ -563,4 +566,46 @@ public class DefaultExecutorServiceManagerTest extends ContextTestSupport { assertTrue(pool.isTerminated()); } + @Test + public void testThreadFactoryListener() { + // custom thread factory + ThreadFactory myFactory = r -> new Thread(r, "MyFactory"); + // hook custom factory into Camel + context.getExecutorServiceManager().addThreadFactoryListener(factory -> myFactory); + // create thread + Thread thread = context.getExecutorServiceManager().newThread("Cool", () -> { + // noop + }); + + assertNotNull(thread); + assertTrue(thread.isDaemon()); + // should be created by custom factory instead of Camel + assertTrue(thread.getName().contains("MyFactory")); + } + + @Test + public void testThreadFactoryListenerViaRegistry() { + // create another CamelContext as camelContext is already started in this test-class + CamelContext c = new DefaultCamelContext(); + + // custom thread factory + ThreadFactory myFactory = r -> new Thread(r, "MyFactory2"); + // hook custom factory into Camel via registry + ExecutorServiceManager.ThreadFactoryListener listener = factory -> myFactory; + c.getRegistry().bind("myListener", listener); + c.start(); + + // create thread + Thread thread = c.getExecutorServiceManager().newThread("Cool2", () -> { + // noop + }); + + assertNotNull(thread); + assertTrue(thread.isDaemon()); + // should be created by custom factory instead of Camel + assertTrue(thread.getName().contains("MyFactory2")); + + c.stop(); + } + }