This is an automated email from the ASF dual-hosted git repository.

davsclaus pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/camel.git


The following commit(s) were added to refs/heads/master by this push:
     new 0c77d3b  CAMEL-16446: camel-core - Optimize EIPs to eager load needed classes
0c77d3b is described below

commit 0c77d3be900427f77baba0bdc9b079b2eb0541da
Author: Claus Ibsen <claus.ib...@gmail.com>
AuthorDate: Sat Apr 3 12:13:23 2021 +0200

    CAMEL-16446: camel-core - Optimize EIPs to eager load needed classes
---
 .../apache/camel/processor/MulticastProcessor.java | 53 +++++++++++++++++++---
 .../camel/processor/RecipientListProcessor.java    |  4 ++
 .../java/org/apache/camel/processor/Splitter.java  | 24 ++++++++++
 .../apache/camel/support/cache/ServicePool.java    | 41 +++++++++++++----
 4 files changed, 106 insertions(+), 16 deletions(-)

diff --git a/core/camel-core-processor/src/main/java/org/apache/camel/processor/MulticastProcessor.java b/core/camel-core-processor/src/main/java/org/apache/camel/processor/MulticastProcessor.java
index 71d00ab..572fea9 100644
--- a/core/camel-core-processor/src/main/java/org/apache/camel/processor/MulticastProcessor.java
+++ b/core/camel-core-processor/src/main/java/org/apache/camel/processor/MulticastProcessor.java
@@ -52,6 +52,8 @@ import org.apache.camel.Route;
 import org.apache.camel.RuntimeCamelException;
 import org.apache.camel.StreamCache;
 import org.apache.camel.Traceable;
+import org.apache.camel.processor.aggregate.ShareUnitOfWorkAggregationStrategy;
+import org.apache.camel.processor.aggregate.UseOriginalAggregationStrategy;
 import org.apache.camel.processor.errorhandler.ErrorHandlerSupport;
 import org.apache.camel.spi.ErrorHandlerAware;
 import org.apache.camel.spi.IdAware;
@@ -150,6 +152,14 @@ public class MulticastProcessor extends 
AsyncProcessorSupport
 
     }
 
+    private final class Scheduler implements Executor {
+
+        @Override
+        public void execute(Runnable command) {
+            schedule(command);
+        }
+    }
+
     protected final Processor onPrepare;
     private final CamelContext camelContext;
     private final InternalProcessorFactory internalProcessorFactory;
@@ -167,6 +177,7 @@ public class MulticastProcessor extends 
AsyncProcessorSupport
     private final boolean stopOnException;
     private final ExecutorService executorService;
     private final boolean shutdownExecutorService;
+    private final Scheduler scheduler = new Scheduler();
     private ExecutorService aggregateExecutorService;
     private boolean shutdownAggregateExecutorService;
     private final long timeout;
@@ -263,6 +274,23 @@ public class MulticastProcessor extends 
AsyncProcessorSupport
     }
 
     @Override
