This is an automated email from the ASF dual-hosted git repository.

davsclaus pushed a commit to branch exchange-factory
in repository https://gitbox.apache.org/repos/asf/camel.git

commit ca506d7fdfcd340562e64998969162ca3c000784
Author: Claus Ibsen <claus.ib...@gmail.com>
AuthorDate: Thu Feb 18 15:38:33 2021 +0100

    CAMEL-16222: PooledExchangeFactory experiment
---
 .../java/org/apache/camel/spi/ExchangeFactory.java |  9 ++++++
 .../camel/impl/engine/AbstractCamelContext.java    |  1 +
 .../camel/impl/engine/DefaultExchangeFactory.java  | 11 ++++---
 .../camel/impl/engine/PooledExchangeFactory.java   | 36 ++++++++++++++--------
 .../org/apache/camel/support/DefaultConsumer.java  | 20 ++++++++----
 5 files changed, 55 insertions(+), 22 deletions(-)

diff --git 
a/core/camel-api/src/main/java/org/apache/camel/spi/ExchangeFactory.java 
b/core/camel-api/src/main/java/org/apache/camel/spi/ExchangeFactory.java
index 95586c4..8e429f2 100644
--- a/core/camel-api/src/main/java/org/apache/camel/spi/ExchangeFactory.java
+++ b/core/camel-api/src/main/java/org/apache/camel/spi/ExchangeFactory.java
@@ -16,6 +16,7 @@
  */
 package org.apache.camel.spi;
 
+import org.apache.camel.Consumer;
 import org.apache.camel.Endpoint;
 import org.apache.camel.Exchange;
 
@@ -38,6 +39,14 @@ public interface ExchangeFactory {
     String FACTORY = "exchange-factory";
 
     /**
+     * Creates a new {@link ExchangeFactory} that is private for the given 
consumer.
+     *
+     * @param consumer the consumer that will use the created {@link 
ExchangeFactory}
+     * @return the created factory.
+     */
+    ExchangeFactory newExchangeFactory(Consumer consumer);
+
+    /**
      * Gets a new {@link Exchange}
      *
      * @param autoRelease whether to auto release the exchange when routing is 
complete via {@link UnitOfWork}
diff --git 
a/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/AbstractCamelContext.java
 
b/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/AbstractCamelContext.java
index 3e1451f..22195da 100644
--- 
a/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/AbstractCamelContext.java
+++ 
b/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/AbstractCamelContext.java
@@ -3624,6 +3624,7 @@ public abstract class AbstractCamelContext extends 
BaseService
         getDataFormatResolver();
 
         getExecutorServiceManager();
+        getExchangeFactory();
         getShutdownStrategy();
         getUuidGenerator();
 
diff --git 
a/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/DefaultExchangeFactory.java
 
b/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/DefaultExchangeFactory.java
index ec17772..a8db865 100644
--- 
a/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/DefaultExchangeFactory.java
+++ 
b/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/DefaultExchangeFactory.java
@@ -16,10 +16,7 @@
  */
 package org.apache.camel.impl.engine;
 
-import org.apache.camel.CamelContext;
-import org.apache.camel.CamelContextAware;
-import org.apache.camel.Endpoint;
-import org.apache.camel.Exchange;
+import org.apache.camel.*;
 import org.apache.camel.spi.ExchangeFactory;
 import org.apache.camel.support.DefaultExchange;
 
@@ -41,6 +38,12 @@ public class DefaultExchangeFactory implements 
ExchangeFactory, CamelContextAwar
     }
 
     @Override
+    public ExchangeFactory newExchangeFactory(Consumer consumer) {
+        // we just use a shared factory
+        return this;
+    }
+
+    @Override
     public Exchange create(boolean autoRelease) {
         return new DefaultExchange(camelContext);
     }
diff --git 
a/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/PooledExchangeFactory.java
 
b/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/PooledExchangeFactory.java
index 30094cd..0541691 100644
--- 
a/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/PooledExchangeFactory.java
+++ 
b/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/PooledExchangeFactory.java
@@ -19,18 +19,12 @@ package org.apache.camel.impl.engine;
 import java.util.concurrent.ConcurrentLinkedQueue;
 import java.util.concurrent.atomic.AtomicLong;
 
-import org.apache.camel.CamelContext;
-import org.apache.camel.CamelContextAware;
-import org.apache.camel.Endpoint;
-import org.apache.camel.Exchange;
-import org.apache.camel.Experimental;
-import org.apache.camel.ExtendedExchange;
-import org.apache.camel.NonManagedService;
-import org.apache.camel.StaticService;
+import org.apache.camel.*;
 import org.apache.camel.spi.ExchangeFactory;
 import org.apache.camel.support.DefaultExchange;
 import org.apache.camel.support.SynchronizationAdapter;
 import org.apache.camel.support.service.ServiceSupport;
