Repository: camel Updated Branches: refs/heads/master 1fce4a3ae -> 9ed7d85f6
CAMEL-5539 Circuit Breaker EIP Implemented circuit breaker as load balancer policy Project: http://git-wip-us.apache.org/repos/asf/camel/repo Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/1f2b83af Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/1f2b83af Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/1f2b83af Branch: refs/heads/master Commit: 1f2b83aff8f866708267a6113da35c3e57dd0563 Parents: 1fce4a3 Author: Bilgin Ibryam <bibr...@apache.org> Authored: Sat Apr 5 09:56:58 2014 +0100 Committer: Bilgin Ibryam <bibr...@apache.org> Committed: Sat Apr 5 09:56:58 2014 +0100 ---------------------------------------------------------------------- .../camel/model/LoadBalanceDefinition.java | 22 ++- .../CircuitBreakerLoadBalancerDefinition.java | 108 ++++++++++++++ .../CircuitBreakerLoadBalancer.java | 145 +++++++++++++++++++ .../CircuitBreakerLoadBalancerTest.java | 109 ++++++++++++++ 4 files changed, 383 insertions(+), 1 deletion(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/camel/blob/1f2b83af/camel-core/src/main/java/org/apache/camel/model/LoadBalanceDefinition.java ---------------------------------------------------------------------- diff --git a/camel-core/src/main/java/org/apache/camel/model/LoadBalanceDefinition.java b/camel-core/src/main/java/org/apache/camel/model/LoadBalanceDefinition.java index 973e9e5..985db35 100644 --- a/camel-core/src/main/java/org/apache/camel/model/LoadBalanceDefinition.java +++ b/camel-core/src/main/java/org/apache/camel/model/LoadBalanceDefinition.java @@ -31,6 +31,7 @@ import javax.xml.bind.annotation.XmlRootElement; import org.apache.camel.Expression; import org.apache.camel.Processor; +import org.apache.camel.model.loadbalancer.CircuitBreakerLoadBalancerDefinition; import org.apache.camel.model.loadbalancer.CustomLoadBalancerDefinition; import org.apache.camel.model.loadbalancer.FailoverLoadBalancerDefinition; import org.apache.camel.model.loadbalancer.RandomLoadBalancerDefinition; @@ -38,6 +39,7 @@ import org.apache.camel.model.loadbalancer.RoundRobinLoadBalancerDefinition; import org.apache.camel.model.loadbalancer.StickyLoadBalancerDefinition; import org.apache.camel.model.loadbalancer.TopicLoadBalancerDefinition; import org.apache.camel.model.loadbalancer.WeightedLoadBalancerDefinition; +import org.apache.camel.processor.loadbalancer.CircuitBreakerLoadBalancer; import org.apache.camel.processor.loadbalancer.FailOverLoadBalancer; import org.apache.camel.processor.loadbalancer.LoadBalancer; import org.apache.camel.processor.loadbalancer.RandomLoadBalancer; @@ -66,7 +68,8 @@ public class LoadBalanceDefinition extends ProcessorDefinition<LoadBalanceDefini @XmlElement(required = false, name = "roundRobin", type = RoundRobinLoadBalancerDefinition.class), @XmlElement(required = false, name = "sticky", type = StickyLoadBalancerDefinition.class), @XmlElement(required = false, name = "topic", type = TopicLoadBalancerDefinition.class), - @XmlElement(required = false, name = "weighted", type = WeightedLoadBalancerDefinition.class)} + @XmlElement(required = false, name = "weighted", type = WeightedLoadBalancerDefinition.class), + @XmlElement(required = false, name = "circuitBreaker", type = CircuitBreakerLoadBalancerDefinition.class)} ) private LoadBalancerDefinition loadBalancerType; @XmlElementRef @@ -213,6 +216,23 @@ public class LoadBalanceDefinition extends ProcessorDefinition<LoadBalanceDefini public LoadBalanceDefinition weighted(boolean roundRobin, String distributionRatio) { return weighted(roundRobin, distributionRatio, ","); } + + /** + * Uses circuitBreaker load balancer + * + * @param threshold number of errors before failure. + * @param halfOpenAfter time interval in milliseconds for half open state. + * @param exceptions exception classes which we want to break if one of them was thrown + * @return the builder + */ + public LoadBalanceDefinition circuitBreaker(int threshold, long halfOpenAfter, Class<?>... exceptions) { + CircuitBreakerLoadBalancer breakerLoadBalancer = new CircuitBreakerLoadBalancer(Arrays.asList(exceptions)); + breakerLoadBalancer.setThreshold(threshold); + breakerLoadBalancer.setHalfOpenAfter(halfOpenAfter); + + setLoadBalancerType(new LoadBalancerDefinition(breakerLoadBalancer)); + return this; + } /** * Uses weighted load balancer http://git-wip-us.apache.org/repos/asf/camel/blob/1f2b83af/camel-core/src/main/java/org/apache/camel/model/loadbalancer/CircuitBreakerLoadBalancerDefinition.java ---------------------------------------------------------------------- diff --git a/camel-core/src/main/java/org/apache/camel/model/loadbalancer/CircuitBreakerLoadBalancerDefinition.java b/camel-core/src/main/java/org/apache/camel/model/loadbalancer/CircuitBreakerLoadBalancerDefinition.java new file mode 100644 index 0000000..94c990b --- /dev/null +++ b/camel-core/src/main/java/org/apache/camel/model/loadbalancer/CircuitBreakerLoadBalancerDefinition.java @@ -0,0 +1,108 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.model.loadbalancer; + +import java.util.ArrayList; +import java.util.List; +import javax.xml.bind.annotation.XmlAccessType; +import javax.xml.bind.annotation.XmlAccessorType; +import javax.xml.bind.annotation.XmlAttribute; +import javax.xml.bind.annotation.XmlElement; +import javax.xml.bind.annotation.XmlRootElement; + +import org.apache.camel.model.LoadBalancerDefinition; +import org.apache.camel.processor.loadbalancer.CircuitBreakerLoadBalancer; +import org.apache.camel.processor.loadbalancer.LoadBalancer; +import org.apache.camel.spi.RouteContext; +import org.apache.camel.util.ObjectHelper; + +/** + * Represents an XML <circuitbreaker/> element + */ +@XmlRootElement(name = "circuitbreaker") +@XmlAccessorType(XmlAccessType.FIELD) +public class CircuitBreakerLoadBalancerDefinition extends LoadBalancerDefinition { + @XmlElement(name = "exception") + private List<String> exceptions = new ArrayList<String>(); + @XmlAttribute + private Long halfOpenAfter; + @XmlAttribute + private Integer threshold; + + public CircuitBreakerLoadBalancerDefinition() { + } + + @Override + protected LoadBalancer createLoadBalancer(RouteContext routeContext) { + CircuitBreakerLoadBalancer answer; + + if (!exceptions.isEmpty()) { + List<Class<?>> classes = new ArrayList<Class<?>>(); + for (String name : exceptions) { + Class<?> type = routeContext.getCamelContext().getClassResolver().resolveClass(name); + if (type == null) { + throw new IllegalArgumentException("Cannot find class: " + name + " in the classpath"); + } + if (!ObjectHelper.isAssignableFrom(Throwable.class, type)) { + throw new IllegalArgumentException("Class is not an instance of Throwable: " + type); + } + classes.add(type); + } + answer = new CircuitBreakerLoadBalancer(classes); + } else { + answer = new CircuitBreakerLoadBalancer(); + } + + if (getHalfOpenAfter() != null) { + answer.setHalfOpenAfter(getHalfOpenAfter()); + } + if (getThreshold() != null) { + answer.setThreshold(getThreshold()); + } + return answer; + } + + public Long getHalfOpenAfter() { + return halfOpenAfter; + } + + public void setHalfOpenAfter(Long halfOpenAfter) { + this.halfOpenAfter = halfOpenAfter; + } + + public Integer getThreshold() { + return threshold; + } + + public void setThreshold(Integer threshold) { + this.threshold = threshold; + } + + public List<String> getExceptions() { + return exceptions; + } + + public void setExceptions(List<String> exceptions) { + this.exceptions = exceptions; + } + + + @Override + public String toString() { + return "CircuitBreakerLoadBalancer"; + } +} http://git-wip-us.apache.org/repos/asf/camel/blob/1f2b83af/camel-core/src/main/java/org/apache/camel/processor/loadbalancer/CircuitBreakerLoadBalancer.java ---------------------------------------------------------------------- diff --git a/camel-core/src/main/java/org/apache/camel/processor/loadbalancer/CircuitBreakerLoadBalancer.java b/camel-core/src/main/java/org/apache/camel/processor/loadbalancer/CircuitBreakerLoadBalancer.java new file mode 100644 index 0000000..b8e23b4 --- /dev/null +++ b/camel-core/src/main/java/org/apache/camel/processor/loadbalancer/CircuitBreakerLoadBalancer.java @@ -0,0 +1,145 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.processor.loadbalancer; + +import java.util.List; +import java.util.concurrent.RejectedExecutionException; +import java.util.concurrent.atomic.AtomicInteger; + +import org.apache.camel.AsyncCallback; +import org.apache.camel.AsyncProcessor; +import org.apache.camel.CamelContext; +import org.apache.camel.CamelContextAware; +import org.apache.camel.Exchange; +import org.apache.camel.Processor; +import org.apache.camel.Traceable; +import org.apache.camel.util.AsyncProcessorConverterHelper; + +public class CircuitBreakerLoadBalancer extends LoadBalancerSupport implements Traceable, CamelContextAware { + private final List<Class<?>> exceptions; + private CamelContext camelContext; + private int threshold; + private long halfOpenAfter; + private long lastFailure; + private AtomicInteger failures = new AtomicInteger(); + + public CircuitBreakerLoadBalancer(List<Class<?>> exceptions) { + this.exceptions = exceptions; + } + public CircuitBreakerLoadBalancer() { + this.exceptions = null; + } + + public void setHalfOpenAfter(long halfOpenAfter) { + this.halfOpenAfter = halfOpenAfter; + } + + public void setThreshold(int threshold) { + this.threshold = threshold; + } + + @Override + public CamelContext getCamelContext() { + return camelContext; + } + + @Override + public void setCamelContext(CamelContext camelContext) { + this.camelContext = camelContext; + } + + public List<Class<?>> getExceptions() { + return exceptions; + } + + protected boolean hasFailed(Exchange exchange) { + boolean answer = false; + + if (exchange.getException() != null) { + if (exceptions == null || exceptions.isEmpty()) { + answer = true; + } else { + for (Class<?> exception : exceptions) { + if (exchange.getException(exception) != null) { + answer = true; + break; + } + } + } + } + return answer; + } + + @Override + public boolean isRunAllowed() { + boolean forceShutdown = camelContext.getShutdownStrategy().forceShutdown(this); + if (forceShutdown) { + log.trace("Run not allowed as ShutdownStrategy is forcing shutting down"); + } + return !forceShutdown && super.isRunAllowed(); + } + + public boolean process(final Exchange exchange, final AsyncCallback callback) { + + // can we still run + if (!isRunAllowed()) { + log.trace("Run not allowed, will reject executing exchange: {}", exchange); + if (exchange.getException() == null) { + exchange.setException(new RejectedExecutionException("Run is not allowed")); + } + callback.done(true); + return true; + } + + if (failures.get() >= threshold && System.currentTimeMillis() - lastFailure < halfOpenAfter) { + exchange.setException(new RejectedExecutionException("CircuitBreaker Open: failures: " + failures + ", lastFailure: " + lastFailure)); + } + Processor processor = getProcessors().get(0); + if (processor == null) { + throw new IllegalStateException("No processors could be chosen to process CircuitBreaker"); + } + + AsyncProcessor albp = AsyncProcessorConverterHelper.convert(processor); + boolean sync = albp.process(exchange, callback); + + boolean failed = hasFailed(exchange); + + if (!failed) { + failures.set(0); + } else { + failures.incrementAndGet(); + lastFailure = System.currentTimeMillis(); + } + + if (!sync) { + log.trace("Processing exchangeId: {} is continued being processed asynchronously", exchange.getExchangeId()); + return false; + } + + log.trace("Processing exchangeId: {} is continued being processed synchronously", exchange.getExchangeId()); + callback.done(true); + return true; + } + + public String toString() { + return "CircuitBreakerLoadBalancer[" + getProcessors() + "]"; + } + + public String getTraceLabel() { + return "circuitbreaker"; + } +} http://git-wip-us.apache.org/repos/asf/camel/blob/1f2b83af/camel-core/src/test/java/org/apache/camel/processor/CircuitBreakerLoadBalancerTest.java ---------------------------------------------------------------------- diff --git a/camel-core/src/test/java/org/apache/camel/processor/CircuitBreakerLoadBalancerTest.java b/camel-core/src/test/java/org/apache/camel/processor/CircuitBreakerLoadBalancerTest.java new file mode 100644 index 0000000..124a1e1 --- /dev/null +++ b/camel-core/src/test/java/org/apache/camel/processor/CircuitBreakerLoadBalancerTest.java @@ -0,0 +1,109 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.processor; + +import java.util.concurrent.RejectedExecutionException; + +import org.apache.camel.ContextTestSupport; +import org.apache.camel.Exchange; +import org.apache.camel.Processor; +import org.apache.camel.builder.RouteBuilder; +import org.apache.camel.component.mock.MockEndpoint; +import static org.apache.camel.component.mock.MockEndpoint.expectsMessageCount; + +public class CircuitBreakerLoadBalancerTest extends ContextTestSupport { + + private static class MyExceptionProcessor extends RuntimeException { + } + + private MockEndpoint result; + + @Override + protected void setUp() throws Exception { + super.setUp(); + result = getMockEndpoint("mock:result"); + } + + protected RouteBuilder createRouteBuilder() { + return new RouteBuilder() { + public void configure() { + from("direct:start").loadBalance() + .circuitBreaker(2, 1000L, MyExceptionProcessor.class) + .to("mock:result"); + } + }; + } + + public void testClosedCircuitPassesMessages() throws Exception { + expectsMessageCount(3, result); + sendMessage("direct:start", "message one"); + sendMessage("direct:start", "message two"); + sendMessage("direct:start", "message three"); + assertMockEndpointsSatisfied(); + } + + public void testFailedMessagesOpenCircuitToPreventMessageThree() throws Exception { + expectsMessageCount(2, result); + + result.whenAnyExchangeReceived(new Processor() { + @Override + public void process(Exchange exchange) throws Exception { + exchange.setException(new MyExceptionProcessor()); + } + }); + + Exchange exchangeOne = sendMessage("direct:start", "message one"); + Exchange exchangeTwo = sendMessage("direct:start", "message two"); + Exchange exchangeThree = sendMessage("direct:start", "message three"); + assertMockEndpointsSatisfied(); + + assertTrue(exchangeOne.getException() instanceof MyExceptionProcessor); + assertTrue(exchangeTwo.getException() instanceof MyExceptionProcessor); + assertTrue(exchangeThree.getException() instanceof RejectedExecutionException); + } + + public void testHalfOpenCircuitClosesAfterTimeout() throws Exception { + expectsMessageCount(2, result); + result.whenAnyExchangeReceived(new Processor() { + @Override + public void process(Exchange exchange) throws Exception { + exchange.setException(new MyExceptionProcessor()); + } + }); + + sendMessage("direct:start", "message one"); + sendMessage("direct:start", "message two"); + sendMessage("direct:start", "message three"); + assertMockEndpointsSatisfied(); + + result.reset(); + expectsMessageCount(1, result); + + Thread.sleep(1000); + sendMessage("direct:start", "message four"); + assertMockEndpointsSatisfied(); + } + + protected Exchange sendMessage(final String endpoint, final Object body) throws Exception { + return template.send(endpoint, new Processor() { + @Override + public void process(Exchange exchange) throws Exception { + exchange.getIn().setBody(body); + } + }); + } +}