This is an automated email from the ASF dual-hosted git repository.

davsclaus pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/camel.git


The following commit(s) were added to refs/heads/master by this push:
     new ee5a18c  CAMEL-16279: camel-core - Optimize core to reduce object 
allocations by pooling reusable tasks in the routing engine.
ee5a18c is described below

commit ee5a18c8b4989ce2933203c47220830c9c7a8a82
Author: Claus Ibsen <claus.ib...@gmail.com>
AuthorDate: Tue Mar 9 09:08:06 2021 +0100

    CAMEL-16279: camel-core - Optimize core to reduce object allocations by 
pooling reusable tasks in the routing engine.
---
 .../java/org/apache/camel/spi/ExchangeFactory.java |  5 --
 .../org/apache/camel/spi/PooledObjectFactory.java  |  5 ++
 .../engine/CamelInternalPooledTaskFactory.java     | 47 +++++++++++++
 .../camel/impl/engine/CamelInternalProcessor.java  | 79 ++++++++++++++++++----
 .../camel/impl/engine/CamelInternalTask.java       | 45 ++++++++++++
 .../camel/impl/engine/PooledExchangeFactory.java   | 35 ----------
 .../java/org/apache/camel/processor/Pipeline.java  | 15 ++++
 .../camel/main/DefaultConfigurationConfigurer.java |  2 +-
 .../camel/support/PooledObjectFactorySupport.java  | 17 ++++-
 .../support/PrototypeObjectFactorySupport.java     |  5 ++
 10 files changed, 199 insertions(+), 56 deletions(-)

diff --git 
a/core/camel-api/src/main/java/org/apache/camel/spi/ExchangeFactory.java 
b/core/camel-api/src/main/java/org/apache/camel/spi/ExchangeFactory.java
index 29b3fdb..75b9d21 100644
--- a/core/camel-api/src/main/java/org/apache/camel/spi/ExchangeFactory.java
+++ b/core/camel-api/src/main/java/org/apache/camel/spi/ExchangeFactory.java
@@ -79,9 +79,4 @@ public interface ExchangeFactory extends 
PooledObjectFactory<Exchange>, NonManag
         return true;
     }
 
-    /**
-     * Whether the factory is pooled.
-     */
-    boolean isPooled();
-
 }
