This is an automated email from the ASF dual-hosted git repository. davsclaus pushed a commit to branch camel-4.10.x in repository https://gitbox.apache.org/repos/asf/camel.git
The following commit(s) were added to refs/heads/camel-4.10.x by this push: new a34e68396d0 CAMEL-22195: camel-resilience4j - Fix using record and ignore exceptions (which can be wrapped) and only trigger fallback accordingly. (#18455) a34e68396d0 is described below commit a34e68396d090585bc9153bc423d985335073e92 Author: Claus Ibsen <claus.ib...@gmail.com> AuthorDate: Tue Jun 24 20:56:11 2025 +0200 CAMEL-22195: camel-resilience4j - Fix using record and ignore exceptions (which can be wrapped) and only trigger fallback accordingly. (#18455) --- .../resilience4j/ResilienceProcessor.java | 35 ++++++++- .../component/resilience4j/ResilienceReifier.java | 35 +++++++-- .../ResilienceIgnoreExceptionTest.java | 84 ++++++++++++++++++++++ .../ResilienceRecordExceptionTest.java | 84 ++++++++++++++++++++++ .../ResilienceRecordIgnoreExceptionTest.java | 84 ++++++++++++++++++++++ .../model/Resilience4jConfigurationDefinition.java | 25 +++++++ 6 files changed, 339 insertions(+), 8 deletions(-) diff --git a/components/camel-resilience4j/src/main/java/org/apache/camel/component/resilience4j/ResilienceProcessor.java b/components/camel-resilience4j/src/main/java/org/apache/camel/component/resilience4j/ResilienceProcessor.java index 4e8762ef047..c3b0c4d1e5a 100644 --- a/components/camel-resilience4j/src/main/java/org/apache/camel/component/resilience4j/ResilienceProcessor.java +++ b/components/camel-resilience4j/src/main/java/org/apache/camel/component/resilience4j/ResilienceProcessor.java @@ -24,6 +24,7 @@ import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutorService; import java.util.concurrent.TimeoutException; import java.util.function.Function; +import java.util.function.Predicate; import java.util.function.Supplier; import io.github.resilience4j.bulkhead.Bulkhead; @@ -85,6 +86,8 @@ public class ResilienceProcessor extends AsyncProcessorSupport private final Processor processor; private final Processor fallback; private final boolean throwExceptionWhenHalfOpenOrOpenState; + private final Predicate<Throwable> recordPredicate; + private final Predicate<Throwable> ignorePredicate; private boolean shutdownExecutorService; private ExecutorService executorService; private ProcessorExchangeFactory processorExchangeFactory; @@ -93,13 +96,16 @@ public class ResilienceProcessor extends AsyncProcessorSupport public ResilienceProcessor(CircuitBreakerConfig circuitBreakerConfig, BulkheadConfig bulkheadConfig, TimeLimiterConfig timeLimiterConfig, Processor processor, - Processor fallback, boolean throwExceptionWhenHalfOpenOrOpenState) { + Processor fallback, boolean throwExceptionWhenHalfOpenOrOpenState, + Predicate<Throwable> recordPredicate, Predicate<Throwable> ignorePredicate) { this.circuitBreakerConfig = circuitBreakerConfig; this.bulkheadConfig = bulkheadConfig; this.timeLimiterConfig = timeLimiterConfig; this.processor = processor; this.fallback = fallback; this.throwExceptionWhenHalfOpenOrOpenState = throwExceptionWhenHalfOpenOrOpenState; + this.recordPredicate = recordPredicate; + this.ignorePredicate = ignorePredicate; } @Override @@ -621,8 +627,31 @@ public class ResilienceProcessor extends AsyncProcessorSupport @Override public Exchange apply(Throwable throwable) { + // check again if we should ignore or not record the throw exception as a failure + if (ignorePredicate != null && ignorePredicate.test(throwable)) { + if (LOG.isTraceEnabled()) { + LOG.trace("Processing exchange: {} recover task using circuit breaker: {} ignored exception: {}", + exchange.getExchangeId(), + id, throwable); + } + // exception should be ignored + exchange.setException(null); + return exchange; + } + if (recordPredicate != null && !recordPredicate.test(throwable)) { + if (LOG.isTraceEnabled()) { + LOG.trace("Processing exchange: {} recover task using circuit breaker: {} success exception: {}", + exchange.getExchangeId(), + id, throwable); + } + // exception is a success + exchange.setException(null); + return exchange; + } + if (LOG.isTraceEnabled()) { - LOG.trace("Processing exchange: {} recover task using circuit breaker: {} from: {}", exchange.getExchangeId(), + LOG.trace("Processing exchange: {} recover task using circuit breaker: {} failed exception: {}", + exchange.getExchangeId(), id, throwable); } @@ -688,7 +717,7 @@ public class ResilienceProcessor extends AsyncProcessorSupport LOG.debug("Running fallback: {} with exchange: {}", fallback, exchange); // process the fallback until its fully done fallback.process(exchange); - LOG.debug("Running fallback: {} with exchange: {} done", fallback, exchange); + LOG.trace("Running fallback: {} with exchange: {} done", fallback, exchange); } catch (Throwable e) { exchange.setException(e); } diff --git a/components/camel-resilience4j/src/main/java/org/apache/camel/component/resilience4j/ResilienceReifier.java b/components/camel-resilience4j/src/main/java/org/apache/camel/component/resilience4j/ResilienceReifier.java index 6e8cbcfc469..95647cb8cd5 100644 --- a/components/camel-resilience4j/src/main/java/org/apache/camel/component/resilience4j/ResilienceReifier.java +++ b/components/camel-resilience4j/src/main/java/org/apache/camel/component/resilience4j/ResilienceReifier.java @@ -23,6 +23,7 @@ import java.util.List; import java.util.Map; import java.util.Optional; import java.util.concurrent.ExecutorService; +import java.util.function.Predicate; import io.github.resilience4j.bulkhead.BulkheadConfig; import io.github.resilience4j.circuitbreaker.CircuitBreaker; @@ -41,6 +42,7 @@ import org.apache.camel.spi.PropertyConfigurer; import org.apache.camel.support.CamelContextHelper; import org.apache.camel.support.PluginHelper; import org.apache.camel.support.PropertyBindingSupport; +import org.apache.camel.util.ObjectHelper; import org.apache.camel.util.function.Suppliers; public class ResilienceReifier extends ProcessorReifier<CircuitBreakerDefinition> { @@ -71,9 +73,18 @@ public class ResilienceReifier extends ProcessorReifier<CircuitBreakerDefinition if (b != null) { throwExceptionWhenHalfOpenOrOpenState = b; } + Predicate<Throwable> recordPredicate = null; + if (!config.getRecordExceptions().isEmpty()) { + recordPredicate = cbConfig.getRecordExceptionPredicate(); + } + Predicate<Throwable> ignorePredicate = null; + if (!config.getIgnoreExceptions().isEmpty()) { + ignorePredicate = cbConfig.getIgnoreExceptionPredicate(); + } ResilienceProcessor answer = new ResilienceProcessor( - cbConfig, bhConfig, tlConfig, processor, fallback, throwExceptionWhenHalfOpenOrOpenState); + cbConfig, bhConfig, tlConfig, processor, fallback, throwExceptionWhenHalfOpenOrOpenState, recordPredicate, + ignorePredicate); configureTimeoutExecutorService(answer, config); // using any existing circuit breakers? if (config.getCircuitBreaker() != null) { @@ -116,11 +127,11 @@ public class ResilienceReifier extends ProcessorReifier<CircuitBreakerDefinition if (config.getWritableStackTraceEnabled() != null) { builder.writableStackTraceEnabled(parseBoolean(config.getWritableStackTraceEnabled())); } - if (config.getRecordExceptions() != null) { - builder.recordExceptions(createRecordExceptionClasses()); + if (!config.getRecordExceptions().isEmpty()) { + builder.recordException(createExceptionPredicate(createRecordExceptionClasses())); } - if (config.getIgnoreExceptions() != null) { - builder.ignoreExceptions(createIgnoreExceptionClasses()); + if (!config.getIgnoreExceptions().isEmpty()) { + builder.ignoreException(createExceptionPredicate(createIgnoreExceptionClasses())); } return builder.build(); } @@ -261,4 +272,18 @@ public class ResilienceReifier extends ProcessorReifier<CircuitBreakerDefinition } return answer.toArray(new Class[0]); } + + private Predicate<Throwable> createExceptionPredicate(final Class<? extends Throwable>[] exceptions) { + return t -> { + for (Throwable te : ObjectHelper.createExceptionIterable(t)) { + for (var ex : exceptions) { + if (ex.isInstance(te)) { + return true; + } + } + } + return false; + }; + } + } diff --git a/components/camel-resilience4j/src/test/java/org/apache/camel/component/resilience4j/ResilienceIgnoreExceptionTest.java b/components/camel-resilience4j/src/test/java/org/apache/camel/component/resilience4j/ResilienceIgnoreExceptionTest.java new file mode 100644 index 00000000000..016e23c4b22 --- /dev/null +++ b/components/camel-resilience4j/src/test/java/org/apache/camel/component/resilience4j/ResilienceIgnoreExceptionTest.java @@ -0,0 +1,84 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.component.resilience4j; + +import java.io.FileNotFoundException; +import java.io.IOException; + +import org.apache.camel.builder.RouteBuilder; +import org.apache.camel.component.mock.MockEndpoint; +import org.apache.camel.test.junit5.CamelTestSupport; +import org.junit.jupiter.api.Test; + +public class ResilienceIgnoreExceptionTest extends CamelTestSupport { + + @Test + public void testHello() throws Exception { + getMockEndpoint("mock:result").expectedBodiesReceived("Hello World"); + template.sendBody("direct:start", "Hello World"); + MockEndpoint.assertIsSatisfied(context); + } + + @Test + public void testFile() throws Exception { + getMockEndpoint("mock:result").expectedBodiesReceived("file"); + template.sendBody("direct:start", "file"); + MockEndpoint.assertIsSatisfied(context); + } + + @Test + public void testKaboom() throws Exception { + getMockEndpoint("mock:result").expectedBodiesReceived("Fallback message"); + template.sendBody("direct:start", "kaboom"); + MockEndpoint.assertIsSatisfied(context); + } + + @Test + public void testIo() throws Exception { + getMockEndpoint("mock:result").expectedBodiesReceived("io"); + template.sendBody("direct:start", "io"); + MockEndpoint.assertIsSatisfied(context); + } + + @Override + protected RouteBuilder createRouteBuilder() { + return new RouteBuilder() { + @Override + public void configure() { + from("direct:start") + .to("log:start") + .circuitBreaker().resilience4jConfiguration().ignoreException(IOException.class).end() + .process(e -> { + String b = e.getMessage().getBody(String.class); + if ("kaboom".equals(b)) { + throw new NullPointerException(); + } else if ("file".equals(b)) { + throw new FileNotFoundException("unknown.txt"); + } else if ("io".equals(b)) { + throw new IOException("Host not found"); + } + }) + .onFallback() + .transform().constant("Fallback message") + .end() + .to("log:result") + .to("mock:result"); + } + }; + } + +} diff --git a/components/camel-resilience4j/src/test/java/org/apache/camel/component/resilience4j/ResilienceRecordExceptionTest.java b/components/camel-resilience4j/src/test/java/org/apache/camel/component/resilience4j/ResilienceRecordExceptionTest.java new file mode 100644 index 00000000000..72d20cd8058 --- /dev/null +++ b/components/camel-resilience4j/src/test/java/org/apache/camel/component/resilience4j/ResilienceRecordExceptionTest.java @@ -0,0 +1,84 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.component.resilience4j; + +import java.io.FileNotFoundException; +import java.io.IOException; + +import org.apache.camel.builder.RouteBuilder; +import org.apache.camel.component.mock.MockEndpoint; +import org.apache.camel.test.junit5.CamelTestSupport; +import org.junit.jupiter.api.Test; + +public class ResilienceRecordExceptionTest extends CamelTestSupport { + + @Test + public void testHello() throws Exception { + getMockEndpoint("mock:result").expectedBodiesReceived("Hello World"); + template.sendBody("direct:start", "Hello World"); + MockEndpoint.assertIsSatisfied(context); + } + + @Test + public void testFile() throws Exception { + getMockEndpoint("mock:result").expectedBodiesReceived("Fallback message"); + template.sendBody("direct:start", "file"); + MockEndpoint.assertIsSatisfied(context); + } + + @Test + public void testKaboom() throws Exception { + getMockEndpoint("mock:result").expectedBodiesReceived("kaboom"); + template.sendBody("direct:start", "kaboom"); + MockEndpoint.assertIsSatisfied(context); + } + + @Test + public void testIo() throws Exception { + getMockEndpoint("mock:result").expectedBodiesReceived("Fallback message"); + template.sendBody("direct:start", "io"); + MockEndpoint.assertIsSatisfied(context); + } + + @Override + protected RouteBuilder createRouteBuilder() { + return new RouteBuilder() { + @Override + public void configure() { + from("direct:start") + .to("log:start") + .circuitBreaker().resilience4jConfiguration().recordException(IOException.class).end() + .process(e -> { + String b = e.getMessage().getBody(String.class); + if ("kaboom".equals(b)) { + throw new NullPointerException(); + } else if ("file".equals(b)) { + throw new FileNotFoundException("unknown.txt"); + } else if ("io".equals(b)) { + throw new IOException("Host not found"); + } + }) + .onFallback() + .transform().constant("Fallback message") + .end() + .to("log:result") + .to("mock:result"); + } + }; + } + +} diff --git a/components/camel-resilience4j/src/test/java/org/apache/camel/component/resilience4j/ResilienceRecordIgnoreExceptionTest.java b/components/camel-resilience4j/src/test/java/org/apache/camel/component/resilience4j/ResilienceRecordIgnoreExceptionTest.java new file mode 100644 index 00000000000..1517d027f4e --- /dev/null +++ b/components/camel-resilience4j/src/test/java/org/apache/camel/component/resilience4j/ResilienceRecordIgnoreExceptionTest.java @@ -0,0 +1,84 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.component.resilience4j; + +import java.io.FileNotFoundException; +import java.io.IOException; + +import org.apache.camel.builder.RouteBuilder; +import org.apache.camel.component.mock.MockEndpoint; +import org.apache.camel.test.junit5.CamelTestSupport; +import org.junit.jupiter.api.Test; + +public class ResilienceRecordIgnoreExceptionTest extends CamelTestSupport { + + @Test + public void testHello() throws Exception { + getMockEndpoint("mock:result").expectedBodiesReceived("Hello World"); + template.sendBody("direct:start", "Hello World"); + MockEndpoint.assertIsSatisfied(context); + } + + @Test + public void testFile() throws Exception { + getMockEndpoint("mock:result").expectedBodiesReceived("file"); + template.sendBody("direct:start", "file"); + MockEndpoint.assertIsSatisfied(context); + } + + @Test + public void testIo() throws Exception { + getMockEndpoint("mock:result").expectedBodiesReceived("Fallback message"); + template.sendBody("direct:start", "io"); + MockEndpoint.assertIsSatisfied(context); + } + + @Test + public void testKaboom() throws Exception { + getMockEndpoint("mock:result").expectedBodiesReceived("kaboom"); + template.sendBody("direct:start", "kaboom"); + MockEndpoint.assertIsSatisfied(context); + } + + @Override + protected RouteBuilder createRouteBuilder() { + return new RouteBuilder() { + @Override + public void configure() { + from("direct:start") + .to("log:start") + .circuitBreaker().resilience4jConfiguration().recordException(IOException.class).ignoreException(FileNotFoundException.class).end() + .process(e -> { + String b = e.getMessage().getBody(String.class); + if ("kaboom".equals(b)) { + throw new NullPointerException(); + } else if ("file".equals(b)) { + throw new FileNotFoundException("unknown.txt"); + } else if ("io".equals(b)) { + throw new IOException("Host not found"); + } + }) + .onFallback() + .transform().constant("Fallback message") + .end() + .to("log:result") + .to("mock:result"); + } + }; + } + +} diff --git a/core/camel-core-model/src/main/java/org/apache/camel/model/Resilience4jConfigurationDefinition.java b/core/camel-core-model/src/main/java/org/apache/camel/model/Resilience4jConfigurationDefinition.java index 1d632ab77c7..71481510fbc 100644 --- a/core/camel-core-model/src/main/java/org/apache/camel/model/Resilience4jConfigurationDefinition.java +++ b/core/camel-core-model/src/main/java/org/apache/camel/model/Resilience4jConfigurationDefinition.java @@ -271,6 +271,18 @@ public class Resilience4jConfigurationDefinition extends Resilience4jConfigurati return this; } + /** + * Configure a list of exceptions that are recorded as a failure and thus increase the failure rate. Any exception + * matching or inheriting from one of the list counts as a failure, unless explicitly ignored via ignoreExceptions. + */ + @SafeVarargs + public final Resilience4jConfigurationDefinition recordException(Class<? extends Throwable>... exception) { + for (Class<? extends Throwable> t : exception) { + getRecordExceptions().add(t.getName()); + } + return this; + } + /** * Configure a list of exceptions that are ignored and neither count as a failure nor success. Any exception * matching or inheriting from one of the list will not count as a failure nor success, even if the exceptions is @@ -283,6 +295,19 @@ public class Resilience4jConfigurationDefinition extends Resilience4jConfigurati return this; } + /** + * Configure a list of exceptions that are ignored and neither count as a failure nor success. Any exception + * matching or inheriting from one of the list will not count as a failure nor success, even if the exceptions is + * part of recordExceptions. + */ + @SafeVarargs + public final Resilience4jConfigurationDefinition ignoreException(Class<? extends Throwable>... exception) { + for (Class<? extends Throwable> t : exception) { + getIgnoreExceptions().add(t.getName()); + } + return this; + } + /** * End of configuration. */