This is an automated email from the ASF dual-hosted git repository.

davsclaus pushed a commit to branch exchange-factory
in repository https://gitbox.apache.org/repos/asf/camel.git


The following commit(s) were added to refs/heads/exchange-factory by this push:
     new 8b92a7f  CAMEL-16222: PooledExchangeFactory experiment
8b92a7f is described below

commit 8b92a7f340a335c2968fea9179eab386d22c8b75
Author: Claus Ibsen <claus.ib...@gmail.com>
AuthorDate: Thu Feb 18 14:59:18 2021 +0100

    CAMEL-16222: PooledExchangeFactory experiment
---
 .../camel/catalog/components/vertx-http.json       |  2 +
 .../camel/component/dataset/DataSetConsumer.java   | 30 ++++++++++---
 .../apache/camel/component/file/FileConsumer.java  |  9 ++++
 .../camel/component/file/GenericFileConsumer.java  |  8 +++-
 .../component/file/remote/RemoteFileConsumer.java  |  9 ++++
 .../apache/camel/component/http/HttpProducer.java  |  2 +-
 .../component/scheduler/SchedulerConsumer.java     | 19 ++++++++-
 .../camel/component/timer/TimerConsumer.java       | 14 +++++--
 .../src/main/java/org/apache/camel/Consumer.java   | 17 ++++++++
 .../src/main/java/org/apache/camel/Endpoint.java   |  7 ++++
 .../java/org/apache/camel/ExtendedExchange.java    |  2 +
 .../java/org/apache/camel/spi/ExchangeFactory.java | 12 +++++-
 .../main/java/org/apache/camel/spi/UnitOfWork.java |  7 ++++
 .../camel/impl/engine/DefaultExchangeFactory.java  |  5 ++-
 .../camel/impl/engine/DefaultUnitOfWork.java       | 12 ++++++
 .../camel/impl/engine/PooledExchangeFactory.java   | 49 +++++++++++++++-------
 .../org/apache/camel/builder/ExchangeBuilder.java  |  4 +-
 .../apache/camel/processor/WireTapProcessor.java   |  7 +---
 .../component/dataset/DataSetTestEndpointTest.java | 12 +++++-
 .../org/apache/camel/support/DefaultConsumer.java  | 18 ++++++++
 .../org/apache/camel/support/DefaultEndpoint.java  | 23 ++++------
 .../org/apache/camel/support/DefaultExchange.java  | 16 +++++--
 .../support/DefaultInterceptSendToEndpoint.java    |  5 +++
 .../camel/support/PollingConsumerSupport.java      | 14 +++++--
 24 files changed, 244 insertions(+), 59 deletions(-)

diff --git 
a/catalog/camel-catalog/src/generated/resources/org/apache/camel/catalog/components/vertx-http.json
 
