This is an automated email from the ASF dual-hosted git repository.

davsclaus pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/camel.git


The following commit(s) were added to refs/heads/master by this push:
     new fbed41e  CAMEL-16451: camel-core - ExchangePooling for EIPs. Multicast 
EIP
fbed41e is described below

commit fbed41eb2135368ae22046f1cdeeeb16de0d5d1e
Author: Claus Ibsen <claus.ib...@gmail.com>
AuthorDate: Wed Apr 7 06:54:54 2021 +0200

    CAMEL-16451: camel-core - ExchangePooling for EIPs. Multicast EIP
---
 .../apache/camel/processor/MulticastProcessor.java | 36 ++++++++++------------
 .../camel/processor/RecipientListProcessor.java    |  2 +-
 .../org/apache/camel/reifier/MulticastReifier.java |  7 -----
 3 files changed, 17 insertions(+), 28 deletions(-)

diff --git 
a/core/camel-core-processor/src/main/java/org/apache/camel/processor/MulticastProcessor.java
 
b/core/camel-core-processor/src/main/java/org/apache/camel/processor/MulticastProcessor.java
index d157327..d9390cc 100644
--- 
a/core/camel-core-processor/src/main/java/org/apache/camel/processor/MulticastProcessor.java
+++ 
b/core/camel-core-processor/src/main/java/org/apache/camel/processor/MulticastProcessor.java
@@ -162,11 +162,11 @@ public class MulticastProcessor extends 
AsyncProcessorSupport
     }
 
     protected final Processor onPrepare;
+    protected final ProcessorExchangeFactory processorExchangeFactory;
     private final CamelContext camelContext;
     private final InternalProcessorFactory internalProcessorFactory;
     private final Route route;
     private final ReactiveExecutor reactiveExecutor;
-    private ProcessorExchangeFactory processorExchangeFactory;
     private Processor errorHandler;
     private String id;
     private String routeId;
@@ -229,6 +229,13 @@ public class MulticastProcessor extends 
AsyncProcessorSupport
         this.shareUnitOfWork = shareUnitOfWork;
         this.parallelAggregate = parallelAggregate;
         this.stopOnAggregateException = stopOnAggregateException;
+        if (this instanceof Splitter) {
+            // not supported for splitter
+            this.processorExchangeFactory = null;
+        } else {
+            this.processorExchangeFactory = 
camelContext.adapt(ExtendedCamelContext.class)
+                    
.getProcessorExchangeFactory().newProcessorExchangeFactory(this);
+        }
     }
 
     @Override
@@ -266,14 +273,6 @@ public class MulticastProcessor extends 
AsyncProcessorSupport
         return errorHandler;
     }
 
