This is an automated email from the ASF dual-hosted git repository.

davsclaus pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/camel.git


The following commit(s) were added to refs/heads/master by this push:
     new 368f1b2  CAMEL-16353: camel-core - Force eager classloading in build 
phase
368f1b2 is described below

commit 368f1b22366cf3ba595f03764c6a897cd858fa46
Author: Claus Ibsen <claus.ib...@gmail.com>
AuthorDate: Sun Mar 14 12:13:48 2021 +0100

    CAMEL-16353: camel-core - Force eager classloading in build phase
---
 .../java/org/apache/camel/ExtendedExchange.java    | 14 +++++
 .../org/apache/camel/support/AbstractExchange.java | 12 ++++
 .../org/apache/camel/support/DefaultConsumer.java  | 67 +++++++++++++++-------
 3 files changed, 73 insertions(+), 20 deletions(-)

diff --git 
a/core/camel-api/src/main/java/org/apache/camel/ExtendedExchange.java 
b/core/camel-api/src/main/java/org/apache/camel/ExtendedExchange.java
index a898475..9ecd8e4 100644
--- a/core/camel-api/src/main/java/org/apache/camel/ExtendedExchange.java
+++ b/core/camel-api/src/main/java/org/apache/camel/ExtendedExchange.java
@@ -190,4 +190,18 @@ public interface ExtendedExchange extends Exchange {
      */
     Map<String, Object> getInternalProperties();
 
+    /**
+     * Callback used by {@link Consumer} if the consumer is completing the 
exchange processing with default behaviour.
+     *
+     * This is only used when pooled exchange is enabled for optimization and 
reducing object allocations.
+     */
+    AsyncCallback getDefaultConsumerCallback();
+
+    /**
+     * Callback used by {@link Consumer} if the consumer is completing the 
exchange processing with default behaviour.
+     *
+     * This is only used when pooled exchange is enabled for optimization and 
reducing object allocations.
+     */
+    void setDefaultConsumerCallback(AsyncCallback callback);
+
 }
diff --git 
a/core/camel-support/src/main/java/org/apache/camel/support/AbstractExchange.java
 
b/core/camel-support/src/main/java/org/apache/camel/support/AbstractExchange.java
index c2a532f..714a01b 100644
--- 
a/core/camel-support/src/main/java/org/apache/camel/support/AbstractExchange.java
+++ 
b/core/camel-support/src/main/java/org/apache/camel/support/AbstractExchange.java
@@ -25,6 +25,7 @@ import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.CopyOnWriteArrayList;
 
+import org.apache.camel.AsyncCallback;
 import org.apache.camel.CamelContext;
 import org.apache.camel.CamelExecutionException;
 import org.apache.camel.Endpoint;
