This is an automated email from the ASF dual-hosted git repository. davsclaus pushed a commit to branch opt-consumer in repository https://gitbox.apache.org/repos/asf/camel.git
commit a99945999a47a1ff6f2551c2bd85591a72b44ce8 Author: Claus Ibsen <claus.ib...@gmail.com> AuthorDate: Tue Mar 9 17:04:47 2021 +0100 CAMEL-16319: camel-core - Optimize consumer default done callback to reduce object allocations. --- .../apache/camel/component/ahc/ws/WsConsumer.java | 11 ++------ .../atmosphere/websocket/WebsocketConsumer.java | 33 ++++++---------------- .../ignite/events/IgniteEventsConsumer.java | 9 ++---- .../apache/camel/component/jbpm/JBPMConsumer.java | 13 ++------- .../camel/component/splunk/SplunkConsumer.java | 10 ++----- .../camel/component/undertow/UndertowConsumer.java | 23 ++++----------- .../vertx/websocket/VertxWebsocketConsumer.java | 10 ++----- .../camel/websocket/jsr356/JSR356Consumer.java | 9 +++--- .../component/websocket/WebsocketConsumer.java | 11 ++------ 9 files changed, 37 insertions(+), 92 deletions(-) diff --git a/components/camel-ahc-ws/src/main/java/org/apache/camel/component/ahc/ws/WsConsumer.java b/components/camel-ahc-ws/src/main/java/org/apache/camel/component/ahc/ws/WsConsumer.java index 8bf9923..cab25ae 100644 --- a/components/camel-ahc-ws/src/main/java/org/apache/camel/component/ahc/ws/WsConsumer.java +++ b/components/camel-ahc-ws/src/main/java/org/apache/camel/component/ahc/ws/WsConsumer.java @@ -79,14 +79,9 @@ public class WsConsumer extends DefaultConsumer { exchange.getIn().setBody(message); } - // send exchange using the async routing engine - getAsyncProcessor().process(exchange, new AsyncCallback() { - public void done(boolean doneSync) { - if (exchange.getException() != null) { - getExceptionHandler().handleException("Error processing exchange", exchange, exchange.getException()); - } - } - }); + // use default consumer callback + AsyncCallback cb = defaultConsumerCallback(exchange, true); + getAsyncProcessor().process(exchange, cb); } } diff --git a/components/camel-atmosphere-websocket/src/main/java/org/apache/camel/component/atmosphere/websocket/WebsocketConsumer.java b/components/camel-atmosphere-websocket/src/main/java/org/apache/camel/component/atmosphere/websocket/WebsocketConsumer.java index 8957b6a..db10801 100644 --- a/components/camel-atmosphere-websocket/src/main/java/org/apache/camel/component/atmosphere/websocket/WebsocketConsumer.java +++ b/components/camel-atmosphere-websocket/src/main/java/org/apache/camel/component/atmosphere/websocket/WebsocketConsumer.java @@ -90,14 +90,9 @@ public class WebsocketConsumer extends ServletConsumer { exchange.getIn().setHeader(WebsocketConstants.CONNECTION_KEY, connectionKey); exchange.getIn().setBody(message); - // send exchange using the async routing engine - getAsyncProcessor().process(exchange, new AsyncCallback() { - public void done(boolean doneSync) { - if (exchange.getException() != null) { - getExceptionHandler().handleException("Error processing exchange", exchange, exchange.getException()); - } - } - }); + // use default consumer callback + AsyncCallback cb = defaultConsumerCallback(exchange, true); + getAsyncProcessor().process(exchange, cb); } public void sendEventNotification(String connectionKey, int eventType) { @@ -111,14 +106,9 @@ public class WebsocketConsumer extends ServletConsumer { exchange.getIn().setHeader(param.getKey(), param.getValue()); } - // send exchange using the async routing engine - getAsyncProcessor().process(exchange, new AsyncCallback() { - public void done(boolean doneSync) { - if (exchange.getException() != null) { - getExceptionHandler().handleException("Error processing exchange", exchange, exchange.getException()); - } - } - }); + // use default consumer callback + AsyncCallback cb = defaultConsumerCallback(exchange, true); + getAsyncProcessor().process(exchange, cb); } public void sendNotDeliveredMessage(List<String> failedConnectionKeys, Object message) { @@ -129,14 +119,9 @@ public class WebsocketConsumer extends ServletConsumer { exchange.getIn().setHeader(WebsocketConstants.ERROR_TYPE, WebsocketConstants.MESSAGE_NOT_SENT_ERROR_TYPE); exchange.getIn().setBody(message); - // send exchange using the async routing engine - getAsyncProcessor().process(exchange, new AsyncCallback() { - public void done(boolean doneSync) { - if (exchange.getException() != null) { - getExceptionHandler().handleException("Error processing exchange", exchange, exchange.getException()); - } - } - }); + // use default consumer callback + AsyncCallback cb = defaultConsumerCallback(exchange, true); + getAsyncProcessor().process(exchange, cb); } public boolean isEnableEventsResending() { diff --git a/components/camel-ignite/src/main/java/org/apache/camel/component/ignite/events/IgniteEventsConsumer.java b/components/camel-ignite/src/main/java/org/apache/camel/component/ignite/events/IgniteEventsConsumer.java index 9b682f0..bcc8c45 100644 --- a/components/camel-ignite/src/main/java/org/apache/camel/component/ignite/events/IgniteEventsConsumer.java +++ b/components/camel-ignite/src/main/java/org/apache/camel/component/ignite/events/IgniteEventsConsumer.java @@ -53,12 +53,9 @@ public class IgniteEventsConsumer extends DefaultConsumer { if (LOG.isTraceEnabled()) { LOG.trace("Processing Ignite Event: {}.", event); } - getAsyncProcessor().process(exchange, new AsyncCallback() { - @Override - public void done(boolean doneSync) { - // do nothing - } - }); + // use default consumer callback + AsyncCallback cb = defaultConsumerCallback(exchange, true); + getAsyncProcessor().process(exchange, cb); } catch (Exception e) { LOG.error(String.format("Exception while processing Ignite Event: %s.", event), e); } diff --git a/components/camel-jbpm/src/main/java/org/apache/camel/component/jbpm/JBPMConsumer.java b/components/camel-jbpm/src/main/java/org/apache/camel/component/jbpm/JBPMConsumer.java index a8d6c59..5e8d165 100644 --- a/components/camel-jbpm/src/main/java/org/apache/camel/component/jbpm/JBPMConsumer.java +++ b/components/camel-jbpm/src/main/java/org/apache/camel/component/jbpm/JBPMConsumer.java @@ -103,16 +103,9 @@ public class JBPMConsumer extends DefaultConsumer implements DeploymentEventList exchange.getIn().setBody(body); if (!endpoint.isSynchronous()) { - getAsyncProcessor().process(exchange, new AsyncCallback() { - @Override - public void done(boolean doneSync) { - // handle any thrown exception - if (exchange.getException() != null) { - getExceptionHandler().handleException("Error processing exchange", exchange, exchange.getException()); - } - releaseExchange(exchange, false); - } - }); + // use default consumer callback + AsyncCallback cb = defaultConsumerCallback(exchange, false); + getAsyncProcessor().process(exchange, cb); } else { try { getProcessor().process(exchange); diff --git a/components/camel-splunk/src/main/java/org/apache/camel/component/splunk/SplunkConsumer.java b/components/camel-splunk/src/main/java/org/apache/camel/component/splunk/SplunkConsumer.java index 72834d2..c7edcab 100644 --- a/components/camel-splunk/src/main/java/org/apache/camel/component/splunk/SplunkConsumer.java +++ b/components/camel-splunk/src/main/java/org/apache/camel/component/splunk/SplunkConsumer.java @@ -70,13 +70,9 @@ public class SplunkConsumer extends ScheduledBatchPollingConsumer { Message message = exchange.getIn(); message.setBody(splunkEvent); - LOG.trace("Processing exchange [{}]...", exchange); - getAsyncProcessor().process(exchange, new AsyncCallback() { - @Override - public void done(boolean doneSync) { - LOG.trace("Done processing exchange [{}]...", exchange); - } - }); + // use default consumer callback + AsyncCallback cb = defaultConsumerCallback(exchange, true); + getAsyncProcessor().process(exchange, cb); } }); // Return 0: no exchanges returned by poll, as exchanges have been returned asynchronously diff --git a/components/camel-undertow/src/main/java/org/apache/camel/component/undertow/UndertowConsumer.java b/components/camel-undertow/src/main/java/org/apache/camel/component/undertow/UndertowConsumer.java index dece6cc..719b8dd 100644 --- a/components/camel-undertow/src/main/java/org/apache/camel/component/undertow/UndertowConsumer.java +++ b/components/camel-undertow/src/main/java/org/apache/camel/component/undertow/UndertowConsumer.java @@ -289,15 +289,9 @@ public class UndertowConsumer extends DefaultConsumer implements HttpHandler, Su } exchange.getIn().setBody(message); - // send exchange using the async routing engine - getAsyncProcessor().process(exchange, new AsyncCallback() { - public void done(boolean doneSync) { - if (exchange.getException() != null) { - getExceptionHandler().handleException("Error processing exchange", exchange, - exchange.getException()); - } - } - }); + // use default consumer callback + AsyncCallback cb = defaultConsumerCallback(exchange, true); + getAsyncProcessor().process(exchange, cb); } /** @@ -322,14 +316,9 @@ public class UndertowConsumer extends DefaultConsumer implements HttpHandler, Su if (transportExchange != null) { in.setHeader(UndertowConstants.EXCHANGE, transportExchange); } - // send exchange using the async routing engine - getAsyncProcessor().process(exchange, new AsyncCallback() { - public void done(boolean doneSync) { - if (exchange.getException() != null) { - getExceptionHandler().handleException("Error processing exchange", exchange, exchange.getException()); - } - } - }); + // use default consumer callback + AsyncCallback cb = defaultConsumerCallback(exchange, true); + getAsyncProcessor().process(exchange, cb); } private Object getResponseBody(HttpServerExchange httpExchange, Exchange camelExchange) throws IOException { diff --git a/components/camel-vertx-websocket/src/main/java/org/apache/camel/component/vertx/websocket/VertxWebsocketConsumer.java b/components/camel-vertx-websocket/src/main/java/org/apache/camel/component/vertx/websocket/VertxWebsocketConsumer.java index 73eed0a..33af68a 100644 --- a/components/camel-vertx-websocket/src/main/java/org/apache/camel/component/vertx/websocket/VertxWebsocketConsumer.java +++ b/components/camel-vertx-websocket/src/main/java/org/apache/camel/component/vertx/websocket/VertxWebsocketConsumer.java @@ -63,13 +63,9 @@ public class VertxWebsocketConsumer extends DefaultConsumer { exchange.getMessage().setHeader(VertxWebsocketContants.CONNECTION_KEY, connectionKey); exchange.getMessage().setBody(message); - getAsyncProcessor().process(exchange, new AsyncCallback() { - public void done(boolean doneSync) { - if (exchange.getException() != null) { - getExceptionHandler().handleException("Error processing exchange", exchange, exchange.getException()); - } - } - }); + // use default consumer callback + AsyncCallback cb = defaultConsumerCallback(exchange, true); + getAsyncProcessor().process(exchange, cb); } public void onException(String connectionKey, Throwable cause) { diff --git a/components/camel-websocket-jsr356/src/main/java/org/apache/camel/websocket/jsr356/JSR356Consumer.java b/components/camel-websocket-jsr356/src/main/java/org/apache/camel/websocket/jsr356/JSR356Consumer.java index ee43004..4d72ccb 100644 --- a/components/camel-websocket-jsr356/src/main/java/org/apache/camel/websocket/jsr356/JSR356Consumer.java +++ b/components/camel-websocket-jsr356/src/main/java/org/apache/camel/websocket/jsr356/JSR356Consumer.java @@ -23,6 +23,7 @@ import javax.websocket.ClientEndpointConfig; import javax.websocket.Session; import javax.websocket.server.ServerEndpointConfig; +import org.apache.camel.AsyncCallback; import org.apache.camel.Exchange; import org.apache.camel.Processor; import org.apache.camel.support.DefaultConsumer; @@ -37,11 +38,9 @@ public class JSR356Consumer extends DefaultConsumer { final Exchange exchange = createExchange(true); exchange.getIn().setHeader(JSR356Constants.SESSION, session); exchange.getIn().setBody(message); - getAsyncProcessor().process(exchange, doneSync -> { - if (exchange.getException() != null) { - getExceptionHandler().handleException("Error processing exchange", exchange, exchange.getException()); - } - }); + // use default consumer callback + AsyncCallback cb = defaultConsumerCallback(exchange, true); + getAsyncProcessor().process(exchange, cb); }; JSR356Consumer(final JSR356Endpoint jsr356Endpoint, final Processor processor) { diff --git a/components/camel-websocket/src/main/java/org/apache/camel/component/websocket/WebsocketConsumer.java b/components/camel-websocket/src/main/java/org/apache/camel/component/websocket/WebsocketConsumer.java index 80a5f31..4a7f1a4 100644 --- a/components/camel-websocket/src/main/java/org/apache/camel/component/websocket/WebsocketConsumer.java +++ b/components/camel-websocket/src/main/java/org/apache/camel/component/websocket/WebsocketConsumer.java @@ -72,14 +72,9 @@ public class WebsocketConsumer extends DefaultConsumer implements WebsocketProdu exchange.getIn().setHeader(WebsocketConstants.CONNECTION_KEY, connectionKey); exchange.getIn().setBody(message); - // send exchange using the async routing engine - getAsyncProcessor().process(exchange, new AsyncCallback() { - public void done(boolean doneSync) { - if (exchange.getException() != null) { - getExceptionHandler().handleException("Error processing exchange", exchange, exchange.getException()); - } - } - }); + // use default consumer callback + AsyncCallback cb = defaultConsumerCallback(exchange, true); + getAsyncProcessor().process(exchange, cb); } }