This is an automated email from the ASF dual-hosted git repository. davsclaus pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/camel.git
The following commit(s) were added to refs/heads/master by this push: new ee5a18c CAMEL-16279: camel-core - Optimize core to reduce object allocations by pooloing reusable tasks in the routing engine. ee5a18c is described below commit ee5a18c8b4989ce2933203c47220830c9c7a8a82 Author: Claus Ibsen <claus.ib...@gmail.com> AuthorDate: Tue Mar 9 09:08:06 2021 +0100 CAMEL-16279: camel-core - Optimize core to reduce object allocations by pooloing reusable tasks in the routing engine. --- .../java/org/apache/camel/spi/ExchangeFactory.java | 5 -- .../org/apache/camel/spi/PooledObjectFactory.java | 5 ++ .../engine/CamelInternalPooledTaskFactory.java | 47 +++++++++++++ .../camel/impl/engine/CamelInternalProcessor.java | 79 ++++++++++++++++++---- .../camel/impl/engine/CamelInternalTask.java | 45 ++++++++++++ .../camel/impl/engine/PooledExchangeFactory.java | 35 ---------- .../java/org/apache/camel/processor/Pipeline.java | 15 ++++ .../camel/main/DefaultConfigurationConfigurer.java | 2 +- .../camel/support/PooledObjectFactorySupport.java | 17 ++++- .../support/PrototypeObjectFactorySupport.java | 5 ++ 10 files changed, 199 insertions(+), 56 deletions(-) diff --git a/core/camel-api/src/main/java/org/apache/camel/spi/ExchangeFactory.java b/core/camel-api/src/main/java/org/apache/camel/spi/ExchangeFactory.java index 29b3fdb..75b9d21 100644 --- a/core/camel-api/src/main/java/org/apache/camel/spi/ExchangeFactory.java +++ b/core/camel-api/src/main/java/org/apache/camel/spi/ExchangeFactory.java @@ -79,9 +79,4 @@ public interface ExchangeFactory extends PooledObjectFactory<Exchange>, NonManag return true; } - /** - * Whether the factory is pooled. - */ - boolean isPooled(); - } diff --git a/core/camel-api/src/main/java/org/apache/camel/spi/PooledObjectFactory.java b/core/camel-api/src/main/java/org/apache/camel/spi/PooledObjectFactory.java index db4c0d1..a9c38bc 100644 --- a/core/camel-api/src/main/java/org/apache/camel/spi/PooledObjectFactory.java +++ b/core/camel-api/src/main/java/org/apache/camel/spi/PooledObjectFactory.java @@ -108,6 +108,11 @@ public interface PooledObjectFactory<T> extends Service, CamelContextAware { Statistics getStatistics(); /** + * Whether the factory is pooled. + */ + boolean isPooled(); + + /** * Acquires an object from the pool (if any) * * @return the object or <tt>null</tt> if the pool is empty diff --git a/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/CamelInternalPooledTaskFactory.java b/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/CamelInternalPooledTaskFactory.java new file mode 100644 index 0000000..faace27 --- /dev/null +++ b/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/CamelInternalPooledTaskFactory.java @@ -0,0 +1,47 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.impl.engine; + +import org.apache.camel.support.PooledObjectFactorySupport; + +/** + * A pool for reusing {@link CamelInternalTask} to reduce object allocations. + */ +final class CamelInternalPooledTaskFactory extends PooledObjectFactorySupport<CamelInternalTask> { + + @Override + public void setStatisticsEnabled(boolean statisticsEnabled) { + // we do not want to capture statistics so its disabled + } + + @Override + public CamelInternalTask acquire() { + return pool.poll(); + } + + @Override + public boolean release(CamelInternalTask task) { + task.reset(); + return pool.offer(task); + } + + @Override + public String toString() { + return "CamelInternalPooledTaskFactory[capacity: " + getCapacity() + "]"; + } + +} diff --git a/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/CamelInternalProcessor.java b/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/CamelInternalProcessor.java index 9e847fe..79919ea 100644 --- a/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/CamelInternalProcessor.java +++ b/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/CamelInternalProcessor.java @@ -17,6 +17,7 @@ package org.apache.camel.impl.engine; import java.util.ArrayList; +import java.util.Arrays; import java.util.List; import java.util.Objects; import java.util.concurrent.CopyOnWriteArrayList; @@ -45,6 +46,7 @@ import org.apache.camel.spi.InflightRepository; import org.apache.camel.spi.InternalProcessor; import org.apache.camel.spi.ManagementInterceptStrategy.InstrumentationProcessor; import org.apache.camel.spi.MessageHistoryFactory; +import org.apache.camel.spi.PooledObjectFactory; import org.apache.camel.spi.ReactiveExecutor; import org.apache.camel.spi.RoutePolicy; import org.apache.camel.spi.ShutdownStrategy; @@ -61,6 +63,7 @@ import org.apache.camel.support.OrderedComparator; import org.apache.camel.support.SynchronizationAdapter; import org.apache.camel.support.UnitOfWorkHelper; import org.apache.camel.support.processor.DelegateAsyncProcessor; +import org.apache.camel.support.service.ServiceHelper; import org.apache.camel.util.StopWatch; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -103,6 +106,7 @@ public class CamelInternalProcessor extends DelegateAsyncProcessor implements In private final ShutdownStrategy shutdownStrategy; private final List<CamelInternalProcessorAdvice<?>> advices = new ArrayList<>(); private byte statefulAdvices; + private PooledObjectFactory<CamelInternalTask> taskFactory; public CamelInternalProcessor(CamelContext camelContext) { this.camelContext = camelContext; @@ -118,6 +122,27 @@ public class CamelInternalProcessor extends DelegateAsyncProcessor implements In } @Override + protected void doBuild() throws Exception { + boolean pooled = camelContext.adapt(ExtendedCamelContext.class).getExchangeFactory().isPooled(); + + // only create pooled task factory + if (pooled) { + taskFactory = new CamelInternalPooledTaskFactory(); + int capacity = camelContext.adapt(ExtendedCamelContext.class).getExchangeFactory().getCapacity(); + taskFactory.setCapacity(capacity); + LOG.trace("Using TaskFactory: {}", taskFactory); + } + + ServiceHelper.buildService(taskFactory, processor); + } + + @Override + protected void doShutdown() throws Exception { + super.doShutdown(); + ServiceHelper.stopAndShutdownServices(taskFactory, processor); + } + + @Override public void addAdvice(CamelInternalProcessorAdvice<?> advice) { advices.add(advice); // ensure advices are sorted so they are in the order we want @@ -174,19 +199,35 @@ public class CamelInternalProcessor extends DelegateAsyncProcessor implements In /** * Callback task to process the advices after processing. */ - private final class AsyncAfterTask implements AsyncCallback { + private final class AsyncAfterTask implements CamelInternalTask { private final Object[] states; - private final Exchange exchange; - private final AsyncCallback originalCallback; + private Exchange exchange; + private AsyncCallback originalCallback; - private AsyncAfterTask(Object[] states, Exchange exchange, AsyncCallback originalCallback) { + private AsyncAfterTask(Object[] states) { this.states = states; + } + + @Override + public void prepare(Exchange exchange, AsyncCallback originalCallback) { this.exchange = exchange; this.originalCallback = originalCallback; } @Override + public Object[] getStates() { + return states; + } + + @Override + public void reset() { + Arrays.fill(states, null); + this.exchange = null; + this.originalCallback = null; + } + + @Override public void done(boolean doneSync) { try { for (int i = advices.size() - 1, j = states.length - 1; i >= 0; i--) { @@ -213,6 +254,11 @@ public class CamelInternalProcessor extends DelegateAsyncProcessor implements In // ---------------------------------------------------------- // CAMEL END USER - DEBUG ME HERE +++ END +++ // ---------------------------------------------------------- + + // task is done so reset + if (taskFactory != null) { + taskFactory.release(this); + } } } } @@ -253,8 +299,19 @@ public class CamelInternalProcessor extends DelegateAsyncProcessor implements In return true; } + Object[] states; + + // create internal callback which will execute the advices in reverse order when done + CamelInternalTask afterTask = taskFactory != null ? taskFactory.acquire() : null; + if (afterTask == null) { + states = statefulAdvices > 0 ? new Object[statefulAdvices] : EMPTY_STATES; + afterTask = new AsyncAfterTask(states); + } else { + states = afterTask.getStates(); + } + afterTask.prepare(exchange, originalCallback); + // optimise to use object array for states, and only for the number of advices that keep state - final Object[] states = statefulAdvices > 0 ? new Object[statefulAdvices] : EMPTY_STATES; // optimise for loop using index access to avoid creating iterator object for (int i = 0, j = 0; i < advices.size(); i++) { CamelInternalProcessorAdvice<?> task = advices.get(i); @@ -270,10 +327,6 @@ public class CamelInternalProcessor extends DelegateAsyncProcessor implements In } } - // create internal callback which will execute the advices in reverse order when done - // TODO: pool this task, and the states array - AsyncCallback callback = new AsyncAfterTask(states, exchange, originalCallback); - if (exchange.isTransacted()) { // must be synchronized for transacted exchanges if (LOG.isTraceEnabled()) { @@ -291,14 +344,16 @@ public class CamelInternalProcessor extends DelegateAsyncProcessor implements In // ---------------------------------------------------------- // CAMEL END USER - DEBUG ME HERE +++ END +++ // ---------------------------------------------------------- - callback.done(true); + if (taskFactory != null) { + taskFactory.release(afterTask); + } return true; } else { final UnitOfWork uow = exchange.getUnitOfWork(); // do uow before processing and if a value is returned the the uow wants to be processed after // was well in the same thread - AsyncCallback async = callback; + AsyncCallback async = afterTask; boolean beforeAndAfter = uow != null && uow.isBeforeAfterProcess(); if (beforeAndAfter) { async = uow.beforeProcess(processor, exchange, async); @@ -318,7 +373,7 @@ public class CamelInternalProcessor extends DelegateAsyncProcessor implements In // optimize to only do after uow processing if really needed if (beforeAndAfter) { // execute any after processor work (in current thread, not in the callback) - uow.afterProcess(processor, exchange, callback, false); + uow.afterProcess(processor, exchange, afterTask, false); } if (LOG.isTraceEnabled()) { diff --git a/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/CamelInternalTask.java b/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/CamelInternalTask.java new file mode 100644 index 0000000..9fc5caa --- /dev/null +++ b/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/CamelInternalTask.java @@ -0,0 +1,45 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.impl.engine; + +import org.apache.camel.AsyncCallback; +import org.apache.camel.Exchange; + +/** + * Task uses to hold state during {@link CamelInternalProcessor}. + */ +interface CamelInternalTask extends AsyncCallback { + + /** + * Prepares the task for the given exchange and its callback + * + * @param exchange the exchange + * @param callback the callback + */ + void prepare(Exchange exchange, AsyncCallback callback); + + /** + * Gets the states + */ + Object[] getStates(); + + /** + * Resets the task after its done and can be reused for another exchange. + */ + void reset(); + +} diff --git a/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/PooledExchangeFactory.java b/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/PooledExchangeFactory.java index d1effc9..ab64529 100644 --- a/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/PooledExchangeFactory.java +++ b/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/PooledExchangeFactory.java @@ -16,9 +16,6 @@ */ package org.apache.camel.impl.engine; -import java.util.concurrent.ArrayBlockingQueue; -import java.util.concurrent.BlockingQueue; - import org.apache.camel.Consumer; import org.apache.camel.Endpoint; import org.apache.camel.Exchange; @@ -36,8 +33,6 @@ public final class PooledExchangeFactory extends PrototypeExchangeFactory { private static final Logger LOG = LoggerFactory.getLogger(PooledExchangeFactory.class); private final ReleaseOnDoneTask onDone = new ReleaseOnDoneTask(); - private BlockingQueue<Exchange> pool; - private int capacity = 100; public PooledExchangeFactory() { } @@ -47,12 +42,6 @@ public final class PooledExchangeFactory extends PrototypeExchangeFactory { } @Override - protected void doBuild() throws Exception { - super.doBuild(); - this.pool = new ArrayBlockingQueue<>(capacity); - } - - @Override public ExchangeFactory newExchangeFactory(Consumer consumer) { PooledExchangeFactory answer = new PooledExchangeFactory(consumer); answer.setCamelContext(camelContext); @@ -61,23 +50,6 @@ public final class PooledExchangeFactory extends PrototypeExchangeFactory { return answer; } - public int getCapacity() { - return capacity; - } - - @Override - public int getSize() { - if (pool != null) { - return pool.size(); - } else { - return 0; - } - } - - public void setCapacity(int capacity) { - this.capacity = capacity; - } - @Override public Exchange create(boolean autoRelease) { Exchange exchange = pool.poll(); @@ -163,13 +135,6 @@ public final class PooledExchangeFactory extends PrototypeExchangeFactory { } @Override - public void purge() { - if (pool != null) { - pool.clear(); - } - } - - @Override public boolean isPooled() { return true; } diff --git a/core/camel-core-processor/src/main/java/org/apache/camel/processor/Pipeline.java b/core/camel-core-processor/src/main/java/org/apache/camel/processor/Pipeline.java index 6d6b3d1..728e31a 100644 --- a/core/camel-core-processor/src/main/java/org/apache/camel/processor/Pipeline.java +++ b/core/camel-core-processor/src/main/java/org/apache/camel/processor/Pipeline.java @@ -177,15 +177,30 @@ public class Pipeline extends AsyncProcessorSupport implements Navigate<Processo public PooledExchangeTask create(Exchange exchange, AsyncCallback callback) { return new PipelineTask(); } + + @Override + public String toString() { + return "PooledTaskFactory[capacity: " + getCapacity() + "]"; + } }; int capacity = camelContext.adapt(ExtendedCamelContext.class).getExchangeFactory().getCapacity(); taskFactory.setCapacity(capacity); } else { taskFactory = new PrototypeTaskFactory() { @Override + public boolean isPooled() { + return false; + } + + @Override public PooledExchangeTask create(Exchange exchange, AsyncCallback callback) { return new PipelineTask(); } + + @Override + public String toString() { + return "PrototypeTaskFactory"; + } }; } LOG.trace("Using TaskFactory: {}", taskFactory); diff --git a/core/camel-main/src/main/java/org/apache/camel/main/DefaultConfigurationConfigurer.java b/core/camel-main/src/main/java/org/apache/camel/main/DefaultConfigurationConfigurer.java index 73953bb..67c6162 100644 --- a/core/camel-main/src/main/java/org/apache/camel/main/DefaultConfigurationConfigurer.java +++ b/core/camel-main/src/main/java/org/apache/camel/main/DefaultConfigurationConfigurer.java @@ -128,8 +128,8 @@ public final class DefaultConfigurationConfigurer { } else if ("prototype".equals(config.getExchangeFactory())) { ecc.setExchangeFactory(new PrototypeExchangeFactory()); } - ecc.getExchangeFactory().setStatisticsEnabled(config.isExchangeFactoryStatisticsEnabled()); ecc.getExchangeFactory().setCapacity(config.getExchangeFactoryCapacity()); + ecc.getExchangeFactory().setStatisticsEnabled(config.isExchangeFactoryStatisticsEnabled()); if (!config.isJmxEnabled()) { camelContext.disableJMX(); diff --git a/core/camel-support/src/main/java/org/apache/camel/support/PooledObjectFactorySupport.java b/core/camel-support/src/main/java/org/apache/camel/support/PooledObjectFactorySupport.java index 64867b3..6aa402c 100644 --- a/core/camel-support/src/main/java/org/apache/camel/support/PooledObjectFactorySupport.java +++ b/core/camel-support/src/main/java/org/apache/camel/support/PooledObjectFactorySupport.java @@ -35,7 +35,9 @@ public abstract class PooledObjectFactorySupport<T> extends ServiceSupport imple @Override protected void doBuild() throws Exception { super.doBuild(); - this.pool = new ArrayBlockingQueue<>(capacity); + if (isPooled()) { + this.pool = new ArrayBlockingQueue<>(capacity); + } } @Override @@ -83,8 +85,15 @@ public abstract class PooledObjectFactorySupport<T> extends ServiceSupport imple } @Override + public boolean isPooled() { + return true; + } + + @Override public void purge() { - pool.clear(); + if (pool != null) { + pool.clear(); + } } @Override @@ -96,7 +105,9 @@ public abstract class PooledObjectFactorySupport<T> extends ServiceSupport imple protected void doShutdown() throws Exception { super.doShutdown(); statistics.reset(); - pool.clear(); + if (pool != null) { + pool.clear(); + } } /** diff --git a/core/camel-support/src/main/java/org/apache/camel/support/PrototypeObjectFactorySupport.java b/core/camel-support/src/main/java/org/apache/camel/support/PrototypeObjectFactorySupport.java index 8c038e4..a2d6bfe 100644 --- a/core/camel-support/src/main/java/org/apache/camel/support/PrototypeObjectFactorySupport.java +++ b/core/camel-support/src/main/java/org/apache/camel/support/PrototypeObjectFactorySupport.java @@ -81,6 +81,11 @@ public abstract class PrototypeObjectFactorySupport<T> extends ServiceSupport im } @Override + public boolean isPooled() { + return false; + } + + @Override protected void doShutdown() throws Exception { super.doShutdown(); statistics.reset();