@@ -83,6 +84,7 @@ class AbstractExchange implements ExtendedExchange {
     boolean interruptable = true;
     boolean redeliveryExhausted;
     Boolean errorHandlerHandled;
+    AsyncCallback defaultConsumerCallback; // optimize (do not reset)
 
     public AbstractExchange(CamelContext context) {
         this.context = context;
@@ -847,6 +849,16 @@ class AbstractExchange implements ExtendedExchange {
         return map;
     }
 
+    @Override
+    public AsyncCallback getDefaultConsumerCallback() {
+        return defaultConsumerCallback;
+    }
+
+    @Override
+    public void setDefaultConsumerCallback(AsyncCallback 
defaultConsumerCallback) {
+        this.defaultConsumerCallback = defaultConsumerCallback;
+    }
+
     protected String createExchangeId() {
         return context.getUuidGenerator().generateUuid();
     }
diff --git 
a/core/camel-support/src/main/java/org/apache/camel/support/DefaultConsumer.java
 
b/core/camel-support/src/main/java/org/apache/camel/support/DefaultConsumer.java
index 762a9c5..8238f8c 100644
--- 
a/core/camel-support/src/main/java/org/apache/camel/support/DefaultConsumer.java
+++ 
b/core/camel-support/src/main/java/org/apache/camel/support/DefaultConsumer.java
@@ -16,8 +16,6 @@
  */
 package org.apache.camel.support;
 
-import java.util.concurrent.atomic.AtomicReference;
-
 import org.apache.camel.AsyncCallback;
 import org.apache.camel.AsyncProcessor;
 import org.apache.camel.Consumer;
@@ -51,7 +49,6 @@ public class DefaultConsumer extends ServiceSupport 
implements Consumer, RouteAw
     private final Processor processor;
     private final AsyncProcessor asyncProcessor;
     private final ExchangeFactory exchangeFactory;
-    private final AtomicReference<AsyncCallback> pooledCallback = new 
AtomicReference<>();
     private ExceptionHandler exceptionHandler;
     private Route route;
     private String routeId;
@@ -101,7 +98,6 @@ public class DefaultConsumer extends ServiceSupport 
implements Consumer, RouteAw
      * @param  exchange  the exchange
      * @return           the created and started unit of work
      * @throws Exception is thrown if error starting the unit of work
-     *
      * @see              #doneUoW(org.apache.camel.Exchange)
      */
     public UnitOfWork createUoW(Exchange exchange) throws Exception {
@@ -122,7 +118,6 @@ public class DefaultConsumer extends ServiceSupport 
implements Consumer, RouteAw
      * then this method should be executed when the consumer is finished 
processing the message.
      *
      * @param exchange the exchange
-     *
      * @see            #createUoW(org.apache.camel.Exchange)
      */
     public void doneUoW(Exchange exchange) {
@@ -151,21 +146,17 @@ public class DefaultConsumer extends ServiceSupport 
implements Consumer, RouteAw
     @Override
     public AsyncCallback defaultConsumerCallback(Exchange exchange, boolean 
autoRelease) {
         boolean pooled = exchangeFactory.isPooled();
-        AsyncCallback answer = pooled ? pooledCallback.get() : null;
-        if (answer == null) {
-            answer = doneSync -> {
-                // handle any thrown exception
-                if (exchange.getException() != null) {
-                    getExceptionHandler().handleException("Error processing 
exchange", exchange,
-                            exchange.getException());
-                }
-                releaseExchange(exchange, autoRelease);
-            };
-            if (pooled) {
-                pooledCallback.set(answer);
+        if (pooled) {
+            ExtendedExchange ee = exchange.adapt(ExtendedExchange.class);
+            AsyncCallback answer = ee.getDefaultConsumerCallback();
+            if (answer == null) {
+                answer = new DefaultConsumerCallback(this, exchange, 
autoRelease);
+                ee.setDefaultConsumerCallback(answer);
             }
+            return answer;
+        } else {
+            return new DefaultConsumerCallback(this, exchange, autoRelease);
         }
-        return answer;
     }
 
     @Override
@@ -198,6 +189,11 @@ public class DefaultConsumer extends ServiceSupport 
implements Consumer, RouteAw
     protected void doBuild() throws Exception {
         LOG.debug("Build consumer: {}", this);
         ServiceHelper.buildService(exchangeFactory, processor);
+
+        // force to create and load the class during build time so the JVM 
does not
+        // load the class on first exchange to be created
+        Object dummy = new DefaultConsumerCallback(this, null, false);
+        LOG.trace("Warming up DefaultConsumer loaded class: {}", 
dummy.getClass().getName());
     }
 
     @Override
@@ -223,12 +219,11 @@ public class DefaultConsumer extends ServiceSupport 
implements Consumer, RouteAw
     protected void doShutdown() throws Exception {
         LOG.debug("Shutting down consumer: {}", this);
         ServiceHelper.stopAndShutdownServices(exchangeFactory, processor);
-        pooledCallback.set(null);
     }
 
     /**
      * Handles the given exception using the {@link #getExceptionHandler()}
-     * 
+     *
      * @param t the exception to handle
      */
     protected void handleException(Throwable t) {
@@ -246,4 +241,36 @@ public class DefaultConsumer extends ServiceSupport 
implements Consumer, RouteAw
         Throwable newt = (t == null) ? new IllegalArgumentException("Handling 
[null] exception") : t;
         getExceptionHandler().handleException(message, newt);
     }
+
+    private static final class DefaultConsumerCallback implements 
AsyncCallback {
+
+        private final DefaultConsumer consumer;
+        private final Exchange exchange;
+        private final boolean autoRelease;
+
+        public DefaultConsumerCallback(DefaultConsumer consumer, Exchange 
exchange, boolean autoRelease) {
+            this.consumer = consumer;
+            this.exchange = exchange;
+            this.autoRelease = autoRelease;
+        }
+
+        @Override
+        public void done(boolean doneSync) {
+            try {
+                // handle any thrown exception
+                if (exchange.getException() != null) {
+                    consumer.getExceptionHandler().handleException("Error 
processing exchange", exchange,
+                            exchange.getException());
+                }
+            } finally {
+                consumer.releaseExchange(exchange, autoRelease);
+            }
+        }
+
+        @Override
+        public String toString() {
+            return "DefaultConsumerCallback";
+        }
+    }
+
 }

Reply via email to