This is an automated email from the ASF dual-hosted git repository.

davsclaus pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/camel.git

commit 13798aa92b575a2b968ea776974bef1d4bdd76a6
Author: Claus Ibsen <claus.ib...@gmail.com>
AuthorDate: Mon Apr 5 10:15:58 2021 +0200

    CAMEL-16450: camel-core - Optimize WireTap to support pooled tasks for reduced object allocations.
---
 .../apache/camel/processor/WireTapProcessor.java   | 86 +++++++++++++++++-----
 1 file changed, 66 insertions(+), 20 deletions(-)

diff --git a/core/camel-core-processor/src/main/java/org/apache/camel/processor/WireTapProcessor.java b/core/camel-core-processor/src/main/java/org/apache/camel/processor/WireTapProcessor.java
index e72526d..98017f6 100644
--- a/core/camel-core-processor/src/main/java/org/apache/camel/processor/WireTapProcessor.java
+++ b/core/camel-core-processor/src/main/java/org/apache/camel/processor/WireTapProcessor.java
@@ -30,6 +30,7 @@ import org.apache.camel.Exchange;
 import org.apache.camel.ExchangePattern;
 import org.apache.camel.ExchangePropertyKey;
 import org.apache.camel.Expression;
+import org.apache.camel.ExtendedCamelContext;
 import org.apache.camel.Message;
 import org.apache.camel.Processor;
 import org.apache.camel.RuntimeCamelException;
@@ -70,6 +71,7 @@ public class WireTapProcessor extends AsyncProcessorSupport
     private final ExecutorService executorService;
     private volatile boolean shutdownExecutorService;
     private final LongAdder taskCount = new LongAdder();
+    private PooledExchangeTaskFactory taskFactory;
 
     // expression or processor used for populating a new exchange to send
     // as opposed to traditional wiretap that sends a copy of the original exchange
@@ -92,6 +94,42 @@ public class WireTapProcessor extends AsyncProcessorSupport
         this.dynamicUri = dynamicUri;
     }
 
+    private final class WireTapTask implements PooledExchangeTask, Runnable {
+
+        private Exchange exchange;
+        private final AsyncCallback callback = new AsyncCallback() {
+            @Override
+            public void done(boolean doneSync) {
+                if (exchange.getException() != null) {
+                    String u = URISupport.sanitizeUri(uri);
+                    LOG.warn("Error occurred during processing " + exchange + " wiretap to " + u
+                             + ". This exception will be ignored.",
+                            exchange.getException());
+                }
+                taskCount.decrement();
+                taskFactory.release(WireTapTask.this);
+            }
+        };
+
+        @Override
+        public void prepare(Exchange exchange, AsyncCallback callback) {
+            this.exchange = exchange;
+            // we use our own callback
+        }
+
+        @Override
+        public void reset() {
+            this.exchange = null;
+        }
+
+        @Override
+        public void run() {
+            taskCount.increment();
+            LOG.debug(">>>> (wiretap) {} {}", uri, exchange);
+            asyncProcessor.process(exchange, callback);
+        }
+    }
+
     @Override
     public String toString() {
         return id;
@@ -172,23 +210,11 @@ public class WireTapProcessor extends AsyncProcessorSupport
             return true;
         }
 
-        final Exchange wireTapExchange = target;
-
         // send the exchange to the destination using an executor service
         try {
-            executorService.submit(() -> {
-                taskCount.increment();
-                LOG.debug(">>>> (wiretap) {} {}", uri, wireTapExchange);
-                asyncProcessor.process(wireTapExchange, doneSync -> {
-                    if (wireTapExchange.getException() != null) {
-                        String u = URISupport.sanitizeUri(uri);
-                        LOG.warn("Error occurred during processing " + wireTapExchange + " wiretap to " + u
-                                 + ". This exception will be ignored.",
-                                wireTapExchange.getException());
-                    }
-                    taskCount.decrement();
-                });
-            });
+            // create task which has state used during routing
+            PooledExchangeTask task = taskFactory.acquire(target, null);
+            executorService.submit(task);
         } catch (Throwable e) {
             // in case the thread pool rejects or cannot submit the task then we need to catch
             // so camel error handler can react
@@ -331,27 +357,47 @@ public class WireTapProcessor extends AsyncProcessorSupport
 
     @Override
     protected void doBuild() throws Exception {
-        ServiceHelper.buildService(processor);
+        boolean pooled = camelContext.adapt(ExtendedCamelContext.class).getExchangeFactory().isPooled();
+        if (pooled) {
+            taskFactory = new PooledTaskFactory(getId()) {
+                @Override
+                public PooledExchangeTask create(Exchange exchange, AsyncCallback callback) {
+                    return new WireTapTask();
+                }
+            };
+            int capacity = camelContext.adapt(ExtendedCamelContext.class).getExchangeFactory().getCapacity();
+            taskFactory.setCapacity(capacity);
+        } else {
+            taskFactory = new PrototypeTaskFactory() {
+                @Override
+                public PooledExchangeTask create(Exchange exchange, AsyncCallback callback) {
+                    return new WireTapTask();
+                }
+            };
+        }
+        LOG.trace("Using TaskFactory: {}", taskFactory);
+
+        ServiceHelper.buildService(taskFactory, processor);
     }
 
     @Override
     protected void doInit() throws Exception {
-        ServiceHelper.initService(processor);
+        ServiceHelper.initService(taskFactory, processor);
     }
 
     @Override
     protected void doStart() throws Exception {
-        ServiceHelper.startService(processor);
+        ServiceHelper.startService(taskFactory, processor);
     }
 
     @Override
     protected void doStop() throws Exception {
-        ServiceHelper.stopService(processor);
+        ServiceHelper.stopService(taskFactory, processor);
     }
 
     @Override
     protected void doShutdown() throws Exception {
-        ServiceHelper.stopAndShutdownService(processor);
+        ServiceHelper.stopAndShutdownServices(taskFactory, processor);
         if (shutdownExecutorService) {
             getCamelContext().getExecutorServiceManager().shutdownNow(executorService);
         }

Reply via email to