This is an automated email from the ASF dual-hosted git repository. davsclaus pushed a commit to branch pool in repository https://gitbox.apache.org/repos/asf/camel.git
commit 337b9c579a73cfdae52721f2f04f76ea616681a8 Author: Claus Ibsen <claus.ib...@gmail.com> AuthorDate: Sun Mar 7 17:54:51 2021 +0100 CAMEL-16279: camel-core - Optimize core to reduce object allocations by pooling reusable tasks in the routing engine. --- .../java/org/apache/camel/spi/ExchangeFactory.java | 86 +----------- .../apache/camel/spi/InternalProcessorFactory.java | 4 +- .../org/apache/camel/spi/PooledObjectFactory.java | 125 +++++++++++++++++ .../camel/impl/engine/AbstractCamelContext.java | 3 + .../camel/impl/engine/PooledExchangeFactory.java | 5 + .../impl/engine/PrototypeExchangeFactory.java | 111 ++------------- .../apache/camel/processor/PooledExchangeTask.java | 41 ++++++ .../camel/processor/PooledExchangeTaskFactory.java | 56 ++++++++ .../apache/camel/processor/PooledTaskFactory.java | 64 +++++++++ .../camel/processor/PrototypeTaskFactory.java | 48 +++++++ .../errorhandler/RedeliveryErrorHandler.java | 137 ++++++++++++++----- .../camel/support/PooledObjectFactorySupport.java | 152 +++++++++++++++++++++ .../support/PrototypeObjectFactorySupport.java | 139 +++++++++++++++++++ 13 files changed, 756 insertions(+), 215 deletions(-) diff --git a/core/camel-api/src/main/java/org/apache/camel/spi/ExchangeFactory.java b/core/camel-api/src/main/java/org/apache/camel/spi/ExchangeFactory.java index fcedd54..29b3fdb 100644 --- a/core/camel-api/src/main/java/org/apache/camel/spi/ExchangeFactory.java +++ b/core/camel-api/src/main/java/org/apache/camel/spi/ExchangeFactory.java @@ -16,12 +16,10 @@ */ package org.apache.camel.spi; -import org.apache.camel.CamelContextAware; import org.apache.camel.Consumer; import org.apache.camel.Endpoint; import org.apache.camel.Exchange; import org.apache.camel.NonManagedService; -import org.apache.camel.Service; /** * Factory used by {@link Consumer} to create Camel {@link Exchange} holding the incoming message received by the @@ -36,50 +34,7 @@ import org.apache.camel.Service; * The factory is pluggable which allows to use 
different strategies. The default factory will create a new * {@link Exchange} instance, and the pooled factory will pool and reuse exchanges. */ -public interface ExchangeFactory extends Service, CamelContextAware, NonManagedService, RouteIdAware { - - /** - * Utilization statistics of the this factory. - */ - interface Statistics { - - /** - * Number of new exchanges created. - */ - long getCreatedCounter(); - - /** - * Number of exchanges acquired (reused) when using pooled factory. - */ - long getAcquiredCounter(); - - /** - * Number of exchanges released back to pool - */ - long getReleasedCounter(); - - /** - * Number of exchanges discarded (thrown away) such as if no space in cache pool. - */ - long getDiscardedCounter(); - - /** - * Reset the counters - */ - void reset(); - - /** - * Whether statistics is enabled. - */ - boolean isStatisticsEnabled(); - - /** - * Sets whether statistics is enabled. - * - * @param statisticsEnabled <tt>true</tt> to enable - */ - void setStatisticsEnabled(boolean statisticsEnabled); - } +public interface ExchangeFactory extends PooledObjectFactory<Exchange>, NonManagedService, RouteIdAware { /** * Service factory key. @@ -125,43 +80,8 @@ public interface ExchangeFactory extends Service, CamelContextAware, NonManagedS } /** - * The capacity the pool (for each consumer) uses for storing exchanges. The default capacity is 100. - */ - int getCapacity(); - - /** - * The current number of exchanges in the pool - */ - int getSize(); - - /** - * The capacity the pool (for each consumer) uses for storing exchanges. The default capacity is 100. - */ - void setCapacity(int capacity); - - /** - * Whether statistics is enabled. - */ - boolean isStatisticsEnabled(); - - /** - * Whether statistics is enabled. 
- */ - void setStatisticsEnabled(boolean statisticsEnabled); - - /** - * Reset the statistics - */ - void resetStatistics(); - - /** - * Purges the internal cache (if pooled) - */ - void purge(); - - /** - * Gets the usage statistics + * Whether the factory is pooled. */ - Statistics getStatistics(); + boolean isPooled(); } diff --git a/core/camel-api/src/main/java/org/apache/camel/spi/InternalProcessorFactory.java b/core/camel-api/src/main/java/org/apache/camel/spi/InternalProcessorFactory.java index b3f8868..8a56f21 100644 --- a/core/camel-api/src/main/java/org/apache/camel/spi/InternalProcessorFactory.java +++ b/core/camel-api/src/main/java/org/apache/camel/spi/InternalProcessorFactory.java @@ -27,7 +27,9 @@ import org.apache.camel.Route; /** * A factory used internally by Camel to create {@link Processor} and other internal building blocks. This factory is - * used to have loose coupling between the modules in core. Camel user user should only use {@link ProcessorFactory}. + * used to have loose coupling between the modules in core. + * + * Camel end user should NOT use this, but use {@link ProcessorFactory} instead. * * @see ProcessorFactory */ diff --git a/core/camel-api/src/main/java/org/apache/camel/spi/PooledObjectFactory.java b/core/camel-api/src/main/java/org/apache/camel/spi/PooledObjectFactory.java new file mode 100644 index 0000000..db4c0d1 --- /dev/null +++ b/core/camel-api/src/main/java/org/apache/camel/spi/PooledObjectFactory.java @@ -0,0 +1,125 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.spi; + +import org.apache.camel.CamelContextAware; +import org.apache.camel.Service; + +/** + * Factory for pooled objects or tasks. + */ +public interface PooledObjectFactory<T> extends Service, CamelContextAware { + + /** + * Utilization statistics of this factory. + */ + interface Statistics { + + /** + * Number of new objects created. + */ + long getCreatedCounter(); + + /** + * Number of objects acquired (reused) when using pooled factory. + */ + long getAcquiredCounter(); + + /** + * Number of objects released back to pool + */ + long getReleasedCounter(); + + /** + * Number of objects discarded (thrown away) such as if no space in cache pool. + */ + long getDiscardedCounter(); + + /** + * Reset the counters + */ + void reset(); + + /** + * Whether statistics is enabled. + */ + boolean isStatisticsEnabled(); + + /** + * Sets whether statistics is enabled. + * + * @param statisticsEnabled <tt>true</tt> to enable + */ + void setStatisticsEnabled(boolean statisticsEnabled); + } + + /** + * The current number of objects in the pool + */ + int getSize(); + + /** + * The capacity the pool uses for storing objects. The default capacity is 100. + */ + int getCapacity(); + + /** + * The capacity the pool uses for storing objects. The default capacity is 100. + */ + void setCapacity(int capacity); + + /** + * Whether statistics is enabled. + */ + boolean isStatisticsEnabled(); + + /** + * Whether statistics is enabled. 
+ */ + void setStatisticsEnabled(boolean statisticsEnabled); + + /** + * Reset the statistics + */ + void resetStatistics(); + + /** + * Purges the internal cache (if pooled) + */ + void purge(); + + /** + * Gets the usage statistics + */ + Statistics getStatistics(); + + /** + * Acquires an object from the pool (if any) + * + * @return the object or <tt>null</tt> if the pool is empty + */ + T acquire(); + + /** + * Releases the object back to the pool + * + * @param t the object + * @return true if released into the pool, or false if something went wrong and the object was discarded + */ + boolean release(T t); + +} diff --git a/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/AbstractCamelContext.java b/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/AbstractCamelContext.java index d7e6dd4..45fcf6d 100644 --- a/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/AbstractCamelContext.java +++ b/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/AbstractCamelContext.java @@ -2882,6 +2882,9 @@ public abstract class AbstractCamelContext extends BaseService } bootstraps.clear(); + if (adapt(ExtendedCamelContext.class).getExchangeFactory().isPooled()) { + LOG.info("Pooled mode enabled. Camel pools and reuses objects to reduce JVM object allocations."); + } if (isLightweight()) { LOG.info("Lightweight mode enabled. 
Performing optimizations and memory reduction."); ReifierStrategy.clearReifiers(); diff --git a/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/PooledExchangeFactory.java b/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/PooledExchangeFactory.java index 4079a46..b1647b5 100644 --- a/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/PooledExchangeFactory.java +++ b/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/PooledExchangeFactory.java @@ -168,6 +168,11 @@ public final class PooledExchangeFactory extends PrototypeExchangeFactory { } @Override + public boolean isPooled() { + return true; + } + + @Override protected void doStop() throws Exception { exchangeFactoryManager.removeExchangeFactory(this); logUsageSummary(LOG, "PooledExchangeFactory", pool.size()); diff --git a/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/PrototypeExchangeFactory.java b/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/PrototypeExchangeFactory.java index fb17cb0..eef1ea7 100644 --- a/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/PrototypeExchangeFactory.java +++ b/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/PrototypeExchangeFactory.java @@ -16,9 +16,6 @@ */ package org.apache.camel.impl.engine; -import java.util.concurrent.atomic.LongAdder; - -import org.apache.camel.CamelContext; import org.apache.camel.Consumer; import org.apache.camel.Endpoint; import org.apache.camel.Exchange; @@ -26,7 +23,7 @@ import org.apache.camel.ExtendedCamelContext; import org.apache.camel.spi.ExchangeFactory; import org.apache.camel.spi.ExchangeFactoryManager; import org.apache.camel.support.DefaultExchange; -import org.apache.camel.support.service.ServiceSupport; +import org.apache.camel.support.PooledObjectFactorySupport; import org.apache.camel.util.URISupport; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -34,13 +31,11 @@ import 
org.slf4j.LoggerFactory; /** * {@link ExchangeFactory} that creates a new {@link Exchange} instance. */ -public class PrototypeExchangeFactory extends ServiceSupport implements ExchangeFactory { +public class PrototypeExchangeFactory extends PooledObjectFactorySupport<Exchange> implements ExchangeFactory { private static final Logger LOG = LoggerFactory.getLogger(PrototypeExchangeFactory.class); - final UtilizationStatistics statistics = new UtilizationStatistics(); final Consumer consumer; - CamelContext camelContext; ExchangeFactoryManager exchangeFactoryManager; String routeId; @@ -54,6 +49,7 @@ public class PrototypeExchangeFactory extends ServiceSupport implements Exchange @Override protected void doBuild() throws Exception { + super.doBuild(); this.exchangeFactoryManager = camelContext.adapt(ExtendedCamelContext.class).getExchangeFactoryManager(); } @@ -73,16 +69,6 @@ public class PrototypeExchangeFactory extends ServiceSupport implements Exchange } @Override - public CamelContext getCamelContext() { - return camelContext; - } - - @Override - public void setCamelContext(CamelContext camelContext) { - this.camelContext = camelContext; - } - - @Override public ExchangeFactory newExchangeFactory(Consumer consumer) { PrototypeExchangeFactory answer = new PrototypeExchangeFactory(consumer); answer.setStatisticsEnabled(statistics.isStatisticsEnabled()); @@ -92,6 +78,11 @@ public class PrototypeExchangeFactory extends ServiceSupport implements Exchange } @Override + public Exchange acquire() { + throw new UnsupportedOperationException("Not in use"); + } + + @Override public Exchange create(boolean autoRelease) { if (statistics.isStatisticsEnabled()) { statistics.created.increment(); @@ -116,47 +107,18 @@ public class PrototypeExchangeFactory extends ServiceSupport implements Exchange } @Override - public boolean isStatisticsEnabled() { - return statistics.isStatisticsEnabled(); - } - - @Override - public void setStatisticsEnabled(boolean statisticsEnabled) { - 
statistics.setStatisticsEnabled(statisticsEnabled); - } - - @Override - public int getCapacity() { - return 0; - } - - @Override - public int getSize() { - return 0; - } - - @Override - public void setCapacity(int capacity) { - // not in use - } - - @Override public void resetStatistics() { statistics.reset(); } @Override - public void purge() { - // not in use - } - - @Override - public Statistics getStatistics() { - return statistics; + public boolean isPooled() { + return false; } @Override protected void doStart() throws Exception { + super.doStart(); if (exchangeFactoryManager != null) { exchangeFactoryManager.addExchangeFactory(this); } @@ -164,6 +126,7 @@ public class PrototypeExchangeFactory extends ServiceSupport implements Exchange @Override protected void doStop() throws Exception { + super.doStop(); if (exchangeFactoryManager != null) { exchangeFactoryManager.removeExchangeFactory(this); } @@ -190,54 +153,4 @@ public class PrototypeExchangeFactory extends ServiceSupport implements Exchange } } - /** - * Represents utilization statistics - */ - final class UtilizationStatistics implements ExchangeFactory.Statistics { - - boolean statisticsEnabled; - final LongAdder created = new LongAdder(); - final LongAdder acquired = new LongAdder(); - final LongAdder released = new LongAdder(); - final LongAdder discarded = new LongAdder(); - - @Override - public void reset() { - created.reset(); - acquired.reset(); - released.reset(); - discarded.reset(); - } - - @Override - public long getCreatedCounter() { - return created.longValue(); - } - - @Override - public long getAcquiredCounter() { - return acquired.longValue(); - } - - @Override - public long getReleasedCounter() { - return released.longValue(); - } - - @Override - public long getDiscardedCounter() { - return discarded.longValue(); - } - - @Override - public boolean isStatisticsEnabled() { - return statisticsEnabled; - } - - @Override - public void setStatisticsEnabled(boolean statisticsEnabled) { - 
this.statisticsEnabled = statisticsEnabled; - } - } - } diff --git a/core/camel-core-processor/src/main/java/org/apache/camel/processor/PooledExchangeTask.java b/core/camel-core-processor/src/main/java/org/apache/camel/processor/PooledExchangeTask.java new file mode 100644 index 0000000..d4e0226 --- /dev/null +++ b/core/camel-core-processor/src/main/java/org/apache/camel/processor/PooledExchangeTask.java @@ -0,0 +1,41 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.processor; + +import org.apache.camel.AsyncCallback; +import org.apache.camel.Exchange; + +/** + * A task that EIPs and internal routing engine uses to store state when processing an {@link Exchange}. + * + * @see org.apache.camel.processor.PooledExchangeTaskFactory + */ +public interface PooledExchangeTask extends Runnable { + + /** + * Prepares the task for the given exchange and its callback + * + * @param exchange the exchange + * @param callback the callback + */ + void prepare(Exchange exchange, AsyncCallback callback); + + /** + * Resets the task after it is done so it can be reused for another exchange. 
+ */ + void reset(); +} diff --git a/core/camel-core-processor/src/main/java/org/apache/camel/processor/PooledExchangeTaskFactory.java b/core/camel-core-processor/src/main/java/org/apache/camel/processor/PooledExchangeTaskFactory.java new file mode 100644 index 0000000..b90e9f5 --- /dev/null +++ b/core/camel-core-processor/src/main/java/org/apache/camel/processor/PooledExchangeTaskFactory.java @@ -0,0 +1,56 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.processor; + +import org.apache.camel.AsyncCallback; +import org.apache.camel.Exchange; +import org.apache.camel.spi.PooledObjectFactory; + +/** + * Factory to create {@link PooledExchangeTask}. + * + * @see PooledExchangeTask + */ +public interface PooledExchangeTaskFactory extends PooledObjectFactory<PooledExchangeTask> { + + /** + * Creates a new task to use for processing the exchange. + * + * @param exchange the current exchange + * @param callback the callback for the exchange + * @return the task + */ + PooledExchangeTask create(Exchange exchange, AsyncCallback callback); + + /** + * Attempts to acquire a pooled task to use for processing the exchange, if not possible then a new task is created. 
+ * + * @param exchange the current exchange + * @param callback the callback for the exchange + * @return the task + */ + PooledExchangeTask acquire(Exchange exchange, AsyncCallback callback); + + /** + * Releases the task after it is done being used + * + * @param task the task + * @return true if the task was released, and false if the task failed to be released or no space in pool, and + * the task was discarded. + */ + boolean release(PooledExchangeTask task); +} diff --git a/core/camel-core-processor/src/main/java/org/apache/camel/processor/PooledTaskFactory.java b/core/camel-core-processor/src/main/java/org/apache/camel/processor/PooledTaskFactory.java new file mode 100644 index 0000000..c1ce72e --- /dev/null +++ b/core/camel-core-processor/src/main/java/org/apache/camel/processor/PooledTaskFactory.java @@ -0,0 +1,64 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.camel.processor; + +import org.apache.camel.AsyncCallback; +import org.apache.camel.Exchange; +import org.apache.camel.support.PooledObjectFactorySupport; + +public abstract class PooledTaskFactory extends PooledObjectFactorySupport<PooledExchangeTask> + implements PooledExchangeTaskFactory { + + @Override + public PooledExchangeTask acquire() { + return pool.poll(); + } + + public PooledExchangeTask acquire(Exchange exchange, AsyncCallback callback) { + PooledExchangeTask task = acquire(); + if (task == null) { + if (statistics.isStatisticsEnabled()) { + statistics.created.increment(); + } + task = create(exchange, callback); + } else { + if (statistics.isStatisticsEnabled()) { + statistics.acquired.increment(); + } + } + task.prepare(exchange, callback); + return task; + } + + @Override + public boolean release(PooledExchangeTask task) { + boolean inserted = pool.offer(task); + if (statistics.isStatisticsEnabled()) { + if (inserted) { + statistics.released.increment(); + } else { + statistics.discarded.increment(); + } + } + return inserted; + } + + @Override + public String toString() { + return "PooledTaskFactory[capacity: " + getCapacity() + "]"; + } +} diff --git a/core/camel-core-processor/src/main/java/org/apache/camel/processor/PrototypeTaskFactory.java b/core/camel-core-processor/src/main/java/org/apache/camel/processor/PrototypeTaskFactory.java new file mode 100644 index 0000000..0c5c444 --- /dev/null +++ b/core/camel-core-processor/src/main/java/org/apache/camel/processor/PrototypeTaskFactory.java @@ -0,0 +1,48 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.processor; + +import org.apache.camel.AsyncCallback; +import org.apache.camel.Exchange; +import org.apache.camel.support.PrototypeObjectFactorySupport; + +public abstract class PrototypeTaskFactory extends PrototypeObjectFactorySupport<PooledExchangeTask> + implements PooledExchangeTaskFactory { + + @Override + public PooledExchangeTask acquire(Exchange exchange, AsyncCallback callback) { + PooledExchangeTask task = create(exchange, callback); + task.prepare(exchange, callback); + return task; + } + + @Override + public PooledExchangeTask acquire() { + throw new UnsupportedOperationException("Not in use"); + } + + @Override + public boolean release(PooledExchangeTask pooledTask) { + // not pooled + return true; + } + + @Override + public String toString() { + return "PrototypeTaskFactory"; + } +} diff --git a/core/camel-core-processor/src/main/java/org/apache/camel/processor/errorhandler/RedeliveryErrorHandler.java b/core/camel-core-processor/src/main/java/org/apache/camel/processor/errorhandler/RedeliveryErrorHandler.java index 3c0eaf3..39d3658 100644 --- a/core/camel-core-processor/src/main/java/org/apache/camel/processor/errorhandler/RedeliveryErrorHandler.java +++ b/core/camel-core-processor/src/main/java/org/apache/camel/processor/errorhandler/RedeliveryErrorHandler.java @@ -38,6 +38,10 @@ import org.apache.camel.Predicate; import org.apache.camel.Processor; import org.apache.camel.Route; import org.apache.camel.RuntimeCamelException; +import org.apache.camel.processor.PooledExchangeTask; +import 
org.apache.camel.processor.PooledExchangeTaskFactory; +import org.apache.camel.processor.PooledTaskFactory; +import org.apache.camel.processor.PrototypeTaskFactory; import org.apache.camel.spi.AsyncProcessorAwaitManager; import org.apache.camel.spi.CamelLogger; import org.apache.camel.spi.ErrorHandlerRedeliveryCustomizer; @@ -69,6 +73,9 @@ public abstract class RedeliveryErrorHandler extends ErrorHandlerSupport private static final Logger LOG = LoggerFactory.getLogger(RedeliveryErrorHandler.class); + // factory + protected PooledExchangeTaskFactory taskFactory; + // state protected final AtomicInteger redeliverySleepCounter = new AtomicInteger(); protected ScheduledExecutorService executorService; @@ -169,12 +176,8 @@ public abstract class RedeliveryErrorHandler extends ErrorHandlerSupport @Override public boolean process(final Exchange exchange, final AsyncCallback callback) { // Create the redelivery task object for this exchange (optimize to only create task can do redelivery or not) - Runnable task; - if (simpleTask) { - task = new SimpleTask(exchange, callback); - } else { - task = new RedeliveryTask(exchange, callback); - } + Runnable task = taskFactory.acquire(exchange, callback); + // Run it if (exchange.isTransacted()) { reactiveExecutor.scheduleSync(task); @@ -345,14 +348,18 @@ public abstract class RedeliveryErrorHandler extends ErrorHandlerSupport /** * Simple task to perform calling the processor with no redelivery support */ - protected class SimpleTask implements Runnable, AsyncCallback { - private final ExtendedExchange exchange; - private final AsyncCallback callback; - private boolean first = true; + protected class SimpleTask implements PooledExchangeTask, Runnable, AsyncCallback { + private ExtendedExchange exchange; + private AsyncCallback callback; + private boolean first; - SimpleTask(Exchange exchange, AsyncCallback callback) { + public SimpleTask() { + } + + public void prepare(Exchange exchange, AsyncCallback callback) { this.exchange = 
(ExtendedExchange) exchange; this.callback = callback; + this.first = true; } @Override @@ -360,6 +367,12 @@ public abstract class RedeliveryErrorHandler extends ErrorHandlerSupport return "SimpleTask"; } + public void reset() { + this.exchange = null; + this.callback = null; + this.first = true; + } + @Override public void done(boolean doneSync) { // the run method decides what to do when we are done @@ -385,7 +398,9 @@ public abstract class RedeliveryErrorHandler extends ErrorHandlerSupport if (exchange.getException() == null) { exchange.setException(new RejectedExecutionException()); } - callback.done(false); + AsyncCallback cb = callback; + taskFactory.release(this); + cb.done(false); return; } if (exchange.isInterrupted()) { @@ -396,7 +411,9 @@ public abstract class RedeliveryErrorHandler extends ErrorHandlerSupport } exchange.setRouteStop(true); // we should not continue routing so call callback - callback.done(false); + AsyncCallback cb = callback; + taskFactory.release(this); + cb.done(false); return; } @@ -413,14 +430,18 @@ public abstract class RedeliveryErrorHandler extends ErrorHandlerSupport onExceptionOccurred(); prepareExchangeAfterFailure(exchange); // we do not support redelivery so continue callback - reactiveExecutor.schedule(callback); + AsyncCallback cb = callback; + taskFactory.release(this); + reactiveExecutor.schedule(cb); } else if (first) { // first time call the target processor first = false; outputAsync.process(exchange, this); } else { // we are done so continue callback - reactiveExecutor.schedule(callback); + AsyncCallback cb = callback; + taskFactory.release(this); + reactiveExecutor.schedule(cb); } } @@ -585,15 +606,16 @@ public abstract class RedeliveryErrorHandler extends ErrorHandlerSupport /** * Task to perform calling the processor and handling redelivery if it fails (more advanced than ProcessTask) */ - protected class RedeliveryTask implements Runnable { - private final Exchange original; - private final ExtendedExchange 
exchange; - private final AsyncCallback callback; + protected class RedeliveryTask implements PooledExchangeTask, Runnable { + // state + private Exchange original; + private ExtendedExchange exchange; + private AsyncCallback callback; private int redeliveryCounter; private long redeliveryDelay; - private Predicate retryWhilePredicate; // default behavior which can be overloaded on a per exception basis + private Predicate retryWhilePredicate; private RedeliveryPolicy currentRedeliveryPolicy; private Processor failureProcessor; private Processor onRedeliveryProcessor; @@ -603,7 +625,16 @@ public abstract class RedeliveryErrorHandler extends ErrorHandlerSupport private boolean useOriginalInMessage; private boolean useOriginalInBody; - public RedeliveryTask(Exchange exchange, AsyncCallback callback) { + public RedeliveryTask() { + } + + @Override + public String toString() { + return "RedeliveryTask"; + } + + @Override + public void prepare(Exchange exchange, AsyncCallback callback) { this.retryWhilePredicate = retryWhilePolicy; this.currentRedeliveryPolicy = redeliveryPolicy; this.handledPredicate = getDefaultHandledPredicate(); @@ -611,7 +642,6 @@ public abstract class RedeliveryErrorHandler extends ErrorHandlerSupport this.useOriginalInBody = useOriginalBodyPolicy; this.onRedeliveryProcessor = redeliveryProcessor; this.onExceptionProcessor = RedeliveryErrorHandler.this.onExceptionProcessor; - // do a defensive copy of the original Exchange, which is needed for redelivery so we can ensure the // original Exchange is being redelivered, and not a mutated Exchange this.original = redeliveryEnabled ? 
defensiveCopyExchangeIfNeeded(exchange) : null; @@ -620,8 +650,20 @@ public abstract class RedeliveryErrorHandler extends ErrorHandlerSupport } @Override - public String toString() { - return "RedeliveryTask"; + public void reset() { + this.retryWhilePredicate = null; + this.currentRedeliveryPolicy = null; + this.handledPredicate = null; + this.continuedPredicate = null; + this.useOriginalInMessage = false; + this.useOriginalInBody = false; + this.onRedeliveryProcessor = null; + this.onExceptionProcessor = null; + this.original = null; + this.exchange = null; + this.callback = null; + this.redeliveryCounter = 0; + this.redeliveryDelay = 0; } /** @@ -635,7 +677,9 @@ public abstract class RedeliveryErrorHandler extends ErrorHandlerSupport if (exchange.getException() == null) { exchange.setException(new RejectedExecutionException()); } - callback.done(false); + AsyncCallback cb = callback; + taskFactory.release(this); + cb.done(false); return; } @@ -644,7 +688,9 @@ public abstract class RedeliveryErrorHandler extends ErrorHandlerSupport } catch (Throwable e) { // unexpected exception during running so break out exchange.setException(e); - callback.done(false); + AsyncCallback cb = callback; + taskFactory.release(this); + cb.done(false); } } @@ -804,7 +850,9 @@ public abstract class RedeliveryErrorHandler extends ErrorHandlerSupport // only process if the exchange hasn't failed // and it has not been handled by the error processor if (isDone(exchange)) { - reactiveExecutor.schedule(callback); + AsyncCallback cb = callback; + taskFactory.release(this); + reactiveExecutor.schedule(cb); } else { // error occurred so loop back around which we do by invoking the processAsyncErrorHandler reactiveExecutor.schedule(this); @@ -1107,7 +1155,9 @@ public abstract class RedeliveryErrorHandler extends ErrorHandlerSupport } } finally { // if the fault was handled asynchronously, this should be reflected in the callback as well - reactiveExecutor.schedule(callback); + AsyncCallback cb 
= callback; + taskFactory.release(this); + reactiveExecutor.schedule(cb); } }); } else { @@ -1126,7 +1176,9 @@ public abstract class RedeliveryErrorHandler extends ErrorHandlerSupport prepareExchangeAfterFailure(exchange, isDeadLetterChannel, shouldHandle, shouldContinue); } finally { // callback we are done - reactiveExecutor.schedule(callback); + AsyncCallback cb = callback; + taskFactory.release(this); + reactiveExecutor.schedule(cb); } } @@ -1503,8 +1555,6 @@ public abstract class RedeliveryErrorHandler extends ErrorHandlerSupport @Override protected void doStart() throws Exception { - ServiceHelper.startService(output, outputAsync, deadLetter); - // determine if redeliver is enabled or not redeliveryEnabled = determineIfRedeliveryIsEnabled(); if (LOG.isTraceEnabled()) { @@ -1531,6 +1581,28 @@ public abstract class RedeliveryErrorHandler extends ErrorHandlerSupport // however if we dont then its less memory overhead (and a bit less cpu) of using the simple task simpleTask = deadLetter == null && !redeliveryEnabled && (exceptionPolicies == null || exceptionPolicies.isEmpty()) && onPrepareProcessor == null; + + boolean pooled = camelContext.adapt(ExtendedCamelContext.class).getExchangeFactory().isPooled(); + if (pooled) { + taskFactory = new PooledTaskFactory() { + @Override + public PooledExchangeTask create(Exchange exchange, AsyncCallback callback) { + return simpleTask ? new SimpleTask() : new RedeliveryTask(); + } + }; + int capacity = camelContext.adapt(ExtendedCamelContext.class).getExchangeFactory().getCapacity(); + taskFactory.setCapacity(capacity); + } else { + taskFactory = new PrototypeTaskFactory() { + @Override + public PooledExchangeTask create(Exchange exchange, AsyncCallback callback) { + return simpleTask ? 
new SimpleTask() : new RedeliveryTask(); + } + }; + } + LOG.trace("Using TaskFactory: {}", taskFactory); + + ServiceHelper.startService(taskFactory, output, outputAsync, deadLetter); } @Override @@ -1542,6 +1614,7 @@ public abstract class RedeliveryErrorHandler extends ErrorHandlerSupport @Override protected void doShutdown() throws Exception { - ServiceHelper.stopAndShutdownServices(deadLetter, output, outputAsync); + ServiceHelper.stopAndShutdownServices(deadLetter, output, outputAsync, taskFactory); } + } diff --git a/core/camel-support/src/main/java/org/apache/camel/support/PooledObjectFactorySupport.java b/core/camel-support/src/main/java/org/apache/camel/support/PooledObjectFactorySupport.java new file mode 100644 index 0000000..91c56cd --- /dev/null +++ b/core/camel-support/src/main/java/org/apache/camel/support/PooledObjectFactorySupport.java @@ -0,0 +1,152 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.camel.support; + +import java.util.concurrent.ArrayBlockingQueue; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.atomic.LongAdder; + +import org.apache.camel.CamelContext; +import org.apache.camel.spi.PooledObjectFactory; +import org.apache.camel.support.service.ServiceSupport; + +public abstract class PooledObjectFactorySupport<T> extends ServiceSupport implements PooledObjectFactory<T> { + + protected final UtilizationStatistics statistics = new UtilizationStatistics(); + + protected CamelContext camelContext; + protected BlockingQueue<T> pool; + protected int capacity = 100; + + @Override + protected void doBuild() throws Exception { + super.doBuild(); + this.pool = new ArrayBlockingQueue<>(capacity); + } + + @Override + public CamelContext getCamelContext() { + return camelContext; + } + + @Override + public void setCamelContext(CamelContext camelContext) { + this.camelContext = camelContext; + } + + @Override + public boolean isStatisticsEnabled() { + return statistics.isStatisticsEnabled(); + } + + @Override + public void setStatisticsEnabled(boolean statisticsEnabled) { + statistics.setStatisticsEnabled(statisticsEnabled); + } + + @Override + public int getSize() { + if (pool != null) { + return pool.size(); + } else { + return 0; + } + } + + @Override + public int getCapacity() { + return capacity; + } + + @Override + public void setCapacity(int capacity) { + this.capacity = capacity; + } + + @Override + public void resetStatistics() { + statistics.reset(); + } + + @Override + public void purge() { + pool.clear(); + } + + @Override + public Statistics getStatistics() { + return statistics; + } + + @Override + protected void doShutdown() throws Exception { + super.doShutdown(); + statistics.reset(); + pool.clear(); + } + + /** + * Represents utilization statistics + */ + protected final class UtilizationStatistics implements PooledObjectFactory.Statistics { + + boolean statisticsEnabled; + public final 
LongAdder created = new LongAdder(); + public final LongAdder acquired = new LongAdder(); + public final LongAdder released = new LongAdder(); + public final LongAdder discarded = new LongAdder(); + + @Override + public void reset() { + created.reset(); + acquired.reset(); + released.reset(); + discarded.reset(); + } + + @Override + public long getCreatedCounter() { + return created.longValue(); + } + + @Override + public long getAcquiredCounter() { + return acquired.longValue(); + } + + @Override + public long getReleasedCounter() { + return released.longValue(); + } + + @Override + public long getDiscardedCounter() { + return discarded.longValue(); + } + + @Override + public boolean isStatisticsEnabled() { + return statisticsEnabled; + } + + @Override + public void setStatisticsEnabled(boolean statisticsEnabled) { + this.statisticsEnabled = statisticsEnabled; + } + } + +} diff --git a/core/camel-support/src/main/java/org/apache/camel/support/PrototypeObjectFactorySupport.java b/core/camel-support/src/main/java/org/apache/camel/support/PrototypeObjectFactorySupport.java new file mode 100644 index 0000000..5df6bbf --- /dev/null +++ b/core/camel-support/src/main/java/org/apache/camel/support/PrototypeObjectFactorySupport.java @@ -0,0 +1,139 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.support; + +import java.util.concurrent.atomic.LongAdder; + +import org.apache.camel.CamelContext; +import org.apache.camel.spi.PooledObjectFactory; +import org.apache.camel.support.service.ServiceSupport; + +/** + * {@link org.apache.camel.spi.PooledObjectFactory} that creates a new instance (does not pool). + */ +public abstract class PrototypeObjectFactorySupport<T> extends ServiceSupport implements PooledObjectFactory<T> { + + protected final UtilizationStatistics statistics = new UtilizationStatistics(); + private CamelContext camelContext; + + @Override + public CamelContext getCamelContext() { + return camelContext; + } + + @Override + public void setCamelContext(CamelContext camelContext) { + this.camelContext = camelContext; + } + + @Override + public boolean isStatisticsEnabled() { + return statistics.isStatisticsEnabled(); + } + + @Override + public void setStatisticsEnabled(boolean statisticsEnabled) { + statistics.setStatisticsEnabled(statisticsEnabled); + } + + @Override + public int getSize() { + return 0; + } + + @Override + public int getCapacity() { + return 0; + } + + @Override + public void setCapacity(int capacity) { + // not in use + } + + @Override + public void resetStatistics() { + statistics.reset(); + } + + @Override + public void purge() { + // not in use + } + + @Override + public Statistics getStatistics() { + return statistics; + } + + @Override + protected void doShutdown() throws Exception { + super.doShutdown(); + statistics.reset(); + } + + /** + * Represents utilization statistics + */ + protected final class UtilizationStatistics implements Statistics { + + boolean statisticsEnabled; + public final LongAdder created = new LongAdder(); + public final LongAdder acquired = new LongAdder(); + public final LongAdder released = new LongAdder(); + public final LongAdder discarded = new LongAdder(); + 
+ @Override + public void reset() { + created.reset(); + acquired.reset(); + released.reset(); + discarded.reset(); + } + + @Override + public long getCreatedCounter() { + return created.longValue(); + } + + @Override + public long getAcquiredCounter() { + return acquired.longValue(); + } + + @Override + public long getReleasedCounter() { + return released.longValue(); + } + + @Override + public long getDiscardedCounter() { + return discarded.longValue(); + } + + @Override + public boolean isStatisticsEnabled() { + return statisticsEnabled; + } + + @Override + public void setStatisticsEnabled(boolean statisticsEnabled) { + this.statisticsEnabled = statisticsEnabled; + } + } + +}