This is an automated email from the ASF dual-hosted git repository.

davsclaus pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/camel.git

commit 54c5f2c3242f0d4c7d9e61bc4e911d9fbad556f7
Author: Claus Ibsen <claus.ib...@gmail.com>
AuthorDate: Mon Apr 5 17:36:34 2021 +0200

    CAMEL-16451: camel-core - ExchangePooling for EIPs. Wiretap EIP
---
 .../apache/camel/spi/ProcessorExchangeFactory.java |  9 ++++++++-
 .../engine/PooledProcessorExchangeFactory.java     | 22 ++++++++++++++++++++++
 .../engine/PrototypeProcessorExchangeFactory.java  |  8 ++++++++
 .../apache/camel/processor/WireTapProcessor.java   | 13 ++++++-------
 4 files changed, 44 insertions(+), 8 deletions(-)

diff --git a/core/camel-api/src/main/java/org/apache/camel/spi/ProcessorExchangeFactory.java b/core/camel-api/src/main/java/org/apache/camel/spi/ProcessorExchangeFactory.java
index 77142d4..c48ad15 100644
--- a/core/camel-api/src/main/java/org/apache/camel/spi/ProcessorExchangeFactory.java
+++ b/core/camel-api/src/main/java/org/apache/camel/spi/ProcessorExchangeFactory.java
@@ -16,7 +16,9 @@
  */
 package org.apache.camel.spi;
 
+import org.apache.camel.Endpoint;
 import org.apache.camel.Exchange;
+import org.apache.camel.ExchangePattern;
 import org.apache.camel.NonManagedService;
 import org.apache.camel.Processor;
 
@@ -53,7 +55,7 @@ public interface ProcessorExchangeFactory extends PooledObjectFactory<Exchange>,
     ProcessorExchangeFactory newProcessorExchangeFactory(Processor processor);
 
     /**
-     * Creates a copy of the given {@link Exchange}
+     * Gets a copy of the given {@link Exchange}
      *
      * @param exchange original copy of the exchange
      * @param handover whether the on completion callbacks should be handed over to the new copy.
@@ -61,6 +63,11 @@ public interface ProcessorExchangeFactory extends PooledObjectFactory<Exchange>,
     Exchange createCorrelatedCopy(Exchange exchange, boolean handover);
 
     /**
+     * Gets a new {@link Exchange}
+     */
+    Exchange create(Endpoint fromEndpoint, ExchangePattern exchangePattern);
+
+    /**
      * Releases the exchange back into the pool
      *
      * @param  exchange the exchange
diff --git a/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/PooledProcessorExchangeFactory.java b/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/PooledProcessorExchangeFactory.java
index d3c37a6..697e02f 100644
--- a/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/PooledProcessorExchangeFactory.java
+++ b/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/PooledProcessorExchangeFactory.java
@@ -16,7 +16,9 @@
  */
 package org.apache.camel.impl.engine;
 
+import org.apache.camel.Endpoint;
 import org.apache.camel.Exchange;
+import org.apache.camel.ExchangePattern;
 import org.apache.camel.ExchangePropertyKey;
 import org.apache.camel.ExtendedExchange;
 import org.apache.camel.PooledExchange;
@@ -90,6 +92,26 @@ public class PooledProcessorExchangeFactory extends PrototypeProcessorExchangeFa
     }
 
     @Override
+    public Exchange create(Endpoint fromEndpoint, ExchangePattern exchangePattern) {
+        Exchange answer = pool.poll();
+        if (answer == null) {
+            // create a new exchange as there was no free from the pool
+            answer = super.create(fromEndpoint, exchangePattern);
+            if (statisticsEnabled) {
+                statistics.created.increment();
+            }
+        } else {
+            if (statisticsEnabled) {
+                statistics.acquired.increment();
+            }
+            // reset exchange for reuse
+            PooledExchange ee = (PooledExchange) answer;
+            ee.reset(System.currentTimeMillis());
+        }
+        return answer;
+    }
+
+    @Override
     public boolean release(Exchange exchange) {
         try {
             // done exchange before returning back to pool
diff --git a/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/PrototypeProcessorExchangeFactory.java b/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/PrototypeProcessorExchangeFactory.java
index 0b10211..e4f4d18 100644
--- a/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/PrototypeProcessorExchangeFactory.java
+++ b/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/PrototypeProcessorExchangeFactory.java
@@ -16,9 +16,12 @@
  */
 package org.apache.camel.impl.engine;
 
+import org.apache.camel.Endpoint;
 import org.apache.camel.Exchange;
+import org.apache.camel.ExchangePattern;
 import org.apache.camel.Processor;
 import org.apache.camel.spi.ProcessorExchangeFactory;
+import org.apache.camel.support.DefaultExchange;
 import org.apache.camel.support.ExchangeHelper;
 import org.apache.camel.support.PooledObjectFactorySupport;
 import org.slf4j.Logger;
@@ -83,6 +86,11 @@ public class PrototypeProcessorExchangeFactory extends PooledObjectFactorySuppor
     }
 
     @Override
+    public Exchange create(Endpoint fromEndpoint, ExchangePattern exchangePattern) {
+        return new DefaultExchange(fromEndpoint, exchangePattern);
+    }
+
+    @Override
     public Exchange acquire() {
         throw new UnsupportedOperationException("Not in use");
     }
diff --git a/core/camel-core-processor/src/main/java/org/apache/camel/processor/WireTapProcessor.java b/core/camel-core-processor/src/main/java/org/apache/camel/processor/WireTapProcessor.java
index fff7cb9..f3b90d4 100644
--- a/core/camel-core-processor/src/main/java/org/apache/camel/processor/WireTapProcessor.java
+++ b/core/camel-core-processor/src/main/java/org/apache/camel/processor/WireTapProcessor.java
@@ -287,8 +287,7 @@ public class WireTapProcessor extends AsyncProcessorSupport
     }
 
     private Exchange configureNewExchange(Exchange exchange) {
-        // no copy so lets just create a new exchange always
-        return new DefaultExchange(exchange.getFromEndpoint(), ExchangePattern.InOnly);
+        return processorExchangeFactory.create(exchange.getFromEndpoint(), ExchangePattern.InOnly);
     }
 
     public List<Processor> getNewExchangeProcessors() {
@@ -352,11 +351,11 @@ public class WireTapProcessor extends AsyncProcessorSupport
 
     @Override
     protected void doBuild() throws Exception {
-        if (copy) {
-            // create a per processor exchange factory
-            this.processorExchangeFactory = getCamelContext().adapt(ExtendedCamelContext.class)
-                    .getProcessorExchangeFactory().newProcessorExchangeFactory(this);
-        }
+        // create a per processor exchange factory
+        this.processorExchangeFactory = getCamelContext().adapt(ExtendedCamelContext.class)
+                .getProcessorExchangeFactory().newProcessorExchangeFactory(this);
+        this.processorExchangeFactory.setRouteId(getRouteId());
+        this.processorExchangeFactory.setId(getId());
 
         boolean pooled = camelContext.adapt(ExtendedCamelContext.class).getExchangeFactory().isPooled();
         if (pooled) {

Reply via email to