This is an automated email from the ASF dual-hosted git repository. davsclaus pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/camel.git
commit d18688cda33a3176751891c3cc6f0a0f304c57ef Author: Claus Ibsen <claus.ib...@gmail.com> AuthorDate: Sat Feb 1 11:38:37 2020 +0100 CAMEL-14354: camel-core optimize. Reduce object allocations for lambdas in pipeline. --- .../java/org/apache/camel/processor/Pipeline.java | 31 ++++++++++++++++++---- 1 file changed, 26 insertions(+), 5 deletions(-) diff --git a/core/camel-base/src/main/java/org/apache/camel/processor/Pipeline.java b/core/camel-base/src/main/java/org/apache/camel/processor/Pipeline.java index 7840875..ee01a49 100644 --- a/core/camel-base/src/main/java/org/apache/camel/processor/Pipeline.java +++ b/core/camel-base/src/main/java/org/apache/camel/processor/Pipeline.java @@ -40,6 +40,7 @@ import org.apache.camel.support.service.ServiceHelper; import org.slf4j.Logger; import org.slf4j.LoggerFactory; + import static org.apache.camel.processor.PipelineHelper.continueProcessing; /** @@ -57,6 +58,24 @@ public class Pipeline extends AsyncProcessorSupport implements Navigate<Processo private String id; private String routeId; + private final class PipelineTask implements Runnable { + + private final Exchange exchange; + private final AsyncCallback callback; + private final AtomicInteger index; + + PipelineTask(Exchange exchange, AsyncCallback callback, AtomicInteger index) { + this.exchange = exchange; + this.callback = callback; + this.index = index; + } + + @Override + public void run() { + doProcess(this, exchange, callback, index); + } + } + public Pipeline(CamelContext camelContext, Collection<Processor> processors) { this.camelContext = camelContext; this.reactiveExecutor = camelContext.adapt(ExtendedCamelContext.class).getReactiveExecutor(); @@ -92,15 +111,18 @@ public class Pipeline extends AsyncProcessorSupport implements Navigate<Processo @Override public boolean process(Exchange exchange, AsyncCallback callback) { + // create task which has state used during routing + PipelineTask task = new PipelineTask(exchange, callback, new AtomicInteger()); + if (exchange.isTransacted()) { - reactiveExecutor.scheduleSync(() -> Pipeline.this.doProcess(exchange, callback, processors, size, new AtomicInteger())); + reactiveExecutor.scheduleSync(task); } else { - reactiveExecutor.scheduleMain(() -> Pipeline.this.doProcess(exchange, callback, processors, size, new AtomicInteger())); + reactiveExecutor.scheduleMain(task); } return false; } - protected void doProcess(Exchange exchange, AsyncCallback callback, List<AsyncProcessor> processors, int size, AtomicInteger index) { + protected void doProcess(PipelineTask task, Exchange exchange, AsyncCallback callback, AtomicInteger index) { // optimize to use an atomic index counter for tracking how long we are in the processors list (uses less memory than iterator on array list) boolean stop = exchange.isRouteStop(); @@ -119,8 +141,7 @@ public class Pipeline extends AsyncProcessorSupport implements Navigate<Processo // get the next processor AsyncProcessor processor = processors.get(index.getAndIncrement()); - processor.process(exchange, doneSync -> - reactiveExecutor.schedule(() -> doProcess(exchange, callback, processors, size, index))); + processor.process(exchange, doneSync -> reactiveExecutor.schedule(task)); } else { ExchangeHelper.copyResults(exchange, exchange);