This is an automated email from the ASF dual-hosted git repository.

davsclaus pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/camel.git
The following commit(s) were added to refs/heads/master by this push: new a7243c1 CAMEL-16455: Optimize CircuitBreaker EIP with task pooling a7243c1 is described below commit a7243c15068752eaf51953f727908989137ecb2b Author: Claus Ibsen <claus.ib...@gmail.com> AuthorDate: Thu Apr 8 16:24:43 2021 +0200 CAMEL-16455: Optimize CircuitBreaker EIP with task pooling --- .../faulttolerance/FaultToleranceProcessor.java | 111 ++++++++++++++++----- .../resilience4j/ResilienceProcessor.java | 1 + 2 files changed, 88 insertions(+), 24 deletions(-) diff --git a/components/camel-microprofile/camel-microprofile-fault-tolerance/src/main/java/org/apache/camel/component/microprofile/faulttolerance/FaultToleranceProcessor.java b/components/camel-microprofile/camel-microprofile-fault-tolerance/src/main/java/org/apache/camel/component/microprofile/faulttolerance/FaultToleranceProcessor.java index 97a64f6..ad51366 100644 --- a/components/camel-microprofile/camel-microprofile-fault-tolerance/src/main/java/org/apache/camel/component/microprofile/faulttolerance/FaultToleranceProcessor.java +++ b/components/camel-microprofile/camel-microprofile-fault-tolerance/src/main/java/org/apache/camel/component/microprofile/faulttolerance/FaultToleranceProcessor.java @@ -44,6 +44,10 @@ import org.apache.camel.Processor; import org.apache.camel.RuntimeExchangeException; import org.apache.camel.api.management.ManagedAttribute; import org.apache.camel.api.management.ManagedResource; +import org.apache.camel.processor.PooledExchangeTask; +import org.apache.camel.processor.PooledExchangeTaskFactory; +import org.apache.camel.processor.PooledTaskFactory; +import org.apache.camel.processor.PrototypeTaskFactory; import org.apache.camel.spi.IdAware; import org.apache.camel.spi.ProcessorExchangeFactory; import org.apache.camel.spi.RouteIdAware; @@ -81,6 +85,8 @@ public class FaultToleranceProcessor extends AsyncProcessorSupport private ExecutorService executorService; private boolean shutdownExecutorService; 
private ProcessorExchangeFactory processorExchangeFactory; + private PooledExchangeTaskFactory taskFactory; + private PooledExchangeTaskFactory fallbackTaskFactory; public FaultToleranceProcessor(FaultToleranceConfiguration config, Processor processor, Processor fallbackProcessor) { @@ -223,7 +229,8 @@ public class FaultToleranceProcessor extends AsyncProcessorSupport // Camel error handler exchange.setProperty(ExchangePropertyKey.TRY_ROUTE_BLOCK, true); - Callable<Exchange> task = new CircuitBreakerTask(processorExchangeFactory, processor, exchange); + CircuitBreakerFallbackTask fallbackTask = null; + CircuitBreakerTask task = (CircuitBreakerTask) taskFactory.acquire(exchange, callback); // circuit breaker FaultToleranceStrategy target = circuitBreaker; @@ -241,10 +248,11 @@ public class FaultToleranceProcessor extends AsyncProcessorSupport } // 3. fallback if (fallbackProcessor != null) { - Callable fallbackTask = new CircuitBreakerFallbackTask(fallbackProcessor, exchange); + fallbackTask = (CircuitBreakerFallbackTask) fallbackTaskFactory.acquire(exchange, callback); + final CircuitBreakerFallbackTask fFallbackTask = fallbackTask; target = new Fallback(target, "fallback", fallbackContext -> { exchange.setException(fallbackContext.failure); - return fallbackTask.call(); + return fFallbackTask.call(); }, SetOfThrowables.ALL, SetOfThrowables.EMPTY, null); } @@ -259,6 +267,11 @@ public class FaultToleranceProcessor extends AsyncProcessorSupport } catch (Exception e) { // some other kind of exception exchange.setException(e); + } finally { + taskFactory.release(task); + if (fallbackTask != null) { + fallbackTaskFactory.release(fallbackTask); + } } exchange.removeProperty(ExchangePropertyKey.TRY_ROUTE_BLOCK); @@ -270,13 +283,45 @@ public class FaultToleranceProcessor extends AsyncProcessorSupport protected void doBuild() throws Exception { ObjectHelper.notNull(camelContext, "CamelContext", this); + boolean pooled = 
camelContext.adapt(ExtendedCamelContext.class).getExchangeFactory().isPooled(); + if (pooled) { + int capacity = camelContext.adapt(ExtendedCamelContext.class).getExchangeFactory().getCapacity(); + taskFactory = new PooledTaskFactory(getId()) { + @Override + public PooledExchangeTask create(Exchange exchange, AsyncCallback callback) { + return new CircuitBreakerTask(); + } + }; + taskFactory.setCapacity(capacity); + fallbackTaskFactory = new PooledTaskFactory(getId()) { + @Override + public PooledExchangeTask create(Exchange exchange, AsyncCallback callback) { + return new CircuitBreakerFallbackTask(); + } + }; + fallbackTaskFactory.setCapacity(capacity); + } else { + taskFactory = new PrototypeTaskFactory() { + @Override + public PooledExchangeTask create(Exchange exchange, AsyncCallback callback) { + return new CircuitBreakerTask(); + } + }; + fallbackTaskFactory = new PrototypeTaskFactory() { + @Override + public PooledExchangeTask create(Exchange exchange, AsyncCallback callback) { + return new CircuitBreakerFallbackTask(); + } + }; + } + // create a per processor exchange factory this.processorExchangeFactory = getCamelContext().adapt(ExtendedCamelContext.class) .getProcessorExchangeFactory().newProcessorExchangeFactory(this); this.processorExchangeFactory.setRouteId(getRouteId()); this.processorExchangeFactory.setId(getId()); - ServiceHelper.buildService(processorExchangeFactory, processor); + ServiceHelper.buildService(processorExchangeFactory, taskFactory, fallbackTaskFactory, processor); } @Override @@ -290,7 +335,7 @@ public class FaultToleranceProcessor extends AsyncProcessorSupport config.getSuccessThreshold(), new SystemStopwatch(), null); } - ServiceHelper.initService(processorExchangeFactory, processor); + ServiceHelper.initService(processorExchangeFactory, taskFactory, fallbackTaskFactory, processor); } @Override @@ -306,7 +351,7 @@ public class FaultToleranceProcessor extends AsyncProcessorSupport shutdownExecutorService = true; } - 
ServiceHelper.startService(processorExchangeFactory, processor); + ServiceHelper.startService(processorExchangeFactory, taskFactory, fallbackTaskFactory, processor); } @Override @@ -320,24 +365,32 @@ public class FaultToleranceProcessor extends AsyncProcessorSupport executorService = null; } - ServiceHelper.stopService(processorExchangeFactory, processor); + ServiceHelper.stopService(processorExchangeFactory, taskFactory, fallbackTaskFactory, processor); } @Override protected void doShutdown() throws Exception { - ServiceHelper.stopAndShutdownServices(processorExchangeFactory, processor); + ServiceHelper.stopAndShutdownServices(processorExchangeFactory, taskFactory, fallbackTaskFactory, processor); } - private static final class CircuitBreakerTask implements Callable<Exchange> { + private final class CircuitBreakerTask implements PooledExchangeTask, Callable<Exchange> { - private final ProcessorExchangeFactory processorExchangeFactory; - private final Processor processor; - private final Exchange exchange; + private Exchange exchange; - private CircuitBreakerTask(ProcessorExchangeFactory processorExchangeFactory, Processor processor, Exchange exchange) { - this.processorExchangeFactory = processorExchangeFactory; - this.processor = processor; + @Override + public void prepare(Exchange exchange, AsyncCallback callback) { this.exchange = exchange; + // callback not in use + } + + @Override + public void reset() { + this.exchange = null; + } + + @Override + public void run() { + // not in use } @Override @@ -396,20 +449,30 @@ public class FaultToleranceProcessor extends AsyncProcessorSupport } } - private static final class CircuitBreakerFallbackTask implements Callable<Exchange> { + private final class CircuitBreakerFallbackTask implements PooledExchangeTask, Callable<Exchange> { - private final Processor processor; - private final Exchange exchange; + private Exchange exchange; - private CircuitBreakerFallbackTask(Processor processor, Exchange exchange) { - 
this.processor = processor; + @Override + public void prepare(Exchange exchange, AsyncCallback callback) { this.exchange = exchange; + // callback not in use + } + + @Override + public void reset() { + this.exchange = null; + } + + @Override + public void run() { + // not in use } @Override public Exchange call() throws Exception { Throwable throwable = exchange.getException(); - if (processor == null) { + if (fallbackProcessor == null) { if (throwable instanceof TimeoutException) { // the circuit breaker triggered a timeout (and there is no // fallback) so lets mark the exchange as failed @@ -452,10 +515,10 @@ public class FaultToleranceProcessor extends AsyncProcessorSupport exchange.adapt(ExtendedExchange.class).setRedeliveryExhausted(false); // run the fallback processor try { - LOG.debug("Running fallback: {} with exchange: {}", processor, exchange); + LOG.debug("Running fallback: {} with exchange: {}", fallbackProcessor, exchange); // process the fallback until its fully done - processor.process(exchange); - LOG.debug("Running fallback: {} with exchange: {} done", processor, exchange); + fallbackProcessor.process(exchange); + LOG.debug("Running fallback: {} with exchange: {} done", fallbackProcessor, exchange); } catch (Exception e) { exchange.setException(e); } diff --git a/components/camel-resilience4j/src/main/java/org/apache/camel/component/resilience4j/ResilienceProcessor.java b/components/camel-resilience4j/src/main/java/org/apache/camel/component/resilience4j/ResilienceProcessor.java index 05377ba..7853325 100644 --- a/components/camel-resilience4j/src/main/java/org/apache/camel/component/resilience4j/ResilienceProcessor.java +++ b/components/camel-resilience4j/src/main/java/org/apache/camel/component/resilience4j/ResilienceProcessor.java @@ -495,6 +495,7 @@ public class ResilienceProcessor extends AsyncProcessorSupport failed); } + exchange.removeProperty(ExchangePropertyKey.TRY_ROUTE_BLOCK); callback.done(true); return true; }