This is an automated email from the ASF dual-hosted git repository. davsclaus pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/camel.git
The following commit(s) were added to refs/heads/master by this push: new d9c0388 CAMEL-15928: Fix not invoking fallback for resilience4j circuit breaker and not triggering circuit breaker for timeout exceptions (#4810) d9c0388 is described below commit d9c0388397101376c2c198ec6cbb4d211185d028 Author: Liroyd <alex_lir...@yahoo.com> AuthorDate: Wed Dec 23 10:26:45 2020 +0200 CAMEL-15928: Fix not invoking fallback for resilience4j circuit breaker and not triggering circuit breaker for timeout exceptions (#4810) --- .../resilience4j/ResilienceProcessor.java | 111 +++++++++------------ .../ResilienceExistingCircuitBreakerTest.java | 16 ++- .../ResilienceInheritErrorHandlerTest.java | 18 +++- .../resilience4j/ResilienceManagementTest.java | 22 +++- .../ResilienceRouteBulkheadFallbackTest.java | 16 ++- .../ResilienceRouteBulkheadOkTest.java | 16 ++- .../resilience4j/ResilienceRouteFallbackTest.java | 19 +++- .../resilience4j/ResilienceRouteOkTest.java | 16 ++- .../resilience4j/ResilienceRouteRejectedTest.java | 19 +++- .../SpringResilienceRouteFallbackTest.java | 11 +- .../resilience4j/SpringResilienceRouteOkTest.java | 11 +- .../SpringResilienceRouteFallbackTest.xml | 17 ++++ .../resilience4j/SpringResilienceRouteOkTest.xml | 17 ++++ 13 files changed, 232 insertions(+), 77 deletions(-) diff --git a/components/camel-resilience4j/src/main/java/org/apache/camel/component/resilience4j/ResilienceProcessor.java b/components/camel-resilience4j/src/main/java/org/apache/camel/component/resilience4j/ResilienceProcessor.java index 90abfc6..2d8f366 100644 --- a/components/camel-resilience4j/src/main/java/org/apache/camel/component/resilience4j/ResilienceProcessor.java +++ b/components/camel-resilience4j/src/main/java/org/apache/camel/component/resilience4j/ResilienceProcessor.java @@ -355,32 +355,62 @@ public class ResilienceProcessor extends AsyncProcessorSupport // Camel error handler exchange.setProperty(Exchange.TRY_ROUTE_BLOCK, true); - Callable<Exchange> task = CircuitBreaker.decorateCallable(circuitBreaker, new CircuitBreakerTask(processor, exchange)); - Function<Throwable, Exchange> fallbackTask = new CircuitBreakerFallbackTask(fallback, exchange); - if (bulkheadConfig != null) { - Bulkhead bh = Bulkhead.of(id, bulkheadConfig); - task = Bulkhead.decorateCallable(bh, task); - } + Callable<Exchange> task; if (timeLimiterConfig != null) { // timeout handling is more complex with thread-pools - final CircuitBreakerTimeoutTask timeoutTask = new CircuitBreakerTimeoutTask(task, exchange); + + TimeLimiter tl = TimeLimiter.of(id, timeLimiterConfig); Supplier<CompletableFuture<Exchange>> futureSupplier; if (executorService == null) { - futureSupplier = () -> CompletableFuture.supplyAsync(timeoutTask::get); + futureSupplier = () -> CompletableFuture.supplyAsync(() -> processInCopy(exchange)); } else { - futureSupplier = () -> CompletableFuture.supplyAsync(timeoutTask::get, executorService); + futureSupplier = () -> CompletableFuture.supplyAsync(() -> processInCopy(exchange), executorService); } - - TimeLimiter tl = TimeLimiter.of(id, timeLimiterConfig); task = TimeLimiter.decorateFutureSupplier(tl, futureSupplier); + } else { + task = new CircuitBreakerTask(() -> processInCopy(exchange)); } - Try.ofCallable(task).recover(fallbackTask).andFinally(() -> callback.done(false)).get(); + if (bulkheadConfig != null) { + Bulkhead bh = Bulkhead.of(id, bulkheadConfig); + task = Bulkhead.decorateCallable(bh, task); + } + + task = CircuitBreaker.decorateCallable(circuitBreaker, task); + Function<Throwable, Exchange> fallbackTask = new CircuitBreakerFallbackTask(this.fallback, exchange); + Try.ofCallable(task).recover(fallbackTask).andFinally(() -> callback.done(false)).get(); return false; } + private Exchange processInCopy(Exchange exchange) { + try { + LOG.debug("Running processor: {} with exchange: {}", processor, exchange); + // prepare a copy of exchange so downstream processors don't + // cause side-effects if they mutate the exchange + // in case timeout processing and continue with the fallback etc + Exchange copy = ExchangeHelper.createCorrelatedCopy(exchange, false, false); + // process the processor until its fully done + processor.process(copy); + if (copy.getException() != null) { + exchange.setException(copy.getException()); + } else { + // copy the result as its regarded as success + ExchangeHelper.copyResults(exchange, copy); + exchange.setProperty(CircuitBreakerConstants.RESPONSE_SUCCESSFUL_EXECUTION, true); + exchange.setProperty(CircuitBreakerConstants.RESPONSE_FROM_FALLBACK, false); + } + } catch (Throwable e) { + exchange.setException(e); + } + if (exchange.getException() != null) { + // throw exception so resilient4j know it was a failure + throw RuntimeExchangeException.wrapRuntimeException(exchange.getException()); + } + return exchange; + } + @Override protected void doStart() throws Exception { ObjectHelper.notNull(camelContext, "CamelContext", this); @@ -398,40 +428,15 @@ public class ResilienceProcessor extends AsyncProcessorSupport private static final class CircuitBreakerTask implements Callable<Exchange> { - private final Processor processor; - private final Exchange exchange; + Supplier<Exchange> supplier; - private CircuitBreakerTask(Processor processor, Exchange exchange) { - this.processor = processor; - this.exchange = exchange; + public CircuitBreakerTask(Supplier<Exchange> supplier) { + this.supplier = supplier; } @Override public Exchange call() throws Exception { - try { - LOG.debug("Running processor: {} with exchange: {}", processor, exchange); - // prepare a copy of exchange so downstream processors don't - // cause side-effects if they mutate the exchange - // in case timeout processing and continue with the fallback etc - Exchange copy = ExchangeHelper.createCorrelatedCopy(exchange, false, false); - // process the processor until its fully done - processor.process(copy); - if (copy.getException() != null) { - exchange.setException(copy.getException()); - } else { - // copy the result as its regarded as success - ExchangeHelper.copyResults(exchange, copy); - exchange.setProperty(CircuitBreakerConstants.RESPONSE_SUCCESSFUL_EXECUTION, true); - exchange.setProperty(CircuitBreakerConstants.RESPONSE_FROM_FALLBACK, false); - } - } catch (Throwable e) { - exchange.setException(e); - } - if (exchange.getException() != null) { - // throw exception so resilient4j know it was a failure - throw RuntimeExchangeException.wrapRuntimeException(exchange.getException()); - } - return exchange; + return supplier.get(); } } @@ -463,7 +468,7 @@ public class ResilienceProcessor extends AsyncProcessorSupport exchange.setProperty(CircuitBreakerConstants.RESPONSE_FROM_FALLBACK, false); exchange.setProperty(CircuitBreakerConstants.RESPONSE_SHORT_CIRCUITED, true); exchange.setProperty(CircuitBreakerConstants.RESPONSE_REJECTED, true); - return exchange; + throw RuntimeExchangeException.wrapRuntimeException(throwable); } else { // throw exception so resilient4j know it was a failure throw RuntimeExchangeException.wrapRuntimeException(throwable); @@ -500,26 +505,4 @@ public class ResilienceProcessor extends AsyncProcessorSupport return exchange; } } - - private static final class CircuitBreakerTimeoutTask implements Supplier<Exchange> { - - private final Callable<Exchange> future; - private final Exchange exchange; - - private CircuitBreakerTimeoutTask(Callable<Exchange> future, Exchange exchange) { - this.future = future; - this.exchange = exchange; - } - - @Override - public Exchange get() { - try { - return future.call(); - } catch (Exception e) { - exchange.setException(e); - } - return exchange; - } - } - } diff --git a/components/camel-resilience4j/src/test/java/org/apache/camel/component/resilience4j/ResilienceExistingCircuitBreakerTest.java b/components/camel-resilience4j/src/test/java/org/apache/camel/component/resilience4j/ResilienceExistingCircuitBreakerTest.java index a70fb6f..7ce7a28 100644 --- a/components/camel-resilience4j/src/test/java/org/apache/camel/component/resilience4j/ResilienceExistingCircuitBreakerTest.java +++ b/components/camel-resilience4j/src/test/java/org/apache/camel/component/resilience4j/ResilienceExistingCircuitBreakerTest.java @@ -35,11 +35,20 @@ public class ResilienceExistingCircuitBreakerTest extends CamelTestSupport { @Test public void testResilience() throws Exception { + test("direct:start"); + } + + @Test + public void testResilienceWithTimeOut() throws Exception { + test("direct:start.with.timeout.enabled"); + } + + private void test(String endPointUri) throws InterruptedException { getMockEndpoint("mock:result").expectedBodiesReceived("Fallback message"); getMockEndpoint("mock:result").expectedPropertyReceived(CircuitBreakerConstants.RESPONSE_SUCCESSFUL_EXECUTION, false); getMockEndpoint("mock:result").expectedPropertyReceived(CircuitBreakerConstants.RESPONSE_FROM_FALLBACK, true); - template.sendBody("direct:start", "Hello World"); + template.sendBody(endPointUri, "Hello World"); assertMockEndpointsSatisfied(); @@ -59,6 +68,11 @@ public class ResilienceExistingCircuitBreakerTest extends CamelTestSupport { .circuitBreakerRef("myCircuitBreaker").end() .throwException(new IllegalArgumentException("Forced")).onFallback().transform() .constant("Fallback message").end().to("log:result").to("mock:result"); + + from("direct:start.with.timeout.enabled").to("log:direct:start.with.timeout.enabled").circuitBreaker().resilience4jConfiguration() + .circuitBreakerRef("myCircuitBreaker").timeoutEnabled(true).timeoutDuration(2000).end() + .throwException(new IllegalArgumentException("Forced")).onFallback().transform() + .constant("Fallback message").end().to("log:result").to("mock:result"); } }; } diff --git a/components/camel-resilience4j/src/test/java/org/apache/camel/component/resilience4j/ResilienceInheritErrorHandlerTest.java b/components/camel-resilience4j/src/test/java/org/apache/camel/component/resilience4j/ResilienceInheritErrorHandlerTest.java index 7803358..5c97e91 100644 --- a/components/camel-resilience4j/src/test/java/org/apache/camel/component/resilience4j/ResilienceInheritErrorHandlerTest.java +++ b/components/camel-resilience4j/src/test/java/org/apache/camel/component/resilience4j/ResilienceInheritErrorHandlerTest.java @@ -24,11 +24,20 @@ public class ResilienceInheritErrorHandlerTest extends CamelTestSupport { @Test public void testResilience() throws Exception { + test("direct:start"); + } + + @Test + public void testResilienceWithTimeOut() throws Exception { + test("direct:start.with.timeout.enabled"); + } + + private void test(String endPointUri) throws InterruptedException { getMockEndpoint("mock:a").expectedMessageCount(3 + 1); getMockEndpoint("mock:dead").expectedMessageCount(1); getMockEndpoint("mock:result").expectedMessageCount(0); - template.sendBody("direct:start", "Hello World"); + template.sendBody(endPointUri, "Hello World"); assertMockEndpointsSatisfied(); } @@ -45,6 +54,13 @@ public class ResilienceInheritErrorHandlerTest extends CamelTestSupport { // redeliveries .circuitBreaker().inheritErrorHandler(true).to("mock:a") .throwException(new IllegalArgumentException("Forced")).end().to("log:result").to("mock:result"); + + from("direct:start.with.timeout.enabled").to("log:direct:start.with.timeout.enabled") + // turn on Camel's error handler on hystrix so it can do + // redeliveries + .circuitBreaker().inheritErrorHandler(true).resilience4jConfiguration().timeoutEnabled(true).timeoutDuration(2000).end() + .to("mock:a") + .throwException(new IllegalArgumentException("Forced")).end().to("log:result").to("mock:result"); } }; } diff --git a/components/camel-resilience4j/src/test/java/org/apache/camel/component/resilience4j/ResilienceManagementTest.java b/components/camel-resilience4j/src/test/java/org/apache/camel/component/resilience4j/ResilienceManagementTest.java index 196a5a9..51a3328 100644 --- a/components/camel-resilience4j/src/test/java/org/apache/camel/component/resilience4j/ResilienceManagementTest.java +++ b/components/camel-resilience4j/src/test/java/org/apache/camel/component/resilience4j/ResilienceManagementTest.java @@ -38,9 +38,18 @@ public class ResilienceManagementTest extends CamelTestSupport { @Test public void testResilience() throws Exception { + test("start", "myResilience"); + } + + @Test + public void testResilienceWithTimeOut() throws Exception { + test("start.with.timeout.enabled", "myResilienceWithTimeout"); + } + + public void test(String routId, String circuitBreakerName) throws Exception { getMockEndpoint("mock:result").expectedBodiesReceived("Bye World"); - template.sendBody("direct:start", "Hello World"); + template.sendBody("direct:" + routId, "Hello World"); assertMockEndpointsSatisfied(); @@ -52,11 +61,12 @@ public class ResilienceManagementTest extends CamelTestSupport { String name = context.getManagementName(); // get the object name for the delayer - ObjectName on = ObjectName.getInstance("org.apache.camel:context=" + name + ",type=processors,name=\"myResilience\""); + ObjectName on = ObjectName + .getInstance("org.apache.camel:context=" + name + ",type=processors,name=\"" + circuitBreakerName + "\""); // should be on start String routeId = (String) mbeanServer.getAttribute(on, "RouteId"); - assertEquals("start", routeId); + assertEquals(routId, routeId); Integer num = (Integer) mbeanServer.getAttribute(on, "CircuitBreakerMinimumNumberOfCalls"); assertEquals("100", num.toString()); @@ -84,6 +94,12 @@ public class ResilienceManagementTest extends CamelTestSupport { .transform().constant("Fallback message").end() .to("mock:result"); + from("direct:start.with.timeout.enabled").routeId("start.with.timeout.enabled").circuitBreaker().id("myResilienceWithTimeout") + .resilience4jConfiguration().timeoutEnabled(true).timeoutDuration(2000).end() + .to("direct:foo").onFallback() + .transform().constant("Fallback message").end() + .to("mock:result"); + from("direct:foo").transform().constant("Bye World"); } }; diff --git a/components/camel-resilience4j/src/test/java/org/apache/camel/component/resilience4j/ResilienceRouteBulkheadFallbackTest.java b/components/camel-resilience4j/src/test/java/org/apache/camel/component/resilience4j/ResilienceRouteBulkheadFallbackTest.java index ad5146d..e75abdb 100644 --- a/components/camel-resilience4j/src/test/java/org/apache/camel/component/resilience4j/ResilienceRouteBulkheadFallbackTest.java +++ b/components/camel-resilience4j/src/test/java/org/apache/camel/component/resilience4j/ResilienceRouteBulkheadFallbackTest.java @@ -25,11 +25,20 @@ public class ResilienceRouteBulkheadFallbackTest extends CamelTestSupport { @Test public void testResilience() throws Exception { + test("direct:start"); + } + + @Test + public void testResilienceWithTimeOut() throws Exception { + test("direct:start.with.timeout.enabled"); + } + + private void test(String endPointUri) throws Exception { getMockEndpoint("mock:result").expectedBodiesReceived("Fallback message"); getMockEndpoint("mock:result").expectedPropertyReceived(CircuitBreakerConstants.RESPONSE_SUCCESSFUL_EXECUTION, false); getMockEndpoint("mock:result").expectedPropertyReceived(CircuitBreakerConstants.RESPONSE_FROM_FALLBACK, true); - template.sendBody("direct:start", "Hello World"); + template.sendBody(endPointUri, "Hello World"); assertMockEndpointsSatisfied(); } @@ -42,6 +51,11 @@ public class ResilienceRouteBulkheadFallbackTest extends CamelTestSupport { from("direct:start").to("log:start").circuitBreaker().resilience4jConfiguration().bulkheadEnabled(true).end() .throwException(new IllegalArgumentException("Forced")) .onFallback().transform().constant("Fallback message").end().to("log:result").to("mock:result"); + + from("direct:start.with.timeout.enabled").to("log:direct:start.with.timeout.enabled").circuitBreaker().resilience4jConfiguration().bulkheadEnabled(true) + .timeoutEnabled(true).timeoutDuration(2000).end() + .throwException(new IllegalArgumentException("Forced")) + .onFallback().transform().constant("Fallback message").end().to("log:result").to("mock:result"); } }; } diff --git a/components/camel-resilience4j/src/test/java/org/apache/camel/component/resilience4j/ResilienceRouteBulkheadOkTest.java b/components/camel-resilience4j/src/test/java/org/apache/camel/component/resilience4j/ResilienceRouteBulkheadOkTest.java index 688fb91..831e58f 100644 --- a/components/camel-resilience4j/src/test/java/org/apache/camel/component/resilience4j/ResilienceRouteBulkheadOkTest.java +++ b/components/camel-resilience4j/src/test/java/org/apache/camel/component/resilience4j/ResilienceRouteBulkheadOkTest.java @@ -25,11 +25,20 @@ public class ResilienceRouteBulkheadOkTest extends CamelTestSupport { @Test public void testResilience() throws Exception { + test("direct:start"); + } + + @Test + public void testResilienceWithTimeOut() throws Exception { + test("direct:start.with.timeout.enabled"); + } + + private void test(String endPointUri) throws Exception { getMockEndpoint("mock:result").expectedBodiesReceived("Bye World"); getMockEndpoint("mock:result").expectedPropertyReceived(CircuitBreakerConstants.RESPONSE_SUCCESSFUL_EXECUTION, true); getMockEndpoint("mock:result").expectedPropertyReceived(CircuitBreakerConstants.RESPONSE_FROM_FALLBACK, false); - template.sendBody("direct:start", "Hello World"); + template.sendBody(endPointUri, "Hello World"); assertMockEndpointsSatisfied(); } @@ -43,6 +52,11 @@ public class ResilienceRouteBulkheadOkTest extends CamelTestSupport { .to("log:foo").onFallback().transform() .constant("Fallback message").end().to("log:result").to("mock:result"); + from("direct:start.with.timeout.enabled").circuitBreaker().resilience4jConfiguration().bulkheadEnabled(true).timeoutEnabled(true).timeoutDuration(2000).end() + .to("direct:foo") + .to("log:foo").onFallback().transform() + .constant("Fallback message").end().to("log:result").to("mock:result"); + from("direct:foo").transform().constant("Bye World"); } }; diff --git a/components/camel-resilience4j/src/test/java/org/apache/camel/component/resilience4j/ResilienceRouteFallbackTest.java b/components/camel-resilience4j/src/test/java/org/apache/camel/component/resilience4j/ResilienceRouteFallbackTest.java index 9be4309..9434fe3 100644 --- a/components/camel-resilience4j/src/test/java/org/apache/camel/component/resilience4j/ResilienceRouteFallbackTest.java +++ b/components/camel-resilience4j/src/test/java/org/apache/camel/component/resilience4j/ResilienceRouteFallbackTest.java @@ -16,6 +16,8 @@ */ package org.apache.camel.component.resilience4j; +import java.util.concurrent.TimeoutException; + import org.apache.camel.builder.RouteBuilder; import org.apache.camel.spi.CircuitBreakerConstants; import org.apache.camel.test.junit5.CamelTestSupport; @@ -25,11 +27,20 @@ public class ResilienceRouteFallbackTest extends CamelTestSupport { @Test public void testResilience() throws Exception { + test("direct:start"); + } + + @Test + public void testResilienceWithTimeOut() throws Exception { + test("direct:start.with.timeout.enabled"); + } + + private void test(String endPointUri) throws Exception { getMockEndpoint("mock:result").expectedBodiesReceived("Fallback message"); getMockEndpoint("mock:result").expectedPropertyReceived(CircuitBreakerConstants.RESPONSE_SUCCESSFUL_EXECUTION, false); getMockEndpoint("mock:result").expectedPropertyReceived(CircuitBreakerConstants.RESPONSE_FROM_FALLBACK, true); - template.sendBody("direct:start", "Hello World"); + template.sendBody(endPointUri, "Hello World"); assertMockEndpointsSatisfied(); } @@ -42,6 +53,12 @@ public class ResilienceRouteFallbackTest extends CamelTestSupport { from("direct:start").to("log:start").circuitBreaker().throwException(new IllegalArgumentException("Forced")) .onFallback().transform().constant("Fallback message") .end().to("log:result").to("mock:result"); + + from("direct:start.with.timeout.enabled").to("log:start.with.timeout.enabled").circuitBreaker() + .resilience4jConfiguration().timeoutEnabled(true).timeoutDuration(2000).end() + .throwException(new TimeoutException("Forced")) + .onFallback().transform().constant("Fallback message") + .end().to("log:result").to("mock:result"); } }; } diff --git a/components/camel-resilience4j/src/test/java/org/apache/camel/component/resilience4j/ResilienceRouteOkTest.java b/components/camel-resilience4j/src/test/java/org/apache/camel/component/resilience4j/ResilienceRouteOkTest.java index 36729b4..45a7ea7 100644 --- a/components/camel-resilience4j/src/test/java/org/apache/camel/component/resilience4j/ResilienceRouteOkTest.java +++ b/components/camel-resilience4j/src/test/java/org/apache/camel/component/resilience4j/ResilienceRouteOkTest.java @@ -49,13 +49,22 @@ public class ResilienceRouteOkTest extends CamelTestSupport { @Test public void testResilience() throws Exception { + test("direct:start"); + } + + @Test + public void testResilienceWithTimeOut() throws Exception { + test("direct:start.with.timeout.enabled"); + } + + private void test(String endPointUri) throws Exception { assertEquals(0, bi.getInvokedCounter()); getMockEndpoint("mock:result").expectedBodiesReceived("Bye World"); getMockEndpoint("mock:result").expectedPropertyReceived(CircuitBreakerConstants.RESPONSE_SUCCESSFUL_EXECUTION, true); getMockEndpoint("mock:result").expectedPropertyReceived(CircuitBreakerConstants.RESPONSE_FROM_FALLBACK, false); - template.sendBody("direct:start", "Hello World"); + template.sendBody(endPointUri, "Hello World"); assertMockEndpointsSatisfied(); @@ -70,6 +79,11 @@ public class ResilienceRouteOkTest extends CamelTestSupport { from("direct:start").circuitBreaker().to("direct:foo").to("log:foo").onFallback().transform() .constant("Fallback message").end().to("log:result").to("mock:result"); + from("direct:start.with.timeout.enabled").circuitBreaker().resilience4jConfiguration() + .timeoutEnabled(true).timeoutDuration(2000).end() + .to("direct:foo").to("log:foo").onFallback().transform() + .constant("Fallback message").end().to("log:result").to("mock:result"); + from("direct:foo").transform().constant("Bye World"); } }; diff --git a/components/camel-resilience4j/src/test/java/org/apache/camel/component/resilience4j/ResilienceRouteRejectedTest.java b/components/camel-resilience4j/src/test/java/org/apache/camel/component/resilience4j/ResilienceRouteRejectedTest.java index 2ec4c9f..d6e29c4 100644 --- a/components/camel-resilience4j/src/test/java/org/apache/camel/component/resilience4j/ResilienceRouteRejectedTest.java +++ b/components/camel-resilience4j/src/test/java/org/apache/camel/component/resilience4j/ResilienceRouteRejectedTest.java @@ -38,6 +38,15 @@ public class ResilienceRouteRejectedTest extends CamelTestSupport { @Test public void testResilience() throws Exception { + test("direct:start", "myResilience"); + } + + @Test + public void testResilienceWithTimeOut() throws Exception { + test("direct:start.with.timeout.enabled", "myResilienceWithTimeout"); + } + + private void test(String endPointUri, String circuitBreakerName) throws Exception { // look inside jmx // get the stats for the route MBeanServer mbeanServer = getMBeanServer(); @@ -45,7 +54,8 @@ public class ResilienceRouteRejectedTest extends CamelTestSupport { // context name String name = context.getManagementName(); - ObjectName on = ObjectName.getInstance("org.apache.camel:context=" + name + ",type=processors,name=\"myResilience\""); + ObjectName on = ObjectName + .getInstance("org.apache.camel:context=" + name + ",type=processors,name=\"" + circuitBreakerName + "\""); // force it into open state mbeanServer.invoke(on, "transitionToForcedOpenState", null, null); @@ -55,7 +65,7 @@ public class ResilienceRouteRejectedTest extends CamelTestSupport { // send message which should get rejected, so the message is not changed getMockEndpoint("mock:result").expectedBodiesReceived("Hello World"); - template.sendBody("direct:start", "Hello World"); + template.sendBody(endPointUri, "Hello World"); assertMockEndpointsSatisfied(); } @@ -68,6 +78,11 @@ public class ResilienceRouteRejectedTest extends CamelTestSupport { from("direct:start").circuitBreaker().id("myResilience").to("direct:foo").to("log:foo").end().to("log:result") .to("mock:result"); + from("direct:start.with.timeout.enabled").circuitBreaker().resilience4jConfiguration() + .timeoutEnabled(true).timeoutDuration(2000).end() + .id("myResilienceWithTimeout").to("direct:foo").to("log:foo").end().to("log:result") + .to("mock:result"); + from("direct:foo").transform().constant("Bye World"); } }; diff --git a/components/camel-resilience4j/src/test/java/org/apache/camel/component/resilience4j/SpringResilienceRouteFallbackTest.java b/components/camel-resilience4j/src/test/java/org/apache/camel/component/resilience4j/SpringResilienceRouteFallbackTest.java index e8b25a2..90978ac 100644 --- a/components/camel-resilience4j/src/test/java/org/apache/camel/component/resilience4j/SpringResilienceRouteFallbackTest.java +++ b/components/camel-resilience4j/src/test/java/org/apache/camel/component/resilience4j/SpringResilienceRouteFallbackTest.java @@ -34,11 +34,20 @@ public class SpringResilienceRouteFallbackTest extends CamelSpringTestSupport { @Test public void testResilience() throws Exception { + test("direct:start"); + } + + @Test + public void testResilienceWithTimeOut() throws Exception { + test("direct:start.with.timeout.enabled"); + } + + private void test(String endPointUri) throws Exception { getMockEndpoint("mock:result").expectedBodiesReceived("Fallback message"); getMockEndpoint("mock:result").expectedPropertyReceived(CircuitBreakerConstants.RESPONSE_SUCCESSFUL_EXECUTION, false); getMockEndpoint("mock:result").expectedPropertyReceived(CircuitBreakerConstants.RESPONSE_FROM_FALLBACK, true); - template.sendBody("direct:start", "Hello World"); + template.sendBody(endPointUri, "Hello World"); assertMockEndpointsSatisfied(); } diff --git a/components/camel-resilience4j/src/test/java/org/apache/camel/component/resilience4j/SpringResilienceRouteOkTest.java b/components/camel-resilience4j/src/test/java/org/apache/camel/component/resilience4j/SpringResilienceRouteOkTest.java index 48a4320..28be9c2 100644 --- a/components/camel-resilience4j/src/test/java/org/apache/camel/component/resilience4j/SpringResilienceRouteOkTest.java +++ b/components/camel-resilience4j/src/test/java/org/apache/camel/component/resilience4j/SpringResilienceRouteOkTest.java @@ -33,11 +33,20 @@ public class SpringResilienceRouteOkTest extends CamelSpringTestSupport { @Test public void testResilience() throws Exception { + test("direct:start"); + } + + @Test + public void testResilienceWithTimeOut() throws Exception { + test("direct:start.with.timeout.enabled"); + } + + private void test(String endPointUri) throws Exception { getMockEndpoint("mock:result").expectedBodiesReceived("Bye World"); getMockEndpoint("mock:result").expectedPropertyReceived(CircuitBreakerConstants.RESPONSE_SUCCESSFUL_EXECUTION, true); getMockEndpoint("mock:result").expectedPropertyReceived(CircuitBreakerConstants.RESPONSE_FROM_FALLBACK, false); - template.sendBody("direct:start", "Hello World"); + template.sendBody(endPointUri, "Hello World"); assertMockEndpointsSatisfied(); } diff --git a/components/camel-resilience4j/src/test/resources/org/apache/camel/component/resilience4j/SpringResilienceRouteFallbackTest.xml b/components/camel-resilience4j/src/test/resources/org/apache/camel/component/resilience4j/SpringResilienceRouteFallbackTest.xml index 39dbe97..573408c 100644 --- a/components/camel-resilience4j/src/test/resources/org/apache/camel/component/resilience4j/SpringResilienceRouteFallbackTest.xml +++ b/components/camel-resilience4j/src/test/resources/org/apache/camel/component/resilience4j/SpringResilienceRouteFallbackTest.xml @@ -37,6 +37,23 @@ <to uri="mock:result"/> </route> + <route> + <from uri="direct:start.with.timeout.enabled"/> + <circuitBreaker> + <resilience4jConfiguration> + <timeoutEnabled>true</timeoutEnabled> + <timeoutDuration>2000</timeoutDuration> + </resilience4jConfiguration> + <throwException exceptionType="java.lang.IllegalArgumentException" message="Forced"/> + <onFallback> + <transform> + <constant>Fallback message</constant> + </transform> + </onFallback> + </circuitBreaker> + <to uri="mock:result"/> + </route> + </camelContext> </beans> \ No newline at end of file diff --git a/components/camel-resilience4j/src/test/resources/org/apache/camel/component/resilience4j/SpringResilienceRouteOkTest.xml b/components/camel-resilience4j/src/test/resources/org/apache/camel/component/resilience4j/SpringResilienceRouteOkTest.xml index 1e2b139..71e831d 100644 --- a/components/camel-resilience4j/src/test/resources/org/apache/camel/component/resilience4j/SpringResilienceRouteOkTest.xml +++ b/components/camel-resilience4j/src/test/resources/org/apache/camel/component/resilience4j/SpringResilienceRouteOkTest.xml @@ -38,6 +38,23 @@ </route> <route> + <from uri="direct:start.with.timeout.enabled"/> + <circuitBreaker> + <resilience4jConfiguration> + <timeoutEnabled>true</timeoutEnabled> + <timeoutDuration>2000</timeoutDuration> + </resilience4jConfiguration> + <to uri="direct:foo"/> + <onFallback> + <transform> + <constant>Fallback message</constant> + </transform> + </onFallback> + </circuitBreaker> + <to uri="mock:result"/> + </route> + + <route> <from uri="direct:foo"/> <transform> <constant>Bye World</constant>