This is an automated email from the ASF dual-hosted git repository. davsclaus pushed a commit to branch exchange-factory in repository https://gitbox.apache.org/repos/asf/camel.git
The following commit(s) were added to refs/heads/exchange-factory by this push: new a842a54 CAMEL-16222: PooledExchangeFactory experiment a842a54 is described below commit a842a540b169dcebb9bc7059e49c1c8faf7bc670 Author: Claus Ibsen <claus.ib...@gmail.com> AuthorDate: Fri Feb 19 15:52:29 2021 +0100 CAMEL-16222: PooledExchangeFactory experiment --- .../camel/component/dataset/DataSetConsumer.java | 2 +- .../camel/component/file/GenericFileConsumer.java | 2 +- .../component/scheduler/SchedulerConsumer.java | 32 ++++++++++------------ .../camel/component/timer/TimerConsumer.java | 22 ++++++--------- .../src/main/java/org/apache/camel/Consumer.java | 4 ++- .../camel/impl/engine/PooledExchangeFactory.java | 7 +---- .../component/dataset/DataSetTestEndpointTest.java | 2 +- .../org/apache/camel/support/DefaultConsumer.java | 8 +++++- .../camel/support/PollingConsumerSupport.java | 2 +- 9 files changed, 38 insertions(+), 43 deletions(-) diff --git a/components/camel-dataset/src/main/java/org/apache/camel/component/dataset/DataSetConsumer.java b/components/camel-dataset/src/main/java/org/apache/camel/component/dataset/DataSetConsumer.java index 81fd019..84ef2cf 100644 --- a/components/camel-dataset/src/main/java/org/apache/camel/component/dataset/DataSetConsumer.java +++ b/components/camel-dataset/src/main/java/org/apache/camel/component/dataset/DataSetConsumer.java @@ -125,7 +125,7 @@ public class DataSetConsumer extends DefaultConsumer { } catch (Exception e) { handleException(e); } finally { - releaseExchange(exchange); + releaseExchange(exchange, false); } } } diff --git a/components/camel-file/src/main/java/org/apache/camel/component/file/GenericFileConsumer.java b/components/camel-file/src/main/java/org/apache/camel/component/file/GenericFileConsumer.java index 7aa46b3..51b53a0 100644 --- a/components/camel-file/src/main/java/org/apache/camel/component/file/GenericFileConsumer.java +++ b/components/camel-file/src/main/java/org/apache/camel/component/file/GenericFileConsumer.java @@ -272,7 +272,7 @@ public abstract class GenericFileConsumer<T> extends ScheduledBatchPollingConsum GenericFile<?> file = exchange.getProperty(FileComponent.FILE_EXCHANGE_FILE, GenericFile.class); String key = file.getAbsoluteFilePath(); endpoint.getInProgressRepository().remove(key); - releaseExchange(exchange); + releaseExchange(exchange, true); } } diff --git a/components/camel-scheduler/src/main/java/org/apache/camel/component/scheduler/SchedulerConsumer.java b/components/camel-scheduler/src/main/java/org/apache/camel/component/scheduler/SchedulerConsumer.java index 04ab2b2..3ddbf44 100644 --- a/components/camel-scheduler/src/main/java/org/apache/camel/component/scheduler/SchedulerConsumer.java +++ b/components/camel-scheduler/src/main/java/org/apache/camel/component/scheduler/SchedulerConsumer.java @@ -19,7 +19,6 @@ package org.apache.camel.component.scheduler; import java.util.Date; import java.util.concurrent.atomic.AtomicBoolean; -import org.apache.camel.AsyncCallback; import org.apache.camel.Exchange; import org.apache.camel.Processor; import org.apache.camel.support.ScheduledPollConsumer; @@ -57,22 +56,19 @@ public class SchedulerConsumer extends ScheduledPollConsumer { if (!getEndpoint().isSynchronous()) { final AtomicBoolean polled = new AtomicBoolean(true); - boolean doneSync = getAsyncProcessor().process(exchange, new AsyncCallback() { - @Override - public void done(boolean doneSync) { - // handle any thrown exception - if (exchange.getException() != null) { - getExceptionHandler().handleException("Error processing exchange", exchange, exchange.getException()); - } - boolean wasPolled = exchange.getProperty(Exchange.SCHEDULER_POLLED_MESSAGES, true, boolean.class); - if (!wasPolled) { - polled.set(false); - } - - // sync wil release outside this callback - if (!doneSync) { - releaseExchange(exchange); - } + boolean doneSync = getAsyncProcessor().process(exchange, cbDoneSync -> { + // handle any thrown exception + if (exchange.getException() != null) { + getExceptionHandler().handleException("Error processing exchange", exchange, exchange.getException()); + } + boolean wasPolled = exchange.getProperty(Exchange.SCHEDULER_POLLED_MESSAGES, true, boolean.class); + if (!wasPolled) { + polled.set(false); + } + + // sync wil release outside this callback + if (!cbDoneSync) { + releaseExchange(exchange, false); } }); if (!doneSync) { @@ -95,7 +91,7 @@ public class SchedulerConsumer extends ScheduledPollConsumer { // for example to overrule and indicate no message was polled, which can affect the scheduler // to leverage backoff on idle etc. boolean polled = exchange.getProperty(Exchange.SCHEDULER_POLLED_MESSAGES, true, boolean.class); - releaseExchange(exchange); + releaseExchange(exchange, false); return polled ? 1 : 0; } diff --git a/components/camel-timer/src/main/java/org/apache/camel/component/timer/TimerConsumer.java b/components/camel-timer/src/main/java/org/apache/camel/component/timer/TimerConsumer.java index 40b44c8..996e228 100644 --- a/components/camel-timer/src/main/java/org/apache/camel/component/timer/TimerConsumer.java +++ b/components/camel-timer/src/main/java/org/apache/camel/component/timer/TimerConsumer.java @@ -22,7 +22,6 @@ import java.util.TimerTask; import java.util.concurrent.ExecutorService; import java.util.concurrent.atomic.AtomicLong; -import org.apache.camel.AsyncCallback; import org.apache.camel.CamelContext; import org.apache.camel.Exchange; import org.apache.camel.Processor; @@ -204,17 +203,14 @@ public class TimerConsumer extends DefaultConsumer implements StartupListener, S } if (!endpoint.isSynchronous()) { - getAsyncProcessor().process(exchange, new AsyncCallback() { - @Override - public void done(boolean doneSync) { - // handle any thrown exception - if (exchange.getException() != null) { - getExceptionHandler().handleException("Error processing exchange", exchange, exchange.getException()); - } - // sync wil release outside this callback - if (!doneSync) { - releaseExchange(exchange); - } + getAsyncProcessor().process(exchange, cbDoneSync -> { + // handle any thrown exception + if (exchange.getException() != null) { + getExceptionHandler().handleException("Error processing exchange", exchange, exchange.getException()); + } + // sync wil release outside this callback + if (!cbDoneSync) { + releaseExchange(exchange, false); } }); } else { @@ -230,7 +226,7 @@ public class TimerConsumer extends DefaultConsumer implements StartupListener, S getExceptionHandler().handleException("Error processing exchange", exchange, exchange.getException()); } } finally { - releaseExchange(exchange); + releaseExchange(exchange, false); } } } diff --git a/core/camel-api/src/main/java/org/apache/camel/Consumer.java b/core/camel-api/src/main/java/org/apache/camel/Consumer.java index 2360be5..011208c 100644 --- a/core/camel-api/src/main/java/org/apache/camel/Consumer.java +++ b/core/camel-api/src/main/java/org/apache/camel/Consumer.java @@ -41,7 +41,9 @@ public interface Consumer extends Service, EndpointAware { /** * Releases the {@link Exchange} when its completed processing and no longer needed. + * + * @param autoRelease whether the exchange was created with auto release */ - void releaseExchange(Exchange exchange); + void releaseExchange(Exchange exchange, boolean autoRelease); } diff --git a/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/PooledExchangeFactory.java b/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/PooledExchangeFactory.java index 5e4b285..e4fe5e0 100644 --- a/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/PooledExchangeFactory.java +++ b/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/PooledExchangeFactory.java @@ -103,9 +103,6 @@ public class PooledExchangeFactory extends ServiceSupport if (autoRelease) { // the consumer will either always be in auto release mode or not, so its safe to initialize the task only once when the exchange is created answer.onDone(this::release); - } else { - // we need to signal true - answer.onDone(e -> true); } return answer; } else { @@ -131,9 +128,6 @@ public class PooledExchangeFactory extends ServiceSupport if (autoRelease) { // the consumer will either always be in auto release mode or not, so its safe to initialize the task only once when the exchange is created answer.onDone(this::release); - } else { - // we need to signal true - answer.onDone(e -> true); } return answer; } else { @@ -153,6 +147,7 @@ public class PooledExchangeFactory extends ServiceSupport try { ExtendedExchange ee = exchange.adapt(ExtendedExchange.class); ee.done(); + ee.onDone(null); // only release back in pool if reset was success if (statisticsEnabled) { diff --git a/core/camel-core/src/test/java/org/apache/camel/component/dataset/DataSetTestEndpointTest.java b/core/camel-core/src/test/java/org/apache/camel/component/dataset/DataSetTestEndpointTest.java index 0050d05..682dd11 100644 --- a/core/camel-core/src/test/java/org/apache/camel/component/dataset/DataSetTestEndpointTest.java +++ b/core/camel-core/src/test/java/org/apache/camel/component/dataset/DataSetTestEndpointTest.java @@ -86,7 +86,7 @@ public class DataSetTestEndpointTest extends ContextTestSupport { } @Override - public void releaseExchange(Exchange exchange) { + public void releaseExchange(Exchange exchange, boolean autoRelease) { // noop } diff --git a/core/camel-support/src/main/java/org/apache/camel/support/DefaultConsumer.java b/core/camel-support/src/main/java/org/apache/camel/support/DefaultConsumer.java index 83c568c..378f015 100644 --- a/core/camel-support/src/main/java/org/apache/camel/support/DefaultConsumer.java +++ b/core/camel-support/src/main/java/org/apache/camel/support/DefaultConsumer.java @@ -133,8 +133,14 @@ public class DefaultConsumer extends ServiceSupport implements Consumer, RouteAw } @Override - public void releaseExchange(Exchange exchange) { + public void releaseExchange(Exchange exchange, boolean autoRelease) { if (exchange != null) { + if (!autoRelease) { + // we must manually done the exchange + // TODO: hack + exchange.adapt(ExtendedExchange.class).onDone(e -> true); + exchange.adapt(ExtendedExchange.class).done(); + } exchangeFactory.release(exchange); } } diff --git a/core/camel-support/src/main/java/org/apache/camel/support/PollingConsumerSupport.java b/core/camel-support/src/main/java/org/apache/camel/support/PollingConsumerSupport.java index ca4930f..a0ae0de 100644 --- a/core/camel-support/src/main/java/org/apache/camel/support/PollingConsumerSupport.java +++ b/core/camel-support/src/main/java/org/apache/camel/support/PollingConsumerSupport.java @@ -54,7 +54,7 @@ public abstract class PollingConsumerSupport extends ServiceSupport implements P } @Override - public void releaseExchange(Exchange exchange) { + public void releaseExchange(Exchange exchange, boolean autoRelease) { throw new UnsupportedOperationException("Not supported on PollingConsumer"); }