This is an automated email from the ASF dual-hosted git repository. davsclaus pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/camel.git
The following commit(s) were added to refs/heads/master by this push: new fbed41e CAMEL-16451: camel-core - ExchangePooling for EIPs. Multicast EIP fbed41e is described below commit fbed41eb2135368ae22046f1cdeeeb16de0d5d1e Author: Claus Ibsen <claus.ib...@gmail.com> AuthorDate: Wed Apr 7 06:54:54 2021 +0200 CAMEL-16451: camel-core - ExchangePooling for EIPs. Multicast EIP --- .../apache/camel/processor/MulticastProcessor.java | 36 ++++++++++------------ .../camel/processor/RecipientListProcessor.java | 2 +- .../org/apache/camel/reifier/MulticastReifier.java | 7 ----- 3 files changed, 17 insertions(+), 28 deletions(-) diff --git a/core/camel-core-processor/src/main/java/org/apache/camel/processor/MulticastProcessor.java b/core/camel-core-processor/src/main/java/org/apache/camel/processor/MulticastProcessor.java index d157327..d9390cc 100644 --- a/core/camel-core-processor/src/main/java/org/apache/camel/processor/MulticastProcessor.java +++ b/core/camel-core-processor/src/main/java/org/apache/camel/processor/MulticastProcessor.java @@ -162,11 +162,11 @@ public class MulticastProcessor extends AsyncProcessorSupport } protected final Processor onPrepare; + protected final ProcessorExchangeFactory processorExchangeFactory; private final CamelContext camelContext; private final InternalProcessorFactory internalProcessorFactory; private final Route route; private final ReactiveExecutor reactiveExecutor; - private ProcessorExchangeFactory processorExchangeFactory; private Processor errorHandler; private String id; private String routeId; @@ -229,6 +229,13 @@ public class MulticastProcessor extends AsyncProcessorSupport this.shareUnitOfWork = shareUnitOfWork; this.parallelAggregate = parallelAggregate; this.stopOnAggregateException = stopOnAggregateException; + if (this instanceof Splitter) { + // not supported for splitter + this.processorExchangeFactory = null; + } else { + this.processorExchangeFactory = camelContext.adapt(ExtendedCamelContext.class) + .getProcessorExchangeFactory().newProcessorExchangeFactory(this); + } } @Override @@ -266,14 +273,6 @@ public class MulticastProcessor extends AsyncProcessorSupport return errorHandler; } - public ProcessorExchangeFactory getProcessorExchangeFactory() { - return processorExchangeFactory; - } - - public void setProcessorExchangeFactory(ProcessorExchangeFactory processorExchangeFactory) { - this.processorExchangeFactory = processorExchangeFactory; - } - @Override public String getTraceLabel() { return "multicast"; @@ -791,12 +790,14 @@ public class MulticastProcessor extends AsyncProcessorSupport Exchange original, Exchange subExchange, final Iterable<ProcessorExchangePair> pairs, AsyncCallback callback, boolean doneSync, boolean forceExhaust) { - if (processorExchangeFactory != null) { + if (processorExchangeFactory != null && pairs != null) { // the exchanges on the pairs was created with a factory, so they should be released - Iterator<ProcessorExchangePair> it = pairs.iterator(); - while (it.hasNext()) { - ProcessorExchangePair pair = it.next(); - processorExchangeFactory.release(pair.getExchange()); + try { + for (ProcessorExchangePair pair : pairs) { + processorExchangeFactory.release(pair.getExchange()); + } + } catch (Throwable e) { + LOG.warn("Error releasing exchange due to " + e.getMessage() + ". This exception is ignored.", e); } } // we are done so close the pairs iterator @@ -924,12 +925,7 @@ public class MulticastProcessor extends AsyncProcessorSupport int index = 0; for (Processor processor : processors) { // copy exchange, and do not share the unit of work - Exchange copy; - if (processorExchangeFactory != null) { - copy = processorExchangeFactory.createCorrelatedCopy(exchange, false); - } else { - copy = ExchangeHelper.createCorrelatedCopy(exchange, false); - } + Exchange copy = processorExchangeFactory.createCorrelatedCopy(exchange, false); if (streamCache != null) { if (index > 0) { diff --git a/core/camel-core-processor/src/main/java/org/apache/camel/processor/RecipientListProcessor.java b/core/camel-core-processor/src/main/java/org/apache/camel/processor/RecipientListProcessor.java index e1dacf3..4d9f76f 100644 --- a/core/camel-core-processor/src/main/java/org/apache/camel/processor/RecipientListProcessor.java +++ b/core/camel-core-processor/src/main/java/org/apache/camel/processor/RecipientListProcessor.java @@ -241,7 +241,7 @@ public class RecipientListProcessor extends MulticastProcessor { int index, Endpoint endpoint, Producer producer, Exchange exchange, ExchangePattern pattern, boolean prototypeEndpoint) { // copy exchange, and do not share the unit of work - Exchange copy = ExchangeHelper.createCorrelatedCopy(exchange, false); + Exchange copy = processorExchangeFactory.createCorrelatedCopy(exchange, false); // if we share unit of work, we need to prepare the child exchange if (isShareUnitOfWork()) { diff --git a/core/camel-core-reifier/src/main/java/org/apache/camel/reifier/MulticastReifier.java b/core/camel-core-reifier/src/main/java/org/apache/camel/reifier/MulticastReifier.java index 80632ad..2e376f3 100644 --- a/core/camel-core-reifier/src/main/java/org/apache/camel/reifier/MulticastReifier.java +++ b/core/camel-core-reifier/src/main/java/org/apache/camel/reifier/MulticastReifier.java @@ -22,7 +22,6 @@ import java.util.concurrent.ExecutorService; import org.apache.camel.AggregationStrategy; import org.apache.camel.CamelContextAware; -import org.apache.camel.ExtendedCamelContext; import org.apache.camel.Processor; import org.apache.camel.Route; import org.apache.camel.model.MulticastDefinition; @@ -31,7 +30,6 @@ import org.apache.camel.processor.MulticastProcessor; import org.apache.camel.processor.aggregate.AggregationStrategyBeanAdapter; import org.apache.camel.processor.aggregate.ShareUnitOfWorkAggregationStrategy; import org.apache.camel.processor.aggregate.UseLatestAggregationStrategy; -import org.apache.camel.spi.ProcessorExchangeFactory; public class MulticastReifier extends ProcessorReifier<MulticastDefinition> { @@ -80,11 +78,6 @@ public class MulticastReifier extends ProcessorReifier<MulticastDefinition> { isStopOnException, timeout, definition.getOnPrepare(), isShareUnitOfWork, isParallelAggregate, isStopOnAggregateException); - // multicast EIP supports exchange pooling so lets inject the factory for this - ProcessorExchangeFactory pef = camelContext.adapt(ExtendedCamelContext.class) - .getProcessorExchangeFactory().newProcessorExchangeFactory(answer); - answer.setProcessorExchangeFactory(pef); - return answer; }