This is an automated email from the ASF dual-hosted git repository.

davsclaus pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/camel.git

commit 9e6f960a9f764b8ed7a5bd20a648e718b89e1fd4
Author: Claus Ibsen <claus.ib...@gmail.com>
AuthorDate: Tue Apr 6 11:35:40 2021 +0200

    CAMEL-16451: camel-core - ExchangePooling for EIPs. Enricher EIP
---
 .../java/org/apache/camel/processor/Enricher.java  | 30 ++++++++++++++--------
 1 file changed, 20 insertions(+), 10 deletions(-)

diff --git a/core/camel-core-processor/src/main/java/org/apache/camel/processor/Enricher.java b/core/camel-core-processor/src/main/java/org/apache/camel/processor/Enricher.java
index 41232dd..8c5095a 100644
--- a/core/camel-core-processor/src/main/java/org/apache/camel/processor/Enricher.java
+++ b/core/camel-core-processor/src/main/java/org/apache/camel/processor/Enricher.java
@@ -34,6 +34,7 @@ import org.apache.camel.NoTypeConversionAvailableException;
 import org.apache.camel.spi.EndpointUtilizationStatistics;
 import org.apache.camel.spi.IdAware;
 import org.apache.camel.spi.NormalizedEndpointUri;
+import org.apache.camel.spi.ProcessorExchangeFactory;
 import org.apache.camel.spi.ProducerCache;
 import org.apache.camel.spi.RouteIdAware;
 import org.apache.camel.support.AsyncProcessorConverterHelper;
@@ -75,6 +76,7 @@ public class Enricher extends AsyncProcessorSupport implements IdAware, RouteIdA
     private boolean shareUnitOfWork;
     private int cacheSize;
     private boolean ignoreInvalidEndpoint;
+    private ProcessorExchangeFactory processorExchangeFactory;
 
     public Enricher(Expression expression) {
         this.expression = expression;
@@ -246,9 +248,6 @@ public class Enricher extends AsyncProcessorSupport implements IdAware, RouteIdA
                     } catch (Throwable e) {
                         // if the aggregationStrategy threw an exception, set it on the original exchange
                         exchange.setException(new CamelExchangeException("Error occurred during aggregation", exchange, e));
-                        callback.done(false);
-                        // we failed so break out now
-                        return;
                     }
                 }
 
@@ -266,6 +265,9 @@ public class Enricher extends AsyncProcessorSupport implements IdAware, RouteIdA
                     ServiceHelper.stopAndShutdownService(endpoint);
                 }
 
+                // and release resource exchange back in pool
+                processorExchangeFactory.release(resourceExchange);
+
                 callback.done(false);
             }
         });
@@ -308,9 +310,6 @@ public class Enricher extends AsyncProcessorSupport implements IdAware, RouteIdA
             } catch (Throwable e) {
                 // if the aggregationStrategy threw an exception, set it on the original exchange
                 exchange.setException(new CamelExchangeException("Error occurred during aggregation", exchange, e));
-                callback.done(true);
-                // we failed so break out now
-                return true;
             }
         }
 
@@ -328,6 +327,9 @@ public class Enricher extends AsyncProcessorSupport implements IdAware, RouteIdA
             ServiceHelper.stopAndShutdownService(endpoint);
         }
 
+        // and release resource exchange back in pool
+        processorExchangeFactory.release(resourceExchange);
+
         callback.done(true);
         return true;
     }
@@ -387,7 +389,7 @@ public class Enricher extends AsyncProcessorSupport implements IdAware, RouteIdA
      */
     protected Exchange createResourceExchange(Exchange source, ExchangePattern pattern) {
         // copy exchange, and do not share the unit of work
-        Exchange target = ExchangeHelper.createCorrelatedCopy(source, false);
+        Exchange target = processorExchangeFactory.createCorrelatedCopy(source, false);
         target.setPattern(pattern);
 
         // if we share unit of work, we need to prepare the resource exchange
@@ -415,14 +417,22 @@ public class Enricher extends AsyncProcessorSupport implements IdAware, RouteIdA
     }
 
     @Override
-    protected void doStart() throws Exception {
+    protected void doBuild() throws Exception {
+        // create a per processor exchange factory
+        this.processorExchangeFactory = getCamelContext().adapt(ExtendedCamelContext.class)
+                .getProcessorExchangeFactory().newProcessorExchangeFactory(this);
+
         if (aggregationStrategy == null) {
             aggregationStrategy = defaultAggregationStrategy();
         }
         if (aggregationStrategy instanceof CamelContextAware) {
             ((CamelContextAware) aggregationStrategy).setCamelContext(camelContext);
         }
+        ServiceHelper.buildService(processorExchangeFactory);
+    }
 
+    @Override
+    protected void doStart() throws Exception {
         if (producerCache == null) {
             if (cacheSize < 0) {
                 producerCache = new EmptyProducerCache(this, camelContext);
@@ -433,12 +443,12 @@ public class Enricher extends AsyncProcessorSupport implements IdAware, RouteIdA
             }
         }
 
-        ServiceHelper.startService(producerCache, aggregationStrategy);
+        ServiceHelper.startService(processorExchangeFactory, producerCache, aggregationStrategy);
     }
 
     @Override
     protected void doStop() throws Exception {
-        ServiceHelper.stopService(aggregationStrategy, producerCache);
+        ServiceHelper.stopService(aggregationStrategy, producerCache, processorExchangeFactory);
     }
 
     private static class CopyAggregationStrategy implements AggregationStrategy {

Reply via email to