+import org.apache.camel.util.URISupport;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -43,6 +37,7 @@ public class PooledExchangeFactory extends ServiceSupport
 
     private static final Logger LOG = 
LoggerFactory.getLogger(PooledExchangeFactory.class);
 
+    private final Consumer consumer;
     private final ReleaseOnCompletion onCompletion = new ReleaseOnCompletion();
     private final ConcurrentLinkedQueue<Exchange> pool = new 
ConcurrentLinkedQueue<>();
     private final AtomicLong acquired = new AtomicLong();
@@ -53,6 +48,16 @@ public class PooledExchangeFactory extends ServiceSupport
     private CamelContext camelContext;
     private boolean statisticsEnabled = true;
 
+    public PooledExchangeFactory() {
+        this.consumer = null;
+    }
+
+    private PooledExchangeFactory(Consumer consumer, CamelContext 
camelContext, boolean statisticsEnabled) {
+        this.consumer = consumer;
+        this.camelContext = camelContext;
+        this.statisticsEnabled = statisticsEnabled;
+    }
+
     @Override
     public CamelContext getCamelContext() {
         return camelContext;
@@ -63,6 +68,11 @@ public class PooledExchangeFactory extends ServiceSupport
         this.camelContext = camelContext;
     }
 
+    @Override
+    public ExchangeFactory newExchangeFactory(Consumer consumer) {
+        return new PooledExchangeFactory(consumer, camelContext, 
statisticsEnabled);
+    }
+
     public boolean isStatisticsEnabled() {
         return statisticsEnabled;
     }
@@ -132,7 +142,6 @@ public class PooledExchangeFactory extends ServiceSupport
             if (statisticsEnabled) {
                 discarded.incrementAndGet();
             }
-            // ignore
             LOG.debug("Error resetting exchange: {}. This exchange is 
discarded.", exchange);
         }
     }
@@ -141,9 +150,12 @@ public class PooledExchangeFactory extends ServiceSupport
     protected void doStop() throws Exception {
         pool.clear();
 
-        if (statisticsEnabled) {
-            LOG.info("PooledExchangeFactory usage [created: {}, acquired: {}, 
released: {}, discarded: {}]",
-                    created.get(), acquired.get(), released.get(), 
discarded.get());
+        if (statisticsEnabled && consumer != null) {
+            String uri = consumer.getEndpoint().getEndpointBaseUri();
+            uri = URISupport.sanitizeUri(uri);
+
+            LOG.info("PooledExchangeFactory ({}) usage [created: {}, reused: 
{}, released: {}, discarded: {}]",
+                    uri, created.get(), acquired.get(), released.get(), 
discarded.get());
         }
 
         created.set(0);
diff --git 
a/core/camel-support/src/main/java/org/apache/camel/support/DefaultConsumer.java
 
b/core/camel-support/src/main/java/org/apache/camel/support/DefaultConsumer.java
index c8e594f..ee12829 100644
--- 
a/core/camel-support/src/main/java/org/apache/camel/support/DefaultConsumer.java
+++ 
b/core/camel-support/src/main/java/org/apache/camel/support/DefaultConsumer.java
@@ -56,7 +56,9 @@ public class DefaultConsumer extends ServiceSupport 
implements Consumer, RouteAw
         this.processor = processor;
         this.asyncProcessor = AsyncProcessorConverterHelper.convert(processor);
         this.exceptionHandler = new 
LoggingExceptionHandler(endpoint.getCamelContext(), getClass());
-        this.exchangeFactory = 
endpoint.getCamelContext().adapt(ExtendedCamelContext.class).getExchangeFactory();
+        // create a per consumer exchange factory
+        this.exchangeFactory = 
endpoint.getCamelContext().adapt(ExtendedCamelContext.class)
+                .getExchangeFactory().newExchangeFactory(this);
     }
 
     @Override
@@ -167,19 +169,25 @@ public class DefaultConsumer extends ServiceSupport 
implements Consumer, RouteAw
     @Override
     protected void doInit() throws Exception {
         LOG.debug("Init consumer: {}", this);
-        ServiceHelper.initService(processor);
+        ServiceHelper.initService(exchangeFactory, processor);
+    }
+
+    @Override
+    protected void doStart() throws Exception {
+        LOG.debug("Starting consumer: {}", this);
+        ServiceHelper.startService(exchangeFactory, processor);
     }
 
     @Override
     protected void doStop() throws Exception {
         LOG.debug("Stopping consumer: {}", this);
-        ServiceHelper.stopService(processor);
+        ServiceHelper.stopService(exchangeFactory, processor);
     }
 
     @Override
-    protected void doStart() throws Exception {
-        LOG.debug("Starting consumer: {}", this);
-        ServiceHelper.startService(processor);
+    protected void doShutdown() throws Exception {
+        LOG.debug("Shutting down consumer: {}", this);
+        ServiceHelper.stopAndShutdownServices(exchangeFactory, processor);
     }
 
     /**

Reply via email to