diff --git 
a/core/camel-api/src/main/java/org/apache/camel/spi/PooledObjectFactory.java 
b/core/camel-api/src/main/java/org/apache/camel/spi/PooledObjectFactory.java
index db4c0d1..a9c38bc 100644
--- a/core/camel-api/src/main/java/org/apache/camel/spi/PooledObjectFactory.java
+++ b/core/camel-api/src/main/java/org/apache/camel/spi/PooledObjectFactory.java
@@ -108,6 +108,11 @@ public interface PooledObjectFactory<T> extends Service, 
CamelContextAware {
     Statistics getStatistics();
 
     /**
+     * Whether the factory is pooled.
+     */
+    boolean isPooled();
+
+    /**
      * Acquires an object from the pool (if any)
      *
      * @return the object or <tt>null</tt> if the pool is empty
diff --git 
a/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/CamelInternalPooledTaskFactory.java
 
b/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/CamelInternalPooledTaskFactory.java
new file mode 100644
index 0000000..faace27
--- /dev/null
+++ 
b/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/CamelInternalPooledTaskFactory.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.impl.engine;
+
+import org.apache.camel.support.PooledObjectFactorySupport;
+
+/**
+ * A pool for reusing {@link CamelInternalTask} to reduce object allocations.
+ */
+final class CamelInternalPooledTaskFactory extends 
PooledObjectFactorySupport<CamelInternalTask> {
+
+    @Override
+    public void setStatisticsEnabled(boolean statisticsEnabled) {
+        // we do not want to capture statistics so its disabled
+    }
+
+    @Override
+    public CamelInternalTask acquire() {
+        return pool.poll();
+    }
+
+    @Override
+    public boolean release(CamelInternalTask task) {
+        task.reset();
+        return pool.offer(task);
+    }
+
+    @Override
+    public String toString() {
+        return "CamelInternalPooledTaskFactory[capacity: " + getCapacity() + 
"]";
+    }
+
+}
diff --git 
a/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/CamelInternalProcessor.java
 
b/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/CamelInternalProcessor.java
index 9e847fe..79919ea 100644
--- 
a/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/CamelInternalProcessor.java
+++ 
b/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/CamelInternalProcessor.java
@@ -17,6 +17,7 @@
 package org.apache.camel.impl.engine;
 
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.List;
 import java.util.Objects;
 import java.util.concurrent.CopyOnWriteArrayList;
@@ -45,6 +46,7 @@ import org.apache.camel.spi.InflightRepository;
 import org.apache.camel.spi.InternalProcessor;
 import 
org.apache.camel.spi.ManagementInterceptStrategy.InstrumentationProcessor;
 import org.apache.camel.spi.MessageHistoryFactory;
+import org.apache.camel.spi.PooledObjectFactory;
 import org.apache.camel.spi.ReactiveExecutor;
 import org.apache.camel.spi.RoutePolicy;
 import org.apache.camel.spi.ShutdownStrategy;
@@ -61,6 +63,7 @@ import org.apache.camel.support.OrderedComparator;
 import org.apache.camel.support.SynchronizationAdapter;
 import org.apache.camel.support.UnitOfWorkHelper;
 import org.apache.camel.support.processor.DelegateAsyncProcessor;
+import org.apache.camel.support.service.ServiceHelper;
 import org.apache.camel.util.StopWatch;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -103,6 +106,7 @@ public class CamelInternalProcessor extends 
DelegateAsyncProcessor implements In
     private final ShutdownStrategy shutdownStrategy;
     private final List<CamelInternalProcessorAdvice<?>> advices = new 
ArrayList<>();
     private byte statefulAdvices;
+    private PooledObjectFactory<CamelInternalTask> taskFactory;
 
     public CamelInternalProcessor(CamelContext camelContext) {
         this.camelContext = camelContext;
@@ -118,6 +122,27 @@ public class CamelInternalProcessor extends 
DelegateAsyncProcessor implements In
     }
 
     @Override
+    protected void doBuild() throws Exception {
+        boolean pooled = 
camelContext.adapt(ExtendedCamelContext.class).getExchangeFactory().isPooled();
+
+        // only create pooled task factory
+        if (pooled) {
+            taskFactory = new CamelInternalPooledTaskFactory();
+            int capacity = 
camelContext.adapt(ExtendedCamelContext.class).getExchangeFactory().getCapacity();
+            taskFactory.setCapacity(capacity);
+            LOG.trace("Using TaskFactory: {}", taskFactory);
+        }
+
+        ServiceHelper.buildService(taskFactory, processor);
+    }
+
+    @Override
+    protected void doShutdown() throws Exception {
+        super.doShutdown();
+        ServiceHelper.stopAndShutdownServices(taskFactory, processor);
+    }
+
+    @Override
     public void addAdvice(CamelInternalProcessorAdvice<?> advice) {
         advices.add(advice);
         // ensure advices are sorted so they are in the order we want
@@ -174,19 +199,35 @@ public class CamelInternalProcessor extends 
DelegateAsyncProcessor implements In
     /**
      * Callback task to process the advices after processing.
      */
-    private final class AsyncAfterTask implements AsyncCallback {
+    private final class AsyncAfterTask implements CamelInternalTask {
 
         private final Object[] states;
-        private final Exchange exchange;
-        private final AsyncCallback originalCallback;
+        private Exchange exchange;
+        private AsyncCallback originalCallback;
 
-        private AsyncAfterTask(Object[] states, Exchange exchange, 
AsyncCallback originalCallback) {
+        private AsyncAfterTask(Object[] states) {
             this.states = states;
+        }
+
+        @Override
+        public void prepare(Exchange exchange, AsyncCallback originalCallback) 
{
             this.exchange = exchange;
             this.originalCallback = originalCallback;
         }
 
         @Override
+        public Object[] getStates() {
+            return states;
+        }
+
+        @Override
+        public void reset() {
+            Arrays.fill(states, null);
+            this.exchange = null;
+            this.originalCallback = null;
+        }
+
+        @Override
         public void done(boolean doneSync) {
             try {
                 for (int i = advices.size() - 1, j = states.length - 1; i >= 
0; i--) {
@@ -213,6 +254,11 @@ public class CamelInternalProcessor extends 
DelegateAsyncProcessor implements In
                 // ----------------------------------------------------------
                 // CAMEL END USER - DEBUG ME HERE +++ END +++
                 // ----------------------------------------------------------
+
+                // task is done so reset
+                if (taskFactory != null) {
+                    taskFactory.release(this);
+                }
             }
         }
     }
@@ -253,8 +299,19 @@ public class CamelInternalProcessor extends 
DelegateAsyncProcessor implements In
             return true;
         }
 
+        Object[] states;
+
+        // create internal callback which will execute the advices in reverse 
order when done
+        CamelInternalTask afterTask = taskFactory != null ? 
taskFactory.acquire() : null;
+        if (afterTask == null) {
+            states = statefulAdvices > 0 ? new Object[statefulAdvices] : 
EMPTY_STATES;
+            afterTask = new AsyncAfterTask(states);
+        } else {
+            states = afterTask.getStates();
+        }
+        afterTask.prepare(exchange, originalCallback);
+
         // optimise to use object array for states, and only for the number of 
advices that keep state
-        final Object[] states = statefulAdvices > 0 ? new 
Object[statefulAdvices] : EMPTY_STATES;
         // optimise for loop using index access to avoid creating iterator 
object
         for (int i = 0, j = 0; i < advices.size(); i++) {
             CamelInternalProcessorAdvice<?> task = advices.get(i);
@@ -270,10 +327,6 @@ public class CamelInternalProcessor extends 
DelegateAsyncProcessor implements In
             }
         }
 
-        // create internal callback which will execute the advices in reverse 
order when done
-        // TODO: pool this task, and the states array
-        AsyncCallback callback = new AsyncAfterTask(states, exchange, 
originalCallback);
-
         if (exchange.isTransacted()) {
             // must be synchronized for transacted exchanges
             if (LOG.isTraceEnabled()) {
@@ -291,14 +344,16 @@ public class CamelInternalProcessor extends 
DelegateAsyncProcessor implements In
             // ----------------------------------------------------------
             // CAMEL END USER - DEBUG ME HERE +++ END +++
             // ----------------------------------------------------------
-            callback.done(true);
+            if (taskFactory != null) {
+                taskFactory.release(afterTask);
+            }
             return true;
         } else {
             final UnitOfWork uow = exchange.getUnitOfWork();
 
             // do uow before processing and if a value is returned the the uow 
wants to be processed after
             // was well in the same thread
-            AsyncCallback async = callback;
+            AsyncCallback async = afterTask;
             boolean beforeAndAfter = uow != null && uow.isBeforeAfterProcess();
             if (beforeAndAfter) {
                 async = uow.beforeProcess(processor, exchange, async);
@@ -318,7 +373,7 @@ public class CamelInternalProcessor extends 
DelegateAsyncProcessor implements In
             // optimize to only do after uow processing if really needed
             if (beforeAndAfter) {
                 // execute any after processor work (in current thread, not in 
the callback)
-                uow.afterProcess(processor, exchange, callback, false);
+                uow.afterProcess(processor, exchange, afterTask, false);
             }
 
             if (LOG.isTraceEnabled()) {
diff --git 
a/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/CamelInternalTask.java
 
b/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/CamelInternalTask.java
new file mode 100644
index 0000000..9fc5caa
--- /dev/null
+++ 
b/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/CamelInternalTask.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.impl.engine;
+
+import org.apache.camel.AsyncCallback;
+import org.apache.camel.Exchange;
+
+/**
+ * Task uses to hold state during {@link CamelInternalProcessor}.
+ */
+interface CamelInternalTask extends AsyncCallback {
+
+    /**
+     * Prepares the task for the given exchange and its callback
+     *
+     * @param exchange the exchange
+     * @param callback the callback
+     */
+    void prepare(Exchange exchange, AsyncCallback callback);
+
+    /**
+     * Gets the states
+     */
+    Object[] getStates();
+
+    /**
+     * Resets the task after its done and can be reused for another exchange.
+     */
+    void reset();
+
+}
diff --git 
a/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/PooledExchangeFactory.java
 
b/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/PooledExchangeFactory.java
index d1effc9..ab64529 100644
--- 
a/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/PooledExchangeFactory.java
+++ 
b/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/PooledExchangeFactory.java
@@ -16,9 +16,6 @@
  */
 package org.apache.camel.impl.engine;
 
-import java.util.concurrent.ArrayBlockingQueue;
-import java.util.concurrent.BlockingQueue;
-
 import org.apache.camel.Consumer;
 import org.apache.camel.Endpoint;
 import org.apache.camel.Exchange;
@@ -36,8 +33,6 @@ public final class PooledExchangeFactory extends 
PrototypeExchangeFactory {
     private static final Logger LOG = 
LoggerFactory.getLogger(PooledExchangeFactory.class);
 
     private final ReleaseOnDoneTask onDone = new ReleaseOnDoneTask();
-    private BlockingQueue<Exchange> pool;
-    private int capacity = 100;
 
     public PooledExchangeFactory() {
     }
@@ -47,12 +42,6 @@ public final class PooledExchangeFactory extends 
PrototypeExchangeFactory {
     }
 
     @Override
-    protected void doBuild() throws Exception {
-        super.doBuild();
-        this.pool = new ArrayBlockingQueue<>(capacity);
-    }
-
-    @Override
     public ExchangeFactory newExchangeFactory(Consumer consumer) {
         PooledExchangeFactory answer = new PooledExchangeFactory(consumer);
         answer.setCamelContext(camelContext);
@@ -61,23 +50,6 @@ public final class PooledExchangeFactory extends 
PrototypeExchangeFactory {
         return answer;
     }
 
-    public int getCapacity() {
-        return capacity;
-    }
-
-    @Override
-    public int getSize() {
-        if (pool != null) {
-            return pool.size();
-        } else {
-            return 0;
-        }
-    }
-
-    public void setCapacity(int capacity) {
-        this.capacity = capacity;
-    }
-
     @Override
     public Exchange create(boolean autoRelease) {
         Exchange exchange = pool.poll();
@@ -163,13 +135,6 @@ public final class PooledExchangeFactory extends 
PrototypeExchangeFactory {
     }
 
     @Override
-    public void purge() {
-        if (pool != null) {
-            pool.clear();
-        }
-    }
-
-    @Override
     public boolean isPooled() {
         return true;
     }
diff --git 
a/core/camel-core-processor/src/main/java/org/apache/camel/processor/Pipeline.java
 
b/core/camel-core-processor/src/main/java/org/apache/camel/processor/Pipeline.java
index 6d6b3d1..728e31a 100644
--- 
a/core/camel-core-processor/src/main/java/org/apache/camel/processor/Pipeline.java
+++ 
b/core/camel-core-processor/src/main/java/org/apache/camel/processor/Pipeline.java
@@ -177,15 +177,30 @@ public class Pipeline extends AsyncProcessorSupport 
implements Navigate<Processo
                 public PooledExchangeTask create(Exchange exchange, 
AsyncCallback callback) {
                     return new PipelineTask();
                 }
+
+                @Override
+                public String toString() {
+                    return "PooledTaskFactory[capacity: " + getCapacity() + 
"]";
+                }
             };
             int capacity = 
camelContext.adapt(ExtendedCamelContext.class).getExchangeFactory().getCapacity();
             taskFactory.setCapacity(capacity);
         } else {
             taskFactory = new PrototypeTaskFactory() {
                 @Override
+                public boolean isPooled() {
+                    return false;
+                }
+
+                @Override
                 public PooledExchangeTask create(Exchange exchange, 
AsyncCallback callback) {
                     return new PipelineTask();
                 }
+
+                @Override
+                public String toString() {
+                    return "PrototypeTaskFactory";
+                }
             };
         }
         LOG.trace("Using TaskFactory: {}", taskFactory);
diff --git 
a/core/camel-main/src/main/java/org/apache/camel/main/DefaultConfigurationConfigurer.java
 
b/core/camel-main/src/main/java/org/apache/camel/main/DefaultConfigurationConfigurer.java
index 73953bb..67c6162 100644
--- 
a/core/camel-main/src/main/java/org/apache/camel/main/DefaultConfigurationConfigurer.java
+++ 
b/core/camel-main/src/main/java/org/apache/camel/main/DefaultConfigurationConfigurer.java
@@ -128,8 +128,8 @@ public final class DefaultConfigurationConfigurer {
         } else if ("prototype".equals(config.getExchangeFactory())) {
             ecc.setExchangeFactory(new PrototypeExchangeFactory());
         }
-        
ecc.getExchangeFactory().setStatisticsEnabled(config.isExchangeFactoryStatisticsEnabled());
         
ecc.getExchangeFactory().setCapacity(config.getExchangeFactoryCapacity());
+        
ecc.getExchangeFactory().setStatisticsEnabled(config.isExchangeFactoryStatisticsEnabled());
 
         if (!config.isJmxEnabled()) {
             camelContext.disableJMX();
diff --git 
a/core/camel-support/src/main/java/org/apache/camel/support/PooledObjectFactorySupport.java
 
b/core/camel-support/src/main/java/org/apache/camel/support/PooledObjectFactorySupport.java
index 64867b3..6aa402c 100644
--- 
a/core/camel-support/src/main/java/org/apache/camel/support/PooledObjectFactorySupport.java
+++ 
b/core/camel-support/src/main/java/org/apache/camel/support/PooledObjectFactorySupport.java
@@ -35,7 +35,9 @@ public abstract class PooledObjectFactorySupport<T> extends 
ServiceSupport imple
     @Override
     protected void doBuild() throws Exception {
         super.doBuild();
-        this.pool = new ArrayBlockingQueue<>(capacity);
+        if (isPooled()) {
+            this.pool = new ArrayBlockingQueue<>(capacity);
+        }
     }
 
     @Override
@@ -83,8 +85,15 @@ public abstract class PooledObjectFactorySupport<T> extends 
ServiceSupport imple
     }
 
     @Override
+    public boolean isPooled() {
+        return true;
+    }
+
+    @Override
     public void purge() {
-        pool.clear();
+        if (pool != null) {
+            pool.clear();
+        }
     }
 
     @Override
@@ -96,7 +105,9 @@ public abstract class PooledObjectFactorySupport<T> extends 
ServiceSupport imple
     protected void doShutdown() throws Exception {
         super.doShutdown();
         statistics.reset();
-        pool.clear();
+        if (pool != null) {
+            pool.clear();
+        }
     }
 
     /**
diff --git 
a/core/camel-support/src/main/java/org/apache/camel/support/PrototypeObjectFactorySupport.java
 
b/core/camel-support/src/main/java/org/apache/camel/support/PrototypeObjectFactorySupport.java
index 8c038e4..a2d6bfe 100644
--- 
a/core/camel-support/src/main/java/org/apache/camel/support/PrototypeObjectFactorySupport.java
+++ 
b/core/camel-support/src/main/java/org/apache/camel/support/PrototypeObjectFactorySupport.java
@@ -81,6 +81,11 @@ public abstract class PrototypeObjectFactorySupport<T> 
extends ServiceSupport im
     }
 
     @Override
+    public boolean isPooled() {
+        return false;
+    }
+
+    @Override
     protected void doShutdown() throws Exception {
         super.doShutdown();
         statistics.reset();

Reply via email to