This is an automated email from the ASF dual-hosted git repository.

davsclaus pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/camel.git


The following commit(s) were added to refs/heads/master by this push:
     new 5a16e22  CAMEL-14354: Optimize core
5a16e22 is described below

commit 5a16e223c8f962dbba7693fba5db2b1514f42841
Author: Claus Ibsen <claus.ib...@gmail.com>
AuthorDate: Sat Jan 25 10:09:50 2020 +0100

    CAMEL-14354: Optimize core
---
 .../java/org/apache/camel/processor/Pipeline.java  | 35 ++++++++--------------
 .../org/apache/camel/support/ExchangeHelper.java   |  1 +
 2 files changed, 13 insertions(+), 23 deletions(-)

diff --git a/core/camel-base/src/main/java/org/apache/camel/processor/Pipeline.java b/core/camel-base/src/main/java/org/apache/camel/processor/Pipeline.java
index 0fe1ef6..0c45a0e 100644
--- a/core/camel-base/src/main/java/org/apache/camel/processor/Pipeline.java
+++ b/core/camel-base/src/main/java/org/apache/camel/processor/Pipeline.java
@@ -40,7 +40,6 @@ import org.apache.camel.support.service.ServiceHelper;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-
 import static org.apache.camel.processor.PipelineHelper.continueProcessing;
 
 /**
@@ -53,7 +52,8 @@ public class Pipeline extends AsyncProcessorSupport implements Navigate<Processo
 
     private final CamelContext camelContext;
     private final ReactiveExecutor reactiveExecutor;
-    private List<AsyncProcessor> processors;
+    private final List<AsyncProcessor> processors;
+    private final int size;
     private String id;
     private String routeId;
 
@@ -61,6 +61,7 @@ public class Pipeline extends AsyncProcessorSupport implements Navigate<Processo
         this.camelContext = camelContext;
         this.reactiveExecutor = 
camelContext.adapt(ExtendedCamelContext.class).getReactiveExecutor();
         this.processors = 
processors.stream().map(AsyncProcessorConverterHelper::convert).collect(Collectors.toList());
+        this.size = processors.size();
     }
 
     public static Processor newInstance(CamelContext camelContext, 
List<Processor> processors) {
@@ -92,18 +93,21 @@ public class Pipeline extends AsyncProcessorSupport implements Navigate<Processo
     @Override
     public boolean process(Exchange exchange, AsyncCallback callback) {
         if (exchange.isTransacted()) {
-            reactiveExecutor.scheduleSync(() -> 
Pipeline.this.doProcess(exchange, callback, processors, new AtomicInteger(), 
true));
+            reactiveExecutor.scheduleSync(() -> 
Pipeline.this.doProcess(exchange, callback, processors, size, new 
AtomicInteger(), true));
         } else {
-            reactiveExecutor.scheduleMain(() -> 
Pipeline.this.doProcess(exchange, callback, processors, new AtomicInteger(), 
true));
+            reactiveExecutor.scheduleMain(() -> 
Pipeline.this.doProcess(exchange, callback, processors, size, new 
AtomicInteger(), true));
         }
         return false;
     }
 
-    protected void doProcess(Exchange exchange, AsyncCallback callback, 
List<AsyncProcessor> processors, AtomicInteger index, boolean first) {
+    protected void doProcess(Exchange exchange, AsyncCallback callback, 
List<AsyncProcessor> processors, int size, AtomicInteger index, boolean first) {
         // optimize to use an atomic index counter for tracking how long we 
are in the processors list (uses less memory than iterator on array list)
 
-        if (continueRouting(processors, index, exchange)
-                && (first || continueProcessing(exchange, "so breaking out of 
pipeline", LOG))) {
+        boolean stop = exchange.isRouteStop();
+        boolean more = index.get() < size;
+
+        if (!stop && more &&
+                (first || continueProcessing(exchange, "so breaking out of 
pipeline", LOG))) {
 
             // prepare for next run
             ExchangeHelper.prepareOutToIn(exchange);
@@ -112,7 +116,7 @@ public class Pipeline extends AsyncProcessorSupport implements Navigate<Processo
             AsyncProcessor processor = processors.get(index.getAndIncrement());
 
             processor.process(exchange, doneSync ->
-                    reactiveExecutor.schedule(() -> doProcess(exchange, 
callback, processors, index, false)));
+                    reactiveExecutor.schedule(() -> doProcess(exchange, 
callback, processors, size, index, false)));
         } else {
             ExchangeHelper.copyResults(exchange, exchange);
 
@@ -127,21 +131,6 @@ public class Pipeline extends AsyncProcessorSupport implements Navigate<Processo
         }
     }
 
-    protected boolean continueRouting(List<AsyncProcessor> list, AtomicInteger 
index, Exchange exchange) {
-        if (exchange.isRouteStop()) {
-            if (LOG.isDebugEnabled()) {
-                LOG.debug("ExchangeId: {} is marked to stop routing: {}", 
exchange.getExchangeId(), exchange);
-            }
-            return false;
-        }
-        // continue if there are more processors to route
-        boolean answer = index.get() < list.size();
-        if (LOG.isTraceEnabled()) {
-            LOG.trace("ExchangeId: {} should continue routing: {}", 
exchange.getExchangeId(), answer);
-        }
-        return answer;
-    }
-
     @Override
     protected void doStart() throws Exception {
         ServiceHelper.startService(processors);
diff --git a/core/camel-support/src/main/java/org/apache/camel/support/ExchangeHelper.java b/core/camel-support/src/main/java/org/apache/camel/support/ExchangeHelper.java
index 0696b72..5f6b938 100644
--- a/core/camel-support/src/main/java/org/apache/camel/support/ExchangeHelper.java
+++ b/core/camel-support/src/main/java/org/apache/camel/support/ExchangeHelper.java
@@ -680,6 +680,7 @@ public final class ExchangeHelper {
      * @return <tt>true</tt> if handled already by error handler, 
<tt>false</tt> otherwise
      */
     public static boolean hasExceptionBeenHandledByErrorHandler(Exchange 
exchange) {
+        // TODO: optimize this
         return 
Boolean.TRUE.equals(exchange.getProperty(Exchange.ERRORHANDLER_HANDLED));
     }
 

Reply via email to