This is an automated email from the ASF dual-hosted git repository.

davsclaus pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/camel.git


The following commit(s) were added to refs/heads/master by this push:
     new b076058  CAMEL-14354: Optimize core. Revert optimiztion in pipeline as 
it caused a problem with Netty.
b076058 is described below

commit b076058899aee7fdb901fecc88a835683c3600a0
Author: Claus Ibsen <claus.ib...@gmail.com>
AuthorDate: Tue Jan 21 17:34:19 2020 +0100

    CAMEL-14354: Optimize core. Revert optimiztion in pipeline as it caused a 
problem with Netty.
---
 .../component/netty/NettyReuseChannelTest.java     |  4 +---
 .../java/org/apache/camel/processor/Pipeline.java  | 23 +++++++++++-----------
 2 files changed, 13 insertions(+), 14 deletions(-)

diff --git 
a/components/camel-netty/src/test/java/org/apache/camel/component/netty/NettyReuseChannelTest.java
 
b/components/camel-netty/src/test/java/org/apache/camel/component/netty/NettyReuseChannelTest.java
index 3b0c3b6..c8e1c69 100644
--- 
a/components/camel-netty/src/test/java/org/apache/camel/component/netty/NettyReuseChannelTest.java
+++ 
b/components/camel-netty/src/test/java/org/apache/camel/component/netty/NettyReuseChannelTest.java
@@ -24,10 +24,8 @@ import org.apache.camel.Exchange;
 import org.apache.camel.Processor;
 import org.apache.camel.builder.NotifyBuilder;
 import org.apache.camel.builder.RouteBuilder;
-import org.junit.Ignore;
 import org.junit.Test;
 
-@Ignore("TODO: Fix me")
 public class NettyReuseChannelTest extends BaseNettyTest {
 
     private final List<Channel> channels = new ArrayList<>();
@@ -44,7 +42,7 @@ public class NettyReuseChannelTest extends BaseNettyTest {
 
         assertMockEndpointsSatisfied();
 
-        assertTrue(notify.matchesMockWaitTime());
+        assertTrue(notify.matchesWaitTime());
 
         assertEquals(2, channels.size());
         assertSame("Should reuse channel", channels.get(0), channels.get(1));
diff --git 
a/core/camel-base/src/main/java/org/apache/camel/processor/Pipeline.java 
b/core/camel-base/src/main/java/org/apache/camel/processor/Pipeline.java
index 6b94f5f..2f50e1b 100644
--- a/core/camel-base/src/main/java/org/apache/camel/processor/Pipeline.java
+++ b/core/camel-base/src/main/java/org/apache/camel/processor/Pipeline.java
@@ -18,6 +18,7 @@ package org.apache.camel.processor;
 
 import java.util.ArrayList;
 import java.util.Collection;
+import java.util.Iterator;
 import java.util.List;
 import java.util.stream.Collectors;
 
@@ -82,26 +83,25 @@ public class Pipeline extends AsyncProcessorSupport 
implements Navigate<Processo
     @Override
     public boolean process(Exchange exchange, AsyncCallback callback) {
         if (exchange.isTransacted()) {
-            camelContext.getReactiveExecutor().scheduleSync(() -> 
Pipeline.this.doProcess(exchange, callback, processors, 0));
+            camelContext.getReactiveExecutor().scheduleSync(() -> 
Pipeline.this.doProcess(exchange, callback, processors.iterator(), true));
         } else {
-            camelContext.getReactiveExecutor().scheduleMain(() -> 
Pipeline.this.doProcess(exchange, callback, processors, 0));
+            camelContext.getReactiveExecutor().scheduleMain(() -> 
Pipeline.this.doProcess(exchange, callback, processors.iterator(), true));
         }
         return false;
     }
 
-    protected void doProcess(Exchange exchange, AsyncCallback callback, 
List<AsyncProcessor> processors, int index) {
-        if (continueRouting(processors, index, exchange)
-                && (index == 0 || continueProcessing(exchange, "so breaking 
out of pipeline", log))) {
+    protected void doProcess(Exchange exchange, AsyncCallback callback, 
Iterator<AsyncProcessor> processors, boolean first) {
+        if (continueRouting(processors, exchange)
+                && (first || continueProcessing(exchange, "so breaking out of 
pipeline", log))) {
 
             // prepare for next run
             ExchangeHelper.prepareOutToIn(exchange);
 
             // get the next processor
-            AsyncProcessor processor = processors.get(index);
+            AsyncProcessor processor = processors.next();
 
-            final Integer idx = index + 1;
             processor.process(exchange, doneSync ->
-                    camelContext.getReactiveExecutor().schedule(() -> 
doProcess(exchange, callback, processors, idx)));
+                    camelContext.getReactiveExecutor().schedule(() -> 
doProcess(exchange, callback, processors, false)));
         } else {
             ExchangeHelper.copyResults(exchange, exchange);
 
@@ -116,19 +116,19 @@ public class Pipeline extends AsyncProcessorSupport 
implements Navigate<Processo
         }
     }
 
-    protected boolean continueRouting(List<AsyncProcessor> processors, int 
index, Exchange exchange) {
+    protected boolean continueRouting(Iterator<AsyncProcessor> it, Exchange 
exchange) {
         Object stop = exchange.getProperty(Exchange.ROUTE_STOP);
         if (stop != null) {
             boolean doStop = 
exchange.getContext().getTypeConverter().convertTo(Boolean.class, stop);
             if (doStop) {
-                if (log.isTraceEnabled()) {
+                if (log.isDebugEnabled()) {
                     log.debug("ExchangeId: {} is marked to stop routing: {}", 
exchange.getExchangeId(), exchange);
                 }
                 return false;
             }
         }
         // continue if there are more processors to route
-        boolean answer = index < processors.size();
+        boolean answer = it.hasNext();
         if (log.isTraceEnabled()) {
             log.trace("ExchangeId: {} should continue routing: {}", 
exchange.getExchangeId(), answer);
         }
@@ -150,6 +150,7 @@ public class Pipeline extends AsyncProcessorSupport 
implements Navigate<Processo
         return id;
     }
 
+    @SuppressWarnings("unchecked")
     public List<Processor> getProcessors() {
         return (List) processors;
     }

Reply via email to