This is an automated email from the ASF dual-hosted git repository. davsclaus pushed a commit to branch CAMEL-13691 in repository https://gitbox.apache.org/repos/asf/camel.git
commit f3e97b122212c2427e91ffdbf36a0950cbb64514 Author: Claus Ibsen <claus.ib...@gmail.com> AuthorDate: Sat Nov 16 13:23:47 2019 +0100 CAMEL-13691: camel-resilience4j - WIP --- .../resilience4j/ResilienceProcessor.java | 11 +-- .../component/resilience4j/ResilienceReifier.java | 104 ++++++++++++++++++++- 2 files changed, 106 insertions(+), 9 deletions(-) diff --git a/components/camel-resilience4j/src/main/java/org/apache/camel/component/resilience4j/ResilienceProcessor.java b/components/camel-resilience4j/src/main/java/org/apache/camel/component/resilience4j/ResilienceProcessor.java index 1a6dbef..91fa38c 100644 --- a/components/camel-resilience4j/src/main/java/org/apache/camel/component/resilience4j/ResilienceProcessor.java +++ b/components/camel-resilience4j/src/main/java/org/apache/camel/component/resilience4j/ResilienceProcessor.java @@ -16,16 +16,13 @@ */ package org.apache.camel.component.resilience4j; -import java.time.Duration; import java.util.ArrayList; import java.util.List; -import java.util.concurrent.Callable; -import java.util.concurrent.CompletableFuture; import java.util.function.Function; import java.util.function.Supplier; import io.github.resilience4j.circuitbreaker.CircuitBreaker; -import io.github.resilience4j.timelimiter.TimeLimiter; +import io.github.resilience4j.circuitbreaker.CircuitBreakerConfig; import io.vavr.control.Try; import org.apache.camel.AsyncCallback; import org.apache.camel.Exchange; @@ -48,11 +45,13 @@ public class ResilienceProcessor extends AsyncProcessorSupport implements Naviga private static final Logger LOG = LoggerFactory.getLogger(ResilienceProcessor.class); private String id; + private CircuitBreakerConfig config; private final Processor processor; private final Processor fallback; private final boolean fallbackViaNetwork; - public ResilienceProcessor(Processor processor, Processor fallback, boolean fallbackViaNetwork) { + public ResilienceProcessor(CircuitBreakerConfig config, Processor processor, Processor fallback, boolean fallbackViaNetwork) { + this.config = config; this.processor = processor; this.fallback = fallback; this.fallbackViaNetwork = fallbackViaNetwork; @@ -109,7 +108,7 @@ public class ResilienceProcessor extends AsyncProcessorSupport implements Naviga // Future // }); - CircuitBreaker cb = CircuitBreaker.ofDefaults(id); + CircuitBreaker cb = CircuitBreaker.of(id, config); Supplier<Exchange> task = CircuitBreaker.decorateSupplier(cb, new CircuitBreakerTask(processor, exchange)); Try.ofSupplier(task) .recover(new CircuitBreakerFallbackTask(fallback, exchange)) diff --git a/components/camel-resilience4j/src/main/java/org/apache/camel/component/resilience4j/ResilienceReifier.java b/components/camel-resilience4j/src/main/java/org/apache/camel/component/resilience4j/ResilienceReifier.java index 4bd1278..3704219 100644 --- a/components/camel-resilience4j/src/main/java/org/apache/camel/component/resilience4j/ResilienceReifier.java +++ b/components/camel-resilience4j/src/main/java/org/apache/camel/component/resilience4j/ResilienceReifier.java @@ -16,14 +16,31 @@ */ package org.apache.camel.component.resilience4j; +import java.time.Duration; +import java.util.HashMap; +import java.util.Map; +import java.util.Optional; + +import io.github.resilience4j.circuitbreaker.CircuitBreakerConfig; + +import org.apache.camel.CamelContext; +import org.apache.camel.ExtendedCamelContext; import org.apache.camel.Processor; import org.apache.camel.model.CircuitBreakerDefinition; +import org.apache.camel.model.Model; +import org.apache.camel.model.Resilience4jConfigurationCommon; +import org.apache.camel.model.Resilience4jConfigurationDefinition; import org.apache.camel.reifier.ProcessorReifier; +import org.apache.camel.spi.BeanIntrospection; import org.apache.camel.spi.RouteContext; +import org.apache.camel.support.PropertyBindingSupport; +import org.apache.camel.util.function.Suppliers; + +import static org.apache.camel.support.CamelContextHelper.lookup; +import static org.apache.camel.support.CamelContextHelper.mandatoryLookup; public class ResilienceReifier extends ProcessorReifier<CircuitBreakerDefinition> { - // TODO: Resilience configuration in camel-core / model // TODO: Timeout // TODO: Bulkhead for viaNetwork @@ -40,9 +57,90 @@ public class ResilienceReifier extends ProcessorReifier<CircuitBreakerDefinition fallback = ProcessorReifier.reifier(definition.getOnFallback()).createProcessor(routeContext); } - final String id = getId(definition, routeContext); + final Resilience4jConfigurationCommon config = buildResilience4jConfiguration(routeContext.getCamelContext()); + CircuitBreakerConfig cfg = configureResilience4j(config); + + return new ResilienceProcessor(cfg, processor, fallback, false); + } + + private CircuitBreakerConfig configureResilience4j(Resilience4jConfigurationCommon config) { + CircuitBreakerConfig.Builder builder = CircuitBreakerConfig.custom(); + if (config.getAutomaticTransitionFromOpenToHalfOpenEnabled() != null) { + builder.automaticTransitionFromOpenToHalfOpenEnabled(config.getAutomaticTransitionFromOpenToHalfOpenEnabled()); + } + if (config.getFailureRateThreshold() != null) { + builder.failureRateThreshold(config.getFailureRateThreshold()); + } + if (config.getMinimumNumberOfCalls() != null) { + builder.minimumNumberOfCalls(config.getMinimumNumberOfCalls()); + } + if (config.getPermittedNumberOfCallsInHalfOpenState() != null) { + builder.permittedNumberOfCallsInHalfOpenState(config.getPermittedNumberOfCallsInHalfOpenState()); + } + if (config.getSlidingWindowSize() != null) { + builder.slidingWindowSize(config.getSlidingWindowSize()); + } + if (config.getSlidingWindowType() != null) { + builder.slidingWindowType(CircuitBreakerConfig.SlidingWindowType.valueOf(config.getSlidingWindowType())); + } + if (config.getSlowCallDurationThreshold() != null) { + builder.slowCallDurationThreshold(Duration.ofSeconds(config.getSlowCallDurationThreshold())); + } + if (config.getSlowCallRateThreshold() != null) { + builder.slowCallRateThreshold(config.getSlowCallRateThreshold()); + } + if (config.getWaitDurationInOpenState() != null) { + builder.waitDurationInOpenState(Duration.ofSeconds(config.getWaitDurationInOpenState())); + } + if (config.getWritableStackTraceEnabled() != null) { + builder.writableStackTraceEnabled(config.getWritableStackTraceEnabled()); + } + return builder.build(); + } + + // ******************************* + // Helpers + // ******************************* + + Resilience4jConfigurationDefinition buildResilience4jConfiguration(CamelContext camelContext) throws Exception { + Map<String, Object> properties = new HashMap<>(); + + // Extract properties from default configuration, the one configured on + // camel context takes the precedence over those in the registry + loadProperties(camelContext, properties, Suppliers.firstNotNull( + () -> camelContext.getExtension(Model.class).getResilience4jConfiguration(null), + () -> lookup(camelContext, "Camel", Resilience4jConfigurationDefinition.class)) + ); + + // Extract properties from referenced configuration, the one configured + // on camel context takes the precedence over those in the registry + if (definition.getConfigurationRef() != null) { + final String ref = definition.getConfigurationRef(); + + loadProperties(camelContext, properties, Suppliers.firstNotNull( + () -> camelContext.getExtension(Model.class).getResilience4jConfiguration(ref), + () -> mandatoryLookup(camelContext, ref, Resilience4jConfigurationDefinition.class)) + ); + } + + // Extract properties from local configuration + loadProperties(camelContext, properties, Optional.ofNullable(definition.getResilience4jConfiguration())); + + // Extract properties from definition + BeanIntrospection beanIntrospection = camelContext.adapt(ExtendedCamelContext.class).getBeanIntrospection(); + beanIntrospection.getProperties(definition, properties, null, false); + + Resilience4jConfigurationDefinition config = new Resilience4jConfigurationDefinition(); + + // Apply properties to a new configuration + PropertyBindingSupport.bindProperties(camelContext, config, properties); + + return config; + } - return new ResilienceProcessor(processor, fallback, false); + private void loadProperties(CamelContext camelContext, Map<String, Object> properties, Optional<?> optional) { + BeanIntrospection beanIntrospection = camelContext.adapt(ExtendedCamelContext.class).getBeanIntrospection(); + optional.ifPresent(bean -> beanIntrospection.getProperties(bean, properties, null, false)); } }