This is an automated email from the ASF dual-hosted git repository. davsclaus pushed a commit to branch exchange-factory in repository https://gitbox.apache.org/repos/asf/camel.git
The following commit(s) were added to refs/heads/exchange-factory by this push: new 8b92a7f CAMEL-16222: PooledExchangeFactory experiment 8b92a7f is described below commit 8b92a7f340a335c2968fea9179eab386d22c8b75 Author: Claus Ibsen <claus.ib...@gmail.com> AuthorDate: Thu Feb 18 14:59:18 2021 +0100 CAMEL-16222: PooledExchangeFactory experiment --- .../camel/catalog/components/vertx-http.json | 2 + .../camel/component/dataset/DataSetConsumer.java | 30 ++++++++++--- .../apache/camel/component/file/FileConsumer.java | 9 ++++ .../camel/component/file/GenericFileConsumer.java | 8 +++- .../component/file/remote/RemoteFileConsumer.java | 9 ++++ .../apache/camel/component/http/HttpProducer.java | 2 +- .../component/scheduler/SchedulerConsumer.java | 19 ++++++++- .../camel/component/timer/TimerConsumer.java | 14 +++++-- .../src/main/java/org/apache/camel/Consumer.java | 17 ++++++++ .../src/main/java/org/apache/camel/Endpoint.java | 7 ++++ .../java/org/apache/camel/ExtendedExchange.java | 2 + .../java/org/apache/camel/spi/ExchangeFactory.java | 12 +++++- .../main/java/org/apache/camel/spi/UnitOfWork.java | 7 ++++ .../camel/impl/engine/DefaultExchangeFactory.java | 5 ++- .../camel/impl/engine/DefaultUnitOfWork.java | 12 ++++++ .../camel/impl/engine/PooledExchangeFactory.java | 49 +++++++++++++++------- .../org/apache/camel/builder/ExchangeBuilder.java | 4 +- .../apache/camel/processor/WireTapProcessor.java | 7 +--- .../component/dataset/DataSetTestEndpointTest.java | 12 +++++- .../org/apache/camel/support/DefaultConsumer.java | 18 ++++++++ .../org/apache/camel/support/DefaultEndpoint.java | 23 ++++------ .../org/apache/camel/support/DefaultExchange.java | 16 +++++-- .../support/DefaultInterceptSendToEndpoint.java | 5 +++ .../camel/support/PollingConsumerSupport.java | 14 +++++-- 24 files changed, 244 insertions(+), 59 deletions(-) diff --git a/catalog/camel-catalog/src/generated/resources/org/apache/camel/catalog/components/vertx-http.json b/catalog/camel-catalog/src/generated/resources/org/apache/camel/catalog/components/vertx-http.json index 852e081..069eb8c 100644 --- a/catalog/camel-catalog/src/generated/resources/org/apache/camel/catalog/components/vertx-http.json +++ b/catalog/camel-catalog/src/generated/resources/org/apache/camel/catalog/components/vertx-http.json @@ -23,6 +23,7 @@ }, "componentProperties": { "lazyStartProducer": { "kind": "property", "displayName": "Lazy Start Producer", "group": "producer", "label": "producer", "required": false, "type": "boolean", "javaType": "boolean", "deprecated": false, "autowired": false, "secret": false, "defaultValue": false, "description": "Whether the producer should be started lazy (on the first message). By starting lazy you can use this to allow CamelContext and routes to startup in situations where a producer may otherwise fail during star [...] + "responsePayloadAsByteArray": { "kind": "property", "displayName": "Response Payload As Byte Array", "group": "producer", "label": "producer", "required": false, "type": "boolean", "javaType": "boolean", "deprecated": false, "autowired": false, "secret": false, "defaultValue": true, "description": "Whether the response body should be byte or as io.vertx.core.buffer.Buffer" }, "allowJavaSerializedObject": { "kind": "property", "displayName": "Allow Java Serialized Object", "group": "advanced", "label": "advanced", "required": false, "type": "boolean", "javaType": "boolean", "deprecated": false, "autowired": false, "secret": false, "defaultValue": false, "description": "Whether to allow java serialization when a request has the Content-Type application\/x-java-serialized-object This is disabled by default. If you enable this, be aware that Java will deseria [...] "autowiredEnabled": { "kind": "property", "displayName": "Autowired Enabled", "group": "advanced", "label": "advanced", "required": false, "type": "boolean", "javaType": "boolean", "deprecated": false, "autowired": false, "secret": false, "defaultValue": true, "description": "Whether autowiring is enabled. This is used for automatic autowiring options (the option must be marked as autowired) by looking up in the registry to find if there is a single instance of matching type, which t [...] "vertx": { "kind": "property", "displayName": "Vertx", "group": "advanced", "label": "advanced", "required": false, "type": "object", "javaType": "io.vertx.core.Vertx", "deprecated": false, "autowired": false, "secret": false, "description": "To use an existing vertx instead of creating a new instance" }, @@ -48,6 +49,7 @@ "httpMethod": { "kind": "parameter", "displayName": "Http Method", "group": "producer", "label": "producer", "required": false, "type": "object", "javaType": "io.vertx.core.http.HttpMethod", "enum": [ "OPTIONS", "GET", "HEAD", "POST", "PUT", "DELETE", "TRACE", "CONNECT", "PATCH", "OTHER" ], "deprecated": false, "autowired": false, "secret": false, "configurationClass": "org.apache.camel.component.vertx.http.VertxHttpConfiguration", "configurationField": "configuration", "description" [...] "lazyStartProducer": { "kind": "parameter", "displayName": "Lazy Start Producer", "group": "producer", "label": "producer", "required": false, "type": "boolean", "javaType": "boolean", "deprecated": false, "autowired": false, "secret": false, "defaultValue": false, "description": "Whether the producer should be started lazy (on the first message). By starting lazy you can use this to allow CamelContext and routes to startup in situations where a producer may otherwise fail during sta [...] "okStatusCodeRange": { "kind": "parameter", "displayName": "Ok Status Code Range", "group": "producer", "label": "producer", "required": false, "type": "string", "javaType": "java.lang.String", "deprecated": false, "autowired": false, "secret": false, "defaultValue": "200-299", "configurationClass": "org.apache.camel.component.vertx.http.VertxHttpConfiguration", "configurationField": "configuration", "description": "The status codes which are considered a success response. The values [...] + "responsePayloadAsByteArray": { "kind": "parameter", "displayName": "Response Payload As Byte Array", "group": "producer", "label": "producer", "required": false, "type": "boolean", "javaType": "boolean", "deprecated": false, "autowired": false, "secret": false, "defaultValue": true, "configurationClass": "org.apache.camel.component.vertx.http.VertxHttpConfiguration", "configurationField": "configuration", "description": "Whether the response body should be byte or as io.vertx.core.b [...] "sessionManagement": { "kind": "parameter", "displayName": "Session Management", "group": "producer", "label": "producer", "required": false, "type": "boolean", "javaType": "boolean", "deprecated": false, "autowired": false, "secret": false, "defaultValue": false, "configurationClass": "org.apache.camel.component.vertx.http.VertxHttpConfiguration", "configurationField": "configuration", "description": "Enables session management via WebClientSession. By default the client is configur [...] "throwExceptionOnFailure": { "kind": "parameter", "displayName": "Throw Exception On Failure", "group": "producer", "label": "producer", "required": false, "type": "boolean", "javaType": "boolean", "deprecated": false, "autowired": false, "secret": false, "defaultValue": true, "configurationClass": "org.apache.camel.component.vertx.http.VertxHttpConfiguration", "configurationField": "configuration", "description": "Disable throwing HttpOperationFailedException in case of failed respo [...] "timeout": { "kind": "parameter", "displayName": "Timeout", "group": "producer", "label": "producer", "required": false, "type": "integer", "javaType": "long", "deprecated": false, "autowired": false, "secret": false, "defaultValue": -1, "configurationClass": "org.apache.camel.component.vertx.http.VertxHttpConfiguration", "configurationField": "configuration", "description": "The amount of time in milliseconds after which if the request does not return any data within the timeout per [...] diff --git a/components/camel-dataset/src/main/java/org/apache/camel/component/dataset/DataSetConsumer.java b/components/camel-dataset/src/main/java/org/apache/camel/component/dataset/DataSetConsumer.java index 43fbb4b..81fd019 100644 --- a/components/camel-dataset/src/main/java/org/apache/camel/component/dataset/DataSetConsumer.java +++ b/components/camel-dataset/src/main/java/org/apache/camel/component/dataset/DataSetConsumer.java @@ -20,6 +20,7 @@ import java.util.concurrent.ExecutorService; import org.apache.camel.CamelContext; import org.apache.camel.Exchange; +import org.apache.camel.Message; import org.apache.camel.Processor; import org.apache.camel.spi.CamelLogger; import org.apache.camel.support.DefaultConsumer; @@ -86,10 +87,27 @@ public class DataSetConsumer extends DefaultConsumer { } } + /** + * Creates a message exchange for the given index in the {@link DataSet} + */ + protected Exchange createExchange(long messageIndex) throws Exception { + Exchange exchange = createExchange(false); + + endpoint.getDataSet().populateMessage(exchange, messageIndex); + + if (!endpoint.getDataSetIndex().equals("off")) { + Message in = exchange.getIn(); + in.setHeader(Exchange.DATASET_INDEX, messageIndex); + } + + return exchange; + } + protected void sendMessages(long startIndex, long endIndex) { - try { - for (long i = startIndex; i < endIndex; i++) { - Exchange exchange = endpoint.createExchange(i); + for (long i = startIndex; i < endIndex; i++) { + Exchange exchange = null; + try { + exchange = createExchange(i); getProcessor().process(exchange); try { @@ -104,9 +122,11 @@ public class DataSetConsumer extends DefaultConsumer { if (reporter != null) { reporter.process(exchange); } + } catch (Exception e) { + handleException(e); + } finally { + releaseExchange(exchange); } - } catch (Exception e) { - handleException(e); } } diff --git a/components/camel-file/src/main/java/org/apache/camel/component/file/FileConsumer.java b/components/camel-file/src/main/java/org/apache/camel/component/file/FileConsumer.java index 7b9ee71..50087fb 100644 --- a/components/camel-file/src/main/java/org/apache/camel/component/file/FileConsumer.java +++ b/components/camel-file/src/main/java/org/apache/camel/component/file/FileConsumer.java @@ -57,6 +57,15 @@ public class FileConsumer extends GenericFileConsumer<File> { } @Override + protected Exchange createExchange(GenericFile<File> file) { + Exchange exchange = createExchange(true); + if (file != null) { + file.bindToExchange(exchange, getEndpoint().isProbeContentType()); + } + return exchange; + } + + @Override protected boolean pollDirectory(String fileName, List<GenericFile<File>> fileList, int depth) { LOG.trace("pollDirectory from fileName: {}", fileName); diff --git a/components/camel-file/src/main/java/org/apache/camel/component/file/GenericFileConsumer.java b/components/camel-file/src/main/java/org/apache/camel/component/file/GenericFileConsumer.java index 3e6a7b4..7aa46b3 100644 --- a/components/camel-file/src/main/java/org/apache/camel/component/file/GenericFileConsumer.java +++ b/components/camel-file/src/main/java/org/apache/camel/component/file/GenericFileConsumer.java @@ -102,6 +102,11 @@ public abstract class GenericFileConsumer<T> extends ScheduledBatchPollingConsum } /** + * Creates the exchange from the polled file + */ + protected abstract Exchange createExchange(GenericFile<T> file); + + /** * Poll for files */ @Override @@ -164,7 +169,7 @@ public abstract class GenericFileConsumer<T> extends ScheduledBatchPollingConsum // use a linked list so we can dequeue the exchanges LinkedList<Exchange> exchanges = new LinkedList<>(); for (GenericFile<T> file : files) { - Exchange exchange = endpoint.createExchange(file); + Exchange exchange = createExchange(file); endpoint.configureExchange(exchange); endpoint.configureMessage(file, exchange.getIn()); exchanges.add(exchange); @@ -267,6 +272,7 @@ public abstract class GenericFileConsumer<T> extends ScheduledBatchPollingConsum GenericFile<?> file = exchange.getProperty(FileComponent.FILE_EXCHANGE_FILE, GenericFile.class); String key = file.getAbsoluteFilePath(); endpoint.getInProgressRepository().remove(key); + releaseExchange(exchange); } } diff --git a/components/camel-ftp/src/main/java/org/apache/camel/component/file/remote/RemoteFileConsumer.java b/components/camel-ftp/src/main/java/org/apache/camel/component/file/remote/RemoteFileConsumer.java index d428e24..83e2292 100644 --- a/components/camel-ftp/src/main/java/org/apache/camel/component/file/remote/RemoteFileConsumer.java +++ b/components/camel-ftp/src/main/java/org/apache/camel/component/file/remote/RemoteFileConsumer.java @@ -59,6 +59,15 @@ public abstract class RemoteFileConsumer<T> extends GenericFileConsumer<T> { } @Override + protected Exchange createExchange(GenericFile<T> file) { + Exchange answer = createExchange(true); + if (file != null) { + file.bindToExchange(answer); + } + return answer; + } + + @Override protected boolean prePollCheck() throws Exception { if (LOG.isTraceEnabled()) { LOG.trace("prePollCheck on {}", getEndpoint().getConfiguration().remoteServerInformation()); diff --git a/components/camel-http/src/main/java/org/apache/camel/component/http/HttpProducer.java b/components/camel-http/src/main/java/org/apache/camel/component/http/HttpProducer.java index 41d5205..af521da 100644 --- a/components/camel-http/src/main/java/org/apache/camel/component/http/HttpProducer.java +++ b/components/camel-http/src/main/java/org/apache/camel/component/http/HttpProducer.java @@ -623,7 +623,7 @@ public class HttpProducer extends DefaultProducer { try { if (body == null) { return null; - // special optimized for using these 3 type converters for common message payload types + // special optimized for using these 3 type converters for common message payload types } else if (body instanceof byte[]) { answer = HttpEntityConverter.toHttpEntity((byte[]) body, exchange); } else if (body instanceof InputStream) { diff --git a/components/camel-scheduler/src/main/java/org/apache/camel/component/scheduler/SchedulerConsumer.java b/components/camel-scheduler/src/main/java/org/apache/camel/component/scheduler/SchedulerConsumer.java index 240e673..04ab2b2 100644 --- a/components/camel-scheduler/src/main/java/org/apache/camel/component/scheduler/SchedulerConsumer.java +++ b/components/camel-scheduler/src/main/java/org/apache/camel/component/scheduler/SchedulerConsumer.java @@ -17,6 +17,7 @@ package org.apache.camel.component.scheduler; import java.util.Date; +import java.util.concurrent.atomic.AtomicBoolean; import org.apache.camel.AsyncCallback; import org.apache.camel.Exchange; @@ -44,7 +45,7 @@ public class SchedulerConsumer extends ScheduledPollConsumer { } protected int sendTimerExchange() { - final Exchange exchange = getEndpoint().createExchange(); + final Exchange exchange = createExchange(false); exchange.setProperty(Exchange.TIMER_NAME, getEndpoint().getName()); Date now = new Date(); @@ -55,15 +56,28 @@ public class SchedulerConsumer extends ScheduledPollConsumer { } if (!getEndpoint().isSynchronous()) { - getAsyncProcessor().process(exchange, new AsyncCallback() { + final AtomicBoolean polled = new AtomicBoolean(true); + boolean doneSync = getAsyncProcessor().process(exchange, new AsyncCallback() { @Override public void done(boolean doneSync) { // handle any thrown exception if (exchange.getException() != null) { getExceptionHandler().handleException("Error processing exchange", exchange, exchange.getException()); } + boolean wasPolled = exchange.getProperty(Exchange.SCHEDULER_POLLED_MESSAGES, true, boolean.class); + if (!wasPolled) { + polled.set(false); + } + + // sync wil release outside this callback + if (!doneSync) { + releaseExchange(exchange); + } } }); + if (!doneSync) { + return polled.get() ? 1 : 0; + } } else { try { getProcessor().process(exchange); @@ -81,6 +95,7 @@ public class SchedulerConsumer extends ScheduledPollConsumer { // for example to overrule and indicate no message was polled, which can affect the scheduler // to leverage backoff on idle etc. boolean polled = exchange.getProperty(Exchange.SCHEDULER_POLLED_MESSAGES, true, boolean.class); + releaseExchange(exchange); return polled ? 1 : 0; } diff --git a/components/camel-timer/src/main/java/org/apache/camel/component/timer/TimerConsumer.java b/components/camel-timer/src/main/java/org/apache/camel/component/timer/TimerConsumer.java index 8cc80c4..40b44c8 100644 --- a/components/camel-timer/src/main/java/org/apache/camel/component/timer/TimerConsumer.java +++ b/components/camel-timer/src/main/java/org/apache/camel/component/timer/TimerConsumer.java @@ -185,7 +185,7 @@ public class TimerConsumer extends DefaultConsumer implements StartupListener, S } protected void sendTimerExchange(long counter) { - final Exchange exchange = endpoint.createExchange(); + final Exchange exchange = createExchange(false); if (endpoint.isIncludeMetadata()) { exchange.setProperty(Exchange.TIMER_COUNTER, counter); @@ -211,6 +211,10 @@ public class TimerConsumer extends DefaultConsumer implements StartupListener, S if (exchange.getException() != null) { getExceptionHandler().handleException("Error processing exchange", exchange, exchange.getException()); } + // sync wil release outside this callback + if (!doneSync) { + releaseExchange(exchange); + } } }); } else { @@ -221,8 +225,12 @@ public class TimerConsumer extends DefaultConsumer implements StartupListener, S } // handle any thrown exception - if (exchange.getException() != null) { - getExceptionHandler().handleException("Error processing exchange", exchange, exchange.getException()); + try { + if (exchange.getException() != null) { + getExceptionHandler().handleException("Error processing exchange", exchange, exchange.getException()); + } + } finally { + releaseExchange(exchange); } } } diff --git a/core/camel-api/src/main/java/org/apache/camel/Consumer.java b/core/camel-api/src/main/java/org/apache/camel/Consumer.java index 0f3ec2e..2360be5 100644 --- a/core/camel-api/src/main/java/org/apache/camel/Consumer.java +++ b/core/camel-api/src/main/java/org/apache/camel/Consumer.java @@ -16,6 +16,8 @@ */ package org.apache.camel; +import org.apache.camel.spi.UnitOfWork; + /** * A consumer of message exchanges from an {@link Endpoint}. * <p/> @@ -25,6 +27,21 @@ package org.apache.camel; */ public interface Consumer extends Service, EndpointAware { + /** + * The processor that will process the {@link Exchange} that was consumed. + */ Processor getProcessor(); + /** + * Creates an {@link Exchange} that was consumed. + * + * @param autoRelease whether to auto release the exchange when routing is complete via {@link UnitOfWork} + */ + Exchange createExchange(boolean autoRelease); + + /** + * Releases the {@link Exchange} when its completed processing and no longer needed. + */ + void releaseExchange(Exchange exchange); + } diff --git a/core/camel-api/src/main/java/org/apache/camel/Endpoint.java b/core/camel-api/src/main/java/org/apache/camel/Endpoint.java index 10bfd5e..cacdf51 100644 --- a/core/camel-api/src/main/java/org/apache/camel/Endpoint.java +++ b/core/camel-api/src/main/java/org/apache/camel/Endpoint.java @@ -79,6 +79,13 @@ public interface Endpoint extends IsSingleton, Service { Exchange createExchange(ExchangePattern pattern); /** + * Configures a newly created {@link Exchange}. + * + * @param exchange the new exchange + */ + void configureExchange(Exchange exchange); + + /** * Returns the context which created the endpoint * * @return the context which created the endpoint diff --git a/core/camel-api/src/main/java/org/apache/camel/ExtendedExchange.java b/core/camel-api/src/main/java/org/apache/camel/ExtendedExchange.java index c2a1ffd..578a3ee 100644 --- a/core/camel-api/src/main/java/org/apache/camel/ExtendedExchange.java +++ b/core/camel-api/src/main/java/org/apache/camel/ExtendedExchange.java @@ -30,6 +30,8 @@ public interface ExtendedExchange extends Exchange { /** * Clears the exchange from user data so it may be reused. + * <p/> + * <b>Important:</b> This API is NOT intended for Camel end users, but used internally by Camel itself. */ void reset(); diff --git a/core/camel-api/src/main/java/org/apache/camel/spi/ExchangeFactory.java b/core/camel-api/src/main/java/org/apache/camel/spi/ExchangeFactory.java index f1eefd0..95586c4 100644 --- a/core/camel-api/src/main/java/org/apache/camel/spi/ExchangeFactory.java +++ b/core/camel-api/src/main/java/org/apache/camel/spi/ExchangeFactory.java @@ -27,6 +27,11 @@ import org.apache.camel.Exchange; */ public interface ExchangeFactory { + // TODO: new factory per consumer so there is no single race bottleneck + // TODO: only use factory on route consumer to limit its scope to most significant impact + // TODO: release from extended exchange without onCompletion (overhead) + // TODO: reuse unit of work (expensive to create) + /** * Service factory key. */ @@ -34,15 +39,18 @@ public interface ExchangeFactory { /** * Gets a new {@link Exchange} + * + * @param autoRelease whether to auto release the exchange when routing is complete via {@link UnitOfWork} */ - Exchange create(); + Exchange create(boolean autoRelease); /** * Gets a new {@link Exchange} * + * @param autoRelease whether to auto release the exchange when routing is complete via {@link UnitOfWork} * @param fromEndpoint the from endpoint */ - Exchange create(Endpoint fromEndpoint); + Exchange create(Endpoint fromEndpoint, boolean autoRelease); default void release(Exchange exchange) { // noop diff --git a/core/camel-api/src/main/java/org/apache/camel/spi/UnitOfWork.java b/core/camel-api/src/main/java/org/apache/camel/spi/UnitOfWork.java index 062f22d..20284fa 100644 --- a/core/camel-api/src/main/java/org/apache/camel/spi/UnitOfWork.java +++ b/core/camel-api/src/main/java/org/apache/camel/spi/UnitOfWork.java @@ -41,6 +41,13 @@ public interface UnitOfWork extends Service { String MDC_TRANSACTION_KEY = "camel.transactionKey"; /** + * Clears the unit of work from user data so it may be reused. + * <p/> + * <b>Important:</b> This API is NOT intended for Camel end users, but used internally by Camel itself. + */ + void reset(); + + /** * Adds a synchronization hook * * @param synchronization the hook diff --git a/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/DefaultExchangeFactory.java b/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/DefaultExchangeFactory.java index fedae54..ec17772 100644 --- a/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/DefaultExchangeFactory.java +++ b/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/DefaultExchangeFactory.java @@ -41,12 +41,13 @@ public class DefaultExchangeFactory implements ExchangeFactory, CamelContextAwar } @Override - public Exchange create() { + public Exchange create(boolean autoRelease) { return new DefaultExchange(camelContext); } @Override - public Exchange create(Endpoint fromEndpoint) { + public Exchange create(Endpoint fromEndpoint, boolean autoRelease) { return new DefaultExchange(fromEndpoint); } + } diff --git a/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/DefaultUnitOfWork.java b/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/DefaultUnitOfWork.java index f8576b8..975b8a0 100644 --- a/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/DefaultUnitOfWork.java +++ b/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/DefaultUnitOfWork.java @@ -132,6 +132,18 @@ public class DefaultUnitOfWork implements UnitOfWork, Service { } @Override + public void reset() { + routes.clear(); + if (synchronizations != null) { + synchronizations.clear(); + } + originalInMessage = null; + if (transactedBy != null) { + transactedBy.clear(); + } + } + + @Override public void setParentUnitOfWork(UnitOfWork parentUnitOfWork) { } diff --git a/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/PooledExchangeFactory.java b/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/PooledExchangeFactory.java index 88142c9..30094cd 100644 --- a/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/PooledExchangeFactory.java +++ b/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/PooledExchangeFactory.java @@ -48,6 +48,7 @@ public class PooledExchangeFactory extends ServiceSupport private final AtomicLong acquired = new AtomicLong(); private final AtomicLong created = new AtomicLong(); private final AtomicLong released = new AtomicLong(); + private final AtomicLong discarded = new AtomicLong(); private CamelContext camelContext; private boolean statisticsEnabled = true; @@ -71,7 +72,7 @@ public class PooledExchangeFactory extends ServiceSupport } @Override - public Exchange create() { + public Exchange create(boolean autoRelease) { Exchange exchange = pool.poll(); if (exchange == null) { if (statisticsEnabled) { @@ -83,17 +84,16 @@ public class PooledExchangeFactory extends ServiceSupport if (statisticsEnabled) { acquired.incrementAndGet(); } - // reset exchange before we use it - ExtendedExchange ee = exchange.adapt(ExtendedExchange.class); - ee.reset(); } - // add on completion which will return the exchange when done - exchange.adapt(ExtendedExchange.class).addOnCompletion(onCompletion); + if (autoRelease) { + // add on completion which will return the exchange when done + exchange.adapt(ExtendedExchange.class).addOnCompletion(onCompletion); + } return exchange; } @Override - public Exchange create(Endpoint fromEndpoint) { + public Exchange create(Endpoint fromEndpoint, boolean autoRelease) { Exchange exchange = pool.poll(); if (exchange == null) { if (statisticsEnabled) { @@ -108,17 +108,33 @@ public class PooledExchangeFactory extends ServiceSupport // need to mark this exchange from the given endpoint exchange.adapt(ExtendedExchange.class).setFromEndpoint(fromEndpoint); } - // add on completion which will return the exchange when done - exchange.adapt(ExtendedExchange.class).addOnCompletion(onCompletion); + if (autoRelease) { + // add on completion which will return the exchange when done + exchange.adapt(ExtendedExchange.class).addOnCompletion(onCompletion); + } return exchange; } @Override public void release(Exchange exchange) { - if (statisticsEnabled) { - released.incrementAndGet(); + // reset exchange before returning to pool + try { + // TODO: reset on pool as this then update created to be up-to-date + ExtendedExchange ee = exchange.adapt(ExtendedExchange.class); + ee.reset(); + + // only release back in pool if reset was success + if (statisticsEnabled) { + released.incrementAndGet(); + } + pool.offer(exchange); + } catch (Exception e) { + if (statisticsEnabled) { + discarded.incrementAndGet(); + } + // ignore + LOG.debug("Error resetting exchange: {}. This exchange is discarded.", exchange); } - pool.offer(exchange); } @Override @@ -126,13 +142,14 @@ public class PooledExchangeFactory extends ServiceSupport pool.clear(); if (statisticsEnabled) { - LOG.info("PooledExchangeFactory usage [created: {}, acquired: {}, released: {}]", created.get(), acquired.get(), - released.get()); + LOG.info("PooledExchangeFactory usage [created: {}, acquired: {}, released: {}, discarded: {}]", + created.get(), acquired.get(), released.get(), discarded.get()); } created.set(0); acquired.set(0); released.set(0); + discarded.set(0); } private final class ReleaseOnCompletion extends SynchronizationAdapter { @@ -145,7 +162,9 @@ public class PooledExchangeFactory extends ServiceSupport @Override public void onDone(Exchange exchange) { - release(exchange); + if (exchange != null) { + release(exchange); + } } } diff --git a/core/camel-core-model/src/main/java/org/apache/camel/builder/ExchangeBuilder.java b/core/camel-core-model/src/main/java/org/apache/camel/builder/ExchangeBuilder.java index 1088cdc..8594bfa 100644 --- a/core/camel-core-model/src/main/java/org/apache/camel/builder/ExchangeBuilder.java +++ b/core/camel-core-model/src/main/java/org/apache/camel/builder/ExchangeBuilder.java @@ -22,8 +22,8 @@ import java.util.Map; import org.apache.camel.CamelContext; import org.apache.camel.Exchange; import org.apache.camel.ExchangePattern; -import org.apache.camel.ExtendedCamelContext; import org.apache.camel.Message; +import org.apache.camel.support.DefaultExchange; /** * Builder to create {@link Exchange} and add headers and set body on the Exchange {@link Message}. @@ -103,7 +103,7 @@ public final class ExchangeBuilder { * @return exchange */ public Exchange build() { - Exchange exchange = context.adapt(ExtendedCamelContext.class).getExchangeFactory().create(); + Exchange exchange = new DefaultExchange(context); Message message = exchange.getIn(); message.setBody(body); if (headers.size() > 0) { diff --git a/core/camel-core-processor/src/main/java/org/apache/camel/processor/WireTapProcessor.java b/core/camel-core-processor/src/main/java/org/apache/camel/processor/WireTapProcessor.java index 4ce6bb5..9ca24e6 100644 --- a/core/camel-core-processor/src/main/java/org/apache/camel/processor/WireTapProcessor.java +++ b/core/camel-core-processor/src/main/java/org/apache/camel/processor/WireTapProcessor.java @@ -29,7 +29,6 @@ import org.apache.camel.CamelContextAware; import org.apache.camel.Exchange; import org.apache.camel.ExchangePattern; import org.apache.camel.Expression; -import org.apache.camel.ExtendedCamelContext; import org.apache.camel.Message; import org.apache.camel.Processor; import org.apache.camel.RuntimeCamelException; @@ -42,6 +41,7 @@ import org.apache.camel.spi.RouteIdAware; import org.apache.camel.spi.ShutdownAware; import org.apache.camel.support.AsyncProcessorConverterHelper; import org.apache.camel.support.AsyncProcessorSupport; +import org.apache.camel.support.DefaultExchange; import org.apache.camel.support.ExchangeHelper; import org.apache.camel.support.service.ServiceHelper; import org.apache.camel.util.ObjectHelper; @@ -257,10 +257,7 @@ public class WireTapProcessor extends AsyncProcessorSupport } private Exchange configureNewExchange(Exchange exchange) { - Exchange answer - = camelContext.adapt(ExtendedCamelContext.class).getExchangeFactory().create(exchange.getFromEndpoint()); - answer.setPattern(ExchangePattern.InOnly); - return answer; + return new DefaultExchange(exchange.getFromEndpoint(), ExchangePattern.InOnly); } public List<Processor> getNewExchangeProcessors() { diff --git a/core/camel-core/src/test/java/org/apache/camel/component/dataset/DataSetTestEndpointTest.java b/core/camel-core/src/test/java/org/apache/camel/component/dataset/DataSetTestEndpointTest.java index bd09c43..0050d05 100644 --- a/core/camel-core/src/test/java/org/apache/camel/component/dataset/DataSetTestEndpointTest.java +++ b/core/camel-core/src/test/java/org/apache/camel/component/dataset/DataSetTestEndpointTest.java @@ -81,9 +81,19 @@ public class DataSetTestEndpointTest extends ContextTestSupport { } @Override + public Exchange createExchange(boolean autoRelease) { + return new DefaultExchange(getEndpoint()); + } + + @Override + public void releaseExchange(Exchange exchange) { + // noop + } + + @Override public void start() { // when starting then send a message to the processor - Exchange exchange = new DefaultExchange(getEndpoint()); + Exchange exchange = createExchange(false); exchange.getIn().setBody(expectedBody); try { processor.process(exchange); diff --git a/core/camel-support/src/main/java/org/apache/camel/support/DefaultConsumer.java b/core/camel-support/src/main/java/org/apache/camel/support/DefaultConsumer.java index 71d4265..c8e594f 100644 --- a/core/camel-support/src/main/java/org/apache/camel/support/DefaultConsumer.java +++ b/core/camel-support/src/main/java/org/apache/camel/support/DefaultConsumer.java @@ -26,6 +26,7 @@ import org.apache.camel.Processor; import org.apache.camel.Route; import org.apache.camel.RouteAware; import org.apache.camel.spi.ExceptionHandler; +import org.apache.camel.spi.ExchangeFactory; import org.apache.camel.spi.RouteIdAware; import org.apache.camel.spi.UnitOfWork; import org.apache.camel.support.service.ServiceHelper; @@ -45,6 +46,7 @@ public class DefaultConsumer extends ServiceSupport implements Consumer, RouteAw private final Endpoint endpoint; private final Processor processor; private final AsyncProcessor asyncProcessor; + private final ExchangeFactory exchangeFactory; private ExceptionHandler exceptionHandler; private Route route; private String routeId; @@ -54,6 +56,7 @@ public class DefaultConsumer extends ServiceSupport implements Consumer, RouteAw this.processor = processor; this.asyncProcessor = AsyncProcessorConverterHelper.convert(processor); this.exceptionHandler = new LoggingExceptionHandler(endpoint.getCamelContext(), getClass()); + this.exchangeFactory = endpoint.getCamelContext().adapt(ExtendedCamelContext.class).getExchangeFactory(); } @Override @@ -121,6 +124,21 @@ public class DefaultConsumer extends ServiceSupport implements Consumer, RouteAw } @Override + public Exchange createExchange(boolean autoRelease) { + Exchange answer = exchangeFactory.create(getEndpoint(), autoRelease); + endpoint.configureExchange(answer); + answer.adapt(ExtendedExchange.class).setFromRouteId(routeId); + return answer; + } + + @Override + public void releaseExchange(Exchange exchange) { + if (exchange != null) { + exchangeFactory.release(exchange); + } + } + + @Override public Endpoint getEndpoint() { return endpoint; } diff --git a/core/camel-support/src/main/java/org/apache/camel/support/DefaultEndpoint.java b/core/camel-support/src/main/java/org/apache/camel/support/DefaultEndpoint.java index aa777e9..70d9aaf 100644 --- a/core/camel-support/src/main/java/org/apache/camel/support/DefaultEndpoint.java +++ b/core/camel-support/src/main/java/org/apache/camel/support/DefaultEndpoint.java @@ -27,10 +27,8 @@ import org.apache.camel.Consumer; import org.apache.camel.Endpoint; import org.apache.camel.Exchange; import org.apache.camel.ExchangePattern; -import org.apache.camel.ExtendedCamelContext; import org.apache.camel.PollingConsumer; import org.apache.camel.spi.ExceptionHandler; -import org.apache.camel.spi.ExchangeFactory; import org.apache.camel.spi.HasId; import org.apache.camel.spi.Metadata; import org.apache.camel.spi.PropertyConfigurer; @@ -56,7 +54,6 @@ public abstract class DefaultEndpoint extends ServiceSupport implements Endpoint private volatile String endpointUri; private CamelContext camelContext; private Component component; - private ExchangeFactory exchangeFactory; @Metadata(label = "advanced", defaultValue = "true", description = "Whether autowiring is enabled. This is used for automatic autowiring options (the option must be marked as autowired)" @@ -101,9 +98,6 @@ public abstract class DefaultEndpoint extends ServiceSupport implements Endpoint this.setEndpointUri(endpointUri); if (component != null) { this.camelContext = component.getCamelContext(); - if (this.camelContext != null) { - this.exchangeFactory = camelContext.adapt(ExtendedCamelContext.class).getExchangeFactory(); - } } } @@ -234,14 +228,19 @@ public abstract class DefaultEndpoint extends ServiceSupport implements Endpoint @Override public Exchange createExchange() { - return createExchange(getExchangePattern()); + return createExchange(exchangePattern); } @Override public Exchange createExchange(ExchangePattern pattern) { - Exchange exchange = exchangeFactory.create(this); - exchange.setPattern(pattern); - return exchange; + Exchange answer = new DefaultExchange(this, pattern); + configureExchange(answer); + return answer; + } + + @Override + public void configureExchange(Exchange exchange) { + // noop } /** @@ -490,10 +489,6 @@ public abstract class DefaultEndpoint extends ServiceSupport implements Endpoint protected void doInit() throws Exception { ObjectHelper.notNull(getCamelContext(), "camelContext"); - if (exchangeFactory == null) { - exchangeFactory = camelContext.adapt(ExtendedCamelContext.class).getExchangeFactory(); - } - if (autowiredEnabled && getComponent() != null) { PropertyConfigurer configurer = getComponent().getEndpointPropertyConfigurer(); if (configurer instanceof PropertyConfigurerGetter) { diff --git a/core/camel-support/src/main/java/org/apache/camel/support/DefaultExchange.java b/core/camel-support/src/main/java/org/apache/camel/support/DefaultExchange.java index fca25fa..5ff30c9 100644 --- a/core/camel-support/src/main/java/org/apache/camel/support/DefaultExchange.java +++ b/core/camel-support/src/main/java/org/apache/camel/support/DefaultExchange.java @@ -53,6 +53,7 @@ public final class DefaultExchange implements ExtendedExchange { private Exception exception; private String exchangeId; private UnitOfWork unitOfWork; + private ExchangePattern originalPattern; private ExchangePattern pattern; private Endpoint fromEndpoint; private String fromRouteId; @@ -74,12 +75,14 @@ public final class DefaultExchange implements ExtendedExchange { this.context = context; this.pattern = ExchangePattern.InOnly; this.created = System.currentTimeMillis(); + this.originalPattern = this.pattern; } public DefaultExchange(CamelContext context, ExchangePattern pattern) { this.context = context; this.pattern = pattern; this.created = System.currentTimeMillis(); + this.originalPattern = this.pattern; } public DefaultExchange(Exchange parent) { @@ -89,6 +92,7 @@ public final class DefaultExchange implements ExtendedExchange { this.fromEndpoint = parent.getFromEndpoint(); this.fromRouteId = parent.getFromRouteId(); this.unitOfWork = parent.getUnitOfWork(); + this.originalPattern = this.pattern; } public DefaultExchange(Endpoint fromEndpoint) { @@ -96,6 +100,7 @@ public final class DefaultExchange implements ExtendedExchange { this.pattern = ExchangePattern.InOnly; this.created = System.currentTimeMillis(); this.fromEndpoint = fromEndpoint; + this.originalPattern = this.pattern; } public DefaultExchange(Endpoint fromEndpoint, ExchangePattern pattern) { @@ -103,6 +108,7 @@ public final class DefaultExchange implements ExtendedExchange { this.pattern = pattern; this.created = System.currentTimeMillis(); this.fromEndpoint = fromEndpoint; + this.originalPattern = this.pattern; } @Override @@ -118,13 +124,17 @@ public final class DefaultExchange implements ExtendedExchange { public void reset() { this.properties.clear(); this.exchangeId = null; + // TODO: This is reset time this.created = System.currentTimeMillis(); + this.in = null; this.out = null; this.exception = null; this.unitOfWork = null; - this.pattern = null; - this.fromEndpoint = null; - this.fromRouteId = null; + // reset pattern to original + this.pattern = originalPattern; + // do not reset endpoint as it would be the same consumer/endpoint again + // this.fromEndpoint = null; + // this.fromRouteId = null; if (this.onCompletions != null) { this.onCompletions.clear(); } diff --git a/core/camel-support/src/main/java/org/apache/camel/support/DefaultInterceptSendToEndpoint.java b/core/camel-support/src/main/java/org/apache/camel/support/DefaultInterceptSendToEndpoint.java index 74725c9..99692e0 100644 --- a/core/camel-support/src/main/java/org/apache/camel/support/DefaultInterceptSendToEndpoint.java +++ b/core/camel-support/src/main/java/org/apache/camel/support/DefaultInterceptSendToEndpoint.java @@ -113,6 +113,11 @@ public class DefaultInterceptSendToEndpoint implements InterceptSendToEndpoint, } @Override + public void configureExchange(Exchange exchange) { + delegate.configureExchange(exchange); + } + + @Override public CamelContext getCamelContext() { return delegate.getCamelContext(); } diff --git a/core/camel-support/src/main/java/org/apache/camel/support/PollingConsumerSupport.java b/core/camel-support/src/main/java/org/apache/camel/support/PollingConsumerSupport.java index be1759c..ca4930f 100644 --- a/core/camel-support/src/main/java/org/apache/camel/support/PollingConsumerSupport.java +++ b/core/camel-support/src/main/java/org/apache/camel/support/PollingConsumerSupport.java @@ -16,9 +16,7 @@ */ package org.apache.camel.support; -import org.apache.camel.Endpoint; -import org.apache.camel.PollingConsumer; -import org.apache.camel.Processor; +import org.apache.camel.*; import org.apache.camel.spi.ExceptionHandler; import org.apache.camel.support.service.ServiceSupport; @@ -50,6 +48,16 @@ public abstract class PollingConsumerSupport extends ServiceSupport implements P return null; } + @Override + public Exchange createExchange(boolean autoRelease) { + throw new UnsupportedOperationException("Not supported on PollingConsumer"); + } + + @Override + public void releaseExchange(Exchange exchange) { + throw new UnsupportedOperationException("Not supported on PollingConsumer"); + } + public ExceptionHandler getExceptionHandler() { return exceptionHandler; }