-    public ProcessorExchangeFactory getProcessorExchangeFactory() {
-        return processorExchangeFactory;
-    }
-
-    public void setProcessorExchangeFactory(ProcessorExchangeFactory 
processorExchangeFactory) {
-        this.processorExchangeFactory = processorExchangeFactory;
-    }
-
     @Override
     public String getTraceLabel() {
         return "multicast";
@@ -791,12 +790,14 @@ public class MulticastProcessor extends 
AsyncProcessorSupport
             Exchange original, Exchange subExchange, final 
Iterable<ProcessorExchangePair> pairs,
             AsyncCallback callback, boolean doneSync, boolean forceExhaust) {
 
-        if (processorExchangeFactory != null) {
+        if (processorExchangeFactory != null && pairs != null) {
             // the exchanges on the pairs was created with a factory, so they 
should be released
-            Iterator<ProcessorExchangePair> it = pairs.iterator();
-            while (it.hasNext()) {
-                ProcessorExchangePair pair = it.next();
-                processorExchangeFactory.release(pair.getExchange());
+            try {
+                for (ProcessorExchangePair pair : pairs) {
+                    processorExchangeFactory.release(pair.getExchange());
+                }
+            } catch (Throwable e) {
+                LOG.warn("Error releasing exchange due to " + e.getMessage() + 
". This exception is ignored.", e);
             }
         }
         // we are done so close the pairs iterator
@@ -924,12 +925,7 @@ public class MulticastProcessor extends 
AsyncProcessorSupport
         int index = 0;
         for (Processor processor : processors) {
             // copy exchange, and do not share the unit of work
-            Exchange copy;
-            if (processorExchangeFactory != null) {
-                copy = processorExchangeFactory.createCorrelatedCopy(exchange, 
false);
-            } else {
-                copy = ExchangeHelper.createCorrelatedCopy(exchange, false);
-            }
+            Exchange copy = 
processorExchangeFactory.createCorrelatedCopy(exchange, false);
 
             if (streamCache != null) {
                 if (index > 0) {
diff --git 
a/core/camel-core-processor/src/main/java/org/apache/camel/processor/RecipientListProcessor.java
 
b/core/camel-core-processor/src/main/java/org/apache/camel/processor/RecipientListProcessor.java
index e1dacf3..4d9f76f 100644
--- 
a/core/camel-core-processor/src/main/java/org/apache/camel/processor/RecipientListProcessor.java
+++ 
b/core/camel-core-processor/src/main/java/org/apache/camel/processor/RecipientListProcessor.java
@@ -241,7 +241,7 @@ public class RecipientListProcessor extends 
MulticastProcessor {
             int index, Endpoint endpoint, Producer producer,
             Exchange exchange, ExchangePattern pattern, boolean 
prototypeEndpoint) {
         // copy exchange, and do not share the unit of work
-        Exchange copy = ExchangeHelper.createCorrelatedCopy(exchange, false);
+        Exchange copy = 
processorExchangeFactory.createCorrelatedCopy(exchange, false);
 
         // if we share unit of work, we need to prepare the child exchange
         if (isShareUnitOfWork()) {
diff --git 
a/core/camel-core-reifier/src/main/java/org/apache/camel/reifier/MulticastReifier.java
 
b/core/camel-core-reifier/src/main/java/org/apache/camel/reifier/MulticastReifier.java
index 80632ad..2e376f3 100644
--- 
a/core/camel-core-reifier/src/main/java/org/apache/camel/reifier/MulticastReifier.java
+++ 
b/core/camel-core-reifier/src/main/java/org/apache/camel/reifier/MulticastReifier.java
@@ -22,7 +22,6 @@ import java.util.concurrent.ExecutorService;
 
 import org.apache.camel.AggregationStrategy;
 import org.apache.camel.CamelContextAware;
-import org.apache.camel.ExtendedCamelContext;
 import org.apache.camel.Processor;
 import org.apache.camel.Route;
 import org.apache.camel.model.MulticastDefinition;
@@ -31,7 +30,6 @@ import org.apache.camel.processor.MulticastProcessor;
 import org.apache.camel.processor.aggregate.AggregationStrategyBeanAdapter;
 import org.apache.camel.processor.aggregate.ShareUnitOfWorkAggregationStrategy;
 import org.apache.camel.processor.aggregate.UseLatestAggregationStrategy;
-import org.apache.camel.spi.ProcessorExchangeFactory;
 
 public class MulticastReifier extends ProcessorReifier<MulticastDefinition> {
 
@@ -80,11 +78,6 @@ public class MulticastReifier extends 
ProcessorReifier<MulticastDefinition> {
                 isStopOnException, timeout, definition.getOnPrepare(), 
isShareUnitOfWork, isParallelAggregate,
                 isStopOnAggregateException);
 
-        // multicast EIP supports exchange pooling so lets inject the factory 
for this
-        ProcessorExchangeFactory pef = 
camelContext.adapt(ExtendedCamelContext.class)
-                
.getProcessorExchangeFactory().newProcessorExchangeFactory(answer);
-        answer.setProcessorExchangeFactory(pef);
-
         return answer;
     }
 

Reply via email to