This is an automated email from the ASF dual-hosted git repository. orpiske pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/camel.git
commit 501498d9c8620dd825f94bb177ff63719f2d18d9 Author: Otavio Rodolfo Piske <angusyo...@gmail.com> AuthorDate: Sat Apr 1 15:58:05 2023 +0200 CAMEL-15105: make the AsyncProcessorAwaitManager a plugin of the context --- .../java/org/apache/camel/ExtendedCamelContext.java | 16 ---------------- .../camel/impl/engine/AbstractCamelContext.java | 8 ++------ .../impl/engine/DefaultCamelContextExtension.java | 19 ------------------- .../impl/engine/SharedCamelInternalProcessor.java | 3 ++- .../apache/camel/impl/console/BlockedConsole.java | 5 +++-- .../camel/impl/ExtendedCamelContextConfigurer.java | 6 ------ .../impl/lw/LightweightCamelContextExtension.java | 13 ------------- .../errorhandler/RedeliveryErrorHandler.java | 2 +- .../core/xml/AbstractCamelContextFactoryBean.java | 2 +- .../processor/async/AsyncEndpointPolicyTest.java | 3 ++- .../AsyncProcessorAwaitManagerInterruptTest.java | 21 ++++++++++++--------- ...ssorAwaitManagerInterruptWithRedeliveryTest.java | 19 +++++++++++-------- .../async/AsyncProcessorAwaitManagerTest.java | 16 +++++++++------- .../camel/main/DefaultConfigurationConfigurer.java | 2 +- .../apache/camel/support/AsyncProcessorHelper.java | 2 +- .../apache/camel/support/AsyncProcessorSupport.java | 2 +- .../apache/camel/support/DefaultAsyncProducer.java | 2 +- .../java/org/apache/camel/support/PluginHelper.java | 20 +++++++++++++++++++- 18 files changed, 66 insertions(+), 95 deletions(-) diff --git a/core/camel-api/src/main/java/org/apache/camel/ExtendedCamelContext.java b/core/camel-api/src/main/java/org/apache/camel/ExtendedCamelContext.java index e8346448418..16dbcc3c229 100644 --- a/core/camel-api/src/main/java/org/apache/camel/ExtendedCamelContext.java +++ b/core/camel-api/src/main/java/org/apache/camel/ExtendedCamelContext.java @@ -23,7 +23,6 @@ import java.util.function.Supplier; import org.apache.camel.catalog.RuntimeCamelCatalog; import org.apache.camel.spi.AnnotationBasedProcessorFactory; -import org.apache.camel.spi.AsyncProcessorAwaitManager; import org.apache.camel.spi.BeanIntrospection; import org.apache.camel.spi.BeanProcessorFactory; import org.apache.camel.spi.BeanProxyFactory; @@ -49,7 +48,6 @@ import org.apache.camel.spi.ResourceLoader; import org.apache.camel.spi.RestBindingJaxbDataFormatFactory; import org.apache.camel.spi.RouteController; import org.apache.camel.spi.RouteStartupOrder; -import org.apache.camel.spi.RoutesLoader; import org.apache.camel.spi.StartupStepRecorder; import org.apache.camel.spi.UnitOfWorkFactory; @@ -384,20 +382,6 @@ public interface ExtendedCamelContext { */ void addLogListener(LogListener listener); - /** - * Gets the {@link org.apache.camel.AsyncProcessor} await manager. - * - * @return the manager - */ - AsyncProcessorAwaitManager getAsyncProcessorAwaitManager(); - - /** - * Sets a custom {@link org.apache.camel.AsyncProcessor} await manager. - * - * @param manager the manager - */ - void setAsyncProcessorAwaitManager(AsyncProcessorAwaitManager manager); - /** * Gets the {@link BeanIntrospection} */ diff --git a/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/AbstractCamelContext.java b/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/AbstractCamelContext.java index a177273fd49..36aecfecdc5 100644 --- a/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/AbstractCamelContext.java +++ b/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/AbstractCamelContext.java @@ -221,7 +221,6 @@ public abstract class AbstractCamelContext extends BaseService volatile ModelToXMLDumper modelToXMLDumper; volatile RestBindingJaxbDataFormatFactory restBindingJaxbDataFormatFactory; volatile RuntimeCamelCatalog runtimeCamelCatalog; - volatile AsyncProcessorAwaitManager asyncProcessorAwaitManager; volatile UnitOfWorkFactory unitOfWorkFactory; volatile BeanIntrospection beanIntrospection; volatile boolean eventNotificationApplicable; @@ -381,6 +380,7 @@ public abstract class AbstractCamelContext extends BaseService camelContextExtension.lazyAddContextPlugin(InterceptEndpointFactory.class, this::createInterceptEndpointFactory); camelContextExtension.lazyAddContextPlugin(RouteFactory.class, this::createRouteFactory); camelContextExtension.lazyAddContextPlugin(RoutesLoader.class, this::createRoutesLoader); + camelContextExtension.lazyAddContextPlugin(AsyncProcessorAwaitManager.class, this::createAsyncProcessorAwaitManager); if (build) { try { @@ -2878,6 +2878,7 @@ public abstract class AbstractCamelContext extends BaseService // shutdown await manager to trigger interrupt of blocked threads to // attempt to free these threads graceful + final AsyncProcessorAwaitManager asyncProcessorAwaitManager = PluginHelper.getAsyncProcessorAwaitManager(this); InternalServiceManager.shutdownServices(this, asyncProcessorAwaitManager); // we need also to include routes which failed to start to ensure all resources get stopped when stopping Camel @@ -3286,7 +3287,6 @@ public abstract class AbstractCamelContext extends BaseService typeConverterRegistry = null; typeConverter = null; reactiveExecutor = null; - asyncProcessorAwaitManager = null; exchangeFactory = null; exchangeFactoryManager = null; processorExchangeFactory = null; @@ -4171,10 +4171,6 @@ public abstract class AbstractCamelContext extends BaseService camelContextExtension.addInterceptStrategy(interceptStrategy); } - public AsyncProcessorAwaitManager getAsyncProcessorAwaitManager() { - return camelContextExtension.getAsyncProcessorAwaitManager(); - } - public BeanIntrospection getBeanIntrospection() { return camelContextExtension.getBeanIntrospection(); } diff --git a/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/DefaultCamelContextExtension.java b/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/DefaultCamelContextExtension.java index b981927d3cb..dc0532c2898 100644 --- a/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/DefaultCamelContextExtension.java +++ b/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/DefaultCamelContextExtension.java @@ -37,7 +37,6 @@ import org.apache.camel.RuntimeCamelException; import org.apache.camel.Service; import org.apache.camel.catalog.RuntimeCamelCatalog; import org.apache.camel.spi.AnnotationBasedProcessorFactory; -import org.apache.camel.spi.AsyncProcessorAwaitManager; import org.apache.camel.spi.BeanIntrospection; import org.apache.camel.spi.BeanProcessorFactory; import org.apache.camel.spi.BeanProxyFactory; @@ -407,24 +406,6 @@ class DefaultCamelContextExtension implements ExtendedCamelContext { } } - @Override - public AsyncProcessorAwaitManager getAsyncProcessorAwaitManager() { - if (camelContext.asyncProcessorAwaitManager == null) { - synchronized (camelContext.lock) { - if (camelContext.asyncProcessorAwaitManager == null) { - setAsyncProcessorAwaitManager(camelContext.createAsyncProcessorAwaitManager()); - } - } - } - return camelContext.asyncProcessorAwaitManager; - } - - @Override - public void setAsyncProcessorAwaitManager(AsyncProcessorAwaitManager asyncProcessorAwaitManager) { - camelContext.asyncProcessorAwaitManager - = camelContext.getInternalServiceManager().addService(asyncProcessorAwaitManager); - } - @Override public BeanIntrospection getBeanIntrospection() { if (camelContext.beanIntrospection == null) { diff --git a/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/SharedCamelInternalProcessor.java b/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/SharedCamelInternalProcessor.java index 6ae6ed90103..28f89571d46 100644 --- a/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/SharedCamelInternalProcessor.java +++ b/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/SharedCamelInternalProcessor.java @@ -37,6 +37,7 @@ import org.apache.camel.spi.Transformer; import org.apache.camel.spi.UnitOfWork; import org.apache.camel.support.AsyncCallbackToCompletableFutureAdapter; import org.apache.camel.support.OrderedComparator; +import org.apache.camel.support.PluginHelper; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -82,7 +83,7 @@ public class SharedCamelInternalProcessor implements SharedInternalProcessor { public SharedCamelInternalProcessor(CamelContext camelContext, CamelInternalProcessorAdvice... advices) { this.camelContext = camelContext; this.reactiveExecutor = camelContext.getCamelContextExtension().getReactiveExecutor(); - this.awaitManager = camelContext.getCamelContextExtension().getAsyncProcessorAwaitManager(); + this.awaitManager = PluginHelper.getAsyncProcessorAwaitManager(camelContext); this.shutdownStrategy = camelContext.getShutdownStrategy(); if (advices != null) { diff --git a/core/camel-console/src/main/java/org/apache/camel/impl/console/BlockedConsole.java b/core/camel-console/src/main/java/org/apache/camel/impl/console/BlockedConsole.java index 339f216f558..8c897dc9b93 100644 --- a/core/camel-console/src/main/java/org/apache/camel/impl/console/BlockedConsole.java +++ b/core/camel-console/src/main/java/org/apache/camel/impl/console/BlockedConsole.java @@ -22,6 +22,7 @@ import java.util.Map; import org.apache.camel.spi.AsyncProcessorAwaitManager; import org.apache.camel.spi.annotations.DevConsole; +import org.apache.camel.support.PluginHelper; import org.apache.camel.support.console.AbstractDevConsole; import org.apache.camel.util.TimeUtils; import org.apache.camel.util.json.JsonObject; @@ -37,7 +38,7 @@ public class BlockedConsole extends AbstractDevConsole { protected String doCallText(Map<String, Object> options) { StringBuilder sb = new StringBuilder(); - AsyncProcessorAwaitManager am = getCamelContext().getCamelContextExtension().getAsyncProcessorAwaitManager(); + AsyncProcessorAwaitManager am = PluginHelper.getAsyncProcessorAwaitManager(getCamelContext()); sb.append(String.format("\n Blocked: %s", am.size())); for (AsyncProcessorAwaitManager.AwaitThread at : am.browse()) { String age = TimeUtils.printDuration(at.getWaitDuration(), true); @@ -52,7 +53,7 @@ public class BlockedConsole extends AbstractDevConsole { protected JsonObject doCallJson(Map<String, Object> options) { JsonObject root = new JsonObject(); - AsyncProcessorAwaitManager am = getCamelContext().getCamelContextExtension().getAsyncProcessorAwaitManager(); + AsyncProcessorAwaitManager am = PluginHelper.getAsyncProcessorAwaitManager(getCamelContext()); root.put("blocked", am.size()); final List<JsonObject> list = new ArrayList<>(); diff --git a/core/camel-core-engine/src/generated/java/org/apache/camel/impl/ExtendedCamelContextConfigurer.java b/core/camel-core-engine/src/generated/java/org/apache/camel/impl/ExtendedCamelContextConfigurer.java index 77230c6cdf2..5866067408b 100644 --- a/core/camel-core-engine/src/generated/java/org/apache/camel/impl/ExtendedCamelContextConfigurer.java +++ b/core/camel-core-engine/src/generated/java/org/apache/camel/impl/ExtendedCamelContextConfigurer.java @@ -23,8 +23,6 @@ public class ExtendedCamelContextConfigurer extends org.apache.camel.support.com switch (ignoreCase ? name.toLowerCase() : name) { case "annotationbasedprocessorfactory": case "AnnotationBasedProcessorFactory": target.setAnnotationBasedProcessorFactory(property(camelContext, org.apache.camel.spi.AnnotationBasedProcessorFactory.class, value)); return true; - case "asyncprocessorawaitmanager": - case "AsyncProcessorAwaitManager": target.setAsyncProcessorAwaitManager(property(camelContext, org.apache.camel.spi.AsyncProcessorAwaitManager.class, value)); return true; case "basepackagescan": case "BasePackageScan": target.setBasePackageScan(property(camelContext, java.lang.String.class, value)); return true; case "beanintrospection": @@ -76,8 +74,6 @@ public class ExtendedCamelContextConfigurer extends org.apache.camel.support.com switch (ignoreCase ? name.toLowerCase() : name) { case "annotationbasedprocessorfactory": case "AnnotationBasedProcessorFactory": return org.apache.camel.spi.AnnotationBasedProcessorFactory.class; - case "asyncprocessorawaitmanager": - case "AsyncProcessorAwaitManager": return org.apache.camel.spi.AsyncProcessorAwaitManager.class; case "basepackagescan": case "BasePackageScan": return java.lang.String.class; case "beanintrospection": @@ -130,8 +126,6 @@ public class ExtendedCamelContextConfigurer extends org.apache.camel.support.com switch (ignoreCase ? name.toLowerCase() : name) { case "annotationbasedprocessorfactory": case "AnnotationBasedProcessorFactory": return target.getAnnotationBasedProcessorFactory(); - case "asyncprocessorawaitmanager": - case "AsyncProcessorAwaitManager": return target.getAsyncProcessorAwaitManager(); case "basepackagescan": case "BasePackageScan": return target.getBasePackageScan(); case "beanintrospection": diff --git a/core/camel-core-engine/src/main/java/org/apache/camel/impl/lw/LightweightCamelContextExtension.java b/core/camel-core-engine/src/main/java/org/apache/camel/impl/lw/LightweightCamelContextExtension.java index 82469a20b83..1878d61c495 100644 --- a/core/camel-core-engine/src/main/java/org/apache/camel/impl/lw/LightweightCamelContextExtension.java +++ b/core/camel-core-engine/src/main/java/org/apache/camel/impl/lw/LightweightCamelContextExtension.java @@ -38,7 +38,6 @@ import org.apache.camel.Service; import org.apache.camel.ServiceStatus; import org.apache.camel.catalog.RuntimeCamelCatalog; import org.apache.camel.spi.AnnotationBasedProcessorFactory; -import org.apache.camel.spi.AsyncProcessorAwaitManager; import org.apache.camel.spi.BeanIntrospection; import org.apache.camel.spi.BeanProcessorFactory; import org.apache.camel.spi.BeanProxyFactory; @@ -63,9 +62,7 @@ import org.apache.camel.spi.Registry; import org.apache.camel.spi.ResourceLoader; import org.apache.camel.spi.RestBindingJaxbDataFormatFactory; import org.apache.camel.spi.RouteController; -import org.apache.camel.spi.RouteFactory; import org.apache.camel.spi.RouteStartupOrder; -import org.apache.camel.spi.RoutesLoader; import org.apache.camel.spi.StartupStepRecorder; import org.apache.camel.spi.SupervisingRouteController; import org.apache.camel.spi.UnitOfWorkFactory; @@ -290,16 +287,6 @@ class LightweightCamelContextExtension implements ExtendedCamelContext { throw new UnsupportedOperationException(); } - @Override - public AsyncProcessorAwaitManager getAsyncProcessorAwaitManager() { - return camelContext.getCamelContextExtension().getAsyncProcessorAwaitManager(); - } - - @Override - public void setAsyncProcessorAwaitManager(AsyncProcessorAwaitManager manager) { - throw new UnsupportedOperationException(); - } - @Override public BeanIntrospection getBeanIntrospection() { return camelContext.getCamelContextExtension().getBeanIntrospection(); diff --git a/core/camel-core-processor/src/main/java/org/apache/camel/processor/errorhandler/RedeliveryErrorHandler.java b/core/camel-core-processor/src/main/java/org/apache/camel/processor/errorhandler/RedeliveryErrorHandler.java index 2e8eba75bd7..688b7cf1cd6 100644 --- a/core/camel-core-processor/src/main/java/org/apache/camel/processor/errorhandler/RedeliveryErrorHandler.java +++ b/core/camel-core-processor/src/main/java/org/apache/camel/processor/errorhandler/RedeliveryErrorHandler.java @@ -118,7 +118,7 @@ public abstract class RedeliveryErrorHandler extends ErrorHandlerSupport this.camelContext = camelContext; this.reactiveExecutor = camelContext.getCamelContextExtension().getReactiveExecutor(); - this.awaitManager = camelContext.getCamelContextExtension().getAsyncProcessorAwaitManager(); + this.awaitManager = PluginHelper.getAsyncProcessorAwaitManager(camelContext); this.shutdownStrategy = camelContext.getShutdownStrategy(); this.redeliveryProcessor = redeliveryProcessor; this.deadLetter = deadLetter; diff --git a/core/camel-core-xml/src/main/java/org/apache/camel/core/xml/AbstractCamelContextFactoryBean.java b/core/camel-core-xml/src/main/java/org/apache/camel/core/xml/AbstractCamelContextFactoryBean.java index faa44550efd..ea03b928510 100644 --- a/core/camel-core-xml/src/main/java/org/apache/camel/core/xml/AbstractCamelContextFactoryBean.java +++ b/core/camel-core-xml/src/main/java/org/apache/camel/core/xml/AbstractCamelContextFactoryBean.java @@ -267,7 +267,7 @@ public abstract class AbstractCamelContextFactoryBean<T extends ModelCamelContex AsyncProcessorAwaitManager asyncProcessorAwaitManager = getBeanForType(AsyncProcessorAwaitManager.class); if (asyncProcessorAwaitManager != null) { LOG.info("Using custom AsyncProcessorAwaitManager: {}", asyncProcessorAwaitManager); - getContext().getCamelContextExtension().setAsyncProcessorAwaitManager(asyncProcessorAwaitManager); + getContext().getCamelContextExtension().addContextPlugin(AsyncProcessorAwaitManager.class, asyncProcessorAwaitManager); } ManagementStrategy managementStrategy = getBeanForType(ManagementStrategy.class); if (managementStrategy != null) { diff --git a/core/camel-core/src/test/java/org/apache/camel/processor/async/AsyncEndpointPolicyTest.java b/core/camel-core/src/test/java/org/apache/camel/processor/async/AsyncEndpointPolicyTest.java index 9522bbc165c..7c3912d4b62 100644 --- a/core/camel-core/src/test/java/org/apache/camel/processor/async/AsyncEndpointPolicyTest.java +++ b/core/camel-core/src/test/java/org/apache/camel/processor/async/AsyncEndpointPolicyTest.java @@ -31,6 +31,7 @@ import org.apache.camel.spi.Policy; import org.apache.camel.spi.Registry; import org.apache.camel.support.AsyncCallbackToCompletableFutureAdapter; import org.apache.camel.support.AsyncProcessorConverterHelper; +import org.apache.camel.support.PluginHelper; import org.junit.jupiter.api.Test; import static org.junit.jupiter.api.Assertions.assertEquals; @@ -125,7 +126,7 @@ public class AsyncEndpointPolicyTest extends ContextTestSupport { public void process(Exchange exchange) throws Exception { final AsyncProcessorAwaitManager awaitManager - = exchange.getContext().getCamelContextExtension().getAsyncProcessorAwaitManager(); + = PluginHelper.getAsyncProcessorAwaitManager(exchange.getContext()); awaitManager.process(this, exchange); } diff --git a/core/camel-core/src/test/java/org/apache/camel/processor/async/AsyncProcessorAwaitManagerInterruptTest.java b/core/camel-core/src/test/java/org/apache/camel/processor/async/AsyncProcessorAwaitManagerInterruptTest.java index ec8383aaad5..51db2d95b5a 100644 --- a/core/camel-core/src/test/java/org/apache/camel/processor/async/AsyncProcessorAwaitManagerInterruptTest.java +++ b/core/camel-core/src/test/java/org/apache/camel/processor/async/AsyncProcessorAwaitManagerInterruptTest.java @@ -25,6 +25,7 @@ import org.apache.camel.Exchange; import org.apache.camel.Processor; import org.apache.camel.builder.RouteBuilder; import org.apache.camel.spi.AsyncProcessorAwaitManager; +import org.apache.camel.support.PluginHelper; import org.junit.jupiter.api.Test; import static org.junit.jupiter.api.Assertions.assertEquals; @@ -34,10 +35,10 @@ public class AsyncProcessorAwaitManagerInterruptTest extends ContextTestSupport @Test public void testAsyncAwaitInterrupt() throws Exception { - context.getCamelContextExtension().getAsyncProcessorAwaitManager().getStatistics().setStatisticsEnabled(true); - - assertEquals(0, context.getCamelContextExtension().getAsyncProcessorAwaitManager().size()); + final AsyncProcessorAwaitManager asyncProcessorAwaitManager = PluginHelper.getAsyncProcessorAwaitManager(context); + asyncProcessorAwaitManager.getStatistics().setStatisticsEnabled(true); + assertEquals(0, asyncProcessorAwaitManager.size()); getMockEndpoint("mock:before").expectedBodiesReceived("Hello Camel"); getMockEndpoint("mock:after").expectedBodiesReceived("Bye Camel"); getMockEndpoint("mock:result").expectedMessageCount(0); @@ -51,10 +52,10 @@ public class AsyncProcessorAwaitManagerInterruptTest extends ContextTestSupport assertMockEndpointsSatisfied(); - assertEquals(0, context.getCamelContextExtension().getAsyncProcessorAwaitManager().size()); + assertEquals(0, asyncProcessorAwaitManager.size()); assertEquals(1, - context.getCamelContextExtension().getAsyncProcessorAwaitManager().getStatistics().getThreadsBlocked()); - assertEquals(1, context.getCamelContextExtension().getAsyncProcessorAwaitManager().getStatistics() + asyncProcessorAwaitManager.getStatistics().getThreadsBlocked()); + assertEquals(1, asyncProcessorAwaitManager.getStatistics() .getThreadsInterrupted()); } @@ -69,17 +70,19 @@ public class AsyncProcessorAwaitManagerInterruptTest extends ContextTestSupport .to("mock:after").process(new Processor() { @Override public void process(Exchange exchange) throws Exception { - int size = context.getCamelContextExtension().getAsyncProcessorAwaitManager().size(); + final AsyncProcessorAwaitManager asyncProcessorAwaitManager + = PluginHelper.getAsyncProcessorAwaitManager(context); + int size = asyncProcessorAwaitManager.size(); log.info("async inflight: {}", size); assertEquals(1, size); Collection<AsyncProcessorAwaitManager.AwaitThread> threads - = context.getCamelContextExtension().getAsyncProcessorAwaitManager().browse(); + = asyncProcessorAwaitManager.browse(); AsyncProcessorAwaitManager.AwaitThread thread = threads.iterator().next(); // lets interrupt it String id = thread.getExchange().getExchangeId(); - context.getCamelContextExtension().getAsyncProcessorAwaitManager().interrupt(id); + asyncProcessorAwaitManager.interrupt(id); } }).transform(constant("Hi Camel")).to("mock:result"); } diff --git a/core/camel-core/src/test/java/org/apache/camel/processor/async/AsyncProcessorAwaitManagerInterruptWithRedeliveryTest.java b/core/camel-core/src/test/java/org/apache/camel/processor/async/AsyncProcessorAwaitManagerInterruptWithRedeliveryTest.java index 808e39cb562..e8dcf3bee9b 100644 --- a/core/camel-core/src/test/java/org/apache/camel/processor/async/AsyncProcessorAwaitManagerInterruptWithRedeliveryTest.java +++ b/core/camel-core/src/test/java/org/apache/camel/processor/async/AsyncProcessorAwaitManagerInterruptWithRedeliveryTest.java @@ -26,6 +26,7 @@ import org.apache.camel.ContextTestSupport; import org.apache.camel.builder.RouteBuilder; import org.apache.camel.spi.AsyncProcessorAwaitManager; import org.apache.camel.spi.Registry; +import org.apache.camel.support.PluginHelper; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; import org.slf4j.Logger; @@ -50,9 +51,10 @@ public class AsyncProcessorAwaitManagerInterruptWithRedeliveryTest extends Conte @Test public void testAsyncAwaitInterrupt() throws Exception { - context.getCamelContextExtension().getAsyncProcessorAwaitManager().getStatistics().setStatisticsEnabled(true); + final AsyncProcessorAwaitManager asyncProcessorAwaitManager = PluginHelper.getAsyncProcessorAwaitManager(context); + asyncProcessorAwaitManager.getStatistics().setStatisticsEnabled(true); - assertEquals(0, context.getCamelContextExtension().getAsyncProcessorAwaitManager().size()); + assertEquals(0, asyncProcessorAwaitManager.size()); getMockEndpoint("mock:before").expectedBodiesReceived("Hello Camel"); getMockEndpoint("mock:result").expectedMessageCount(0); @@ -72,10 +74,10 @@ public class AsyncProcessorAwaitManagerInterruptWithRedeliveryTest extends Conte // Check we have not reached the full 5 re-deliveries verify(bean, atMost(4)).callMe(); - assertEquals(0, context.getCamelContextExtension().getAsyncProcessorAwaitManager().size()); + assertEquals(0, asyncProcessorAwaitManager.size()); assertEquals(1, - context.getCamelContextExtension().getAsyncProcessorAwaitManager().getStatistics().getThreadsBlocked()); - assertEquals(1, context.getCamelContextExtension().getAsyncProcessorAwaitManager().getStatistics() + asyncProcessorAwaitManager.getStatistics().getThreadsBlocked()); + assertEquals(1, asyncProcessorAwaitManager.getStatistics() .getThreadsInterrupted()); } @@ -89,16 +91,17 @@ public class AsyncProcessorAwaitManagerInterruptWithRedeliveryTest extends Conte } // Get our blocked thread - int size = context.getCamelContextExtension().getAsyncProcessorAwaitManager().size(); + final AsyncProcessorAwaitManager asyncProcessorAwaitManager = PluginHelper.getAsyncProcessorAwaitManager(context); + int size = asyncProcessorAwaitManager.size(); assertEquals(1, size); Collection<AsyncProcessorAwaitManager.AwaitThread> threads - = context.getCamelContextExtension().getAsyncProcessorAwaitManager().browse(); + = asyncProcessorAwaitManager.browse(); AsyncProcessorAwaitManager.AwaitThread thread = threads.iterator().next(); // Interrupt it String id = thread.getExchange().getExchangeId(); - context.getCamelContextExtension().getAsyncProcessorAwaitManager().interrupt(id); + asyncProcessorAwaitManager.interrupt(id); }).start(); } diff --git a/core/camel-core/src/test/java/org/apache/camel/processor/async/AsyncProcessorAwaitManagerTest.java b/core/camel-core/src/test/java/org/apache/camel/processor/async/AsyncProcessorAwaitManagerTest.java index c70e060063c..659797fa756 100644 --- a/core/camel-core/src/test/java/org/apache/camel/processor/async/AsyncProcessorAwaitManagerTest.java +++ b/core/camel-core/src/test/java/org/apache/camel/processor/async/AsyncProcessorAwaitManagerTest.java @@ -23,6 +23,7 @@ import org.apache.camel.Exchange; import org.apache.camel.Processor; import org.apache.camel.builder.RouteBuilder; import org.apache.camel.spi.AsyncProcessorAwaitManager; +import org.apache.camel.support.PluginHelper; import org.junit.jupiter.api.Test; import static org.assertj.core.api.Assertions.assertThat; @@ -32,9 +33,10 @@ public class AsyncProcessorAwaitManagerTest extends ContextTestSupport { @Test public void testAsyncAwait() throws Exception { - context.getCamelContextExtension().getAsyncProcessorAwaitManager().getStatistics().setStatisticsEnabled(true); + final AsyncProcessorAwaitManager asyncProcessorAwaitManager = PluginHelper.getAsyncProcessorAwaitManager(context); + asyncProcessorAwaitManager.getStatistics().setStatisticsEnabled(true); - assertEquals(0, context.getCamelContextExtension().getAsyncProcessorAwaitManager().size()); + assertEquals(0, asyncProcessorAwaitManager.size()); getMockEndpoint("mock:before").expectedBodiesReceived("Hello Camel"); getMockEndpoint("mock:after").expectedBodiesReceived("Bye Camel"); @@ -45,10 +47,10 @@ public class AsyncProcessorAwaitManagerTest extends ContextTestSupport { assertMockEndpointsSatisfied(); - assertEquals(0, context.getCamelContextExtension().getAsyncProcessorAwaitManager().size()); + assertEquals(0, asyncProcessorAwaitManager.size()); assertEquals(1, - context.getCamelContextExtension().getAsyncProcessorAwaitManager().getStatistics().getThreadsBlocked()); - assertEquals(0, context.getCamelContextExtension().getAsyncProcessorAwaitManager().getStatistics() + asyncProcessorAwaitManager.getStatistics().getThreadsBlocked()); + assertEquals(0, asyncProcessorAwaitManager.getStatistics() .getThreadsInterrupted()); } @@ -63,12 +65,12 @@ public class AsyncProcessorAwaitManagerTest extends ContextTestSupport { .process(new Processor() { @Override public void process(Exchange exchange) throws Exception { - int size = context.getCamelContextExtension().getAsyncProcessorAwaitManager().size(); + int size = PluginHelper.getAsyncProcessorAwaitManager(context).size(); log.info("async inflight: {}", size); assertEquals(1, size); Collection<AsyncProcessorAwaitManager.AwaitThread> threads - = context.getCamelContextExtension().getAsyncProcessorAwaitManager().browse(); + = PluginHelper.getAsyncProcessorAwaitManager(context).browse(); AsyncProcessorAwaitManager.AwaitThread thread = threads.iterator().next(); long wait = thread.getWaitDuration(); diff --git a/core/camel-main/src/main/java/org/apache/camel/main/DefaultConfigurationConfigurer.java b/core/camel-main/src/main/java/org/apache/camel/main/DefaultConfigurationConfigurer.java index cfd445143f5..d1cc0adc905 100644 --- a/core/camel-main/src/main/java/org/apache/camel/main/DefaultConfigurationConfigurer.java +++ b/core/camel-main/src/main/java/org/apache/camel/main/DefaultConfigurationConfigurer.java @@ -383,7 +383,7 @@ public final class DefaultConfigurationConfigurer { } AsyncProcessorAwaitManager apam = getSingleBeanOfType(registry, AsyncProcessorAwaitManager.class); if (apam != null) { - ecc.getCamelContextExtension().setAsyncProcessorAwaitManager(apam); + ecc.getCamelContextExtension().addContextPlugin(AsyncProcessorAwaitManager.class, apam); } ManagementStrategy ms = getSingleBeanOfType(registry, ManagementStrategy.class); if (ms != null) { diff --git a/core/camel-support/src/main/java/org/apache/camel/support/AsyncProcessorHelper.java b/core/camel-support/src/main/java/org/apache/camel/support/AsyncProcessorHelper.java index 6052d2f6dd4..19ae9d8f26c 100644 --- a/core/camel-support/src/main/java/org/apache/camel/support/AsyncProcessorHelper.java +++ b/core/camel-support/src/main/java/org/apache/camel/support/AsyncProcessorHelper.java @@ -43,7 +43,7 @@ public final class AsyncProcessorHelper { */ public static void process(final AsyncProcessor processor, final Exchange exchange) throws Exception { final AsyncProcessorAwaitManager awaitManager - = exchange.getContext().getCamelContextExtension().getAsyncProcessorAwaitManager(); + = PluginHelper.getAsyncProcessorAwaitManager(exchange.getContext()); awaitManager.process(processor, exchange); } diff --git a/core/camel-support/src/main/java/org/apache/camel/support/AsyncProcessorSupport.java b/core/camel-support/src/main/java/org/apache/camel/support/AsyncProcessorSupport.java index b693d3f1263..eba0c4ebd02 100644 --- a/core/camel-support/src/main/java/org/apache/camel/support/AsyncProcessorSupport.java +++ b/core/camel-support/src/main/java/org/apache/camel/support/AsyncProcessorSupport.java @@ -36,7 +36,7 @@ public abstract class AsyncProcessorSupport extends ServiceSupport implements As @Override public void process(Exchange exchange) throws Exception { AsyncProcessorAwaitManager awaitManager - = exchange.getContext().getCamelContextExtension().getAsyncProcessorAwaitManager(); + = PluginHelper.getAsyncProcessorAwaitManager(exchange.getContext()); awaitManager.process(this, exchange); } diff --git a/core/camel-support/src/main/java/org/apache/camel/support/DefaultAsyncProducer.java b/core/camel-support/src/main/java/org/apache/camel/support/DefaultAsyncProducer.java index 06df2020b51..24372acc978 100644 --- a/core/camel-support/src/main/java/org/apache/camel/support/DefaultAsyncProducer.java +++ b/core/camel-support/src/main/java/org/apache/camel/support/DefaultAsyncProducer.java @@ -36,7 +36,7 @@ public abstract class DefaultAsyncProducer extends DefaultProducer implements As @Override public void process(Exchange exchange) throws Exception { AsyncProcessorAwaitManager awaitManager - = exchange.getContext().getCamelContextExtension().getAsyncProcessorAwaitManager(); + = PluginHelper.getAsyncProcessorAwaitManager(exchange.getContext()); awaitManager.process(this, exchange); } diff --git a/core/camel-support/src/main/java/org/apache/camel/support/PluginHelper.java b/core/camel-support/src/main/java/org/apache/camel/support/PluginHelper.java index deebc08f9d1..4870a9e22f5 100644 --- a/core/camel-support/src/main/java/org/apache/camel/support/PluginHelper.java +++ b/core/camel-support/src/main/java/org/apache/camel/support/PluginHelper.java @@ -23,6 +23,7 @@ import org.apache.camel.CamelContext; import org.apache.camel.ExtendedCamelContext; import org.apache.camel.console.DevConsoleResolver; import org.apache.camel.health.HealthCheckResolver; +import org.apache.camel.spi.AsyncProcessorAwaitManager; import org.apache.camel.spi.CamelBeanPostProcessor; import org.apache.camel.spi.CamelDependencyInjectionAnnotationFactory; import org.apache.camel.spi.ComponentNameResolver; @@ -430,11 +431,28 @@ public final class PluginHelper { return getRoutesLoader(camelContext.getCamelContextExtension()); } - /** * Gets the {@link RoutesLoader} to be used. */ public static RoutesLoader getRoutesLoader(ExtendedCamelContext extendedCamelContext) { return extendedCamelContext.getContextPlugin(RoutesLoader.class); } + + /** + * Gets the {@link org.apache.camel.AsyncProcessor} await manager. + * + * @return the manager + */ + public static AsyncProcessorAwaitManager getAsyncProcessorAwaitManager(CamelContext camelContext) { + return getAsyncProcessorAwaitManager(camelContext.getCamelContextExtension()); + } + + /** + * Gets the {@link org.apache.camel.AsyncProcessor} await manager. + * + * @return the manager + */ + public static AsyncProcessorAwaitManager getAsyncProcessorAwaitManager(ExtendedCamelContext extendedCamelContext) { + return extendedCamelContext.getContextPlugin(AsyncProcessorAwaitManager.class); + } }