This is an automated email from the ASF dual-hosted git repository.

davsclaus pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/camel.git


The following commit(s) were added to refs/heads/master by this push:
     new a7243c1  CAMEL-16455: Optimize CircuitBreaker EIP with task pooling
a7243c1 is described below

commit a7243c15068752eaf51953f727908989137ecb2b
Author: Claus Ibsen <claus.ib...@gmail.com>
AuthorDate: Thu Apr 8 16:24:43 2021 +0200

    CAMEL-16455: Optimize CircuitBreaker EIP with task pooling
---
 .../faulttolerance/FaultToleranceProcessor.java    | 111 ++++++++++++++++-----
 .../resilience4j/ResilienceProcessor.java          |   1 +
 2 files changed, 88 insertions(+), 24 deletions(-)

diff --git a/components/camel-microprofile/camel-microprofile-fault-tolerance/src/main/java/org/apache/camel/component/microprofile/faulttolerance/FaultToleranceProcessor.java b/components/camel-microprofile/camel-microprofile-fault-tolerance/src/main/java/org/apache/camel/component/microprofile/faulttolerance/FaultToleranceProcessor.java
index 97a64f6..ad51366 100644
--- a/components/camel-microprofile/camel-microprofile-fault-tolerance/src/main/java/org/apache/camel/component/microprofile/faulttolerance/FaultToleranceProcessor.java
+++ b/components/camel-microprofile/camel-microprofile-fault-tolerance/src/main/java/org/apache/camel/component/microprofile/faulttolerance/FaultToleranceProcessor.java
@@ -44,6 +44,10 @@ import org.apache.camel.Processor;
 import org.apache.camel.RuntimeExchangeException;
 import org.apache.camel.api.management.ManagedAttribute;
 import org.apache.camel.api.management.ManagedResource;
+import org.apache.camel.processor.PooledExchangeTask;
+import org.apache.camel.processor.PooledExchangeTaskFactory;
+import org.apache.camel.processor.PooledTaskFactory;
+import org.apache.camel.processor.PrototypeTaskFactory;
 import org.apache.camel.spi.IdAware;
 import org.apache.camel.spi.ProcessorExchangeFactory;
 import org.apache.camel.spi.RouteIdAware;
@@ -81,6 +85,8 @@ public class FaultToleranceProcessor extends AsyncProcessorSupport
     private ExecutorService executorService;
     private boolean shutdownExecutorService;
     private ProcessorExchangeFactory processorExchangeFactory;
+    private PooledExchangeTaskFactory taskFactory;
+    private PooledExchangeTaskFactory fallbackTaskFactory;
 
     public FaultToleranceProcessor(FaultToleranceConfiguration config, 
Processor processor,
                                    Processor fallbackProcessor) {
@@ -223,7 +229,8 @@ public class FaultToleranceProcessor extends AsyncProcessorSupport
         // Camel error handler
         exchange.setProperty(ExchangePropertyKey.TRY_ROUTE_BLOCK, true);
 
-        Callable<Exchange> task = new 
CircuitBreakerTask(processorExchangeFactory, processor, exchange);
+        CircuitBreakerFallbackTask fallbackTask = null;
+        CircuitBreakerTask task = (CircuitBreakerTask) 
taskFactory.acquire(exchange, callback);
 
         // circuit breaker
         FaultToleranceStrategy target = circuitBreaker;
@@ -241,10 +248,11 @@ public class FaultToleranceProcessor extends AsyncProcessorSupport
         }
         // 3. fallback
         if (fallbackProcessor != null) {
-            Callable fallbackTask = new 
CircuitBreakerFallbackTask(fallbackProcessor, exchange);
+            fallbackTask = (CircuitBreakerFallbackTask) 
fallbackTaskFactory.acquire(exchange, callback);
+            final CircuitBreakerFallbackTask fFallbackTask = fallbackTask;
             target = new Fallback(target, "fallback", fallbackContext -> {
                 exchange.setException(fallbackContext.failure);
-                return fallbackTask.call();
+                return fFallbackTask.call();
             }, SetOfThrowables.ALL, SetOfThrowables.EMPTY, null);
         }
 
