This is an automated email from the ASF dual-hosted git repository.

davsclaus pushed a commit to branch camel-3.14.x
in repository https://gitbox.apache.org/repos/asf/camel.git

commit b25b61079228a61d88b5b5c786e2406164c4d26d
Author: Claus Ibsen <claus.ib...@gmail.com>
AuthorDate: Fri Jul 29 10:28:43 2022 +0200

    CAMEL-18324: camel-core - Exception during preparing exchange task can block thread
---
 .../faulttolerance/FaultToleranceProcessor.java    | 53 ++++++++++++----------
 .../resilience4j/ResilienceProcessor.java          | 47 +++++++++++--------
 .../PopulateInitialHeadersFailedIssueTest.java     |  4 +-
 3 files changed, 57 insertions(+), 47 deletions(-)

diff --git a/components/camel-microprofile/camel-microprofile-fault-tolerance/src/main/java/org/apache/camel/component/microprofile/faulttolerance/FaultToleranceProcessor.java b/components/camel-microprofile/camel-microprofile-fault-tolerance/src/main/java/org/apache/camel/component/microprofile/faulttolerance/FaultToleranceProcessor.java
index b29ead5be06..b11d0f7042c 100644
--- a/components/camel-microprofile/camel-microprofile-fault-tolerance/src/main/java/org/apache/camel/component/microprofile/faulttolerance/FaultToleranceProcessor.java
+++ b/components/camel-microprofile/camel-microprofile-fault-tolerance/src/main/java/org/apache/camel/component/microprofile/faulttolerance/FaultToleranceProcessor.java
@@ -231,33 +231,34 @@ public class FaultToleranceProcessor extends AsyncProcessorSupport
         exchange.setProperty(ExchangePropertyKey.TRY_ROUTE_BLOCK, true);
 
         CircuitBreakerFallbackTask fallbackTask = null;
