Repository: camel
Updated Branches:
  refs/heads/master 1fce4a3ae -> 9ed7d85f6


CAMEL-5539 Circuit Breaker EIP
Implemented circuit breaker as load balancer policy


Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/1f2b83af
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/1f2b83af
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/1f2b83af

Branch: refs/heads/master
Commit: 1f2b83aff8f866708267a6113da35c3e57dd0563
Parents: 1fce4a3
Author: Bilgin Ibryam <bibr...@apache.org>
Authored: Sat Apr 5 09:56:58 2014 +0100
Committer: Bilgin Ibryam <bibr...@apache.org>
Committed: Sat Apr 5 09:56:58 2014 +0100

----------------------------------------------------------------------
 .../camel/model/LoadBalanceDefinition.java      |  22 ++-
 .../CircuitBreakerLoadBalancerDefinition.java   | 108 ++++++++++++++
 .../CircuitBreakerLoadBalancer.java             | 145 +++++++++++++++++++
 .../CircuitBreakerLoadBalancerTest.java         | 109 ++++++++++++++
 4 files changed, 383 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/camel/blob/1f2b83af/camel-core/src/main/java/org/apache/camel/model/LoadBalanceDefinition.java
----------------------------------------------------------------------
diff --git 
a/camel-core/src/main/java/org/apache/camel/model/LoadBalanceDefinition.java 
b/camel-core/src/main/java/org/apache/camel/model/LoadBalanceDefinition.java
index 973e9e5..985db35 100644
--- a/camel-core/src/main/java/org/apache/camel/model/LoadBalanceDefinition.java
+++ b/camel-core/src/main/java/org/apache/camel/model/LoadBalanceDefinition.java
@@ -31,6 +31,7 @@ import javax.xml.bind.annotation.XmlRootElement;
 
 import org.apache.camel.Expression;
 import org.apache.camel.Processor;
+import 
org.apache.camel.model.loadbalancer.CircuitBreakerLoadBalancerDefinition;
 import org.apache.camel.model.loadbalancer.CustomLoadBalancerDefinition;
 import org.apache.camel.model.loadbalancer.FailoverLoadBalancerDefinition;
 import org.apache.camel.model.loadbalancer.RandomLoadBalancerDefinition;
@@ -38,6 +39,7 @@ import 
org.apache.camel.model.loadbalancer.RoundRobinLoadBalancerDefinition;
 import org.apache.camel.model.loadbalancer.StickyLoadBalancerDefinition;
 import org.apache.camel.model.loadbalancer.TopicLoadBalancerDefinition;
 import org.apache.camel.model.loadbalancer.WeightedLoadBalancerDefinition;
+import org.apache.camel.processor.loadbalancer.CircuitBreakerLoadBalancer;
 import org.apache.camel.processor.loadbalancer.FailOverLoadBalancer;
 import org.apache.camel.processor.loadbalancer.LoadBalancer;
 import org.apache.camel.processor.loadbalancer.RandomLoadBalancer;
@@ -66,7 +68,8 @@ public class LoadBalanceDefinition extends 
ProcessorDefinition<LoadBalanceDefini
             @XmlElement(required = false, name = "roundRobin", type = 
RoundRobinLoadBalancerDefinition.class),
             @XmlElement(required = false, name = "sticky", type = 
StickyLoadBalancerDefinition.class),
             @XmlElement(required = false, name = "topic", type = 
TopicLoadBalancerDefinition.class),
-            @XmlElement(required = false, name = "weighted", type = 
WeightedLoadBalancerDefinition.class)}
+            @XmlElement(required = false, name = "weighted", type = 
WeightedLoadBalancerDefinition.class),
+            @XmlElement(required = false, name = "circuitBreaker", type = 
CircuitBreakerLoadBalancerDefinition.class)}
     )
     private LoadBalancerDefinition loadBalancerType;
     @XmlElementRef
@@ -213,6 +216,23 @@ public class LoadBalanceDefinition extends 
ProcessorDefinition<LoadBalanceDefini
     public LoadBalanceDefinition weighted(boolean roundRobin, String 
distributionRatio) {
         return weighted(roundRobin, distributionRatio, ",");
     }
