This is an automated email from the ASF dual-hosted git repository. davsclaus pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/camel.git
commit 13798aa92b575a2b968ea776974bef1d4bdd76a6 Author: Claus Ibsen <claus.ib...@gmail.com> AuthorDate: Mon Apr 5 10:15:58 2021 +0200 CAMEL-16450: camel-core - Optimize WireTap to support pooled tasks for reduced object allocations. --- .../apache/camel/processor/WireTapProcessor.java | 86 +++++++++++++++++----- 1 file changed, 66 insertions(+), 20 deletions(-) diff --git a/core/camel-core-processor/src/main/java/org/apache/camel/processor/WireTapProcessor.java b/core/camel-core-processor/src/main/java/org/apache/camel/processor/WireTapProcessor.java index e72526d..98017f6 100644 --- a/core/camel-core-processor/src/main/java/org/apache/camel/processor/WireTapProcessor.java +++ b/core/camel-core-processor/src/main/java/org/apache/camel/processor/WireTapProcessor.java @@ -30,6 +30,7 @@ import org.apache.camel.Exchange; import org.apache.camel.ExchangePattern; import org.apache.camel.ExchangePropertyKey; import org.apache.camel.Expression; +import org.apache.camel.ExtendedCamelContext; import org.apache.camel.Message; import org.apache.camel.Processor; import org.apache.camel.RuntimeCamelException; @@ -70,6 +71,7 @@ public class WireTapProcessor extends AsyncProcessorSupport private final ExecutorService executorService; private volatile boolean shutdownExecutorService; private final LongAdder taskCount = new LongAdder(); + private PooledExchangeTaskFactory taskFactory; // expression or processor used for populating a new exchange to send // as opposed to traditional wiretap that sends a copy of the original exchange @@ -92,6 +94,42 @@ public class WireTapProcessor extends AsyncProcessorSupport this.dynamicUri = dynamicUri; } + private final class WireTapTask implements PooledExchangeTask, Runnable { + + private Exchange exchange; + private final AsyncCallback callback = new AsyncCallback() { + @Override + public void done(boolean doneSync) { + if (exchange.getException() != null) { + String u = URISupport.sanitizeUri(uri); + LOG.warn("Error occurred during processing " + exchange + " wiretap to " + u + + ". This exception will be ignored.", + exchange.getException()); + } + taskCount.decrement(); + taskFactory.release(WireTapTask.this); + } + }; + + @Override + public void prepare(Exchange exchange, AsyncCallback callback) { + this.exchange = exchange; + // we use our own callback + } + + @Override + public void reset() { + this.exchange = null; + } + + @Override + public void run() { + taskCount.increment(); + LOG.debug(">>>> (wiretap) {} {}", uri, exchange); + asyncProcessor.process(exchange, callback); + } + } + @Override public String toString() { return id; @@ -172,23 +210,11 @@ public class WireTapProcessor extends AsyncProcessorSupport return true; } - final Exchange wireTapExchange = target; - // send the exchange to the destination using an executor service try { - executorService.submit(() -> { - taskCount.increment(); - LOG.debug(">>>> (wiretap) {} {}", uri, wireTapExchange); - asyncProcessor.process(wireTapExchange, doneSync -> { - if (wireTapExchange.getException() != null) { - String u = URISupport.sanitizeUri(uri); - LOG.warn("Error occurred during processing " + wireTapExchange + " wiretap to " + u - + ". This exception will be ignored.", - wireTapExchange.getException()); - } - taskCount.decrement(); - }); - }); + // create task which has state used during routing + PooledExchangeTask task = taskFactory.acquire(target, null); + executorService.submit(task); } catch (Throwable e) { // in case the thread pool rejects or cannot submit the task then we need to catch // so camel error handler can react @@ -331,27 +357,47 @@ public class WireTapProcessor extends AsyncProcessorSupport @Override protected void doBuild() throws Exception { - ServiceHelper.buildService(processor); + boolean pooled = camelContext.adapt(ExtendedCamelContext.class).getExchangeFactory().isPooled(); + if (pooled) { + taskFactory = new PooledTaskFactory(getId()) { + @Override + public PooledExchangeTask create(Exchange exchange, AsyncCallback callback) { + return new WireTapTask(); + } + }; + int capacity = camelContext.adapt(ExtendedCamelContext.class).getExchangeFactory().getCapacity(); + taskFactory.setCapacity(capacity); + } else { + taskFactory = new PrototypeTaskFactory() { + @Override + public PooledExchangeTask create(Exchange exchange, AsyncCallback callback) { + return new WireTapTask(); + } + }; + } + LOG.trace("Using TaskFactory: {}", taskFactory); + + ServiceHelper.buildService(taskFactory, processor); } @Override protected void doInit() throws Exception { - ServiceHelper.initService(processor); + ServiceHelper.initService(taskFactory, processor); } @Override protected void doStart() throws Exception { - ServiceHelper.startService(processor); + ServiceHelper.startService(taskFactory, processor); } @Override protected void doStop() throws Exception { - ServiceHelper.stopService(processor); + ServiceHelper.stopService(taskFactory, processor); } @Override protected void doShutdown() throws Exception { - ServiceHelper.stopAndShutdownService(processor); + ServiceHelper.stopAndShutdownServices(taskFactory, processor); if (shutdownExecutorService) { getCamelContext().getExecutorServiceManager().shutdownNow(executorService); }