This is an automated email from the ASF dual-hosted git repository. davsclaus pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/camel.git
The following commit(s) were added to refs/heads/master by this push: new b076058 CAMEL-14354: Optimize core. Revert optimization in pipeline as it caused a problem with Netty. b076058 is described below commit b076058899aee7fdb901fecc88a835683c3600a0 Author: Claus Ibsen <claus.ib...@gmail.com> AuthorDate: Tue Jan 21 17:34:19 2020 +0100 CAMEL-14354: Optimize core. Revert optimization in pipeline as it caused a problem with Netty. --- .../component/netty/NettyReuseChannelTest.java | 4 +--- .../java/org/apache/camel/processor/Pipeline.java | 23 +++++++++++----------- 2 files changed, 13 insertions(+), 14 deletions(-) diff --git a/components/camel-netty/src/test/java/org/apache/camel/component/netty/NettyReuseChannelTest.java b/components/camel-netty/src/test/java/org/apache/camel/component/netty/NettyReuseChannelTest.java index 3b0c3b6..c8e1c69 100644 --- a/components/camel-netty/src/test/java/org/apache/camel/component/netty/NettyReuseChannelTest.java +++ b/components/camel-netty/src/test/java/org/apache/camel/component/netty/NettyReuseChannelTest.java @@ -24,10 +24,8 @@ import org.apache.camel.Exchange; import org.apache.camel.Processor; import org.apache.camel.builder.NotifyBuilder; import org.apache.camel.builder.RouteBuilder; -import org.junit.Ignore; import org.junit.Test; -@Ignore("TODO: Fix me") public class NettyReuseChannelTest extends BaseNettyTest { private final List<Channel> channels = new ArrayList<>(); @@ -44,7 +42,7 @@ public class NettyReuseChannelTest extends BaseNettyTest { assertMockEndpointsSatisfied(); - assertTrue(notify.matchesMockWaitTime()); + assertTrue(notify.matchesWaitTime()); assertEquals(2, channels.size()); assertSame("Should reuse channel", channels.get(0), channels.get(1)); diff --git a/core/camel-base/src/main/java/org/apache/camel/processor/Pipeline.java b/core/camel-base/src/main/java/org/apache/camel/processor/Pipeline.java index 6b94f5f..2f50e1b 100644 --- a/core/camel-base/src/main/java/org/apache/camel/processor/Pipeline.java 
+++ b/core/camel-base/src/main/java/org/apache/camel/processor/Pipeline.java @@ -18,6 +18,7 @@ package org.apache.camel.processor; import java.util.ArrayList; import java.util.Collection; +import java.util.Iterator; import java.util.List; import java.util.stream.Collectors; @@ -82,26 +83,25 @@ public class Pipeline extends AsyncProcessorSupport implements Navigate<Processo @Override public boolean process(Exchange exchange, AsyncCallback callback) { if (exchange.isTransacted()) { - camelContext.getReactiveExecutor().scheduleSync(() -> Pipeline.this.doProcess(exchange, callback, processors, 0)); + camelContext.getReactiveExecutor().scheduleSync(() -> Pipeline.this.doProcess(exchange, callback, processors.iterator(), true)); } else { - camelContext.getReactiveExecutor().scheduleMain(() -> Pipeline.this.doProcess(exchange, callback, processors, 0)); + camelContext.getReactiveExecutor().scheduleMain(() -> Pipeline.this.doProcess(exchange, callback, processors.iterator(), true)); } return false; } - protected void doProcess(Exchange exchange, AsyncCallback callback, List<AsyncProcessor> processors, int index) { - if (continueRouting(processors, index, exchange) - && (index == 0 || continueProcessing(exchange, "so breaking out of pipeline", log))) { + protected void doProcess(Exchange exchange, AsyncCallback callback, Iterator<AsyncProcessor> processors, boolean first) { + if (continueRouting(processors, exchange) + && (first || continueProcessing(exchange, "so breaking out of pipeline", log))) { // prepare for next run ExchangeHelper.prepareOutToIn(exchange); // get the next processor - AsyncProcessor processor = processors.get(index); + AsyncProcessor processor = processors.next(); - final Integer idx = index + 1; processor.process(exchange, doneSync -> - camelContext.getReactiveExecutor().schedule(() -> doProcess(exchange, callback, processors, idx))); + camelContext.getReactiveExecutor().schedule(() -> doProcess(exchange, callback, processors, false))); } else { 
ExchangeHelper.copyResults(exchange, exchange); @@ -116,19 +116,19 @@ public class Pipeline extends AsyncProcessorSupport implements Navigate<Processo } } - protected boolean continueRouting(List<AsyncProcessor> processors, int index, Exchange exchange) { + protected boolean continueRouting(Iterator<AsyncProcessor> it, Exchange exchange) { Object stop = exchange.getProperty(Exchange.ROUTE_STOP); if (stop != null) { boolean doStop = exchange.getContext().getTypeConverter().convertTo(Boolean.class, stop); if (doStop) { - if (log.isTraceEnabled()) { + if (log.isDebugEnabled()) { log.debug("ExchangeId: {} is marked to stop routing: {}", exchange.getExchangeId(), exchange); } return false; } } // continue if there are more processors to route - boolean answer = index < processors.size(); + boolean answer = it.hasNext(); if (log.isTraceEnabled()) { log.trace("ExchangeId: {} should continue routing: {}", exchange.getExchangeId(), answer); } @@ -150,6 +150,7 @@ public class Pipeline extends AsyncProcessorSupport implements Navigate<Processo return id; } + @SuppressWarnings("unchecked") public List<Processor> getProcessors() { return (List) processors; }