+
+    /**
+     * Configures this load balance to use a circuit breaker load balancer.
+     * <p/>
+     * A {@link CircuitBreakerLoadBalancer} is created for the given exception
+     * types, configured with the threshold and half open interval, and installed
+     * as the load balancer type of this definition.
+     *
+     * @param threshold         number of errors before failure.
+     * @param halfOpenAfter     time interval in milliseconds for half open state.
+     * @param exceptions        exception classes which we want to break if one of them was thrown
+     * @return the builder
+     */
+    public LoadBalanceDefinition circuitBreaker(int threshold, long halfOpenAfter, Class<?>... exceptions) {
+        CircuitBreakerLoadBalancer balancer = new CircuitBreakerLoadBalancer(Arrays.asList(exceptions));
+        balancer.setHalfOpenAfter(halfOpenAfter);
+        balancer.setThreshold(threshold);
+
+        // wrap the ready-made balancer so it can be stored as the load balancer type
+        LoadBalancerDefinition definition = new LoadBalancerDefinition(balancer);
+        setLoadBalancerType(definition);
+        return this;
+    }
     
     /**
      * Uses weighted load balancer

http://git-wip-us.apache.org/repos/asf/camel/blob/1f2b83af/camel-core/src/main/java/org/apache/camel/model/loadbalancer/CircuitBreakerLoadBalancerDefinition.java
----------------------------------------------------------------------
diff --git 
a/camel-core/src/main/java/org/apache/camel/model/loadbalancer/CircuitBreakerLoadBalancerDefinition.java
 
b/camel-core/src/main/java/org/apache/camel/model/loadbalancer/CircuitBreakerLoadBalancerDefinition.java
new file mode 100644
index 0000000..94c990b
--- /dev/null
+++ 
b/camel-core/src/main/java/org/apache/camel/model/loadbalancer/CircuitBreakerLoadBalancerDefinition.java
@@ -0,0 +1,108 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.model.loadbalancer;
+
+import java.util.ArrayList;
+import java.util.List;
+import javax.xml.bind.annotation.XmlAccessType;
+import javax.xml.bind.annotation.XmlAccessorType;
+import javax.xml.bind.annotation.XmlAttribute;
+import javax.xml.bind.annotation.XmlElement;
+import javax.xml.bind.annotation.XmlRootElement;
+
+import org.apache.camel.model.LoadBalancerDefinition;
+import org.apache.camel.processor.loadbalancer.CircuitBreakerLoadBalancer;
+import org.apache.camel.processor.loadbalancer.LoadBalancer;
+import org.apache.camel.spi.RouteContext;
+import org.apache.camel.util.ObjectHelper;
+
+/**
+ * Represents an XML &lt;circuitbreaker/&gt; element
+ */
+@XmlRootElement(name = "circuitbreaker")
+@XmlAccessorType(XmlAccessType.FIELD)
+public class CircuitBreakerLoadBalancerDefinition extends 
LoadBalancerDefinition {
+    @XmlElement(name = "exception")
+    private List<String> exceptions = new ArrayList<String>();
+    @XmlAttribute
+    private Long halfOpenAfter;
+    @XmlAttribute
+    private Integer threshold;
+
+    public CircuitBreakerLoadBalancerDefinition() {
+    }
+
+    @Override
+    protected LoadBalancer createLoadBalancer(RouteContext routeContext) {
+        CircuitBreakerLoadBalancer answer;
+
+        if (!exceptions.isEmpty()) {
+            List<Class<?>> classes = new ArrayList<Class<?>>();
+            for (String name : exceptions) {
+                Class<?> type = 
routeContext.getCamelContext().getClassResolver().resolveClass(name);
+                if (type == null) {
+                    throw new IllegalArgumentException("Cannot find class: " + 
name + " in the classpath");
+                }
+                if (!ObjectHelper.isAssignableFrom(Throwable.class, type)) {
+                    throw new IllegalArgumentException("Class is not an 
instance of Throwable: " + type);
+                }
+                classes.add(type);
+            }
+            answer = new CircuitBreakerLoadBalancer(classes);
+        } else {
+            answer = new CircuitBreakerLoadBalancer();
+        }
+
+        if (getHalfOpenAfter() != null) {
+            answer.setHalfOpenAfter(getHalfOpenAfter());
+        }
+        if (getThreshold() != null) {
+            answer.setThreshold(getThreshold());
+        }
+        return answer;
+    }
+
+    public Long getHalfOpenAfter() {
+        return halfOpenAfter;
+    }
+
+    public void setHalfOpenAfter(Long halfOpenAfter) {
+        this.halfOpenAfter = halfOpenAfter;
+    }
+
+    public Integer getThreshold() {
+        return threshold;
+    }
+
+    public void setThreshold(Integer threshold) {
+        this.threshold = threshold;
+    }
+
+    public List<String> getExceptions() {
+        return exceptions;
+    }
+
+    public void setExceptions(List<String> exceptions) {
+        this.exceptions = exceptions;
+    }
+
+
+    @Override
+    public String toString() {
+        return "CircuitBreakerLoadBalancer";
+    }
+}

http://git-wip-us.apache.org/repos/asf/camel/blob/1f2b83af/camel-core/src/main/java/org/apache/camel/processor/loadbalancer/CircuitBreakerLoadBalancer.java
----------------------------------------------------------------------
diff --git 
a/camel-core/src/main/java/org/apache/camel/processor/loadbalancer/CircuitBreakerLoadBalancer.java
 
b/camel-core/src/main/java/org/apache/camel/processor/loadbalancer/CircuitBreakerLoadBalancer.java
new file mode 100644
index 0000000..b8e23b4
--- /dev/null
+++ 
b/camel-core/src/main/java/org/apache/camel/processor/loadbalancer/CircuitBreakerLoadBalancer.java
@@ -0,0 +1,145 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.processor.loadbalancer;
+
+import java.util.List;
+import java.util.concurrent.RejectedExecutionException;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.apache.camel.AsyncCallback;
+import org.apache.camel.AsyncProcessor;
+import org.apache.camel.CamelContext;
+import org.apache.camel.CamelContextAware;
+import org.apache.camel.Exchange;
+import org.apache.camel.Processor;
+import org.apache.camel.Traceable;
+import org.apache.camel.util.AsyncProcessorConverterHelper;
+
+/**
+ * A load balancer that guards its first processor with a circuit breaker.
+ * <p/>
+ * Consecutive failed exchanges are counted. Once the count has reached the
+ * threshold, and the last failure happened less than <tt>halfOpenAfter</tt>
+ * milliseconds ago, exchanges are rejected with a
+ * {@link RejectedExecutionException} without calling the processor. After that
+ * interval exchanges are let through again: a success resets the count, a
+ * failure refreshes the time of the last failure.
+ */
+public class CircuitBreakerLoadBalancer extends LoadBalancerSupport implements Traceable, CamelContextAware {
+    private final List<Class<?>> exceptions;
+    private final AtomicInteger failures = new AtomicInteger();
+    private CamelContext camelContext;
+    private int threshold;
+    private long halfOpenAfter;
+    // volatile as it is written and read by the threads routing exchanges
+    private volatile long lastFailure;
+
+    /**
+     * @param exceptions the exception types that count as a failure; when
+     *                   <tt>null</tt> or empty any exception counts
+     */
+    public CircuitBreakerLoadBalancer(List<Class<?>> exceptions) {
+        this.exceptions = exceptions;
+    }
+
+    public CircuitBreakerLoadBalancer() {
+        this.exceptions = null;
+    }
+
+    /**
+     * @param halfOpenAfter time in milliseconds, counted from the last failure,
+     *                      during which exchanges are rejected once the threshold is reached
+     */
+    public void setHalfOpenAfter(long halfOpenAfter) {
+        this.halfOpenAfter = halfOpenAfter;
+    }
+
+    /**
+     * @param threshold number of consecutive failures needed before exchanges are rejected
+     */
+    public void setThreshold(int threshold) {
+        this.threshold = threshold;
+    }
+
+    @Override
+    public CamelContext getCamelContext() {
+        return camelContext;
+    }
+
+    @Override
+    public void setCamelContext(CamelContext camelContext) {
+        this.camelContext = camelContext;
+    }
+
+    public List<Class<?>> getExceptions() {
+        return exceptions;
+    }
+
+    /**
+     * Decides whether the exchange counts as a failure.
+     *
+     * @param exchange the exchange after it has been processed
+     * @return <tt>true</tt> if the exchange holds an exception and either no
+     *         exception types are configured, or the exception matches one of them
+     */
+    protected boolean hasFailed(Exchange exchange) {
+        boolean answer = false;
+
+        if (exchange.getException() != null) {
+            if (exceptions == null || exceptions.isEmpty()) {
+                answer = true;
+            } else {
+                for (Class<?> exception : exceptions) {
+                    if (exchange.getException(exception) != null) {
+                        answer = true;
+                        break;
+                    }
+                }
+            }
+        }
+        return answer;
+    }
+
+    @Override
+    public boolean isRunAllowed() {
+        boolean forceShutdown = camelContext.getShutdownStrategy().forceShutdown(this);
+        if (forceShutdown) {
+            log.trace("Run not allowed as ShutdownStrategy is forcing shutting down");
+        }
+        return !forceShutdown && super.isRunAllowed();
+    }
+
+    /**
+     * Routes the exchange to the first processor unless running is not allowed
+     * or the circuit is open, in which case the exchange is rejected with a
+     * {@link RejectedExecutionException}.
+     *
+     * @param exchange the exchange to process
+     * @param callback notified exactly once when processing of the exchange is done
+     * @return <tt>true</tt> if the exchange was completed synchronously,
+     *         <tt>false</tt> if it continues asynchronously
+     * @throws IllegalStateException if there is no processor to route to
+     */
+    public boolean process(final Exchange exchange, final AsyncCallback callback) {
+
+        // can we still run
+        if (!isRunAllowed()) {
+            log.trace("Run not allowed, will reject executing exchange: {}", exchange);
+            if (exchange.getException() == null) {
+                exchange.setException(new RejectedExecutionException("Run is not allowed"));
+            }
+            callback.done(true);
+            return true;
+        }
+
+        if (isOpen()) {
+            // fail fast: do not call the processor, and do not let this rejection
+            // take part in the failure accounting as that could close the circuit again
+            exchange.setException(new RejectedExecutionException("CircuitBreaker Open: failures: " + failures + ", lastFailure: " + lastFailure));
+            callback.done(true);
+            return true;
+        }
+
+        // an empty list must end up as the IllegalStateException below
+        List<Processor> processors = getProcessors();
+        Processor processor = processors.isEmpty() ? null : processors.get(0);
+        if (processor == null) {
+            throw new IllegalStateException("No processors could be chosen to process CircuitBreaker");
+        }
+
+        AsyncProcessor albp = AsyncProcessorConverterHelper.convert(processor);
+        // record the outcome when the processor signals completion, so it is
+        // evaluated after the exchange is done in both the sync and async case,
+        // and the caller's callback is invoked only once
+        boolean sync = albp.process(exchange, new AsyncCallback() {
+            public void done(boolean doneSync) {
+                recordOutcome(exchange);
+                callback.done(doneSync);
+            }
+        });
+
+        if (!sync) {
+            log.trace("Processing exchangeId: {} is continued being processed asynchronously", exchange.getExchangeId());
+            return false;
+        }
+
+        log.trace("Processing exchangeId: {} is continued being processed synchronously", exchange.getExchangeId());
+        return true;
+    }
+
+    /**
+     * Whether exchanges are currently rejected: the threshold has been reached
+     * and the last failure is more recent than the half open interval.
+     */
+    private boolean isOpen() {
+        return failures.get() >= threshold && System.currentTimeMillis() - lastFailure < halfOpenAfter;
+    }
+
+    /**
+     * Updates the failure count and the time of the last failure from a processed exchange.
+     */
+    private void recordOutcome(Exchange exchange) {
+        if (hasFailed(exchange)) {
+            failures.incrementAndGet();
+            lastFailure = System.currentTimeMillis();
+        } else {
+            failures.set(0);
+        }
+    }
+
+    public String toString() {
+        return "CircuitBreakerLoadBalancer[" + getProcessors() + "]";
+    }
+
+    public String getTraceLabel() {
+        return "circuitbreaker";
+    }
+}

http://git-wip-us.apache.org/repos/asf/camel/blob/1f2b83af/camel-core/src/test/java/org/apache/camel/processor/CircuitBreakerLoadBalancerTest.java
----------------------------------------------------------------------
diff --git 
a/camel-core/src/test/java/org/apache/camel/processor/CircuitBreakerLoadBalancerTest.java
 
b/camel-core/src/test/java/org/apache/camel/processor/CircuitBreakerLoadBalancerTest.java
new file mode 100644
index 0000000..124a1e1
--- /dev/null
+++ 
b/camel-core/src/test/java/org/apache/camel/processor/CircuitBreakerLoadBalancerTest.java
@@ -0,0 +1,109 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.processor;
+
+import java.util.concurrent.RejectedExecutionException;
+
+import org.apache.camel.ContextTestSupport;
+import org.apache.camel.Exchange;
+import org.apache.camel.Processor;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.component.mock.MockEndpoint;
+import static org.apache.camel.component.mock.MockEndpoint.expectsMessageCount;
+
+/**
+ * Tests the circuit breaker load balancer using a route configured with a
+ * threshold of 2, a half open interval of 1000 millis and a single exception type.
+ */
+public class CircuitBreakerLoadBalancerTest extends ContextTestSupport {
+
+    // the exception type the circuit breaker in the route is configured with
+    private static class MyExceptionProcessor extends RuntimeException {
+    }
+
+    private MockEndpoint result;
+
+    @Override
+    protected void setUp() throws Exception {
+        super.setUp();
+        result = getMockEndpoint("mock:result");
+    }
+
+    protected RouteBuilder createRouteBuilder() {
+        return new RouteBuilder() {
+            public void configure() {
+                from("direct:start").loadBalance()
+                    .circuitBreaker(2, 1000L, MyExceptionProcessor.class)
+                        .to("mock:result");
+            }
+        };
+    }
+
+    public void testClosedCircuitPassesMessages() throws Exception {
+        expectsMessageCount(3, result);
+
+        // nothing fails, so all three messages are expected at the mock
+        sendMessage("direct:start", "message one");
+        sendMessage("direct:start", "message two");
+        sendMessage("direct:start", "message three");
+
+        assertMockEndpointsSatisfied();
+    }
+
+    public void testFailedMessagesOpenCircuitToPreventMessageThree() throws Exception {
+        expectsMessageCount(2, result);
+        result.whenAnyExchangeReceived(failingProcessor());
+
+        Exchange first = sendMessage("direct:start", "message one");
+        Exchange second = sendMessage("direct:start", "message two");
+        Exchange third = sendMessage("direct:start", "message three");
+        assertMockEndpointsSatisfied();
+
+        // the first two fail at the mock, the third is expected to be rejected
+        assertTrue(first.getException() instanceof MyExceptionProcessor);
+        assertTrue(second.getException() instanceof MyExceptionProcessor);
+        assertTrue(third.getException() instanceof RejectedExecutionException);
+    }
+
+    public void testHalfOpenCircuitClosesAfterTimeout() throws Exception {
+        expectsMessageCount(2, result);
+        result.whenAnyExchangeReceived(failingProcessor());
+
+        // only two of these three messages are expected at the mock
+        sendMessage("direct:start", "message one");
+        sendMessage("direct:start", "message two");
+        sendMessage("direct:start", "message three");
+        assertMockEndpointsSatisfied();
+
+        result.reset();
+        expectsMessageCount(1, result);
+
+        // wait for the half open interval configured in the route
+        Thread.sleep(1000);
+        sendMessage("direct:start", "message four");
+        assertMockEndpointsSatisfied();
+    }
+
+    /**
+     * Sends an exchange with the given body to the endpoint and returns it.
+     */
+    protected Exchange sendMessage(final String endpoint, final Object body) throws Exception {
+        Processor bodySetter = new Processor() {
+            @Override
+            public void process(Exchange exchange) throws Exception {
+                exchange.getIn().setBody(body);
+            }
+        };
+        return template.send(endpoint, bodySetter);
+    }
+
+    /**
+     * Creates a processor which sets a new {@link MyExceptionProcessor} on every exchange.
+     */
+    private Processor failingProcessor() {
+        return new Processor() {
+            @Override
+            public void process(Exchange exchange) throws Exception {
+                exchange.setException(new MyExceptionProcessor());
+            }
+        };
+    }
+}

Reply via email to