This is an automated email from the ASF dual-hosted git repository.

davsclaus pushed a commit to branch opt-consumer
in repository https://gitbox.apache.org/repos/asf/camel.git

commit a99945999a47a1ff6f2551c2bd85591a72b44ce8
Author: Claus Ibsen <claus.ib...@gmail.com>
AuthorDate: Tue Mar 9 17:04:47 2021 +0100

    CAMEL-16319: camel-core - Optimize consumer default done callback to reduce 
object allocations.
---
 .../apache/camel/component/ahc/ws/WsConsumer.java  | 11 ++------
 .../atmosphere/websocket/WebsocketConsumer.java    | 33 ++++++----------------
 .../ignite/events/IgniteEventsConsumer.java        |  9 ++----
 .../apache/camel/component/jbpm/JBPMConsumer.java  | 13 ++-------
 .../camel/component/splunk/SplunkConsumer.java     | 10 ++-----
 .../camel/component/undertow/UndertowConsumer.java | 23 ++++-----------
 .../vertx/websocket/VertxWebsocketConsumer.java    | 10 ++-----
 .../camel/websocket/jsr356/JSR356Consumer.java     |  9 +++---
 .../component/websocket/WebsocketConsumer.java     | 11 ++------
 9 files changed, 37 insertions(+), 92 deletions(-)

diff --git 
a/components/camel-ahc-ws/src/main/java/org/apache/camel/component/ahc/ws/WsConsumer.java
 
b/components/camel-ahc-ws/src/main/java/org/apache/camel/component/ahc/ws/WsConsumer.java
index 8bf9923..cab25ae 100644
--- 
a/components/camel-ahc-ws/src/main/java/org/apache/camel/component/ahc/ws/WsConsumer.java
+++ 
b/components/camel-ahc-ws/src/main/java/org/apache/camel/component/ahc/ws/WsConsumer.java
@@ -79,14 +79,9 @@ public class WsConsumer extends DefaultConsumer {
             exchange.getIn().setBody(message);
         }
 
-        // send exchange using the async routing engine
-        getAsyncProcessor().process(exchange, new AsyncCallback() {
-            public void done(boolean doneSync) {
-                if (exchange.getException() != null) {
-                    getExceptionHandler().handleException("Error processing 
exchange", exchange, exchange.getException());
-                }
-            }
-        });
+        // use default consumer callback
+        AsyncCallback cb = defaultConsumerCallback(exchange, true);
+        getAsyncProcessor().process(exchange, cb);
     }
 
 }
diff --git 
a/components/camel-atmosphere-websocket/src/main/java/org/apache/camel/component/atmosphere/websocket/WebsocketConsumer.java
 
