This is an automated email from the ASF dual-hosted git repository. github-bot pushed a commit to branch camel-main in repository https://gitbox.apache.org/repos/asf/camel-quarkus.git
commit 21b9941985e8031420e7439dd89a3b1d13c4bfd5 Author: James Netherton <jamesnether...@gmail.com> AuthorDate: Fri Feb 11 10:08:03 2022 +0000 Temporary hacks to handle version misalignment of smallrye-health and smallrye-faulttolerance in Quarkus & Camel --- .../MicroprofileFaultToleranceProcessor.java | 9 + .../microprofile-fault-tolerance/runtime/pom.xml | 2 + .../FaultToleranceConfiguration.java | 120 +++++ .../faulttolerance/FaultToleranceConstants.java} | 21 +- .../faulttolerance/FaultToleranceProcessor.java | 536 +++++++++++++++++++++ .../FaultToleranceProcessorFactory.java} | 28 +- .../faulttolerance/FaultToleranceReifier.java | 193 ++++++++ .../apache/camel/model/CircuitBreakerDefinition | 18 + .../runtime/CamelMicroProfileHealthRecorder.java | 3 +- ...amelQuarkusMicroProfileHealthCheckRegistry.java | 46 ++ 10 files changed, 942 insertions(+), 34 deletions(-) diff --git a/extensions/microprofile-fault-tolerance/deployment/src/main/java/org/apache/camel/quarkus/component/microprofile/fault/tolerance/deployment/MicroprofileFaultToleranceProcessor.java b/extensions/microprofile-fault-tolerance/deployment/src/main/java/org/apache/camel/quarkus/component/microprofile/fault/tolerance/deployment/MicroprofileFaultToleranceProcessor.java index 05673b1..6e8c382 100644 --- a/extensions/microprofile-fault-tolerance/deployment/src/main/java/org/apache/camel/quarkus/component/microprofile/fault/tolerance/deployment/MicroprofileFaultToleranceProcessor.java +++ b/extensions/microprofile-fault-tolerance/deployment/src/main/java/org/apache/camel/quarkus/component/microprofile/fault/tolerance/deployment/MicroprofileFaultToleranceProcessor.java @@ -16,9 +16,13 @@ */ package org.apache.camel.quarkus.component.microprofile.fault.tolerance.deployment; +import java.nio.file.Paths; + import io.quarkus.deployment.annotations.BuildStep; import io.quarkus.deployment.builditem.FeatureBuildItem; import io.quarkus.deployment.builditem.nativeimage.NativeImageResourceBuildItem; +import org.apache.camel.component.microprofile.faulttolerance.FaultToleranceProcessorFactory; +import org.apache.camel.quarkus.core.deployment.spi.CamelServiceBuildItem; class MicroprofileFaultToleranceProcessor { @@ -35,4 +39,9 @@ class MicroprofileFaultToleranceProcessor { "META-INF/services/org/apache/camel/model/CircuitBreakerDefinition"); } + @BuildStep + CamelServiceBuildItem camelCronServicePattern() { + return new CamelServiceBuildItem(Paths.get("META-INF/services/org/apache/camel/model/CircuitBreakerDefinition"), + FaultToleranceProcessorFactory.class.getName()); + } } diff --git a/extensions/microprofile-fault-tolerance/runtime/pom.xml b/extensions/microprofile-fault-tolerance/runtime/pom.xml index 3401f07..22e3962 100644 --- a/extensions/microprofile-fault-tolerance/runtime/pom.xml +++ b/extensions/microprofile-fault-tolerance/runtime/pom.xml @@ -56,10 +56,12 @@ <groupId>org.apache.camel.quarkus</groupId> <artifactId>camel-quarkus-core</artifactId> </dependency> + <!-- Not compatible with Quarkus 2.7.x <dependency> <groupId>org.apache.camel</groupId> <artifactId>camel-microprofile-fault-tolerance</artifactId> </dependency> + --> </dependencies> <build> diff --git a/extensions/microprofile-fault-tolerance/runtime/src/main/java/org/apache/camel/component/microprofile/faulttolerance/FaultToleranceConfiguration.java b/extensions/microprofile-fault-tolerance/runtime/src/main/java/org/apache/camel/component/microprofile/faulttolerance/FaultToleranceConfiguration.java new file mode 100644 index 0000000..7cb3d4d --- /dev/null +++ b/extensions/microprofile-fault-tolerance/runtime/src/main/java/org/apache/camel/component/microprofile/faulttolerance/FaultToleranceConfiguration.java @@ -0,0 +1,120 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.component.microprofile.faulttolerance; + +public class FaultToleranceConfiguration { + + private long delay; + private int successThreshold; + private int requestVolumeThreshold; + private float failureRatio; + private boolean timeoutEnabled; + private long timeoutDuration; + private int timeoutPoolSize; + private String timeoutExecutorServiceRef; + private boolean bulkheadEnabled; + private int bulkheadMaxConcurrentCalls; + private int bulkheadWaitingTaskQueue; + + public long getDelay() { + return delay; + } + + public void setDelay(long delay) { + this.delay = delay; + } + + public int getSuccessThreshold() { + return successThreshold; + } + + public void setSuccessThreshold(int successThreshold) { + this.successThreshold = successThreshold; + } + + public int getRequestVolumeThreshold() { + return requestVolumeThreshold; + } + + public void setRequestVolumeThreshold(int requestVolumeThreshold) { + this.requestVolumeThreshold = requestVolumeThreshold; + } + + public float getFailureRatio() { + return failureRatio; + } + + public void setFailureRatio(float failureRatio) { + this.failureRatio = failureRatio; + } + + public boolean isTimeoutEnabled() { + return timeoutEnabled; + } + + public void setTimeoutEnabled(boolean timeoutEnabled) { + this.timeoutEnabled = timeoutEnabled; + } + + public long getTimeoutDuration() { + return timeoutDuration; + } + + public void setTimeoutDuration(long timeoutDuration) { + this.timeoutDuration = timeoutDuration; + } + + public int getTimeoutPoolSize() { + return timeoutPoolSize; + } + + public void setTimeoutPoolSize(int timeoutPoolSize) { + this.timeoutPoolSize = timeoutPoolSize; + } + + public String getTimeoutExecutorServiceRef() { + return timeoutExecutorServiceRef; + } + + public void setTimeoutExecutorServiceRef(String timeoutExecutorServiceRef) { + this.timeoutExecutorServiceRef = timeoutExecutorServiceRef; + } + + public boolean isBulkheadEnabled() { + return bulkheadEnabled; + } + + public void setBulkheadEnabled(boolean bulkheadEnabled) { + this.bulkheadEnabled = bulkheadEnabled; + } + + public int getBulkheadMaxConcurrentCalls() { + return bulkheadMaxConcurrentCalls; + } + + public void setBulkheadMaxConcurrentCalls(int bulkheadMaxConcurrentCalls) { + this.bulkheadMaxConcurrentCalls = bulkheadMaxConcurrentCalls; + } + + public int getBulkheadWaitingTaskQueue() { + return bulkheadWaitingTaskQueue; + } + + public void setBulkheadWaitingTaskQueue(int bulkheadWaitingTaskQueue) { + this.bulkheadWaitingTaskQueue = bulkheadWaitingTaskQueue; + } +} diff --git a/extensions/microprofile-fault-tolerance/deployment/src/main/java/org/apache/camel/quarkus/component/microprofile/fault/tolerance/deployment/MicroprofileFaultToleranceProcessor.java b/extensions/microprofile-fault-tolerance/runtime/src/main/java/org/apache/camel/component/microprofile/faulttolerance/FaultToleranceConstants.java similarity index 53% copy from extensions/microprofile-fault-tolerance/deployment/src/main/java/org/apache/camel/quarkus/component/microprofile/fault/tolerance/deployment/MicroprofileFaultToleranceProcessor.java copy to extensions/microprofile-fault-tolerance/runtime/src/main/java/org/apache/camel/component/microprofile/faulttolerance/FaultToleranceConstants.java index 05673b1..3bb0027 100644 --- a/extensions/microprofile-fault-tolerance/deployment/src/main/java/org/apache/camel/quarkus/component/microprofile/fault/tolerance/deployment/MicroprofileFaultToleranceProcessor.java +++ b/extensions/microprofile-fault-tolerance/runtime/src/main/java/org/apache/camel/component/microprofile/faulttolerance/FaultToleranceConstants.java @@ -14,25 +14,10 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package org.apache.camel.quarkus.component.microprofile.fault.tolerance.deployment; +package org.apache.camel.component.microprofile.faulttolerance; -import io.quarkus.deployment.annotations.BuildStep; -import io.quarkus.deployment.builditem.FeatureBuildItem; -import io.quarkus.deployment.builditem.nativeimage.NativeImageResourceBuildItem; +public interface FaultToleranceConstants { -class MicroprofileFaultToleranceProcessor { - - private static final String FEATURE = "camel-microprofile-fault-tolerance"; - - @BuildStep - FeatureBuildItem feature() { - return new FeatureBuildItem(FEATURE); - } - - @BuildStep - NativeImageResourceBuildItem initResources() { - return new NativeImageResourceBuildItem( - "META-INF/services/org/apache/camel/model/CircuitBreakerDefinition"); - } + String DEFAULT_FAULT_TOLERANCE_CONFIGURATION_ID = "fault-tolerance-configuration"; } diff --git a/extensions/microprofile-fault-tolerance/runtime/src/main/java/org/apache/camel/component/microprofile/faulttolerance/FaultToleranceProcessor.java b/extensions/microprofile-fault-tolerance/runtime/src/main/java/org/apache/camel/component/microprofile/faulttolerance/FaultToleranceProcessor.java new file mode 100644 index 0000000..2195b89 --- /dev/null +++ b/extensions/microprofile-fault-tolerance/runtime/src/main/java/org/apache/camel/component/microprofile/faulttolerance/FaultToleranceProcessor.java @@ -0,0 +1,536 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.component.microprofile.faulttolerance; + +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.Callable; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.ScheduledExecutorService; + +import io.smallrye.faulttolerance.core.FaultToleranceStrategy; +import io.smallrye.faulttolerance.core.InvocationContext; +import io.smallrye.faulttolerance.core.bulkhead.FutureThreadPoolBulkhead; +import io.smallrye.faulttolerance.core.circuit.breaker.CircuitBreaker; +import io.smallrye.faulttolerance.core.fallback.Fallback; +import io.smallrye.faulttolerance.core.stopwatch.SystemStopwatch; +import io.smallrye.faulttolerance.core.timeout.ScheduledExecutorTimeoutWatcher; +import io.smallrye.faulttolerance.core.timeout.Timeout; +import io.smallrye.faulttolerance.core.timeout.TimeoutWatcher; +import io.smallrye.faulttolerance.core.util.SetOfThrowables; +import org.apache.camel.AsyncCallback; +import org.apache.camel.CamelContext; +import org.apache.camel.CamelContextAware; +import org.apache.camel.Exchange; +import org.apache.camel.ExchangePropertyKey; +import org.apache.camel.ExtendedCamelContext; +import org.apache.camel.ExtendedExchange; +import org.apache.camel.Navigate; +import org.apache.camel.Processor; +import org.apache.camel.Route; +import org.apache.camel.RuntimeExchangeException; +import org.apache.camel.api.management.ManagedAttribute; +import org.apache.camel.api.management.ManagedResource; +import org.apache.camel.processor.PooledExchangeTask; +import org.apache.camel.processor.PooledExchangeTaskFactory; +import org.apache.camel.processor.PooledTaskFactory; +import org.apache.camel.processor.PrototypeTaskFactory; +import org.apache.camel.spi.IdAware; +import org.apache.camel.spi.ProcessorExchangeFactory; +import org.apache.camel.spi.RouteIdAware; +import org.apache.camel.spi.UnitOfWork; +import org.apache.camel.support.AsyncProcessorSupport; +import org.apache.camel.support.ExchangeHelper; +import org.apache.camel.support.UnitOfWorkHelper; +import org.apache.camel.support.service.ServiceHelper; +import org.apache.camel.util.ObjectHelper; +import org.eclipse.microprofile.faulttolerance.exceptions.CircuitBreakerOpenException; +import org.eclipse.microprofile.faulttolerance.exceptions.TimeoutException; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import static io.smallrye.faulttolerance.core.Invocation.invocation; + +/** + * Implementation of Circuit Breaker EIP using microprofile fault tolerance. + */ +@ManagedResource(description = "Managed FaultTolerance Processor") +public class FaultToleranceProcessor extends AsyncProcessorSupport + implements CamelContextAware, Navigate<Processor>, org.apache.camel.Traceable, IdAware, RouteIdAware { + + private static final Logger LOG = LoggerFactory.getLogger(FaultToleranceProcessor.class); + + private volatile CircuitBreaker circuitBreaker; + private CamelContext camelContext; + private String id; + private String routeId; + private final FaultToleranceConfiguration config; + private final Processor processor; + private final Processor fallbackProcessor; + private ScheduledExecutorService scheduledExecutorService; + private boolean shutdownScheduledExecutorService; + private ExecutorService executorService; + private boolean shutdownExecutorService; + private ProcessorExchangeFactory processorExchangeFactory; + private PooledExchangeTaskFactory taskFactory; + private PooledExchangeTaskFactory fallbackTaskFactory; + + public FaultToleranceProcessor(FaultToleranceConfiguration config, Processor processor, + Processor fallbackProcessor) { + this.config = config; + this.processor = processor; + this.fallbackProcessor = fallbackProcessor; + } + + @Override + public CamelContext getCamelContext() { + return camelContext; + } + + @Override + public void setCamelContext(CamelContext camelContext) { + this.camelContext = camelContext; + } + + @Override + public String getId() { + return id; + } + + @Override + public void setId(String id) { + this.id = id; + } + + @Override + public String getRouteId() { + return routeId; + } + + @Override + public void setRouteId(String routeId) { + this.routeId = routeId; + } + + public CircuitBreaker getCircuitBreaker() { + return circuitBreaker; + } + + public void setCircuitBreaker(CircuitBreaker circuitBreaker) { + this.circuitBreaker = circuitBreaker; + } + + public boolean isShutdownExecutorService() { + return shutdownExecutorService; + } + + public void setShutdownExecutorService(boolean shutdownExecutorService) { + this.shutdownExecutorService = shutdownExecutorService; + } + + public ExecutorService getExecutorService() { + return executorService; + } + + public void setExecutorService(ExecutorService executorService) { + this.executorService = executorService; + } + + @Override + public String getTraceLabel() { + return "faultTolerance"; + } + + @ManagedAttribute(description = "Returns the current delay in milliseconds.") + public long getDelay() { + return config.getDelay(); + } + + @ManagedAttribute(description = "Returns the current failure rate in percentage.") + public float getFailureRate() { + return config.getFailureRatio(); + } + + @ManagedAttribute(description = "Returns the current request volume threshold.") + public int getRequestVolumeThreshold() { + return config.getRequestVolumeThreshold(); + } + + @ManagedAttribute(description = "Returns the current success threshold.") + public int getSuccessThreshold() { + return config.getSuccessThreshold(); + } + + @ManagedAttribute(description = "Is timeout enabled") + public boolean isTimeoutEnabled() { + return config.isTimeoutEnabled(); + } + + @ManagedAttribute(description = "The timeout wait duration") + public long getTimeoutDuration() { + return config.getTimeoutDuration(); + } + + @ManagedAttribute(description = "The timeout pool size for the thread pool") + public int getTimeoutPoolSize() { + return config.getTimeoutPoolSize(); + } + + @ManagedAttribute(description = "Is bulkhead enabled") + public boolean isBulkheadEnabled() { + return config.isBulkheadEnabled(); + } + + @ManagedAttribute(description = "The max amount of concurrent calls the bulkhead will support.") + public int getBulkheadMaxConcurrentCalls() { + return config.getBulkheadMaxConcurrentCalls(); + } + + @ManagedAttribute(description = "The task queue size for holding waiting tasks to be processed by the bulkhead") + public int getBulkheadWaitingTaskQueue() { + return config.getBulkheadWaitingTaskQueue(); + } + + @Override + public List<Processor> next() { + if (!hasNext()) { + return null; + } + List<Processor> answer = new ArrayList<>(); + answer.add(processor); + if (fallbackProcessor != null) { + answer.add(fallbackProcessor); + } + return answer; + } + + @Override + public boolean hasNext() { + return true; + } + + @Override + @SuppressWarnings("unchecked") + public boolean process(Exchange exchange, AsyncCallback callback) { + // run this as if we run inside try .. catch so there is no regular + // Camel error handler + exchange.setProperty(ExchangePropertyKey.TRY_ROUTE_BLOCK, true); + + CircuitBreakerFallbackTask fallbackTask = null; + CircuitBreakerTask task = (CircuitBreakerTask) taskFactory.acquire(exchange, callback); + + // circuit breaker + FaultToleranceStrategy target = circuitBreaker; + + // 1. bulkhead + if (config.isBulkheadEnabled()) { + target = new FutureThreadPoolBulkhead( + target, "bulkhead", config.getBulkheadMaxConcurrentCalls(), + config.getBulkheadWaitingTaskQueue()); + } + // 2. timeout + if (config.isTimeoutEnabled()) { + TimeoutWatcher watcher = new ScheduledExecutorTimeoutWatcher(scheduledExecutorService); + target = new Timeout(target, "timeout", config.getTimeoutDuration(), watcher); + } + // 3. fallback + if (fallbackProcessor != null) { + fallbackTask = (CircuitBreakerFallbackTask) fallbackTaskFactory.acquire(exchange, callback); + final CircuitBreakerFallbackTask fFallbackTask = fallbackTask; + target = new Fallback(target, "fallback", fallbackContext -> { + exchange.setException(fallbackContext.failure); + return fFallbackTask.call(); + }, SetOfThrowables.ALL, SetOfThrowables.EMPTY); + } + + try { + target.apply(new InvocationContext(task)); + } catch (CircuitBreakerOpenException e) { + // the circuit breaker triggered a call rejected + exchange.setProperty(ExchangePropertyKey.CIRCUIT_BREAKER_RESPONSE_SUCCESSFUL_EXECUTION, false); + exchange.setProperty(ExchangePropertyKey.CIRCUIT_BREAKER_RESPONSE_FROM_FALLBACK, false); + exchange.setProperty(ExchangePropertyKey.CIRCUIT_BREAKER_RESPONSE_SHORT_CIRCUITED, true); + exchange.setProperty(ExchangePropertyKey.CIRCUIT_BREAKER_RESPONSE_REJECTED, true); + } catch (Exception e) { + // some other kind of exception + exchange.setException(e); + } finally { + taskFactory.release(task); + if (fallbackTask != null) { + fallbackTaskFactory.release(fallbackTask); + } + } + + exchange.removeProperty(ExchangePropertyKey.TRY_ROUTE_BLOCK); + callback.done(true); + return true; + } + + @Override + protected void doBuild() throws Exception { + ObjectHelper.notNull(camelContext, "CamelContext", this); + + boolean pooled = camelContext.adapt(ExtendedCamelContext.class).getExchangeFactory().isPooled(); + if (pooled) { + int capacity = camelContext.adapt(ExtendedCamelContext.class).getExchangeFactory().getCapacity(); + taskFactory = new PooledTaskFactory(getId()) { + @Override + public PooledExchangeTask create(Exchange exchange, AsyncCallback callback) { + return new CircuitBreakerTask(); + } + }; + taskFactory.setCapacity(capacity); + fallbackTaskFactory = new PooledTaskFactory(getId()) { + @Override + public PooledExchangeTask create(Exchange exchange, AsyncCallback callback) { + return new CircuitBreakerFallbackTask(); + } + }; + fallbackTaskFactory.setCapacity(capacity); + } else { + taskFactory = new PrototypeTaskFactory() { + @Override + public PooledExchangeTask create(Exchange exchange, AsyncCallback callback) { + return new CircuitBreakerTask(); + } + }; + fallbackTaskFactory = new PrototypeTaskFactory() { + @Override + public PooledExchangeTask create(Exchange exchange, AsyncCallback callback) { + return new CircuitBreakerFallbackTask(); + } + }; + } + + // create a per processor exchange factory + this.processorExchangeFactory = getCamelContext().adapt(ExtendedCamelContext.class) + .getProcessorExchangeFactory().newProcessorExchangeFactory(this); + this.processorExchangeFactory.setRouteId(getRouteId()); + this.processorExchangeFactory.setId(getId()); + + ServiceHelper.buildService(processorExchangeFactory, taskFactory, fallbackTaskFactory, processor); + } + + @Override + @SuppressWarnings("unchecked") + protected void doInit() throws Exception { + ObjectHelper.notNull(camelContext, "CamelContext", this); + if (circuitBreaker == null) { + circuitBreaker = new CircuitBreaker( + invocation(), id, SetOfThrowables.ALL, + SetOfThrowables.EMPTY, config.getDelay(), config.getRequestVolumeThreshold(), config.getFailureRatio(), + config.getSuccessThreshold(), new SystemStopwatch()); + } + + ServiceHelper.initService(processorExchangeFactory, taskFactory, fallbackTaskFactory, processor); + } + + @Override + protected void doStart() throws Exception { + if (config.isTimeoutEnabled() && scheduledExecutorService == null) { + scheduledExecutorService = getCamelContext().getExecutorServiceManager().newScheduledThreadPool(this, + "CircuitBreakerTimeout", config.getTimeoutPoolSize()); + shutdownScheduledExecutorService = true; + } + if (config.isBulkheadEnabled() && executorService == null) { + executorService = getCamelContext().getExecutorServiceManager().newThreadPool(this, "CircuitBreakerBulkhead", + config.getBulkheadMaxConcurrentCalls(), config.getBulkheadMaxConcurrentCalls()); + shutdownExecutorService = true; + } + + ServiceHelper.startService(processorExchangeFactory, taskFactory, fallbackTaskFactory, processor); + } + + @Override + protected void doStop() throws Exception { + if (shutdownScheduledExecutorService && scheduledExecutorService != null) { + getCamelContext().getExecutorServiceManager().shutdownNow(scheduledExecutorService); + scheduledExecutorService = null; + } + if (shutdownExecutorService && executorService != null) { + getCamelContext().getExecutorServiceManager().shutdownNow(executorService); + executorService = null; + } + + ServiceHelper.stopService(processorExchangeFactory, taskFactory, fallbackTaskFactory, processor); + } + + @Override + protected void doShutdown() throws Exception { + ServiceHelper.stopAndShutdownServices(processorExchangeFactory, taskFactory, fallbackTaskFactory, processor); + } + + private final class CircuitBreakerTask implements PooledExchangeTask, Callable<Exchange> { + + private Exchange exchange; + + @Override + public void prepare(Exchange exchange, AsyncCallback callback) { + this.exchange = exchange; + // callback not in use + } + + @Override + public void reset() { + this.exchange = null; + } + + @Override + public void run() { + // not in use + } + + @Override + public Exchange call() throws Exception { + Exchange copy = null; + UnitOfWork uow = null; + Throwable cause; + + // turn of interruption to allow fault tolerance to process the exchange under its handling + exchange.adapt(ExtendedExchange.class).setInterruptable(false); + + try { + LOG.debug("Running processor: {} with exchange: {}", processor, exchange); + + // prepare a copy of exchange so downstream processors don't + // cause side-effects if they mutate the exchange + // in case timeout processing and continue with the fallback etc + copy = processorExchangeFactory.createCorrelatedCopy(exchange, false); + if (copy.getUnitOfWork() != null) { + uow = copy.getUnitOfWork(); + } else { + // prepare uow on copy + uow = copy.getContext().adapt(ExtendedCamelContext.class).getUnitOfWorkFactory().createUnitOfWork(copy); + copy.adapt(ExtendedExchange.class).setUnitOfWork(uow); + // the copy must be starting from the route where its copied from + Route route = ExchangeHelper.getRoute(exchange); + if (route != null) { + uow.pushRoute(route); + } + } + + // process the processor until its fully done + processor.process(copy); + + // handle the processing result + if (copy.getException() != null) { + exchange.setException(copy.getException()); + } else { + // copy the result as its regarded as success + ExchangeHelper.copyResults(exchange, copy); + exchange.setProperty(ExchangePropertyKey.CIRCUIT_BREAKER_RESPONSE_SUCCESSFUL_EXECUTION, true); + exchange.setProperty(ExchangePropertyKey.CIRCUIT_BREAKER_RESPONSE_FROM_FALLBACK, false); + } + } catch (Exception e) { + exchange.setException(e); + } finally { + // must done uow + UnitOfWorkHelper.doneUow(uow, copy); + // remember any thrown exception + cause = exchange.getException(); + } + + // and release exchange back in pool + processorExchangeFactory.release(exchange); + + if (cause != null) { + // throw exception so resilient4j know it was a failure + throw RuntimeExchangeException.wrapRuntimeException(cause); + } + return exchange; + } + } + + private final class CircuitBreakerFallbackTask implements PooledExchangeTask, Callable<Exchange> { + + private Exchange exchange; + + @Override + public void prepare(Exchange exchange, AsyncCallback callback) { + this.exchange = exchange; + // callback not in use + } + + @Override + public void reset() { + this.exchange = null; + } + + @Override + public void run() { + // not in use + } + + @Override + public Exchange call() throws Exception { + Throwable throwable = exchange.getException(); + if (fallbackProcessor == null) { + if (throwable instanceof TimeoutException) { + // the circuit breaker triggered a timeout (and there is no + // fallback) so lets mark the exchange as failed + exchange.setProperty(ExchangePropertyKey.CIRCUIT_BREAKER_RESPONSE_SUCCESSFUL_EXECUTION, false); + exchange.setProperty(ExchangePropertyKey.CIRCUIT_BREAKER_RESPONSE_FROM_FALLBACK, false); + exchange.setProperty(ExchangePropertyKey.CIRCUIT_BREAKER_RESPONSE_SHORT_CIRCUITED, false); + exchange.setProperty(ExchangePropertyKey.CIRCUIT_BREAKER_RESPONSE_TIMED_OUT, true); + exchange.setException(throwable); + return exchange; + } else if (throwable instanceof CircuitBreakerOpenException) { + // the circuit breaker triggered a call rejected + exchange.setProperty(ExchangePropertyKey.CIRCUIT_BREAKER_RESPONSE_SUCCESSFUL_EXECUTION, false); + exchange.setProperty(ExchangePropertyKey.CIRCUIT_BREAKER_RESPONSE_FROM_FALLBACK, false); + exchange.setProperty(ExchangePropertyKey.CIRCUIT_BREAKER_RESPONSE_SHORT_CIRCUITED, true); + exchange.setProperty(ExchangePropertyKey.CIRCUIT_BREAKER_RESPONSE_REJECTED, true); + return exchange; + } else { + // throw exception so fault tolerance know it was a failure + throw RuntimeExchangeException.wrapRuntimeException(throwable); + } + } + + // fallback route is handling the exception so its short-circuited + exchange.setProperty(ExchangePropertyKey.CIRCUIT_BREAKER_RESPONSE_SUCCESSFUL_EXECUTION, false); + exchange.setProperty(ExchangePropertyKey.CIRCUIT_BREAKER_RESPONSE_FROM_FALLBACK, true); + exchange.setProperty(ExchangePropertyKey.CIRCUIT_BREAKER_RESPONSE_SHORT_CIRCUITED, true); + + // store the last to endpoint as the failure endpoint + if (exchange.getProperty(ExchangePropertyKey.FAILURE_ENDPOINT) == null) { + exchange.setProperty(ExchangePropertyKey.FAILURE_ENDPOINT, + exchange.getProperty(ExchangePropertyKey.TO_ENDPOINT)); + } + // give the rest of the pipeline another chance + exchange.setProperty(ExchangePropertyKey.EXCEPTION_HANDLED, true); + exchange.setProperty(ExchangePropertyKey.EXCEPTION_CAUGHT, exchange.getException()); + exchange.setRouteStop(false); + exchange.setException(null); + // and we should not be regarded as exhausted as we are in a try .. + // catch block + exchange.adapt(ExtendedExchange.class).setRedeliveryExhausted(false); + // run the fallback processor + try { + LOG.debug("Running fallback: {} with exchange: {}", fallbackProcessor, exchange); + // process the fallback until its fully done + fallbackProcessor.process(exchange); + LOG.debug("Running fallback: {} with exchange: {} done", fallbackProcessor, exchange); + } catch (Exception e) { + exchange.setException(e); + } + + return exchange; + } + } + +} diff --git a/extensions/microprofile-fault-tolerance/deployment/src/main/java/org/apache/camel/quarkus/component/microprofile/fault/tolerance/deployment/MicroprofileFaultToleranceProcessor.java b/extensions/microprofile-fault-tolerance/runtime/src/main/java/org/apache/camel/component/microprofile/faulttolerance/FaultToleranceProcessorFactory.java similarity index 52% copy from extensions/microprofile-fault-tolerance/deployment/src/main/java/org/apache/camel/quarkus/component/microprofile/fault/tolerance/deployment/MicroprofileFaultToleranceProcessor.java copy to extensions/microprofile-fault-tolerance/runtime/src/main/java/org/apache/camel/component/microprofile/faulttolerance/FaultToleranceProcessorFactory.java index 05673b1..2b70ca9 100644 --- a/extensions/microprofile-fault-tolerance/deployment/src/main/java/org/apache/camel/quarkus/component/microprofile/fault/tolerance/deployment/MicroprofileFaultToleranceProcessor.java +++ b/extensions/microprofile-fault-tolerance/runtime/src/main/java/org/apache/camel/component/microprofile/faulttolerance/FaultToleranceProcessorFactory.java @@ -14,25 +14,25 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package org.apache.camel.quarkus.component.microprofile.fault.tolerance.deployment; +package org.apache.camel.component.microprofile.faulttolerance; -import io.quarkus.deployment.annotations.BuildStep; -import io.quarkus.deployment.builditem.FeatureBuildItem; -import io.quarkus.deployment.builditem.nativeimage.NativeImageResourceBuildItem; +import org.apache.camel.Processor; +import org.apache.camel.Route; +import org.apache.camel.model.CircuitBreakerDefinition; +import org.apache.camel.support.TypedProcessorFactory; -class MicroprofileFaultToleranceProcessor { - - private static final String FEATURE = "camel-microprofile-fault-tolerance"; +/** + * To integrate camel-microprofile-faulttolerance with the Camel routes using the Circuit Breaker EIP. + */ +public class FaultToleranceProcessorFactory extends TypedProcessorFactory<CircuitBreakerDefinition> { - @BuildStep - FeatureBuildItem feature() { - return new FeatureBuildItem(FEATURE); + public FaultToleranceProcessorFactory() { + super(CircuitBreakerDefinition.class); } - @BuildStep - NativeImageResourceBuildItem initResources() { - return new NativeImageResourceBuildItem( - "META-INF/services/org/apache/camel/model/CircuitBreakerDefinition"); + @Override + public Processor doCreateProcessor(Route route, CircuitBreakerDefinition definition) throws Exception { + return new FaultToleranceReifier(route, definition).createProcessor(); } } diff --git a/extensions/microprofile-fault-tolerance/runtime/src/main/java/org/apache/camel/component/microprofile/faulttolerance/FaultToleranceReifier.java b/extensions/microprofile-fault-tolerance/runtime/src/main/java/org/apache/camel/component/microprofile/faulttolerance/FaultToleranceReifier.java new file mode 100644 index 0000000..d7e156b --- /dev/null +++ b/extensions/microprofile-fault-tolerance/runtime/src/main/java/org/apache/camel/component/microprofile/faulttolerance/FaultToleranceReifier.java @@ -0,0 +1,193 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.component.microprofile.faulttolerance; + +import java.util.HashMap; +import java.util.Map; +import java.util.Optional; +import java.util.concurrent.ExecutorService; + +import io.smallrye.faulttolerance.core.circuit.breaker.CircuitBreaker; +import org.apache.camel.ExtendedCamelContext; +import org.apache.camel.Processor; +import org.apache.camel.Route; +import org.apache.camel.model.CircuitBreakerDefinition; +import org.apache.camel.model.FaultToleranceConfigurationCommon; +import org.apache.camel.model.FaultToleranceConfigurationDefinition; +import org.apache.camel.model.Model; +import org.apache.camel.reifier.ProcessorReifier; +import org.apache.camel.spi.BeanIntrospection; +import org.apache.camel.spi.ExtendedPropertyConfigurerGetter; +import org.apache.camel.spi.PropertyConfigurer; +import org.apache.camel.support.PropertyBindingSupport; +import org.apache.camel.util.function.Suppliers; + +public class FaultToleranceReifier extends ProcessorReifier<CircuitBreakerDefinition> { + + public FaultToleranceReifier(Route route, CircuitBreakerDefinition definition) { + super(route, definition); + } + + @Override + public Processor createProcessor() throws Exception { + // create the regular and fallback processors + Processor processor = createChildProcessor(true); + Processor fallback = null; + if (definition.getOnFallback() != null) { + fallback = createProcessor(definition.getOnFallback()); + } + boolean fallbackViaNetwork = definition.getOnFallback() != null + && parseBoolean(definition.getOnFallback().getFallbackViaNetwork(), false); + if (fallbackViaNetwork) { + throw new UnsupportedOperationException("camel-microprofile-fault-tolerance does not support onFallbackViaNetwork"); + } + final FaultToleranceConfigurationCommon config = buildFaultToleranceConfiguration(); + + FaultToleranceConfiguration configuration = new FaultToleranceConfiguration(); + configureCircuitBreaker(config, configuration); + configureTimeLimiter(config, configuration); + configureBulkhead(config, configuration); + + FaultToleranceProcessor answer = new FaultToleranceProcessor(configuration, processor, fallback); + // using any existing circuit breakers? + if (config.getCircuitBreakerRef() != null) { + CircuitBreaker cb = mandatoryLookup(parseString(config.getCircuitBreakerRef()), CircuitBreaker.class); + answer.setCircuitBreaker(cb); + } + configureBulkheadExecutorService(answer, config); + return answer; + } + + private void configureCircuitBreaker(FaultToleranceConfigurationCommon config, FaultToleranceConfiguration target) { + target.setDelay(parseDuration(config.getDelay(), 5000)); + target.setSuccessThreshold(parseInt(config.getSuccessThreshold(), 1)); + target.setRequestVolumeThreshold(parseInt(config.getRequestVolumeThreshold(), 20)); + if (config.getFailureRatio() != null) { + float num = parseFloat(config.getFailureRatio(), 50); + if (num < 1 || num > 100) { + throw new IllegalArgumentException("FailureRatio must be between 1 and 100, was: " + num); + } + float percent = num / 100; + target.setFailureRatio(percent); + } else { + target.setFailureRatio(0.5f); + } + } + + private void configureTimeLimiter(FaultToleranceConfigurationCommon config, FaultToleranceConfiguration target) { + if (!parseBoolean(config.getTimeoutEnabled(), false)) { + target.setTimeoutEnabled(false); + } else { + target.setTimeoutEnabled(true); + } + + target.setTimeoutDuration(parseDuration(config.getTimeoutDuration(), 1000)); + target.setTimeoutPoolSize(parseInt(config.getTimeoutPoolSize(), 10)); + } + + private void configureBulkhead(FaultToleranceConfigurationCommon config, FaultToleranceConfiguration target) { + if (!parseBoolean(config.getBulkheadEnabled(), false)) { + return; + } + + target.setBulkheadMaxConcurrentCalls(parseInt(config.getBulkheadMaxConcurrentCalls(), 10)); + target.setBulkheadWaitingTaskQueue(parseInt(config.getBulkheadWaitingTaskQueue(), 10)); + } + + private void configureBulkheadExecutorService(FaultToleranceProcessor processor, FaultToleranceConfigurationCommon config) { + if (!parseBoolean(config.getBulkheadEnabled(), false)) { + return; + } + + if (config.getBulkheadExecutorServiceRef() != null) { + String ref = config.getBulkheadExecutorServiceRef(); + boolean shutdownThreadPool = false; + ExecutorService executorService = lookup(ref, ExecutorService.class); + if (executorService == null) { + executorService = lookupExecutorServiceRef("CircuitBreaker", definition, ref); + shutdownThreadPool = true; + } + processor.setExecutorService(executorService); + processor.setShutdownExecutorService(shutdownThreadPool); + } + } + + // ******************************* + // Helpers + // ******************************* + + FaultToleranceConfigurationDefinition buildFaultToleranceConfiguration() throws Exception { + Map<String, Object> properties = new HashMap<>(); + + final PropertyConfigurer configurer = camelContext.adapt(ExtendedCamelContext.class) + .getConfigurerResolver() + .resolvePropertyConfigurer(FaultToleranceConfigurationDefinition.class.getName(), camelContext); + + // Extract properties from default configuration, the one configured on + // camel context takes the precedence over those in the registry + loadProperties(properties, Suppliers.firstNotNull( + () -> camelContext.getExtension(Model.class).getFaultToleranceConfiguration(null), + () -> lookup(FaultToleranceConstants.DEFAULT_FAULT_TOLERANCE_CONFIGURATION_ID, + FaultToleranceConfigurationDefinition.class)), + configurer); + + // Extract properties from referenced configuration, the one configured + // on camel context takes the precedence over those in the registry + if (definition.getConfigurationRef() != null) { + final String ref = parseString(definition.getConfigurationRef()); + + loadProperties(properties, Suppliers.firstNotNull( + () -> camelContext.getExtension(Model.class).getFaultToleranceConfiguration(ref), + () -> mandatoryLookup(ref, FaultToleranceConfigurationDefinition.class)), + configurer); + } + + // Extract properties from local configuration + loadProperties(properties, Optional.ofNullable(definition.getFaultToleranceConfiguration()), configurer); + + // Apply properties to a new configuration + FaultToleranceConfigurationDefinition config = new FaultToleranceConfigurationDefinition(); + PropertyBindingSupport.build() + .withCamelContext(camelContext) + .withConfigurer(configurer) + .withProperties(properties) + .withTarget(config) + .bind(); + + return config; + } + + private void loadProperties(Map<String, Object> properties, Optional<?> optional, PropertyConfigurer configurer) { + BeanIntrospection beanIntrospection = camelContext.adapt(ExtendedCamelContext.class).getBeanIntrospection(); + optional.ifPresent(bean -> { + if (configurer instanceof ExtendedPropertyConfigurerGetter) { + ExtendedPropertyConfigurerGetter getter = (ExtendedPropertyConfigurerGetter) configurer; + Map<String, Object> types = getter.getAllOptions(bean); + types.forEach((k, t) -> { + Object value = getter.getOptionValue(bean, k, true); + if (value != null) { + properties.put(k, value); + } + }); + } else { + // no configurer found so use bean introspection (reflection) + beanIntrospection.getProperties(bean, properties, null, false); + } + }); + } + +} diff --git a/extensions/microprofile-fault-tolerance/runtime/src/main/resources/META-INF/services/org/apache/camel/model/CircuitBreakerDefinition b/extensions/microprofile-fault-tolerance/runtime/src/main/resources/META-INF/services/org/apache/camel/model/CircuitBreakerDefinition new file mode 100644 index 0000000..c43d558 --- /dev/null +++ b/extensions/microprofile-fault-tolerance/runtime/src/main/resources/META-INF/services/org/apache/camel/model/CircuitBreakerDefinition @@ -0,0 +1,18 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +class=org.apache.camel.component.microprofile.faulttolerance.FaultToleranceProcessorFactory diff --git a/extensions/microprofile-health/runtime/src/main/java/org/apache/camel/quarkus/component/microprofile/health/runtime/CamelMicroProfileHealthRecorder.java b/extensions/microprofile-health/runtime/src/main/java/org/apache/camel/quarkus/component/microprofile/health/runtime/CamelMicroProfileHealthRecorder.java index e33777b..907c133 100644 --- a/extensions/microprofile-health/runtime/src/main/java/org/apache/camel/quarkus/component/microprofile/health/runtime/CamelMicroProfileHealthRecorder.java +++ b/extensions/microprofile-health/runtime/src/main/java/org/apache/camel/quarkus/component/microprofile/health/runtime/CamelMicroProfileHealthRecorder.java @@ -20,7 +20,6 @@ import io.quarkus.runtime.RuntimeValue; import io.quarkus.runtime.annotations.Recorder; import org.apache.camel.CamelContext; import org.apache.camel.health.HealthCheckRegistry; -import org.apache.camel.microprofile.health.CamelMicroProfileHealthCheckRegistry; import org.apache.camel.spi.CamelContextCustomizer; @Recorder @@ -31,7 +30,7 @@ public class CamelMicroProfileHealthRecorder { return new RuntimeValue<>(new CamelContextCustomizer() { @Override public void configure(CamelContext camelContext) { - HealthCheckRegistry registry = new CamelMicroProfileHealthCheckRegistry(camelContext); + HealthCheckRegistry registry = new CamelQuarkusMicroProfileHealthCheckRegistry(camelContext); registry.setId("camel-microprofile-health"); registry.setEnabled(true); diff --git a/extensions/microprofile-health/runtime/src/main/java/org/apache/camel/quarkus/component/microprofile/health/runtime/CamelQuarkusMicroProfileHealthCheckRegistry.java b/extensions/microprofile-health/runtime/src/main/java/org/apache/camel/quarkus/component/microprofile/health/runtime/CamelQuarkusMicroProfileHealthCheckRegistry.java new file mode 100644 index 0000000..a0becea --- /dev/null +++ b/extensions/microprofile-health/runtime/src/main/java/org/apache/camel/quarkus/component/microprofile/health/runtime/CamelQuarkusMicroProfileHealthCheckRegistry.java @@ -0,0 +1,46 @@ +package org.apache.camel.quarkus.component.microprofile.health.runtime; + +import java.lang.annotation.Annotation; +import java.util.Set; + +import javax.enterprise.inject.spi.Bean; +import javax.enterprise.inject.spi.BeanManager; +import javax.enterprise.inject.spi.CDI; + +import io.smallrye.health.api.HealthRegistry; +import io.smallrye.health.registry.LivenessHealthRegistry; +import io.smallrye.health.registry.ReadinessHealthRegistry; +import org.apache.camel.CamelContext; +import org.apache.camel.microprofile.health.CamelMicroProfileHealthCheckRegistry; +import org.eclipse.microprofile.health.Liveness; +import org.eclipse.microprofile.health.Readiness; + +public class CamelQuarkusMicroProfileHealthCheckRegistry extends CamelMicroProfileHealthCheckRegistry { + + CamelQuarkusMicroProfileHealthCheckRegistry(CamelContext camelContext) { + super(camelContext); + } + + @Override + protected HealthRegistry getLivenessRegistry() { + return getHealthRegistryBean(LivenessHealthRegistry.class, Liveness.Literal.INSTANCE); + } + + @Override + protected HealthRegistry getReadinessRegistry() { + return getHealthRegistryBean(ReadinessHealthRegistry.class, Readiness.Literal.INSTANCE); + } + + private static HealthRegistry getHealthRegistryBean(Class<? extends HealthRegistry> type, Annotation qualifier) { + BeanManager beanManager = CDI.current().getBeanManager(); + Set<Bean<?>> beans = beanManager.getBeans(type, qualifier); + if (beans.isEmpty()) { + throw new IllegalStateException( + "Beans for type " + type.getName() + " with qualifier " + qualifier + " could not be found."); + } + + Bean<?> bean = beanManager.resolve(beans); + Object reference = beanManager.getReference(bean, type, beanManager.createCreationalContext(bean)); + return type.cast(reference); + } +}