This is an automated email from the ASF dual-hosted git repository.

davsclaus pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/camel.git


The following commit(s) were added to refs/heads/master by this push:
     new 8d878f3  CAMEL-16451: camel-core - ExchangePooling for EIPs. Multicast 
EIP
8d878f3 is described below

commit 8d878f323864d5a1e31de3f5a1e92b82a3f78a5e
Author: Claus Ibsen <claus.ib...@gmail.com>
AuthorDate: Tue Apr 6 20:20:53 2021 +0200

    CAMEL-16451: camel-core - ExchangePooling for EIPs. Multicast EIP
---
 .../engine/PooledProcessorExchangeFactory.java     |  4 +--
 .../impl/engine/PrototypeExchangeFactory.java      |  2 +-
 .../engine/PrototypeProcessorExchangeFactory.java  |  2 +-
 .../apache/camel/processor/MulticastProcessor.java | 40 +++++++++++++++++++---
 .../org/apache/camel/reifier/MulticastReifier.java |  8 +++++
 5 files changed, 47 insertions(+), 9 deletions(-)

diff --git 
a/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/PooledProcessorExchangeFactory.java
 
b/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/PooledProcessorExchangeFactory.java
index ab86b3d..1826659 100644
--- 
a/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/PooledProcessorExchangeFactory.java
+++ 
b/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/PooledProcessorExchangeFactory.java
@@ -67,9 +67,7 @@ public class PooledProcessorExchangeFactory extends 
PrototypeProcessorExchangeFa
             // do not reuse message id on copy
             pe.getIn().setMessageId(null);
             // do not share the unit of work
-            if (pe.getUnitOfWork() != null) {
-                pe.getUnitOfWork().reset();
-            }
+            pe.setUnitOfWork(null);
             if (handover) {
                 // Need to hand over the completion for async invocation
                 pe.adapt(ExtendedExchange.class).handoverCompletions(exchange);
diff --git 
a/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/PrototypeExchangeFactory.java
 
b/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/PrototypeExchangeFactory.java
index da06d70..9752f4d 100644
--- 
a/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/PrototypeExchangeFactory.java
+++ 
b/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/PrototypeExchangeFactory.java
@@ -152,7 +152,7 @@ public class PrototypeExchangeFactory extends 
PooledObjectFactorySupport<Exchang
                 boolean leak = created + acquired > released + discarded;
                 if (leak) {
                     long leaks = (created + acquired) - (released + discarded);
-                    log.info(
+                    log.warn(
                             "{} {} ({}) usage (leaks detected: {}) [pooled: 
{}, created: {}, acquired: {} released: {}, discarded: {}]",
                             name, id, uri, leaks, pooled, created, acquired, 
released, discarded);
                 } else {
diff --git 
a/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/PrototypeProcessorExchangeFactory.java
 
b/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/PrototypeProcessorExchangeFactory.java
index e4f4d18..3a4a1f1 100644
--- 
a/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/PrototypeProcessorExchangeFactory.java
+++ 
b/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/PrototypeProcessorExchangeFactory.java
@@ -130,7 +130,7 @@ public class PrototypeProcessorExchangeFactory extends 
PooledObjectFactorySuppor
                 boolean leak = created + acquired > released + discarded;
                 if (leak) {
                     long leaks = (created + acquired) - (released + discarded);
-                    log.info(
+                    log.warn(
                             "{} {} ({}) usage (leaks detected: {}) [pooled: 
{}, created: {}, acquired: {} released: {}, discarded: {}]",
                             name, rid, pid, leaks, pooled, created, acquired, 
released, discarded);
                 } else {
diff --git 
a/core/camel-core-processor/src/main/java/org/apache/camel/processor/MulticastProcessor.java
 
b/core/camel-core-processor/src/main/java/org/apache/camel/processor/MulticastProcessor.java
index 1a2b977..d157327 100644
--- 
a/core/camel-core-processor/src/main/java/org/apache/camel/processor/MulticastProcessor.java
+++ 
b/core/camel-core-processor/src/main/java/org/apache/camel/processor/MulticastProcessor.java
@@ -58,6 +58,7 @@ import 
org.apache.camel.processor.errorhandler.ErrorHandlerSupport;
 import org.apache.camel.spi.ErrorHandlerAware;
 import org.apache.camel.spi.IdAware;
 import org.apache.camel.spi.InternalProcessorFactory;
+import org.apache.camel.spi.ProcessorExchangeFactory;
 import org.apache.camel.spi.ReactiveExecutor;
 import org.apache.camel.spi.RouteIdAware;
 import org.apache.camel.spi.UnitOfWork;
@@ -165,6 +166,7 @@ public class MulticastProcessor extends 
AsyncProcessorSupport
     private final InternalProcessorFactory internalProcessorFactory;
     private final Route route;
     private final ReactiveExecutor reactiveExecutor;
+    private ProcessorExchangeFactory processorExchangeFactory;
     private Processor errorHandler;
     private String id;
     private String routeId;
@@ -264,6 +266,14 @@ public class MulticastProcessor extends 
AsyncProcessorSupport
         return errorHandler;
     }
 
+    public ProcessorExchangeFactory getProcessorExchangeFactory() {
+        return processorExchangeFactory;
+    }
+
+    public void setProcessorExchangeFactory(ProcessorExchangeFactory 
processorExchangeFactory) {
+        this.processorExchangeFactory = processorExchangeFactory;
+    }
+
     @Override
     public String getTraceLabel() {
         return "multicast";
@@ -275,6 +285,11 @@ public class MulticastProcessor extends 
AsyncProcessorSupport
 
     @Override
     protected void doBuild() throws Exception {
+        if (processorExchangeFactory != null) {
+            processorExchangeFactory.setId(id);
+            processorExchangeFactory.setRouteId(routeId);
+        }
+
         // eager load classes
         Object dummy = new MulticastReactiveTask();
         LOG.trace("Loaded {}", dummy.getClass().getName());
@@ -288,6 +303,8 @@ public class MulticastProcessor extends 
AsyncProcessorSupport
         }
         Object dummy5 = new DefaultProcessorExchangePair(0, null, null, null);
         LOG.trace("Loaded {}", dummy5.getClass().getName());
+
+        ServiceHelper.buildService(processorExchangeFactory);
     }
 
     @Override
@@ -298,6 +315,8 @@ public class MulticastProcessor extends 
AsyncProcessorSupport
                 wrapInErrorHandler(route, exchange, processor);
             }
         }
