This is an automated email from the ASF dual-hosted git repository. davsclaus pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/camel.git
The following commit(s) were added to refs/heads/master by this push: new 0c77d3b CAMEL-16446: camel-core - Optimize EIPs to eager load needed classes 0c77d3b is described below commit 0c77d3be900427f77baba0bdc9b079b2eb0541da Author: Claus Ibsen <claus.ib...@gmail.com> AuthorDate: Sat Apr 3 12:13:23 2021 +0200 CAMEL-16446: camel-core - Optimize EIPs to eager load needed classes --- .../apache/camel/processor/MulticastProcessor.java | 53 +++++++++++++++++++--- .../camel/processor/RecipientListProcessor.java | 4 ++ .../java/org/apache/camel/processor/Splitter.java | 24 ++++++++++ .../apache/camel/support/cache/ServicePool.java | 41 +++++++++++++---- 4 files changed, 106 insertions(+), 16 deletions(-) diff --git a/core/camel-core-processor/src/main/java/org/apache/camel/processor/MulticastProcessor.java b/core/camel-core-processor/src/main/java/org/apache/camel/processor/MulticastProcessor.java index 71d00ab..572fea9 100644 --- a/core/camel-core-processor/src/main/java/org/apache/camel/processor/MulticastProcessor.java +++ b/core/camel-core-processor/src/main/java/org/apache/camel/processor/MulticastProcessor.java @@ -52,6 +52,8 @@ import org.apache.camel.Route; import org.apache.camel.RuntimeCamelException; import org.apache.camel.StreamCache; import org.apache.camel.Traceable; +import org.apache.camel.processor.aggregate.ShareUnitOfWorkAggregationStrategy; +import org.apache.camel.processor.aggregate.UseOriginalAggregationStrategy; import org.apache.camel.processor.errorhandler.ErrorHandlerSupport; import org.apache.camel.spi.ErrorHandlerAware; import org.apache.camel.spi.IdAware; @@ -150,6 +152,14 @@ public class MulticastProcessor extends AsyncProcessorSupport } + private final class Scheduler implements Executor { + + @Override + public void execute(Runnable command) { + schedule(command); + } + } + protected final Processor onPrepare; private final CamelContext camelContext; private final InternalProcessorFactory internalProcessorFactory; @@ -167,6 +177,7 @@ public class MulticastProcessor extends AsyncProcessorSupport private final boolean stopOnException; private final ExecutorService executorService; private final boolean shutdownExecutorService; + private final Scheduler scheduler = new Scheduler(); private ExecutorService aggregateExecutorService; private boolean shutdownAggregateExecutorService; private final long timeout; @@ -263,6 +274,23 @@ public class MulticastProcessor extends AsyncProcessorSupport } @Override + protected void doBuild() throws Exception { + // eager load classes + Object dummy = new MulticastReactiveTask(); + LOG.trace("Loaded {}", dummy.getClass().getName()); + Object dummy2 = new MulticastTransactedTask(); + LOG.trace("Loaded {}", dummy2.getClass().getName()); + Object dummy3 = new UseOriginalAggregationStrategy(); + LOG.trace("Loaded {}", dummy3.getClass().getName()); + if (isShareUnitOfWork()) { + Object dummy4 = new ShareUnitOfWorkAggregationStrategy(null); + LOG.trace("Loaded {}", dummy4.getClass().getName()); + } + Object dummy5 = new DefaultProcessorExchangePair(0, null, null, null); + LOG.trace("Loaded {}", dummy5.getClass().getName()); + } + + @Override protected void doInit() throws Exception { if (route != null) { Exchange exchange = new DefaultExchange(getCamelContext()); @@ -356,23 +384,30 @@ public class MulticastProcessor extends AsyncProcessorSupport final Iterable<ProcessorExchangePair> pairs; final AsyncCallback callback; final Iterator<ProcessorExchangePair> iterator; - final ReentrantLock lock; - final AsyncCompletionService<Exchange> completion; - final AtomicReference<Exchange> result; + final ReentrantLock lock = new ReentrantLock(); + final AsyncCompletionService<Exchange> completion + = new AsyncCompletionService<>(scheduler, !isStreaming(), lock); + final AtomicReference<Exchange> result = new AtomicReference<>(); final AtomicInteger nbExchangeSent = new AtomicInteger(); final AtomicInteger nbAggregated = new AtomicInteger(); final AtomicBoolean allSent = new AtomicBoolean(); final AtomicBoolean done = new AtomicBoolean(); final Map<String, String> mdc; + private MulticastTask() { + // used for eager classloading + this.original = null; + this.pairs = null; + this.callback = null; + this.iterator = null; + this.mdc = null; + } + MulticastTask(Exchange original, Iterable<ProcessorExchangePair> pairs, AsyncCallback callback) { this.original = original; this.pairs = pairs; this.callback = callback; this.iterator = pairs.iterator(); - this.lock = new ReentrantLock(); - this.completion = new AsyncCompletionService<>(MulticastProcessor.this::schedule, !isStreaming(), lock); - this.result = new AtomicReference<>(); if (timeout > 0) { schedule(aggregateExecutorService, this::timeout, timeout, TimeUnit.MILLISECONDS); } @@ -459,6 +494,9 @@ public class MulticastProcessor extends AsyncProcessorSupport */ protected class MulticastReactiveTask extends MulticastTask { + private MulticastReactiveTask() { + } + public MulticastReactiveTask(Exchange original, Iterable<ProcessorExchangePair> pairs, AsyncCallback callback) { super(original, pairs, callback); } @@ -555,6 +593,9 @@ public class MulticastProcessor extends AsyncProcessorSupport */ protected class MulticastTransactedTask extends MulticastTask { + private MulticastTransactedTask() { + } + public MulticastTransactedTask(Exchange original, Iterable<ProcessorExchangePair> pairs, AsyncCallback callback) { super(original, pairs, callback); } diff --git a/core/camel-core-processor/src/main/java/org/apache/camel/processor/RecipientListProcessor.java b/core/camel-core-processor/src/main/java/org/apache/camel/processor/RecipientListProcessor.java index f2df1f6..e1dacf3 100644 --- a/core/camel-core-processor/src/main/java/org/apache/camel/processor/RecipientListProcessor.java +++ b/core/camel-core-processor/src/main/java/org/apache/camel/processor/RecipientListProcessor.java @@ -334,6 +334,10 @@ public class RecipientListProcessor extends MulticastProcessor { protected void doBuild() throws Exception { super.doBuild(); ServiceHelper.buildService(producerCache); + + // eager load classes + Object dummy = new RecipientProcessorExchangePair(0, null, null, null, null, null, null, false); + LOG.trace("Loaded {}", dummy.getClass().getName()); } @Override diff --git a/core/camel-core-processor/src/main/java/org/apache/camel/processor/Splitter.java b/core/camel-core-processor/src/main/java/org/apache/camel/processor/Splitter.java index 19c1ce7..58851cc 100644 --- a/core/camel-core-processor/src/main/java/org/apache/camel/processor/Splitter.java +++ b/core/camel-core-processor/src/main/java/org/apache/camel/processor/Splitter.java @@ -44,6 +44,8 @@ import org.apache.camel.support.ExchangeHelper; import org.apache.camel.support.ObjectHelper; import org.apache.camel.util.IOHelper; import org.apache.camel.util.StringHelper; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import static org.apache.camel.util.ObjectHelper.notNull; @@ -53,6 +55,8 @@ import static org.apache.camel.util.ObjectHelper.notNull; */ public class Splitter extends MulticastProcessor implements AsyncProcessor, Traceable { + private static final Logger LOG = LoggerFactory.getLogger(Splitter.class); + private static final String IGNORE_DELIMITER_MARKER = "false"; private final Expression expression; private final String delimiter; @@ -112,6 +116,14 @@ public class Splitter extends MulticastProcessor implements AsyncProcessor, Trac } @Override + protected void doBuild() throws Exception { + super.doBuild(); + // eager load classes + Object dummy = new SplitterIterable(); + LOG.trace("Loaded {}", dummy.getClass().getName()); + } + + @Override protected void doInit() throws Exception { super.doInit(); expression.init(getCamelContext()); @@ -180,6 +192,18 @@ public class Splitter extends MulticastProcessor implements AsyncProcessor, Trac private final Route route; private final Exchange original; + private SplitterIterable() { + // used for eager classloading + value = null; + iterator = null; + copy = null; + route = null; + original = null; + // for loading classes from iterator + Object dummy = iterator(); + LOG.trace("Loaded {}", dummy.getClass().getName()); + } + private SplitterIterable(Exchange exchange, Object value) { this.original = exchange; this.value = value; diff --git a/core/camel-support/src/main/java/org/apache/camel/support/cache/ServicePool.java b/core/camel-support/src/main/java/org/apache/camel/support/cache/ServicePool.java index 1a4d3ba..28e9ff2 100644 --- a/core/camel-support/src/main/java/org/apache/camel/support/cache/ServicePool.java +++ b/core/camel-support/src/main/java/org/apache/camel/support/cache/ServicePool.java @@ -127,15 +127,11 @@ abstract class ServicePool<S extends Service> extends ServiceSupport implements } private Pool<S> getOrCreatePool(Endpoint endpoint) { - return pool.computeIfAbsent(endpoint, this::createPool); - } - - private Pool<S> createPool(Endpoint endpoint) { boolean singleton = endpoint.isSingletonProducer(); if (singleton) { - return new SinglePool(endpoint); + return pool.computeIfAbsent(endpoint, SinglePool::new); } else { - return new MultiplePool(endpoint); + return pool.computeIfAbsent(endpoint, MultiplePool::new); } } @@ -157,6 +153,15 @@ abstract class ServicePool<S extends Service> extends ServiceSupport implements } @Override + protected void doBuild() throws Exception { + // eager load classes + SinglePool dummy = new SinglePool(); + LOG.trace("Loaded {}", dummy.getClass().getName()); + MultiplePool dummy2 = new MultiplePool(); + LOG.trace("Loaded {}", dummy2.getClass().getName()); + } + + @Override protected void doStart() throws Exception { // noop } @@ -194,6 +199,11 @@ abstract class ServicePool<S extends Service> extends ServiceSupport implements private final Endpoint endpoint; private volatile S s; + private SinglePool() { + // only used for eager classloading + this.endpoint = null; + } + SinglePool(Endpoint endpoint) { this.endpoint = endpoint; } @@ -251,11 +261,13 @@ abstract class ServicePool<S extends Service> extends ServiceSupport implements } private void cleanupEvicts() { - singlePoolEvicted.forEach((e, p) -> { + for (Map.Entry<Endpoint, Pool<S>> entry : singlePoolEvicted.entrySet()) { + Endpoint e = entry.getKey(); + Pool<S> p = entry.getValue(); doStop(e); p.stop(); singlePoolEvicted.remove(e); - }); + } } void doStop(Service s) { @@ -279,6 +291,13 @@ abstract class ServicePool<S extends Service> extends ServiceSupport implements private final BlockingQueue<S> queue; private final List<S> evicts; + private MultiplePool() { + // only used for eager classloading + this.endpoint = null; + this.queue = null; + this.evicts = null; + } + MultiplePool(Endpoint endpoint) { this.endpoint = endpoint; this.queue = new ArrayBlockingQueue<>(capacity); @@ -289,8 +308,10 @@ abstract class ServicePool<S extends Service> extends ServiceSupport implements if (!evicts.isEmpty()) { synchronized (this) { if (!evicts.isEmpty()) { - evicts.forEach(this::doStop); - evicts.forEach(queue::remove); + for (S evict : evicts) { + doStop(evict); + queue.remove(evict); + } evicts.clear(); if (queue.isEmpty()) { pool.remove(endpoint);