This is an automated email from the ASF dual-hosted git repository.

davsclaus pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/camel.git
The following commit(s) were added to refs/heads/master by this push: new 8d878f3 CAMEL-16451: camel-core - ExchangePooling for EIPs. Multicast EIP 8d878f3 is described below commit 8d878f323864d5a1e31de3f5a1e92b82a3f78a5e Author: Claus Ibsen <claus.ib...@gmail.com> AuthorDate: Tue Apr 6 20:20:53 2021 +0200 CAMEL-16451: camel-core - ExchangePooling for EIPs. Multicast EIP --- .../engine/PooledProcessorExchangeFactory.java | 4 +-- .../impl/engine/PrototypeExchangeFactory.java | 2 +- .../engine/PrototypeProcessorExchangeFactory.java | 2 +- .../apache/camel/processor/MulticastProcessor.java | 40 +++++++++++++++++++--- .../org/apache/camel/reifier/MulticastReifier.java | 8 +++++ 5 files changed, 47 insertions(+), 9 deletions(-) diff --git a/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/PooledProcessorExchangeFactory.java b/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/PooledProcessorExchangeFactory.java index ab86b3d..1826659 100644 --- a/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/PooledProcessorExchangeFactory.java +++ b/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/PooledProcessorExchangeFactory.java @@ -67,9 +67,7 @@ public class PooledProcessorExchangeFactory extends PrototypeProcessorExchangeFa // do not reuse message id on copy pe.getIn().setMessageId(null); // do not share the unit of work - if (pe.getUnitOfWork() != null) { - pe.getUnitOfWork().reset(); - } + pe.setUnitOfWork(null); if (handover) { // Need to hand over the completion for async invocation pe.adapt(ExtendedExchange.class).handoverCompletions(exchange); diff --git a/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/PrototypeExchangeFactory.java b/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/PrototypeExchangeFactory.java index da06d70..9752f4d 100644 --- a/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/PrototypeExchangeFactory.java +++ 
b/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/PrototypeExchangeFactory.java @@ -152,7 +152,7 @@ public class PrototypeExchangeFactory extends PooledObjectFactorySupport<Exchang boolean leak = created + acquired > released + discarded; if (leak) { long leaks = (created + acquired) - (released + discarded); - log.info( + log.warn( "{} {} ({}) usage (leaks detected: {}) [pooled: {}, created: {}, acquired: {} released: {}, discarded: {}]", name, id, uri, leaks, pooled, created, acquired, released, discarded); } else { diff --git a/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/PrototypeProcessorExchangeFactory.java b/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/PrototypeProcessorExchangeFactory.java index e4f4d18..3a4a1f1 100644 --- a/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/PrototypeProcessorExchangeFactory.java +++ b/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/PrototypeProcessorExchangeFactory.java @@ -130,7 +130,7 @@ public class PrototypeProcessorExchangeFactory extends PooledObjectFactorySuppor boolean leak = created + acquired > released + discarded; if (leak) { long leaks = (created + acquired) - (released + discarded); - log.info( + log.warn( "{} {} ({}) usage (leaks detected: {}) [pooled: {}, created: {}, acquired: {} released: {}, discarded: {}]", name, rid, pid, leaks, pooled, created, acquired, released, discarded); } else { diff --git a/core/camel-core-processor/src/main/java/org/apache/camel/processor/MulticastProcessor.java b/core/camel-core-processor/src/main/java/org/apache/camel/processor/MulticastProcessor.java index 1a2b977..d157327 100644 --- a/core/camel-core-processor/src/main/java/org/apache/camel/processor/MulticastProcessor.java +++ b/core/camel-core-processor/src/main/java/org/apache/camel/processor/MulticastProcessor.java @@ -58,6 +58,7 @@ import org.apache.camel.processor.errorhandler.ErrorHandlerSupport; import 
org.apache.camel.spi.ErrorHandlerAware; import org.apache.camel.spi.IdAware; import org.apache.camel.spi.InternalProcessorFactory; +import org.apache.camel.spi.ProcessorExchangeFactory; import org.apache.camel.spi.ReactiveExecutor; import org.apache.camel.spi.RouteIdAware; import org.apache.camel.spi.UnitOfWork; @@ -165,6 +166,7 @@ public class MulticastProcessor extends AsyncProcessorSupport private final InternalProcessorFactory internalProcessorFactory; private final Route route; private final ReactiveExecutor reactiveExecutor; + private ProcessorExchangeFactory processorExchangeFactory; private Processor errorHandler; private String id; private String routeId; @@ -264,6 +266,14 @@ public class MulticastProcessor extends AsyncProcessorSupport return errorHandler; } + public ProcessorExchangeFactory getProcessorExchangeFactory() { + return processorExchangeFactory; + } + + public void setProcessorExchangeFactory(ProcessorExchangeFactory processorExchangeFactory) { + this.processorExchangeFactory = processorExchangeFactory; + } + @Override public String getTraceLabel() { return "multicast"; @@ -275,6 +285,11 @@ public class MulticastProcessor extends AsyncProcessorSupport @Override protected void doBuild() throws Exception { + if (processorExchangeFactory != null) { + processorExchangeFactory.setId(id); + processorExchangeFactory.setRouteId(routeId); + } + // eager load classes Object dummy = new MulticastReactiveTask(); LOG.trace("Loaded {}", dummy.getClass().getName()); @@ -288,6 +303,8 @@ public class MulticastProcessor extends AsyncProcessorSupport } Object dummy5 = new DefaultProcessorExchangePair(0, null, null, null); LOG.trace("Loaded {}", dummy5.getClass().getName()); + + ServiceHelper.buildService(processorExchangeFactory); } @Override @@ -298,6 +315,8 @@ public class MulticastProcessor extends AsyncProcessorSupport wrapInErrorHandler(route, exchange, processor); } } + + ServiceHelper.initService(processorExchangeFactory); } @Override @@ -772,6 +791,14 @@ 
public class MulticastProcessor extends AsyncProcessorSupport Exchange original, Exchange subExchange, final Iterable<ProcessorExchangePair> pairs, AsyncCallback callback, boolean doneSync, boolean forceExhaust) { + if (processorExchangeFactory != null) { + // the exchanges on the pairs was created with a factory, so they should be released + Iterator<ProcessorExchangePair> it = pairs.iterator(); + while (it.hasNext()) { + ProcessorExchangePair pair = it.next(); + processorExchangeFactory.release(pair.getExchange()); + } + } // we are done so close the pairs iterator if (pairs instanceof Closeable) { IOHelper.close((Closeable) pairs, "pairs", LOG); @@ -897,7 +924,12 @@ public class MulticastProcessor extends AsyncProcessorSupport int index = 0; for (Processor processor : processors) { // copy exchange, and do not share the unit of work - Exchange copy = ExchangeHelper.createCorrelatedCopy(exchange, false); + Exchange copy; + if (processorExchangeFactory != null) { + copy = processorExchangeFactory.createCorrelatedCopy(exchange, false); + } else { + copy = ExchangeHelper.createCorrelatedCopy(exchange, false); + } if (streamCache != null) { if (index > 0) { @@ -1082,7 +1114,7 @@ public class MulticastProcessor extends AsyncProcessorSupport ((CamelContextAware) aggregationStrategy).setCamelContext(camelContext); } - ServiceHelper.startService(aggregationStrategy, processors); + ServiceHelper.startService(aggregationStrategy, processors, processorExchangeFactory); } /** @@ -1099,12 +1131,12 @@ public class MulticastProcessor extends AsyncProcessorSupport @Override protected void doStop() throws Exception { - ServiceHelper.stopService(processors, errorHandlers, aggregationStrategy); + ServiceHelper.stopService(processors, errorHandlers, aggregationStrategy, processorExchangeFactory); } @Override protected void doShutdown() throws Exception { - ServiceHelper.stopAndShutdownServices(processors, errorHandlers, aggregationStrategy); + 
ServiceHelper.stopAndShutdownServices(processors, errorHandlers, aggregationStrategy, processorExchangeFactory); // only clear error handlers when shutting down errorHandlers.clear(); diff --git a/core/camel-core-reifier/src/main/java/org/apache/camel/reifier/MulticastReifier.java b/core/camel-core-reifier/src/main/java/org/apache/camel/reifier/MulticastReifier.java index a7ff33d..80632ad 100644 --- a/core/camel-core-reifier/src/main/java/org/apache/camel/reifier/MulticastReifier.java +++ b/core/camel-core-reifier/src/main/java/org/apache/camel/reifier/MulticastReifier.java @@ -22,6 +22,7 @@ import java.util.concurrent.ExecutorService; import org.apache.camel.AggregationStrategy; import org.apache.camel.CamelContextAware; +import org.apache.camel.ExtendedCamelContext; import org.apache.camel.Processor; import org.apache.camel.Route; import org.apache.camel.model.MulticastDefinition; @@ -30,6 +31,7 @@ import org.apache.camel.processor.MulticastProcessor; import org.apache.camel.processor.aggregate.AggregationStrategyBeanAdapter; import org.apache.camel.processor.aggregate.ShareUnitOfWorkAggregationStrategy; import org.apache.camel.processor.aggregate.UseLatestAggregationStrategy; +import org.apache.camel.spi.ProcessorExchangeFactory; public class MulticastReifier extends ProcessorReifier<MulticastDefinition> { @@ -77,6 +79,12 @@ public class MulticastReifier extends ProcessorReifier<MulticastDefinition> { camelContext, route, list, strategy, isParallelProcessing, threadPool, shutdownThreadPool, isStreaming, isStopOnException, timeout, definition.getOnPrepare(), isShareUnitOfWork, isParallelAggregate, isStopOnAggregateException); + + // multicast EIP supports exchange pooling so lets inject the factory for this + ProcessorExchangeFactory pef = camelContext.adapt(ExtendedCamelContext.class) + .getProcessorExchangeFactory().newProcessorExchangeFactory(answer); + answer.setProcessorExchangeFactory(pef); + return answer; }