Repository: camel
Updated Branches:
  refs/heads/master 76c3a420c -> 799b45df0


CAMEL-7973 Fixed the issue that CircuitBreakerLoadBalancer fails on async 
processors with thanks to Matteo


Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/799b45df
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/799b45df
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/799b45df

Branch: refs/heads/master
Commit: 799b45df0c2e670f0240ddd3e9c6256fcb4b3740
Parents: 76c3a42
Author: Willem Jiang <willem.ji...@gmail.com>
Authored: Thu Oct 30 17:04:28 2014 +0800
Committer: Willem Jiang <willem.ji...@gmail.com>
Committed: Thu Oct 30 17:04:28 2014 +0800

----------------------------------------------------------------------
 .../CircuitBreakerLoadBalancer.java             | 55 ++++++++++++++----
 .../CircuitBreakerLoadBalancerTest.java         | 60 +++++++++++++++-----
 2 files changed, 91 insertions(+), 24 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/camel/blob/799b45df/camel-core/src/main/java/org/apache/camel/processor/loadbalancer/CircuitBreakerLoadBalancer.java
----------------------------------------------------------------------
diff --git 
a/camel-core/src/main/java/org/apache/camel/processor/loadbalancer/CircuitBreakerLoadBalancer.java
 
b/camel-core/src/main/java/org/apache/camel/processor/loadbalancer/CircuitBreakerLoadBalancer.java
index b8e23b4..3e84e6e 100644
--- 
a/camel-core/src/main/java/org/apache/camel/processor/loadbalancer/CircuitBreakerLoadBalancer.java
+++ 
b/camel-core/src/main/java/org/apache/camel/processor/loadbalancer/CircuitBreakerLoadBalancer.java
@@ -107,6 +107,13 @@ public class CircuitBreakerLoadBalancer extends 
LoadBalancerSupport implements T
 
         if (failures.get() >= threshold && System.currentTimeMillis() - 
lastFailure < halfOpenAfter) {
             exchange.setException(new 
RejectedExecutionException("CircuitBreaker Open: failures: " + failures + ", 
lastFailure: " + lastFailure));
+            /*
+             * If the circuit opens, we have to prevent the execution of any 
processor.
+             * The failures count can be set to 0.
+             */
+            failures.set(0);
+            callback.done(true);
+            return true;
         }
         Processor processor = getProcessors().get(0);
         if (processor == null) {
@@ -114,18 +121,20 @@ public class CircuitBreakerLoadBalancer extends 
LoadBalancerSupport implements T
         }
 
         AsyncProcessor albp = AsyncProcessorConverterHelper.convert(processor);
-        boolean sync = albp.process(exchange, callback);
-
-        boolean failed = hasFailed(exchange);
-
-        if (!failed) {
-            failures.set(0);
+        // Added a callback for processing the exchange in the callback
+        boolean sync = albp.process(exchange, new 
CircuitBreakerCallback(exchange, callback));
+        
+        // We need to check the exception here as albp is use sync call  
+        if (sync) {
+            boolean failed = hasFailed(exchange);
+            if (!failed) {
+                failures.set(0);
+            } else {
+                failures.incrementAndGet();
+                lastFailure = System.currentTimeMillis();
+            }
         } else {
-            failures.incrementAndGet();
-            lastFailure = System.currentTimeMillis();
-        }
-
-        if (!sync) {
+            // CircuitBreakerCallback can take care of failure check of the 
exchange
             log.trace("Processing exchangeId: {} is continued being processed 
asynchronously", exchange.getExchangeId());
             return false;
         }
@@ -142,4 +151,28 @@ public class CircuitBreakerLoadBalancer extends 
LoadBalancerSupport implements T
     public String getTraceLabel() {
         return "circuitbreaker";
     }
+    
+    class CircuitBreakerCallback implements AsyncCallback {
+        private final AsyncCallback callback;
+        private final Exchange exchange;
+        CircuitBreakerCallback(Exchange exchange, AsyncCallback callback) {
+            this.callback = callback;
+            this.exchange = exchange;
+        }
+
+        @Override
+        public void done(boolean doneSync) {
+            if (!doneSync) {
+                boolean failed = hasFailed(exchange);
+                if (!failed) {
+                    failures.set(0);
+                } else {
+                    failures.incrementAndGet();
+                    lastFailure = System.currentTimeMillis();
+                }
+            }
+            callback.done(doneSync);
+        }
+        
+    }
 }

http://git-wip-us.apache.org/repos/asf/camel/blob/799b45df/camel-core/src/test/java/org/apache/camel/processor/CircuitBreakerLoadBalancerTest.java
----------------------------------------------------------------------
diff --git 
a/camel-core/src/test/java/org/apache/camel/processor/CircuitBreakerLoadBalancerTest.java
 
b/camel-core/src/test/java/org/apache/camel/processor/CircuitBreakerLoadBalancerTest.java
index 1229748..a6e489d 100644
--- 
a/camel-core/src/test/java/org/apache/camel/processor/CircuitBreakerLoadBalancerTest.java
+++ 
b/camel-core/src/test/java/org/apache/camel/processor/CircuitBreakerLoadBalancerTest.java
@@ -23,6 +23,7 @@ import org.apache.camel.Exchange;
 import org.apache.camel.Processor;
 import org.apache.camel.builder.RouteBuilder;
 import org.apache.camel.component.mock.MockEndpoint;
+
 import static org.apache.camel.component.mock.MockEndpoint.expectsMessageCount;
 
 public class CircuitBreakerLoadBalancerTest extends ContextTestSupport {
@@ -46,19 +47,42 @@ public class CircuitBreakerLoadBalancerTest extends 
ContextTestSupport {
                 from("direct:start").loadBalance()
                     .circuitBreaker(2, 1000L, MyCustomException.class)
                         .to("mock:result");
+                from("direct:start-async").loadBalance()
+                .circuitBreaker(2, 1000L, MyCustomException.class)
+                    .threads(1).to("mock:result");
             }
         };
     }
 
-    public void testClosedCircuitPassesMessages() throws Exception {
+    public void testClosedCircuitPassesMessagesSync() throws Exception {
+        String endpoint = "direct:start";
+        closedCircuitPassesMessages(endpoint);
+    }
+    
+    public void testClosedCircuitPassesMessagesAsync() throws Exception {
+        String endpoint = "direct:start-async";
+        closedCircuitPassesMessages(endpoint);
+    }
+
+    private void closedCircuitPassesMessages(String endpoint) throws 
InterruptedException, Exception {
         expectsMessageCount(3, result);
-        sendMessage("direct:start", "message one");
-        sendMessage("direct:start", "message two");
-        sendMessage("direct:start", "message three");
+        sendMessage(endpoint, "message one");
+        sendMessage(endpoint, "message two");
+        sendMessage(endpoint, "message three");
         assertMockEndpointsSatisfied();
     }
 
-    public void testFailedMessagesOpenCircuitToPreventMessageThree() throws 
Exception {
+    public void testFailedMessagesOpenCircuitToPreventMessageThreeSync() 
throws Exception {
+        String endpoint = "direct:start";
+        failedMessagesOpenCircuitToPreventMessageThree(endpoint);
+    }
+    
+    public void testFailedMessagesOpenCircuitToPreventMessageThreeAsync() 
throws Exception {
+        String endpoint = "direct:start-async";
+        failedMessagesOpenCircuitToPreventMessageThree(endpoint);
+    }
+
+    private void failedMessagesOpenCircuitToPreventMessageThree(String 
endpoint) throws InterruptedException, Exception {
         expectsMessageCount(2, result);
 
         result.whenAnyExchangeReceived(new Processor() {
@@ -68,9 +92,9 @@ public class CircuitBreakerLoadBalancerTest extends 
ContextTestSupport {
             }
         });
 
-        Exchange exchangeOne = sendMessage("direct:start", "message one");
-        Exchange exchangeTwo = sendMessage("direct:start", "message two");
-        Exchange exchangeThree = sendMessage("direct:start", "message three");
+        Exchange exchangeOne = sendMessage(endpoint, "message one");
+        Exchange exchangeTwo = sendMessage(endpoint, "message two");
+        Exchange exchangeThree = sendMessage(endpoint, "message three");
         assertMockEndpointsSatisfied();
 
         assertTrue(exchangeOne.getException() instanceof MyCustomException);
@@ -78,7 +102,17 @@ public class CircuitBreakerLoadBalancerTest extends 
ContextTestSupport {
         assertTrue(exchangeThree.getException() instanceof 
RejectedExecutionException);
     }
 
-    public void testHalfOpenCircuitClosesAfterTimeout() throws Exception {
+    public void testHalfOpenCircuitClosesAfterTimeoutSync() throws Exception {
+        String endpoint = "direct:start";
+        halfOpenCircuitClosesAfterTimeout(endpoint);
+    }
+    
+    public void testHalfOpenCircuitClosesAfterTimeoutAsync() throws Exception {
+        String endpoint = "direct:start-async";
+        halfOpenCircuitClosesAfterTimeout(endpoint);
+    }
+
+    private void halfOpenCircuitClosesAfterTimeout(String endpoint) throws 
InterruptedException, Exception {
         expectsMessageCount(2, result);
         result.whenAnyExchangeReceived(new Processor() {
             @Override
@@ -87,16 +121,16 @@ public class CircuitBreakerLoadBalancerTest extends 
ContextTestSupport {
             }
         });
 
-        sendMessage("direct:start", "message one");
-        sendMessage("direct:start", "message two");
-        sendMessage("direct:start", "message three");
+        sendMessage(endpoint, "message one");
+        sendMessage(endpoint, "message two");
+        sendMessage(endpoint, "message three");
         assertMockEndpointsSatisfied();
 
         result.reset();
         expectsMessageCount(1, result);
 
         Thread.sleep(1000);
-        sendMessage("direct:start", "message four");
+        sendMessage(endpoint, "message four");
         assertMockEndpointsSatisfied();
     }
 

Reply via email to