This is an automated email from the ASF dual-hosted git repository.

davsclaus pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/camel.git


The following commit(s) were added to refs/heads/master by this push:
     new 7f764e6  CAMEL-14354: Optimize core.
7f764e6 is described below

commit 7f764e61fe8ce6c679c569b0ad23b8ae85eb5168
Author: Claus Ibsen <claus.ib...@gmail.com>
AuthorDate: Thu Jan 23 14:20:09 2020 +0100

    CAMEL-14354: Optimize core.
---
 .../java/org/apache/camel/processor/Pipeline.java    | 20 +++++++++++---------
 1 file changed, 11 insertions(+), 9 deletions(-)

diff --git a/core/camel-base/src/main/java/org/apache/camel/processor/Pipeline.java b/core/camel-base/src/main/java/org/apache/camel/processor/Pipeline.java
index a282ce6..7557450 100644
--- a/core/camel-base/src/main/java/org/apache/camel/processor/Pipeline.java
+++ b/core/camel-base/src/main/java/org/apache/camel/processor/Pipeline.java
@@ -18,8 +18,8 @@ package org.apache.camel.processor;
 
 import java.util.ArrayList;
 import java.util.Collection;
-import java.util.Iterator;
 import java.util.List;
+import java.util.concurrent.atomic.AtomicInteger;
 import java.util.stream.Collectors;
 
 import org.apache.camel.AsyncCallback;
@@ -88,25 +88,27 @@ public class Pipeline extends AsyncProcessorSupport implements Navigate<Processo
     @Override
     public boolean process(Exchange exchange, AsyncCallback callback) {
         if (exchange.isTransacted()) {
-            camelContext.getReactiveExecutor().scheduleSync(() -> Pipeline.this.doProcess(exchange, callback, processors.iterator(), true));
+            camelContext.getReactiveExecutor().scheduleSync(() -> Pipeline.this.doProcess(exchange, callback, processors, new AtomicInteger(), true));
         } else {
-            camelContext.getReactiveExecutor().scheduleMain(() -> Pipeline.this.doProcess(exchange, callback, processors.iterator(), true));
+            camelContext.getReactiveExecutor().scheduleMain(() -> Pipeline.this.doProcess(exchange, callback, processors, new AtomicInteger(), true));
         }
         return false;
     }
 
-    protected void doProcess(Exchange exchange, AsyncCallback callback, Iterator<AsyncProcessor> processors, boolean first) {
-        if (continueRouting(processors, exchange)
+    protected void doProcess(Exchange exchange, AsyncCallback callback, List<AsyncProcessor> processors, AtomicInteger index, boolean first) {
+        // optimize to use an atomic index counter for tracking how long we are in the processors list (uses less memory than iterator on array list)
+
+        if (continueRouting(processors, index, exchange)
                 && (first || continueProcessing(exchange, "so breaking out of pipeline", LOG))) {
 
             // prepare for next run
             ExchangeHelper.prepareOutToIn(exchange);
 
             // get the next processor
-            AsyncProcessor processor = processors.next();
+            AsyncProcessor processor = processors.get(index.getAndIncrement());
 
             processor.process(exchange, doneSync ->
-                    camelContext.getReactiveExecutor().schedule(() -> doProcess(exchange, callback, processors, false)));
+                    camelContext.getReactiveExecutor().schedule(() -> doProcess(exchange, callback, processors, index, false)));
         } else {
             ExchangeHelper.copyResults(exchange, exchange);
 
@@ -121,7 +123,7 @@ public class Pipeline extends AsyncProcessorSupport implements Navigate<Processo
         }
     }
 
-    protected boolean continueRouting(Iterator<AsyncProcessor> it, Exchange exchange) {
+    protected boolean continueRouting(List<AsyncProcessor> list, AtomicInteger index, Exchange exchange) {
         Object stop = exchange.getProperty(Exchange.ROUTE_STOP);
         if (stop != null) {
             boolean doStop = exchange.getContext().getTypeConverter().convertTo(Boolean.class, stop);
@@ -133,7 +135,7 @@ public class Pipeline extends AsyncProcessorSupport implements Navigate<Processo
             }
         }
         // continue if there are more processors to route
-        boolean answer = it.hasNext();
+        boolean answer = index.get() < list.size();
         if (LOG.isTraceEnabled()) {
             LOG.trace("ExchangeId: {} should continue routing: {}", exchange.getExchangeId(), answer);
         }

Reply via email to