This is an automated email from the ASF dual-hosted git repository. github-bot pushed a commit to branch quarkus-main in repository https://gitbox.apache.org/repos/asf/camel-quarkus.git
commit c49f8bfcd593f385c66ff0368b4532953f78dc8c Author: James Netherton <jamesnether...@gmail.com> AuthorDate: Fri Feb 11 10:08:03 2022 +0000 Temporary hacks to handle version misalignment of smallrye-health and smallrye-faulttolerance in Quarkus & Camel --- .../MicroprofileFaultToleranceProcessor.java | 9 + .../microprofile-fault-tolerance/runtime/pom.xml | 2 + .../FaultToleranceConfiguration.java | 120 +++++ .../faulttolerance/FaultToleranceConstants.java} | 21 +- .../faulttolerance/FaultToleranceProcessor.java | 536 +++++++++++++++++++++ .../FaultToleranceProcessorFactory.java} | 28 +- .../faulttolerance/FaultToleranceReifier.java | 193 ++++++++ .../apache/camel/model/CircuitBreakerDefinition | 18 + .../deployment/MicroProfileHealthEnabledTest.java | 4 +- .../runtime/CamelMicroProfileHealthCheck.java | 67 +++ .../runtime/CamelMicroProfileHealthHelper.java | 63 +++ .../runtime/CamelMicroProfileHealthRecorder.java | 3 +- .../CamelMicroProfileRepositoryHealthCheck.java | 72 +++ ...amelQuarkusMicroProfileHealthCheckRegistry.java | 185 +++++++ .../it/CoreFaultToleranceProducers.java | 4 +- 15 files changed, 1287 insertions(+), 38 deletions(-) diff --git a/extensions/microprofile-fault-tolerance/deployment/src/main/java/org/apache/camel/quarkus/component/microprofile/fault/tolerance/deployment/MicroprofileFaultToleranceProcessor.java b/extensions/microprofile-fault-tolerance/deployment/src/main/java/org/apache/camel/quarkus/component/microprofile/fault/tolerance/deployment/MicroprofileFaultToleranceProcessor.java index 05673b1..6e8c382 100644 --- a/extensions/microprofile-fault-tolerance/deployment/src/main/java/org/apache/camel/quarkus/component/microprofile/fault/tolerance/deployment/MicroprofileFaultToleranceProcessor.java +++ b/extensions/microprofile-fault-tolerance/deployment/src/main/java/org/apache/camel/quarkus/component/microprofile/fault/tolerance/deployment/MicroprofileFaultToleranceProcessor.java @@ -16,9 +16,13 @@ */ package org.apache.camel.quarkus.component.microprofile.fault.tolerance.deployment; +import java.nio.file.Paths; + import io.quarkus.deployment.annotations.BuildStep; import io.quarkus.deployment.builditem.FeatureBuildItem; import io.quarkus.deployment.builditem.nativeimage.NativeImageResourceBuildItem; +import org.apache.camel.component.microprofile.faulttolerance.FaultToleranceProcessorFactory; +import org.apache.camel.quarkus.core.deployment.spi.CamelServiceBuildItem; class MicroprofileFaultToleranceProcessor { @@ -35,4 +39,9 @@ class MicroprofileFaultToleranceProcessor { "META-INF/services/org/apache/camel/model/CircuitBreakerDefinition"); } + @BuildStep + CamelServiceBuildItem camelCronServicePattern() { + return new CamelServiceBuildItem(Paths.get("META-INF/services/org/apache/camel/model/CircuitBreakerDefinition"), + FaultToleranceProcessorFactory.class.getName()); + } } diff --git a/extensions/microprofile-fault-tolerance/runtime/pom.xml b/extensions/microprofile-fault-tolerance/runtime/pom.xml index 3401f07..22e3962 100644 --- a/extensions/microprofile-fault-tolerance/runtime/pom.xml +++ b/extensions/microprofile-fault-tolerance/runtime/pom.xml @@ -56,10 +56,12 @@ <groupId>org.apache.camel.quarkus</groupId> <artifactId>camel-quarkus-core</artifactId> </dependency> + <!-- Not compatible with Quarkus 2.7.x <dependency> <groupId>org.apache.camel</groupId> <artifactId>camel-microprofile-fault-tolerance</artifactId> </dependency> + --> </dependencies> <build> diff --git a/extensions/microprofile-fault-tolerance/runtime/src/main/java/org/apache/camel/component/microprofile/faulttolerance/FaultToleranceConfiguration.java b/extensions/microprofile-fault-tolerance/runtime/src/main/java/org/apache/camel/component/microprofile/faulttolerance/FaultToleranceConfiguration.java new file mode 100644 index 0000000..7cb3d4d --- /dev/null +++ b/extensions/microprofile-fault-tolerance/runtime/src/main/java/org/apache/camel/component/microprofile/faulttolerance/FaultToleranceConfiguration.java @@ -0,0 +1,120 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.component.microprofile.faulttolerance; + +public class FaultToleranceConfiguration { + + private long delay; + private int successThreshold; + private int requestVolumeThreshold; + private float failureRatio; + private boolean timeoutEnabled; + private long timeoutDuration; + private int timeoutPoolSize; + private String timeoutExecutorServiceRef; + private boolean bulkheadEnabled; + private int bulkheadMaxConcurrentCalls; + private int bulkheadWaitingTaskQueue; + + public long getDelay() { + return delay; + } + + public void setDelay(long delay) { + this.delay = delay; + } + + public int getSuccessThreshold() { + return successThreshold; + } + + public void setSuccessThreshold(int successThreshold) { + this.successThreshold = successThreshold; + } + + public int getRequestVolumeThreshold() { + return requestVolumeThreshold; + } + + public void setRequestVolumeThreshold(int requestVolumeThreshold) { + this.requestVolumeThreshold = requestVolumeThreshold; + } + + public float getFailureRatio() { + return failureRatio; + } + + public void setFailureRatio(float failureRatio) { + this.failureRatio = failureRatio; + } + + public boolean isTimeoutEnabled() { + return timeoutEnabled; + } + + public void setTimeoutEnabled(boolean timeoutEnabled) { + this.timeoutEnabled = timeoutEnabled; + } + + public long getTimeoutDuration() { + return timeoutDuration; + } + + public void setTimeoutDuration(long timeoutDuration) { + this.timeoutDuration = timeoutDuration; + } + + public int getTimeoutPoolSize() { + return timeoutPoolSize; + } + + public void setTimeoutPoolSize(int timeoutPoolSize) { + this.timeoutPoolSize = timeoutPoolSize; + } + + public String getTimeoutExecutorServiceRef() { + return timeoutExecutorServiceRef; + } + + public void setTimeoutExecutorServiceRef(String timeoutExecutorServiceRef) { + this.timeoutExecutorServiceRef = timeoutExecutorServiceRef; + } + + public boolean isBulkheadEnabled() { + return bulkheadEnabled; + } + + public void setBulkheadEnabled(boolean bulkheadEnabled) { + this.bulkheadEnabled = bulkheadEnabled; + } + + public int getBulkheadMaxConcurrentCalls() { + return bulkheadMaxConcurrentCalls; + } + + public void setBulkheadMaxConcurrentCalls(int bulkheadMaxConcurrentCalls) { + this.bulkheadMaxConcurrentCalls = bulkheadMaxConcurrentCalls; + } + + public int getBulkheadWaitingTaskQueue() { + return bulkheadWaitingTaskQueue; + } + + public void setBulkheadWaitingTaskQueue(int bulkheadWaitingTaskQueue) { + this.bulkheadWaitingTaskQueue = bulkheadWaitingTaskQueue; + } +} diff --git a/extensions/microprofile-fault-tolerance/deployment/src/main/java/org/apache/camel/quarkus/component/microprofile/fault/tolerance/deployment/MicroprofileFaultToleranceProcessor.java b/extensions/microprofile-fault-tolerance/runtime/src/main/java/org/apache/camel/component/microprofile/faulttolerance/FaultToleranceConstants.java similarity index 53% copy from extensions/microprofile-fault-tolerance/deployment/src/main/java/org/apache/camel/quarkus/component/microprofile/fault/tolerance/deployment/MicroprofileFaultToleranceProcessor.java copy to extensions/microprofile-fault-tolerance/runtime/src/main/java/org/apache/camel/component/microprofile/faulttolerance/FaultToleranceConstants.java index 05673b1..3bb0027 100644 --- a/extensions/microprofile-fault-tolerance/deployment/src/main/java/org/apache/camel/quarkus/component/microprofile/fault/tolerance/deployment/MicroprofileFaultToleranceProcessor.java +++ b/extensions/microprofile-fault-tolerance/runtime/src/main/java/org/apache/camel/component/microprofile/faulttolerance/FaultToleranceConstants.java @@ -14,25 +14,10 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package org.apache.camel.quarkus.component.microprofile.fault.tolerance.deployment; +package org.apache.camel.component.microprofile.faulttolerance; -import io.quarkus.deployment.annotations.BuildStep; -import io.quarkus.deployment.builditem.FeatureBuildItem; -import io.quarkus.deployment.builditem.nativeimage.NativeImageResourceBuildItem; +public interface FaultToleranceConstants { -class MicroprofileFaultToleranceProcessor { - - private static final String FEATURE = "camel-microprofile-fault-tolerance"; - - @BuildStep - FeatureBuildItem feature() { - return new FeatureBuildItem(FEATURE); - } - - @BuildStep - NativeImageResourceBuildItem initResources() { - return new NativeImageResourceBuildItem( - "META-INF/services/org/apache/camel/model/CircuitBreakerDefinition"); - } + String DEFAULT_FAULT_TOLERANCE_CONFIGURATION_ID = "fault-tolerance-configuration"; } diff --git a/extensions/microprofile-fault-tolerance/runtime/src/main/java/org/apache/camel/component/microprofile/faulttolerance/FaultToleranceProcessor.java b/extensions/microprofile-fault-tolerance/runtime/src/main/java/org/apache/camel/component/microprofile/faulttolerance/FaultToleranceProcessor.java new file mode 100644 index 0000000..e1ff645 --- /dev/null +++ b/extensions/microprofile-fault-tolerance/runtime/src/main/java/org/apache/camel/component/microprofile/faulttolerance/FaultToleranceProcessor.java @@ -0,0 +1,536 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.component.microprofile.faulttolerance; + +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.Callable; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.ScheduledExecutorService; + +import io.smallrye.faulttolerance.core.FaultToleranceStrategy; +import io.smallrye.faulttolerance.core.InvocationContext; +import io.smallrye.faulttolerance.core.bulkhead.FutureThreadPoolBulkhead; +import io.smallrye.faulttolerance.core.circuit.breaker.CircuitBreaker; +import io.smallrye.faulttolerance.core.fallback.Fallback; +import io.smallrye.faulttolerance.core.stopwatch.SystemStopwatch; +import io.smallrye.faulttolerance.core.timeout.ScheduledExecutorTimeoutWatcher; +import io.smallrye.faulttolerance.core.timeout.Timeout; +import io.smallrye.faulttolerance.core.timeout.TimeoutWatcher; +import io.smallrye.faulttolerance.core.util.ExceptionDecision; +import org.apache.camel.AsyncCallback; +import org.apache.camel.CamelContext; +import org.apache.camel.CamelContextAware; +import org.apache.camel.Exchange; +import org.apache.camel.ExchangePropertyKey; +import org.apache.camel.ExtendedCamelContext; +import org.apache.camel.ExtendedExchange; +import org.apache.camel.Navigate; +import org.apache.camel.Processor; +import org.apache.camel.Route; +import org.apache.camel.RuntimeExchangeException; +import org.apache.camel.api.management.ManagedAttribute; +import org.apache.camel.api.management.ManagedResource; +import org.apache.camel.processor.PooledExchangeTask; +import org.apache.camel.processor.PooledExchangeTaskFactory; +import org.apache.camel.processor.PooledTaskFactory; +import org.apache.camel.processor.PrototypeTaskFactory; +import org.apache.camel.spi.IdAware; +import org.apache.camel.spi.ProcessorExchangeFactory; +import org.apache.camel.spi.RouteIdAware; +import org.apache.camel.spi.UnitOfWork; +import org.apache.camel.support.AsyncProcessorSupport; +import org.apache.camel.support.ExchangeHelper; +import org.apache.camel.support.UnitOfWorkHelper; +import org.apache.camel.support.service.ServiceHelper; +import org.apache.camel.util.ObjectHelper; +import org.eclipse.microprofile.faulttolerance.exceptions.CircuitBreakerOpenException; +import org.eclipse.microprofile.faulttolerance.exceptions.TimeoutException; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import static io.smallrye.faulttolerance.core.Invocation.invocation; + +/** + * Implementation of Circuit Breaker EIP using microprofile fault tolerance. + */ +@ManagedResource(description = "Managed FaultTolerance Processor") +public class FaultToleranceProcessor extends AsyncProcessorSupport + implements CamelContextAware, Navigate<Processor>, org.apache.camel.Traceable, IdAware, RouteIdAware { + + private static final Logger LOG = LoggerFactory.getLogger(FaultToleranceProcessor.class); + + private volatile CircuitBreaker circuitBreaker; + private CamelContext camelContext; + private String id; + private String routeId; + private final FaultToleranceConfiguration config; + private final Processor processor; + private final Processor fallbackProcessor; + private ScheduledExecutorService scheduledExecutorService; + private boolean shutdownScheduledExecutorService; + private ExecutorService executorService; + private boolean shutdownExecutorService; + private ProcessorExchangeFactory processorExchangeFactory; + private PooledExchangeTaskFactory taskFactory; + private PooledExchangeTaskFactory fallbackTaskFactory; + + public FaultToleranceProcessor(FaultToleranceConfiguration config, Processor processor, + Processor fallbackProcessor) { + this.config = config; + this.processor = processor; + this.fallbackProcessor = fallbackProcessor; + } + + @Override + public CamelContext getCamelContext() { + return camelContext; + } + + @Override + public void setCamelContext(CamelContext camelContext) { + this.camelContext = camelContext; + } + + @Override + public String getId() { + return id; + } + + @Override + public void setId(String id) { + this.id = id; + } + + @Override + public String getRouteId() { + return routeId; + } + + @Override + public void setRouteId(String routeId) { + this.routeId = routeId; + } + + public CircuitBreaker getCircuitBreaker() { + return circuitBreaker; + } + + public void setCircuitBreaker(CircuitBreaker circuitBreaker) { + this.circuitBreaker = circuitBreaker; + } + + public boolean isShutdownExecutorService() { + return shutdownExecutorService; + } + + public void setShutdownExecutorService(boolean shutdownExecutorService) { + this.shutdownExecutorService = shutdownExecutorService; + } + + public ExecutorService getExecutorService() { + return executorService; + } + + public void setExecutorService(ExecutorService executorService) { + this.executorService = executorService; + } + + @Override + public String getTraceLabel() { + return "faultTolerance"; + } + + @ManagedAttribute(description = "Returns the current delay in milliseconds.") + public long getDelay() { + return config.getDelay(); + } + + @ManagedAttribute(description = "Returns the current failure rate in percentage.") + public float getFailureRate() { + return config.getFailureRatio(); + } + + @ManagedAttribute(description = "Returns the current request volume threshold.") + public int getRequestVolumeThreshold() { + return config.getRequestVolumeThreshold(); + } + + @ManagedAttribute(description = "Returns the current success threshold.") + public int getSuccessThreshold() { + return config.getSuccessThreshold(); + } + + @ManagedAttribute(description = "Is timeout enabled") + public boolean isTimeoutEnabled() { + return config.isTimeoutEnabled(); + } + + @ManagedAttribute(description = "The timeout wait duration") + public long getTimeoutDuration() { + return config.getTimeoutDuration(); + } + + @ManagedAttribute(description = "The timeout pool size for the thread pool") + public int getTimeoutPoolSize() { + return config.getTimeoutPoolSize(); + } + + @ManagedAttribute(description = "Is bulkhead enabled") + public boolean isBulkheadEnabled() { + return config.isBulkheadEnabled(); + } + + @ManagedAttribute(description = "The max amount of concurrent calls the bulkhead will support.") + public int getBulkheadMaxConcurrentCalls() { + return config.getBulkheadMaxConcurrentCalls(); + } + + @ManagedAttribute(description = "The task queue size for holding waiting tasks to be processed by the bulkhead") + public int getBulkheadWaitingTaskQueue() { + return config.getBulkheadWaitingTaskQueue(); + } + + @Override + public List<Processor> next() { + if (!hasNext()) { + return null; + } + List<Processor> answer = new ArrayList<>(); + answer.add(processor); + if (fallbackProcessor != null) { + answer.add(fallbackProcessor); + } + return answer; + } + + @Override + public boolean hasNext() { + return true; + } + + @Override + @SuppressWarnings("unchecked") + public boolean process(Exchange exchange, AsyncCallback callback) { + // run this as if we run inside try .. catch so there is no regular + // Camel error handler + exchange.setProperty(ExchangePropertyKey.TRY_ROUTE_BLOCK, true); + + CircuitBreakerFallbackTask fallbackTask = null; + CircuitBreakerTask task = (CircuitBreakerTask) taskFactory.acquire(exchange, callback); + + // circuit breaker + FaultToleranceStrategy target = circuitBreaker; + + // 1. bulkhead + if (config.isBulkheadEnabled()) { + target = new FutureThreadPoolBulkhead( + target, "bulkhead", config.getBulkheadMaxConcurrentCalls(), + config.getBulkheadWaitingTaskQueue()); + } + // 2. timeout + if (config.isTimeoutEnabled()) { + TimeoutWatcher watcher = new ScheduledExecutorTimeoutWatcher(scheduledExecutorService); + target = new Timeout(target, "timeout", config.getTimeoutDuration(), watcher); + } + // 3. fallback + if (fallbackProcessor != null) { + fallbackTask = (CircuitBreakerFallbackTask) fallbackTaskFactory.acquire(exchange, callback); + final CircuitBreakerFallbackTask fFallbackTask = fallbackTask; + target = new Fallback(target, "fallback", fallbackContext -> { + exchange.setException(fallbackContext.failure); + return fFallbackTask.call(); + }, ExceptionDecision.ALWAYS_FAILURE); + } + + try { + target.apply(new InvocationContext(task)); + } catch (CircuitBreakerOpenException e) { + // the circuit breaker triggered a call rejected + exchange.setProperty(ExchangePropertyKey.CIRCUIT_BREAKER_RESPONSE_SUCCESSFUL_EXECUTION, false); + exchange.setProperty(ExchangePropertyKey.CIRCUIT_BREAKER_RESPONSE_FROM_FALLBACK, false); + exchange.setProperty(ExchangePropertyKey.CIRCUIT_BREAKER_RESPONSE_SHORT_CIRCUITED, true); + exchange.setProperty(ExchangePropertyKey.CIRCUIT_BREAKER_RESPONSE_REJECTED, true); + } catch (Exception e) { + // some other kind of exception + exchange.setException(e); + } finally { + taskFactory.release(task); + if (fallbackTask != null) { + fallbackTaskFactory.release(fallbackTask); + } + } + + exchange.removeProperty(ExchangePropertyKey.TRY_ROUTE_BLOCK); + callback.done(true); + return true; + } + + @Override + protected void doBuild() throws Exception { + ObjectHelper.notNull(camelContext, "CamelContext", this); + + boolean pooled = camelContext.adapt(ExtendedCamelContext.class).getExchangeFactory().isPooled(); + if (pooled) { + int capacity = camelContext.adapt(ExtendedCamelContext.class).getExchangeFactory().getCapacity(); + taskFactory = new PooledTaskFactory(getId()) { + @Override + public PooledExchangeTask create(Exchange exchange, AsyncCallback callback) { + return new CircuitBreakerTask(); + } + }; + taskFactory.setCapacity(capacity); + fallbackTaskFactory = new PooledTaskFactory(getId()) { + @Override + public PooledExchangeTask create(Exchange exchange, AsyncCallback callback) { + return new CircuitBreakerFallbackTask(); + } + }; + fallbackTaskFactory.setCapacity(capacity); + } else { + taskFactory = new PrototypeTaskFactory() { + @Override + public PooledExchangeTask create(Exchange exchange, AsyncCallback callback) { + return new CircuitBreakerTask(); + } + }; + fallbackTaskFactory = new PrototypeTaskFactory() { + @Override + public PooledExchangeTask create(Exchange exchange, AsyncCallback callback) { + return new CircuitBreakerFallbackTask(); + } + }; + } + + // create a per processor exchange factory + this.processorExchangeFactory = getCamelContext().adapt(ExtendedCamelContext.class) + .getProcessorExchangeFactory().newProcessorExchangeFactory(this); + this.processorExchangeFactory.setRouteId(getRouteId()); + this.processorExchangeFactory.setId(getId()); + + ServiceHelper.buildService(processorExchangeFactory, taskFactory, fallbackTaskFactory, processor); + } + + @Override + @SuppressWarnings("unchecked") + protected void doInit() throws Exception { + ObjectHelper.notNull(camelContext, "CamelContext", this); + if (circuitBreaker == null) { + circuitBreaker = new CircuitBreaker( + invocation(), id, ExceptionDecision.ALWAYS_FAILURE, config.getDelay(), config.getRequestVolumeThreshold(), + config.getFailureRatio(), + config.getSuccessThreshold(), new SystemStopwatch()); + } + + ServiceHelper.initService(processorExchangeFactory, taskFactory, fallbackTaskFactory, processor); + } + + @Override + protected void doStart() throws Exception { + if (config.isTimeoutEnabled() && scheduledExecutorService == null) { + scheduledExecutorService = getCamelContext().getExecutorServiceManager().newScheduledThreadPool(this, + "CircuitBreakerTimeout", config.getTimeoutPoolSize()); + shutdownScheduledExecutorService = true; + } + if (config.isBulkheadEnabled() && executorService == null) { + executorService = getCamelContext().getExecutorServiceManager().newThreadPool(this, "CircuitBreakerBulkhead", + config.getBulkheadMaxConcurrentCalls(), config.getBulkheadMaxConcurrentCalls()); + shutdownExecutorService = true; + } + + ServiceHelper.startService(processorExchangeFactory, taskFactory, fallbackTaskFactory, processor); + } + + @Override + protected void doStop() throws Exception { + if (shutdownScheduledExecutorService && scheduledExecutorService != null) { + getCamelContext().getExecutorServiceManager().shutdownNow(scheduledExecutorService); + scheduledExecutorService = null; + } + if (shutdownExecutorService && executorService != null) { + getCamelContext().getExecutorServiceManager().shutdownNow(executorService); + executorService = null; + } + + ServiceHelper.stopService(processorExchangeFactory, taskFactory, fallbackTaskFactory, processor); + } + + @Override + protected void doShutdown() throws Exception { + ServiceHelper.stopAndShutdownServices(processorExchangeFactory, taskFactory, fallbackTaskFactory, processor); + } + + private final class CircuitBreakerTask implements PooledExchangeTask, Callable<Exchange> { + + private Exchange exchange; + + @Override + public void prepare(Exchange exchange, AsyncCallback callback) { + this.exchange = exchange; + // callback not in use + } + + @Override + public void reset() { + this.exchange = null; + } + + @Override + public void run() { + // not in use + } + + @Override + public Exchange call() throws Exception { + Exchange copy = null; + UnitOfWork uow = null; + Throwable cause; + + // turn of interruption to allow fault tolerance to process the exchange under its handling + exchange.adapt(ExtendedExchange.class).setInterruptable(false); + + try { + LOG.debug("Running processor: {} with exchange: {}", processor, exchange); + + // prepare a copy of exchange so downstream processors don't + // cause side-effects if they mutate the exchange + // in case timeout processing and continue with the fallback etc + copy = processorExchangeFactory.createCorrelatedCopy(exchange, false); + if (copy.getUnitOfWork() != null) { + uow = copy.getUnitOfWork(); + } else { + // prepare uow on copy + uow = copy.getContext().adapt(ExtendedCamelContext.class).getUnitOfWorkFactory().createUnitOfWork(copy); + copy.adapt(ExtendedExchange.class).setUnitOfWork(uow); + // the copy must be starting from the route where its copied from + Route route = ExchangeHelper.getRoute(exchange); + if (route != null) { + uow.pushRoute(route); + } + } + + // process the processor until its fully done + processor.process(copy); + + // handle the processing result + if (copy.getException() != null) { + exchange.setException(copy.getException()); + } else { + // copy the result as its regarded as success + ExchangeHelper.copyResults(exchange, copy); + exchange.setProperty(ExchangePropertyKey.CIRCUIT_BREAKER_RESPONSE_SUCCESSFUL_EXECUTION, true); + exchange.setProperty(ExchangePropertyKey.CIRCUIT_BREAKER_RESPONSE_FROM_FALLBACK, false); + } + } catch (Exception e) { + exchange.setException(e); + } finally { + // must done uow + UnitOfWorkHelper.doneUow(uow, copy); + // remember any thrown exception + cause = exchange.getException(); + } + + // and release exchange back in pool + processorExchangeFactory.release(exchange); + + if (cause != null) { + // throw exception so resilient4j know it was a failure + throw RuntimeExchangeException.wrapRuntimeException(cause); + } + return exchange; + } + } + + private final class CircuitBreakerFallbackTask implements PooledExchangeTask, Callable<Exchange> { + + private Exchange exchange; + + @Override + public void prepare(Exchange exchange, AsyncCallback callback) { + this.exchange = exchange; + // callback not in use + } + + @Override + public void reset() { + this.exchange = null; + } + + @Override + public void run() { + // not in use + } + + @Override + public Exchange call() throws Exception { + Throwable throwable = exchange.getException(); + if (fallbackProcessor == null) { + if (throwable instanceof TimeoutException) { + // the circuit breaker triggered a timeout (and there is no + // fallback) so lets mark the exchange as failed + exchange.setProperty(ExchangePropertyKey.CIRCUIT_BREAKER_RESPONSE_SUCCESSFUL_EXECUTION, false); + exchange.setProperty(ExchangePropertyKey.CIRCUIT_BREAKER_RESPONSE_FROM_FALLBACK, false); + exchange.setProperty(ExchangePropertyKey.CIRCUIT_BREAKER_RESPONSE_SHORT_CIRCUITED, false); + exchange.setProperty(ExchangePropertyKey.CIRCUIT_BREAKER_RESPONSE_TIMED_OUT, true); + exchange.setException(throwable); + return exchange; + } else if (throwable instanceof CircuitBreakerOpenException) { + // the circuit breaker triggered a call rejected + exchange.setProperty(ExchangePropertyKey.CIRCUIT_BREAKER_RESPONSE_SUCCESSFUL_EXECUTION, false); + exchange.setProperty(ExchangePropertyKey.CIRCUIT_BREAKER_RESPONSE_FROM_FALLBACK, false); + exchange.setProperty(ExchangePropertyKey.CIRCUIT_BREAKER_RESPONSE_SHORT_CIRCUITED, true); + exchange.setProperty(ExchangePropertyKey.CIRCUIT_BREAKER_RESPONSE_REJECTED, true); + return exchange; + } else { + // throw exception so fault tolerance know it was a failure + throw RuntimeExchangeException.wrapRuntimeException(throwable); + } + } + + // fallback route is handling the exception so its short-circuited + exchange.setProperty(ExchangePropertyKey.CIRCUIT_BREAKER_RESPONSE_SUCCESSFUL_EXECUTION, false); + exchange.setProperty(ExchangePropertyKey.CIRCUIT_BREAKER_RESPONSE_FROM_FALLBACK, true); + exchange.setProperty(ExchangePropertyKey.CIRCUIT_BREAKER_RESPONSE_SHORT_CIRCUITED, true); + + // store the last to endpoint as the failure endpoint + if (exchange.getProperty(ExchangePropertyKey.FAILURE_ENDPOINT) == null) { + exchange.setProperty(ExchangePropertyKey.FAILURE_ENDPOINT, + exchange.getProperty(ExchangePropertyKey.TO_ENDPOINT)); + } + // give the rest of the pipeline another chance + exchange.setProperty(ExchangePropertyKey.EXCEPTION_HANDLED, true); + exchange.setProperty(ExchangePropertyKey.EXCEPTION_CAUGHT, exchange.getException()); + exchange.setRouteStop(false); + exchange.setException(null); + // and we should not be regarded as exhausted as we are in a try .. + // catch block + exchange.adapt(ExtendedExchange.class).setRedeliveryExhausted(false); + // run the fallback processor + try { + LOG.debug("Running fallback: {} with exchange: {}", fallbackProcessor, exchange); + // process the fallback until its fully done + fallbackProcessor.process(exchange); + LOG.debug("Running fallback: {} with exchange: {} done", fallbackProcessor, exchange); + } catch (Exception e) { + exchange.setException(e); + } + + return exchange; + } + } + +} diff --git a/extensions/microprofile-fault-tolerance/deployment/src/main/java/org/apache/camel/quarkus/component/microprofile/fault/tolerance/deployment/MicroprofileFaultToleranceProcessor.java b/extensions/microprofile-fault-tolerance/runtime/src/main/java/org/apache/camel/component/microprofile/faulttolerance/FaultToleranceProcessorFactory.java similarity index 52% copy from extensions/microprofile-fault-tolerance/deployment/src/main/java/org/apache/camel/quarkus/component/microprofile/fault/tolerance/deployment/MicroprofileFaultToleranceProcessor.java copy to extensions/microprofile-fault-tolerance/runtime/src/main/java/org/apache/camel/component/microprofile/faulttolerance/FaultToleranceProcessorFactory.java index 05673b1..2b70ca9 100644 --- a/extensions/microprofile-fault-tolerance/deployment/src/main/java/org/apache/camel/quarkus/component/microprofile/fault/tolerance/deployment/MicroprofileFaultToleranceProcessor.java +++ b/extensions/microprofile-fault-tolerance/runtime/src/main/java/org/apache/camel/component/microprofile/faulttolerance/FaultToleranceProcessorFactory.java @@ -14,25 +14,25 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package org.apache.camel.quarkus.component.microprofile.fault.tolerance.deployment; +package org.apache.camel.component.microprofile.faulttolerance; -import io.quarkus.deployment.annotations.BuildStep; -import io.quarkus.deployment.builditem.FeatureBuildItem; -import io.quarkus.deployment.builditem.nativeimage.NativeImageResourceBuildItem; +import org.apache.camel.Processor; +import org.apache.camel.Route; +import org.apache.camel.model.CircuitBreakerDefinition; +import org.apache.camel.support.TypedProcessorFactory; -class MicroprofileFaultToleranceProcessor { - - private static final String FEATURE = "camel-microprofile-fault-tolerance"; +/** + * To integrate camel-microprofile-faulttolerance with the Camel routes using the Circuit Breaker EIP. + */ +public class FaultToleranceProcessorFactory extends TypedProcessorFactory<CircuitBreakerDefinition> { - @BuildStep - FeatureBuildItem feature() { - return new FeatureBuildItem(FEATURE); + public FaultToleranceProcessorFactory() { + super(CircuitBreakerDefinition.class); } - @BuildStep - NativeImageResourceBuildItem initResources() { - return new NativeImageResourceBuildItem( - "META-INF/services/org/apache/camel/model/CircuitBreakerDefinition"); + @Override + public Processor doCreateProcessor(Route route, CircuitBreakerDefinition definition) throws Exception { + return new FaultToleranceReifier(route, definition).createProcessor(); } } diff --git a/extensions/microprofile-fault-tolerance/runtime/src/main/java/org/apache/camel/component/microprofile/faulttolerance/FaultToleranceReifier.java b/extensions/microprofile-fault-tolerance/runtime/src/main/java/org/apache/camel/component/microprofile/faulttolerance/FaultToleranceReifier.java new file mode 100644 index 0000000..d7e156b --- /dev/null +++ b/extensions/microprofile-fault-tolerance/runtime/src/main/java/org/apache/camel/component/microprofile/faulttolerance/FaultToleranceReifier.java @@ -0,0 +1,193 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.component.microprofile.faulttolerance; + +import java.util.HashMap; +import java.util.Map; +import java.util.Optional; +import java.util.concurrent.ExecutorService; + +import io.smallrye.faulttolerance.core.circuit.breaker.CircuitBreaker; +import org.apache.camel.ExtendedCamelContext; +import org.apache.camel.Processor; +import org.apache.camel.Route; +import org.apache.camel.model.CircuitBreakerDefinition; +import org.apache.camel.model.FaultToleranceConfigurationCommon; +import org.apache.camel.model.FaultToleranceConfigurationDefinition; +import org.apache.camel.model.Model; +import org.apache.camel.reifier.ProcessorReifier; +import org.apache.camel.spi.BeanIntrospection; +import org.apache.camel.spi.ExtendedPropertyConfigurerGetter; +import org.apache.camel.spi.PropertyConfigurer; +import org.apache.camel.support.PropertyBindingSupport; +import org.apache.camel.util.function.Suppliers; + +public class FaultToleranceReifier extends ProcessorReifier<CircuitBreakerDefinition> { + + public FaultToleranceReifier(Route route, CircuitBreakerDefinition definition) { + super(route, definition); + } + + @Override + public Processor createProcessor() throws Exception { + // create the regular and fallback processors + Processor processor = createChildProcessor(true); + Processor fallback = null; + if (definition.getOnFallback() != null) { + fallback = createProcessor(definition.getOnFallback()); + } + boolean fallbackViaNetwork = definition.getOnFallback() != null + && parseBoolean(definition.getOnFallback().getFallbackViaNetwork(), false); + if (fallbackViaNetwork) { + throw new UnsupportedOperationException("camel-microprofile-fault-tolerance does not support onFallbackViaNetwork"); + } + final FaultToleranceConfigurationCommon config = buildFaultToleranceConfiguration(); + + FaultToleranceConfiguration configuration = new FaultToleranceConfiguration(); + configureCircuitBreaker(config, configuration); + configureTimeLimiter(config, configuration); + configureBulkhead(config, configuration); + + FaultToleranceProcessor answer = new FaultToleranceProcessor(configuration, processor, fallback); + // using any existing circuit breakers? + if (config.getCircuitBreakerRef() != null) { + CircuitBreaker cb = mandatoryLookup(parseString(config.getCircuitBreakerRef()), CircuitBreaker.class); + answer.setCircuitBreaker(cb); + } + configureBulkheadExecutorService(answer, config); + return answer; + } + + private void configureCircuitBreaker(FaultToleranceConfigurationCommon config, FaultToleranceConfiguration target) { + target.setDelay(parseDuration(config.getDelay(), 5000)); + target.setSuccessThreshold(parseInt(config.getSuccessThreshold(), 1)); + target.setRequestVolumeThreshold(parseInt(config.getRequestVolumeThreshold(), 20)); + if (config.getFailureRatio() != null) { + float num = parseFloat(config.getFailureRatio(), 50); + if (num < 1 || num > 100) { + throw new IllegalArgumentException("FailureRatio must be between 1 and 100, was: " + num); + } + float percent = num / 100; + target.setFailureRatio(percent); + } else { + target.setFailureRatio(0.5f); + } + } + + private void configureTimeLimiter(FaultToleranceConfigurationCommon config, FaultToleranceConfiguration target) { + if (!parseBoolean(config.getTimeoutEnabled(), false)) { + target.setTimeoutEnabled(false); + } else { + target.setTimeoutEnabled(true); + } + + target.setTimeoutDuration(parseDuration(config.getTimeoutDuration(), 1000)); + target.setTimeoutPoolSize(parseInt(config.getTimeoutPoolSize(), 10)); + } + + private void configureBulkhead(FaultToleranceConfigurationCommon config, FaultToleranceConfiguration target) { + if (!parseBoolean(config.getBulkheadEnabled(), false)) { + return; + } + + target.setBulkheadMaxConcurrentCalls(parseInt(config.getBulkheadMaxConcurrentCalls(), 10)); + target.setBulkheadWaitingTaskQueue(parseInt(config.getBulkheadWaitingTaskQueue(), 10)); + } + + private void configureBulkheadExecutorService(FaultToleranceProcessor processor, FaultToleranceConfigurationCommon config) { + if (!parseBoolean(config.getBulkheadEnabled(), false)) { + return; + } + + if (config.getBulkheadExecutorServiceRef() != null) { + String ref = config.getBulkheadExecutorServiceRef(); + boolean shutdownThreadPool = false; + ExecutorService executorService = lookup(ref, ExecutorService.class); + if (executorService == null) { + executorService = lookupExecutorServiceRef("CircuitBreaker", definition, ref); + shutdownThreadPool = true; + } + processor.setExecutorService(executorService); + processor.setShutdownExecutorService(shutdownThreadPool); + } + } + + // ******************************* + // Helpers + // ******************************* + + FaultToleranceConfigurationDefinition buildFaultToleranceConfiguration() throws Exception { + Map<String, Object> properties = new HashMap<>(); + + final PropertyConfigurer configurer = camelContext.adapt(ExtendedCamelContext.class) + .getConfigurerResolver() + .resolvePropertyConfigurer(FaultToleranceConfigurationDefinition.class.getName(), camelContext); + + // Extract properties from default configuration, the one configured on + // camel context takes the precedence over those in the registry + loadProperties(properties, Suppliers.firstNotNull( + () -> camelContext.getExtension(Model.class).getFaultToleranceConfiguration(null), + () -> lookup(FaultToleranceConstants.DEFAULT_FAULT_TOLERANCE_CONFIGURATION_ID, + FaultToleranceConfigurationDefinition.class)), + configurer); + + // Extract properties from referenced configuration, the one configured + // on camel context takes the precedence over those in the registry + if (definition.getConfigurationRef() != null) { + final String ref = parseString(definition.getConfigurationRef()); + + loadProperties(properties, Suppliers.firstNotNull( + () -> camelContext.getExtension(Model.class).getFaultToleranceConfiguration(ref), + () -> mandatoryLookup(ref, FaultToleranceConfigurationDefinition.class)), + configurer); + } + + // Extract properties from local configuration + loadProperties(properties, Optional.ofNullable(definition.getFaultToleranceConfiguration()), configurer); + + // Apply properties to a new configuration + FaultToleranceConfigurationDefinition config = new FaultToleranceConfigurationDefinition(); + PropertyBindingSupport.build() + .withCamelContext(camelContext) + .withConfigurer(configurer) + .withProperties(properties) + .withTarget(config) + .bind(); + + return config; + } + + private void loadProperties(Map<String, Object> properties, Optional<?> optional, PropertyConfigurer configurer) { + BeanIntrospection beanIntrospection = camelContext.adapt(ExtendedCamelContext.class).getBeanIntrospection(); + optional.ifPresent(bean -> { + if (configurer instanceof ExtendedPropertyConfigurerGetter) { + ExtendedPropertyConfigurerGetter getter = (ExtendedPropertyConfigurerGetter) configurer; + Map<String, Object> types = getter.getAllOptions(bean); + types.forEach((k, t) -> { + Object value = getter.getOptionValue(bean, k, true); + if (value != null) { + properties.put(k, value); + } + }); + } else { + // no configurer found so use bean introspection (reflection) + beanIntrospection.getProperties(bean, properties, null, false); + } + }); + } + +} diff --git a/extensions/microprofile-fault-tolerance/runtime/src/main/resources/META-INF/services/org/apache/camel/model/CircuitBreakerDefinition b/extensions/microprofile-fault-tolerance/runtime/src/main/resources/META-INF/services/org/apache/camel/model/CircuitBreakerDefinition new file mode 100644 index 0000000..c43d558 --- /dev/null +++ b/extensions/microprofile-fault-tolerance/runtime/src/main/resources/META-INF/services/org/apache/camel/model/CircuitBreakerDefinition @@ -0,0 +1,18 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +class=org.apache.camel.component.microprofile.faulttolerance.FaultToleranceProcessorFactory diff --git a/extensions/microprofile-health/deployment/src/test/java/org/apache/camel/quarkus/component/microprofile/health/deployment/MicroProfileHealthEnabledTest.java b/extensions/microprofile-health/deployment/src/test/java/org/apache/camel/quarkus/component/microprofile/health/deployment/MicroProfileHealthEnabledTest.java index cbd278d..1be17f3 100644 --- a/extensions/microprofile-health/deployment/src/test/java/org/apache/camel/quarkus/component/microprofile/health/deployment/MicroProfileHealthEnabledTest.java +++ b/extensions/microprofile-health/deployment/src/test/java/org/apache/camel/quarkus/component/microprofile/health/deployment/MicroProfileHealthEnabledTest.java @@ -24,7 +24,7 @@ import org.apache.camel.health.HealthCheckRegistry; import org.apache.camel.impl.health.ConsumersHealthCheckRepository; import org.apache.camel.impl.health.ContextHealthCheck; import org.apache.camel.impl.health.RoutesHealthCheckRepository; -import org.apache.camel.microprofile.health.CamelMicroProfileHealthCheckRegistry; +import org.apache.camel.quarkus.component.microprofile.health.runtime.CamelQuarkusMicroProfileHealthCheckRegistry; import org.jboss.shrinkwrap.api.ShrinkWrap; import org.jboss.shrinkwrap.api.spec.JavaArchive; import org.junit.jupiter.api.Test; @@ -47,7 +47,7 @@ public class MicroProfileHealthEnabledTest { public void healthCheckRegistryNotNull() { HealthCheckRegistry registry = HealthCheckRegistry.get(context); assertNotNull(registry); - assertTrue(registry instanceof CamelMicroProfileHealthCheckRegistry); + assertTrue(registry instanceof CamelQuarkusMicroProfileHealthCheckRegistry); assertEquals("camel-microprofile-health", registry.getId()); } diff --git a/extensions/microprofile-health/runtime/src/main/java/org/apache/camel/quarkus/component/microprofile/health/runtime/CamelMicroProfileHealthCheck.java b/extensions/microprofile-health/runtime/src/main/java/org/apache/camel/quarkus/component/microprofile/health/runtime/CamelMicroProfileHealthCheck.java new file mode 100644 index 0000000..03dbfa9 --- /dev/null +++ b/extensions/microprofile-health/runtime/src/main/java/org/apache/camel/quarkus/component/microprofile/health/runtime/CamelMicroProfileHealthCheck.java @@ -0,0 +1,67 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.quarkus.component.microprofile.health.runtime; + +import java.util.Map; + +import org.apache.camel.impl.health.AbstractHealthCheck; +import org.eclipse.microprofile.health.HealthCheck; +import org.eclipse.microprofile.health.HealthCheckResponse; +import org.eclipse.microprofile.health.HealthCheckResponseBuilder; + +import static org.apache.camel.health.HealthCheck.Result; +import static org.apache.camel.health.HealthCheck.State; + +/** + * A MicroProfile {@link HealthCheck} that invokes the supplied Camel health check, reports its health status and + * associated details. + */ +final class CamelMicroProfileHealthCheck implements HealthCheck { + + private final org.apache.camel.health.HealthCheck camelHealthCheck; + + CamelMicroProfileHealthCheck(org.apache.camel.health.HealthCheck camelHealthCheck) { + this.camelHealthCheck = camelHealthCheck; + } + + @Override + public HealthCheckResponse call() { + final HealthCheckResponseBuilder builder = HealthCheckResponse.builder(); + builder.name(camelHealthCheck.getId()); + builder.up(); + + Result result = camelHealthCheck.call(); + Map<String, Object> details = result.getDetails(); + boolean enabled = true; + + if (details.containsKey(AbstractHealthCheck.CHECK_ENABLED)) { + enabled = (boolean) details.get(AbstractHealthCheck.CHECK_ENABLED); + } + + if (enabled) { + CamelMicroProfileHealthHelper.applyHealthDetail(builder, result); + + if (result.getState() == State.DOWN) { + builder.down(); + } + } else { + builder.withData(AbstractHealthCheck.CHECK_ENABLED, false); + } + + return builder.build(); + } +} diff --git a/extensions/microprofile-health/runtime/src/main/java/org/apache/camel/quarkus/component/microprofile/health/runtime/CamelMicroProfileHealthHelper.java b/extensions/microprofile-health/runtime/src/main/java/org/apache/camel/quarkus/component/microprofile/health/runtime/CamelMicroProfileHealthHelper.java new file mode 100644 index 0000000..43359a5 --- /dev/null +++ b/extensions/microprofile-health/runtime/src/main/java/org/apache/camel/quarkus/component/microprofile/health/runtime/CamelMicroProfileHealthHelper.java @@ -0,0 +1,63 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.quarkus.component.microprofile.health.runtime; + +import java.io.PrintWriter; +import java.io.StringWriter; +import java.util.Set; + +import org.apache.camel.health.HealthCheck; +import org.apache.camel.health.HealthCheck.Result; +import org.eclipse.microprofile.health.HealthCheckResponseBuilder; + +/** + * Helper utility class for MicroProfile health checks. + */ +final class CamelMicroProfileHealthHelper { + + private CamelMicroProfileHealthHelper() { + // Utility class + } + + /** + * Propagates details from the Camel Health {@link Result} to the MicroProfile {@link HealthCheckResponseBuilder}. + * + * @param builder The health check response builder + * @param result The Camel health check result + */ + public static void applyHealthDetail(HealthCheckResponseBuilder builder, Result result) { + HealthCheck check = result.getCheck(); + Set<String> metaKeys = check.getMetaData().keySet(); + + result.getDetails().forEach((key, value) -> { + // Filter health check metadata to have a less verbose output + if (!metaKeys.contains(key)) { + builder.withData(key, value.toString()); + } + }); + + result.getError().ifPresent(error -> { + builder.withData("error.message", error.getMessage()); + + final StringWriter stackTraceWriter = new StringWriter(); + try (final PrintWriter pw = new PrintWriter(stackTraceWriter, true)) { + error.printStackTrace(pw); + builder.withData("error.stacktrace", stackTraceWriter.toString()); + } + }); + } +} diff --git a/extensions/microprofile-health/runtime/src/main/java/org/apache/camel/quarkus/component/microprofile/health/runtime/CamelMicroProfileHealthRecorder.java b/extensions/microprofile-health/runtime/src/main/java/org/apache/camel/quarkus/component/microprofile/health/runtime/CamelMicroProfileHealthRecorder.java index e33777b..907c133 100644 --- a/extensions/microprofile-health/runtime/src/main/java/org/apache/camel/quarkus/component/microprofile/health/runtime/CamelMicroProfileHealthRecorder.java +++ b/extensions/microprofile-health/runtime/src/main/java/org/apache/camel/quarkus/component/microprofile/health/runtime/CamelMicroProfileHealthRecorder.java @@ -20,7 +20,6 @@ import io.quarkus.runtime.RuntimeValue; import io.quarkus.runtime.annotations.Recorder; import org.apache.camel.CamelContext; import org.apache.camel.health.HealthCheckRegistry; -import org.apache.camel.microprofile.health.CamelMicroProfileHealthCheckRegistry; import org.apache.camel.spi.CamelContextCustomizer; @Recorder @@ -31,7 +30,7 @@ public class CamelMicroProfileHealthRecorder { return new RuntimeValue<>(new CamelContextCustomizer() { @Override public void configure(CamelContext camelContext) { - HealthCheckRegistry registry = new CamelMicroProfileHealthCheckRegistry(camelContext); + HealthCheckRegistry registry = new CamelQuarkusMicroProfileHealthCheckRegistry(camelContext); registry.setId("camel-microprofile-health"); registry.setEnabled(true); diff --git a/extensions/microprofile-health/runtime/src/main/java/org/apache/camel/quarkus/component/microprofile/health/runtime/CamelMicroProfileRepositoryHealthCheck.java b/extensions/microprofile-health/runtime/src/main/java/org/apache/camel/quarkus/component/microprofile/health/runtime/CamelMicroProfileRepositoryHealthCheck.java new file mode 100644 index 0000000..7ecba41 --- /dev/null +++ b/extensions/microprofile-health/runtime/src/main/java/org/apache/camel/quarkus/component/microprofile/health/runtime/CamelMicroProfileRepositoryHealthCheck.java @@ -0,0 +1,72 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.quarkus.component.microprofile.health.runtime; + +import java.util.List; +import java.util.Objects; +import java.util.stream.Collectors; + +import org.apache.camel.health.HealthCheck.Result; +import org.apache.camel.health.HealthCheck.State; +import org.apache.camel.health.HealthCheckRepository; +import org.apache.camel.impl.health.AbstractHealthCheck; +import org.eclipse.microprofile.health.HealthCheck; +import org.eclipse.microprofile.health.HealthCheckResponse; +import org.eclipse.microprofile.health.HealthCheckResponseBuilder; + +/** + * Invokes health checks registered with a {@link HealthCheckRepository} and resolves / aggregates the results into a + * single UP / DOWN status. + */ +final class CamelMicroProfileRepositoryHealthCheck implements HealthCheck { + + private final HealthCheckRepository repository; + private final String name; + + CamelMicroProfileRepositoryHealthCheck(HealthCheckRepository repository, String name) { + this.repository = repository; + this.name = name; + } + + @Override + public HealthCheckResponse call() { + final HealthCheckResponseBuilder builder = HealthCheckResponse.builder(); + builder.name(name); + builder.up(); + + if (repository.isEnabled()) { + List<Result> results = repository.stream() + .filter(healthCheck -> healthCheck.getConfiguration().isEnabled()) + .map(org.apache.camel.health.HealthCheck::call) + .filter(Objects::nonNull) + .collect(Collectors.toList()); + + // If any of the result statuses is DOWN, find the first one and report any error details + results.stream() + .filter(result -> result.getState().equals(State.DOWN)) + .findFirst() + .ifPresent(result -> { + CamelMicroProfileHealthHelper.applyHealthDetail(builder, result); + builder.down(); + }); + } else { + builder.withData(AbstractHealthCheck.CHECK_ENABLED, false); + } + + return builder.build(); + } +} diff --git a/extensions/microprofile-health/runtime/src/main/java/org/apache/camel/quarkus/component/microprofile/health/runtime/CamelQuarkusMicroProfileHealthCheckRegistry.java b/extensions/microprofile-health/runtime/src/main/java/org/apache/camel/quarkus/component/microprofile/health/runtime/CamelQuarkusMicroProfileHealthCheckRegistry.java new file mode 100644 index 0000000..492fb6e --- /dev/null +++ b/extensions/microprofile-health/runtime/src/main/java/org/apache/camel/quarkus/component/microprofile/health/runtime/CamelQuarkusMicroProfileHealthCheckRegistry.java @@ -0,0 +1,185 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.quarkus.component.microprofile.health.runtime; + +import java.util.Set; +import java.util.concurrent.CopyOnWriteArraySet; + +import io.smallrye.health.api.HealthRegistry; +import io.smallrye.health.api.HealthType; +import io.smallrye.health.registry.HealthRegistries; +import org.apache.camel.CamelContext; +import org.apache.camel.StartupListener; +import org.apache.camel.health.HealthCheck; +import org.apache.camel.health.HealthCheckRegistry; +import org.apache.camel.health.HealthCheckRepository; +import org.apache.camel.impl.health.ConsumersHealthCheckRepository; +import org.apache.camel.impl.health.DefaultHealthCheckRegistry; +import org.apache.camel.impl.health.RoutesHealthCheckRepository; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * {@link HealthCheckRegistry} implementation to register Camel health checks as MicroProfile health checks on SmallRye + * Health. + */ +public class CamelQuarkusMicroProfileHealthCheckRegistry extends DefaultHealthCheckRegistry implements StartupListener { + + public static final String CONSUMERS_CHECK_NAME = "camel-consumers"; + public static final String ROUTES_CHECK_NAME = "camel-routes"; + private static final Logger LOG = LoggerFactory.getLogger(CamelQuarkusMicroProfileHealthCheckRegistry.class); + private final Set<HealthCheckRepository> repositories = new CopyOnWriteArraySet<>(); + + public CamelQuarkusMicroProfileHealthCheckRegistry() { + this(null); + } + + public CamelQuarkusMicroProfileHealthCheckRegistry(CamelContext camelContext) { + super(camelContext); + super.setId("camel-microprofile-health"); + } + + @Override + protected void doInit() throws Exception { + super.doInit(); + super.getCamelContext().addStartupListener(this); + } + + @Override + public boolean register(Object obj) { + boolean registered = super.register(obj); + if (obj instanceof HealthCheck) { + HealthCheck check = (HealthCheck) obj; + if (check.getConfiguration().isEnabled()) { + registerMicroProfileHealthCheck(check); + } + } else { + HealthCheckRepository repository = (HealthCheckRepository) obj; + if (repository.stream().findAny().isPresent()) { + registerRepositoryChecks(repository); + } else { + // Try health check registration again on CamelContext started + repositories.add(repository); + } + } + return registered; + } + + @Override + public boolean unregister(Object obj) { + boolean unregistered = super.unregister(obj); + if (obj instanceof HealthCheck) { + HealthCheck check = (HealthCheck) obj; + removeMicroProfileHealthCheck(check); + } else { + HealthCheckRepository repository = (HealthCheckRepository) obj; + if (repository instanceof ConsumersHealthCheckRepository || repository instanceof RoutesHealthCheckRepository) { + try { + getReadinessRegistry().remove(repository.getId()); + } catch (IllegalStateException e) { + if (LOG.isDebugEnabled()) { + LOG.debug("Failed to remove repository readiness health {} check due to: {}", repository.getId(), + e.getMessage()); + } + } + } else { + repository.stream().forEach(this::removeMicroProfileHealthCheck); + } + } + return unregistered; + } + + @Override + public void onCamelContextStarted(CamelContext context, boolean alreadyStarted) throws Exception { + //Noop + } + + @Override + public void onCamelContextFullyStarted(CamelContext context, boolean alreadyStarted) throws Exception { + // Some repository checks may not be resolvable earlier in the lifecycle, so try one last time on CamelContext started + if (alreadyStarted) { + repositories.stream() + .filter(repository -> repository.stream().findAny().isPresent()) + .forEach(this::registerRepositoryChecks); + repositories.clear(); + } + } + + protected void registerRepositoryChecks(HealthCheckRepository repository) { + if (repository.isEnabled()) { + // Since the number of potential checks for consumers / routes is non-deterministic + // avoid registering each one with SmallRye health and instead aggregate the results so + // that we avoid highly verbose health output + if (repository instanceof ConsumersHealthCheckRepository) { + CamelMicroProfileRepositoryHealthCheck repositoryHealthCheck = new CamelMicroProfileRepositoryHealthCheck( + repository, CONSUMERS_CHECK_NAME); + getReadinessRegistry().register(repository.getId(), repositoryHealthCheck); + } else if (repository instanceof RoutesHealthCheckRepository) { + CamelMicroProfileRepositoryHealthCheck repositoryHealthCheck = new CamelMicroProfileRepositoryHealthCheck( + repository, ROUTES_CHECK_NAME); + getReadinessRegistry().register(repository.getId(), repositoryHealthCheck); + } else { + repository.stream() + .filter(healthCheck -> healthCheck.getConfiguration().isEnabled()) + .forEach(this::registerMicroProfileHealthCheck); + } + } + } + + protected void registerMicroProfileHealthCheck(HealthCheck camelHealthCheck) { + org.eclipse.microprofile.health.HealthCheck microProfileHealthCheck = new CamelMicroProfileHealthCheck( + camelHealthCheck); + + if (camelHealthCheck.isReadiness()) { + getReadinessRegistry().register(camelHealthCheck.getId(), microProfileHealthCheck); + } + + if (camelHealthCheck.isLiveness()) { + getLivenessRegistry().register(camelHealthCheck.getId(), microProfileHealthCheck); + } + } + + protected void removeMicroProfileHealthCheck(HealthCheck camelHealthCheck) { + if (camelHealthCheck.isReadiness()) { + try { + getReadinessRegistry().remove(camelHealthCheck.getId()); + } catch (IllegalStateException e) { + if (LOG.isDebugEnabled()) { + LOG.debug("Failed to remove readiness health check due to: {}", e.getMessage()); + } + } + } + + if (camelHealthCheck.isLiveness()) { + try { + getLivenessRegistry().remove(camelHealthCheck.getId()); + } catch (IllegalStateException e) { + if (LOG.isDebugEnabled()) { + LOG.debug("Failed to remove liveness health check due to: {}", e.getMessage()); + } + } + } + } + + protected HealthRegistry getLivenessRegistry() { + return HealthRegistries.getRegistry(HealthType.LIVENESS); + } + + protected HealthRegistry getReadinessRegistry() { + return HealthRegistries.getRegistry(HealthType.READINESS); + } +} diff --git a/integration-test-groups/foundation/core-fault-tolerance/src/main/java/org/apache/camel/quarkus/core/faulttolerance/it/CoreFaultToleranceProducers.java b/integration-test-groups/foundation/core-fault-tolerance/src/main/java/org/apache/camel/quarkus/core/faulttolerance/it/CoreFaultToleranceProducers.java index 5b70d4e..aa93941 100644 --- a/integration-test-groups/foundation/core-fault-tolerance/src/main/java/org/apache/camel/quarkus/core/faulttolerance/it/CoreFaultToleranceProducers.java +++ b/integration-test-groups/foundation/core-fault-tolerance/src/main/java/org/apache/camel/quarkus/core/faulttolerance/it/CoreFaultToleranceProducers.java @@ -26,7 +26,7 @@ import io.smallrye.faulttolerance.core.FaultToleranceStrategy; import io.smallrye.faulttolerance.core.InvocationContext; import io.smallrye.faulttolerance.core.circuit.breaker.CircuitBreaker; import io.smallrye.faulttolerance.core.stopwatch.SystemStopwatch; -import io.smallrye.faulttolerance.core.util.SetOfThrowables; +import io.smallrye.faulttolerance.core.util.ExceptionDecision; public class CoreFaultToleranceProducers { @@ -39,7 +39,7 @@ public class CoreFaultToleranceProducers { return null; } }; - return new CircuitBreaker<Integer>(delegate, "description", SetOfThrowables.EMPTY, SetOfThrowables.EMPTY, 10, 40, 0.1, + return new CircuitBreaker<Integer>(delegate, "description", ExceptionDecision.ALWAYS_FAILURE, 10, 40, 0.1, 2, new SystemStopwatch()) { @Override public String toString() {