This is an automated email from the ASF dual-hosted git repository.

davsclaus pushed a commit to branch exchange-factory
in repository https://gitbox.apache.org/repos/asf/camel.git

commit 161ed6966a7c8185fc7818b51d0bc58fde19ec1b
Author: Claus Ibsen <claus.ib...@gmail.com>
AuthorDate: Wed Feb 24 12:36:54 2021 +0100

    CAMEL-16222: PooledExchangeFactory experiment
---
 .../org/apache/camel/ExtendedCamelContext.java     |  11 ++
 .../java/org/apache/camel/spi/ExchangeFactory.java |  73 +++++++++-
 .../apache/camel/spi/ExchangeFactoryManager.java   |  78 ++++++++++
 .../camel/impl/engine/AbstractCamelContext.java    |  26 +++-
 .../camel/impl/engine/DefaultExchangeFactory.java  | 161 ++++++++++++++++++++-
 .../impl/engine/DefaultExchangeFactoryManager.java | 101 +++++++++++++
 .../camel/impl/engine/PooledExchangeFactory.java   | 110 ++++++--------
 .../camel/impl/engine/SimpleCamelContext.java      |   6 +
 .../camel/impl/ExtendedCamelContextConfigurer.java |   6 +
 .../camel/impl/lw/LightweightCamelContext.java     |  11 ++
 .../impl/lw/LightweightRuntimeCamelContext.java    |  13 ++
 .../api/management/mbean/CamelOpenMBeanTypes.java  |  13 ++
 .../mbean/ManagedExchangeFactoryManagerMBean.java  |  44 ++++++
 .../management/JmxManagementLifecycleStrategy.java |   4 +
 .../mbean/ManagedExchangeFactoryManager.java       | 110 ++++++++++++++
 .../org/apache/camel/support/DefaultConsumer.java  |   5 +-
 16 files changed, 693 insertions(+), 79 deletions(-)

diff --git 
a/core/camel-api/src/main/java/org/apache/camel/ExtendedCamelContext.java 
b/core/camel-api/src/main/java/org/apache/camel/ExtendedCamelContext.java
index 6cc0fc5..4689df9 100644
--- a/core/camel-api/src/main/java/org/apache/camel/ExtendedCamelContext.java
+++ b/core/camel-api/src/main/java/org/apache/camel/ExtendedCamelContext.java
@@ -37,6 +37,7 @@ import org.apache.camel.spi.DeferServiceFactory;
 import org.apache.camel.spi.EndpointStrategy;
 import org.apache.camel.spi.EndpointUriFactory;
 import org.apache.camel.spi.ExchangeFactory;
+import org.apache.camel.spi.ExchangeFactoryManager;
 import org.apache.camel.spi.FactoryFinder;
 import org.apache.camel.spi.FactoryFinderResolver;
 import org.apache.camel.spi.HeadersMapFactory;
