This is an automated email from the ASF dual-hosted git repository.

davsclaus pushed a commit to branch master in repository
https://gitbox.apache.org/repos/asf/camel.git
The following commit(s) were added to refs/heads/master by this push: new 41f580f CAMEL-16173: Camel Resilience4j Bulkhead seems to not limit concurrent requests. Thanks to Jesper Duelund Isaksen for the excellent reproducer example. 41f580f is described below commit 41f580f720b042419d877c2e4aac07fecfebf7d9 Author: Claus Ibsen <claus.ib...@gmail.com> AuthorDate: Thu Mar 18 20:32:29 2021 +0100 CAMEL-16173: Camel Resilience4j Bulkhead seems to not limit concurrent requests. Thanks to Jesper Duelund Isaksen for the excellent reproducer example. --- .../resilience4j/ResilienceProcessor.java | 78 +++++++++++++++++----- .../component/resilience4j/ResilienceReifier.java | 7 +- .../model/Resilience4jConfigurationDefinition.java | 2 +- 3 files changed, 68 insertions(+), 19 deletions(-) diff --git a/components/camel-resilience4j/src/main/java/org/apache/camel/component/resilience4j/ResilienceProcessor.java b/components/camel-resilience4j/src/main/java/org/apache/camel/component/resilience4j/ResilienceProcessor.java index 537d59e..26e6bfe 100644 --- a/components/camel-resilience4j/src/main/java/org/apache/camel/component/resilience4j/ResilienceProcessor.java +++ b/components/camel-resilience4j/src/main/java/org/apache/camel/component/resilience4j/ResilienceProcessor.java @@ -27,6 +27,7 @@ import java.util.function.Supplier; import io.github.resilience4j.bulkhead.Bulkhead; import io.github.resilience4j.bulkhead.BulkheadConfig; +import io.github.resilience4j.bulkhead.BulkheadFullException; import io.github.resilience4j.circuitbreaker.CallNotPermittedException; import io.github.resilience4j.circuitbreaker.CircuitBreaker; import io.github.resilience4j.circuitbreaker.CircuitBreakerConfig; @@ -67,7 +68,9 @@ public class ResilienceProcessor extends AsyncProcessorSupport private String id; private final CircuitBreakerConfig circuitBreakerConfig; private final BulkheadConfig bulkheadConfig; + private Bulkhead bulkhead; private final TimeLimiterConfig timeLimiterConfig; + private 
TimeLimiter timeLimiter; private final Processor processor; private final Processor fallback; private boolean shutdownExecutorService; @@ -84,6 +87,17 @@ public class ResilienceProcessor extends AsyncProcessorSupport } @Override + protected void doBuild() throws Exception { + super.doBuild(); + if (timeLimiterConfig != null) { + timeLimiter = TimeLimiter.of(id, timeLimiterConfig); + } + if (bulkheadConfig != null) { + bulkhead = Bulkhead.of(id, bulkheadConfig); + } + } + + @Override public CamelContext getCamelContext() { return camelContext; } @@ -358,31 +372,39 @@ public class ResilienceProcessor extends AsyncProcessorSupport Callable<Exchange> task; - if (timeLimiterConfig != null) { - // timeout handling is more complex with thread-pools - - TimeLimiter tl = TimeLimiter.of(id, timeLimiterConfig); + if (timeLimiter != null) { Supplier<CompletableFuture<Exchange>> futureSupplier; if (executorService == null) { futureSupplier = () -> CompletableFuture.supplyAsync(() -> processInCopy(exchange)); } else { futureSupplier = () -> CompletableFuture.supplyAsync(() -> processInCopy(exchange), executorService); } - task = TimeLimiter.decorateFutureSupplier(tl, futureSupplier); + task = TimeLimiter.decorateFutureSupplier(timeLimiter, futureSupplier); } else { task = new CircuitBreakerTask(() -> processInCopy(exchange)); } - if (bulkheadConfig != null) { - Bulkhead bh = Bulkhead.of(id, bulkheadConfig); - task = Bulkhead.decorateCallable(bh, task); + if (bulkhead != null) { + task = Bulkhead.decorateCallable(bulkhead, task); } task = CircuitBreaker.decorateCallable(circuitBreaker, task); - - Function<Throwable, Exchange> fallbackTask = new CircuitBreakerFallbackTask(this.fallback, exchange); - Try.ofCallable(task).recover(fallbackTask).andFinally(() -> callback.done(false)).get(); - return false; + Function<Throwable, Exchange> fallbackTask = new CircuitBreakerFallbackTask(this.id, this.fallback, exchange); + try { + if (LOG.isTraceEnabled()) { + LOG.trace("Processing 
exchange: {} using circuit breaker: {}", exchange.getExchangeId(), id); + } + Try.ofCallable(task).recover(fallbackTask).get(); + } catch (Throwable e) { + exchange.setException(e); + } + if (LOG.isTraceEnabled()) { + boolean failed = exchange.isFailed(); + LOG.trace("Processing exchange: {} using circuit breaker: {} complete (failed: {})", exchange.getExchangeId(), id, + failed); + } + callback.done(true); + return true; } private Exchange processInCopy(Exchange exchange) { @@ -443,16 +465,23 @@ public class ResilienceProcessor extends AsyncProcessorSupport private static final class CircuitBreakerFallbackTask implements Function<Throwable, Exchange> { + private final String id; private final Processor processor; private final Exchange exchange; - private CircuitBreakerFallbackTask(Processor processor, Exchange exchange) { + private CircuitBreakerFallbackTask(String id, Processor processor, Exchange exchange) { + this.id = id; this.processor = processor; this.exchange = exchange; } @Override public Exchange apply(Throwable throwable) { + if (LOG.isTraceEnabled()) { + LOG.trace("Processing exchange: {} recover task using circuit breaker: {} from: {}", exchange.getExchangeId(), + id, throwable); + } + if (processor == null) { if (throwable instanceof TimeoutException) { // the circuit breaker triggered a timeout (and there is no @@ -465,14 +494,29 @@ public class ResilienceProcessor extends AsyncProcessorSupport return exchange; } else if (throwable instanceof CallNotPermittedException) { // the circuit breaker triggered a call rejected + // where the circuit breaker is half-open / open and therefore + // we should just set properties and do not set any exception + exchange.setProperty(CircuitBreakerConstants.RESPONSE_SUCCESSFUL_EXECUTION, false); + exchange.setProperty(CircuitBreakerConstants.RESPONSE_FROM_FALLBACK, false); + exchange.setProperty(CircuitBreakerConstants.RESPONSE_SHORT_CIRCUITED, true); + 
exchange.setProperty(CircuitBreakerConstants.RESPONSE_REJECTED, true); + return exchange; + } else if (throwable instanceof BulkheadFullException) { + // the circuit breaker bulkhead is full exchange.setProperty(CircuitBreakerConstants.RESPONSE_SUCCESSFUL_EXECUTION, false); exchange.setProperty(CircuitBreakerConstants.RESPONSE_FROM_FALLBACK, false); exchange.setProperty(CircuitBreakerConstants.RESPONSE_SHORT_CIRCUITED, true); exchange.setProperty(CircuitBreakerConstants.RESPONSE_REJECTED, true); - throw RuntimeExchangeException.wrapRuntimeException(throwable); + exchange.setException(throwable); + return exchange; } else { - // throw exception so resilient4j know it was a failure - throw RuntimeExchangeException.wrapRuntimeException(throwable); + // other kind of exception + exchange.setProperty(CircuitBreakerConstants.RESPONSE_SUCCESSFUL_EXECUTION, false); + exchange.setProperty(CircuitBreakerConstants.RESPONSE_FROM_FALLBACK, false); + exchange.setProperty(CircuitBreakerConstants.RESPONSE_SHORT_CIRCUITED, true); + exchange.setProperty(CircuitBreakerConstants.RESPONSE_REJECTED, true); + exchange.setException(throwable); + return exchange; } } @@ -500,7 +544,7 @@ public class ResilienceProcessor extends AsyncProcessorSupport // process the fallback until its fully done processor.process(exchange); LOG.debug("Running fallback: {} with exchange: {} done", processor, exchange); - } catch (Exception e) { + } catch (Throwable e) { exchange.setException(e); } diff --git a/components/camel-resilience4j/src/main/java/org/apache/camel/component/resilience4j/ResilienceReifier.java b/components/camel-resilience4j/src/main/java/org/apache/camel/component/resilience4j/ResilienceReifier.java index 76c9597..c65eb29 100644 --- a/components/camel-resilience4j/src/main/java/org/apache/camel/component/resilience4j/ResilienceReifier.java +++ b/components/camel-resilience4j/src/main/java/org/apache/camel/component/resilience4j/ResilienceReifier.java @@ -120,7 +120,12 @@ public class 
ResilienceReifier extends ProcessorReifier<CircuitBreakerDefinition builder.maxConcurrentCalls(parseInt(config.getBulkheadMaxConcurrentCalls())); } if (config.getBulkheadMaxWaitDuration() != null) { - builder.maxWaitDuration(Duration.ofMillis(parseLong(config.getBulkheadMaxWaitDuration()))); + long duration = parseLong(config.getBulkheadMaxWaitDuration()); + if (duration <= 0) { + builder.maxWaitDuration(Duration.ZERO); + } else { + builder.maxWaitDuration(Duration.ofMillis(duration)); + } } return builder.build(); } diff --git a/core/camel-core-model/src/main/java/org/apache/camel/model/Resilience4jConfigurationDefinition.java b/core/camel-core-model/src/main/java/org/apache/camel/model/Resilience4jConfigurationDefinition.java index 9fb6891..77468c2 100644 --- a/core/camel-core-model/src/main/java/org/apache/camel/model/Resilience4jConfigurationDefinition.java +++ b/core/camel-core-model/src/main/java/org/apache/camel/model/Resilience4jConfigurationDefinition.java @@ -198,7 +198,7 @@ public class Resilience4jConfigurationDefinition extends Resilience4jConfigurati * Configures the max amount of concurrent calls the bulkhead will support. */ public Resilience4jConfigurationDefinition bulkheadMaxConcurrentCalls(int bulkheadMaxConcurrentCalls) { - setBulkheadMaxWaitDuration(Integer.toString(bulkheadMaxConcurrentCalls)); + setBulkheadMaxConcurrentCalls(Integer.toString(bulkheadMaxConcurrentCalls)); return this; }