This is an automated email from the ASF dual-hosted git repository. davsclaus pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/camel.git
commit 1a25fc8175b8fd9464ba1e44c69b61b161d7ba07 Author: Claus Ibsen <claus.ib...@gmail.com> AuthorDate: Mon Jan 13 17:13:21 2020 +0100 CAMEL-14354: camel-core - Optimize unnecessary object allocations. ReactiveExecutor no longer creates object for callback API. --- .../main/java/org/apache/camel/AsyncCallback.java | 9 +++++++- .../org/apache/camel/spi/ReactiveExecutor.java | 25 +++------------------- .../camel/impl/engine/DefaultReactiveExecutor.java | 14 ------------ .../camel/processor/CamelInternalProcessor.java | 2 +- .../apache/camel/processor/MulticastProcessor.java | 2 +- .../java/org/apache/camel/processor/Pipeline.java | 2 +- .../processor/SharedCamelInternalProcessor.java | 2 +- .../errorhandler/RedeliveryErrorHandler.java | 10 ++++----- 8 files changed, 20 insertions(+), 46 deletions(-) diff --git a/core/camel-api/src/main/java/org/apache/camel/AsyncCallback.java b/core/camel-api/src/main/java/org/apache/camel/AsyncCallback.java index 2d9f33f..d0c3e5d 100644 --- a/core/camel-api/src/main/java/org/apache/camel/AsyncCallback.java +++ b/core/camel-api/src/main/java/org/apache/camel/AsyncCallback.java @@ -25,7 +25,7 @@ package org.apache.camel; * routing {@link Exchange} when all the data has been gathered. This allows to build non blocking * request/reply communication. */ -public interface AsyncCallback { +public interface AsyncCallback extends Runnable { /** * This method is invoked once the {@link Exchange} is done. @@ -38,4 +38,11 @@ public interface AsyncCallback { */ void done(boolean doneSync); + /** + * Optimized for the reactive executor engine to be able to schedule this callback in its engine. + */ + @Override + default void run() { + done(false); + } } diff --git a/core/camel-api/src/main/java/org/apache/camel/spi/ReactiveExecutor.java b/core/camel-api/src/main/java/org/apache/camel/spi/ReactiveExecutor.java index fc3c4de..eb0df12 100644 --- a/core/camel-api/src/main/java/org/apache/camel/spi/ReactiveExecutor.java +++ b/core/camel-api/src/main/java/org/apache/camel/spi/ReactiveExecutor.java @@ -16,8 +16,6 @@ */ package org.apache.camel.spi; -import org.apache.camel.AsyncCallback; - /** * SPI to plugin different reactive engines in the Camel routing engine. */ @@ -36,6 +34,7 @@ public interface ReactiveExecutor { * @param runnable the task * @param description a human readable description for logging purpose */ + @Deprecated void schedule(Runnable runnable, String description); /** @@ -51,6 +50,7 @@ public interface ReactiveExecutor { * @param runnable the task * @param description a human readable description for logging purpose */ + @Deprecated void scheduleMain(Runnable runnable, String description); /** @@ -66,6 +66,7 @@ public interface ReactiveExecutor { * @param runnable the task * @param description a human readable description for logging purpose */ + @Deprecated void scheduleSync(Runnable runnable, String description); /** @@ -75,24 +76,4 @@ public interface ReactiveExecutor { */ boolean executeFromQueue(); - /** - * Schedules the callback to be run - * - * @param callback the callable - */ - default void callback(AsyncCallback callback) { - schedule(new Runnable() { - - @Override - public void run() { - callback.done(false); - } - - @Override - public String toString() { - return "Callback[" + callback + "]"; - } - }); - } - } diff --git a/core/camel-base/src/main/java/org/apache/camel/impl/engine/DefaultReactiveExecutor.java b/core/camel-base/src/main/java/org/apache/camel/impl/engine/DefaultReactiveExecutor.java index 7be0a46..b12b8f4 100644 --- a/core/camel-base/src/main/java/org/apache/camel/impl/engine/DefaultReactiveExecutor.java +++ b/core/camel-base/src/main/java/org/apache/camel/impl/engine/DefaultReactiveExecutor.java @@ -110,20 +110,6 @@ public class DefaultReactiveExecutor extends ServiceSupport implements ReactiveE return pendingTasks.get(); } - @Override - public void callback(AsyncCallback callback) { - schedule(new Runnable() { - @Override - public void run() { - callback.done(false); - } - @Override - public String toString() { - return "Callback[" + callback + "]"; - } - }); - } - private static Runnable describe(Runnable runnable, String description) { return new Runnable() { @Override diff --git a/core/camel-base/src/main/java/org/apache/camel/processor/CamelInternalProcessor.java b/core/camel-base/src/main/java/org/apache/camel/processor/CamelInternalProcessor.java index 7f273cc..a685085 100644 --- a/core/camel-base/src/main/java/org/apache/camel/processor/CamelInternalProcessor.java +++ b/core/camel-base/src/main/java/org/apache/camel/processor/CamelInternalProcessor.java @@ -180,7 +180,7 @@ public class CamelInternalProcessor extends DelegateAsyncProcessor { // CAMEL END USER - DEBUG ME HERE +++ START +++ // ---------------------------------------------------------- // callback must be called - exchange.getContext().getReactiveExecutor().callback(originalCallback); + exchange.getContext().getReactiveExecutor().schedule(originalCallback); // ---------------------------------------------------------- // CAMEL END USER - DEBUG ME HERE +++ END +++ // ---------------------------------------------------------- diff --git a/core/camel-base/src/main/java/org/apache/camel/processor/MulticastProcessor.java b/core/camel-base/src/main/java/org/apache/camel/processor/MulticastProcessor.java index 3c00670..6a918ed 100644 --- a/core/camel-base/src/main/java/org/apache/camel/processor/MulticastProcessor.java +++ b/core/camel-base/src/main/java/org/apache/camel/processor/MulticastProcessor.java @@ -547,7 +547,7 @@ public class MulticastProcessor extends AsyncProcessorSupport implements Navigat original.setProperty(Exchange.REDELIVERY_EXHAUSTED, exhaust); } - camelContext.getReactiveExecutor().callback(callback); + camelContext.getReactiveExecutor().schedule(callback); } /** diff --git a/core/camel-base/src/main/java/org/apache/camel/processor/Pipeline.java b/core/camel-base/src/main/java/org/apache/camel/processor/Pipeline.java index 2aa26f0..e646a45 100644 --- a/core/camel-base/src/main/java/org/apache/camel/processor/Pipeline.java +++ b/core/camel-base/src/main/java/org/apache/camel/processor/Pipeline.java @@ -110,7 +110,7 @@ public class Pipeline extends AsyncProcessorSupport implements Navigate<Processo // we could also consider logging the original and the nextExchange then we have *before* and *after* snapshots log.trace("Processing complete for exchangeId: {} >>> {}", exchange.getExchangeId(), exchange); - camelContext.getReactiveExecutor().callback(callback); + camelContext.getReactiveExecutor().schedule(callback); } } diff --git a/core/camel-base/src/main/java/org/apache/camel/processor/SharedCamelInternalProcessor.java b/core/camel-base/src/main/java/org/apache/camel/processor/SharedCamelInternalProcessor.java index 4eaa6a7..a278187 100644 --- a/core/camel-base/src/main/java/org/apache/camel/processor/SharedCamelInternalProcessor.java +++ b/core/camel-base/src/main/java/org/apache/camel/processor/SharedCamelInternalProcessor.java @@ -254,7 +254,7 @@ public class SharedCamelInternalProcessor { // CAMEL END USER - DEBUG ME HERE +++ START +++ // ---------------------------------------------------------- // callback must be called - exchange.getContext().getReactiveExecutor().callback(callback); + exchange.getContext().getReactiveExecutor().schedule(callback); // ---------------------------------------------------------- // CAMEL END USER - DEBUG ME HERE +++ END +++ // ---------------------------------------------------------- diff --git a/core/camel-base/src/main/java/org/apache/camel/processor/errorhandler/RedeliveryErrorHandler.java b/core/camel-base/src/main/java/org/apache/camel/processor/errorhandler/RedeliveryErrorHandler.java index f6a2721..6d3467a 100644 --- a/core/camel-base/src/main/java/org/apache/camel/processor/errorhandler/RedeliveryErrorHandler.java +++ b/core/camel-base/src/main/java/org/apache/camel/processor/errorhandler/RedeliveryErrorHandler.java @@ -464,7 +464,7 @@ public abstract class RedeliveryErrorHandler extends ErrorHandlerSupport impleme // mark the exchange to stop continue routing when interrupted // as we do not want to continue routing (for example a task has been cancelled) exchange.setProperty(Exchange.ROUTE_STOP, Boolean.TRUE); - camelContext.getReactiveExecutor().callback(callback); + camelContext.getReactiveExecutor().schedule(callback); } } } else { @@ -477,7 +477,7 @@ public abstract class RedeliveryErrorHandler extends ErrorHandlerSupport impleme // only process if the exchange hasn't failed // and it has not been handled by the error processor if (isDone(exchange)) { - camelContext.getReactiveExecutor().callback(callback); + camelContext.getReactiveExecutor().schedule(callback); } else { // error occurred so loop back around which we do by invoking the processAsyncErrorHandler camelContext.getReactiveExecutor().schedule(this); @@ -558,7 +558,7 @@ public abstract class RedeliveryErrorHandler extends ErrorHandlerSupport impleme // only process if the exchange hasn't failed // and it has not been handled by the error processor if (isDone(exchange)) { - camelContext.getReactiveExecutor().callback(callback); + camelContext.getReactiveExecutor().schedule(callback); return; } else { // error occurred so loop back around which we do by invoking the processAsyncErrorHandler @@ -846,7 +846,7 @@ public abstract class RedeliveryErrorHandler extends ErrorHandlerSupport impleme EventHelper.notifyExchangeFailureHandled(exchange.getContext(), exchange, processor, deadLetterChannel, deadLetterUri); } finally { // if the fault was handled asynchronously, this should be reflected in the callback as well - camelContext.getReactiveExecutor().callback(callback); + camelContext.getReactiveExecutor().schedule(callback); } }); } else { @@ -865,7 +865,7 @@ public abstract class RedeliveryErrorHandler extends ErrorHandlerSupport impleme prepareExchangeAfterFailure(exchange, isDeadLetterChannel, shouldHandle, shouldContinue); } finally { // callback we are done - camelContext.getReactiveExecutor().callback(callback); + camelContext.getReactiveExecutor().schedule(callback); } }