@@ -225,6 +226,16 @@ public interface ExtendedCamelContext extends CamelContext 
{
     void setExchangeFactory(ExchangeFactory exchangeFactory);
 
     /**
+     * Gets the exchange factory manager to use.
+     */
+    ExchangeFactoryManager getExchangeFactoryManager();
+
+    /**
+     * Sets a custom exchange factory manager to use.
+     */
+    void setExchangeFactoryManager(ExchangeFactoryManager 
exchangeFactoryManager);
+
+    /**
      * Returns the bean post processor used to do any bean customization.
      *
      * @return the bean post processor.
diff --git 
a/core/camel-api/src/main/java/org/apache/camel/spi/ExchangeFactory.java 
b/core/camel-api/src/main/java/org/apache/camel/spi/ExchangeFactory.java
index e2047e1..ac8d7e6 100644
--- a/core/camel-api/src/main/java/org/apache/camel/spi/ExchangeFactory.java
+++ b/core/camel-api/src/main/java/org/apache/camel/spi/ExchangeFactory.java
@@ -16,9 +16,12 @@
  */
 package org.apache.camel.spi;
 
+import org.apache.camel.CamelContextAware;
 import org.apache.camel.Consumer;
 import org.apache.camel.Endpoint;
 import org.apache.camel.Exchange;
+import org.apache.camel.NonManagedService;
+import org.apache.camel.Service;
 
 /**
  * Factory used by {@link Consumer} to create Camel {@link Exchange} holding 
the incoming message received by the
@@ -33,7 +36,50 @@ import org.apache.camel.Exchange;
  * The factory is pluggable which allows to use different strategies. The 
default factory will create a new
  * {@link Exchange} instance, and the pooled factory will pool and reuse 
exchanges.
  */
-public interface ExchangeFactory {
+public interface ExchangeFactory extends Service, CamelContextAware, 
NonManagedService {
+
+    /**
+     * Utilization statistics of this factory.
+     */
+    interface Statistics {
+
+        /**
+         * Number of new exchanges created.
+         */
+        long getCreatedCounter();
+
+        /**
+         * Number of exchanges acquired (reused) when using a pooled factory.
+         */
+        long getAcquiredCounter();
+
+        /**
+         * Number of exchanges released back to pool
+         */
+        long getReleasedCounter();
+
+        /**
+         * Number of exchanges discarded (thrown away), such as when there is no space in the 
cache pool.
+         */
+        long getDiscardedCounter();
+
+        /**
+         * Reset the counters
+         */
+        void reset();
+
+        /**
+         * Whether statistics is enabled.
+         */
+        boolean isStatisticsEnabled();
+
+        /**
+         * Sets whether statistics is enabled.
+         *
+         * @param statisticsEnabled <tt>true</tt> to enable
+         */
+        void setStatisticsEnabled(boolean statisticsEnabled);
+    }
 
     /**
      * Service factory key.
@@ -41,6 +87,11 @@ public interface ExchangeFactory {
     String FACTORY = "exchange-factory";
 
     /**
+     * The consumer using this factory.
+     */
+    Consumer getConsumer();
+
+    /**
      * Creates a new {@link ExchangeFactory} that is private for the given 
consumer.
      *
      * @param  consumer the consumer that will use the created {@link 
ExchangeFactory}
@@ -79,6 +130,11 @@ public interface ExchangeFactory {
     int getCapacity();
 
     /**
+     * The current number of exchanges in the pool
+     */
+    int getSize();
+
+    /**
      * The capacity the pool (for each consumer) uses for storing exchanges. 
The default capacity is 100.
      */
     void setCapacity(int capacity);
@@ -93,4 +149,19 @@ public interface ExchangeFactory {
      */
     void setStatisticsEnabled(boolean statisticsEnabled);
 
+    /**
+     * Reset the statistics
+     */
+    void resetStatistics();
+
+    /**
+     * Purges the internal cache (if pooled)
+     */
+    void purge();
+
+    /**
+     * Gets the usage statistics
+     */
+    Statistics getStatistics();
+
 }
diff --git 
a/core/camel-api/src/main/java/org/apache/camel/spi/ExchangeFactoryManager.java 
b/core/camel-api/src/main/java/org/apache/camel/spi/ExchangeFactoryManager.java
new file mode 100644
index 0000000..a46884a
--- /dev/null
+++ 
b/core/camel-api/src/main/java/org/apache/camel/spi/ExchangeFactoryManager.java
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.spi;
+
+import java.util.Collection;
+
+import org.apache.camel.StaticService;
+
+/**
+ * Manages {@link ExchangeFactory}.
+ */
+public interface ExchangeFactoryManager extends StaticService {
+
+    /**
+     * Adds the {@link ExchangeFactory} to be managed.
+     *
+     * @param exchangeFactory the exchange factory
+     */
+    void addExchangeFactory(ExchangeFactory exchangeFactory);
+
+    /**
+     * Removes the {@link ExchangeFactory} from being managed (such as when a 
route is stopped/removed) or during
+     * shutdown.
+     *
+     * @param exchangeFactory the exchange factory
+     */
+    void removeExchangeFactory(ExchangeFactory exchangeFactory);
+
+    /**
+     * Returns a read-only view of the managed factories.
+     */
+    Collection<ExchangeFactory> getExchangeFactories();
+
+    /**
+     * Number of consumers currently being managed
+     */
+    int getSize();
+
+    /**
+     * The capacity the pool (for each consumer) uses for storing exchanges. 
The default capacity is 100.
+     */
+    int getCapacity();
+
+    /**
+     * Whether statistics is enabled.
+     */
+    boolean isStatisticsEnabled();
+
+    /**
+     * Sets whether statistics is enabled.
+     */
+    void setStatisticsEnabled(boolean statisticsEnabled);
+
+    /**
+     * Reset the statistics
+     */
+    void resetStatistics();
+
+    /**
+     * Purges the internal caches (if pooled)
+     */
+    void purge();
+
+}
diff --git 
a/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/AbstractCamelContext.java
 
b/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/AbstractCamelContext.java
index 22195da..305bfa9 100644
--- 
a/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/AbstractCamelContext.java
+++ 
b/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/AbstractCamelContext.java
@@ -102,6 +102,7 @@ import org.apache.camel.spi.EndpointStrategy;
 import org.apache.camel.spi.EndpointUriFactory;
 import org.apache.camel.spi.EventNotifier;
 import org.apache.camel.spi.ExchangeFactory;
+import org.apache.camel.spi.ExchangeFactoryManager;
 import org.apache.camel.spi.ExecutorServiceManager;
 import org.apache.camel.spi.FactoryFinder;
 import org.apache.camel.spi.FactoryFinderResolver;
@@ -265,6 +266,7 @@ public abstract class AbstractCamelContext extends 
BaseService
     private volatile String version;
     private volatile PropertiesComponent propertiesComponent;
     private volatile CamelContextNameStrategy nameStrategy;
+    private volatile ExchangeFactoryManager exchangeFactoryManager = new 
DefaultExchangeFactoryManager();
     private volatile ExchangeFactory exchangeFactory;
     private volatile ReactiveExecutor reactiveExecutor;
     private volatile ManagementNameStrategy managementNameStrategy;
@@ -3684,6 +3686,7 @@ public abstract class AbstractCamelContext extends 
BaseService
         reactiveExecutor = null;
         asyncProcessorAwaitManager = null;
         exchangeFactory = null;
+        exchangeFactoryManager = null;
     }
 
     /**
@@ -4657,7 +4660,26 @@ public abstract class AbstractCamelContext extends 
BaseService
 
     @Override
     public void setExchangeFactory(ExchangeFactory exchangeFactory) {
-        this.exchangeFactory = doAddService(exchangeFactory);
+        // automatic inject camel context
+        exchangeFactory.setCamelContext(this);
+        this.exchangeFactory = exchangeFactory;
+    }
+
+    @Override
+    public ExchangeFactoryManager getExchangeFactoryManager() {
+        if (exchangeFactoryManager == null) {
+            synchronized (lock) {
+                if (exchangeFactoryManager == null) {
+                    setExchangeFactoryManager(createExchangeFactoryManager());
+                }
+            }
+        }
+        return exchangeFactoryManager;
+    }
+
+    @Override
+    public void setExchangeFactoryManager(ExchangeFactoryManager 
exchangeFactoryManager) {
+        this.exchangeFactoryManager = doAddService(exchangeFactoryManager);
     }
 
     @Override
@@ -4754,6 +4776,8 @@ public abstract class AbstractCamelContext extends 
BaseService
 
     protected abstract ExchangeFactory createExchangeFactory();
 
+    protected abstract ExchangeFactoryManager createExchangeFactoryManager();
+
     protected abstract HealthCheckRegistry createHealthCheckRegistry();
 
     protected abstract ReactiveExecutor createReactiveExecutor();
diff --git 
a/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/DefaultExchangeFactory.java
 
b/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/DefaultExchangeFactory.java
index 1ca6740..ae7e601 100644
--- 
a/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/DefaultExchangeFactory.java
+++ 
b/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/DefaultExchangeFactory.java
@@ -16,20 +16,51 @@
  */
 package org.apache.camel.impl.engine;
 
+import java.util.concurrent.atomic.LongAdder;
+
 import org.apache.camel.CamelContext;
 import org.apache.camel.CamelContextAware;
 import org.apache.camel.Consumer;
 import org.apache.camel.Endpoint;
 import org.apache.camel.Exchange;
+import org.apache.camel.ExtendedCamelContext;
 import org.apache.camel.spi.ExchangeFactory;
+import org.apache.camel.spi.ExchangeFactoryManager;
 import org.apache.camel.support.DefaultExchange;
+import org.apache.camel.support.service.ServiceSupport;
+import org.apache.camel.util.URISupport;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 /**
  * Default {@link ExchangeFactory} that creates a new {@link Exchange} 
instance.
  */
-public final class DefaultExchangeFactory implements ExchangeFactory, 
CamelContextAware {
+public class DefaultExchangeFactory extends ServiceSupport implements 
ExchangeFactory, CamelContextAware {
+
+    private static final Logger LOG = 
LoggerFactory.getLogger(DefaultExchangeFactory.class);
 
-    private CamelContext camelContext;
+    final UtilizationStatistics statistics = new UtilizationStatistics();
+    final Consumer consumer;
+    CamelContext camelContext;
+    ExchangeFactoryManager exchangeFactoryManager;
+
+    public DefaultExchangeFactory() {
+        this.consumer = null;
+    }
+
+    public DefaultExchangeFactory(Consumer consumer) {
+        this.consumer = consumer;
+    }
+
+    @Override
+    protected void doBuild() throws Exception {
+        this.exchangeFactoryManager = 
camelContext.adapt(ExtendedCamelContext.class).getExchangeFactoryManager();
+    }
+
+    @Override
+    public Consumer getConsumer() {
+        return consumer;
+    }
 
     @Override
     public CamelContext getCamelContext() {
@@ -43,28 +74,44 @@ public final class DefaultExchangeFactory implements 
ExchangeFactory, CamelConte
 
     @Override
     public ExchangeFactory newExchangeFactory(Consumer consumer) {
-        // we just use a shared factory
-        return this;
+        DefaultExchangeFactory answer = new DefaultExchangeFactory(consumer);
+        answer.setStatisticsEnabled(statistics.isStatisticsEnabled());
+        answer.setCamelContext(camelContext);
+        return answer;
     }
 
     @Override
     public Exchange create(boolean autoRelease) {
+        if (statistics.isStatisticsEnabled()) {
+            statistics.created.increment();
+        }
         return new DefaultExchange(camelContext);
     }
 
     @Override
     public Exchange create(Endpoint fromEndpoint, boolean autoRelease) {
+        if (statistics.isStatisticsEnabled()) {
+            statistics.created.increment();
+        }
         return new DefaultExchange(fromEndpoint);
     }
 
     @Override
+    public boolean release(Exchange exchange) {
+        if (statistics.isStatisticsEnabled()) {
+            statistics.released.increment();
+        }
+        return true;
+    }
+
+    @Override
     public boolean isStatisticsEnabled() {
-        return false;
+        return statistics.isStatisticsEnabled();
     }
 
     @Override
     public void setStatisticsEnabled(boolean statisticsEnabled) {
-        // not in use
+        statistics.setStatisticsEnabled(statisticsEnabled);
     }
 
     @Override
@@ -73,7 +120,109 @@ public final class DefaultExchangeFactory implements 
ExchangeFactory, CamelConte
     }
 
     @Override
+    public int getSize() {
+        return 0;
+    }
+
+    @Override
     public void setCapacity(int capacity) {
         // not in use
     }
+
+    @Override
+    public void resetStatistics() {
+        statistics.reset();
+    }
+
+    @Override
+    public void purge() {
+        // not in use
+    }
+
+    @Override
+    public Statistics getStatistics() {
+        return statistics;
+    }
+
+    @Override
+    protected void doStart() throws Exception {
+        exchangeFactoryManager.addExchangeFactory(this);
+    }
+
+    @Override
+    protected void doStop() throws Exception {
+        exchangeFactoryManager.removeExchangeFactory(this);
+        logUsageSummary(LOG, "DefaultExchangeFactory", 0);
+        statistics.reset();
+    }
+
+    void logUsageSummary(Logger log, String name, int pooled) {
+        if (statistics.isStatisticsEnabled() && consumer != null) {
+            // only log if there is any usage
+            long created = statistics.getCreatedCounter();
+            long acquired = statistics.getAcquiredCounter();
+            long released = statistics.getReleasedCounter();
+            long discarded = statistics.getDiscardedCounter();
+            boolean shouldLog = pooled > 0 || created > 0 || acquired > 0 || 
released > 0 || discarded > 0;
+            if (shouldLog) {
+                String uri = consumer.getEndpoint().getEndpointBaseUri();
+                uri = URISupport.sanitizeUri(uri);
+
+                LOG.info("{} ({}) usage [pooled: {}, created: {}, acquired: {} 
released: {}, discarded: {}]",
+                        name, uri, pooled, created, acquired, released, 
discarded);
+            }
+        }
+    }
+
+    /**
+     * Represents utilization statistics
+     */
+    final class UtilizationStatistics implements ExchangeFactory.Statistics {
+
+        private boolean statisticsEnabled;
+
+        final LongAdder created = new LongAdder();
+        final LongAdder acquired = new LongAdder();
+        final LongAdder released = new LongAdder();
+        final LongAdder discarded = new LongAdder();
+
+        @Override
+        public void reset() {
+            created.reset();
+            acquired.reset();
+            released.reset();
+            discarded.reset();
+        }
+
+        @Override
+        public long getCreatedCounter() {
+            return created.longValue();
+        }
+
+        @Override
+        public long getAcquiredCounter() {
+            return acquired.longValue();
+        }
+
+        @Override
+        public long getReleasedCounter() {
+            return released.longValue();
+        }
+
+        @Override
+        public long getDiscardedCounter() {
+            return discarded.longValue();
+        }
+
+        @Override
+        public boolean isStatisticsEnabled() {
+            return statisticsEnabled;
+        }
+
+        @Override
+        public void setStatisticsEnabled(boolean statisticsEnabled) {
+            this.statisticsEnabled = statisticsEnabled;
+        }
+    }
+
 }
diff --git 
a/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/DefaultExchangeFactoryManager.java
 
b/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/DefaultExchangeFactoryManager.java
new file mode 100644
index 0000000..7b5ffc1
--- /dev/null
+++ 
b/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/DefaultExchangeFactoryManager.java
@@ -0,0 +1,101 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.impl.engine;
+
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+
+import org.apache.camel.CamelContext;
+import org.apache.camel.CamelContextAware;
+import org.apache.camel.Consumer;
+import org.apache.camel.spi.ExchangeFactory;
+import org.apache.camel.spi.ExchangeFactoryManager;
+import org.apache.camel.support.service.ServiceSupport;
+
+public class DefaultExchangeFactoryManager extends ServiceSupport implements 
ExchangeFactoryManager, CamelContextAware {
+
+    private CamelContext camelContext;
+    private final Map<Consumer, ExchangeFactory> factories = new 
ConcurrentHashMap<>();
+    private int capacity;
+    private boolean statisticsEnabled;
+
+    public CamelContext getCamelContext() {
+        return camelContext;
+    }
+
+    public void setCamelContext(CamelContext camelContext) {
+        this.camelContext = camelContext;
+    }
+
+    @Override
+    public void addExchangeFactory(ExchangeFactory exchangeFactory) {
+        factories.put(exchangeFactory.getConsumer(), exchangeFactory);
+        // same for all factories
+        capacity = exchangeFactory.getCapacity();
+        statisticsEnabled = exchangeFactory.isStatisticsEnabled();
+    }
+
+    @Override
+    public void removeExchangeFactory(ExchangeFactory exchangeFactory) {
+        factories.remove(exchangeFactory.getConsumer());
+    }
+
+    @Override
+    public Collection<ExchangeFactory> getExchangeFactories() {
+        return Collections.unmodifiableCollection(factories.values());
+    }
+
+    @Override
+    public int getSize() {
+        return factories.size();
+    }
+
+    @Override
+    public int getCapacity() {
+        return capacity;
+    }
+
+    @Override
+    public boolean isStatisticsEnabled() {
+        return statisticsEnabled;
+    }
+
+    @Override
+    public void setStatisticsEnabled(boolean statisticsEnabled) {
+        this.statisticsEnabled = statisticsEnabled;
+        for (ExchangeFactory ef : factories.values()) {
+            ef.setStatisticsEnabled(statisticsEnabled);
+        }
+    }
+
+    @Override
+    public void resetStatistics() {
+        factories.values().forEach(ExchangeFactory::resetStatistics);
+    }
+
+    @Override
+    public void purge() {
+        factories.values().forEach(ExchangeFactory::purge);
+    }
+
+    @Override
+    protected void doShutdown() throws Exception {
+        factories.clear();
+    }
+}
diff --git 
a/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/PooledExchangeFactory.java
 
b/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/PooledExchangeFactory.java
index dcd0a03..e83fbfd 100644
--- 
a/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/PooledExchangeFactory.java
+++ 
b/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/PooledExchangeFactory.java
@@ -18,102 +18,85 @@ package org.apache.camel.impl.engine;
 
 import java.util.concurrent.ArrayBlockingQueue;
 import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.atomic.AtomicLong;
 
-import org.apache.camel.CamelContext;
-import org.apache.camel.CamelContextAware;
 import org.apache.camel.Consumer;
 import org.apache.camel.Endpoint;
 import org.apache.camel.Exchange;
-import org.apache.camel.NonManagedService;
 import org.apache.camel.PooledExchange;
-import org.apache.camel.StaticService;
 import org.apache.camel.spi.ExchangeFactory;
 import org.apache.camel.support.DefaultPooledExchange;
-import org.apache.camel.support.service.ServiceSupport;
-import org.apache.camel.util.URISupport;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 /**
  * Pooled {@link ExchangeFactory} that reuses {@link Exchange} instance from a 
pool.
  */
-public final class PooledExchangeFactory extends ServiceSupport
-        implements ExchangeFactory, CamelContextAware, StaticService, 
NonManagedService {
+public final class PooledExchangeFactory extends DefaultExchangeFactory {
 
     private static final Logger LOG = 
LoggerFactory.getLogger(PooledExchangeFactory.class);
 
     private final ReleaseOnDoneTask onDone = new ReleaseOnDoneTask();
     private final Consumer consumer;
     private BlockingQueue<Exchange> pool;
-    private final AtomicLong acquired = new AtomicLong();
-    private final AtomicLong created = new AtomicLong();
-    private final AtomicLong released = new AtomicLong();
-    private final AtomicLong discarded = new AtomicLong();
-
-    private CamelContext camelContext;
     private int capacity = 100;
-    private boolean statisticsEnabled;
 
     public PooledExchangeFactory() {
         this.consumer = null;
     }
 
-    private PooledExchangeFactory(Consumer consumer, CamelContext 
camelContext, boolean statisticsEnabled, int capacity) {
+    public PooledExchangeFactory(Consumer consumer) {
         this.consumer = consumer;
-        this.camelContext = camelContext;
-        this.statisticsEnabled = statisticsEnabled;
-        this.capacity = capacity;
     }
 
     @Override
     protected void doBuild() throws Exception {
+        super.doBuild();
         this.pool = new ArrayBlockingQueue<>(capacity);
     }
 
     @Override
-    public CamelContext getCamelContext() {
-        return camelContext;
-    }
-
-    @Override
-    public void setCamelContext(CamelContext camelContext) {
-        this.camelContext = camelContext;
+    public Consumer getConsumer() {
+        return consumer;
     }
 
     @Override
     public ExchangeFactory newExchangeFactory(Consumer consumer) {
-        return new PooledExchangeFactory(consumer, camelContext, 
statisticsEnabled, capacity);
+        PooledExchangeFactory answer = new PooledExchangeFactory(consumer);
+        answer.setCamelContext(camelContext);
+        answer.setCapacity(capacity);
+        answer.setStatisticsEnabled(isStatisticsEnabled());
+        return answer;
     }
 
     public int getCapacity() {
         return capacity;
     }
 
-    public void setCapacity(int capacity) {
-        this.capacity = capacity;
-    }
-
-    public boolean isStatisticsEnabled() {
-        return statisticsEnabled;
+    @Override
+    public int getSize() {
+        if (pool != null) {
+            return pool.size();
+        } else {
+            return 0;
+        }
     }
 
-    public void setStatisticsEnabled(boolean statisticsEnabled) {
-        this.statisticsEnabled = statisticsEnabled;
+    public void setCapacity(int capacity) {
+        this.capacity = capacity;
     }
 
     @Override
     public Exchange create(boolean autoRelease) {
         Exchange exchange = pool.poll();
         if (exchange == null) {
-            if (statisticsEnabled) {
-                created.incrementAndGet();
-            }
             // create a new exchange as there was no free from the pool
             exchange = createPooledExchange(null, autoRelease);
+            if (statistics.isStatisticsEnabled()) {
+                statistics.created.increment();
+            }
         } else {
-            if (statisticsEnabled) {
-                acquired.incrementAndGet();
+            if (statistics.isStatisticsEnabled()) {
+                statistics.acquired.increment();
             }
             // reset exchange for reuse
             PooledExchange ee = exchange.adapt(PooledExchange.class);
@@ -126,14 +109,14 @@ public final class PooledExchangeFactory extends 
ServiceSupport
     public Exchange create(Endpoint fromEndpoint, boolean autoRelease) {
         Exchange exchange = pool.poll();
         if (exchange == null) {
-            if (statisticsEnabled) {
-                created.incrementAndGet();
-            }
             // create a new exchange as there was no free from the pool
             exchange = new DefaultPooledExchange(fromEndpoint);
+            if (statistics.isStatisticsEnabled()) {
+                statistics.created.increment();
+            }
         } else {
-            if (statisticsEnabled) {
-                acquired.incrementAndGet();
+            if (statistics.isStatisticsEnabled()) {
+                statistics.acquired.increment();
             }
             // reset exchange for reuse
             PooledExchange ee = exchange.adapt(PooledExchange.class);
@@ -154,17 +137,17 @@ public final class PooledExchangeFactory extends 
ServiceSupport
             // only release back in pool if reset was success
             boolean inserted = pool.offer(exchange);
 
-            if (statisticsEnabled) {
+            if (statistics.isStatisticsEnabled()) {
                 if (inserted) {
-                    released.incrementAndGet();
+                    statistics.released.increment();
                 } else {
-                    discarded.incrementAndGet();
+                    statistics.discarded.increment();
                 }
             }
             return inserted;
         } catch (Exception e) {
-            if (statisticsEnabled) {
-                discarded.incrementAndGet();
+            if (statistics.isStatisticsEnabled()) {
+                statistics.discarded.increment();
             }
             LOG.debug("Error resetting exchange: {}. This exchange is 
discarded.", exchange);
             return false;
@@ -187,25 +170,18 @@ public final class PooledExchangeFactory extends 
ServiceSupport
     }
 
     @Override
-    protected void doStop() throws Exception {
+    public void purge() {
         pool.clear();
+    }
 
-        if (statisticsEnabled && consumer != null) {
-            // only log if there is any usage
-            boolean shouldLog = created.get() > 0 || acquired.get() > 0 || 
released.get() > 0 || discarded.get() > 0;
-            if (shouldLog) {
-                String uri = consumer.getEndpoint().getEndpointBaseUri();
-                uri = URISupport.sanitizeUri(uri);
-
-                LOG.info("PooledExchangeFactory ({}) usage [created: {}, 
reused: {}, released: {}, discarded: {}]",
-                        uri, created.get(), acquired.get(), released.get(), 
discarded.get());
-            }
-        }
+    @Override
+    protected void doStop() throws Exception {
+        exchangeFactoryManager.removeExchangeFactory(this);
+        logUsageSummary(LOG, "PooledExchangeFactory", pool.size());
+        statistics.reset();
+        pool.clear();
 
-        created.set(0);
-        acquired.set(0);
-        released.set(0);
-        discarded.set(0);
+        // do not call super
     }
 
     private final class ReleaseOnDoneTask implements PooledExchange.OnDoneTask 
{
diff --git 
a/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/SimpleCamelContext.java
 
b/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/SimpleCamelContext.java
index b91710c..90f7a62 100644
--- 
a/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/SimpleCamelContext.java
+++ 
b/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/SimpleCamelContext.java
@@ -42,6 +42,7 @@ import org.apache.camel.spi.DataFormatResolver;
 import org.apache.camel.spi.DeferServiceFactory;
 import org.apache.camel.spi.EndpointRegistry;
 import org.apache.camel.spi.ExchangeFactory;
+import org.apache.camel.spi.ExchangeFactoryManager;
 import org.apache.camel.spi.ExecutorServiceManager;
 import org.apache.camel.spi.FactoryFinder;
 import org.apache.camel.spi.FactoryFinderResolver;
@@ -555,6 +556,11 @@ public class SimpleCamelContext extends 
AbstractCamelContext {
     }
 
     @Override
+    protected ExchangeFactoryManager createExchangeFactoryManager() {
+        return new DefaultExchangeFactoryManager();
+    }
+
+    @Override
     protected ReactiveExecutor createReactiveExecutor() {
         Optional<ReactiveExecutor> result = ResolverHelper.resolveService(
                 getCamelContextReference(),
diff --git 
a/core/camel-core-engine/src/generated/java/org/apache/camel/impl/ExtendedCamelContextConfigurer.java
 
b/core/camel-core-engine/src/generated/java/org/apache/camel/impl/ExtendedCamelContextConfigurer.java
index b6e2a47..fc3b1b3 100644
--- 
a/core/camel-core-engine/src/generated/java/org/apache/camel/impl/ExtendedCamelContextConfigurer.java
+++ 
b/core/camel-core-engine/src/generated/java/org/apache/camel/impl/ExtendedCamelContextConfigurer.java
@@ -67,6 +67,8 @@ public class ExtendedCamelContextConfigurer extends org.apache.camel.support.com
         case "EventNotificationApplicable": target.setEventNotificationApplicable(property(camelContext, boolean.class, value)); return true;
         case "exchangefactory":
         case "ExchangeFactory": target.setExchangeFactory(property(camelContext, org.apache.camel.spi.ExchangeFactory.class, value)); return true;
+        case "exchangefactorymanager":
+        case "ExchangeFactoryManager": target.setExchangeFactoryManager(property(camelContext, org.apache.camel.spi.ExchangeFactoryManager.class, value)); return true;
         case "executorservicemanager":
         case "ExecutorServiceManager": target.setExecutorServiceManager(property(camelContext, org.apache.camel.spi.ExecutorServiceManager.class, value)); return true;
         case "factoryfinderresolver":
@@ -236,6 +238,8 @@ public class ExtendedCamelContextConfigurer extends org.apache.camel.support.com
         case "EventNotificationApplicable": return boolean.class;
         case "exchangefactory":
         case "ExchangeFactory": return org.apache.camel.spi.ExchangeFactory.class;
+        case "exchangefactorymanager":
+        case "ExchangeFactoryManager": return org.apache.camel.spi.ExchangeFactoryManager.class;
         case "executorservicemanager":
         case "ExecutorServiceManager": return org.apache.camel.spi.ExecutorServiceManager.class;
         case "factoryfinderresolver":
@@ -406,6 +410,8 @@ public class ExtendedCamelContextConfigurer extends org.apache.camel.support.com
         case "EventNotificationApplicable": return target.isEventNotificationApplicable();
         case "exchangefactory":
         case "ExchangeFactory": return target.getExchangeFactory();
+        case "exchangefactorymanager":
+        case "ExchangeFactoryManager": return target.getExchangeFactoryManager();
         case "executorservicemanager":
         case "ExecutorServiceManager": return target.getExecutorServiceManager();
         case "factoryfinderresolver":
diff --git a/core/camel-core-engine/src/main/java/org/apache/camel/impl/lw/LightweightCamelContext.java b/core/camel-core-engine/src/main/java/org/apache/camel/impl/lw/LightweightCamelContext.java
index 7732b81..85f642e 100644
--- a/core/camel-core-engine/src/main/java/org/apache/camel/impl/lw/LightweightCamelContext.java
+++ b/core/camel-core-engine/src/main/java/org/apache/camel/impl/lw/LightweightCamelContext.java
@@ -89,6 +89,7 @@ import org.apache.camel.spi.EndpointRegistry;
 import org.apache.camel.spi.EndpointStrategy;
 import org.apache.camel.spi.EndpointUriFactory;
 import org.apache.camel.spi.ExchangeFactory;
+import org.apache.camel.spi.ExchangeFactoryManager;
 import org.apache.camel.spi.ExecutorServiceManager;
 import org.apache.camel.spi.FactoryFinder;
 import org.apache.camel.spi.FactoryFinderResolver;
@@ -1452,6 +1453,16 @@ public class LightweightCamelContext implements ExtendedCamelContext, CatalogCam
     }
 
     @Override
+    public ExchangeFactoryManager getExchangeFactoryManager() {
+        return getExtendedCamelContext().getExchangeFactoryManager();
+    }
+
+    @Override
+    public void setExchangeFactoryManager(ExchangeFactoryManager exchangeFactoryManager) {
+        getExtendedCamelContext().setExchangeFactoryManager(exchangeFactoryManager);
+    }
+
+    @Override
     public ReactiveExecutor getReactiveExecutor() {
         return getExtendedCamelContext().getReactiveExecutor();
     }
diff --git a/core/camel-core-engine/src/main/java/org/apache/camel/impl/lw/LightweightRuntimeCamelContext.java b/core/camel-core-engine/src/main/java/org/apache/camel/impl/lw/LightweightRuntimeCamelContext.java
index cc96226..931073b 100644
--- a/core/camel-core-engine/src/main/java/org/apache/camel/impl/lw/LightweightRuntimeCamelContext.java
+++ b/core/camel-core-engine/src/main/java/org/apache/camel/impl/lw/LightweightRuntimeCamelContext.java
@@ -86,6 +86,7 @@ import org.apache.camel.spi.EndpointRegistry;
 import org.apache.camel.spi.EndpointStrategy;
 import org.apache.camel.spi.EndpointUriFactory;
 import org.apache.camel.spi.ExchangeFactory;
+import org.apache.camel.spi.ExchangeFactoryManager;
 import org.apache.camel.spi.ExecutorServiceManager;
 import org.apache.camel.spi.FactoryFinder;
 import org.apache.camel.spi.FactoryFinderResolver;
@@ -169,6 +170,7 @@ public class LightweightRuntimeCamelContext implements ExtendedCamelContext, Cat
     private final BeanIntrospection beanIntrospection;
     private final HeadersMapFactory headersMapFactory;
     private final ExchangeFactory exchangeFactory;
+    private final ExchangeFactoryManager exchangeFactoryManager;
     private final ReactiveExecutor reactiveExecutor;
     private final AsyncProcessorAwaitManager asyncProcessorAwaitManager;
     private final ExecutorServiceManager executorServiceManager;
@@ -214,6 +216,7 @@ public class LightweightRuntimeCamelContext implements ExtendedCamelContext, Cat
         beanIntrospection = context.adapt(ExtendedCamelContext.class).getBeanIntrospection();
         headersMapFactory = context.adapt(ExtendedCamelContext.class).getHeadersMapFactory();
         exchangeFactory = context.adapt(ExtendedCamelContext.class).getExchangeFactory();
+        exchangeFactoryManager = context.adapt(ExtendedCamelContext.class).getExchangeFactoryManager();
         reactiveExecutor = context.adapt(ExtendedCamelContext.class).getReactiveExecutor();
         asyncProcessorAwaitManager = context.adapt(ExtendedCamelContext.class).getAsyncProcessorAwaitManager();
         executorServiceManager = context.getExecutorServiceManager();
@@ -1567,6 +1570,16 @@ public class LightweightRuntimeCamelContext implements ExtendedCamelContext, Cat
     }
 
     @Override
+    public ExchangeFactoryManager getExchangeFactoryManager() {
+        return exchangeFactoryManager;
+    }
+
+    @Override
+    public void setExchangeFactoryManager(ExchangeFactoryManager exchangeFactoryManager) {
+        throw new UnsupportedOperationException();
+    }
+
+    @Override
     public void setExchangeFactory(ExchangeFactory exchangeFactory) {
         throw new UnsupportedOperationException();
     }
diff --git a/core/camel-management-api/src/main/java/org/apache/camel/api/management/mbean/CamelOpenMBeanTypes.java b/core/camel-management-api/src/main/java/org/apache/camel/api/management/mbean/CamelOpenMBeanTypes.java
index dbc5188..96fbcd8 100644
--- a/core/camel-management-api/src/main/java/org/apache/camel/api/management/mbean/CamelOpenMBeanTypes.java
+++ b/core/camel-management-api/src/main/java/org/apache/camel/api/management/mbean/CamelOpenMBeanTypes.java
@@ -65,6 +65,19 @@ public final class CamelOpenMBeanTypes {
                 new OpenType[] { SimpleType.STRING, SimpleType.BOOLEAN, SimpleType.BOOLEAN });
     }
 
+    public static TabularType listExchangeFactoryTabularType() throws OpenDataException {
+        CompositeType ct = listExchangeFactoryCompositeType();
+        return new TabularType("listExchangeFactory", "Lists all the exchange factories", ct, new String[] { "url" });
+    }
+
+    public static CompositeType listExchangeFactoryCompositeType() throws OpenDataException {
+        return new CompositeType(
+                "factories", "Factories",
+                new String[] { "url", "capacity", "pooled", "created", "released" },
+                new String[] { "Url", "Capacity", "Pooled", "Created", "Released" },
+                new OpenType[] { SimpleType.STRING, SimpleType.INTEGER, SimpleType.INTEGER, SimpleType.LONG, SimpleType.LONG });
+    }
+
     public static TabularType listRuntimeEndpointsTabularType() throws OpenDataException {
         CompositeType ct = listRuntimeEndpointsCompositeType();
         return new TabularType(
diff --git a/core/camel-management-api/src/main/java/org/apache/camel/api/management/mbean/ManagedExchangeFactoryManagerMBean.java b/core/camel-management-api/src/main/java/org/apache/camel/api/management/mbean/ManagedExchangeFactoryManagerMBean.java
new file mode 100644
index 0000000..bedebc9
--- /dev/null
+++ b/core/camel-management-api/src/main/java/org/apache/camel/api/management/mbean/ManagedExchangeFactoryManagerMBean.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.api.management.mbean;
+
+import javax.management.openmbean.TabularData;
+
+import org.apache.camel.api.management.ManagedAttribute;
+import org.apache.camel.api.management.ManagedOperation;
+
+public interface ManagedExchangeFactoryManagerMBean extends ManagedServiceMBean {
+
+    @ManagedAttribute(description = "Max capacity per consumer for exchange pooling")
+    Integer getCapacity();
+
+    @ManagedAttribute(description = "Whether statistics is enabled")
+    Boolean getStatisticsEnabled();
+
+    @ManagedAttribute(description = "Whether statistics is enabled")
+    void setStatisticsEnabled(Boolean statisticsEnabled);
+
+    @ManagedOperation(description = "Reset statistics")
+    void resetStatistics();
+
+    @ManagedOperation(description = "Purges the pool")
+    void purge();
+
+    @ManagedOperation(description = "Lists all the consumers and their pooling statistics")
+    TabularData listPools();
+
+}
diff --git a/core/camel-management/src/main/java/org/apache/camel/management/JmxManagementLifecycleStrategy.java b/core/camel-management/src/main/java/org/apache/camel/management/JmxManagementLifecycleStrategy.java
index aae3e70..943584f 100644
--- a/core/camel-management/src/main/java/org/apache/camel/management/JmxManagementLifecycleStrategy.java
+++ b/core/camel-management/src/main/java/org/apache/camel/management/JmxManagementLifecycleStrategy.java
@@ -60,6 +60,7 @@ import org.apache.camel.management.mbean.ManagedCamelContext;
 import org.apache.camel.management.mbean.ManagedConsumerCache;
 import org.apache.camel.management.mbean.ManagedEndpoint;
 import org.apache.camel.management.mbean.ManagedEndpointRegistry;
+import org.apache.camel.management.mbean.ManagedExchangeFactoryManager;
 import org.apache.camel.management.mbean.ManagedInflightRepository;
 import org.apache.camel.management.mbean.ManagedProducerCache;
 import org.apache.camel.management.mbean.ManagedRestRegistry;
@@ -86,6 +87,7 @@ import org.apache.camel.spi.ConsumerCache;
 import org.apache.camel.spi.DataFormat;
 import org.apache.camel.spi.EndpointRegistry;
 import org.apache.camel.spi.EventNotifier;
+import org.apache.camel.spi.ExchangeFactoryManager;
 import org.apache.camel.spi.InflightRepository;
 import org.apache.camel.spi.InternalProcessor;
 import org.apache.camel.spi.LifecycleStrategy;
@@ -537,6 +539,8 @@ public class JmxManagementLifecycleStrategy extends ServiceSupport implements Li
             answer = new ManagedConsumerCache(context, (ConsumerCache) service);
         } else if (service instanceof ProducerCache) {
             answer = new ManagedProducerCache(context, (ProducerCache) service);
+        } else if (service instanceof ExchangeFactoryManager) {
+            answer = new ManagedExchangeFactoryManager(context, (ExchangeFactoryManager) service);
         } else if (service instanceof EndpointRegistry) {
             answer = new ManagedEndpointRegistry(context, (EndpointRegistry) service);
         } else if (service instanceof BeanIntrospection) {
diff --git a/core/camel-management/src/main/java/org/apache/camel/management/mbean/ManagedExchangeFactoryManager.java b/core/camel-management/src/main/java/org/apache/camel/management/mbean/ManagedExchangeFactoryManager.java
new file mode 100644
index 0000000..bab7a0d
--- /dev/null
+++ b/core/camel-management/src/main/java/org/apache/camel/management/mbean/ManagedExchangeFactoryManager.java
@@ -0,0 +1,110 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.management.mbean;
+
+import java.util.Collection;
+
+import javax.management.openmbean.CompositeData;
+import javax.management.openmbean.CompositeDataSupport;
+import javax.management.openmbean.CompositeType;
+import javax.management.openmbean.TabularData;
+import javax.management.openmbean.TabularDataSupport;
+
+import org.apache.camel.CamelContext;
+import org.apache.camel.RuntimeCamelException;
+import org.apache.camel.api.management.ManagedResource;
+import org.apache.camel.api.management.mbean.CamelOpenMBeanTypes;
+import org.apache.camel.api.management.mbean.ManagedExchangeFactoryManagerMBean;
+import org.apache.camel.spi.ExchangeFactory;
+import org.apache.camel.spi.ExchangeFactoryManager;
+import org.apache.camel.spi.ManagementStrategy;
+import org.apache.camel.util.URISupport;
+
+@ManagedResource(description = "Managed ExchangeFactory")
+public class ManagedExchangeFactoryManager extends ManagedService implements ManagedExchangeFactoryManagerMBean {
+
+    private ExchangeFactoryManager exchangeFactoryManager;
+    private boolean sanitize;
+
+    public ManagedExchangeFactoryManager(CamelContext context, ExchangeFactoryManager exchangeFactoryManager) {
+        super(context, exchangeFactoryManager);
+        this.exchangeFactoryManager = exchangeFactoryManager;
+    }
+
+    @Override
+    public void init(ManagementStrategy strategy) {
+        super.init(strategy);
+        sanitize = strategy.getManagementAgent().getMask() != null ? strategy.getManagementAgent().getMask() : false;
+    }
+
+    @Override
+    public Integer getCapacity() {
+        return exchangeFactoryManager.getCapacity();
+    }
+
+    @Override
+    public Boolean getStatisticsEnabled() {
+        return exchangeFactoryManager.isStatisticsEnabled();
+    }
+
+    @Override
+    public void setStatisticsEnabled(Boolean statisticsEnabled) {
+        exchangeFactoryManager.setStatisticsEnabled(statisticsEnabled);
+    }
+
+    @Override
+    public void resetStatistics() {
+        exchangeFactoryManager.resetStatistics();
+    }
+
+    @Override
+    public void purge() {
+        exchangeFactoryManager.purge();
+    }
+
+    @Override
+    public TabularData listPools() {
+        try {
+            TabularData answer = new TabularDataSupport(CamelOpenMBeanTypes.listExchangeFactoryTabularType());
+            Collection<ExchangeFactory> factories = exchangeFactoryManager.getExchangeFactories();
+            for (ExchangeFactory ef : factories) {
+                CompositeType ct = CamelOpenMBeanTypes.listExchangeFactoryCompositeType();
+                String url = ef.getConsumer().getEndpoint().getEndpointUri();
+                if (sanitize) {
+                    url = URISupport.sanitizeUri(url);
+                }
+
+                int capacity = ef.getCapacity();
+                int size = ef.getSize();
+                long created = 0;
+                long released = 0;
+                if (ef.isStatisticsEnabled()) {
+                    created = ef.getStatistics().getCreatedCounter();
+                    released = ef.getStatistics().getReleasedCounter();
+                }
+
+                CompositeData data = new CompositeDataSupport(
+                        ct, new String[] { "url", "capacity", "pooled", "created", "released" },
+                        new Object[] { url, capacity, size, created, released });
+                answer.put(data);
+            }
+            return answer;
+        } catch (Exception e) {
+            throw RuntimeCamelException.wrapRuntimeCamelException(e);
+        }
+    }
+}
diff --git a/core/camel-support/src/main/java/org/apache/camel/support/DefaultConsumer.java b/core/camel-support/src/main/java/org/apache/camel/support/DefaultConsumer.java
index f27f56a..a57ef22 100644
--- a/core/camel-support/src/main/java/org/apache/camel/support/DefaultConsumer.java
+++ b/core/camel-support/src/main/java/org/apache/camel/support/DefaultConsumer.java
@@ -26,7 +26,6 @@ import org.apache.camel.PooledExchange;
 import org.apache.camel.Processor;
 import org.apache.camel.Route;
 import org.apache.camel.RouteAware;
-import org.apache.camel.Service;
 import org.apache.camel.spi.ExceptionHandler;
 import org.apache.camel.spi.ExchangeFactory;
 import org.apache.camel.spi.RouteIdAware;
@@ -174,9 +173,7 @@ public class DefaultConsumer extends ServiceSupport implements Consumer, RouteAw
     @Override
     protected void doBuild() throws Exception {
         super.doBuild();
-        if (exchangeFactory instanceof Service) {
-            ((Service) exchangeFactory).build();
-        }
+        exchangeFactory.build();
     }
 
     @Override

Reply via email to