@@ -259,6 +267,11 @@ public class FaultToleranceProcessor extends AsyncProcessorSupport
         } catch (Exception e) {
             // some other kind of exception
             exchange.setException(e);
+        } finally {
+            taskFactory.release(task);
+            if (fallbackTask != null) {
+                fallbackTaskFactory.release(fallbackTask);
+            }
         }
 
         exchange.removeProperty(ExchangePropertyKey.TRY_ROUTE_BLOCK);
@@ -270,13 +283,45 @@ public class FaultToleranceProcessor extends AsyncProcessorSupport
     protected void doBuild() throws Exception {
         ObjectHelper.notNull(camelContext, "CamelContext", this);
 
+        boolean pooled = 
camelContext.adapt(ExtendedCamelContext.class).getExchangeFactory().isPooled();
+        if (pooled) {
+            int capacity = 
camelContext.adapt(ExtendedCamelContext.class).getExchangeFactory().getCapacity();
+            taskFactory = new PooledTaskFactory(getId()) {
+                @Override
+                public PooledExchangeTask create(Exchange exchange, 
AsyncCallback callback) {
+                    return new CircuitBreakerTask();
+                }
+            };
+            taskFactory.setCapacity(capacity);
+            fallbackTaskFactory = new PooledTaskFactory(getId()) {
+                @Override
+                public PooledExchangeTask create(Exchange exchange, 
AsyncCallback callback) {
+                    return new CircuitBreakerFallbackTask();
+                }
+            };
+            fallbackTaskFactory.setCapacity(capacity);
+        } else {
+            taskFactory = new PrototypeTaskFactory() {
+                @Override
+                public PooledExchangeTask create(Exchange exchange, 
AsyncCallback callback) {
+                    return new CircuitBreakerTask();
+                }
+            };
+            fallbackTaskFactory = new PrototypeTaskFactory() {
+                @Override
+                public PooledExchangeTask create(Exchange exchange, 
AsyncCallback callback) {
+                    return new CircuitBreakerFallbackTask();
+                }
+            };
+        }
+
         // create a per processor exchange factory
         this.processorExchangeFactory = 
getCamelContext().adapt(ExtendedCamelContext.class)
                 
.getProcessorExchangeFactory().newProcessorExchangeFactory(this);
         this.processorExchangeFactory.setRouteId(getRouteId());
         this.processorExchangeFactory.setId(getId());
 
-        ServiceHelper.buildService(processorExchangeFactory, processor);
+        ServiceHelper.buildService(processorExchangeFactory, taskFactory, 
fallbackTaskFactory, processor);
     }
 
     @Override
@@ -290,7 +335,7 @@ public class FaultToleranceProcessor extends AsyncProcessorSupport
                     config.getSuccessThreshold(), new SystemStopwatch(), null);
         }
 
-        ServiceHelper.initService(processorExchangeFactory, processor);
+        ServiceHelper.initService(processorExchangeFactory, taskFactory, 
fallbackTaskFactory, processor);
     }
 
     @Override
@@ -306,7 +351,7 @@ public class FaultToleranceProcessor extends AsyncProcessorSupport
             shutdownExecutorService = true;
         }
 
-        ServiceHelper.startService(processorExchangeFactory, processor);
+        ServiceHelper.startService(processorExchangeFactory, taskFactory, 
fallbackTaskFactory, processor);
     }
 
     @Override
@@ -320,24 +365,32 @@ public class FaultToleranceProcessor extends AsyncProcessorSupport
             executorService = null;
         }
 
