This is an automated email from the ASF dual-hosted git repository. davsclaus pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/camel.git
commit 9e6f960a9f764b8ed7a5bd20a648e718b89e1fd4 Author: Claus Ibsen <claus.ib...@gmail.com> AuthorDate: Tue Apr 6 11:35:40 2021 +0200 CAMEL-16451: camel-core - ExchangePooling for EIPs. Enricher EIP --- .../java/org/apache/camel/processor/Enricher.java | 30 ++++++++++++++-------- 1 file changed, 20 insertions(+), 10 deletions(-) diff --git a/core/camel-core-processor/src/main/java/org/apache/camel/processor/Enricher.java b/core/camel-core-processor/src/main/java/org/apache/camel/processor/Enricher.java index 41232dd..8c5095a 100644 --- a/core/camel-core-processor/src/main/java/org/apache/camel/processor/Enricher.java +++ b/core/camel-core-processor/src/main/java/org/apache/camel/processor/Enricher.java @@ -34,6 +34,7 @@ import org.apache.camel.NoTypeConversionAvailableException; import org.apache.camel.spi.EndpointUtilizationStatistics; import org.apache.camel.spi.IdAware; import org.apache.camel.spi.NormalizedEndpointUri; +import org.apache.camel.spi.ProcessorExchangeFactory; import org.apache.camel.spi.ProducerCache; import org.apache.camel.spi.RouteIdAware; import org.apache.camel.support.AsyncProcessorConverterHelper; @@ -75,6 +76,7 @@ public class Enricher extends AsyncProcessorSupport implements IdAware, RouteIdA private boolean shareUnitOfWork; private int cacheSize; private boolean ignoreInvalidEndpoint; + private ProcessorExchangeFactory processorExchangeFactory; public Enricher(Expression expression) { this.expression = expression; @@ -246,9 +248,6 @@ public class Enricher extends AsyncProcessorSupport implements IdAware, RouteIdA } catch (Throwable e) { // if the aggregationStrategy threw an exception, set it on the original exchange exchange.setException(new CamelExchangeException("Error occurred during aggregation", exchange, e)); - callback.done(false); - // we failed so break out now - return; } } @@ -266,6 +265,9 @@ public class Enricher extends AsyncProcessorSupport implements IdAware, RouteIdA ServiceHelper.stopAndShutdownService(endpoint); } + // and release resource exchange back in pool + processorExchangeFactory.release(resourceExchange); + callback.done(false); } }); @@ -308,9 +310,6 @@ public class Enricher extends AsyncProcessorSupport implements IdAware, RouteIdA } catch (Throwable e) { // if the aggregationStrategy threw an exception, set it on the original exchange exchange.setException(new CamelExchangeException("Error occurred during aggregation", exchange, e)); - callback.done(true); - // we failed so break out now - return true; } } @@ -328,6 +327,9 @@ public class Enricher extends AsyncProcessorSupport implements IdAware, RouteIdA ServiceHelper.stopAndShutdownService(endpoint); } + // and release resource exchange back in pool + processorExchangeFactory.release(resourceExchange); + callback.done(true); return true; } @@ -387,7 +389,7 @@ public class Enricher extends AsyncProcessorSupport implements IdAware, RouteIdA */ protected Exchange createResourceExchange(Exchange source, ExchangePattern pattern) { // copy exchange, and do not share the unit of work - Exchange target = ExchangeHelper.createCorrelatedCopy(source, false); + Exchange target = processorExchangeFactory.createCorrelatedCopy(source, false); target.setPattern(pattern); // if we share unit of work, we need to prepare the resource exchange @@ -415,14 +417,22 @@ public class Enricher extends AsyncProcessorSupport implements IdAware, RouteIdA } @Override - protected void doStart() throws Exception { + protected void doBuild() throws Exception { + // create a per processor exchange factory + this.processorExchangeFactory = getCamelContext().adapt(ExtendedCamelContext.class) + .getProcessorExchangeFactory().newProcessorExchangeFactory(this); + if (aggregationStrategy == null) { aggregationStrategy = defaultAggregationStrategy(); } if (aggregationStrategy instanceof CamelContextAware) { ((CamelContextAware) aggregationStrategy).setCamelContext(camelContext); } + ServiceHelper.buildService(processorExchangeFactory); + } + @Override + protected void doStart() throws Exception { if (producerCache == null) { if (cacheSize < 0) { producerCache = new EmptyProducerCache(this, camelContext); @@ -433,12 +443,12 @@ public class Enricher extends AsyncProcessorSupport implements IdAware, RouteIdA } } - ServiceHelper.startService(producerCache, aggregationStrategy); + ServiceHelper.startService(processorExchangeFactory, producerCache, aggregationStrategy); } @Override protected void doStop() throws Exception { - ServiceHelper.stopService(aggregationStrategy, producerCache); + ServiceHelper.stopService(aggregationStrategy, producerCache, processorExchangeFactory); } private static class CopyAggregationStrategy implements AggregationStrategy {