This is an automated email from the ASF dual-hosted git repository. davsclaus pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/camel.git
commit d7952e5b2be84b94b78e414dc6477e3d8167e678 Author: Claus Ibsen <claus.ib...@gmail.com> AuthorDate: Sun Jan 19 22:36:32 2020 +0100 CAMEL-14354: Optimize core --- .../main/java/org/apache/camel/spi/UnitOfWork.java | 13 +++++++-- .../camel/impl/engine/DefaultUnitOfWork.java | 6 ++++ .../apache/camel/impl/engine/MDCUnitOfWork.java | 5 ++++ .../camel/processor/CamelInternalProcessor.java | 34 +++++++++++++--------- .../processor/SharedCamelInternalProcessor.java | 34 ++++++++++++---------- .../camel/impl/CustomUnitOfWorkFactoryTest.java | 5 ++++ 6 files changed, 66 insertions(+), 31 deletions(-) diff --git a/core/camel-api/src/main/java/org/apache/camel/spi/UnitOfWork.java b/core/camel-api/src/main/java/org/apache/camel/spi/UnitOfWork.java index 42bac62..e7efd1e 100644 --- a/core/camel-api/src/main/java/org/apache/camel/spi/UnitOfWork.java +++ b/core/camel-api/src/main/java/org/apache/camel/spi/UnitOfWork.java @@ -188,11 +188,18 @@ public interface UnitOfWork extends Service { RouteContext popRouteContext(); /** - * Strategy for optional work to be execute before processing + * Whether the unit of work should call the before/after process methods or not. + */ + boolean isBeforeAfterProcess(); + + /** + * Strategy for work to be execute before processing. * <p/> * For example the MDCUnitOfWork leverages this * to ensure MDC is handled correctly during routing exchanges using the * asynchronous routing engine. + * <p/> + * This requires {@link #isBeforeAfterProcess()} returns <tt>true</tt> to be enabled. * * @param processor the processor to be executed * @param exchange the current exchange @@ -202,7 +209,9 @@ public interface UnitOfWork extends Service { AsyncCallback beforeProcess(Processor processor, Exchange exchange, AsyncCallback callback); /** - * Strategy for optional work to be executed after the processing + * Strategy for work to be executed after the processing + * <p/> + * This requires {@link #isBeforeAfterProcess()} returns <tt>true</tt> to be enabled. * * @param processor the processor executed * @param exchange the current exchange diff --git a/core/camel-base/src/main/java/org/apache/camel/impl/engine/DefaultUnitOfWork.java b/core/camel-base/src/main/java/org/apache/camel/impl/engine/DefaultUnitOfWork.java index a96df0b..42e70fd 100644 --- a/core/camel-base/src/main/java/org/apache/camel/impl/engine/DefaultUnitOfWork.java +++ b/core/camel-base/src/main/java/org/apache/camel/impl/engine/DefaultUnitOfWork.java @@ -292,6 +292,11 @@ public class DefaultUnitOfWork implements UnitOfWork, Service { } @Override + public boolean isBeforeAfterProcess() { + return false; + } + + @Override public AsyncCallback beforeProcess(Processor processor, Exchange exchange, AsyncCallback callback) { // no wrapping needed return callback; @@ -299,6 +304,7 @@ public class DefaultUnitOfWork implements UnitOfWork, Service { @Override public void afterProcess(Processor processor, Exchange exchange, AsyncCallback callback, boolean doneSync) { + // noop } private Set<Object> getTransactedBy() { diff --git a/core/camel-base/src/main/java/org/apache/camel/impl/engine/MDCUnitOfWork.java b/core/camel-base/src/main/java/org/apache/camel/impl/engine/MDCUnitOfWork.java index 30812db..006f579 100644 --- a/core/camel-base/src/main/java/org/apache/camel/impl/engine/MDCUnitOfWork.java +++ b/core/camel-base/src/main/java/org/apache/camel/impl/engine/MDCUnitOfWork.java @@ -127,6 +127,11 @@ public class MDCUnitOfWork extends DefaultUnitOfWork { } @Override + public boolean isBeforeAfterProcess() { + return true; + } + + @Override public AsyncCallback beforeProcess(Processor processor, Exchange exchange, AsyncCallback callback) { // add optional step id String stepId = exchange.getProperty(Exchange.STEP_ID, String.class); diff --git a/core/camel-base/src/main/java/org/apache/camel/processor/CamelInternalProcessor.java b/core/camel-base/src/main/java/org/apache/camel/processor/CamelInternalProcessor.java index 9c9164f..46cc50e 100644 --- a/core/camel-base/src/main/java/org/apache/camel/processor/CamelInternalProcessor.java +++ b/core/camel-base/src/main/java/org/apache/camel/processor/CamelInternalProcessor.java @@ -134,6 +134,7 @@ public class CamelInternalProcessor extends DelegateAsyncProcessor { } @Override + @SuppressWarnings("unchecked") public boolean process(Exchange exchange, AsyncCallback originalCallback) { // ---------------------------------------------------------- // CAMEL END USER - READ ME FOR DEBUGGING TIPS @@ -193,7 +194,9 @@ public class CamelInternalProcessor extends DelegateAsyncProcessor { // CAMEL END USER - DEBUG ME HERE +++ START +++ // ---------------------------------------------------------- // callback must be called - exchange.getContext().getReactiveExecutor().schedule(originalCallback); + if (originalCallback != null) { + exchange.getContext().getReactiveExecutor().schedule(originalCallback); + } // ---------------------------------------------------------- // CAMEL END USER - DEBUG ME HERE +++ END +++ // ---------------------------------------------------------- @@ -225,11 +228,12 @@ public class CamelInternalProcessor extends DelegateAsyncProcessor { } else { final UnitOfWork uow = exchange.getUnitOfWork(); - // allow unit of work to wrap callback in case it need to do some special work - // for example the MDCUnitOfWork + // do uow before processing and if a value is returned the the uow wants to be processed after + // was well in the same thread AsyncCallback async = callback; - if (uow != null) { - async = uow.beforeProcess(processor, exchange, callback); + boolean beforeAndAfter = uow.isBeforeAfterProcess(); + if (beforeAndAfter) { + async = uow.beforeProcess(processor, exchange, async); } // ---------------------------------------------------------- @@ -243,17 +247,19 @@ public class CamelInternalProcessor extends DelegateAsyncProcessor { // CAMEL END USER - DEBUG ME HERE +++ END +++ // ---------------------------------------------------------- - exchange.getContext().getReactiveExecutor().schedule(() -> { - // execute any after processor work (in current thread, not in the callback) - if (uow != null) { + // optimize to only do after uow processing if really needed + if (beforeAndAfter) { + exchange.getContext().getReactiveExecutor().schedule(() -> { + // execute any after processor work (in current thread, not in the callback) uow.afterProcess(processor, exchange, callback, false); - } + }); + } - if (log.isTraceEnabled()) { - log.trace("Exchange processed and is continued routed asynchronously for exchangeId: {} -> {}", - exchange.getExchangeId(), exchange); - } - }); + if (log.isTraceEnabled()) { + log.trace("Exchange processed and is continued routed asynchronously for exchangeId: {} -> {}", + exchange.getExchangeId(), exchange); + } + // must return false return false; } } diff --git a/core/camel-base/src/main/java/org/apache/camel/processor/SharedCamelInternalProcessor.java b/core/camel-base/src/main/java/org/apache/camel/processor/SharedCamelInternalProcessor.java index 61f3177..687b875b 100644 --- a/core/camel-base/src/main/java/org/apache/camel/processor/SharedCamelInternalProcessor.java +++ b/core/camel-base/src/main/java/org/apache/camel/processor/SharedCamelInternalProcessor.java @@ -185,11 +185,12 @@ public class SharedCamelInternalProcessor { } else { final UnitOfWork uow = exchange.getUnitOfWork(); - // allow unit of work to wrap callback in case it need to do some special work - // for example the MDCUnitOfWork + // do uow before processing and if a value is returned the the uow wants to be processed after + // was well in the same thread AsyncCallback async = callback; - if (uow != null) { - async = uow.beforeProcess(processor, exchange, callback); + boolean beforeAndAfter = uow.isBeforeAfterProcess(); + if (beforeAndAfter) { + async = uow.beforeProcess(processor, exchange, async); } // ---------------------------------------------------------- @@ -203,17 +204,18 @@ public class SharedCamelInternalProcessor { // CAMEL END USER - DEBUG ME HERE +++ END +++ // ---------------------------------------------------------- - exchange.getContext().getReactiveExecutor().schedule(() -> { - // execute any after processor work (in current thread, not in the callback) - if (uow != null) { + // optimize to only do after uow processing if really needed + if (beforeAndAfter) { + exchange.getContext().getReactiveExecutor().schedule(() -> { + // execute any after processor work (in current thread, not in the callback) uow.afterProcess(processor, exchange, callback, sync); - } + }); + } - if (LOG.isTraceEnabled()) { - LOG.trace("Exchange processed and is continued routed asynchronously for exchangeId: {} -> {}", - exchange.getExchangeId(), exchange); - } - }); + if (LOG.isTraceEnabled()) { + LOG.trace("Exchange processed and is continued routed asynchronously for exchangeId: {} -> {}", + exchange.getExchangeId(), exchange); + } return sync; } } @@ -251,7 +253,7 @@ public class SharedCamelInternalProcessor { // we should call after in reverse order try { - for (int i = advices.size() - 1, j = states.length - 1; i >= 0; i--) { + for (int i = advices != null ? advices.size() - 1 : -1, j = states.length - 1; i >= 0; i--) { CamelInternalProcessorAdvice task = advices.get(i); Object state = null; if (task.hasState()) { @@ -269,7 +271,9 @@ public class SharedCamelInternalProcessor { // CAMEL END USER - DEBUG ME HERE +++ START +++ // ---------------------------------------------------------- // callback must be called - exchange.getContext().getReactiveExecutor().schedule(callback); + if (callback != null) { + exchange.getContext().getReactiveExecutor().schedule(callback); + } // ---------------------------------------------------------- // CAMEL END USER - DEBUG ME HERE +++ END +++ // ---------------------------------------------------------- diff --git a/core/camel-core/src/test/java/org/apache/camel/impl/CustomUnitOfWorkFactoryTest.java b/core/camel-core/src/test/java/org/apache/camel/impl/CustomUnitOfWorkFactoryTest.java index ffdca8f..b4e9776 100644 --- a/core/camel-core/src/test/java/org/apache/camel/impl/CustomUnitOfWorkFactoryTest.java +++ b/core/camel-core/src/test/java/org/apache/camel/impl/CustomUnitOfWorkFactoryTest.java @@ -76,5 +76,10 @@ public class CustomUnitOfWorkFactoryTest extends ContextTestSupport { exchange.getIn().setHeader("before", "I was here"); return callback; } + + @Override + public boolean isBeforeAfterProcess() { + return true; + } } }