b/components/camel-atmosphere-websocket/src/main/java/org/apache/camel/component/atmosphere/websocket/WebsocketConsumer.java
index 8957b6a..db10801 100644
--- 
a/components/camel-atmosphere-websocket/src/main/java/org/apache/camel/component/atmosphere/websocket/WebsocketConsumer.java
+++ 
b/components/camel-atmosphere-websocket/src/main/java/org/apache/camel/component/atmosphere/websocket/WebsocketConsumer.java
@@ -90,14 +90,9 @@ public class WebsocketConsumer extends ServletConsumer {
         exchange.getIn().setHeader(WebsocketConstants.CONNECTION_KEY, 
connectionKey);
         exchange.getIn().setBody(message);
 
-        // send exchange using the async routing engine
-        getAsyncProcessor().process(exchange, new AsyncCallback() {
-            public void done(boolean doneSync) {
-                if (exchange.getException() != null) {
-                    getExceptionHandler().handleException("Error processing 
exchange", exchange, exchange.getException());
-                }
-            }
-        });
+        // use default consumer callback
+        AsyncCallback cb = defaultConsumerCallback(exchange, true);
+        getAsyncProcessor().process(exchange, cb);
     }
 
     public void sendEventNotification(String connectionKey, int eventType) {
@@ -111,14 +106,9 @@ public class WebsocketConsumer extends ServletConsumer {
             exchange.getIn().setHeader(param.getKey(), param.getValue());
         }
 
-        // send exchange using the async routing engine
-        getAsyncProcessor().process(exchange, new AsyncCallback() {
-            public void done(boolean doneSync) {
-                if (exchange.getException() != null) {
-                    getExceptionHandler().handleException("Error processing 
exchange", exchange, exchange.getException());
-                }
-            }
-        });
+        // use default consumer callback
+        AsyncCallback cb = defaultConsumerCallback(exchange, true);
+        getAsyncProcessor().process(exchange, cb);
     }
 
     public void sendNotDeliveredMessage(List<String> failedConnectionKeys, 
Object message) {
@@ -129,14 +119,9 @@ public class WebsocketConsumer extends ServletConsumer {
         exchange.getIn().setHeader(WebsocketConstants.ERROR_TYPE, 
WebsocketConstants.MESSAGE_NOT_SENT_ERROR_TYPE);
         exchange.getIn().setBody(message);
 
-        // send exchange using the async routing engine
-        getAsyncProcessor().process(exchange, new AsyncCallback() {
-            public void done(boolean doneSync) {
-                if (exchange.getException() != null) {
-                    getExceptionHandler().handleException("Error processing 
exchange", exchange, exchange.getException());
-                }
-            }
-        });
+        // use default consumer callback
+        AsyncCallback cb = defaultConsumerCallback(exchange, true);
+        getAsyncProcessor().process(exchange, cb);
     }
 
     public boolean isEnableEventsResending() {
diff --git 
a/components/camel-ignite/src/main/java/org/apache/camel/component/ignite/events/IgniteEventsConsumer.java
 
b/components/camel-ignite/src/main/java/org/apache/camel/component/ignite/events/IgniteEventsConsumer.java
index 9b682f0..bcc8c45 100644
--- 
a/components/camel-ignite/src/main/java/org/apache/camel/component/ignite/events/IgniteEventsConsumer.java
+++ 
b/components/camel-ignite/src/main/java/org/apache/camel/component/ignite/events/IgniteEventsConsumer.java
@@ -53,12 +53,9 @@ public class IgniteEventsConsumer extends DefaultConsumer {
                 if (LOG.isTraceEnabled()) {
                     LOG.trace("Processing Ignite Event: {}.", event);
                 }
-                getAsyncProcessor().process(exchange, new AsyncCallback() {
-                    @Override
-                    public void done(boolean doneSync) {
-                        // do nothing
-                    }
-                });
+                // use default consumer callback
+                AsyncCallback cb = defaultConsumerCallback(exchange, true);
+                getAsyncProcessor().process(exchange, cb);
             } catch (Exception e) {
                 LOG.error(String.format("Exception while processing Ignite 
Event: %s.", event), e);
             }
diff --git 
a/components/camel-jbpm/src/main/java/org/apache/camel/component/jbpm/JBPMConsumer.java
 
b/components/camel-jbpm/src/main/java/org/apache/camel/component/jbpm/JBPMConsumer.java
index a8d6c59..5e8d165 100644
--- 
a/components/camel-jbpm/src/main/java/org/apache/camel/component/jbpm/JBPMConsumer.java
+++ 
b/components/camel-jbpm/src/main/java/org/apache/camel/component/jbpm/JBPMConsumer.java
@@ -103,16 +103,9 @@ public class JBPMConsumer extends DefaultConsumer 
implements DeploymentEventList
         exchange.getIn().setBody(body);
 
         if (!endpoint.isSynchronous()) {
-            getAsyncProcessor().process(exchange, new AsyncCallback() {
-                @Override
-                public void done(boolean doneSync) {
-                    // handle any thrown exception
-                    if (exchange.getException() != null) {
-                        getExceptionHandler().handleException("Error 
processing exchange", exchange, exchange.getException());
-                    }
-                    releaseExchange(exchange, false);
-                }
-            });
+            // use default consumer callback
+            AsyncCallback cb = defaultConsumerCallback(exchange, false);
+            getAsyncProcessor().process(exchange, cb);
         } else {
             try {
                 getProcessor().process(exchange);
diff --git 
a/components/camel-splunk/src/main/java/org/apache/camel/component/splunk/SplunkConsumer.java
 
b/components/camel-splunk/src/main/java/org/apache/camel/component/splunk/SplunkConsumer.java
index 72834d2..c7edcab 100644
--- 
a/components/camel-splunk/src/main/java/org/apache/camel/component/splunk/SplunkConsumer.java
+++ 
b/components/camel-splunk/src/main/java/org/apache/camel/component/splunk/SplunkConsumer.java
@@ -70,13 +70,9 @@ public class SplunkConsumer extends 
ScheduledBatchPollingConsumer {
                         Message message = exchange.getIn();
                         message.setBody(splunkEvent);
 
-                        LOG.trace("Processing exchange [{}]...", exchange);
-                        getAsyncProcessor().process(exchange, new 
AsyncCallback() {
-                            @Override
-                            public void done(boolean doneSync) {
-                                LOG.trace("Done processing exchange [{}]...", 
exchange);
-                            }
-                        });
+                        // use default consumer callback
+                        AsyncCallback cb = defaultConsumerCallback(exchange, 
true);
+                        getAsyncProcessor().process(exchange, cb);
                     }
                 });
                 // Return 0: no exchanges returned by poll, as exchanges have 
been returned asynchronously
diff --git 
a/components/camel-undertow/src/main/java/org/apache/camel/component/undertow/UndertowConsumer.java
 
b/components/camel-undertow/src/main/java/org/apache/camel/component/undertow/UndertowConsumer.java
index dece6cc..719b8dd 100644
--- 
a/components/camel-undertow/src/main/java/org/apache/camel/component/undertow/UndertowConsumer.java
+++ 
b/components/camel-undertow/src/main/java/org/apache/camel/component/undertow/UndertowConsumer.java
@@ -289,15 +289,9 @@ public class UndertowConsumer extends DefaultConsumer 
implements HttpHandler, Su
         }
         exchange.getIn().setBody(message);
 
-        // send exchange using the async routing engine
-        getAsyncProcessor().process(exchange, new AsyncCallback() {
-            public void done(boolean doneSync) {
-                if (exchange.getException() != null) {
-                    getExceptionHandler().handleException("Error processing 
exchange", exchange,
-                            exchange.getException());
-                }
-            }
-        });
+        // use default consumer callback
+        AsyncCallback cb = defaultConsumerCallback(exchange, true);
+        getAsyncProcessor().process(exchange, cb);
     }
 
     /**
@@ -322,14 +316,9 @@ public class UndertowConsumer extends DefaultConsumer 
implements HttpHandler, Su
         if (transportExchange != null) {
             in.setHeader(UndertowConstants.EXCHANGE, transportExchange);
         }
-        // send exchange using the async routing engine
-        getAsyncProcessor().process(exchange, new AsyncCallback() {
-            public void done(boolean doneSync) {
-                if (exchange.getException() != null) {
-                    getExceptionHandler().handleException("Error processing 
exchange", exchange, exchange.getException());
-                }
-            }
-        });
+        // use default consumer callback
+        AsyncCallback cb = defaultConsumerCallback(exchange, true);
+        getAsyncProcessor().process(exchange, cb);
     }
 
     private Object getResponseBody(HttpServerExchange httpExchange, Exchange 
camelExchange) throws IOException {
diff --git 
a/components/camel-vertx-websocket/src/main/java/org/apache/camel/component/vertx/websocket/VertxWebsocketConsumer.java
 
b/components/camel-vertx-websocket/src/main/java/org/apache/camel/component/vertx/websocket/VertxWebsocketConsumer.java
index 73eed0a..33af68a 100644
--- 
a/components/camel-vertx-websocket/src/main/java/org/apache/camel/component/vertx/websocket/VertxWebsocketConsumer.java
+++ 
b/components/camel-vertx-websocket/src/main/java/org/apache/camel/component/vertx/websocket/VertxWebsocketConsumer.java
@@ -63,13 +63,9 @@ public class VertxWebsocketConsumer extends DefaultConsumer {
         exchange.getMessage().setHeader(VertxWebsocketContants.CONNECTION_KEY, 
connectionKey);
         exchange.getMessage().setBody(message);
 
-        getAsyncProcessor().process(exchange, new AsyncCallback() {
-            public void done(boolean doneSync) {
-                if (exchange.getException() != null) {
-                    getExceptionHandler().handleException("Error processing 
exchange", exchange, exchange.getException());
-                }
-            }
-        });
+        // use default consumer callback
+        AsyncCallback cb = defaultConsumerCallback(exchange, true);
+        getAsyncProcessor().process(exchange, cb);
     }
 
     public void onException(String connectionKey, Throwable cause) {
diff --git 
a/components/camel-websocket-jsr356/src/main/java/org/apache/camel/websocket/jsr356/JSR356Consumer.java
 
b/components/camel-websocket-jsr356/src/main/java/org/apache/camel/websocket/jsr356/JSR356Consumer.java
index ee43004..4d72ccb 100644
--- 
a/components/camel-websocket-jsr356/src/main/java/org/apache/camel/websocket/jsr356/JSR356Consumer.java
+++ 
b/components/camel-websocket-jsr356/src/main/java/org/apache/camel/websocket/jsr356/JSR356Consumer.java
@@ -23,6 +23,7 @@ import javax.websocket.ClientEndpointConfig;
 import javax.websocket.Session;
 import javax.websocket.server.ServerEndpointConfig;
 
+import org.apache.camel.AsyncCallback;
 import org.apache.camel.Exchange;
 import org.apache.camel.Processor;
 import org.apache.camel.support.DefaultConsumer;
@@ -37,11 +38,9 @@ public class JSR356Consumer extends DefaultConsumer {
         final Exchange exchange = createExchange(true);
         exchange.getIn().setHeader(JSR356Constants.SESSION, session);
         exchange.getIn().setBody(message);
-        getAsyncProcessor().process(exchange, doneSync -> {
-            if (exchange.getException() != null) {
-                getExceptionHandler().handleException("Error processing 
exchange", exchange, exchange.getException());
-            }
-        });
+        // use default consumer callback
+        AsyncCallback cb = defaultConsumerCallback(exchange, true);
+        getAsyncProcessor().process(exchange, cb);
     };
 
     JSR356Consumer(final JSR356Endpoint jsr356Endpoint, final Processor 
processor) {
diff --git 
a/components/camel-websocket/src/main/java/org/apache/camel/component/websocket/WebsocketConsumer.java
 
b/components/camel-websocket/src/main/java/org/apache/camel/component/websocket/WebsocketConsumer.java
index 80a5f31..4a7f1a4 100644
--- 
a/components/camel-websocket/src/main/java/org/apache/camel/component/websocket/WebsocketConsumer.java
+++ 
b/components/camel-websocket/src/main/java/org/apache/camel/component/websocket/WebsocketConsumer.java
@@ -72,14 +72,9 @@ public class WebsocketConsumer extends DefaultConsumer 
implements WebsocketProdu
         exchange.getIn().setHeader(WebsocketConstants.CONNECTION_KEY, 
connectionKey);
         exchange.getIn().setBody(message);
 
-        // send exchange using the async routing engine
-        getAsyncProcessor().process(exchange, new AsyncCallback() {
-            public void done(boolean doneSync) {
-                if (exchange.getException() != null) {
-                    getExceptionHandler().handleException("Error processing 
exchange", exchange, exchange.getException());
-                }
-            }
-        });
+        // use default consumer callback
+        AsyncCallback cb = defaultConsumerCallback(exchange, true);
+        getAsyncProcessor().process(exchange, cb);
     }
 
 }

Reply via email to