This is an automated email from the ASF dual-hosted git repository.
davsclaus pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/camel.git
The following commit(s) were added to refs/heads/master by this push:
new 5a16e22 CAMEL-14354: Optimize core
5a16e22 is described below
commit 5a16e223c8f962dbba7693fba5db2b1514f42841
Author: Claus Ibsen <[email protected]>
AuthorDate: Sat Jan 25 10:09:50 2020 +0100
CAMEL-14354: Optimize core
---
.../java/org/apache/camel/processor/Pipeline.java | 35 ++++++++--------------
.../org/apache/camel/support/ExchangeHelper.java | 1 +
2 files changed, 13 insertions(+), 23 deletions(-)
diff --git
a/core/camel-base/src/main/java/org/apache/camel/processor/Pipeline.java
b/core/camel-base/src/main/java/org/apache/camel/processor/Pipeline.java
index 0fe1ef6..0c45a0e 100644
--- a/core/camel-base/src/main/java/org/apache/camel/processor/Pipeline.java
+++ b/core/camel-base/src/main/java/org/apache/camel/processor/Pipeline.java
@@ -40,7 +40,6 @@ import org.apache.camel.support.service.ServiceHelper;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-
import static org.apache.camel.processor.PipelineHelper.continueProcessing;
/**
@@ -53,7 +52,8 @@ public class Pipeline extends AsyncProcessorSupport
implements Navigate<Processo
private final CamelContext camelContext;
private final ReactiveExecutor reactiveExecutor;
- private List<AsyncProcessor> processors;
+ private final List<AsyncProcessor> processors;
+ private final int size;
private String id;
private String routeId;
@@ -61,6 +61,7 @@ public class Pipeline extends AsyncProcessorSupport
implements Navigate<Processo
this.camelContext = camelContext;
this.reactiveExecutor =
camelContext.adapt(ExtendedCamelContext.class).getReactiveExecutor();
this.processors =
processors.stream().map(AsyncProcessorConverterHelper::convert).collect(Collectors.toList());
+ this.size = processors.size();
}
public static Processor newInstance(CamelContext camelContext,
List<Processor> processors) {
@@ -92,18 +93,21 @@ public class Pipeline extends AsyncProcessorSupport
implements Navigate<Processo
@Override
public boolean process(Exchange exchange, AsyncCallback callback) {
if (exchange.isTransacted()) {
- reactiveExecutor.scheduleSync(() ->
Pipeline.this.doProcess(exchange, callback, processors, new AtomicInteger(),
true));
+ reactiveExecutor.scheduleSync(() ->
Pipeline.this.doProcess(exchange, callback, processors, size, new
AtomicInteger(), true));
} else {
- reactiveExecutor.scheduleMain(() ->
Pipeline.this.doProcess(exchange, callback, processors, new AtomicInteger(),
true));
+ reactiveExecutor.scheduleMain(() ->
Pipeline.this.doProcess(exchange, callback, processors, size, new
AtomicInteger(), true));
}
return false;
}
- protected void doProcess(Exchange exchange, AsyncCallback callback,
List<AsyncProcessor> processors, AtomicInteger index, boolean first) {
+ protected void doProcess(Exchange exchange, AsyncCallback callback,
List<AsyncProcessor> processors, int size, AtomicInteger index, boolean first) {
// optimize to use an atomic index counter for tracking how long we
are in the processors list (uses less memory than iterator on array list)
- if (continueRouting(processors, index, exchange)
- && (first || continueProcessing(exchange, "so breaking out of
pipeline", LOG))) {
+ boolean stop = exchange.isRouteStop();
+ boolean more = index.get() < size;
+
+ if (!stop && more &&
+ (first || continueProcessing(exchange, "so breaking out of
pipeline", LOG))) {
// prepare for next run
ExchangeHelper.prepareOutToIn(exchange);
@@ -112,7 +116,7 @@ public class Pipeline extends AsyncProcessorSupport
implements Navigate<Processo
AsyncProcessor processor = processors.get(index.getAndIncrement());
processor.process(exchange, doneSync ->
- reactiveExecutor.schedule(() -> doProcess(exchange,
callback, processors, index, false)));
+ reactiveExecutor.schedule(() -> doProcess(exchange,
callback, processors, size, index, false)));
} else {
ExchangeHelper.copyResults(exchange, exchange);
@@ -127,21 +131,6 @@ public class Pipeline extends AsyncProcessorSupport
implements Navigate<Processo
}
}
- protected boolean continueRouting(List<AsyncProcessor> list, AtomicInteger
index, Exchange exchange) {
- if (exchange.isRouteStop()) {
- if (LOG.isDebugEnabled()) {
- LOG.debug("ExchangeId: {} is marked to stop routing: {}",
exchange.getExchangeId(), exchange);
- }
- return false;
- }
- // continue if there are more processors to route
- boolean answer = index.get() < list.size();
- if (LOG.isTraceEnabled()) {
- LOG.trace("ExchangeId: {} should continue routing: {}",
exchange.getExchangeId(), answer);
- }
- return answer;
- }
-
@Override
protected void doStart() throws Exception {
ServiceHelper.startService(processors);
diff --git
a/core/camel-support/src/main/java/org/apache/camel/support/ExchangeHelper.java
b/core/camel-support/src/main/java/org/apache/camel/support/ExchangeHelper.java
index 0696b72..5f6b938 100644
---
a/core/camel-support/src/main/java/org/apache/camel/support/ExchangeHelper.java
+++
b/core/camel-support/src/main/java/org/apache/camel/support/ExchangeHelper.java
@@ -680,6 +680,7 @@ public final class ExchangeHelper {
* @return <tt>true</tt> if handled already by error handler,
<tt>false</tt> otherwise
*/
public static boolean hasExceptionBeenHandledByErrorHandler(Exchange
exchange) {
+ // TODO: optimize this
return
Boolean.TRUE.equals(exchange.getProperty(Exchange.ERRORHANDLER_HANDLED));
}