b/catalog/camel-catalog/src/generated/resources/org/apache/camel/catalog/components/vertx-http.json
index 852e081..069eb8c 100644
--- 
a/catalog/camel-catalog/src/generated/resources/org/apache/camel/catalog/components/vertx-http.json
+++ 
b/catalog/camel-catalog/src/generated/resources/org/apache/camel/catalog/components/vertx-http.json
@@ -23,6 +23,7 @@
   },
   "componentProperties": {
     "lazyStartProducer": { "kind": "property", "displayName": "Lazy Start 
Producer", "group": "producer", "label": "producer", "required": false, "type": 
"boolean", "javaType": "boolean", "deprecated": false, "autowired": false, 
"secret": false, "defaultValue": false, "description": "Whether the producer 
should be started lazy (on the first message). By starting lazy you can use 
this to allow CamelContext and routes to startup in situations where a producer 
may otherwise fail during star [...]
+    "responsePayloadAsByteArray": { "kind": "property", "displayName": 
"Response Payload As Byte Array", "group": "producer", "label": "producer", 
"required": false, "type": "boolean", "javaType": "boolean", "deprecated": 
false, "autowired": false, "secret": false, "defaultValue": true, 
"description": "Whether the response body should be byte or as 
io.vertx.core.buffer.Buffer" },
     "allowJavaSerializedObject": { "kind": "property", "displayName": "Allow 
Java Serialized Object", "group": "advanced", "label": "advanced", "required": 
false, "type": "boolean", "javaType": "boolean", "deprecated": false, 
"autowired": false, "secret": false, "defaultValue": false, "description": 
"Whether to allow java serialization when a request has the Content-Type 
application\/x-java-serialized-object This is disabled by default. If you 
enable this, be aware that Java will deseria [...]
     "autowiredEnabled": { "kind": "property", "displayName": "Autowired 
Enabled", "group": "advanced", "label": "advanced", "required": false, "type": 
"boolean", "javaType": "boolean", "deprecated": false, "autowired": false, 
"secret": false, "defaultValue": true, "description": "Whether autowiring is 
enabled. This is used for automatic autowiring options (the option must be 
marked as autowired) by looking up in the registry to find if there is a single 
instance of matching type, which t [...]
     "vertx": { "kind": "property", "displayName": "Vertx", "group": 
"advanced", "label": "advanced", "required": false, "type": "object", 
"javaType": "io.vertx.core.Vertx", "deprecated": false, "autowired": false, 
"secret": false, "description": "To use an existing vertx instead of creating a 
new instance" },
@@ -48,6 +49,7 @@
     "httpMethod": { "kind": "parameter", "displayName": "Http Method", 
"group": "producer", "label": "producer", "required": false, "type": "object", 
"javaType": "io.vertx.core.http.HttpMethod", "enum": [ "OPTIONS", "GET", 
"HEAD", "POST", "PUT", "DELETE", "TRACE", "CONNECT", "PATCH", "OTHER" ], 
"deprecated": false, "autowired": false, "secret": false, "configurationClass": 
"org.apache.camel.component.vertx.http.VertxHttpConfiguration", 
"configurationField": "configuration", "description" [...]
     "lazyStartProducer": { "kind": "parameter", "displayName": "Lazy Start 
Producer", "group": "producer", "label": "producer", "required": false, "type": 
"boolean", "javaType": "boolean", "deprecated": false, "autowired": false, 
"secret": false, "defaultValue": false, "description": "Whether the producer 
should be started lazy (on the first message). By starting lazy you can use 
this to allow CamelContext and routes to startup in situations where a producer 
may otherwise fail during sta [...]
     "okStatusCodeRange": { "kind": "parameter", "displayName": "Ok Status Code 
Range", "group": "producer", "label": "producer", "required": false, "type": 
"string", "javaType": "java.lang.String", "deprecated": false, "autowired": 
false, "secret": false, "defaultValue": "200-299", "configurationClass": 
"org.apache.camel.component.vertx.http.VertxHttpConfiguration", 
"configurationField": "configuration", "description": "The status codes which 
are considered a success response. The values [...]
+    "responsePayloadAsByteArray": { "kind": "parameter", "displayName": 
"Response Payload As Byte Array", "group": "producer", "label": "producer", 
"required": false, "type": "boolean", "javaType": "boolean", "deprecated": 
false, "autowired": false, "secret": false, "defaultValue": true, 
"configurationClass": 
"org.apache.camel.component.vertx.http.VertxHttpConfiguration", 
"configurationField": "configuration", "description": "Whether the response 
body should be byte or as io.vertx.core.b [...]
     "sessionManagement": { "kind": "parameter", "displayName": "Session 
Management", "group": "producer", "label": "producer", "required": false, 
"type": "boolean", "javaType": "boolean", "deprecated": false, "autowired": 
false, "secret": false, "defaultValue": false, "configurationClass": 
"org.apache.camel.component.vertx.http.VertxHttpConfiguration", 
"configurationField": "configuration", "description": "Enables session 
management via WebClientSession. By default the client is configur [...]
     "throwExceptionOnFailure": { "kind": "parameter", "displayName": "Throw 
Exception On Failure", "group": "producer", "label": "producer", "required": 
false, "type": "boolean", "javaType": "boolean", "deprecated": false, 
"autowired": false, "secret": false, "defaultValue": true, 
"configurationClass": 
"org.apache.camel.component.vertx.http.VertxHttpConfiguration", 
"configurationField": "configuration", "description": "Disable throwing 
HttpOperationFailedException in case of failed respo [...]
     "timeout": { "kind": "parameter", "displayName": "Timeout", "group": 
"producer", "label": "producer", "required": false, "type": "integer", 
"javaType": "long", "deprecated": false, "autowired": false, "secret": false, 
"defaultValue": -1, "configurationClass": 
"org.apache.camel.component.vertx.http.VertxHttpConfiguration", 
"configurationField": "configuration", "description": "The amount of time in 
milliseconds after which if the request does not return any data within the 
timeout per [...]
diff --git 
a/components/camel-dataset/src/main/java/org/apache/camel/component/dataset/DataSetConsumer.java
 
b/components/camel-dataset/src/main/java/org/apache/camel/component/dataset/DataSetConsumer.java
index 43fbb4b..81fd019 100644
--- 
a/components/camel-dataset/src/main/java/org/apache/camel/component/dataset/DataSetConsumer.java
+++ 
b/components/camel-dataset/src/main/java/org/apache/camel/component/dataset/DataSetConsumer.java
@@ -20,6 +20,7 @@ import java.util.concurrent.ExecutorService;
 
 import org.apache.camel.CamelContext;
 import org.apache.camel.Exchange;
+import org.apache.camel.Message;
 import org.apache.camel.Processor;
 import org.apache.camel.spi.CamelLogger;
 import org.apache.camel.support.DefaultConsumer;
@@ -86,10 +87,27 @@ public class DataSetConsumer extends DefaultConsumer {
         }
     }
 
+    /**
+     * Creates a message exchange for the given index in the {@link DataSet}
+     */
+    protected Exchange createExchange(long messageIndex) throws Exception {
+        Exchange exchange = createExchange(false);
+
+        endpoint.getDataSet().populateMessage(exchange, messageIndex);
+
+        if (!endpoint.getDataSetIndex().equals("off")) {
+            Message in = exchange.getIn();
+            in.setHeader(Exchange.DATASET_INDEX, messageIndex);
+        }
+
+        return exchange;
+    }
+
     protected void sendMessages(long startIndex, long endIndex) {
-        try {
-            for (long i = startIndex; i < endIndex; i++) {
-                Exchange exchange = endpoint.createExchange(i);
+        for (long i = startIndex; i < endIndex; i++) {
+            Exchange exchange = null;
+            try {
+                exchange = createExchange(i);
                 getProcessor().process(exchange);
 
                 try {
@@ -104,9 +122,11 @@ public class DataSetConsumer extends DefaultConsumer {
                 if (reporter != null) {
                     reporter.process(exchange);
                 }
+            } catch (Exception e) {
+                handleException(e);
+            } finally {
+                releaseExchange(exchange);
             }
-        } catch (Exception e) {
-            handleException(e);
         }
     }
 
diff --git 
a/components/camel-file/src/main/java/org/apache/camel/component/file/FileConsumer.java
 
b/components/camel-file/src/main/java/org/apache/camel/component/file/FileConsumer.java
index 7b9ee71..50087fb 100644
--- 
a/components/camel-file/src/main/java/org/apache/camel/component/file/FileConsumer.java
+++ 
b/components/camel-file/src/main/java/org/apache/camel/component/file/FileConsumer.java
@@ -57,6 +57,15 @@ public class FileConsumer extends GenericFileConsumer<File> {
     }
 
     @Override
+    protected Exchange createExchange(GenericFile<File> file) {
+        Exchange exchange = createExchange(true);
+        if (file != null) {
+            file.bindToExchange(exchange, getEndpoint().isProbeContentType());
+        }
+        return exchange;
+    }
+
+    @Override
     protected boolean pollDirectory(String fileName, List<GenericFile<File>> 
fileList, int depth) {
         LOG.trace("pollDirectory from fileName: {}", fileName);
 
diff --git 
a/components/camel-file/src/main/java/org/apache/camel/component/file/GenericFileConsumer.java
 
b/components/camel-file/src/main/java/org/apache/camel/component/file/GenericFileConsumer.java
index 3e6a7b4..7aa46b3 100644
--- 
a/components/camel-file/src/main/java/org/apache/camel/component/file/GenericFileConsumer.java
+++ 
b/components/camel-file/src/main/java/org/apache/camel/component/file/GenericFileConsumer.java
@@ -102,6 +102,11 @@ public abstract class GenericFileConsumer<T> extends 
ScheduledBatchPollingConsum
     }
 
     /**
+     * Creates the exchange from the polled file
+     */
+    protected abstract Exchange createExchange(GenericFile<T> file);
+
+    /**
      * Poll for files
      */
     @Override
@@ -164,7 +169,7 @@ public abstract class GenericFileConsumer<T> extends 
ScheduledBatchPollingConsum
         // use a linked list so we can dequeue the exchanges
         LinkedList<Exchange> exchanges = new LinkedList<>();
         for (GenericFile<T> file : files) {
-            Exchange exchange = endpoint.createExchange(file);
+            Exchange exchange = createExchange(file);
             endpoint.configureExchange(exchange);
             endpoint.configureMessage(file, exchange.getIn());
             exchanges.add(exchange);
@@ -267,6 +272,7 @@ public abstract class GenericFileConsumer<T> extends 
ScheduledBatchPollingConsum
             GenericFile<?> file = 
exchange.getProperty(FileComponent.FILE_EXCHANGE_FILE, GenericFile.class);
             String key = file.getAbsoluteFilePath();
             endpoint.getInProgressRepository().remove(key);
+            releaseExchange(exchange);
         }
     }
 
diff --git 
a/components/camel-ftp/src/main/java/org/apache/camel/component/file/remote/RemoteFileConsumer.java
 
b/components/camel-ftp/src/main/java/org/apache/camel/component/file/remote/RemoteFileConsumer.java
index d428e24..83e2292 100644
--- 
a/components/camel-ftp/src/main/java/org/apache/camel/component/file/remote/RemoteFileConsumer.java
+++ 
b/components/camel-ftp/src/main/java/org/apache/camel/component/file/remote/RemoteFileConsumer.java
@@ -59,6 +59,15 @@ public abstract class RemoteFileConsumer<T> extends 
GenericFileConsumer<T> {
     }
 
     @Override
+    protected Exchange createExchange(GenericFile<T> file) {
+        Exchange answer = createExchange(true);
+        if (file != null) {
+            file.bindToExchange(answer);
+        }
+        return answer;
+    }
+
+    @Override
     protected boolean prePollCheck() throws Exception {
         if (LOG.isTraceEnabled()) {
             LOG.trace("prePollCheck on {}", 
getEndpoint().getConfiguration().remoteServerInformation());
diff --git 
a/components/camel-http/src/main/java/org/apache/camel/component/http/HttpProducer.java
 
b/components/camel-http/src/main/java/org/apache/camel/component/http/HttpProducer.java
index 41d5205..af521da 100644
--- 
a/components/camel-http/src/main/java/org/apache/camel/component/http/HttpProducer.java
+++ 
b/components/camel-http/src/main/java/org/apache/camel/component/http/HttpProducer.java
@@ -623,7 +623,7 @@ public class HttpProducer extends DefaultProducer {
         try {
             if (body == null) {
                 return null;
-            // special optimized for using these 3 type converters for common 
message payload types
+                // special optimized for using these 3 type converters for 
common message payload types
             } else if (body instanceof byte[]) {
                 answer = HttpEntityConverter.toHttpEntity((byte[]) body, 
exchange);
             } else if (body instanceof InputStream) {
diff --git 
a/components/camel-scheduler/src/main/java/org/apache/camel/component/scheduler/SchedulerConsumer.java
 
b/components/camel-scheduler/src/main/java/org/apache/camel/component/scheduler/SchedulerConsumer.java
index 240e673..04ab2b2 100644
--- 
a/components/camel-scheduler/src/main/java/org/apache/camel/component/scheduler/SchedulerConsumer.java
+++ 
b/components/camel-scheduler/src/main/java/org/apache/camel/component/scheduler/SchedulerConsumer.java
@@ -17,6 +17,7 @@
 package org.apache.camel.component.scheduler;
 
 import java.util.Date;
+import java.util.concurrent.atomic.AtomicBoolean;
 
 import org.apache.camel.AsyncCallback;
 import org.apache.camel.Exchange;
@@ -44,7 +45,7 @@ public class SchedulerConsumer extends ScheduledPollConsumer {
     }
 
     protected int sendTimerExchange() {
-        final Exchange exchange = getEndpoint().createExchange();
+        final Exchange exchange = createExchange(false);
         exchange.setProperty(Exchange.TIMER_NAME, getEndpoint().getName());
 
         Date now = new Date();
@@ -55,15 +56,28 @@ public class SchedulerConsumer extends 
ScheduledPollConsumer {
         }
 
         if (!getEndpoint().isSynchronous()) {
-            getAsyncProcessor().process(exchange, new AsyncCallback() {
+            final AtomicBoolean polled = new AtomicBoolean(true);
+            boolean doneSync = getAsyncProcessor().process(exchange, new 
AsyncCallback() {
                 @Override
                 public void done(boolean doneSync) {
                     // handle any thrown exception
                     if (exchange.getException() != null) {
                         getExceptionHandler().handleException("Error 
processing exchange", exchange, exchange.getException());
                     }
+                    boolean wasPolled = 
exchange.getProperty(Exchange.SCHEDULER_POLLED_MESSAGES, true, boolean.class);
+                    if (!wasPolled) {
+                        polled.set(false);
+                    }
+
+                    // sync will release outside this callback
+                    if (!doneSync) {
+                        releaseExchange(exchange);
+                    }
                 }
             });
+            if (!doneSync) {
+                return polled.get() ? 1 : 0;
+            }
         } else {
             try {
                 getProcessor().process(exchange);
@@ -81,6 +95,7 @@ public class SchedulerConsumer extends ScheduledPollConsumer {
         // for example to overrule and indicate no message was polled, which 
can affect the scheduler
         // to leverage backoff on idle etc.
         boolean polled = 
exchange.getProperty(Exchange.SCHEDULER_POLLED_MESSAGES, true, boolean.class);
+        releaseExchange(exchange);
         return polled ? 1 : 0;
     }
 
diff --git 
a/components/camel-timer/src/main/java/org/apache/camel/component/timer/TimerConsumer.java
 
b/components/camel-timer/src/main/java/org/apache/camel/component/timer/TimerConsumer.java
index 8cc80c4..40b44c8 100644
--- 
a/components/camel-timer/src/main/java/org/apache/camel/component/timer/TimerConsumer.java
+++ 
b/components/camel-timer/src/main/java/org/apache/camel/component/timer/TimerConsumer.java
@@ -185,7 +185,7 @@ public class TimerConsumer extends DefaultConsumer 
implements StartupListener, S
     }
 
     protected void sendTimerExchange(long counter) {
-        final Exchange exchange = endpoint.createExchange();
+        final Exchange exchange = createExchange(false);
 
         if (endpoint.isIncludeMetadata()) {
             exchange.setProperty(Exchange.TIMER_COUNTER, counter);
@@ -211,6 +211,10 @@ public class TimerConsumer extends DefaultConsumer 
implements StartupListener, S
                     if (exchange.getException() != null) {
                         getExceptionHandler().handleException("Error 
processing exchange", exchange, exchange.getException());
                     }
+                    // sync will release outside this callback
+                    if (!doneSync) {
+                        releaseExchange(exchange);
+                    }
                 }
             });
         } else {
@@ -221,8 +225,12 @@ public class TimerConsumer extends DefaultConsumer 
implements StartupListener, S
             }
 
             // handle any thrown exception
-            if (exchange.getException() != null) {
-                getExceptionHandler().handleException("Error processing 
exchange", exchange, exchange.getException());
+            try {
+                if (exchange.getException() != null) {
+                    getExceptionHandler().handleException("Error processing 
exchange", exchange, exchange.getException());
+                }
+            } finally {
+                releaseExchange(exchange);
             }
         }
     }
diff --git a/core/camel-api/src/main/java/org/apache/camel/Consumer.java 
b/core/camel-api/src/main/java/org/apache/camel/Consumer.java
index 0f3ec2e..2360be5 100644
--- a/core/camel-api/src/main/java/org/apache/camel/Consumer.java
+++ b/core/camel-api/src/main/java/org/apache/camel/Consumer.java
@@ -16,6 +16,8 @@
  */
 package org.apache.camel;
 
+import org.apache.camel.spi.UnitOfWork;
+
 /**
  * A consumer of message exchanges from an {@link Endpoint}.
  * <p/>
@@ -25,6 +27,21 @@ package org.apache.camel;
  */
 public interface Consumer extends Service, EndpointAware {
 
+    /**
+     * The processor that will process the {@link Exchange} that was consumed.
+     */
     Processor getProcessor();
 
+    /**
+     * Creates an {@link Exchange} that was consumed.
+     *
+     * @param autoRelease whether to auto release the exchange when routing is 
complete via {@link UnitOfWork}
+     */
+    Exchange createExchange(boolean autoRelease);
+
+    /**
+     * Releases the {@link Exchange} when it has completed processing and is no 
longer needed.
+     */
+    void releaseExchange(Exchange exchange);
+
 }
diff --git a/core/camel-api/src/main/java/org/apache/camel/Endpoint.java 
b/core/camel-api/src/main/java/org/apache/camel/Endpoint.java
index 10bfd5e..cacdf51 100644
--- a/core/camel-api/src/main/java/org/apache/camel/Endpoint.java
+++ b/core/camel-api/src/main/java/org/apache/camel/Endpoint.java
@@ -79,6 +79,13 @@ public interface Endpoint extends IsSingleton, Service {
     Exchange createExchange(ExchangePattern pattern);
 
     /**
+     * Configures a newly created {@link Exchange}.
+     *
+     * @param exchange the new exchange
+     */
+    void configureExchange(Exchange exchange);
+
+    /**
      * Returns the context which created the endpoint
      *
      * @return the context which created the endpoint
diff --git 
a/core/camel-api/src/main/java/org/apache/camel/ExtendedExchange.java 
b/core/camel-api/src/main/java/org/apache/camel/ExtendedExchange.java
index c2a1ffd..578a3ee 100644
--- a/core/camel-api/src/main/java/org/apache/camel/ExtendedExchange.java
+++ b/core/camel-api/src/main/java/org/apache/camel/ExtendedExchange.java
@@ -30,6 +30,8 @@ public interface ExtendedExchange extends Exchange {
 
     /**
      * Clears the exchange from user data so it may be reused.
+     * <p/>
+     * <b>Important:</b> This API is NOT intended for Camel end users, but 
used internally by Camel itself.
      */
     void reset();
 
diff --git 
a/core/camel-api/src/main/java/org/apache/camel/spi/ExchangeFactory.java 
b/core/camel-api/src/main/java/org/apache/camel/spi/ExchangeFactory.java
index f1eefd0..95586c4 100644
--- a/core/camel-api/src/main/java/org/apache/camel/spi/ExchangeFactory.java
+++ b/core/camel-api/src/main/java/org/apache/camel/spi/ExchangeFactory.java
@@ -27,6 +27,11 @@ import org.apache.camel.Exchange;
  */
 public interface ExchangeFactory {
 
+    // TODO: new factory per consumer so there is no single race bottleneck
+    // TODO: only use factory on route consumer to limit its scope to most 
significant impact
+    // TODO: release from extended exchange without onCompletion (overhead)
+    // TODO: reuse unit of work (expensive to create)
+
     /**
      * Service factory key.
      */
@@ -34,15 +39,18 @@ public interface ExchangeFactory {
 
     /**
      * Gets a new {@link Exchange}
+     *
+     * @param autoRelease whether to auto release the exchange when routing is 
complete via {@link UnitOfWork}
      */
-    Exchange create();
+    Exchange create(boolean autoRelease);
 
     /**
      * Gets a new {@link Exchange}
      *
+     * @param autoRelease  whether to auto release the exchange when routing 
is complete via {@link UnitOfWork}
      * @param fromEndpoint the from endpoint
      */
-    Exchange create(Endpoint fromEndpoint);
+    Exchange create(Endpoint fromEndpoint, boolean autoRelease);
 
     default void release(Exchange exchange) {
         // noop
diff --git a/core/camel-api/src/main/java/org/apache/camel/spi/UnitOfWork.java 
b/core/camel-api/src/main/java/org/apache/camel/spi/UnitOfWork.java
index 062f22d..20284fa 100644
--- a/core/camel-api/src/main/java/org/apache/camel/spi/UnitOfWork.java
+++ b/core/camel-api/src/main/java/org/apache/camel/spi/UnitOfWork.java
@@ -41,6 +41,13 @@ public interface UnitOfWork extends Service {
     String MDC_TRANSACTION_KEY = "camel.transactionKey";
 
     /**
+     * Clears the unit of work from user data so it may be reused.
+     * <p/>
+     * <b>Important:</b> This API is NOT intended for Camel end users, but 
used internally by Camel itself.
+     */
+    void reset();
+
+    /**
      * Adds a synchronization hook
      *
      * @param synchronization the hook
diff --git 
a/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/DefaultExchangeFactory.java
 
b/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/DefaultExchangeFactory.java
index fedae54..ec17772 100644
--- 
a/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/DefaultExchangeFactory.java
+++ 
b/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/DefaultExchangeFactory.java
@@ -41,12 +41,13 @@ public class DefaultExchangeFactory implements 
ExchangeFactory, CamelContextAwar
     }
 
     @Override
-    public Exchange create() {
+    public Exchange create(boolean autoRelease) {
         return new DefaultExchange(camelContext);
     }
 
     @Override
-    public Exchange create(Endpoint fromEndpoint) {
+    public Exchange create(Endpoint fromEndpoint, boolean autoRelease) {
         return new DefaultExchange(fromEndpoint);
     }
+
 }
diff --git 
a/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/DefaultUnitOfWork.java
 
b/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/DefaultUnitOfWork.java
index f8576b8..975b8a0 100644
--- 
a/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/DefaultUnitOfWork.java
+++ 
b/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/DefaultUnitOfWork.java
@@ -132,6 +132,18 @@ public class DefaultUnitOfWork implements UnitOfWork, 
Service {
     }
 
     @Override
+    public void reset() {
+        routes.clear();
+        if (synchronizations != null) {
+            synchronizations.clear();
+        }
+        originalInMessage = null;
+        if (transactedBy != null) {
+            transactedBy.clear();
+        }
+    }
+
+    @Override
     public void setParentUnitOfWork(UnitOfWork parentUnitOfWork) {
     }
 
diff --git 
a/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/PooledExchangeFactory.java
 
b/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/PooledExchangeFactory.java
index 88142c9..30094cd 100644
--- 
a/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/PooledExchangeFactory.java
+++ 
b/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/PooledExchangeFactory.java
@@ -48,6 +48,7 @@ public class PooledExchangeFactory extends ServiceSupport
     private final AtomicLong acquired = new AtomicLong();
     private final AtomicLong created = new AtomicLong();
     private final AtomicLong released = new AtomicLong();
+    private final AtomicLong discarded = new AtomicLong();
 
     private CamelContext camelContext;
     private boolean statisticsEnabled = true;
@@ -71,7 +72,7 @@ public class PooledExchangeFactory extends ServiceSupport
     }
 
     @Override
-    public Exchange create() {
+    public Exchange create(boolean autoRelease) {
         Exchange exchange = pool.poll();
         if (exchange == null) {
             if (statisticsEnabled) {
@@ -83,17 +84,16 @@ public class PooledExchangeFactory extends ServiceSupport
             if (statisticsEnabled) {
                 acquired.incrementAndGet();
             }
-            // reset exchange before we use it
-            ExtendedExchange ee = exchange.adapt(ExtendedExchange.class);
-            ee.reset();
         }
-        // add on completion which will return the exchange when done
-        exchange.adapt(ExtendedExchange.class).addOnCompletion(onCompletion);
+        if (autoRelease) {
+            // add on completion which will return the exchange when done
+            
exchange.adapt(ExtendedExchange.class).addOnCompletion(onCompletion);
+        }
         return exchange;
     }
 
     @Override
-    public Exchange create(Endpoint fromEndpoint) {
+    public Exchange create(Endpoint fromEndpoint, boolean autoRelease) {
         Exchange exchange = pool.poll();
         if (exchange == null) {
             if (statisticsEnabled) {
@@ -108,17 +108,33 @@ public class PooledExchangeFactory extends ServiceSupport
             // need to mark this exchange from the given endpoint
             
exchange.adapt(ExtendedExchange.class).setFromEndpoint(fromEndpoint);
         }
-        // add on completion which will return the exchange when done
-        exchange.adapt(ExtendedExchange.class).addOnCompletion(onCompletion);
+        if (autoRelease) {
+            // add on completion which will return the exchange when done
+            
exchange.adapt(ExtendedExchange.class).addOnCompletion(onCompletion);
+        }
         return exchange;
     }
 
     @Override
     public void release(Exchange exchange) {
-        if (statisticsEnabled) {
-            released.incrementAndGet();
+        // reset exchange before returning to pool
+        try {
+            // TODO: reset on pool as this then update created to be up-to-date
+            ExtendedExchange ee = exchange.adapt(ExtendedExchange.class);
+            ee.reset();
+
+            // only release back in pool if reset was success
+            if (statisticsEnabled) {
+                released.incrementAndGet();
+            }
+            pool.offer(exchange);
+        } catch (Exception e) {
+            if (statisticsEnabled) {
+                discarded.incrementAndGet();
+            }
+            // ignore
+            LOG.debug("Error resetting exchange: {}. This exchange is 
discarded.", exchange);
         }
-        pool.offer(exchange);
     }
 
     @Override
@@ -126,13 +142,14 @@ public class PooledExchangeFactory extends ServiceSupport
         pool.clear();
 
         if (statisticsEnabled) {
-            LOG.info("PooledExchangeFactory usage [created: {}, acquired: {}, 
released: {}]", created.get(), acquired.get(),
-                    released.get());
+            LOG.info("PooledExchangeFactory usage [created: {}, acquired: {}, 
released: {}, discarded: {}]",
+                    created.get(), acquired.get(), released.get(), 
discarded.get());
         }
 
         created.set(0);
         acquired.set(0);
         released.set(0);
+        discarded.set(0);
     }
 
     private final class ReleaseOnCompletion extends SynchronizationAdapter {
@@ -145,7 +162,9 @@ public class PooledExchangeFactory extends ServiceSupport
 
         @Override
         public void onDone(Exchange exchange) {
-            release(exchange);
+            if (exchange != null) {
+                release(exchange);
+            }
         }
     }
 
diff --git 
a/core/camel-core-model/src/main/java/org/apache/camel/builder/ExchangeBuilder.java
 
b/core/camel-core-model/src/main/java/org/apache/camel/builder/ExchangeBuilder.java
index 1088cdc..8594bfa 100644
--- 
a/core/camel-core-model/src/main/java/org/apache/camel/builder/ExchangeBuilder.java
+++ 
b/core/camel-core-model/src/main/java/org/apache/camel/builder/ExchangeBuilder.java
@@ -22,8 +22,8 @@ import java.util.Map;
 import org.apache.camel.CamelContext;
 import org.apache.camel.Exchange;
 import org.apache.camel.ExchangePattern;
-import org.apache.camel.ExtendedCamelContext;
 import org.apache.camel.Message;
+import org.apache.camel.support.DefaultExchange;
 
 /**
  * Builder to create {@link Exchange} and add headers and set body on the 
Exchange {@link Message}.
@@ -103,7 +103,7 @@ public final class ExchangeBuilder {
      * @return exchange
      */
     public Exchange build() {
-        Exchange exchange = context.adapt(ExtendedCamelContext.class).getExchangeFactory().create();
+        Exchange exchange = new DefaultExchange(context);
         Message message = exchange.getIn();
         message.setBody(body);
         if (headers.size() > 0) {
diff --git a/core/camel-core-processor/src/main/java/org/apache/camel/processor/WireTapProcessor.java b/core/camel-core-processor/src/main/java/org/apache/camel/processor/WireTapProcessor.java
index 4ce6bb5..9ca24e6 100644
--- a/core/camel-core-processor/src/main/java/org/apache/camel/processor/WireTapProcessor.java
+++ b/core/camel-core-processor/src/main/java/org/apache/camel/processor/WireTapProcessor.java
@@ -29,7 +29,6 @@ import org.apache.camel.CamelContextAware;
 import org.apache.camel.Exchange;
 import org.apache.camel.ExchangePattern;
 import org.apache.camel.Expression;
-import org.apache.camel.ExtendedCamelContext;
 import org.apache.camel.Message;
 import org.apache.camel.Processor;
 import org.apache.camel.RuntimeCamelException;
@@ -42,6 +41,7 @@ import org.apache.camel.spi.RouteIdAware;
 import org.apache.camel.spi.ShutdownAware;
 import org.apache.camel.support.AsyncProcessorConverterHelper;
 import org.apache.camel.support.AsyncProcessorSupport;
+import org.apache.camel.support.DefaultExchange;
 import org.apache.camel.support.ExchangeHelper;
 import org.apache.camel.support.service.ServiceHelper;
 import org.apache.camel.util.ObjectHelper;
@@ -257,10 +257,7 @@ public class WireTapProcessor extends AsyncProcessorSupport
     }
 
     private Exchange configureNewExchange(Exchange exchange) {
-        Exchange answer
-                = camelContext.adapt(ExtendedCamelContext.class).getExchangeFactory().create(exchange.getFromEndpoint());
-        answer.setPattern(ExchangePattern.InOnly);
-        return answer;
+        return new DefaultExchange(exchange.getFromEndpoint(), ExchangePattern.InOnly);
     }
 
     public List<Processor> getNewExchangeProcessors() {
diff --git a/core/camel-core/src/test/java/org/apache/camel/component/dataset/DataSetTestEndpointTest.java b/core/camel-core/src/test/java/org/apache/camel/component/dataset/DataSetTestEndpointTest.java
index bd09c43..0050d05 100644
--- a/core/camel-core/src/test/java/org/apache/camel/component/dataset/DataSetTestEndpointTest.java
+++ b/core/camel-core/src/test/java/org/apache/camel/component/dataset/DataSetTestEndpointTest.java
@@ -81,9 +81,19 @@ public class DataSetTestEndpointTest extends ContextTestSupport {
                 }
 
                 @Override
+                public Exchange createExchange(boolean autoRelease) {
+                    return new DefaultExchange(getEndpoint());
+                }
+
+                @Override
+                public void releaseExchange(Exchange exchange) {
+                    // noop
+                }
+
+                @Override
                 public void start() {
                     // when starting then send a message to the processor
-                    Exchange exchange = new DefaultExchange(getEndpoint());
+                    Exchange exchange = createExchange(false);
                     exchange.getIn().setBody(expectedBody);
                     try {
                         processor.process(exchange);
diff --git a/core/camel-support/src/main/java/org/apache/camel/support/DefaultConsumer.java b/core/camel-support/src/main/java/org/apache/camel/support/DefaultConsumer.java
index 71d4265..c8e594f 100644
--- a/core/camel-support/src/main/java/org/apache/camel/support/DefaultConsumer.java
+++ b/core/camel-support/src/main/java/org/apache/camel/support/DefaultConsumer.java
@@ -26,6 +26,7 @@ import org.apache.camel.Processor;
 import org.apache.camel.Route;
 import org.apache.camel.RouteAware;
 import org.apache.camel.spi.ExceptionHandler;
+import org.apache.camel.spi.ExchangeFactory;
 import org.apache.camel.spi.RouteIdAware;
 import org.apache.camel.spi.UnitOfWork;
 import org.apache.camel.support.service.ServiceHelper;
@@ -45,6 +46,7 @@ public class DefaultConsumer extends ServiceSupport implements Consumer, RouteAw
     private final Endpoint endpoint;
     private final Processor processor;
     private final AsyncProcessor asyncProcessor;
+    private final ExchangeFactory exchangeFactory;
     private ExceptionHandler exceptionHandler;
     private Route route;
     private String routeId;
@@ -54,6 +56,7 @@ public class DefaultConsumer extends ServiceSupport implements Consumer, RouteAw
         this.processor = processor;
         this.asyncProcessor = AsyncProcessorConverterHelper.convert(processor);
         this.exceptionHandler = new LoggingExceptionHandler(endpoint.getCamelContext(), getClass());
+        this.exchangeFactory = endpoint.getCamelContext().adapt(ExtendedCamelContext.class).getExchangeFactory();
     }
 
     @Override
@@ -121,6 +124,21 @@ public class DefaultConsumer extends ServiceSupport implements Consumer, RouteAw
     }
 
     @Override
+    public Exchange createExchange(boolean autoRelease) {
+        Exchange answer = exchangeFactory.create(getEndpoint(), autoRelease);
+        endpoint.configureExchange(answer);
+        answer.adapt(ExtendedExchange.class).setFromRouteId(routeId);
+        return answer;
+    }
+
+    @Override
+    public void releaseExchange(Exchange exchange) {
+        if (exchange != null) {
+            exchangeFactory.release(exchange);
+        }
+    }
+
+    @Override
     public Endpoint getEndpoint() {
         return endpoint;
     }
diff --git a/core/camel-support/src/main/java/org/apache/camel/support/DefaultEndpoint.java b/core/camel-support/src/main/java/org/apache/camel/support/DefaultEndpoint.java
index aa777e9..70d9aaf 100644
--- a/core/camel-support/src/main/java/org/apache/camel/support/DefaultEndpoint.java
+++ b/core/camel-support/src/main/java/org/apache/camel/support/DefaultEndpoint.java
@@ -27,10 +27,8 @@ import org.apache.camel.Consumer;
 import org.apache.camel.Endpoint;
 import org.apache.camel.Exchange;
 import org.apache.camel.ExchangePattern;
-import org.apache.camel.ExtendedCamelContext;
 import org.apache.camel.PollingConsumer;
 import org.apache.camel.spi.ExceptionHandler;
-import org.apache.camel.spi.ExchangeFactory;
 import org.apache.camel.spi.HasId;
 import org.apache.camel.spi.Metadata;
 import org.apache.camel.spi.PropertyConfigurer;
@@ -56,7 +54,6 @@ public abstract class DefaultEndpoint extends ServiceSupport implements Endpoint
     private volatile String endpointUri;
     private CamelContext camelContext;
     private Component component;
-    private ExchangeFactory exchangeFactory;
 
     @Metadata(label = "advanced", defaultValue = "true",
               description = "Whether autowiring is enabled. This is used for automatic autowiring options (the option must be marked as autowired)"
@@ -101,9 +98,6 @@ public abstract class DefaultEndpoint extends ServiceSupport implements Endpoint
         this.setEndpointUri(endpointUri);
         if (component != null) {
             this.camelContext = component.getCamelContext();
-            if (this.camelContext != null) {
-                this.exchangeFactory = camelContext.adapt(ExtendedCamelContext.class).getExchangeFactory();
-            }
         }
     }
 
@@ -234,14 +228,19 @@ public abstract class DefaultEndpoint extends ServiceSupport implements Endpoint
 
     @Override
     public Exchange createExchange() {
-        return createExchange(getExchangePattern());
+        return createExchange(exchangePattern);
     }
 
     @Override
     public Exchange createExchange(ExchangePattern pattern) {
-        Exchange exchange = exchangeFactory.create(this);
-        exchange.setPattern(pattern);
-        return exchange;
+        Exchange answer = new DefaultExchange(this, pattern);
+        configureExchange(answer);
+        return answer;
+    }
+
+    @Override
+    public void configureExchange(Exchange exchange) {
+        // noop
     }
 
     /**
@@ -490,10 +489,6 @@ public abstract class DefaultEndpoint extends ServiceSupport implements Endpoint
     protected void doInit() throws Exception {
         ObjectHelper.notNull(getCamelContext(), "camelContext");
 
-        if (exchangeFactory == null) {
-            exchangeFactory = camelContext.adapt(ExtendedCamelContext.class).getExchangeFactory();
-        }
-
         if (autowiredEnabled && getComponent() != null) {
             PropertyConfigurer configurer = getComponent().getEndpointPropertyConfigurer();
             if (configurer instanceof PropertyConfigurerGetter) {
diff --git a/core/camel-support/src/main/java/org/apache/camel/support/DefaultExchange.java b/core/camel-support/src/main/java/org/apache/camel/support/DefaultExchange.java
index fca25fa..5ff30c9 100644
--- a/core/camel-support/src/main/java/org/apache/camel/support/DefaultExchange.java
+++ b/core/camel-support/src/main/java/org/apache/camel/support/DefaultExchange.java
@@ -53,6 +53,7 @@ public final class DefaultExchange implements ExtendedExchange {
     private Exception exception;
     private String exchangeId;
     private UnitOfWork unitOfWork;
+    private ExchangePattern originalPattern;
     private ExchangePattern pattern;
     private Endpoint fromEndpoint;
     private String fromRouteId;
@@ -74,12 +75,14 @@ public final class DefaultExchange implements ExtendedExchange {
         this.context = context;
         this.pattern = ExchangePattern.InOnly;
         this.created = System.currentTimeMillis();
+        this.originalPattern = this.pattern;
     }
 
     public DefaultExchange(CamelContext context, ExchangePattern pattern) {
         this.context = context;
         this.pattern = pattern;
         this.created = System.currentTimeMillis();
+        this.originalPattern = this.pattern;
     }
 
     public DefaultExchange(Exchange parent) {
@@ -89,6 +92,7 @@ public final class DefaultExchange implements ExtendedExchange {
         this.fromEndpoint = parent.getFromEndpoint();
         this.fromRouteId = parent.getFromRouteId();
         this.unitOfWork = parent.getUnitOfWork();
+        this.originalPattern = this.pattern;
     }
 
     public DefaultExchange(Endpoint fromEndpoint) {
@@ -96,6 +100,7 @@ public final class DefaultExchange implements ExtendedExchange {
         this.pattern = ExchangePattern.InOnly;
         this.created = System.currentTimeMillis();
         this.fromEndpoint = fromEndpoint;
+        this.originalPattern = this.pattern;
     }
 
     public DefaultExchange(Endpoint fromEndpoint, ExchangePattern pattern) {
@@ -103,6 +108,7 @@ public final class DefaultExchange implements ExtendedExchange {
         this.pattern = pattern;
         this.created = System.currentTimeMillis();
         this.fromEndpoint = fromEndpoint;
+        this.originalPattern = this.pattern;
     }
 
     @Override
@@ -118,13 +124,17 @@ public final class DefaultExchange implements ExtendedExchange {
     public void reset() {
         this.properties.clear();
         this.exchangeId = null;
+        // TODO: This is reset time
         this.created = System.currentTimeMillis();
+        this.in = null;
         this.out = null;
         this.exception = null;
         this.unitOfWork = null;
-        this.pattern = null;
-        this.fromEndpoint = null;
-        this.fromRouteId = null;
+        // reset pattern to original
+        this.pattern = originalPattern;
+        // do not reset endpoint as it would be the same consumer/endpoint again
+        // this.fromEndpoint = null;
+        // this.fromRouteId = null;
         if (this.onCompletions != null) {
             this.onCompletions.clear();
         }
diff --git a/core/camel-support/src/main/java/org/apache/camel/support/DefaultInterceptSendToEndpoint.java b/core/camel-support/src/main/java/org/apache/camel/support/DefaultInterceptSendToEndpoint.java
index 74725c9..99692e0 100644
--- a/core/camel-support/src/main/java/org/apache/camel/support/DefaultInterceptSendToEndpoint.java
+++ b/core/camel-support/src/main/java/org/apache/camel/support/DefaultInterceptSendToEndpoint.java
@@ -113,6 +113,11 @@ public class DefaultInterceptSendToEndpoint implements InterceptSendToEndpoint,
     }
 
     @Override
+    public void configureExchange(Exchange exchange) {
+        delegate.configureExchange(exchange);
+    }
+
+    @Override
     public CamelContext getCamelContext() {
         return delegate.getCamelContext();
     }
diff --git a/core/camel-support/src/main/java/org/apache/camel/support/PollingConsumerSupport.java b/core/camel-support/src/main/java/org/apache/camel/support/PollingConsumerSupport.java
index be1759c..ca4930f 100644
--- a/core/camel-support/src/main/java/org/apache/camel/support/PollingConsumerSupport.java
+++ b/core/camel-support/src/main/java/org/apache/camel/support/PollingConsumerSupport.java
@@ -16,9 +16,7 @@
  */
 package org.apache.camel.support;
 
-import org.apache.camel.Endpoint;
-import org.apache.camel.PollingConsumer;
-import org.apache.camel.Processor;
+import org.apache.camel.*;
 import org.apache.camel.spi.ExceptionHandler;
 import org.apache.camel.support.service.ServiceSupport;
 
@@ -50,6 +48,16 @@ public abstract class PollingConsumerSupport extends ServiceSupport implements P
         return null;
     }
 
+    @Override
+    public Exchange createExchange(boolean autoRelease) {
+        throw new UnsupportedOperationException("Not supported on PollingConsumer");
+    }
+
+    @Override
+    public void releaseExchange(Exchange exchange) {
+        throw new UnsupportedOperationException("Not supported on PollingConsumer");
+    }
+
     public ExceptionHandler getExceptionHandler() {
         return exceptionHandler;
     }

Reply via email to