This is an automated email from the ASF dual-hosted git repository. davsclaus pushed a commit to branch camel-3.14.x in repository https://gitbox.apache.org/repos/asf/camel.git
commit b25b61079228a61d88b5b5c786e2406164c4d26d Author: Claus Ibsen <claus.ib...@gmail.com> AuthorDate: Fri Jul 29 10:28:43 2022 +0200 CAMEL-18324: camel-core - Exception during preparing exchange task can block thread --- .../faulttolerance/FaultToleranceProcessor.java | 53 ++++++++++++---------- .../resilience4j/ResilienceProcessor.java | 47 +++++++++++-------- .../PopulateInitialHeadersFailedIssueTest.java | 4 +- 3 files changed, 57 insertions(+), 47 deletions(-) diff --git a/components/camel-microprofile/camel-microprofile-fault-tolerance/src/main/java/org/apache/camel/component/microprofile/faulttolerance/FaultToleranceProcessor.java b/components/camel-microprofile/camel-microprofile-fault-tolerance/src/main/java/org/apache/camel/component/microprofile/faulttolerance/FaultToleranceProcessor.java index b29ead5be06..b11d0f7042c 100644 --- a/components/camel-microprofile/camel-microprofile-fault-tolerance/src/main/java/org/apache/camel/component/microprofile/faulttolerance/FaultToleranceProcessor.java +++ b/components/camel-microprofile/camel-microprofile-fault-tolerance/src/main/java/org/apache/camel/component/microprofile/faulttolerance/FaultToleranceProcessor.java @@ -231,33 +231,34 @@ public class FaultToleranceProcessor extends AsyncProcessorSupport exchange.setProperty(ExchangePropertyKey.TRY_ROUTE_BLOCK, true); CircuitBreakerFallbackTask fallbackTask = null; - CircuitBreakerTask task = (CircuitBreakerTask) taskFactory.acquire(exchange, callback); + CircuitBreakerTask task = null; + try { + task = (CircuitBreakerTask) taskFactory.acquire(exchange, callback); - // circuit breaker - FaultToleranceStrategy target = circuitBreaker; + // circuit breaker + FaultToleranceStrategy target = circuitBreaker; - // 1. bulkhead - if (config.isBulkheadEnabled()) { - target = new FutureThreadPoolBulkhead( - target, "bulkhead", config.getBulkheadMaxConcurrentCalls(), - config.getBulkheadWaitingTaskQueue()); - } - // 2. timeout - if (config.isTimeoutEnabled()) { - TimeoutWatcher watcher = new ScheduledExecutorTimeoutWatcher(scheduledExecutorService); - target = new Timeout(target, "timeout", config.getTimeoutDuration(), watcher); - } - // 3. fallback - if (fallbackProcessor != null) { - fallbackTask = (CircuitBreakerFallbackTask) fallbackTaskFactory.acquire(exchange, callback); - final CircuitBreakerFallbackTask fFallbackTask = fallbackTask; - target = new Fallback(target, "fallback", fallbackContext -> { - exchange.setException(fallbackContext.failure); - return fFallbackTask.call(); - }, SetOfThrowables.ALL, SetOfThrowables.EMPTY); - } + // 1. bulkhead + if (config.isBulkheadEnabled()) { + target = new FutureThreadPoolBulkhead( + target, "bulkhead", config.getBulkheadMaxConcurrentCalls(), + config.getBulkheadWaitingTaskQueue()); + } + // 2. timeout + if (config.isTimeoutEnabled()) { + TimeoutWatcher watcher = new ScheduledExecutorTimeoutWatcher(scheduledExecutorService); + target = new Timeout(target, "timeout", config.getTimeoutDuration(), watcher); + } + // 3. fallback + if (fallbackProcessor != null) { + fallbackTask = (CircuitBreakerFallbackTask) fallbackTaskFactory.acquire(exchange, callback); + final CircuitBreakerFallbackTask fFallbackTask = fallbackTask; + target = new Fallback(target, "fallback", fallbackContext -> { + exchange.setException(fallbackContext.failure); + return fFallbackTask.call(); + }, SetOfThrowables.ALL, SetOfThrowables.EMPTY); + } - try { target.apply(new InvocationContext(task)); } catch (CircuitBreakerOpenException e) { // the circuit breaker triggered a call rejected @@ -269,7 +270,9 @@ public class FaultToleranceProcessor extends AsyncProcessorSupport // some other kind of exception exchange.setException(e); } finally { - taskFactory.release(task); + if (task != null) { + taskFactory.release(task); + } if (fallbackTask != null) { fallbackTaskFactory.release(fallbackTask); } diff --git a/components/camel-resilience4j/src/main/java/org/apache/camel/component/resilience4j/ResilienceProcessor.java b/components/camel-resilience4j/src/main/java/org/apache/camel/component/resilience4j/ResilienceProcessor.java index 1dda3242f63..5b52420f031 100644 --- a/components/camel-resilience4j/src/main/java/org/apache/camel/component/resilience4j/ResilienceProcessor.java +++ b/components/camel-resilience4j/src/main/java/org/apache/camel/component/resilience4j/ResilienceProcessor.java @@ -458,27 +458,30 @@ public class ResilienceProcessor extends AsyncProcessorSupport // Camel error handler exchange.setProperty(ExchangePropertyKey.TRY_ROUTE_BLOCK, true); - CircuitBreakerFallbackTask fallbackTask = (CircuitBreakerFallbackTask) fallbackTaskFactory.acquire(exchange, callback); - CircuitBreakerTask task = (CircuitBreakerTask) taskFactory.acquire(exchange, callback); - Callable<Exchange> callable; - - if (timeLimiter != null) { - Supplier<CompletableFuture<Exchange>> futureSupplier; - if (executorService == null) { - futureSupplier = () -> CompletableFuture.supplyAsync(task); + CircuitBreakerFallbackTask fallbackTask = null; + CircuitBreakerTask task = null; + try { + fallbackTask = (CircuitBreakerFallbackTask) fallbackTaskFactory.acquire(exchange, callback); + task = (CircuitBreakerTask) taskFactory.acquire(exchange, callback); + final CircuitBreakerTask ftask = task; // annoying final java thingy! + Callable<Exchange> callable; + + if (timeLimiter != null) { + Supplier<CompletableFuture<Exchange>> futureSupplier; + if (executorService == null) { + futureSupplier = () -> CompletableFuture.supplyAsync(ftask); + } else { + futureSupplier = () -> CompletableFuture.supplyAsync(ftask, executorService); + } + callable = TimeLimiter.decorateFutureSupplier(timeLimiter, futureSupplier); } else { - futureSupplier = () -> CompletableFuture.supplyAsync(task, executorService); + callable = task; + } + if (bulkhead != null) { + callable = Bulkhead.decorateCallable(bulkhead, callable); } - callable = TimeLimiter.decorateFutureSupplier(timeLimiter, futureSupplier); - } else { - callable = task; - } - if (bulkhead != null) { - callable = Bulkhead.decorateCallable(bulkhead, callable); - } - callable = CircuitBreaker.decorateCallable(circuitBreaker, callable); - try { + callable = CircuitBreaker.decorateCallable(circuitBreaker, callable); if (LOG.isTraceEnabled()) { LOG.trace("Processing exchange: {} using circuit breaker: {}", exchange.getExchangeId(), id); } @@ -486,8 +489,12 @@ public class ResilienceProcessor extends AsyncProcessorSupport } catch (Exception e) { exchange.setException(e); } finally { - taskFactory.release(task); - fallbackTaskFactory.release(fallbackTask); + if (task != null) { + taskFactory.release(task); + } + if (fallbackTask != null) { + fallbackTaskFactory.release(fallbackTask); + } } if (LOG.isTraceEnabled()) { diff --git a/core/camel-core/src/test/java/org/apache/camel/issues/PopulateInitialHeadersFailedIssueTest.java b/core/camel-core/src/test/java/org/apache/camel/issues/PopulateInitialHeadersFailedIssueTest.java index dbcea545606..88d08bd3d7c 100644 --- a/core/camel-core/src/test/java/org/apache/camel/issues/PopulateInitialHeadersFailedIssueTest.java +++ b/core/camel-core/src/test/java/org/apache/camel/issues/PopulateInitialHeadersFailedIssueTest.java @@ -16,6 +16,8 @@ */ package org.apache.camel.issues; +import java.util.Map; + import org.apache.camel.ContextTestSupport; import org.apache.camel.Exchange; import org.apache.camel.ExchangePattern; @@ -25,8 +27,6 @@ import org.apache.camel.support.DefaultMessage; import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.Test; -import java.util.Map; - public class PopulateInitialHeadersFailedIssueTest extends ContextTestSupport { @Test