This is an automated email from the ASF dual-hosted git repository. davsclaus pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/camel.git
commit 6011aa0bb482efa4a4a12db94b05f6e4da881275 Author: Claus Ibsen <claus.ib...@gmail.com> AuthorDate: Fri Apr 9 13:14:31 2021 +0200 CAMEL-16462: camel-core - Optimize Splitter EIP to reduce object allocations. --- .../apache/camel/spi/ProcessorExchangeFactory.java | 9 +++- .../engine/PooledProcessorExchangeFactory.java | 50 ++++++++++++++++------ .../engine/PrototypeProcessorExchangeFactory.java | 5 +++ .../apache/camel/processor/MulticastProcessor.java | 9 +--- .../java/org/apache/camel/processor/Splitter.java | 19 +++++--- 5 files changed, 65 insertions(+), 27 deletions(-) diff --git a/core/camel-api/src/main/java/org/apache/camel/spi/ProcessorExchangeFactory.java b/core/camel-api/src/main/java/org/apache/camel/spi/ProcessorExchangeFactory.java index c48ad15..214635f 100644 --- a/core/camel-api/src/main/java/org/apache/camel/spi/ProcessorExchangeFactory.java +++ b/core/camel-api/src/main/java/org/apache/camel/spi/ProcessorExchangeFactory.java @@ -57,7 +57,14 @@ public interface ProcessorExchangeFactory extends PooledObjectFactory<Exchange>, /** * Gets a copy of the given {@link Exchange} * - * @param exchange original copy of the exchange + * @param exchange original exchange + */ + Exchange createCopy(Exchange exchange); + + /** + * Gets a copy of the given {@link Exchange} where the copy is correlated to the source + * + * @param exchange original exchange * @param handover whether the on completion callbacks should be handed over to the new copy. 
*/ Exchange createCorrelatedCopy(Exchange exchange, boolean handover); diff --git a/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/PooledProcessorExchangeFactory.java b/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/PooledProcessorExchangeFactory.java index 1826659..fe3c1d2 100644 --- a/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/PooledProcessorExchangeFactory.java +++ b/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/PooledProcessorExchangeFactory.java @@ -58,26 +58,36 @@ public class PooledProcessorExchangeFactory extends PrototypeProcessorExchangeFa } @Override - public Exchange createCorrelatedCopy(Exchange exchange, boolean handover) { - Exchange answer = pool.poll(); + public Exchange createCopy(Exchange exchange) { + ExtendedExchange answer = (ExtendedExchange) pool.poll(); if (answer == null) { + if (statisticsEnabled) { + statistics.created.increment(); + } // create a new exchange as there was no free from the pool - PooledExchange pe = new DefaultPooledExchange(exchange); - ExchangeHelper.copyResults(pe, exchange); - // do not reuse message id on copy - pe.getIn().setMessageId(null); - // do not share the unit of work - pe.setUnitOfWork(null); - if (handover) { - // Need to hand over the completion for async invocation - pe.adapt(ExtendedExchange.class).handoverCompletions(exchange); + answer = new DefaultPooledExchange(exchange); + } else { + if (statisticsEnabled) { + statistics.acquired.increment(); } - // set a correlation id so we can track back the original exchange - pe.setProperty(ExchangePropertyKey.CORRELATION_ID, exchange.getExchangeId()); + // reset exchange for reuse + PooledExchange ee = (PooledExchange) answer; + ee.reset(System.currentTimeMillis()); + } + + ExchangeHelper.copyResults(answer, exchange); + return answer; + } + + @Override + public Exchange createCorrelatedCopy(Exchange exchange, boolean handover) { + ExtendedExchange answer = (ExtendedExchange) 
pool.poll(); + if (answer == null) { if (statisticsEnabled) { statistics.created.increment(); } - answer = pe; + // create a new exchange as there was no free from the pool + answer = new DefaultPooledExchange(exchange); } else { if (statisticsEnabled) { statistics.acquired.increment(); @@ -86,6 +96,18 @@ public class PooledProcessorExchangeFactory extends PrototypeProcessorExchangeFa PooledExchange ee = (PooledExchange) answer; ee.reset(System.currentTimeMillis()); } + + ExchangeHelper.copyResults(answer, exchange); + // do not reuse message id on copy + answer.getIn().setMessageId(null); + // do not share the unit of work + answer.setUnitOfWork(null); + if (handover) { + // Need to hand over the completion for async invocation + answer.handoverCompletions(exchange); + } + // set a correlation id so we can track back the original exchange + answer.setProperty(ExchangePropertyKey.CORRELATION_ID, exchange.getExchangeId()); return answer; } diff --git a/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/PrototypeProcessorExchangeFactory.java b/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/PrototypeProcessorExchangeFactory.java index 3a4a1f1..65de8f4 100644 --- a/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/PrototypeProcessorExchangeFactory.java +++ b/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/PrototypeProcessorExchangeFactory.java @@ -81,6 +81,11 @@ public class PrototypeProcessorExchangeFactory extends PooledObjectFactorySuppor } @Override + public Exchange createCopy(Exchange exchange) { + return exchange.copy(); + } + + @Override public Exchange createCorrelatedCopy(Exchange exchange, boolean handover) { return ExchangeHelper.createCorrelatedCopy(exchange, handover); } diff --git a/core/camel-core-processor/src/main/java/org/apache/camel/processor/MulticastProcessor.java b/core/camel-core-processor/src/main/java/org/apache/camel/processor/MulticastProcessor.java index 39e59a7..199a392 
100644 --- a/core/camel-core-processor/src/main/java/org/apache/camel/processor/MulticastProcessor.java +++ b/core/camel-core-processor/src/main/java/org/apache/camel/processor/MulticastProcessor.java @@ -215,13 +215,8 @@ public class MulticastProcessor extends AsyncProcessorSupport this.shareUnitOfWork = shareUnitOfWork; this.parallelAggregate = parallelAggregate; this.stopOnAggregateException = stopOnAggregateException; - if (this instanceof Splitter) { - // not supported for splitter - this.processorExchangeFactory = null; - } else { - this.processorExchangeFactory = camelContext.adapt(ExtendedCamelContext.class) - .getProcessorExchangeFactory().newProcessorExchangeFactory(this); - } + this.processorExchangeFactory = camelContext.adapt(ExtendedCamelContext.class) + .getProcessorExchangeFactory().newProcessorExchangeFactory(this); } @Override diff --git a/core/camel-core-processor/src/main/java/org/apache/camel/processor/Splitter.java b/core/camel-core-processor/src/main/java/org/apache/camel/processor/Splitter.java index 64fd105..a90091d 100644 --- a/core/camel-core-processor/src/main/java/org/apache/camel/processor/Splitter.java +++ b/core/camel-core-processor/src/main/java/org/apache/camel/processor/Splitter.java @@ -190,7 +190,7 @@ public class Splitter extends MulticastProcessor implements AsyncProcessor, Trac // this avoids any side effect reflected upon the incoming exchange final Object value; final Iterator<?> iterator; - private final Exchange copy; + private Exchange copy; private final Route route; private final Exchange original; @@ -251,7 +251,7 @@ public class Splitter extends MulticastProcessor implements AsyncProcessor, Trac if (part != null) { // create a correlated copy as the new exchange to be routed in the splitter from the copy // and do not share the unit of work - Exchange newExchange = ExchangeHelper.createCorrelatedCopy(copy, false); + Exchange newExchange = processorExchangeFactory.createCorrelatedCopy(copy, false); // If the splitter 
has an aggregation strategy // then the StreamCache created by the child routes must not be // closed by the unit of work of the child route, but by the unit of @@ -284,7 +284,12 @@ public class Splitter extends MulticastProcessor implements AsyncProcessor, Trac @Override public void close() throws IOException { - IOHelper.closeIterator(value); + if (copy != null) { + processorExchangeFactory.release(copy); + // null copy to avoid releasing it back again as close may be called multiple times + copy = null; + IOHelper.closeIterator(value); + } } } @@ -337,8 +342,12 @@ public class Splitter extends MulticastProcessor implements AsyncProcessor, Trac return expression; } - private static Exchange copyAndPrepareSubExchange(Exchange exchange, boolean preserveExchangeId) { - Exchange answer = ExchangeHelper.createCopy(exchange, preserveExchangeId); + private Exchange copyAndPrepareSubExchange(Exchange exchange, boolean preserveExchangeId) { + Exchange answer = processorExchangeFactory.createCopy(exchange); + if (preserveExchangeId) { + // must preserve exchange id + answer.setExchangeId(exchange.getExchangeId()); + } if (exchange.getContext().isMessageHistory()) { // we do not want to copy the message history for splitted sub-messages answer.removeProperty(ExchangePropertyKey.MESSAGE_HISTORY);