This is an automated email from the ASF dual-hosted git repository.

davsclaus pushed a commit to branch exchange-factory
in repository https://gitbox.apache.org/repos/asf/camel.git


The following commit(s) were added to refs/heads/exchange-factory by this push:
     new a842a54  CAMEL-16222: PooledExchangeFactory experiment
a842a54 is described below

commit a842a540b169dcebb9bc7059e49c1c8faf7bc670
Author: Claus Ibsen <claus.ib...@gmail.com>
AuthorDate: Fri Feb 19 15:52:29 2021 +0100

    CAMEL-16222: PooledExchangeFactory experiment
---
 .../camel/component/dataset/DataSetConsumer.java   |  2 +-
 .../camel/component/file/GenericFileConsumer.java  |  2 +-
 .../component/scheduler/SchedulerConsumer.java     | 32 ++++++++++------------
 .../camel/component/timer/TimerConsumer.java       | 22 ++++++---------
 .../src/main/java/org/apache/camel/Consumer.java   |  4 ++-
 .../camel/impl/engine/PooledExchangeFactory.java   |  7 +----
 .../component/dataset/DataSetTestEndpointTest.java |  2 +-
 .../org/apache/camel/support/DefaultConsumer.java  |  8 +++++-
 .../camel/support/PollingConsumerSupport.java      |  2 +-
 9 files changed, 38 insertions(+), 43 deletions(-)

diff --git 
a/components/camel-dataset/src/main/java/org/apache/camel/component/dataset/DataSetConsumer.java
 
b/components/camel-dataset/src/main/java/org/apache/camel/component/dataset/DataSetConsumer.java
index 81fd019..84ef2cf 100644
--- 
a/components/camel-dataset/src/main/java/org/apache/camel/component/dataset/DataSetConsumer.java
+++ 
b/components/camel-dataset/src/main/java/org/apache/camel/component/dataset/DataSetConsumer.java
@@ -125,7 +125,7 @@ public class DataSetConsumer extends DefaultConsumer {
             } catch (Exception e) {
                 handleException(e);
             } finally {
-                releaseExchange(exchange);
+                releaseExchange(exchange, false);
             }
         }
     }
diff --git 
a/components/camel-file/src/main/java/org/apache/camel/component/file/GenericFileConsumer.java
 
b/components/camel-file/src/main/java/org/apache/camel/component/file/GenericFileConsumer.java
index 7aa46b3..51b53a0 100644
--- 
a/components/camel-file/src/main/java/org/apache/camel/component/file/GenericFileConsumer.java
+++ 
b/components/camel-file/src/main/java/org/apache/camel/component/file/GenericFileConsumer.java
@@ -272,7 +272,7 @@ public abstract class GenericFileConsumer<T> extends 
ScheduledBatchPollingConsum
             GenericFile<?> file = 
exchange.getProperty(FileComponent.FILE_EXCHANGE_FILE, GenericFile.class);
             String key = file.getAbsoluteFilePath();
             endpoint.getInProgressRepository().remove(key);
-            releaseExchange(exchange);
+            releaseExchange(exchange, true);
         }
     }
 
diff --git 
a/components/camel-scheduler/src/main/java/org/apache/camel/component/scheduler/SchedulerConsumer.java
 
b/components/camel-scheduler/src/main/java/org/apache/camel/component/scheduler/SchedulerConsumer.java
index 04ab2b2..3ddbf44 100644
--- 
a/components/camel-scheduler/src/main/java/org/apache/camel/component/scheduler/SchedulerConsumer.java
+++ 
b/components/camel-scheduler/src/main/java/org/apache/camel/component/scheduler/SchedulerConsumer.java
@@ -19,7 +19,6 @@ package org.apache.camel.component.scheduler;
 import java.util.Date;
 import java.util.concurrent.atomic.AtomicBoolean;
 
-import org.apache.camel.AsyncCallback;
 import org.apache.camel.Exchange;
 import org.apache.camel.Processor;
 import org.apache.camel.support.ScheduledPollConsumer;