-        CircuitBreakerTask task = (CircuitBreakerTask) 
taskFactory.acquire(exchange, callback);
+        CircuitBreakerTask task = null;
+        try {
+            task = (CircuitBreakerTask) taskFactory.acquire(exchange, 
callback);
 
-        // circuit breaker
-        FaultToleranceStrategy target = circuitBreaker;
+            // circuit breaker
+            FaultToleranceStrategy target = circuitBreaker;
 
-        // 1. bulkhead
-        if (config.isBulkheadEnabled()) {
-            target = new FutureThreadPoolBulkhead(
-                    target, "bulkhead", config.getBulkheadMaxConcurrentCalls(),
-                    config.getBulkheadWaitingTaskQueue());
-        }
-        // 2. timeout
-        if (config.isTimeoutEnabled()) {
-            TimeoutWatcher watcher = new 
ScheduledExecutorTimeoutWatcher(scheduledExecutorService);
-            target = new Timeout(target, "timeout", 
config.getTimeoutDuration(), watcher);
-        }
-        // 3. fallback
-        if (fallbackProcessor != null) {
-            fallbackTask = (CircuitBreakerFallbackTask) 
fallbackTaskFactory.acquire(exchange, callback);
-            final CircuitBreakerFallbackTask fFallbackTask = fallbackTask;
-            target = new Fallback(target, "fallback", fallbackContext -> {
-                exchange.setException(fallbackContext.failure);
-                return fFallbackTask.call();
-            }, SetOfThrowables.ALL, SetOfThrowables.EMPTY);
-        }
+            // 1. bulkhead
+            if (config.isBulkheadEnabled()) {
+                target = new FutureThreadPoolBulkhead(
+                        target, "bulkhead", 
config.getBulkheadMaxConcurrentCalls(),
+                        config.getBulkheadWaitingTaskQueue());
+            }
+            // 2. timeout
+            if (config.isTimeoutEnabled()) {
+                TimeoutWatcher watcher = new 
ScheduledExecutorTimeoutWatcher(scheduledExecutorService);
+                target = new Timeout(target, "timeout", 
config.getTimeoutDuration(), watcher);
+            }
+            // 3. fallback
+            if (fallbackProcessor != null) {
+                fallbackTask = (CircuitBreakerFallbackTask) 
fallbackTaskFactory.acquire(exchange, callback);
+                final CircuitBreakerFallbackTask fFallbackTask = fallbackTask;
+                target = new Fallback(target, "fallback", fallbackContext -> {
+                    exchange.setException(fallbackContext.failure);
+                    return fFallbackTask.call();
+                }, SetOfThrowables.ALL, SetOfThrowables.EMPTY);
+            }
 
-        try {
             target.apply(new InvocationContext(task));
         } catch (CircuitBreakerOpenException e) {
             // the circuit breaker triggered a call rejected
@@ -269,7 +270,9 @@ public class FaultToleranceProcessor extends AsyncProcessorSupport
             // some other kind of exception
             exchange.setException(e);
         } finally {
-            taskFactory.release(task);
+            if (task != null) {
+                taskFactory.release(task);
+            }
             if (fallbackTask != null) {
                 fallbackTaskFactory.release(fallbackTask);
             }
diff --git a/components/camel-resilience4j/src/main/java/org/apache/camel/component/resilience4j/ResilienceProcessor.java b/components/camel-resilience4j/src/main/java/org/apache/camel/component/resilience4j/ResilienceProcessor.java
index 1dda3242f63..5b52420f031 100644
--- a/components/camel-resilience4j/src/main/java/org/apache/camel/component/resilience4j/ResilienceProcessor.java
+++ b/components/camel-resilience4j/src/main/java/org/apache/camel/component/resilience4j/ResilienceProcessor.java
@@ -458,27 +458,30 @@ public class ResilienceProcessor extends AsyncProcessorSupport
         // Camel error handler
         exchange.setProperty(ExchangePropertyKey.TRY_ROUTE_BLOCK, true);
 
-        CircuitBreakerFallbackTask fallbackTask = (CircuitBreakerFallbackTask) 
fallbackTaskFactory.acquire(exchange, callback);
-        CircuitBreakerTask task = (CircuitBreakerTask) 
taskFactory.acquire(exchange, callback);
-        Callable<Exchange> callable;
-
-        if (timeLimiter != null) {
-            Supplier<CompletableFuture<Exchange>> futureSupplier;
-            if (executorService == null) {
-                futureSupplier = () -> CompletableFuture.supplyAsync(task);
+        CircuitBreakerFallbackTask fallbackTask = null;
+        CircuitBreakerTask task = null;
+        try {
+            fallbackTask = (CircuitBreakerFallbackTask) 
fallbackTaskFactory.acquire(exchange, callback);
+            task = (CircuitBreakerTask) taskFactory.acquire(exchange, 
callback);
+            final CircuitBreakerTask ftask = task; // annoying final java 
thingy!
+            Callable<Exchange> callable;
+
+            if (timeLimiter != null) {
+                Supplier<CompletableFuture<Exchange>> futureSupplier;
+                if (executorService == null) {
+                    futureSupplier = () -> 
CompletableFuture.supplyAsync(ftask);
+                } else {
+                    futureSupplier = () -> 
CompletableFuture.supplyAsync(ftask, executorService);
+                }
+                callable = TimeLimiter.decorateFutureSupplier(timeLimiter, 
futureSupplier);
             } else {
-                futureSupplier = () -> CompletableFuture.supplyAsync(task, 
executorService);
+                callable = task;
+            }
+            if (bulkhead != null) {
+                callable = Bulkhead.decorateCallable(bulkhead, callable);
             }
-            callable = TimeLimiter.decorateFutureSupplier(timeLimiter, 
futureSupplier);
-        } else {
-            callable = task;
-        }
-        if (bulkhead != null) {
-            callable = Bulkhead.decorateCallable(bulkhead, callable);
-        }
 
-        callable = CircuitBreaker.decorateCallable(circuitBreaker, callable);
-        try {
+            callable = CircuitBreaker.decorateCallable(circuitBreaker, 
callable);
             if (LOG.isTraceEnabled()) {
                 LOG.trace("Processing exchange: {} using circuit breaker: {}", 
exchange.getExchangeId(), id);
             }
@@ -486,8 +489,12 @@ public class ResilienceProcessor extends AsyncProcessorSupport
         } catch (Exception e) {
             exchange.setException(e);
         } finally {
-            taskFactory.release(task);
-            fallbackTaskFactory.release(fallbackTask);
+            if (task != null) {
+                taskFactory.release(task);
+            }
+            if (fallbackTask != null) {
+                fallbackTaskFactory.release(fallbackTask);
+            }
         }
 
         if (LOG.isTraceEnabled()) {
diff --git a/core/camel-core/src/test/java/org/apache/camel/issues/PopulateInitialHeadersFailedIssueTest.java b/core/camel-core/src/test/java/org/apache/camel/issues/PopulateInitialHeadersFailedIssueTest.java
index dbcea545606..88d08bd3d7c 100644
--- a/core/camel-core/src/test/java/org/apache/camel/issues/PopulateInitialHeadersFailedIssueTest.java
+++ b/core/camel-core/src/test/java/org/apache/camel/issues/PopulateInitialHeadersFailedIssueTest.java
@@ -16,6 +16,8 @@
  */
 package org.apache.camel.issues;
 
+import java.util.Map;
+
 import org.apache.camel.ContextTestSupport;
 import org.apache.camel.Exchange;
 import org.apache.camel.ExchangePattern;
@@ -25,8 +27,6 @@ import org.apache.camel.support.DefaultMessage;
 import org.junit.jupiter.api.Assertions;
 import org.junit.jupiter.api.Test;
 
-import java.util.Map;
-
 public class PopulateInitialHeadersFailedIssueTest extends ContextTestSupport {
 
     @Test

Reply via email to