Repository: camel Updated Branches: refs/heads/master 76c3a420c -> 799b45df0
CAMEL-7973 Fixed the issue that CircuitBreakerLoadBalancer fails on async processors with thanks to Matteo Project: http://git-wip-us.apache.org/repos/asf/camel/repo Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/799b45df Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/799b45df Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/799b45df Branch: refs/heads/master Commit: 799b45df0c2e670f0240ddd3e9c6256fcb4b3740 Parents: 76c3a42 Author: Willem Jiang <willem.ji...@gmail.com> Authored: Thu Oct 30 17:04:28 2014 +0800 Committer: Willem Jiang <willem.ji...@gmail.com> Committed: Thu Oct 30 17:04:28 2014 +0800 ---------------------------------------------------------------------- .../CircuitBreakerLoadBalancer.java | 55 ++++++++++++++---- .../CircuitBreakerLoadBalancerTest.java | 60 +++++++++++++++----- 2 files changed, 91 insertions(+), 24 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/camel/blob/799b45df/camel-core/src/main/java/org/apache/camel/processor/loadbalancer/CircuitBreakerLoadBalancer.java ---------------------------------------------------------------------- diff --git a/camel-core/src/main/java/org/apache/camel/processor/loadbalancer/CircuitBreakerLoadBalancer.java b/camel-core/src/main/java/org/apache/camel/processor/loadbalancer/CircuitBreakerLoadBalancer.java index b8e23b4..3e84e6e 100644 --- a/camel-core/src/main/java/org/apache/camel/processor/loadbalancer/CircuitBreakerLoadBalancer.java +++ b/camel-core/src/main/java/org/apache/camel/processor/loadbalancer/CircuitBreakerLoadBalancer.java @@ -107,6 +107,13 @@ public class CircuitBreakerLoadBalancer extends LoadBalancerSupport implements T if (failures.get() >= threshold && System.currentTimeMillis() - lastFailure < halfOpenAfter) { exchange.setException(new RejectedExecutionException("CircuitBreaker Open: failures: " + failures + ", lastFailure: " + lastFailure)); + /* + * If the circuit opens, we have to prevent the execution of any processor. + * The failures count can be set to 0. + */ + failures.set(0); + callback.done(true); + return true; } Processor processor = getProcessors().get(0); if (processor == null) { @@ -114,18 +121,20 @@ public class CircuitBreakerLoadBalancer extends LoadBalancerSupport implements T } AsyncProcessor albp = AsyncProcessorConverterHelper.convert(processor); - boolean sync = albp.process(exchange, callback); - - boolean failed = hasFailed(exchange); - - if (!failed) { - failures.set(0); + // Added a callback for processing the exchange in the callback + boolean sync = albp.process(exchange, new CircuitBreakerCallback(exchange, callback)); + + // We need to check the exception here as albp is use sync call + if (sync) { + boolean failed = hasFailed(exchange); + if (!failed) { + failures.set(0); + } else { + failures.incrementAndGet(); + lastFailure = System.currentTimeMillis(); + } } else { - failures.incrementAndGet(); - lastFailure = System.currentTimeMillis(); - } - - if (!sync) { + // CircuitBreakerCallback can take care of failure check of the exchange log.trace("Processing exchangeId: {} is continued being processed asynchronously", exchange.getExchangeId()); return false; } @@ -142,4 +151,28 @@ public class CircuitBreakerLoadBalancer extends LoadBalancerSupport implements T public String getTraceLabel() { return "circuitbreaker"; } + + class CircuitBreakerCallback implements AsyncCallback { + private final AsyncCallback callback; + private final Exchange exchange; + CircuitBreakerCallback(Exchange exchange, AsyncCallback callback) { + this.callback = callback; + this.exchange = exchange; + } + + @Override + public void done(boolean doneSync) { + if (!doneSync) { + boolean failed = hasFailed(exchange); + if (!failed) { + failures.set(0); + } else { + failures.incrementAndGet(); + lastFailure = System.currentTimeMillis(); + } + } + callback.done(doneSync); + } + + } } http://git-wip-us.apache.org/repos/asf/camel/blob/799b45df/camel-core/src/test/java/org/apache/camel/processor/CircuitBreakerLoadBalancerTest.java ---------------------------------------------------------------------- diff --git a/camel-core/src/test/java/org/apache/camel/processor/CircuitBreakerLoadBalancerTest.java b/camel-core/src/test/java/org/apache/camel/processor/CircuitBreakerLoadBalancerTest.java index 1229748..a6e489d 100644 --- a/camel-core/src/test/java/org/apache/camel/processor/CircuitBreakerLoadBalancerTest.java +++ b/camel-core/src/test/java/org/apache/camel/processor/CircuitBreakerLoadBalancerTest.java @@ -23,6 +23,7 @@ import org.apache.camel.Exchange; import org.apache.camel.Processor; import org.apache.camel.builder.RouteBuilder; import org.apache.camel.component.mock.MockEndpoint; + import static org.apache.camel.component.mock.MockEndpoint.expectsMessageCount; public class CircuitBreakerLoadBalancerTest extends ContextTestSupport { @@ -46,19 +47,42 @@ public class CircuitBreakerLoadBalancerTest extends ContextTestSupport { from("direct:start").loadBalance() .circuitBreaker(2, 1000L, MyCustomException.class) .to("mock:result"); + from("direct:start-async").loadBalance() + .circuitBreaker(2, 1000L, MyCustomException.class) + .threads(1).to("mock:result"); } }; } - public void testClosedCircuitPassesMessages() throws Exception { + public void testClosedCircuitPassesMessagesSync() throws Exception { + String endpoint = "direct:start"; + closedCircuitPassesMessages(endpoint); + } + + public void testClosedCircuitPassesMessagesAsync() throws Exception { + String endpoint = "direct:start-async"; + closedCircuitPassesMessages(endpoint); + } + + private void closedCircuitPassesMessages(String endpoint) throws InterruptedException, Exception { expectsMessageCount(3, result); - sendMessage("direct:start", "message one"); - sendMessage("direct:start", "message two"); - sendMessage("direct:start", "message three"); + sendMessage(endpoint, "message one"); + sendMessage(endpoint, "message two"); + sendMessage(endpoint, "message three"); assertMockEndpointsSatisfied(); } - public void testFailedMessagesOpenCircuitToPreventMessageThree() throws Exception { + public void testFailedMessagesOpenCircuitToPreventMessageThreeSync() throws Exception { + String endpoint = "direct:start"; + failedMessagesOpenCircuitToPreventMessageThree(endpoint); + } + + public void testFailedMessagesOpenCircuitToPreventMessageThreeAsync() throws Exception { + String endpoint = "direct:start-async"; + failedMessagesOpenCircuitToPreventMessageThree(endpoint); + } + + private void failedMessagesOpenCircuitToPreventMessageThree(String endpoint) throws InterruptedException, Exception { expectsMessageCount(2, result); result.whenAnyExchangeReceived(new Processor() { @@ -68,9 +92,9 @@ public class CircuitBreakerLoadBalancerTest extends ContextTestSupport { } }); - Exchange exchangeOne = sendMessage("direct:start", "message one"); - Exchange exchangeTwo = sendMessage("direct:start", "message two"); - Exchange exchangeThree = sendMessage("direct:start", "message three"); + Exchange exchangeOne = sendMessage(endpoint, "message one"); + Exchange exchangeTwo = sendMessage(endpoint, "message two"); + Exchange exchangeThree = sendMessage(endpoint, "message three"); assertMockEndpointsSatisfied(); assertTrue(exchangeOne.getException() instanceof MyCustomException); @@ -78,7 +102,17 @@ public class CircuitBreakerLoadBalancerTest extends ContextTestSupport { assertTrue(exchangeThree.getException() instanceof RejectedExecutionException); } - public void testHalfOpenCircuitClosesAfterTimeout() throws Exception { + public void testHalfOpenCircuitClosesAfterTimeoutSync() throws Exception { + String endpoint = "direct:start"; + halfOpenCircuitClosesAfterTimeout(endpoint); + } + + public void testHalfOpenCircuitClosesAfterTimeoutAsync() throws Exception { + String endpoint = "direct:start-async"; + halfOpenCircuitClosesAfterTimeout(endpoint); + } + + private void halfOpenCircuitClosesAfterTimeout(String endpoint) throws InterruptedException, Exception { expectsMessageCount(2, result); result.whenAnyExchangeReceived(new Processor() { @Override @@ -87,16 +121,16 @@ public class CircuitBreakerLoadBalancerTest extends ContextTestSupport { } }); - sendMessage("direct:start", "message one"); - sendMessage("direct:start", "message two"); - sendMessage("direct:start", "message three"); + sendMessage(endpoint, "message one"); + sendMessage(endpoint, "message two"); + sendMessage(endpoint, "message three"); assertMockEndpointsSatisfied(); result.reset(); expectsMessageCount(1, result); Thread.sleep(1000); - sendMessage("direct:start", "message four"); + sendMessage(endpoint, "message four"); assertMockEndpointsSatisfied(); }