This is an automated email from the ASF dual-hosted git repository. davsclaus pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/camel.git
commit 5485d343755a30cf5e10eb9f5165a682f9c94278 Author: Claus Ibsen <claus.ib...@gmail.com> AuthorDate: Wed Apr 7 18:11:23 2021 +0200 CAMEL-16462: camel-core - Optimize RecipientList EIP to reduce object allocations. --- .../apache/camel/processor/MulticastProcessor.java | 34 +++++++++++----------- .../camel/processor/RecipientListProcessor.java | 2 +- 2 files changed, 18 insertions(+), 18 deletions(-) diff --git a/core/camel-core-processor/src/main/java/org/apache/camel/processor/MulticastProcessor.java b/core/camel-core-processor/src/main/java/org/apache/camel/processor/MulticastProcessor.java index cf76c89..39e59a7 100644 --- a/core/camel-core-processor/src/main/java/org/apache/camel/processor/MulticastProcessor.java +++ b/core/camel-core-processor/src/main/java/org/apache/camel/processor/MulticastProcessor.java @@ -215,8 +215,8 @@ public class MulticastProcessor extends AsyncProcessorSupport this.shareUnitOfWork = shareUnitOfWork; this.parallelAggregate = parallelAggregate; this.stopOnAggregateException = stopOnAggregateException; - if (this instanceof Splitter || this instanceof RecipientListProcessor) { - // not supported for splitter/recipient-list + if (this instanceof Splitter) { + // not supported for splitter this.processorExchangeFactory = null; } else { this.processorExchangeFactory = camelContext.adapt(ExtendedCamelContext.class) @@ -788,21 +788,6 @@ public class MulticastProcessor extends AsyncProcessorSupport Exchange original, Exchange subExchange, final Iterable<ProcessorExchangePair> pairs, AsyncCallback callback, boolean doneSync, boolean forceExhaust) { - if (processorExchangeFactory != null && pairs != null) { - // the exchanges on the pairs was created with a factory, so they should be released - try { - for (ProcessorExchangePair pair : pairs) { - processorExchangeFactory.release(pair.getExchange()); - } - } catch (Throwable e) { - LOG.warn("Error releasing exchange due to " + e.getMessage() + ". This exception is ignored.", e); - } - } - // we are done so close the pairs iterator - if (pairs instanceof Closeable) { - IOHelper.close((Closeable) pairs, "pairs", LOG); - } - AggregationStrategy strategy = getAggregationStrategy(subExchange); // invoke the on completion callback if (strategy != null) { @@ -835,6 +820,21 @@ public class MulticastProcessor extends AsyncProcessorSupport } } + if (processorExchangeFactory != null && pairs != null) { + // the exchanges on the pairs was created with a factory, so they should be released + try { + for (ProcessorExchangePair pair : pairs) { + processorExchangeFactory.release(pair.getExchange()); + } + } catch (Throwable e) { + LOG.warn("Error releasing exchange due to " + e.getMessage() + ". This exception is ignored.", e); + } + } + // we are done so close the pairs iterator + if (pairs instanceof Closeable) { + IOHelper.close((Closeable) pairs, "pairs", LOG); + } + // .. and then if there was an exception we need to configure the redelivery exhaust // for example the noErrorHandler will not cause redelivery exhaust so if this error // handled has been in use, then the exhaust would be false (if not forced) diff --git a/core/camel-core-processor/src/main/java/org/apache/camel/processor/RecipientListProcessor.java b/core/camel-core-processor/src/main/java/org/apache/camel/processor/RecipientListProcessor.java index 720c937..0cba21d 100644 --- a/core/camel-core-processor/src/main/java/org/apache/camel/processor/RecipientListProcessor.java +++ b/core/camel-core-processor/src/main/java/org/apache/camel/processor/RecipientListProcessor.java @@ -288,7 +288,7 @@ public class RecipientListProcessor extends MulticastProcessor { int index, Endpoint endpoint, Producer producer, Exchange exchange, ExchangePattern pattern, boolean prototypeEndpoint) { // copy exchange, and do not share the unit of work - Exchange copy = ExchangeHelper.createCorrelatedCopy(exchange, false); + Exchange copy = processorExchangeFactory.createCorrelatedCopy(exchange, false); // if we share unit of work, we need to prepare the child exchange if (isShareUnitOfWork()) {