@@ -57,22 +56,19 @@ public class SchedulerConsumer extends 
ScheduledPollConsumer {
 
         if (!getEndpoint().isSynchronous()) {
             final AtomicBoolean polled = new AtomicBoolean(true);
-            boolean doneSync = getAsyncProcessor().process(exchange, new 
AsyncCallback() {
-                @Override
-                public void done(boolean doneSync) {
-                    // handle any thrown exception
-                    if (exchange.getException() != null) {
-                        getExceptionHandler().handleException("Error 
processing exchange", exchange, exchange.getException());
-                    }
-                    boolean wasPolled = 
exchange.getProperty(Exchange.SCHEDULER_POLLED_MESSAGES, true, boolean.class);
-                    if (!wasPolled) {
-                        polled.set(false);
-                    }
-
-                    // sync wil release outside this callback
-                    if (!doneSync) {
-                        releaseExchange(exchange);
-                    }
+            boolean doneSync = getAsyncProcessor().process(exchange, 
cbDoneSync -> {
+                // handle any thrown exception
+                if (exchange.getException() != null) {
+                    getExceptionHandler().handleException("Error processing 
exchange", exchange, exchange.getException());
+                }
+                boolean wasPolled = 
exchange.getProperty(Exchange.SCHEDULER_POLLED_MESSAGES, true, boolean.class);
+                if (!wasPolled) {
+                    polled.set(false);
+                }
+
+                // sync will release outside this callback
+                if (!cbDoneSync) {
+                    releaseExchange(exchange, false);
                 }
             });
             if (!doneSync) {
@@ -95,7 +91,7 @@ public class SchedulerConsumer extends ScheduledPollConsumer {
         // for example to overrule and indicate no message was polled, which 
can affect the scheduler
         // to leverage backoff on idle etc.
         boolean polled = 
exchange.getProperty(Exchange.SCHEDULER_POLLED_MESSAGES, true, boolean.class);
-        releaseExchange(exchange);
+        releaseExchange(exchange, false);
         return polled ? 1 : 0;
     }
 
diff --git 
a/components/camel-timer/src/main/java/org/apache/camel/component/timer/TimerConsumer.java
 
b/components/camel-timer/src/main/java/org/apache/camel/component/timer/TimerConsumer.java
index 40b44c8..996e228 100644
--- 
a/components/camel-timer/src/main/java/org/apache/camel/component/timer/TimerConsumer.java
+++ 
b/components/camel-timer/src/main/java/org/apache/camel/component/timer/TimerConsumer.java
@@ -22,7 +22,6 @@ import java.util.TimerTask;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.atomic.AtomicLong;
 
-import org.apache.camel.AsyncCallback;
 import org.apache.camel.CamelContext;
 import org.apache.camel.Exchange;
 import org.apache.camel.Processor;
@@ -204,17 +203,14 @@ public class TimerConsumer extends DefaultConsumer 
implements StartupListener, S
         }
 
         if (!endpoint.isSynchronous()) {
-            getAsyncProcessor().process(exchange, new AsyncCallback() {
-                @Override
-                public void done(boolean doneSync) {
-                    // handle any thrown exception
-                    if (exchange.getException() != null) {
-                        getExceptionHandler().handleException("Error 
processing exchange", exchange, exchange.getException());
-                    }
-                    // sync wil release outside this callback
-                    if (!doneSync) {
-                        releaseExchange(exchange);
-                    }
+            getAsyncProcessor().process(exchange, cbDoneSync -> {
+                // handle any thrown exception
+                if (exchange.getException() != null) {
+                    getExceptionHandler().handleException("Error processing 
exchange", exchange, exchange.getException());
+                }
+                // sync will release outside this callback
+                if (!cbDoneSync) {
+                    releaseExchange(exchange, false);
                 }
             });
         } else {
@@ -230,7 +226,7 @@ public class TimerConsumer extends DefaultConsumer 
implements StartupListener, S
                     getExceptionHandler().handleException("Error processing 
exchange", exchange, exchange.getException());
                 }
             } finally {
-                releaseExchange(exchange);
+                releaseExchange(exchange, false);
             }
         }
     }
diff --git a/core/camel-api/src/main/java/org/apache/camel/Consumer.java 
b/core/camel-api/src/main/java/org/apache/camel/Consumer.java
index 2360be5..011208c 100644
--- a/core/camel-api/src/main/java/org/apache/camel/Consumer.java
+++ b/core/camel-api/src/main/java/org/apache/camel/Consumer.java
@@ -41,7 +41,9 @@ public interface Consumer extends Service, EndpointAware {
 
     /**
      * Releases the {@link Exchange} when its completed processing and no 
longer needed.
+     *
+     * @param autoRelease whether the exchange was created with auto release
      */
-    void releaseExchange(Exchange exchange);
+    void releaseExchange(Exchange exchange, boolean autoRelease);
 
 }
diff --git 
a/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/PooledExchangeFactory.java
 
b/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/PooledExchangeFactory.java
index 5e4b285..e4fe5e0 100644
--- 
a/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/PooledExchangeFactory.java
+++ 
b/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/PooledExchangeFactory.java
@@ -103,9 +103,6 @@ public class PooledExchangeFactory extends ServiceSupport
             if (autoRelease) {
                 // the consumer will either always be in auto release mode or 
not, so its safe to initialize the task only once when the exchange is created
                 answer.onDone(this::release);
-            } else {
-                // we need to signal true
-                answer.onDone(e -> true);
             }
             return answer;
         } else {
@@ -131,9 +128,6 @@ public class PooledExchangeFactory extends ServiceSupport
             if (autoRelease) {
                 // the consumer will either always be in auto release mode or 
not, so its safe to initialize the task only once when the exchange is created
                 answer.onDone(this::release);
-            } else {
-                // we need to signal true
-                answer.onDone(e -> true);
             }
             return answer;
         } else {
@@ -153,6 +147,7 @@ public class PooledExchangeFactory extends ServiceSupport
         try {
             ExtendedExchange ee = exchange.adapt(ExtendedExchange.class);
             ee.done();
+            ee.onDone(null);
 
             // only release back in pool if reset was success
             if (statisticsEnabled) {
diff --git 
a/core/camel-core/src/test/java/org/apache/camel/component/dataset/DataSetTestEndpointTest.java
 
b/core/camel-core/src/test/java/org/apache/camel/component/dataset/DataSetTestEndpointTest.java
index 0050d05..682dd11 100644
--- 
a/core/camel-core/src/test/java/org/apache/camel/component/dataset/DataSetTestEndpointTest.java
+++ 
b/core/camel-core/src/test/java/org/apache/camel/component/dataset/DataSetTestEndpointTest.java
@@ -86,7 +86,7 @@ public class DataSetTestEndpointTest extends 
ContextTestSupport {
                 }
 
                 @Override
-                public void releaseExchange(Exchange exchange) {
+                public void releaseExchange(Exchange exchange, boolean 
autoRelease) {
                     // noop
                 }
 
diff --git 
a/core/camel-support/src/main/java/org/apache/camel/support/DefaultConsumer.java
 
b/core/camel-support/src/main/java/org/apache/camel/support/DefaultConsumer.java
index 83c568c..378f015 100644
--- 
a/core/camel-support/src/main/java/org/apache/camel/support/DefaultConsumer.java
+++ 
b/core/camel-support/src/main/java/org/apache/camel/support/DefaultConsumer.java
@@ -133,8 +133,14 @@ public class DefaultConsumer extends ServiceSupport 
implements Consumer, RouteAw
     }
 
     @Override
-    public void releaseExchange(Exchange exchange) {
+    public void releaseExchange(Exchange exchange, boolean autoRelease) {
         if (exchange != null) {
+            if (!autoRelease) {
+                // we must manually mark the exchange as done
+                // TODO: hack
+                exchange.adapt(ExtendedExchange.class).onDone(e -> true);
+                exchange.adapt(ExtendedExchange.class).done();
+            }
             exchangeFactory.release(exchange);
         }
     }
diff --git 
a/core/camel-support/src/main/java/org/apache/camel/support/PollingConsumerSupport.java
 
b/core/camel-support/src/main/java/org/apache/camel/support/PollingConsumerSupport.java
index ca4930f..a0ae0de 100644
--- 
a/core/camel-support/src/main/java/org/apache/camel/support/PollingConsumerSupport.java
+++ 
b/core/camel-support/src/main/java/org/apache/camel/support/PollingConsumerSupport.java
@@ -54,7 +54,7 @@ public abstract class PollingConsumerSupport extends 
ServiceSupport implements P
     }
 
     @Override
-    public void releaseExchange(Exchange exchange) {
+    public void releaseExchange(Exchange exchange, boolean autoRelease) {
         throw new UnsupportedOperationException("Not supported on 
PollingConsumer");
     }
 

Reply via email to