+
+        ServiceHelper.initService(processorExchangeFactory);
     }
 
     @Override
@@ -772,6 +791,14 @@ public class MulticastProcessor extends 
AsyncProcessorSupport
             Exchange original, Exchange subExchange, final 
Iterable<ProcessorExchangePair> pairs,
             AsyncCallback callback, boolean doneSync, boolean forceExhaust) {
 
+        if (processorExchangeFactory != null) {
+            // the exchanges on the pairs was created with a factory, so they 
should be released
+            Iterator<ProcessorExchangePair> it = pairs.iterator();
+            while (it.hasNext()) {
+                ProcessorExchangePair pair = it.next();
+                processorExchangeFactory.release(pair.getExchange());
+            }
+        }
         // we are done so close the pairs iterator
         if (pairs instanceof Closeable) {
             IOHelper.close((Closeable) pairs, "pairs", LOG);
@@ -897,7 +924,12 @@ public class MulticastProcessor extends 
AsyncProcessorSupport
         int index = 0;
         for (Processor processor : processors) {
             // copy exchange, and do not share the unit of work
-            Exchange copy = ExchangeHelper.createCorrelatedCopy(exchange, 
false);
+            Exchange copy;
+            if (processorExchangeFactory != null) {
+                copy = processorExchangeFactory.createCorrelatedCopy(exchange, 
false);
+            } else {
+                copy = ExchangeHelper.createCorrelatedCopy(exchange, false);
+            }
 
             if (streamCache != null) {
                 if (index > 0) {
@@ -1082,7 +1114,7 @@ public class MulticastProcessor extends 
AsyncProcessorSupport
             ((CamelContextAware) 
aggregationStrategy).setCamelContext(camelContext);
         }
 
-        ServiceHelper.startService(aggregationStrategy, processors);
+        ServiceHelper.startService(aggregationStrategy, processors, 
processorExchangeFactory);
     }
 
     /**
@@ -1099,12 +1131,12 @@ public class MulticastProcessor extends 
AsyncProcessorSupport
 
     @Override
     protected void doStop() throws Exception {
-        ServiceHelper.stopService(processors, errorHandlers, 
aggregationStrategy);
+        ServiceHelper.stopService(processors, errorHandlers, 
aggregationStrategy, processorExchangeFactory);
     }
 
     @Override
     protected void doShutdown() throws Exception {
-        ServiceHelper.stopAndShutdownServices(processors, errorHandlers, 
aggregationStrategy);
+        ServiceHelper.stopAndShutdownServices(processors, errorHandlers, 
aggregationStrategy, processorExchangeFactory);
         // only clear error handlers when shutting down
         errorHandlers.clear();
 
diff --git 
a/core/camel-core-reifier/src/main/java/org/apache/camel/reifier/MulticastReifier.java
 
b/core/camel-core-reifier/src/main/java/org/apache/camel/reifier/MulticastReifier.java
index a7ff33d..80632ad 100644
--- 
a/core/camel-core-reifier/src/main/java/org/apache/camel/reifier/MulticastReifier.java
+++ 
b/core/camel-core-reifier/src/main/java/org/apache/camel/reifier/MulticastReifier.java
@@ -22,6 +22,7 @@ import java.util.concurrent.ExecutorService;
 
 import org.apache.camel.AggregationStrategy;
 import org.apache.camel.CamelContextAware;
+import org.apache.camel.ExtendedCamelContext;
 import org.apache.camel.Processor;
 import org.apache.camel.Route;
 import org.apache.camel.model.MulticastDefinition;
@@ -30,6 +31,7 @@ import org.apache.camel.processor.MulticastProcessor;
 import org.apache.camel.processor.aggregate.AggregationStrategyBeanAdapter;
 import org.apache.camel.processor.aggregate.ShareUnitOfWorkAggregationStrategy;
 import org.apache.camel.processor.aggregate.UseLatestAggregationStrategy;
+import org.apache.camel.spi.ProcessorExchangeFactory;
 
 public class MulticastReifier extends ProcessorReifier<MulticastDefinition> {
 
@@ -77,6 +79,12 @@ public class MulticastReifier extends 
ProcessorReifier<MulticastDefinition> {
                 camelContext, route, list, strategy, isParallelProcessing, 
threadPool, shutdownThreadPool, isStreaming,
                 isStopOnException, timeout, definition.getOnPrepare(), 
isShareUnitOfWork, isParallelAggregate,
                 isStopOnAggregateException);
+
+        // multicast EIP supports exchange pooling so lets inject the factory 
for this
+        ProcessorExchangeFactory pef = 
camelContext.adapt(ExtendedCamelContext.class)
+                
.getProcessorExchangeFactory().newProcessorExchangeFactory(answer);
+        answer.setProcessorExchangeFactory(pef);
+
         return answer;
     }
 

Reply via email to