This is an automated email from the ASF dual-hosted git repository. davsclaus pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/camel.git
The following commit(s) were added to refs/heads/master by this push: new 5a16e22 CAMEL-14354: Optimize core 5a16e22 is described below commit 5a16e223c8f962dbba7693fba5db2b1514f42841 Author: Claus Ibsen <claus.ib...@gmail.com> AuthorDate: Sat Jan 25 10:09:50 2020 +0100 CAMEL-14354: Optimize core --- .../java/org/apache/camel/processor/Pipeline.java | 35 ++++++++-------------- .../org/apache/camel/support/ExchangeHelper.java | 1 + 2 files changed, 13 insertions(+), 23 deletions(-) diff --git a/core/camel-base/src/main/java/org/apache/camel/processor/Pipeline.java b/core/camel-base/src/main/java/org/apache/camel/processor/Pipeline.java index 0fe1ef6..0c45a0e 100644 --- a/core/camel-base/src/main/java/org/apache/camel/processor/Pipeline.java +++ b/core/camel-base/src/main/java/org/apache/camel/processor/Pipeline.java @@ -40,7 +40,6 @@ import org.apache.camel.support.service.ServiceHelper; import org.slf4j.Logger; import org.slf4j.LoggerFactory; - import static org.apache.camel.processor.PipelineHelper.continueProcessing; /** @@ -53,7 +52,8 @@ public class Pipeline extends AsyncProcessorSupport implements Navigate<Processo private final CamelContext camelContext; private final ReactiveExecutor reactiveExecutor; - private List<AsyncProcessor> processors; + private final List<AsyncProcessor> processors; + private final int size; private String id; private String routeId; @@ -61,6 +61,7 @@ public class Pipeline extends AsyncProcessorSupport implements Navigate<Processo this.camelContext = camelContext; this.reactiveExecutor = camelContext.adapt(ExtendedCamelContext.class).getReactiveExecutor(); this.processors = processors.stream().map(AsyncProcessorConverterHelper::convert).collect(Collectors.toList()); + this.size = processors.size(); } public static Processor newInstance(CamelContext camelContext, List<Processor> processors) { @@ -92,18 +93,21 @@ public class Pipeline extends AsyncProcessorSupport implements Navigate<Processo @Override public boolean process(Exchange exchange, AsyncCallback callback) { if (exchange.isTransacted()) { - reactiveExecutor.scheduleSync(() -> Pipeline.this.doProcess(exchange, callback, processors, new AtomicInteger(), true)); + reactiveExecutor.scheduleSync(() -> Pipeline.this.doProcess(exchange, callback, processors, size, new AtomicInteger(), true)); } else { - reactiveExecutor.scheduleMain(() -> Pipeline.this.doProcess(exchange, callback, processors, new AtomicInteger(), true)); + reactiveExecutor.scheduleMain(() -> Pipeline.this.doProcess(exchange, callback, processors, size, new AtomicInteger(), true)); } return false; } - protected void doProcess(Exchange exchange, AsyncCallback callback, List<AsyncProcessor> processors, AtomicInteger index, boolean first) { + protected void doProcess(Exchange exchange, AsyncCallback callback, List<AsyncProcessor> processors, int size, AtomicInteger index, boolean first) { // optimize to use an atomic index counter for tracking how long we are in the processors list (uses less memory than iterator on array list) - if (continueRouting(processors, index, exchange) - && (first || continueProcessing(exchange, "so breaking out of pipeline", LOG))) { + boolean stop = exchange.isRouteStop(); + boolean more = index.get() < size; + + if (!stop && more && + (first || continueProcessing(exchange, "so breaking out of pipeline", LOG))) { // prepare for next run ExchangeHelper.prepareOutToIn(exchange); @@ -112,7 +116,7 @@ public class Pipeline extends AsyncProcessorSupport implements Navigate<Processo AsyncProcessor processor = processors.get(index.getAndIncrement()); processor.process(exchange, doneSync -> - reactiveExecutor.schedule(() -> doProcess(exchange, callback, processors, index, false))); + reactiveExecutor.schedule(() -> doProcess(exchange, callback, processors, size, index, false))); } else { ExchangeHelper.copyResults(exchange, exchange); @@ -127,21 +131,6 @@ public class Pipeline extends AsyncProcessorSupport implements Navigate<Processo } } - protected boolean continueRouting(List<AsyncProcessor> list, AtomicInteger index, Exchange exchange) { - if (exchange.isRouteStop()) { - if (LOG.isDebugEnabled()) { - LOG.debug("ExchangeId: {} is marked to stop routing: {}", exchange.getExchangeId(), exchange); - } - return false; - } - // continue if there are more processors to route - boolean answer = index.get() < list.size(); - if (LOG.isTraceEnabled()) { - LOG.trace("ExchangeId: {} should continue routing: {}", exchange.getExchangeId(), answer); - } - return answer; - } - @Override protected void doStart() throws Exception { ServiceHelper.startService(processors); diff --git a/core/camel-support/src/main/java/org/apache/camel/support/ExchangeHelper.java b/core/camel-support/src/main/java/org/apache/camel/support/ExchangeHelper.java index 0696b72..5f6b938 100644 --- a/core/camel-support/src/main/java/org/apache/camel/support/ExchangeHelper.java +++ b/core/camel-support/src/main/java/org/apache/camel/support/ExchangeHelper.java @@ -680,6 +680,7 @@ public final class ExchangeHelper { * @return <tt>true</tt> if handled already by error handler, <tt>false</tt> otherwise */ public static boolean hasExceptionBeenHandledByErrorHandler(Exchange exchange) { + // TODO: optimize this return Boolean.TRUE.equals(exchange.getProperty(Exchange.ERRORHANDLER_HANDLED)); }