This is an automated email from the ASF dual-hosted git repository. davsclaus pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/camel.git
The following commit(s) were added to refs/heads/master by this push: new 368f1b2 CAMEL-16353: camel-core - Force eager classloading in build phase 368f1b2 is described below commit 368f1b22366cf3ba595f03764c6a897cd858fa46 Author: Claus Ibsen <claus.ib...@gmail.com> AuthorDate: Sun Mar 14 12:13:48 2021 +0100 CAMEL-16353: camel-core - Force eager classloading in build phase --- .../java/org/apache/camel/ExtendedExchange.java | 14 +++++ .../org/apache/camel/support/AbstractExchange.java | 12 ++++ .../org/apache/camel/support/DefaultConsumer.java | 67 +++++++++++++++------- 3 files changed, 73 insertions(+), 20 deletions(-) diff --git a/core/camel-api/src/main/java/org/apache/camel/ExtendedExchange.java b/core/camel-api/src/main/java/org/apache/camel/ExtendedExchange.java index a898475..9ecd8e4 100644 --- a/core/camel-api/src/main/java/org/apache/camel/ExtendedExchange.java +++ b/core/camel-api/src/main/java/org/apache/camel/ExtendedExchange.java @@ -190,4 +190,18 @@ public interface ExtendedExchange extends Exchange { */ Map<String, Object> getInternalProperties(); + /** + * Callback used by {@link Consumer} if the consumer is completing the exchange processing with default behaviour. + * + * This is only used when pooled exchange is enabled for optimization and reducing object allocations. + */ + AsyncCallback getDefaultConsumerCallback(); + + /** + * Callback used by {@link Consumer} if the consumer is completing the exchange processing with default behaviour. + * + * This is only used when pooled exchange is enabled for optimization and reducing object allocations. + */ + void setDefaultConsumerCallback(AsyncCallback callback); + } diff --git a/core/camel-support/src/main/java/org/apache/camel/support/AbstractExchange.java b/core/camel-support/src/main/java/org/apache/camel/support/AbstractExchange.java index c2a532f..714a01b 100644 --- a/core/camel-support/src/main/java/org/apache/camel/support/AbstractExchange.java +++ b/core/camel-support/src/main/java/org/apache/camel/support/AbstractExchange.java @@ -25,6 +25,7 @@ import java.util.Set; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.CopyOnWriteArrayList; +import org.apache.camel.AsyncCallback; import org.apache.camel.CamelContext; import org.apache.camel.CamelExecutionException; import org.apache.camel.Endpoint; @@ -83,6 +84,7 @@ class AbstractExchange implements ExtendedExchange { boolean interruptable = true; boolean redeliveryExhausted; Boolean errorHandlerHandled; + AsyncCallback defaultConsumerCallback; // optimize (do not reset) public AbstractExchange(CamelContext context) { this.context = context; @@ -847,6 +849,16 @@ class AbstractExchange implements ExtendedExchange { return map; } + @Override + public AsyncCallback getDefaultConsumerCallback() { + return defaultConsumerCallback; + } + + @Override + public void setDefaultConsumerCallback(AsyncCallback defaultConsumerCallback) { + this.defaultConsumerCallback = defaultConsumerCallback; + } + protected String createExchangeId() { return context.getUuidGenerator().generateUuid(); } diff --git a/core/camel-support/src/main/java/org/apache/camel/support/DefaultConsumer.java b/core/camel-support/src/main/java/org/apache/camel/support/DefaultConsumer.java index 762a9c5..8238f8c 100644 --- a/core/camel-support/src/main/java/org/apache/camel/support/DefaultConsumer.java +++ b/core/camel-support/src/main/java/org/apache/camel/support/DefaultConsumer.java @@ -16,8 +16,6 @@ */ package org.apache.camel.support; -import java.util.concurrent.atomic.AtomicReference; - import org.apache.camel.AsyncCallback; import org.apache.camel.AsyncProcessor; import org.apache.camel.Consumer; @@ -51,7 +49,6 @@ public class DefaultConsumer extends ServiceSupport implements Consumer, RouteAw private final Processor processor; private final AsyncProcessor asyncProcessor; private final ExchangeFactory exchangeFactory; - private final AtomicReference<AsyncCallback> pooledCallback = new AtomicReference<>(); private ExceptionHandler exceptionHandler; private Route route; private String routeId; @@ -101,7 +98,6 @@ public class DefaultConsumer extends ServiceSupport implements Consumer, RouteAw * @param exchange the exchange * @return the created and started unit of work * @throws Exception is thrown if error starting the unit of work - * * @see #doneUoW(org.apache.camel.Exchange) */ public UnitOfWork createUoW(Exchange exchange) throws Exception { @@ -122,7 +118,6 @@ public class DefaultConsumer extends ServiceSupport implements Consumer, RouteAw * then this method should be executed when the consumer is finished processing the message. * * @param exchange the exchange - * * @see #createUoW(org.apache.camel.Exchange) */ public void doneUoW(Exchange exchange) { @@ -151,21 +146,17 @@ public class DefaultConsumer extends ServiceSupport implements Consumer, RouteAw @Override public AsyncCallback defaultConsumerCallback(Exchange exchange, boolean autoRelease) { boolean pooled = exchangeFactory.isPooled(); - AsyncCallback answer = pooled ? pooledCallback.get() : null; - if (answer == null) { - answer = doneSync -> { - // handle any thrown exception - if (exchange.getException() != null) { - getExceptionHandler().handleException("Error processing exchange", exchange, - exchange.getException()); - } - releaseExchange(exchange, autoRelease); - }; - if (pooled) { - pooledCallback.set(answer); + if (pooled) { + ExtendedExchange ee = exchange.adapt(ExtendedExchange.class); + AsyncCallback answer = ee.getDefaultConsumerCallback(); + if (answer == null) { + answer = new DefaultConsumerCallback(this, exchange, autoRelease); + ee.setDefaultConsumerCallback(answer); } + return answer; + } else { + return new DefaultConsumerCallback(this, exchange, autoRelease); } - return answer; } @Override @@ -198,6 +189,11 @@ public class DefaultConsumer extends ServiceSupport implements Consumer, RouteAw protected void doBuild() throws Exception { LOG.debug("Build consumer: {}", this); ServiceHelper.buildService(exchangeFactory, processor); + + // force to create and load the class during build time so the JVM does not + // load the class on first exchange to be created + Object dummy = new DefaultConsumerCallback(this, null, false); + LOG.trace("Warming up DefaultConsumer loaded class: {}", dummy.getClass().getName()); } @Override @@ -223,12 +219,11 @@ public class DefaultConsumer extends ServiceSupport implements Consumer, RouteAw protected void doShutdown() throws Exception { LOG.debug("Shutting down consumer: {}", this); ServiceHelper.stopAndShutdownServices(exchangeFactory, processor); - pooledCallback.set(null); } /** * Handles the given exception using the {@link #getExceptionHandler()} - * + * * @param t the exception to handle */ protected void handleException(Throwable t) { @@ -246,4 +241,36 @@ public class DefaultConsumer extends ServiceSupport implements Consumer, RouteAw Throwable newt = (t == null) ? new IllegalArgumentException("Handling [null] exception") : t; getExceptionHandler().handleException(message, newt); } + + private static final class DefaultConsumerCallback implements AsyncCallback { + + private final DefaultConsumer consumer; + private final Exchange exchange; + private final boolean autoRelease; + + public DefaultConsumerCallback(DefaultConsumer consumer, Exchange exchange, boolean autoRelease) { + this.consumer = consumer; + this.exchange = exchange; + this.autoRelease = autoRelease; + } + + @Override + public void done(boolean doneSync) { + try { + // handle any thrown exception + if (exchange.getException() != null) { + consumer.getExceptionHandler().handleException("Error processing exchange", exchange, + exchange.getException()); + } + } finally { + consumer.releaseExchange(exchange, autoRelease); + } + } + + @Override + public String toString() { + return "DefaultConsumerCallback"; + } + } + }