+    protected void doBuild() throws Exception {
+        // eager load classes
+        Object dummy = new MulticastReactiveTask();
+        LOG.trace("Loaded {}", dummy.getClass().getName());
+        Object dummy2 = new MulticastTransactedTask();
+        LOG.trace("Loaded {}", dummy2.getClass().getName());
+        Object dummy3 = new UseOriginalAggregationStrategy();
+        LOG.trace("Loaded {}", dummy3.getClass().getName());
+        if (isShareUnitOfWork()) {
+            Object dummy4 = new ShareUnitOfWorkAggregationStrategy(null);
+            LOG.trace("Loaded {}", dummy4.getClass().getName());
+        }
+        Object dummy5 = new DefaultProcessorExchangePair(0, null, null, null);
+        LOG.trace("Loaded {}", dummy5.getClass().getName());
+    }
+
+    @Override
     protected void doInit() throws Exception {
         if (route != null) {
             Exchange exchange = new DefaultExchange(getCamelContext());
@@ -356,23 +384,30 @@ public class MulticastProcessor extends 
AsyncProcessorSupport
         final Iterable<ProcessorExchangePair> pairs;
         final AsyncCallback callback;
         final Iterator<ProcessorExchangePair> iterator;
-        final ReentrantLock lock;
-        final AsyncCompletionService<Exchange> completion;
-        final AtomicReference<Exchange> result;
+        final ReentrantLock lock = new ReentrantLock();
+        final AsyncCompletionService<Exchange> completion
+                = new AsyncCompletionService<>(scheduler, !isStreaming(), 
lock);
+        final AtomicReference<Exchange> result = new AtomicReference<>();
         final AtomicInteger nbExchangeSent = new AtomicInteger();
         final AtomicInteger nbAggregated = new AtomicInteger();
         final AtomicBoolean allSent = new AtomicBoolean();
         final AtomicBoolean done = new AtomicBoolean();
         final Map<String, String> mdc;
 
+        private MulticastTask() {
+            // used for eager classloading
+            this.original = null;
+            this.pairs = null;
+            this.callback = null;
+            this.iterator = null;
+            this.mdc = null;
+        }
+
         MulticastTask(Exchange original, Iterable<ProcessorExchangePair> 
pairs, AsyncCallback callback) {
             this.original = original;
             this.pairs = pairs;
             this.callback = callback;
             this.iterator = pairs.iterator();
-            this.lock = new ReentrantLock();
-            this.completion = new 
AsyncCompletionService<>(MulticastProcessor.this::schedule, !isStreaming(), 
lock);
-            this.result = new AtomicReference<>();
             if (timeout > 0) {
                 schedule(aggregateExecutorService, this::timeout, timeout, 
TimeUnit.MILLISECONDS);
             }
@@ -459,6 +494,9 @@ public class MulticastProcessor extends 
AsyncProcessorSupport
      */
     protected class MulticastReactiveTask extends MulticastTask {
 
+        private MulticastReactiveTask() {
+        }
+
         public MulticastReactiveTask(Exchange original, 
Iterable<ProcessorExchangePair> pairs, AsyncCallback callback) {
             super(original, pairs, callback);
         }
@@ -555,6 +593,9 @@ public class MulticastProcessor extends 
AsyncProcessorSupport
      */
     protected class MulticastTransactedTask extends MulticastTask {
 
+        private MulticastTransactedTask() {
+        }
+
         public MulticastTransactedTask(Exchange original, 
Iterable<ProcessorExchangePair> pairs, AsyncCallback callback) {
             super(original, pairs, callback);
         }
diff --git a/core/camel-core-processor/src/main/java/org/apache/camel/processor/RecipientListProcessor.java b/core/camel-core-processor/src/main/java/org/apache/camel/processor/RecipientListProcessor.java
index f2df1f6..e1dacf3 100644
--- a/core/camel-core-processor/src/main/java/org/apache/camel/processor/RecipientListProcessor.java
+++ b/core/camel-core-processor/src/main/java/org/apache/camel/processor/RecipientListProcessor.java
@@ -334,6 +334,10 @@ public class RecipientListProcessor extends 
MulticastProcessor {
     protected void doBuild() throws Exception {
         super.doBuild();
         ServiceHelper.buildService(producerCache);
+
+        // eager load classes
+        Object dummy = new RecipientProcessorExchangePair(0, null, null, null, 
null, null, null, false);
+        LOG.trace("Loaded {}", dummy.getClass().getName());
     }
 
     @Override
diff --git a/core/camel-core-processor/src/main/java/org/apache/camel/processor/Splitter.java b/core/camel-core-processor/src/main/java/org/apache/camel/processor/Splitter.java
index 19c1ce7..58851cc 100644
--- a/core/camel-core-processor/src/main/java/org/apache/camel/processor/Splitter.java
+++ b/core/camel-core-processor/src/main/java/org/apache/camel/processor/Splitter.java
@@ -44,6 +44,8 @@ import org.apache.camel.support.ExchangeHelper;
 import org.apache.camel.support.ObjectHelper;
 import org.apache.camel.util.IOHelper;
 import org.apache.camel.util.StringHelper;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import static org.apache.camel.util.ObjectHelper.notNull;
 
@@ -53,6 +55,8 @@ import static org.apache.camel.util.ObjectHelper.notNull;
  */
 public class Splitter extends MulticastProcessor implements AsyncProcessor, 
Traceable {
 
+    private static final Logger LOG = LoggerFactory.getLogger(Splitter.class);
+
     private static final String IGNORE_DELIMITER_MARKER = "false";
     private final Expression expression;
     private final String delimiter;
@@ -112,6 +116,14 @@ public class Splitter extends MulticastProcessor 
implements AsyncProcessor, Trac
     }
 
     @Override
+    protected void doBuild() throws Exception {
+        super.doBuild();
+        // eager load classes
+        Object dummy = new SplitterIterable();
+        LOG.trace("Loaded {}", dummy.getClass().getName());
+    }
+
+    @Override
     protected void doInit() throws Exception {
         super.doInit();
         expression.init(getCamelContext());
@@ -180,6 +192,18 @@ public class Splitter extends MulticastProcessor 
implements AsyncProcessor, Trac
         private final Route route;
         private final Exchange original;
 
+        private SplitterIterable() {
+            // used for eager classloading
+            value = null;
+            iterator = null;
+            copy = null;
+            route = null;
+            original = null;
+            // for loading classes from iterator
+            Object dummy = iterator();
+            LOG.trace("Loaded {}", dummy.getClass().getName());
+        }
+
         private SplitterIterable(Exchange exchange, Object value) {
             this.original = exchange;
             this.value = value;
diff --git a/core/camel-support/src/main/java/org/apache/camel/support/cache/ServicePool.java b/core/camel-support/src/main/java/org/apache/camel/support/cache/ServicePool.java
index 1a4d3ba..28e9ff2 100644
--- a/core/camel-support/src/main/java/org/apache/camel/support/cache/ServicePool.java
+++ b/core/camel-support/src/main/java/org/apache/camel/support/cache/ServicePool.java
@@ -127,15 +127,11 @@ abstract class ServicePool<S extends Service> extends 
ServiceSupport implements
     }
 
     private Pool<S> getOrCreatePool(Endpoint endpoint) {
-        return pool.computeIfAbsent(endpoint, this::createPool);
-    }
-
-    private Pool<S> createPool(Endpoint endpoint) {
         boolean singleton = endpoint.isSingletonProducer();
         if (singleton) {
-            return new SinglePool(endpoint);
+            return pool.computeIfAbsent(endpoint, SinglePool::new);
         } else {
-            return new MultiplePool(endpoint);
+            return pool.computeIfAbsent(endpoint, MultiplePool::new);
         }
     }
 
@@ -157,6 +153,15 @@ abstract class ServicePool<S extends Service> extends 
ServiceSupport implements
     }
 
     @Override
+    protected void doBuild() throws Exception {
+        // eager load classes
+        SinglePool dummy = new SinglePool();
+        LOG.trace("Loaded {}", dummy.getClass().getName());
+        MultiplePool dummy2 = new MultiplePool();
+        LOG.trace("Loaded {}", dummy2.getClass().getName());
+    }
+
+    @Override
     protected void doStart() throws Exception {
         // noop
     }
@@ -194,6 +199,11 @@ abstract class ServicePool<S extends Service> extends 
ServiceSupport implements
         private final Endpoint endpoint;
         private volatile S s;
 
+        private SinglePool() {
+            // only used for eager classloading
+            this.endpoint = null;
+        }
+
         SinglePool(Endpoint endpoint) {
             this.endpoint = endpoint;
         }
@@ -251,11 +261,13 @@ abstract class ServicePool<S extends Service> extends 
ServiceSupport implements
         }
 
         private void cleanupEvicts() {
-            singlePoolEvicted.forEach((e, p) -> {
+            for (Map.Entry<Endpoint, Pool<S>> entry : 
singlePoolEvicted.entrySet()) {
+                Endpoint e = entry.getKey();
+                Pool<S> p = entry.getValue();
                 doStop(e);
                 p.stop();
                 singlePoolEvicted.remove(e);
-            });
+            }
         }
 
         void doStop(Service s) {
@@ -279,6 +291,13 @@ abstract class ServicePool<S extends Service> extends 
ServiceSupport implements
         private final BlockingQueue<S> queue;
         private final List<S> evicts;
 
+        private MultiplePool() {
+            // only used for eager classloading
+            this.endpoint = null;
+            this.queue = null;
+            this.evicts = null;
+        }
+
         MultiplePool(Endpoint endpoint) {
             this.endpoint = endpoint;
             this.queue = new ArrayBlockingQueue<>(capacity);
@@ -289,8 +308,10 @@ abstract class ServicePool<S extends Service> extends 
ServiceSupport implements
             if (!evicts.isEmpty()) {
                 synchronized (this) {
                     if (!evicts.isEmpty()) {
-                        evicts.forEach(this::doStop);
-                        evicts.forEach(queue::remove);
+                        for (S evict : evicts) {
+                            doStop(evict);
+                            queue.remove(evict);
+                        }
                         evicts.clear();
                         if (queue.isEmpty()) {
                             pool.remove(endpoint);

Reply via email to