-        ServiceHelper.stopService(processorExchangeFactory, processor);
+        ServiceHelper.stopService(processorExchangeFactory, taskFactory, 
fallbackTaskFactory, processor);
     }
 
     @Override
     protected void doShutdown() throws Exception {
-        ServiceHelper.stopAndShutdownServices(processorExchangeFactory, 
processor);
+        ServiceHelper.stopAndShutdownServices(processorExchangeFactory, 
taskFactory, fallbackTaskFactory, processor);
     }
 
-    private static final class CircuitBreakerTask implements 
Callable<Exchange> {
+    private final class CircuitBreakerTask implements PooledExchangeTask, 
Callable<Exchange> {
 
-        private final ProcessorExchangeFactory processorExchangeFactory;
-        private final Processor processor;
-        private final Exchange exchange;
+        private Exchange exchange;
 
-        private CircuitBreakerTask(ProcessorExchangeFactory 
processorExchangeFactory, Processor processor, Exchange exchange) {
-            this.processorExchangeFactory = processorExchangeFactory;
-            this.processor = processor;
+        @Override
+        public void prepare(Exchange exchange, AsyncCallback callback) {
             this.exchange = exchange;
+            // callback not in use
+        }
+
+        @Override
+        public void reset() {
+            this.exchange = null;
+        }
+
+        @Override
+        public void run() {
+            // not in use
         }
 
         @Override
@@ -396,20 +449,30 @@ public class FaultToleranceProcessor extends AsyncProcessorSupport
         }
     }
 
-    private static final class CircuitBreakerFallbackTask implements 
Callable<Exchange> {
+    private final class CircuitBreakerFallbackTask implements 
PooledExchangeTask, Callable<Exchange> {
 
-        private final Processor processor;
-        private final Exchange exchange;
+        private Exchange exchange;
 
-        private CircuitBreakerFallbackTask(Processor processor, Exchange 
exchange) {
-            this.processor = processor;
+        @Override
+        public void prepare(Exchange exchange, AsyncCallback callback) {
             this.exchange = exchange;
+            // callback not in use
+        }
+
+        @Override
+        public void reset() {
+            this.exchange = null;
+        }
+
+        @Override
+        public void run() {
+            // not in use
         }
 
         @Override
         public Exchange call() throws Exception {
             Throwable throwable = exchange.getException();
-            if (processor == null) {
+            if (fallbackProcessor == null) {
                 if (throwable instanceof TimeoutException) {
                     // the circuit breaker triggered a timeout (and there is no
                     // fallback) so lets mark the exchange as failed
@@ -452,10 +515,10 @@ public class FaultToleranceProcessor extends AsyncProcessorSupport
             
exchange.adapt(ExtendedExchange.class).setRedeliveryExhausted(false);
             // run the fallback processor
             try {
-                LOG.debug("Running fallback: {} with exchange: {}", processor, 
exchange);
+                LOG.debug("Running fallback: {} with exchange: {}", 
fallbackProcessor, exchange);
                 // process the fallback until its fully done
-                processor.process(exchange);
-                LOG.debug("Running fallback: {} with exchange: {} done", 
processor, exchange);
+                fallbackProcessor.process(exchange);
+                LOG.debug("Running fallback: {} with exchange: {} done", 
fallbackProcessor, exchange);
             } catch (Exception e) {
                 exchange.setException(e);
             }
diff --git a/components/camel-resilience4j/src/main/java/org/apache/camel/component/resilience4j/ResilienceProcessor.java b/components/camel-resilience4j/src/main/java/org/apache/camel/component/resilience4j/ResilienceProcessor.java
index 05377ba..7853325 100644
--- a/components/camel-resilience4j/src/main/java/org/apache/camel/component/resilience4j/ResilienceProcessor.java
+++ b/components/camel-resilience4j/src/main/java/org/apache/camel/component/resilience4j/ResilienceProcessor.java
@@ -495,6 +495,7 @@ public class ResilienceProcessor extends AsyncProcessorSupport
                     failed);
         }
 
+        exchange.removeProperty(ExchangePropertyKey.TRY_ROUTE_BLOCK);
         callback.done(true);
         return true;
     }

Reply via email to