This is an automated email from the ASF dual-hosted git repository. davsclaus pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/camel.git
The following commit(s) were added to refs/heads/master by this push: new 7f764e6 CAMEL-14354: Optimize core. 7f764e6 is described below commit 7f764e61fe8ce6c679c569b0ad23b8ae85eb5168 Author: Claus Ibsen <claus.ib...@gmail.com> AuthorDate: Thu Jan 23 14:20:09 2020 +0100 CAMEL-14354: Optimize core. --- .../java/org/apache/camel/processor/Pipeline.java | 20 +++++++++++--------- 1 file changed, 11 insertions(+), 9 deletions(-) diff --git a/core/camel-base/src/main/java/org/apache/camel/processor/Pipeline.java b/core/camel-base/src/main/java/org/apache/camel/processor/Pipeline.java index a282ce6..7557450 100644 --- a/core/camel-base/src/main/java/org/apache/camel/processor/Pipeline.java +++ b/core/camel-base/src/main/java/org/apache/camel/processor/Pipeline.java @@ -18,8 +18,8 @@ package org.apache.camel.processor; import java.util.ArrayList; import java.util.Collection; -import java.util.Iterator; import java.util.List; +import java.util.concurrent.atomic.AtomicInteger; import java.util.stream.Collectors; import org.apache.camel.AsyncCallback; @@ -88,25 +88,27 @@ public class Pipeline extends AsyncProcessorSupport implements Navigate<Processo @Override public boolean process(Exchange exchange, AsyncCallback callback) { if (exchange.isTransacted()) { - camelContext.getReactiveExecutor().scheduleSync(() -> Pipeline.this.doProcess(exchange, callback, processors.iterator(), true)); + camelContext.getReactiveExecutor().scheduleSync(() -> Pipeline.this.doProcess(exchange, callback, processors, new AtomicInteger(), true)); } else { - camelContext.getReactiveExecutor().scheduleMain(() -> Pipeline.this.doProcess(exchange, callback, processors.iterator(), true)); + camelContext.getReactiveExecutor().scheduleMain(() -> Pipeline.this.doProcess(exchange, callback, processors, new AtomicInteger(), true)); } return false; } - protected void doProcess(Exchange exchange, AsyncCallback callback, Iterator<AsyncProcessor> processors, boolean first) { - if (continueRouting(processors, exchange) + protected void doProcess(Exchange exchange, AsyncCallback callback, List<AsyncProcessor> processors, AtomicInteger index, boolean first) { + // optimize to use an atomic index counter for tracking how long we are in the processors list (uses less memory than iterator on array list) + + if (continueRouting(processors, index, exchange) && (first || continueProcessing(exchange, "so breaking out of pipeline", LOG))) { // prepare for next run ExchangeHelper.prepareOutToIn(exchange); // get the next processor - AsyncProcessor processor = processors.next(); + AsyncProcessor processor = processors.get(index.getAndIncrement()); processor.process(exchange, doneSync -> - camelContext.getReactiveExecutor().schedule(() -> doProcess(exchange, callback, processors, false))); + camelContext.getReactiveExecutor().schedule(() -> doProcess(exchange, callback, processors, index, false))); } else { ExchangeHelper.copyResults(exchange, exchange); @@ -121,7 +123,7 @@ public class Pipeline extends AsyncProcessorSupport implements Navigate<Processo } } - protected boolean continueRouting(Iterator<AsyncProcessor> it, Exchange exchange) { + protected boolean continueRouting(List<AsyncProcessor> list, AtomicInteger index, Exchange exchange) { Object stop = exchange.getProperty(Exchange.ROUTE_STOP); if (stop != null) { boolean doStop = exchange.getContext().getTypeConverter().convertTo(Boolean.class, stop); @@ -133,7 +135,7 @@ public class Pipeline extends AsyncProcessorSupport implements Navigate<Processo } } // continue if there are more processors to route - boolean answer = it.hasNext(); + boolean answer = index.get() < list.size(); if (LOG.isTraceEnabled()) { LOG.trace("ExchangeId: {} should continue routing: {}", exchange.getExchangeId(), answer); }