This is an automated email from the ASF dual-hosted git repository.

davsclaus pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/camel.git


The following commit(s) were added to refs/heads/master by this push:
     new 31ba781  CAMEL-16279: camel-core - Optimize core to reduce object 
allocations … (#5183)
31ba781 is described below

commit 31ba781997e413bf4e5a57653f47c8b6af6271b3
Author: Claus Ibsen <claus.ib...@gmail.com>
AuthorDate: Mon Mar 8 16:48:38 2021 +0100

    CAMEL-16279: camel-core - Optimize core to reduce object allocations … 
(#5183)
    
    CAMEL-16279: camel-core - Optimize core to reduce object allocations by 
pooling reusable tasks in the routing engine.
    
    CAMEL-16314: camel-core - Some components do not work well with pooled 
exchanges
---
 .../KinesisConsumerClosedShardWithSilentTest.java  |   5 +-
 .../camel/component/hdfs/HdfsConsumerTest.java     |   6 +
 .../component/jetty/CamelContinuationServlet.java  |   4 +-
 .../component/jms/EndpointMessageListener.java     |  24 +--
 .../camel/component/jpa/AbstractJpaMethodTest.java |   7 +-
 .../camel/component/jpa/JpaWithNamedQueryTest.java |   3 +-
 .../component/master/EndpointUriEncodingTest.java  |   3 +-
 .../http/handlers/HttpServerChannelHandler.java    |   3 +-
 .../camel/component/netty/http/BaseNettyTest.java  |   1 +
 .../netty/handlers/ServerChannelHandler.java       |   5 +-
 .../camel/component/netty/BaseNettyTest.java       |   1 +
 .../component/quickfixj/QuickfixjEndpoint.java     |   3 +
 .../reactive/streams/ReactiveStreamsConsumer.java  |   4 +-
 .../engine/ReactorStreamsServiceTestSupport.java   |   5 +
 .../engine/RxJavaStreamsServiceTestSupport.java    |   5 +
 .../sjms/consumer/EndpointMessageListener.java     |  25 +--
 .../springrabbit/EndpointMessageListener.java      |  24 +--
 .../yammer/YammerMessageAndUserRouteTest.java      |   7 -
 .../java/org/apache/camel/spi/ExchangeFactory.java |  86 +---------
 .../apache/camel/spi/InternalProcessorFactory.java |   4 +-
 .../org/apache/camel/spi/PooledObjectFactory.java  | 125 +++++++++++++++
 .../camel/impl/engine/AbstractCamelContext.java    |   3 +
 .../camel/impl/engine/CamelInternalProcessor.java  |   1 +
 .../camel/impl/engine/PooledExchangeFactory.java   |  17 +-
 .../impl/engine/PrototypeExchangeFactory.java      | 111 ++-----------
 .../camel/impl/engine/SimpleCamelContext.java      |   6 +-
 .../InterceptSendToEndpointProcessor.java          |  55 ++++---
 .../java/org/apache/camel/processor/Pipeline.java  |  61 +++++--
 .../apache/camel/processor/PooledExchangeTask.java |  41 +++++
 .../camel/processor/PooledExchangeTaskFactory.java |  56 +++++++
 .../apache/camel/processor/PooledTaskFactory.java  |  72 +++++++++
 .../camel/processor/PrototypeTaskFactory.java      |  48 ++++++
 .../errorhandler/RedeliveryErrorHandler.java       | 175 ++++++++++++++++-----
 .../camel/support/PooledObjectFactorySupport.java  | 152 ++++++++++++++++++
 .../support/PrototypeObjectFactorySupport.java     | 139 ++++++++++++++++
 35 files changed, 931 insertions(+), 356 deletions(-)

diff --git 
a/components/camel-aws2-kinesis/src/test/java/org/apache/camel/component/aws2/kinesis/KinesisConsumerClosedShardWithSilentTest.java
 
b/components/camel-aws2-kinesis/src/test/java/org/apache/camel/component/aws2/kinesis/KinesisConsumerClosedShardWithSilentTest.java
index d43f23f..ce1ebac 100644
--- 
a/components/camel-aws2-kinesis/src/test/java/org/apache/camel/component/aws2/kinesis/KinesisConsumerClosedShardWithSilentTest.java
+++ 
b/components/camel-aws2-kinesis/src/test/java/org/apache/camel/component/aws2/kinesis/KinesisConsumerClosedShardWithSilentTest.java
@@ -67,7 +67,7 @@ public class KinesisConsumerClosedShardWithSilentTest {
         configuration.setIteratorType(ShardIteratorType.LATEST);
         configuration.setShardClosed(Kinesis2ShardClosedStrategyEnum.silent);
         configuration.setStreamName("streamName");
-        Kinesis2Endpoint endpoint = new Kinesis2Endpoint(null, configuration, 
component);
+        Kinesis2Endpoint endpoint = new Kinesis2Endpoint("aws2-kinesis:foo", 
configuration, component);
         endpoint.start();
         undertest = new Kinesis2Consumer(endpoint, processor);
 
@@ -84,6 +84,9 @@ public class KinesisConsumerClosedShardWithSilentTest {
                         
.streamDescription(StreamDescription.builder().shards(shardList).build()).build());
         
when(kinesisClient.getShardIterator(any(GetShardIteratorRequest.class)))
                 
.thenReturn(GetShardIteratorResponse.builder().shardIterator("shardIterator").build());
+
+        context.start();
+        undertest.start();
     }
 
     @Test
diff --git 
a/components/camel-hdfs/src/test/java/org/apache/camel/component/hdfs/HdfsConsumerTest.java
 
b/components/camel-hdfs/src/test/java/org/apache/camel/component/hdfs/HdfsConsumerTest.java
index b1ca851..1681487 100644
--- 
a/components/camel-hdfs/src/test/java/org/apache/camel/component/hdfs/HdfsConsumerTest.java
+++ 
b/components/camel-hdfs/src/test/java/org/apache/camel/component/hdfs/HdfsConsumerTest.java
@@ -21,8 +21,10 @@ import java.io.InputStream;
 
 import org.apache.camel.CamelContext;
 import org.apache.camel.Exchange;
+import org.apache.camel.ExtendedCamelContext;
 import org.apache.camel.Processor;
 import org.apache.camel.impl.DefaultCamelContext;
+import org.apache.camel.impl.engine.PrototypeExchangeFactory;
 import org.apache.camel.support.DefaultExchange;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FSDataInputStream;
@@ -83,6 +85,8 @@ public class HdfsConsumerTest {
         
when(endpointConfig.getOpenedSuffix()).thenReturn(DEFAULT_OPENED_SUFFIX);
 
         context = new DefaultCamelContext();
+        // this test is mocking and its easier to test with prototype scoped
+        context.adapt(ExtendedCamelContext.class).setExchangeFactory(new 
PrototypeExchangeFactory());
     }
 
     @Test
@@ -159,6 +163,7 @@ public class HdfsConsumerTest {
         ArgumentCaptor<Exchange> exchangeCaptor = 
ArgumentCaptor.forClass(Exchange.class);
 
         underTest = new HdfsConsumer(endpoint, processor, endpointConfig, 
hdfsInfoFactory, new StringBuilder(hdfsPath));
+        underTest.start();
 
         // when
         int actual = underTest.doPoll();
@@ -211,6 +216,7 @@ public class HdfsConsumerTest {
         ArgumentCaptor<Exchange> exchangeCaptor = 
ArgumentCaptor.forClass(Exchange.class);
 
         underTest = new HdfsConsumer(endpoint, processor, endpointConfig, 
hdfsInfoFactory, new StringBuilder(hdfsPath));
+        underTest.start();
 
         // when
         int actual = underTest.doPoll();
diff --git 
a/components/camel-jetty-common/src/main/java/org/apache/camel/component/jetty/CamelContinuationServlet.java
 
b/components/camel-jetty-common/src/main/java/org/apache/camel/component/jetty/CamelContinuationServlet.java
index fdd4974..65db856 100644
--- 
a/components/camel-jetty-common/src/main/java/org/apache/camel/component/jetty/CamelContinuationServlet.java
+++ 
b/components/camel-jetty-common/src/main/java/org/apache/camel/component/jetty/CamelContinuationServlet.java
@@ -181,7 +181,8 @@ public class CamelContinuationServlet extends CamelServlet {
             }
 
             // a new request so create an exchange
-            final Exchange exchange = consumer.createExchange(false);
+            // must be prototype scoped (not pooled) so we create the exchange 
via endpoint
+            final Exchange exchange = endpoint.createExchange();
             exchange.setPattern(ExchangePattern.InOut);
 
             if (consumer.getEndpoint().isBridgeEndpoint()) {
@@ -269,7 +270,6 @@ public class CamelContinuationServlet extends CamelServlet {
             throw new ServletException(e);
         } finally {
             consumer.doneUoW(result);
-            consumer.releaseExchange(result, false);
         }
     }
 
diff --git 
a/components/camel-jms/src/main/java/org/apache/camel/component/jms/EndpointMessageListener.java
 
b/components/camel-jms/src/main/java/org/apache/camel/component/jms/EndpointMessageListener.java
index 872b2af..5bb1f9f 100644
--- 
a/components/camel-jms/src/main/java/org/apache/camel/component/jms/EndpointMessageListener.java
+++ 
b/components/camel-jms/src/main/java/org/apache/camel/component/jms/EndpointMessageListener.java
@@ -26,7 +26,6 @@ import org.apache.camel.AsyncCallback;
 import org.apache.camel.AsyncProcessor;
 import org.apache.camel.Exchange;
 import org.apache.camel.ExchangePattern;
-import org.apache.camel.ExtendedExchange;
 import org.apache.camel.Processor;
 import org.apache.camel.RollbackExchangeException;
 import org.apache.camel.RuntimeCamelException;
@@ -150,9 +149,6 @@ public class EndpointMessageListener implements 
SessionAwareMessageListener {
             // if we failed processed the exchange from the async callback 
task, then grab the exception
             rce = exchange.getException(RuntimeCamelException.class);
 
-            // the exchange is now done so release it
-            consumer.releaseExchange(exchange, false);
-
         } catch (Exception e) {
             rce = wrapRuntimeCamelException(e);
         }
@@ -256,31 +252,15 @@ public class EndpointMessageListener implements 
SessionAwareMessageListener {
                     }
                 }
             }
-
-            // if we completed from async processing then we should release 
the exchange
-            // the sync processing will release the exchange outside this 
callback
-            if (!doneSync) {
-                consumer.releaseExchange(exchange, false);
-            }
         }
     }
 
     public Exchange createExchange(Message message, Session session, Object 
replyDestination) {
-        Exchange exchange = consumer.createExchange(false);
+        // must be prototype scoped (not pooled) so we create the exchange via 
endpoint
+        Exchange exchange = endpoint.createExchange(message, session);
         JmsBinding binding = getBinding();
         exchange.setProperty(Exchange.BINDING, binding);
 
-        // optimize: either create a new JmsMessage or reuse existing if exists
-        JmsMessage msg = 
exchange.adapt(ExtendedExchange.class).getInOrNull(JmsMessage.class);
-        if (msg == null) {
-            msg = new JmsMessage(exchange, message, session, binding);
-            exchange.setIn(msg);
-        } else {
-            msg.setJmsMessage(message);
-            msg.setJmsSession(session);
-            msg.setBinding(binding);
-        }
-
         // lets set to an InOut if we have some kind of reply-to destination
         if (replyDestination != null && !disableReplyTo) {
             // only change pattern if not already out capable
diff --git 
a/components/camel-jpa/src/test/java/org/apache/camel/component/jpa/AbstractJpaMethodTest.java
 
b/components/camel-jpa/src/test/java/org/apache/camel/component/jpa/AbstractJpaMethodTest.java
index 5bcf4f6..154dd41 100644
--- 
a/components/camel-jpa/src/test/java/org/apache/camel/component/jpa/AbstractJpaMethodTest.java
+++ 
b/components/camel-jpa/src/test/java/org/apache/camel/component/jpa/AbstractJpaMethodTest.java
@@ -45,7 +45,7 @@ public abstract class AbstractJpaMethodTest extends 
CamelTestSupport {
     protected EntityManager entityManager;
     protected TransactionTemplate transactionTemplate;
     protected Consumer consumer;
-    protected Exchange receivedExchange;
+    protected Customer receivedCustomer;
 
     abstract boolean usePersist();
 
@@ -123,7 +123,7 @@ public abstract class AbstractJpaMethodTest extends 
CamelTestSupport {
 
         consumer = endpoint.createConsumer(new Processor() {
             public void process(Exchange e) {
-                receivedExchange = e;
+                receivedCustomer = e.getIn().getBody(Customer.class);
                 assertNotNull(e.getIn().getHeader(JpaConstants.ENTITY_MANAGER, 
EntityManager.class));
                 latch.countDown();
             }
@@ -135,8 +135,7 @@ public abstract class AbstractJpaMethodTest extends 
CamelTestSupport {
         consumer.stop();
         Thread.sleep(1000);
 
-        assertNotNull(receivedExchange);
-        Customer receivedCustomer = 
receivedExchange.getIn().getBody(Customer.class);
+        assertNotNull(receivedCustomer);
         assertEquals(customer.getName(), receivedCustomer.getName());
         assertEquals(customer.getId(), receivedCustomer.getId());
         assertEquals(customer.getAddress().getAddressLine1(), 
receivedCustomer.getAddress().getAddressLine1());
diff --git 
a/components/camel-jpa/src/test/java/org/apache/camel/component/jpa/JpaWithNamedQueryTest.java
 
b/components/camel-jpa/src/test/java/org/apache/camel/component/jpa/JpaWithNamedQueryTest.java
index 55c35db..04bc452 100644
--- 
a/components/camel-jpa/src/test/java/org/apache/camel/component/jpa/JpaWithNamedQueryTest.java
+++ 
b/components/camel-jpa/src/test/java/org/apache/camel/component/jpa/JpaWithNamedQueryTest.java
@@ -95,7 +95,8 @@ public class JpaWithNamedQueryTest {
         consumer = endpoint.createConsumer(new Processor() {
             public void process(Exchange e) {
                 LOG.info("Received exchange: " + e.getIn());
-                receivedExchange = e;
+                // make defensive copy
+                receivedExchange = e.copy();
                 latch.countDown();
             }
         });
diff --git 
a/components/camel-master/src/test/java/org/apache/camel/component/master/EndpointUriEncodingTest.java
 
b/components/camel-master/src/test/java/org/apache/camel/component/master/EndpointUriEncodingTest.java
index c78a06c..9ec78a6 100644
--- 
a/components/camel-master/src/test/java/org/apache/camel/component/master/EndpointUriEncodingTest.java
+++ 
b/components/camel-master/src/test/java/org/apache/camel/component/master/EndpointUriEncodingTest.java
@@ -85,7 +85,8 @@ public class EndpointUriEncodingTest extends CamelTestSupport 
{
                 public Consumer createConsumer(Processor processor) {
                     return new DefaultConsumer(this, processor) {
                         @Override
-                        public void start() {
+                        protected void doStart() throws Exception {
+                            super.doStart();
                             Exchange exchange = createExchange(true);
                             exchange.getMessage().setHeader("foo", foo);
                             exchange.getMessage().setHeader("bar", bar);
diff --git 
a/components/camel-netty-http/src/main/java/org/apache/camel/component/netty/http/handlers/HttpServerChannelHandler.java
 
b/components/camel-netty-http/src/main/java/org/apache/camel/component/netty/http/handlers/HttpServerChannelHandler.java
index 1831d64..d6873a6 100644
--- 
a/components/camel-netty-http/src/main/java/org/apache/camel/component/netty/http/handlers/HttpServerChannelHandler.java
+++ 
b/components/camel-netty-http/src/main/java/org/apache/camel/component/netty/http/handlers/HttpServerChannelHandler.java
@@ -343,7 +343,8 @@ public class HttpServerChannelHandler extends 
ServerChannelHandler {
 
     @Override
     protected Exchange createExchange(ChannelHandlerContext ctx, Object 
message) throws Exception {
-        Exchange exchange = consumer.createExchange(false);
+        // must be prototype scoped (not pooled) so we create the exchange via 
endpoint
+        Exchange exchange = consumer.getEndpoint().createExchange();
 
         // create a new IN message as we cannot reuse with netty
         Message in;
diff --git 
a/components/camel-netty-http/src/test/java/org/apache/camel/component/netty/http/BaseNettyTest.java
 
b/components/camel-netty-http/src/test/java/org/apache/camel/component/netty/http/BaseNettyTest.java
index adafadd..d2b41c4 100644
--- 
a/components/camel-netty-http/src/test/java/org/apache/camel/component/netty/http/BaseNettyTest.java
+++ 
b/components/camel-netty-http/src/test/java/org/apache/camel/component/netty/http/BaseNettyTest.java
@@ -48,6 +48,7 @@ public class BaseNettyTest extends CamelTestSupport {
     public static void startLeakDetection() {
         System.setProperty("io.netty.leakDetection.maxRecords", "100");
         System.setProperty("io.netty.leakDetection.acquireAndReleaseOnly", 
"true");
+        System.setProperty("io.netty.leakDetection.targetRecords", "100");
         ResourceLeakDetector.setLevel(ResourceLeakDetector.Level.PARANOID);
     }
 
diff --git 
a/components/camel-netty/src/main/java/org/apache/camel/component/netty/handlers/ServerChannelHandler.java
 
b/components/camel-netty/src/main/java/org/apache/camel/component/netty/handlers/ServerChannelHandler.java
index 7c0eaed..3a6a796 100644
--- 
a/components/camel-netty/src/main/java/org/apache/camel/component/netty/handlers/ServerChannelHandler.java
+++ 
b/components/camel-netty/src/main/java/org/apache/camel/component/netty/handlers/ServerChannelHandler.java
@@ -113,7 +113,8 @@ public class ServerChannelHandler extends 
SimpleChannelInboundHandler<Object> {
     }
 
     protected Exchange createExchange(ChannelHandlerContext ctx, Object 
message) throws Exception {
-        Exchange exchange = consumer.createExchange(false);
+        // must be prototype scoped (not pooled) so we create the exchange via 
endpoint
+        Exchange exchange = consumer.getEndpoint().createExchange();
         consumer.getEndpoint().updateMessageHeader(exchange.getIn(), ctx);
         NettyPayloadHelper.setIn(exchange, message);
         return exchange;
@@ -140,7 +141,6 @@ public class ServerChannelHandler extends 
SimpleChannelInboundHandler<Object> {
             consumer.getExceptionHandler().handleException(e);
         } finally {
             consumer.doneUoW(exchange);
-            consumer.releaseExchange(exchange, false);
         }
     }
 
@@ -155,7 +155,6 @@ public class ServerChannelHandler extends 
SimpleChannelInboundHandler<Object> {
                 consumer.getExceptionHandler().handleException(e);
             } finally {
                 consumer.doneUoW(exchange);
-                consumer.releaseExchange(exchange, false);
             }
         });
     }
diff --git 
a/components/camel-netty/src/test/java/org/apache/camel/component/netty/BaseNettyTest.java
 
b/components/camel-netty/src/test/java/org/apache/camel/component/netty/BaseNettyTest.java
index b98e501..8d40832 100644
--- 
a/components/camel-netty/src/test/java/org/apache/camel/component/netty/BaseNettyTest.java
+++ 
b/components/camel-netty/src/test/java/org/apache/camel/component/netty/BaseNettyTest.java
@@ -49,6 +49,7 @@ public class BaseNettyTest extends CamelTestSupport {
     public static void startLeakDetection() {
         System.setProperty("io.netty.leakDetection.maxRecords", "100");
         System.setProperty("io.netty.leakDetection.acquireAndReleaseOnly", 
"true");
+        System.setProperty("io.netty.leakDetection.targetRecords", "100");
         ResourceLeakDetector.setLevel(ResourceLeakDetector.Level.PARANOID);
     }
 
diff --git 
a/components/camel-quickfix/src/main/java/org/apache/camel/component/quickfixj/QuickfixjEndpoint.java
 
b/components/camel-quickfix/src/main/java/org/apache/camel/component/quickfixj/QuickfixjEndpoint.java
index 0f45d55..b83f0bc 100644
--- 
a/components/camel-quickfix/src/main/java/org/apache/camel/component/quickfixj/QuickfixjEndpoint.java
+++ 
b/components/camel-quickfix/src/main/java/org/apache/camel/component/quickfixj/QuickfixjEndpoint.java
@@ -33,6 +33,7 @@ import org.apache.camel.spi.UriEndpoint;
 import org.apache.camel.spi.UriParam;
 import org.apache.camel.spi.UriPath;
 import org.apache.camel.support.DefaultEndpoint;
+import org.apache.camel.support.service.ServiceHelper;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import quickfix.Message;
@@ -132,6 +133,8 @@ public class QuickfixjEndpoint extends DefaultEndpoint 
implements QuickfixjEvent
     public void onEvent(QuickfixjEventCategory eventCategory, SessionID 
sessionID, Message message) throws Exception {
         if (this.sessionID == null || isMatching(sessionID)) {
             for (QuickfixjConsumer consumer : consumers) {
+                // ensure consumer is started
+                ServiceHelper.startService(consumer);
                 Exchange exchange
                         = QuickfixjConverters.toExchange(consumer, sessionID, 
message, eventCategory, getExchangePattern());
                 try {
diff --git 
a/components/camel-reactive-streams/src/main/java/org/apache/camel/component/reactive/streams/ReactiveStreamsConsumer.java
 
b/components/camel-reactive-streams/src/main/java/org/apache/camel/component/reactive/streams/ReactiveStreamsConsumer.java
index 815c3e2..989b901 100644
--- 
a/components/camel-reactive-streams/src/main/java/org/apache/camel/component/reactive/streams/ReactiveStreamsConsumer.java
+++ 
b/components/camel-reactive-streams/src/main/java/org/apache/camel/component/reactive/streams/ReactiveStreamsConsumer.java
@@ -75,7 +75,7 @@ public class ReactiveStreamsConsumer extends DefaultConsumer {
 
     public void onComplete() {
         if (endpoint.isForwardOnComplete()) {
-            Exchange exchange = createExchange(true);
+            Exchange exchange = endpoint.createExchange();
             
exchange.getIn().setHeader(ReactiveStreamsConstants.REACTIVE_STREAMS_EVENT_TYPE,
 "onComplete");
 
             doSend(exchange, done -> {
@@ -85,7 +85,7 @@ public class ReactiveStreamsConsumer extends DefaultConsumer {
 
     public void onError(Throwable error) {
         if (endpoint.isForwardOnError()) {
-            Exchange exchange = createExchange(true);
+            Exchange exchange = endpoint.createExchange();
             
exchange.getIn().setHeader(ReactiveStreamsConstants.REACTIVE_STREAMS_EVENT_TYPE,
 "onError");
             exchange.getIn().setBody(error);
 
diff --git 
a/components/camel-reactor/src/test/java/org/apache/camel/component/reactor/engine/ReactorStreamsServiceTestSupport.java
 
b/components/camel-reactor/src/test/java/org/apache/camel/component/reactor/engine/ReactorStreamsServiceTestSupport.java
index 2a86a93..f8723b3 100644
--- 
a/components/camel-reactor/src/test/java/org/apache/camel/component/reactor/engine/ReactorStreamsServiceTestSupport.java
+++ 
b/components/camel-reactor/src/test/java/org/apache/camel/component/reactor/engine/ReactorStreamsServiceTestSupport.java
@@ -17,10 +17,12 @@
 package org.apache.camel.component.reactor.engine;
 
 import org.apache.camel.CamelContext;
+import org.apache.camel.ExtendedCamelContext;
 import org.apache.camel.component.reactive.streams.ReactiveStreamsComponent;
 import org.apache.camel.component.reactive.streams.ReactiveStreamsConstants;
 import org.apache.camel.component.reactive.streams.api.CamelReactiveStreams;
 import 
org.apache.camel.component.reactive.streams.api.CamelReactiveStreamsService;
+import org.apache.camel.impl.engine.PrototypeExchangeFactory;
 import org.apache.camel.test.junit5.CamelTestSupport;
 import org.apache.camel.util.ObjectHelper;
 
@@ -31,6 +33,9 @@ class ReactorStreamsServiceTestSupport extends 
CamelTestSupport {
     protected CamelContext createCamelContext() throws Exception {
         CamelContext context = super.createCamelContext();
 
+        // camel-reactor does not work with pooled exchanges
+        context.adapt(ExtendedCamelContext.class).setExchangeFactory(new 
PrototypeExchangeFactory());
+
         context.addComponent(
                 ReactiveStreamsConstants.SCHEME,
                 
ReactiveStreamsComponent.withServiceType(ReactorStreamsConstants.SERVICE_NAME));
diff --git 
a/components/camel-rxjava/src/test/java/org/apache/camel/component/rxjava/engine/RxJavaStreamsServiceTestSupport.java
 
b/components/camel-rxjava/src/test/java/org/apache/camel/component/rxjava/engine/RxJavaStreamsServiceTestSupport.java
index 93519fd..81d8549 100644
--- 
a/components/camel-rxjava/src/test/java/org/apache/camel/component/rxjava/engine/RxJavaStreamsServiceTestSupport.java
+++ 
b/components/camel-rxjava/src/test/java/org/apache/camel/component/rxjava/engine/RxJavaStreamsServiceTestSupport.java
@@ -17,10 +17,12 @@
 package org.apache.camel.component.rxjava.engine;
 
 import org.apache.camel.CamelContext;
+import org.apache.camel.ExtendedCamelContext;
 import org.apache.camel.component.reactive.streams.ReactiveStreamsComponent;
 import org.apache.camel.component.reactive.streams.ReactiveStreamsConstants;
 import org.apache.camel.component.reactive.streams.api.CamelReactiveStreams;
 import 
org.apache.camel.component.reactive.streams.api.CamelReactiveStreamsService;
+import org.apache.camel.impl.engine.PrototypeExchangeFactory;
 import org.apache.camel.test.junit5.CamelTestSupport;
 import org.apache.camel.util.ObjectHelper;
 
@@ -31,6 +33,9 @@ class RxJavaStreamsServiceTestSupport extends 
CamelTestSupport {
     protected CamelContext createCamelContext() throws Exception {
         CamelContext context = super.createCamelContext();
 
+        // camel-rxjava does not work with pooled exchanges
+        context.adapt(ExtendedCamelContext.class).setExchangeFactory(new 
PrototypeExchangeFactory());
+
         context.addComponent(
                 ReactiveStreamsConstants.SCHEME,
                 
ReactiveStreamsComponent.withServiceType(RxJavaStreamsConstants.SERVICE_NAME));
diff --git 
a/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/consumer/EndpointMessageListener.java
 
b/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/consumer/EndpointMessageListener.java
index b2c9515..99ae442 100644
--- 
a/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/consumer/EndpointMessageListener.java
+++ 
b/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/consumer/EndpointMessageListener.java
@@ -29,7 +29,6 @@ import org.apache.camel.AsyncProcessor;
 import org.apache.camel.CamelExchangeException;
 import org.apache.camel.Exchange;
 import org.apache.camel.ExchangePattern;
-import org.apache.camel.ExtendedExchange;
 import org.apache.camel.Processor;
 import org.apache.camel.RollbackExchangeException;
 import org.apache.camel.RuntimeCamelException;
@@ -38,7 +37,6 @@ import org.apache.camel.component.sjms.SessionMessageListener;
 import org.apache.camel.component.sjms.SjmsConstants;
 import org.apache.camel.component.sjms.SjmsConsumer;
 import org.apache.camel.component.sjms.SjmsEndpoint;
-import org.apache.camel.component.sjms.SjmsMessage;
 import org.apache.camel.component.sjms.SjmsTemplate;
 import org.apache.camel.component.sjms.jms.JmsMessageHelper;
 import org.apache.camel.support.AsyncProcessorConverterHelper;
@@ -212,9 +210,6 @@ public class EndpointMessageListener implements 
SessionMessageListener {
             // if we failed processed the exchange from the async callback 
task, then grab the exception
             rce = exchange.getException(RuntimeCamelException.class);
 
-            // the exchange is now done so release it
-            consumer.releaseExchange(exchange, false);
-
         } catch (Exception e) {
             rce = wrapRuntimeCamelException(e);
         }
@@ -242,18 +237,8 @@ public class EndpointMessageListener implements 
SessionMessageListener {
     }
 
     public Exchange createExchange(Message message, Session session, Object 
replyDestination) {
-        Exchange exchange = consumer.createExchange(false);
-
-        // optimize: either create a new SjmsMessage or reuse existing if 
exists
-        SjmsMessage msg = 
exchange.adapt(ExtendedExchange.class).getInOrNull(SjmsMessage.class);
-        if (msg == null) {
-            msg = new SjmsMessage(exchange, message, session, 
endpoint.getBinding());
-            exchange.setIn(msg);
-        } else {
-            msg.setJmsMessage(message);
-            msg.setJmsSession(session);
-            msg.setBinding(endpoint.getBinding());
-        }
+        // must be prototype scoped (not pooled) so we create the exchange via 
endpoint
+        Exchange exchange = endpoint.createExchange(message, session);
 
         // lets set to an InOut if we have some kind of reply-to destination
         if (replyDestination != null && !disableReplyTo) {
@@ -474,12 +459,6 @@ public class EndpointMessageListener implements 
SessionMessageListener {
                     }
                 }
             }
-
-            // if we completed from async processing then we should release 
the exchange
-            // the sync processing will release the exchange outside this 
callback
-            if (!doneSync) {
-                consumer.releaseExchange(exchange, false);
-            }
         }
     }
 
diff --git 
a/components/camel-spring-rabbitmq/src/main/java/org/apache/camel/component/springrabbit/EndpointMessageListener.java
 
b/components/camel-spring-rabbitmq/src/main/java/org/apache/camel/component/springrabbit/EndpointMessageListener.java
index 39a7f02..0766bc1 100644
--- 
a/components/camel-spring-rabbitmq/src/main/java/org/apache/camel/component/springrabbit/EndpointMessageListener.java
+++ 
b/components/camel-spring-rabbitmq/src/main/java/org/apache/camel/component/springrabbit/EndpointMessageListener.java
@@ -16,8 +16,6 @@
  */
 package org.apache.camel.component.springrabbit;
 
-import java.util.Map;
-
 import com.rabbitmq.client.Channel;
 import org.apache.camel.AsyncCallback;
 import org.apache.camel.AsyncProcessor;
@@ -141,9 +139,6 @@ public class EndpointMessageListener implements 
ChannelAwareMessageListener {
             // if we failed processed the exchange from the async callback 
task, then grab the exception
             rce = exchange.getException(RuntimeCamelException.class);
 
-            // the exchange is now done so release it
-            consumer.releaseExchange(exchange, false);
-
         } catch (Exception e) {
             rce = wrapRuntimeCamelException(e);
         }
@@ -160,18 +155,7 @@ public class EndpointMessageListener implements 
ChannelAwareMessageListener {
     }
 
     protected Exchange createExchange(Message message, Channel channel, Object 
replyDestination) {
-        Exchange exchange = consumer.createExchange(false);
-
-        Object body = messageConverter.fromMessage(message);
-        exchange.getMessage().setBody(body);
-
-        // TODO: optimize to use existing headers map
-        Map<String, Object> headers
-                = 
messagePropertiesConverter.fromMessageProperties(message.getMessageProperties(),
 exchange);
-        if (!headers.isEmpty()) {
-            exchange.getMessage().setHeaders(headers);
-        }
-
+        Exchange exchange = endpoint.createExchange(message);
         exchange.setProperty(SpringRabbitMQConstants.CHANNEL, channel);
 
         // lets set to an InOut if we have some kind of reply-to destination
@@ -261,12 +245,6 @@ public class EndpointMessageListener implements 
ChannelAwareMessageListener {
                     }
                 }
             }
-
-            // if we completed from async processing then we should release 
the exchange
-            // the sync processing will release the exchange outside this 
callback
-            if (!doneSync) {
-                consumer.releaseExchange(exchange, false);
-            }
         }
 
         private void sendReply(Address replyDestination, Message message, 
Exchange exchange, org.apache.camel.Message out) {
diff --git 
a/components/camel-yammer/src/test/java/org/apache/camel/component/yammer/YammerMessageAndUserRouteTest.java
 
b/components/camel-yammer/src/test/java/org/apache/camel/component/yammer/YammerMessageAndUserRouteTest.java
index eadb46d..056f558 100644
--- 
a/components/camel-yammer/src/test/java/org/apache/camel/component/yammer/YammerMessageAndUserRouteTest.java
+++ 
b/components/camel-yammer/src/test/java/org/apache/camel/component/yammer/YammerMessageAndUserRouteTest.java
@@ -21,7 +21,6 @@ import org.apache.camel.Exchange;
 import org.apache.camel.builder.RouteBuilder;
 import org.apache.camel.component.mock.MockEndpoint;
 import org.apache.camel.component.yammer.model.Messages;
-import org.apache.camel.component.yammer.model.User;
 import org.apache.camel.test.junit5.CamelTestSupport;
 import org.junit.jupiter.api.Test;
 
@@ -70,12 +69,6 @@ public class YammerMessageAndUserRouteTest extends 
CamelTestSupport {
         template.sendBody("direct:start", "overwrite me");
 
         userMock.assertIsSatisfied();
-
-        exchange = userMock.getExchanges().get(0);
-        User user = exchange.getIn().getBody(User.class);
-
-        assertEquals("Joe Camel", user.getFullName());
-        assertEquals("jca...@redhat.com", 
user.getContact().getEmailAddresses().get(0).getAddress());
     }
 
     @Override
diff --git 
a/core/camel-api/src/main/java/org/apache/camel/spi/ExchangeFactory.java 
b/core/camel-api/src/main/java/org/apache/camel/spi/ExchangeFactory.java
index fcedd54..29b3fdb 100644
--- a/core/camel-api/src/main/java/org/apache/camel/spi/ExchangeFactory.java
+++ b/core/camel-api/src/main/java/org/apache/camel/spi/ExchangeFactory.java
@@ -16,12 +16,10 @@
  */
 package org.apache.camel.spi;
 
-import org.apache.camel.CamelContextAware;
 import org.apache.camel.Consumer;
 import org.apache.camel.Endpoint;
 import org.apache.camel.Exchange;
 import org.apache.camel.NonManagedService;
-import org.apache.camel.Service;
 
 /**
  * Factory used by {@link Consumer} to create Camel {@link Exchange} holding 
the incoming message received by the
@@ -36,50 +34,7 @@ import org.apache.camel.Service;
  * The factory is pluggable which allows to use different strategies. The 
default factory will create a new
  * {@link Exchange} instance, and the pooled factory will pool and reuse 
exchanges.
  */
-public interface ExchangeFactory extends Service, CamelContextAware, 
NonManagedService, RouteIdAware {
-
-    /**
-     * Utilization statistics of the this factory.
-     */
-    interface Statistics {
-
-        /**
-         * Number of new exchanges created.
-         */
-        long getCreatedCounter();
-
-        /**
-         * Number of exchanges acquired (reused) when using pooled factory.
-         */
-        long getAcquiredCounter();
-
-        /**
-         * Number of exchanges released back to pool
-         */
-        long getReleasedCounter();
-
-        /**
-         * Number of exchanges discarded (thrown away) such as if no space in 
cache pool.
-         */
-        long getDiscardedCounter();
-
-        /**
-         * Reset the counters
-         */
-        void reset();
-
-        /**
-         * Whether statistics is enabled.
-         */
-        boolean isStatisticsEnabled();
-
-        /**
-         * Sets whether statistics is enabled.
-         *
-         * @param statisticsEnabled <tt>true</tt> to enable
-         */
-        void setStatisticsEnabled(boolean statisticsEnabled);
-    }
+public interface ExchangeFactory extends PooledObjectFactory<Exchange>, 
NonManagedService, RouteIdAware {
 
     /**
      * Service factory key.
@@ -125,43 +80,8 @@ public interface ExchangeFactory extends Service, 
CamelContextAware, NonManagedS
     }
 
     /**
-     * The capacity the pool (for each consumer) uses for storing exchanges. 
The default capacity is 100.
-     */
-    int getCapacity();
-
-    /**
-     * The current number of exchanges in the pool
-     */
-    int getSize();
-
-    /**
-     * The capacity the pool (for each consumer) uses for storing exchanges. 
The default capacity is 100.
-     */
-    void setCapacity(int capacity);
-
-    /**
-     * Whether statistics is enabled.
-     */
-    boolean isStatisticsEnabled();
-
-    /**
-     * Whether statistics is enabled.
-     */
-    void setStatisticsEnabled(boolean statisticsEnabled);
-
-    /**
-     * Reset the statistics
-     */
-    void resetStatistics();
-
-    /**
-     * Purges the internal cache (if pooled)
-     */
-    void purge();
-
-    /**
-     * Gets the usage statistics
+     * Whether the factory is pooled.
      */
-    Statistics getStatistics();
+    boolean isPooled();
 
 }
diff --git 
a/core/camel-api/src/main/java/org/apache/camel/spi/InternalProcessorFactory.java
 
b/core/camel-api/src/main/java/org/apache/camel/spi/InternalProcessorFactory.java
index b3f8868..8a56f21 100644
--- 
a/core/camel-api/src/main/java/org/apache/camel/spi/InternalProcessorFactory.java
+++ 
b/core/camel-api/src/main/java/org/apache/camel/spi/InternalProcessorFactory.java
@@ -27,7 +27,9 @@ import org.apache.camel.Route;
 
 /**
  * A factory used internally by Camel to create {@link Processor} and other 
internal building blocks. This factory is
- * used to have loose coupling between the modules in core. Camel user user 
should only use {@link ProcessorFactory}.
+ * used to have loose coupling between the modules in core.
+ *
+ * Camel end users should NOT use this, but use {@link ProcessorFactory} 
instead.
  *
  * @see ProcessorFactory
  */
diff --git 
a/core/camel-api/src/main/java/org/apache/camel/spi/PooledObjectFactory.java 
b/core/camel-api/src/main/java/org/apache/camel/spi/PooledObjectFactory.java
new file mode 100644
index 0000000..db4c0d1
--- /dev/null
+++ b/core/camel-api/src/main/java/org/apache/camel/spi/PooledObjectFactory.java
@@ -0,0 +1,125 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.spi;
+
+import org.apache.camel.CamelContextAware;
+import org.apache.camel.Service;
+
+/**
+ * Factory for pooled objects or tasks.
+ */
+public interface PooledObjectFactory<T> extends Service, CamelContextAware {
+
+    /**
+     * Utilization statistics of this factory.
+     */
+    interface Statistics {
+
+        /**
+         * Number of new objects created.
+         */
+        long getCreatedCounter();
+
+        /**
+         * Number of objects acquired (reused) when using pooled factory.
+         */
+        long getAcquiredCounter();
+
+        /**
+         * Number of objects released back to pool
+         */
+        long getReleasedCounter();
+
+        /**
+         * Number of objects discarded (thrown away) such as if no space in 
cache pool.
+         */
+        long getDiscardedCounter();
+
+        /**
+         * Reset the counters
+         */
+        void reset();
+
+        /**
+         * Whether statistics is enabled.
+         */
+        boolean isStatisticsEnabled();
+
+        /**
+         * Sets whether statistics is enabled.
+         *
+         * @param statisticsEnabled <tt>true</tt> to enable
+         */
+        void setStatisticsEnabled(boolean statisticsEnabled);
+    }
+
+    /**
+     * The current number of objects in the pool
+     */
+    int getSize();
+
+    /**
+     * The capacity the pool uses for storing objects. The default capacity is 
100.
+     */
+    int getCapacity();
+
+    /**
+     * Sets the capacity the pool uses for storing objects. The default capacity is 
100.
+     */
+    void setCapacity(int capacity);
+
+    /**
+     * Whether statistics is enabled.
+     */
+    boolean isStatisticsEnabled();
+
+    /**
+     * Sets whether statistics is enabled.
+     */
+    void setStatisticsEnabled(boolean statisticsEnabled);
+
+    /**
+     * Reset the statistics
+     */
+    void resetStatistics();
+
+    /**
+     * Purges the internal cache (if pooled)
+     */
+    void purge();
+
+    /**
+     * Gets the usage statistics
+     */
+    Statistics getStatistics();
+
+    /**
+     * Acquires an object from the pool (if any)
+     *
+     * @return the object or <tt>null</tt> if the pool is empty
+     */
+    T acquire();
+
+    /**
+     * Releases the object back to the pool
+     *
+     * @param  t the object
+     * @return   true if released into the pool, or false if something went 
wrong and the object was discarded
+     */
+    boolean release(T t);
+
+}
diff --git 
a/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/AbstractCamelContext.java
 
b/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/AbstractCamelContext.java
index d7e6dd4..45fcf6d 100644
--- 
a/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/AbstractCamelContext.java
+++ 
b/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/AbstractCamelContext.java
@@ -2882,6 +2882,9 @@ public abstract class AbstractCamelContext extends 
BaseService
         }
         bootstraps.clear();
 
+        if (adapt(ExtendedCamelContext.class).getExchangeFactory().isPooled()) 
{
+            LOG.info("Pooled mode enabled. Camel pools and reuses objects to 
reduce JVM object allocations.");
+        }
         if (isLightweight()) {
             LOG.info("Lightweight mode enabled. Performing optimizations and 
memory reduction.");
             ReifierStrategy.clearReifiers();
diff --git 
a/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/CamelInternalProcessor.java
 
b/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/CamelInternalProcessor.java
index 9696d1b..9e847fe 100644
--- 
a/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/CamelInternalProcessor.java
+++ 
b/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/CamelInternalProcessor.java
@@ -271,6 +271,7 @@ public class CamelInternalProcessor extends 
DelegateAsyncProcessor implements In
         }
 
         // create internal callback which will execute the advices in reverse 
order when done
+        // TODO: pool this task, and the states array
         AsyncCallback callback = new AsyncAfterTask(states, exchange, 
originalCallback);
 
         if (exchange.isTransacted()) {
diff --git 
a/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/PooledExchangeFactory.java
 
b/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/PooledExchangeFactory.java
index 4079a46..bd40516 100644
--- 
a/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/PooledExchangeFactory.java
+++ 
b/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/PooledExchangeFactory.java
@@ -168,11 +168,20 @@ public final class PooledExchangeFactory extends 
PrototypeExchangeFactory {
     }
 
     @Override
+    public boolean isPooled() {
+        return true;
+    }
+
+    @Override
     protected void doStop() throws Exception {
-        exchangeFactoryManager.removeExchangeFactory(this);
-        logUsageSummary(LOG, "PooledExchangeFactory", pool.size());
-        statistics.reset();
-        pool.clear();
+        if (exchangeFactoryManager != null) {
+            exchangeFactoryManager.removeExchangeFactory(this);
+        }
+        if (pool != null) {
+            logUsageSummary(LOG, "PooledExchangeFactory", pool.size());
+            statistics.reset();
+            pool.clear();
+        }
 
         // do not call super
     }
diff --git 
a/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/PrototypeExchangeFactory.java
 
b/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/PrototypeExchangeFactory.java
index fb17cb0..eef1ea7 100644
--- 
a/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/PrototypeExchangeFactory.java
+++ 
b/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/PrototypeExchangeFactory.java
@@ -16,9 +16,6 @@
  */
 package org.apache.camel.impl.engine;
 
-import java.util.concurrent.atomic.LongAdder;
-
-import org.apache.camel.CamelContext;
 import org.apache.camel.Consumer;
 import org.apache.camel.Endpoint;
 import org.apache.camel.Exchange;
@@ -26,7 +23,7 @@ import org.apache.camel.ExtendedCamelContext;
 import org.apache.camel.spi.ExchangeFactory;
 import org.apache.camel.spi.ExchangeFactoryManager;
 import org.apache.camel.support.DefaultExchange;
-import org.apache.camel.support.service.ServiceSupport;
+import org.apache.camel.support.PooledObjectFactorySupport;
 import org.apache.camel.util.URISupport;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -34,13 +31,11 @@ import org.slf4j.LoggerFactory;
 /**
  * {@link ExchangeFactory} that creates a new {@link Exchange} instance.
  */
-public class PrototypeExchangeFactory extends ServiceSupport implements 
ExchangeFactory {
+public class PrototypeExchangeFactory extends 
PooledObjectFactorySupport<Exchange> implements ExchangeFactory {
 
     private static final Logger LOG = 
LoggerFactory.getLogger(PrototypeExchangeFactory.class);
 
-    final UtilizationStatistics statistics = new UtilizationStatistics();
     final Consumer consumer;
-    CamelContext camelContext;
     ExchangeFactoryManager exchangeFactoryManager;
     String routeId;
 
@@ -54,6 +49,7 @@ public class PrototypeExchangeFactory extends ServiceSupport 
implements Exchange
 
     @Override
     protected void doBuild() throws Exception {
+        super.doBuild();
         this.exchangeFactoryManager = 
camelContext.adapt(ExtendedCamelContext.class).getExchangeFactoryManager();
     }
 
@@ -73,16 +69,6 @@ public class PrototypeExchangeFactory extends ServiceSupport 
implements Exchange
     }
 
     @Override
-    public CamelContext getCamelContext() {
-        return camelContext;
-    }
-
-    @Override
-    public void setCamelContext(CamelContext camelContext) {
-        this.camelContext = camelContext;
-    }
-
-    @Override
     public ExchangeFactory newExchangeFactory(Consumer consumer) {
         PrototypeExchangeFactory answer = new 
PrototypeExchangeFactory(consumer);
         answer.setStatisticsEnabled(statistics.isStatisticsEnabled());
@@ -92,6 +78,11 @@ public class PrototypeExchangeFactory extends ServiceSupport 
implements Exchange
     }
 
     @Override
+    public Exchange acquire() {
+        throw new UnsupportedOperationException("Not in use");
+    }
+
+    @Override
     public Exchange create(boolean autoRelease) {
         if (statistics.isStatisticsEnabled()) {
             statistics.created.increment();
@@ -116,47 +107,18 @@ public class PrototypeExchangeFactory extends 
ServiceSupport implements Exchange
     }
 
     @Override
-    public boolean isStatisticsEnabled() {
-        return statistics.isStatisticsEnabled();
-    }
-
-    @Override
-    public void setStatisticsEnabled(boolean statisticsEnabled) {
-        statistics.setStatisticsEnabled(statisticsEnabled);
-    }
-
-    @Override
-    public int getCapacity() {
-        return 0;
-    }
-
-    @Override
-    public int getSize() {
-        return 0;
-    }
-
-    @Override
-    public void setCapacity(int capacity) {
-        // not in use
-    }
-
-    @Override
     public void resetStatistics() {
         statistics.reset();
     }
 
     @Override
-    public void purge() {
-        // not in use
-    }
-
-    @Override
-    public Statistics getStatistics() {
-        return statistics;
+    public boolean isPooled() {
+        return false;
     }
 
     @Override
     protected void doStart() throws Exception {
+        super.doStart();
         if (exchangeFactoryManager != null) {
             exchangeFactoryManager.addExchangeFactory(this);
         }
@@ -164,6 +126,7 @@ public class PrototypeExchangeFactory extends 
ServiceSupport implements Exchange
 
     @Override
     protected void doStop() throws Exception {
+        super.doStop();
         if (exchangeFactoryManager != null) {
             exchangeFactoryManager.removeExchangeFactory(this);
         }
@@ -190,54 +153,4 @@ public class PrototypeExchangeFactory extends 
ServiceSupport implements Exchange
         }
     }
 
-    /**
-     * Represents utilization statistics
-     */
-    final class UtilizationStatistics implements ExchangeFactory.Statistics {
-
-        boolean statisticsEnabled;
-        final LongAdder created = new LongAdder();
-        final LongAdder acquired = new LongAdder();
-        final LongAdder released = new LongAdder();
-        final LongAdder discarded = new LongAdder();
-
-        @Override
-        public void reset() {
-            created.reset();
-            acquired.reset();
-            released.reset();
-            discarded.reset();
-        }
-
-        @Override
-        public long getCreatedCounter() {
-            return created.longValue();
-        }
-
-        @Override
-        public long getAcquiredCounter() {
-            return acquired.longValue();
-        }
-
-        @Override
-        public long getReleasedCounter() {
-            return released.longValue();
-        }
-
-        @Override
-        public long getDiscardedCounter() {
-            return discarded.longValue();
-        }
-
-        @Override
-        public boolean isStatisticsEnabled() {
-            return statisticsEnabled;
-        }
-
-        @Override
-        public void setStatisticsEnabled(boolean statisticsEnabled) {
-            this.statisticsEnabled = statisticsEnabled;
-        }
-    }
-
 }
diff --git 
a/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/SimpleCamelContext.java
 
b/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/SimpleCamelContext.java
index 057cd5a..efe5d39 100644
--- 
a/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/SimpleCamelContext.java
+++ 
b/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/SimpleCamelContext.java
@@ -16,9 +16,6 @@
  */
 package org.apache.camel.impl.engine;
 
-import java.util.Map;
-import java.util.Optional;
-
 import org.apache.camel.CamelContext;
 import org.apache.camel.Endpoint;
 import org.apache.camel.Processor;
@@ -85,6 +82,9 @@ import org.apache.camel.support.ResolverHelper;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.util.Map;
+import java.util.Optional;
+
 /**
  * Represents the context used to configure routes and the policies to use.
  */
diff --git 
a/core/camel-core-processor/src/main/java/org/apache/camel/processor/InterceptSendToEndpointProcessor.java
 
b/core/camel-core-processor/src/main/java/org/apache/camel/processor/InterceptSendToEndpointProcessor.java
index 859a48d..275fa6a 100644
--- 
a/core/camel-core-processor/src/main/java/org/apache/camel/processor/InterceptSendToEndpointProcessor.java
+++ 
b/core/camel-core-processor/src/main/java/org/apache/camel/processor/InterceptSendToEndpointProcessor.java
@@ -46,6 +46,7 @@ public class InterceptSendToEndpointProcessor extends 
DefaultAsyncProducer {
     private final Endpoint delegate;
     private final AsyncProducer producer;
     private final boolean skip;
+    private AsyncProcessor pipeline;
 
     public InterceptSendToEndpointProcessor(InterceptSendToEndpoint endpoint, 
Endpoint delegate, AsyncProducer producer,
                                             boolean skip) {
@@ -71,24 +72,9 @@ public class InterceptSendToEndpointProcessor extends 
DefaultAsyncProducer {
         // add header with the real endpoint uri
         exchange.getIn().setHeader(Exchange.INTERCEPTED_ENDPOINT, 
delegate.getEndpointUri());
 
-        if (endpoint.getBefore() != null || endpoint.getAfter() != null) {
-            // detour the exchange using synchronous processing
-            AsyncProcessor before = null;
-            if (endpoint.getBefore() != null) {
-                before = 
AsyncProcessorConverterHelper.convert(endpoint.getBefore());
-            }
-            AsyncProcessor ascb = new AsyncProcessorSupport() {
-                @Override
-                public boolean process(Exchange exchange, AsyncCallback 
callback) {
-                    return callback(exchange, callback, true);
-                }
-            };
-            AsyncProcessor after = null;
-            if (endpoint.getAfter() != null) {
-                after = 
AsyncProcessorConverterHelper.convert(endpoint.getAfter());
-            }
-
-            return new Pipeline(exchange.getContext(), Arrays.asList(before, 
ascb, after)).process(exchange, callback);
+        if (pipeline != null) {
+            // detour the exchange with the pipeline that has before and after 
included
+            return pipeline.process(exchange, callback);
         }
 
         return callback(exchange, callback, true);
@@ -140,19 +126,38 @@ public class InterceptSendToEndpointProcessor extends 
DefaultAsyncProducer {
 
     @Override
     protected void doBuild() throws Exception {
-        ServiceHelper.buildService(producer);
+        // build pipeline with before/after processors
+        if (endpoint.getBefore() != null || endpoint.getAfter() != null) {
+            // detour the exchange using synchronous processing
+            AsyncProcessor before = null;
+            if (endpoint.getBefore() != null) {
+                before = 
AsyncProcessorConverterHelper.convert(endpoint.getBefore());
+            }
+            AsyncProcessor ascb = new AsyncProcessorSupport() {
+                @Override
+                public boolean process(Exchange exchange, AsyncCallback 
callback) {
+                    return callback(exchange, callback, true);
+                }
+            };
+            AsyncProcessor after = null;
+            if (endpoint.getAfter() != null) {
+                after = 
AsyncProcessorConverterHelper.convert(endpoint.getAfter());
+            }
+
+            pipeline = new Pipeline(getEndpoint().getCamelContext(), 
Arrays.asList(before, ascb, after));
+        }
+
+        ServiceHelper.buildService(producer, pipeline);
     }
 
     @Override
     protected void doInit() throws Exception {
-        ServiceHelper.initService(producer);
+        ServiceHelper.initService(producer, pipeline);
     }
 
     @Override
     protected void doStart() throws Exception {
-        ServiceHelper.startService(endpoint.getBefore(), endpoint.getAfter());
-        // here we also need to start the producer
-        ServiceHelper.startService(producer);
+        ServiceHelper.startService(producer, pipeline);
     }
 
     @Override
@@ -162,4 +167,8 @@ public class InterceptSendToEndpointProcessor extends 
DefaultAsyncProducer {
         ServiceHelper.stopService(producer);
     }
 
+    @Override
+    protected void doShutdown() throws Exception {
+        ServiceHelper.stopAndShutdownServices(producer, pipeline);
+    }
 }
diff --git 
a/core/camel-core-processor/src/main/java/org/apache/camel/processor/Pipeline.java
 
b/core/camel-core-processor/src/main/java/org/apache/camel/processor/Pipeline.java
index 3bbbf14..6d6b3d1 100644
--- 
a/core/camel-core-processor/src/main/java/org/apache/camel/processor/Pipeline.java
+++ 
b/core/camel-core-processor/src/main/java/org/apache/camel/processor/Pipeline.java
@@ -53,18 +53,32 @@ public class Pipeline extends AsyncProcessorSupport 
implements Navigate<Processo
     private final ReactiveExecutor reactiveExecutor;
     private final List<AsyncProcessor> processors;
     private final int size;
+    private PooledExchangeTaskFactory taskFactory;
+
     private String id;
     private String routeId;
 
-    private final class PipelineTask implements Runnable, AsyncCallback {
+    private final class PipelineTask implements PooledExchangeTask, 
AsyncCallback {
 
-        private final Exchange exchange;
-        private final AsyncCallback callback;
+        private Exchange exchange;
+        private AsyncCallback callback;
         private int index;
 
-        PipelineTask(Exchange exchange, AsyncCallback callback) {
+        PipelineTask() {
+        }
+
+        @Override
+        public void prepare(Exchange exchange, AsyncCallback callback) {
             this.exchange = exchange;
             this.callback = callback;
+            this.index = 0;
+        }
+
+        @Override
+        public void reset() {
+            this.exchange = null;
+            this.callback = null;
+            this.index = 0;
         }
 
         @Override
@@ -101,7 +115,9 @@ public class Pipeline extends AsyncProcessorSupport 
implements Navigate<Processo
                     LOG.trace("Processing complete for exchangeId: {} >>> {}", 
exchange.getExchangeId(), exchange);
                 }
 
-                reactiveExecutor.schedule(callback);
+                AsyncCallback cb = callback;
+                taskFactory.release(this);
+                reactiveExecutor.schedule(cb);
             }
         }
     }
@@ -142,7 +158,7 @@ public class Pipeline extends AsyncProcessorSupport 
implements Navigate<Processo
     @Override
     public boolean process(Exchange exchange, AsyncCallback callback) {
         // create task which has state used during routing
-        PipelineTask task = new PipelineTask(exchange, callback);
+        PooledExchangeTask task = taskFactory.acquire(exchange, callback);
 
         if (exchange.isTransacted()) {
             reactiveExecutor.scheduleSync(task);
@@ -154,22 +170,47 @@ public class Pipeline extends AsyncProcessorSupport 
implements Navigate<Processo
 
     @Override
     protected void doBuild() throws Exception {
-        ServiceHelper.buildService(processors);
+        boolean pooled = 
camelContext.adapt(ExtendedCamelContext.class).getExchangeFactory().isPooled();
+        if (pooled) {
+            taskFactory = new PooledTaskFactory() {
+                @Override
+                public PooledExchangeTask create(Exchange exchange, 
AsyncCallback callback) {
+                    return new PipelineTask();
+                }
+            };
+            int capacity = 
camelContext.adapt(ExtendedCamelContext.class).getExchangeFactory().getCapacity();
+            taskFactory.setCapacity(capacity);
+        } else {
+            taskFactory = new PrototypeTaskFactory() {
+                @Override
+                public PooledExchangeTask create(Exchange exchange, 
AsyncCallback callback) {
+                    return new PipelineTask();
+                }
+            };
+        }
+        LOG.trace("Using TaskFactory: {}", taskFactory);
+
+        ServiceHelper.buildService(taskFactory, processors);
     }
 
     @Override
     protected void doInit() throws Exception {
-        ServiceHelper.initService(processors);
+        ServiceHelper.initService(taskFactory, processors);
     }
 
     @Override
     protected void doStart() throws Exception {
-        ServiceHelper.startService(processors);
+        ServiceHelper.startService(taskFactory, processors);
     }
 
     @Override
     protected void doStop() throws Exception {
-        ServiceHelper.stopService(processors);
+        ServiceHelper.stopService(taskFactory, processors);
+    }
+
+    @Override
+    protected void doShutdown() throws Exception {
+        ServiceHelper.stopAndShutdownServices(taskFactory, processors);
     }
 
     @Override
diff --git 
a/core/camel-core-processor/src/main/java/org/apache/camel/processor/PooledExchangeTask.java
 
b/core/camel-core-processor/src/main/java/org/apache/camel/processor/PooledExchangeTask.java
new file mode 100644
index 0000000..d4e0226
--- /dev/null
+++ 
b/core/camel-core-processor/src/main/java/org/apache/camel/processor/PooledExchangeTask.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.processor;
+
+import org.apache.camel.AsyncCallback;
+import org.apache.camel.Exchange;
+
+/**
+ * A task that EIPs and the internal routing engine use to store state when 
processing an {@link Exchange}.
+ *
+ * @see org.apache.camel.processor.PooledExchangeTaskFactory
+ */
+public interface PooledExchangeTask extends Runnable {
+
+    /**
+     * Prepares the task for the given exchange and its callback
+     *
+     * @param exchange the exchange
+     * @param callback the callback
+     */
+    void prepare(Exchange exchange, AsyncCallback callback);
+
+    /**
+     * Resets the task after it is done, so it can be reused for another exchange.
+     */
+    void reset();
+}
diff --git 
a/core/camel-core-processor/src/main/java/org/apache/camel/processor/PooledExchangeTaskFactory.java
 
b/core/camel-core-processor/src/main/java/org/apache/camel/processor/PooledExchangeTaskFactory.java
new file mode 100644
index 0000000..b90e9f5
--- /dev/null
+++ 
b/core/camel-core-processor/src/main/java/org/apache/camel/processor/PooledExchangeTaskFactory.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.processor;
+
+import org.apache.camel.AsyncCallback;
+import org.apache.camel.Exchange;
+import org.apache.camel.spi.PooledObjectFactory;
+
+/**
+ * Factory to create {@link PooledExchangeTask}.
+ *
+ * @see PooledExchangeTask
+ */
+public interface PooledExchangeTaskFactory extends 
PooledObjectFactory<PooledExchangeTask> {
+
+    /**
+     * Creates a new task to use for processing the exchange.
+     *
+     * @param  exchange the current exchange
+     * @param  callback the callback for the exchange
+     * @return          the task
+     */
+    PooledExchangeTask create(Exchange exchange, AsyncCallback callback);
+
+    /**
+     * Attempts to acquire a pooled task to use for processing the exchange, 
if not possible then a new task is created.
+     *
+     * @param  exchange the current exchange
+     * @param  callback the callback for the exchange
+     * @return          the task
+     */
+    PooledExchangeTask acquire(Exchange exchange, AsyncCallback callback);
+
+    /**
+     * Releases the task after it is done being used
+     *
+     * @param  task the task
+     * @return      true if the task was released, and false if the task 
failed to be released or no space in pool, and
+     *              the task was discarded.
+     */
+    boolean release(PooledExchangeTask task);
+}
diff --git 
a/core/camel-core-processor/src/main/java/org/apache/camel/processor/PooledTaskFactory.java
 
b/core/camel-core-processor/src/main/java/org/apache/camel/processor/PooledTaskFactory.java
new file mode 100644
index 0000000..3775032
--- /dev/null
+++ 
b/core/camel-core-processor/src/main/java/org/apache/camel/processor/PooledTaskFactory.java
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.processor;
+
+import org.apache.camel.AsyncCallback;
+import org.apache.camel.Exchange;
+import org.apache.camel.support.PooledObjectFactorySupport;
+
+/**
+ * {@link PooledExchangeTaskFactory} that reuses tasks through the pool provided by
+ * {@link PooledObjectFactorySupport}.
+ * <p>
+ * Subclasses only supply {@link #create(Exchange, AsyncCallback)}; acquiring, preparing, resetting and returning tasks
+ * to the pool is handled here.
+ */
+public abstract class PooledTaskFactory extends PooledObjectFactorySupport<PooledExchangeTask>
+        implements PooledExchangeTaskFactory {
+
+    /**
+     * Takes a task from the pool without preparing it.
+     *
+     * @return a pooled task, or {@code null} if the pool is currently empty
+     */
+    @Override
+    public PooledExchangeTask acquire() {
+        return pool.poll();
+    }
+
+    /**
+     * Takes a task from the pool, or creates a new one when the pool is empty, and prepares it for the given exchange
+     * and callback.
+     *
+     * @param  exchange the current exchange
+     * @param  callback the callback for the exchange
+     * @return          the prepared task
+     */
+    @Override
+    public PooledExchangeTask acquire(Exchange exchange, AsyncCallback callback) {
+        PooledExchangeTask task = acquire();
+        if (task == null) {
+            if (statistics.isStatisticsEnabled()) {
+                statistics.created.increment();
+            }
+            task = create(exchange, callback);
+        } else {
+            if (statistics.isStatisticsEnabled()) {
+                statistics.acquired.increment();
+            }
+        }
+        // both new and reused tasks are prepared, so create() does not need to bind the exchange itself
+        task.prepare(exchange, callback);
+        return task;
+    }
+
+    /**
+     * Resets the task and offers it back to the pool.
+     *
+     * @param  task the task
+     * @return      true if the task was put back into the pool, false if it was discarded because the pool did not
+     *              accept it or because resetting/offering failed
+     */
+    @Override
+    public boolean release(PooledExchangeTask task) {
+        try {
+            // clear the task state before it becomes visible to other acquirers
+            task.reset();
+            boolean inserted = pool.offer(task);
+            if (statistics.isStatisticsEnabled()) {
+                if (inserted) {
+                    statistics.released.increment();
+                } else {
+                    statistics.discarded.increment();
+                }
+            }
+            return inserted;
+        } catch (Throwable e) {
+            // best effort: a failure while releasing only means the task is not reused
+            if (statistics.isStatisticsEnabled()) {
+                statistics.discarded.increment();
+            }
+            return false;
+        }
+    }
+
+    @Override
+    public String toString() {
+        return "PooledTaskFactory[capacity: " + getCapacity() + "]";
+    }
+}
diff --git 
a/core/camel-core-processor/src/main/java/org/apache/camel/processor/PrototypeTaskFactory.java
 
b/core/camel-core-processor/src/main/java/org/apache/camel/processor/PrototypeTaskFactory.java
new file mode 100644
index 0000000..bd113d5
--- /dev/null
+++ 
b/core/camel-core-processor/src/main/java/org/apache/camel/processor/PrototypeTaskFactory.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.processor;
+
+import org.apache.camel.AsyncCallback;
+import org.apache.camel.Exchange;
+import org.apache.camel.support.PrototypeObjectFactorySupport;
+
+/**
+ * {@link PooledExchangeTaskFactory} that does not pool: every exchange-aware acquire creates and prepares a new task,
+ * and releasing a task is a no-op.
+ */
+public abstract class PrototypeTaskFactory extends PrototypeObjectFactorySupport<PooledExchangeTask>
+        implements PooledExchangeTaskFactory {
+
+    /**
+     * Not supported by this factory; use {@link #acquire(Exchange, AsyncCallback)} instead.
+     *
+     * @throws UnsupportedOperationException always
+     */
+    @Override
+    public PooledExchangeTask acquire() {
+        throw new UnsupportedOperationException("Not in use");
+    }
+
+    /**
+     * Creates a new task and prepares it for the given exchange and callback.
+     *
+     * @param  exchange the current exchange
+     * @param  callback the callback for the exchange
+     * @return          the prepared task
+     */
+    @Override
+    public PooledExchangeTask acquire(Exchange exchange, AsyncCallback callback) {
+        final PooledExchangeTask fresh = create(exchange, callback);
+        fresh.prepare(exchange, callback);
+        return fresh;
+    }
+
+    /**
+     * Does nothing with the task.
+     *
+     * @param  task the task
+     * @return      always true
+     */
+    @Override
+    public boolean release(PooledExchangeTask task) {
+        // not pooled so no need to reset task
+        return true;
+    }
+
+    @Override
+    public String toString() {
+        return "PrototypeTaskFactory";
+    }
+}
diff --git 
a/core/camel-core-processor/src/main/java/org/apache/camel/processor/errorhandler/RedeliveryErrorHandler.java
 
b/core/camel-core-processor/src/main/java/org/apache/camel/processor/errorhandler/RedeliveryErrorHandler.java
index 3c0eaf3..ad0ac3f 100644
--- 
a/core/camel-core-processor/src/main/java/org/apache/camel/processor/errorhandler/RedeliveryErrorHandler.java
+++ 
b/core/camel-core-processor/src/main/java/org/apache/camel/processor/errorhandler/RedeliveryErrorHandler.java
@@ -38,6 +38,10 @@ import org.apache.camel.Predicate;
 import org.apache.camel.Processor;
 import org.apache.camel.Route;
 import org.apache.camel.RuntimeCamelException;
+import org.apache.camel.processor.PooledExchangeTask;
+import org.apache.camel.processor.PooledExchangeTaskFactory;
+import org.apache.camel.processor.PooledTaskFactory;
+import org.apache.camel.processor.PrototypeTaskFactory;
 import org.apache.camel.spi.AsyncProcessorAwaitManager;
 import org.apache.camel.spi.CamelLogger;
 import org.apache.camel.spi.ErrorHandlerRedeliveryCustomizer;
@@ -69,6 +73,9 @@ public abstract class RedeliveryErrorHandler extends 
ErrorHandlerSupport
 
     private static final Logger LOG = 
LoggerFactory.getLogger(RedeliveryErrorHandler.class);
 
+    // factory
+    protected PooledExchangeTaskFactory taskFactory;
+
     // state
     protected final AtomicInteger redeliverySleepCounter = new AtomicInteger();
     protected ScheduledExecutorService executorService;
@@ -169,12 +176,8 @@ public abstract class RedeliveryErrorHandler extends 
ErrorHandlerSupport
     @Override
     public boolean process(final Exchange exchange, final AsyncCallback 
callback) {
         // Create the redelivery task object for this exchange (optimize to 
only create task can do redelivery or not)
-        Runnable task;
-        if (simpleTask) {
-            task = new SimpleTask(exchange, callback);
-        } else {
-            task = new RedeliveryTask(exchange, callback);
-        }
+        Runnable task = taskFactory.acquire(exchange, callback);
+
         // Run it
         if (exchange.isTransacted()) {
             reactiveExecutor.scheduleSync(task);
@@ -345,14 +348,18 @@ public abstract class RedeliveryErrorHandler extends 
ErrorHandlerSupport
     /**
      * Simple task to perform calling the processor with no redelivery support
      */
-    protected class SimpleTask implements Runnable, AsyncCallback {
-        private final ExtendedExchange exchange;
-        private final AsyncCallback callback;
-        private boolean first = true;
+    protected class SimpleTask implements PooledExchangeTask, Runnable, 
AsyncCallback {
+        private ExtendedExchange exchange;
+        private AsyncCallback callback;
+        private boolean first;
 
-        SimpleTask(Exchange exchange, AsyncCallback callback) {
+        public SimpleTask() {
+        }
+
+        public void prepare(Exchange exchange, AsyncCallback callback) {
             this.exchange = (ExtendedExchange) exchange;
             this.callback = callback;
+            this.first = true;
         }
 
         @Override
@@ -360,6 +367,12 @@ public abstract class RedeliveryErrorHandler extends 
ErrorHandlerSupport
             return "SimpleTask";
         }
 
+        public void reset() {
+            this.exchange = null;
+            this.callback = null;
+            this.first = true;
+        }
+
         @Override
         public void done(boolean doneSync) {
             // the run method decides what to do when we are done
@@ -385,7 +398,9 @@ public abstract class RedeliveryErrorHandler extends 
ErrorHandlerSupport
                 if (exchange.getException() == null) {
                     exchange.setException(new RejectedExecutionException());
                 }
-                callback.done(false);
+                AsyncCallback cb = callback;
+                taskFactory.release(this);
+                cb.done(false);
                 return;
             }
             if (exchange.isInterrupted()) {
@@ -396,7 +411,9 @@ public abstract class RedeliveryErrorHandler extends 
ErrorHandlerSupport
                 }
                 exchange.setRouteStop(true);
                 // we should not continue routing so call callback
-                callback.done(false);
+                AsyncCallback cb = callback;
+                taskFactory.release(this);
+                cb.done(false);
                 return;
             }
 
@@ -413,14 +430,18 @@ public abstract class RedeliveryErrorHandler extends 
ErrorHandlerSupport
                 onExceptionOccurred();
                 prepareExchangeAfterFailure(exchange);
                 // we do not support redelivery so continue callback
-                reactiveExecutor.schedule(callback);
+                AsyncCallback cb = callback;
+                taskFactory.release(this);
+                reactiveExecutor.schedule(cb);
             } else if (first) {
                 // first time call the target processor
                 first = false;
                 outputAsync.process(exchange, this);
             } else {
                 // we are done so continue callback
-                reactiveExecutor.schedule(callback);
+                AsyncCallback cb = callback;
+                taskFactory.release(this);
+                reactiveExecutor.schedule(cb);
             }
         }
 
@@ -585,15 +606,16 @@ public abstract class RedeliveryErrorHandler extends 
ErrorHandlerSupport
     /**
      * Task to perform calling the processor and handling redelivery if it 
fails (more advanced than ProcessTask)
      */
-    protected class RedeliveryTask implements Runnable {
-        private final Exchange original;
-        private final ExtendedExchange exchange;
-        private final AsyncCallback callback;
+    protected class RedeliveryTask implements PooledExchangeTask, Runnable {
+        // state
+        private Exchange original;
+        private ExtendedExchange exchange;
+        private AsyncCallback callback;
         private int redeliveryCounter;
         private long redeliveryDelay;
-        private Predicate retryWhilePredicate;
 
         // default behavior which can be overloaded on a per exception basis
+        private Predicate retryWhilePredicate;
         private RedeliveryPolicy currentRedeliveryPolicy;
         private Processor failureProcessor;
         private Processor onRedeliveryProcessor;
@@ -603,7 +625,16 @@ public abstract class RedeliveryErrorHandler extends 
ErrorHandlerSupport
         private boolean useOriginalInMessage;
         private boolean useOriginalInBody;
 
-        public RedeliveryTask(Exchange exchange, AsyncCallback callback) {
+        public RedeliveryTask() {
+        }
+
+        @Override
+        public String toString() {
+            return "RedeliveryTask";
+        }
+
+        @Override
+        public void prepare(Exchange exchange, AsyncCallback callback) {
             this.retryWhilePredicate = retryWhilePolicy;
             this.currentRedeliveryPolicy = redeliveryPolicy;
             this.handledPredicate = getDefaultHandledPredicate();
@@ -611,7 +642,6 @@ public abstract class RedeliveryErrorHandler extends 
ErrorHandlerSupport
             this.useOriginalInBody = useOriginalBodyPolicy;
             this.onRedeliveryProcessor = redeliveryProcessor;
             this.onExceptionProcessor = 
RedeliveryErrorHandler.this.onExceptionProcessor;
-
             // do a defensive copy of the original Exchange, which is needed 
for redelivery so we can ensure the
             // original Exchange is being redelivered, and not a mutated 
Exchange
             this.original = redeliveryEnabled ? 
defensiveCopyExchangeIfNeeded(exchange) : null;
@@ -620,8 +650,20 @@ public abstract class RedeliveryErrorHandler extends 
ErrorHandlerSupport
         }
 
         @Override
-        public String toString() {
-            return "RedeliveryTask";
+        public void reset() {
+            this.retryWhilePredicate = null;
+            this.currentRedeliveryPolicy = null;
+            this.handledPredicate = null;
+            this.continuedPredicate = null;
+            this.useOriginalInMessage = false;
+            this.useOriginalInBody = false;
+            this.onRedeliveryProcessor = null;
+            this.onExceptionProcessor = null;
+            this.original = null;
+            this.exchange = null;
+            this.callback = null;
+            this.redeliveryCounter = 0;
+            this.redeliveryDelay = 0;
         }
 
         /**
@@ -635,14 +677,17 @@ public abstract class RedeliveryErrorHandler extends 
ErrorHandlerSupport
                 if (exchange.getException() == null) {
                     exchange.setException(new RejectedExecutionException());
                 }
-                callback.done(false);
+                AsyncCallback cb = callback;
+                taskFactory.release(this);
+                cb.done(false);
                 return;
             }
 
             try {
                 doRun();
             } catch (Throwable e) {
-                // unexpected exception during running so break out
+                // unexpected exception during running so set exception and 
trigger callback
+                // (do not do taskFactory.release as that happens later)
                 exchange.setException(e);
                 callback.done(false);
             }
@@ -804,7 +849,9 @@ public abstract class RedeliveryErrorHandler extends 
ErrorHandlerSupport
                 // only process if the exchange hasn't failed
                 // and it has not been handled by the error processor
                 if (isDone(exchange)) {
-                    reactiveExecutor.schedule(callback);
+                    AsyncCallback cb = callback;
+                    taskFactory.release(this);
+                    reactiveExecutor.schedule(cb);
                 } else {
                     // error occurred so loop back around which we do by 
invoking the processAsyncErrorHandler
                     reactiveExecutor.schedule(this);
@@ -1043,6 +1090,7 @@ public abstract class RedeliveryErrorHandler extends 
ErrorHandlerSupport
             // because you can continue and still let the failure processor do 
some routing
             // before continue in the main route.
             boolean allowFailureProcessor = !shouldContinue || 
!isDeadLetterChannel;
+            final boolean fHandled = handled;
 
             if (allowFailureProcessor && processor != null) {
 
@@ -1108,6 +1156,23 @@ public abstract class RedeliveryErrorHandler extends 
ErrorHandlerSupport
                     } finally {
                         // if the fault was handled asynchronously, this 
should be reflected in the callback as well
                         reactiveExecutor.schedule(callback);
+
+                        // create log message
+                        String msg = "Failed delivery for " + 
ExchangeHelper.logIds(exchange);
+                        msg = msg + ". Exhausted after delivery attempt: " + 
redeliveryCounter + " caught: " + caught;
+                        if (processor != null) {
+                            if (isDeadLetterChannel && deadLetterUri != null) {
+                                msg = msg + ". Handled by DeadLetterChannel: 
[" + URISupport.sanitizeUri(deadLetterUri) + "]";
+                            } else {
+                                msg = msg + ". Processed by failure processor: 
" + processor;
+                            }
+                        }
+
+                        // log that we failed delivery as we are exhausted
+                        logFailedDelivery(false, false, fHandled, false, 
isDeadLetterChannel, exchange, msg, null);
+
+                        // we are done so we can release the task
+                        taskFactory.release(this);
                     }
                 });
             } else {
@@ -1127,22 +1192,25 @@ public abstract class RedeliveryErrorHandler extends 
ErrorHandlerSupport
                 } finally {
                     // callback we are done
                     reactiveExecutor.schedule(callback);
-                }
-            }
 
-            // create log message
-            String msg = "Failed delivery for " + 
ExchangeHelper.logIds(exchange);
-            msg = msg + ". Exhausted after delivery attempt: " + 
redeliveryCounter + " caught: " + caught;
-            if (processor != null) {
-                if (isDeadLetterChannel && deadLetterUri != null) {
-                    msg = msg + ". Handled by DeadLetterChannel: [" + 
URISupport.sanitizeUri(deadLetterUri) + "]";
-                } else {
-                    msg = msg + ". Processed by failure processor: " + 
processor;
+                    // create log message
+                    String msg = "Failed delivery for " + 
ExchangeHelper.logIds(exchange);
+                    msg = msg + ". Exhausted after delivery attempt: " + 
redeliveryCounter + " caught: " + caught;
+                    if (processor != null) {
+                        if (isDeadLetterChannel && deadLetterUri != null) {
+                            msg = msg + ". Handled by DeadLetterChannel: [" + 
URISupport.sanitizeUri(deadLetterUri) + "]";
+                        } else {
+                            msg = msg + ". Processed by failure processor: " + 
processor;
+                        }
+                    }
+
+                    // log that we failed delivery as we are exhausted
+                    logFailedDelivery(false, false, fHandled, false, 
isDeadLetterChannel, exchange, msg, null);
+
+                    // we are done so we can release the task
+                    taskFactory.release(this);
                 }
             }
-
-            // log that we failed delivery as we are exhausted
-            logFailedDelivery(false, false, handled, false, 
isDeadLetterChannel, exchange, msg, null);
         }
 
         protected void prepareExchangeAfterFailure(
@@ -1503,8 +1571,6 @@ public abstract class RedeliveryErrorHandler extends 
ErrorHandlerSupport
 
     @Override
     protected void doStart() throws Exception {
-        ServiceHelper.startService(output, outputAsync, deadLetter);
-
         // determine if redeliver is enabled or not
         redeliveryEnabled = determineIfRedeliveryIsEnabled();
         if (LOG.isTraceEnabled()) {
@@ -1531,6 +1597,28 @@ public abstract class RedeliveryErrorHandler extends 
ErrorHandlerSupport
         // however if we dont then its less memory overhead (and a bit less 
cpu) of using the simple task
         simpleTask = deadLetter == null && !redeliveryEnabled && 
(exceptionPolicies == null || exceptionPolicies.isEmpty())
                 && onPrepareProcessor == null;
+
+        boolean pooled = 
camelContext.adapt(ExtendedCamelContext.class).getExchangeFactory().isPooled();
+        if (pooled) {
+            taskFactory = new PooledTaskFactory() {
+                @Override
+                public PooledExchangeTask create(Exchange exchange, 
AsyncCallback callback) {
+                    return simpleTask ? new SimpleTask() : new 
RedeliveryTask();
+                }
+            };
+            int capacity = 
camelContext.adapt(ExtendedCamelContext.class).getExchangeFactory().getCapacity();
+            taskFactory.setCapacity(capacity);
+        } else {
+            taskFactory = new PrototypeTaskFactory() {
+                @Override
+                public PooledExchangeTask create(Exchange exchange, 
AsyncCallback callback) {
+                    return simpleTask ? new SimpleTask() : new 
RedeliveryTask();
+                }
+            };
+        }
+        LOG.trace("Using TaskFactory: {}", taskFactory);
+
+        ServiceHelper.startService(taskFactory, output, outputAsync, 
deadLetter);
     }
 
     @Override
@@ -1542,6 +1630,7 @@ public abstract class RedeliveryErrorHandler extends 
ErrorHandlerSupport
 
     @Override
     protected void doShutdown() throws Exception {
-        ServiceHelper.stopAndShutdownServices(deadLetter, output, outputAsync);
+        ServiceHelper.stopAndShutdownServices(deadLetter, output, outputAsync, 
taskFactory);
     }
+
 }
diff --git 
a/core/camel-support/src/main/java/org/apache/camel/support/PooledObjectFactorySupport.java
 
b/core/camel-support/src/main/java/org/apache/camel/support/PooledObjectFactorySupport.java
new file mode 100644
index 0000000..64867b3
--- /dev/null
+++ 
b/core/camel-support/src/main/java/org/apache/camel/support/PooledObjectFactorySupport.java
@@ -0,0 +1,152 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.support;
+
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.atomic.LongAdder;
+
+import org.apache.camel.CamelContext;
+import org.apache.camel.spi.PooledObjectFactory;
+import org.apache.camel.support.service.ServiceSupport;
+
+/**
+ * Base class for {@link PooledObjectFactory} implementations that keep reusable objects in a bounded queue.
+ * <p>
+ * The queue is created in {@link #doBuild()} using the capacity configured at that time (default 100). Until then the
+ * pool is {@code null}, which {@link #getSize()}, {@link #purge()} and {@link #doShutdown()} tolerate.
+ */
+public abstract class PooledObjectFactorySupport<T> extends ServiceSupport implements PooledObjectFactory<T> {
+
+    protected final UtilizationStatistics statistics = new UtilizationStatistics();
+
+    protected CamelContext camelContext;
+    // created in doBuild, so null until the service has been built
+    protected BlockingQueue<T> pool;
+    protected int capacity = 100;
+
+    @Override
+    protected void doBuild() throws Exception {
+        super.doBuild();
+        this.pool = new ArrayBlockingQueue<>(capacity);
+    }
+
+    @Override
+    public CamelContext getCamelContext() {
+        return camelContext;
+    }
+
+    @Override
+    public void setCamelContext(CamelContext camelContext) {
+        this.camelContext = camelContext;
+    }
+
+    @Override
+    public boolean isStatisticsEnabled() {
+        return statistics.isStatisticsEnabled();
+    }
+
+    @Override
+    public void setStatisticsEnabled(boolean statisticsEnabled) {
+        statistics.setStatisticsEnabled(statisticsEnabled);
+    }
+
+    /**
+     * @return the number of objects currently in the pool, or 0 if the pool has not been created yet
+     */
+    @Override
+    public int getSize() {
+        if (pool != null) {
+            return pool.size();
+        } else {
+            return 0;
+        }
+    }
+
+    @Override
+    public int getCapacity() {
+        return capacity;
+    }
+
+    /**
+     * Sets the capacity used when the pool is created in {@link #doBuild()}. Changing the value afterwards does not
+     * resize a pool that already exists.
+     */
+    @Override
+    public void setCapacity(int capacity) {
+        this.capacity = capacity;
+    }
+
+    @Override
+    public void resetStatistics() {
+        statistics.reset();
+    }
+
+    /**
+     * Removes all pooled objects. Does nothing if the pool has not been created yet.
+     */
+    @Override
+    public void purge() {
+        // same null guard as getSize: the pool only exists after doBuild
+        if (pool != null) {
+            pool.clear();
+        }
+    }
+
+    @Override
+    public Statistics getStatistics() {
+        return statistics;
+    }
+
+    @Override
+    protected void doShutdown() throws Exception {
+        super.doShutdown();
+        statistics.reset();
+        // shutting down a factory that was never built must not fail
+        if (pool != null) {
+            pool.clear();
+        }
+    }
+
+    /**
+     * Represents utilization statistics
+     */
+    protected final class UtilizationStatistics implements PooledObjectFactory.Statistics {
+
+        public final LongAdder created = new LongAdder();
+        public final LongAdder acquired = new LongAdder();
+        public final LongAdder released = new LongAdder();
+        public final LongAdder discarded = new LongAdder();
+        private boolean statisticsEnabled;
+
+        @Override
+        public void reset() {
+            created.reset();
+            acquired.reset();
+            released.reset();
+            discarded.reset();
+        }
+
+        @Override
+        public long getCreatedCounter() {
+            return created.longValue();
+        }
+
+        @Override
+        public long getAcquiredCounter() {
+            return acquired.longValue();
+        }
+
+        @Override
+        public long getReleasedCounter() {
+            return released.longValue();
+        }
+
+        @Override
+        public long getDiscardedCounter() {
+            return discarded.longValue();
+        }
+
+        @Override
+        public boolean isStatisticsEnabled() {
+            return statisticsEnabled;
+        }
+
+        @Override
+        public void setStatisticsEnabled(boolean statisticsEnabled) {
+            this.statisticsEnabled = statisticsEnabled;
+        }
+    }
+
+}
diff --git 
a/core/camel-support/src/main/java/org/apache/camel/support/PrototypeObjectFactorySupport.java
 
b/core/camel-support/src/main/java/org/apache/camel/support/PrototypeObjectFactorySupport.java
new file mode 100644
index 0000000..8c038e4
--- /dev/null
+++ 
b/core/camel-support/src/main/java/org/apache/camel/support/PrototypeObjectFactorySupport.java
@@ -0,0 +1,139 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.support;
+
+import java.util.concurrent.atomic.LongAdder;
+
+import org.apache.camel.CamelContext;
+import org.apache.camel.spi.PooledObjectFactory;
+import org.apache.camel.support.service.ServiceSupport;
+
+/**
+ * {@link org.apache.camel.spi.PooledObjectFactory} that creates a new instance (does not pool).
+ * <p>
+ * Size and capacity are always reported as 0, and setting the capacity or purging has no effect.
+ */
+public abstract class PrototypeObjectFactorySupport<T> extends ServiceSupport implements PooledObjectFactory<T> {
+
+    protected final UtilizationStatistics statistics = new UtilizationStatistics();
+    private CamelContext camelContext;
+
+    // ---- camel context ----
+
+    @Override
+    public void setCamelContext(CamelContext camelContext) {
+        this.camelContext = camelContext;
+    }
+
+    @Override
+    public CamelContext getCamelContext() {
+        return camelContext;
+    }
+
+    // ---- pool related operations: nothing is pooled, so these are fixed values / no-ops ----
+
+    @Override
+    public int getCapacity() {
+        return 0;
+    }
+
+    @Override
+    public void setCapacity(int capacity) {
+        // not in use
+    }
+
+    @Override
+    public int getSize() {
+        return 0;
+    }
+
+    @Override
+    public void purge() {
+        // not in use
+    }
+
+    // ---- statistics ----
+
+    @Override
+    public Statistics getStatistics() {
+        return statistics;
+    }
+
+    @Override
+    public void setStatisticsEnabled(boolean statisticsEnabled) {
+        statistics.setStatisticsEnabled(statisticsEnabled);
+    }
+
+    @Override
+    public boolean isStatisticsEnabled() {
+        return statistics.isStatisticsEnabled();
+    }
+
+    @Override
+    public void resetStatistics() {
+        statistics.reset();
+    }
+
+    // ---- lifecycle ----
+
+    @Override
+    protected void doShutdown() throws Exception {
+        super.doShutdown();
+        statistics.reset();
+    }
+
+    /**
+     * Utilization counters exposed through {@link PooledObjectFactory.Statistics}.
+     */
+    protected final class UtilizationStatistics implements Statistics {
+
+        public final LongAdder created = new LongAdder();
+        public final LongAdder acquired = new LongAdder();
+        public final LongAdder released = new LongAdder();
+        public final LongAdder discarded = new LongAdder();
+        private boolean statisticsEnabled;
+
+        @Override
+        public boolean isStatisticsEnabled() {
+            return statisticsEnabled;
+        }
+
+        @Override
+        public void setStatisticsEnabled(boolean statisticsEnabled) {
+            this.statisticsEnabled = statisticsEnabled;
+        }
+
+        @Override
+        public long getCreatedCounter() {
+            return created.sum();
+        }
+
+        @Override
+        public long getAcquiredCounter() {
+            return acquired.sum();
+        }
+
+        @Override
+        public long getReleasedCounter() {
+            return released.sum();
+        }
+
+        @Override
+        public long getDiscardedCounter() {
+            return discarded.sum();
+        }
+
+        @Override
+        public void reset() {
+            created.reset();
+            acquired.reset();
+            released.reset();
+            discarded.reset();
+        }
+    }
+
+}

Reply via email to