Repository: camel Updated Branches: refs/heads/master b93083465 -> 308529ff3
CAMEL-3910: Failover LB should have sticky mode. Project: http://git-wip-us.apache.org/repos/asf/camel/repo Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/308529ff Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/308529ff Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/308529ff Branch: refs/heads/master Commit: 308529ff3ce0d6b076312dbd8db36c5ab92b62a6 Parents: b930834 Author: Claus Ibsen <davscl...@apache.org> Authored: Sun Jul 12 10:54:59 2015 +0200 Committer: Claus Ibsen <davscl...@apache.org> Committed: Sun Jul 12 10:59:24 2015 +0200 ---------------------------------------------------------------------- .../camel/model/LoadBalanceDefinition.java | 18 +++++ .../FailoverLoadBalancerDefinition.java | 26 +++++- .../loadbalancer/FailOverLoadBalancer.java | 22 ++++- .../processor/FailoverRoundRobinStickyTest.java | 84 ++++++++++++++++++++ .../camel/processor/FailoverStickyTest.java | 84 ++++++++++++++++++++ .../SpringFailoverRoundRobinStickyTest.java | 32 ++++++++ .../processor/FailoverRoundRobinStickyTest.xml | 75 +++++++++++++++++ 7 files changed, 338 insertions(+), 3 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/camel/blob/308529ff/camel-core/src/main/java/org/apache/camel/model/LoadBalanceDefinition.java ---------------------------------------------------------------------- diff --git a/camel-core/src/main/java/org/apache/camel/model/LoadBalanceDefinition.java b/camel-core/src/main/java/org/apache/camel/model/LoadBalanceDefinition.java index 32ec236..97db364 100644 --- a/camel-core/src/main/java/org/apache/camel/model/LoadBalanceDefinition.java +++ b/camel-core/src/main/java/org/apache/camel/model/LoadBalanceDefinition.java @@ -173,10 +173,28 @@ public class LoadBalanceDefinition extends ProcessorDefinition<LoadBalanceDefini * @return the builder */ public LoadBalanceDefinition failover(int maximumFailoverAttempts, boolean inheritErrorHandler, boolean 
roundRobin, Class<?>... exceptions) { + return failover(maximumFailoverAttempts, inheritErrorHandler, roundRobin, false, exceptions); + } + + /** + * Uses fail over load balancer + * + * @param maximumFailoverAttempts maximum number of failover attempts before exhausting. + * Use -1 to never exhaust when round robin is also enabled. + * If round robin is disabled then it will exhaust when there are no more endpoints to failover + * @param inheritErrorHandler whether or not to inherit error handler. + * If <tt>false</tt> then it will failover immediately in case of an exception + * @param roundRobin whether or not to use round robin (which keeps state) + * @param sticky whether or not to use sticky (which keeps state) + * @param exceptions exception classes which we want to failover if one of them was thrown + * @return the builder + */ + public LoadBalanceDefinition failover(int maximumFailoverAttempts, boolean inheritErrorHandler, boolean roundRobin, boolean sticky, Class<?>... exceptions) { FailoverLoadBalancerDefinition def = new FailoverLoadBalancerDefinition(); def.setExceptionTypes(Arrays.asList(exceptions)); def.setMaximumFailoverAttempts(maximumFailoverAttempts); def.setRoundRobin(roundRobin); + def.setSticky(sticky); setLoadBalancerType(def); this.setInheritErrorHandler(inheritErrorHandler); return this; http://git-wip-us.apache.org/repos/asf/camel/blob/308529ff/camel-core/src/main/java/org/apache/camel/model/loadbalancer/FailoverLoadBalancerDefinition.java ---------------------------------------------------------------------- diff --git a/camel-core/src/main/java/org/apache/camel/model/loadbalancer/FailoverLoadBalancerDefinition.java b/camel-core/src/main/java/org/apache/camel/model/loadbalancer/FailoverLoadBalancerDefinition.java index 523f794..cd1f062 100644 --- a/camel-core/src/main/java/org/apache/camel/model/loadbalancer/FailoverLoadBalancerDefinition.java +++ 
b/camel-core/src/main/java/org/apache/camel/model/loadbalancer/FailoverLoadBalancerDefinition.java @@ -50,6 +50,8 @@ public class FailoverLoadBalancerDefinition extends LoadBalancerDefinition { private List<String> exceptions = new ArrayList<String>(); @XmlAttribute private Boolean roundRobin; + @XmlAttribute + private Boolean sticky; @XmlAttribute @Metadata(defaultValue = "-1") private Integer maximumFailoverAttempts; @@ -87,6 +89,9 @@ public class FailoverLoadBalancerDefinition extends LoadBalancerDefinition { if (roundRobin != null) { answer.setRoundRobin(roundRobin); } + if (sticky != null) { + answer.setSticky(sticky); + } return answer; } @@ -124,12 +129,31 @@ public class FailoverLoadBalancerDefinition extends LoadBalancerDefinition { * If not, then it will always start from the first endpoint when a new message is to be processed. * In other words it restart from the top for every message. * If round robin is enabled, then it keeps state and will continue with the next endpoint in a round robin fashion. - * When using round robin it will not stick to last known good endpoint, it will always pick the next endpoint to use. + * <p/> + * You can also enable sticky mode together with round robin, if so then it will pick the last known good endpoint + * to use when starting the load balancing (instead of using the next when starting). */ public void setRoundRobin(Boolean roundRobin) { this.roundRobin = roundRobin; } + public Boolean getSticky() { + return sticky; + } + + /** + * Whether or not the failover load balancer should operate in sticky mode or not. + * If not, then it will always start from the first endpoint when a new message is to be processed. + * In other words it restart from the top for every message. + * If sticky is enabled, then it keeps state and will continue with the last known good endpoint. 
+ * <p/> + * You can also enable sticky mode together with round robin, if so then it will pick the last known good endpoint + * to use when starting the load balancing (instead of using the next when starting). + */ + public void setSticky(Boolean sticky) { + this.sticky = sticky; + } + public Integer getMaximumFailoverAttempts() { return maximumFailoverAttempts; } http://git-wip-us.apache.org/repos/asf/camel/blob/308529ff/camel-core/src/main/java/org/apache/camel/processor/loadbalancer/FailOverLoadBalancer.java ---------------------------------------------------------------------- diff --git a/camel-core/src/main/java/org/apache/camel/processor/loadbalancer/FailOverLoadBalancer.java b/camel-core/src/main/java/org/apache/camel/processor/loadbalancer/FailOverLoadBalancer.java index 42b76f4..76bfa74 100644 --- a/camel-core/src/main/java/org/apache/camel/processor/loadbalancer/FailOverLoadBalancer.java +++ b/camel-core/src/main/java/org/apache/camel/processor/loadbalancer/FailOverLoadBalancer.java @@ -43,10 +43,12 @@ public class FailOverLoadBalancer extends LoadBalancerSupport implements Traceab private final List<Class<?>> exceptions; private CamelContext camelContext; private boolean roundRobin; + private boolean sticky; private int maximumFailoverAttempts = -1; // stateful counter private final AtomicInteger counter = new AtomicInteger(-1); + private final AtomicInteger lastGoodIndex = new AtomicInteger(); public FailOverLoadBalancer() { this.exceptions = null; @@ -85,6 +87,14 @@ public class FailOverLoadBalancer extends LoadBalancerSupport implements Traceab this.roundRobin = roundRobin; } + public boolean isSticky() { + return sticky; + } + + public void setSticky(boolean sticky) { + this.sticky = sticky; + } + public int getMaximumFailoverAttempts() { return maximumFailoverAttempts; } @@ -147,7 +157,9 @@ public class FailOverLoadBalancer extends LoadBalancerSupport implements Traceab Exchange copy = null; // get the next processor - if (isRoundRobin()) { + if 
(isSticky()) { + index.set(lastGoodIndex.get()); + } else if (isRoundRobin()) { if (counter.incrementAndGet() >= processors.size()) { counter.set(0); } @@ -214,6 +226,9 @@ public class FailOverLoadBalancer extends LoadBalancerSupport implements Traceab log.trace("Processing exchangeId: {} is continued being processed synchronously", exchange.getExchangeId()); } + // remember last good index + lastGoodIndex.set(index.get()); + // and copy the current result to original so it will contain this result of this eip if (copy != null) { ExchangeHelper.copyResults(exchange, copy); @@ -323,6 +338,9 @@ public class FailOverLoadBalancer extends LoadBalancerSupport implements Traceab } } + // remember last good index + lastGoodIndex.set(index.get()); + // and copy the current result to original so it will contain this result of this eip if (copy != null) { ExchangeHelper.copyResults(exchange, copy); @@ -330,7 +348,7 @@ public class FailOverLoadBalancer extends LoadBalancerSupport implements Traceab log.debug("Failover complete for exchangeId: {} >>> {}", exchange.getExchangeId(), exchange); // signal callback we are done callback.done(false); - }; + } } public String toString() { http://git-wip-us.apache.org/repos/asf/camel/blob/308529ff/camel-core/src/test/java/org/apache/camel/processor/FailoverRoundRobinStickyTest.java ---------------------------------------------------------------------- diff --git a/camel-core/src/test/java/org/apache/camel/processor/FailoverRoundRobinStickyTest.java b/camel-core/src/test/java/org/apache/camel/processor/FailoverRoundRobinStickyTest.java new file mode 100644 index 0000000..ac869c0 --- /dev/null +++ b/camel-core/src/test/java/org/apache/camel/processor/FailoverRoundRobinStickyTest.java @@ -0,0 +1,84 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.processor; + +import org.apache.camel.ContextTestSupport; +import org.apache.camel.builder.RouteBuilder; + +/** + * @version + */ +public class FailoverRoundRobinStickyTest extends ContextTestSupport { + + public void testFailoverRoundRobinSticky() throws Exception { + getMockEndpoint("mock:bad").expectedBodiesReceived("Hello World"); + getMockEndpoint("mock:bad2").expectedBodiesReceived("Hello World"); + getMockEndpoint("mock:good").expectedBodiesReceived("Hello World"); + getMockEndpoint("mock:good2").expectedMessageCount(0); + + template.sendBody("direct:start", "Hello World"); + + assertMockEndpointsSatisfied(); + + // as its round robin and sticky based it remembers that last good endpoint + // and will invoke the last good + + resetMocks(); + + getMockEndpoint("mock:bad").expectedMessageCount(0); + getMockEndpoint("mock:bad2").expectedMessageCount(0); + getMockEndpoint("mock:good").expectedBodiesReceived("Bye World"); + getMockEndpoint("mock:good2").expectedMessageCount(0); + + template.sendBody("direct:start", "Bye World"); + + assertMockEndpointsSatisfied(); + } + + @Override + protected RouteBuilder createRouteBuilder() throws Exception { + return new RouteBuilder() { + @Override + public void configure() throws Exception { + // START SNIPPET: e1 + from("direct:start") + // Use failover load balancer in stateful round robin and sticky mode + // which 
means it will failover immediately in case of an exception + // as it does NOT inherit error handler. It will also keep retrying as + // it's configured to never exhaust. + .loadBalance().failover(-1, false, true, true). + to("direct:bad", "direct:bad2", "direct:good", "direct:good2"); + // END SNIPPET: e1 + + from("direct:bad") + .to("mock:bad") + .throwException(new IllegalArgumentException("Damn")); + + from("direct:bad2") + .to("mock:bad2") + .throwException(new IllegalArgumentException("Damn Again")); + + from("direct:good") + .to("mock:good"); + + from("direct:good2") + .to("mock:good2"); + } + }; + } + +} http://git-wip-us.apache.org/repos/asf/camel/blob/308529ff/camel-core/src/test/java/org/apache/camel/processor/FailoverStickyTest.java ---------------------------------------------------------------------- diff --git a/camel-core/src/test/java/org/apache/camel/processor/FailoverStickyTest.java b/camel-core/src/test/java/org/apache/camel/processor/FailoverStickyTest.java new file mode 100644 index 0000000..c9539a0 --- /dev/null +++ b/camel-core/src/test/java/org/apache/camel/processor/FailoverStickyTest.java @@ -0,0 +1,84 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.camel.processor; + +import org.apache.camel.ContextTestSupport; +import org.apache.camel.builder.RouteBuilder; + +/** + * @version + */ +public class FailoverStickyTest extends ContextTestSupport { + + public void testFailoverSticky() throws Exception { + getMockEndpoint("mock:bad").expectedBodiesReceived("Hello World"); + getMockEndpoint("mock:bad2").expectedBodiesReceived("Hello World"); + getMockEndpoint("mock:good").expectedBodiesReceived("Hello World"); + getMockEndpoint("mock:good2").expectedMessageCount(0); + + template.sendBody("direct:start", "Hello World"); + + assertMockEndpointsSatisfied(); + + // as it's sticky based it remembers the last good endpoint + // and will invoke the last good + + resetMocks(); + + getMockEndpoint("mock:bad").expectedMessageCount(0); + getMockEndpoint("mock:bad2").expectedMessageCount(0); + getMockEndpoint("mock:good").expectedBodiesReceived("Bye World"); + getMockEndpoint("mock:good2").expectedMessageCount(0); + + template.sendBody("direct:start", "Bye World"); + + assertMockEndpointsSatisfied(); + } + + @Override + protected RouteBuilder createRouteBuilder() throws Exception { + return new RouteBuilder() { + @Override + public void configure() throws Exception { + // START SNIPPET: e1 + from("direct:start") + // Use failover load balancer in stateful sticky mode + // which means it will failover immediately in case of an exception + // as it does NOT inherit error handler. It will also keep retrying as + // it's configured to never exhaust. + .loadBalance().failover(-1, false, false, true). 
+ to("direct:bad", "direct:bad2", "direct:good", "direct:good2"); + // END SNIPPET: e1 + + from("direct:bad") + .to("mock:bad") + .throwException(new IllegalArgumentException("Damn")); + + from("direct:bad2") + .to("mock:bad2") + .throwException(new IllegalArgumentException("Damn Again")); + + from("direct:good") + .to("mock:good"); + + from("direct:good2") + .to("mock:good2"); + } + }; + } + +} http://git-wip-us.apache.org/repos/asf/camel/blob/308529ff/components/camel-spring/src/test/java/org/apache/camel/spring/processor/SpringFailoverRoundRobinStickyTest.java ---------------------------------------------------------------------- diff --git a/components/camel-spring/src/test/java/org/apache/camel/spring/processor/SpringFailoverRoundRobinStickyTest.java b/components/camel-spring/src/test/java/org/apache/camel/spring/processor/SpringFailoverRoundRobinStickyTest.java new file mode 100644 index 0000000..ce19240 --- /dev/null +++ b/components/camel-spring/src/test/java/org/apache/camel/spring/processor/SpringFailoverRoundRobinStickyTest.java @@ -0,0 +1,32 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.camel.spring.processor; + +import org.apache.camel.CamelContext; +import org.apache.camel.processor.FailoverRoundRobinStickyTest; + +import static org.apache.camel.spring.processor.SpringTestHelper.createSpringCamelContext; + +/** + * @version + */ +public class SpringFailoverRoundRobinStickyTest extends FailoverRoundRobinStickyTest { + + protected CamelContext createCamelContext() throws Exception { + return createSpringCamelContext(this, "org/apache/camel/spring/processor/FailoverRoundRobinStickyTest.xml"); + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/camel/blob/308529ff/components/camel-spring/src/test/resources/org/apache/camel/spring/processor/FailoverRoundRobinStickyTest.xml ---------------------------------------------------------------------- diff --git a/components/camel-spring/src/test/resources/org/apache/camel/spring/processor/FailoverRoundRobinStickyTest.xml b/components/camel-spring/src/test/resources/org/apache/camel/spring/processor/FailoverRoundRobinStickyTest.xml new file mode 100644 index 0000000..c96c63f --- /dev/null +++ b/components/camel-spring/src/test/resources/org/apache/camel/spring/processor/FailoverRoundRobinStickyTest.xml @@ -0,0 +1,75 @@ +<?xml version="1.0" encoding="UTF-8"?> +<!-- + Licensed to the Apache Software Foundation (ASF) under one or more + contributor license agreements. See the NOTICE file distributed with + this work for additional information regarding copyright ownership. + The ASF licenses this file to You under the Apache License, Version 2.0 + (the "License"); you may not use this file except in compliance with + the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ See the License for the specific language governing permissions and + limitations under the License. +--> +<beans xmlns="http://www.springframework.org/schema/beans" + xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" + xsi:schemaLocation=" + http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd + http://camel.apache.org/schema/spring http://camel.apache.org/schema/spring/camel-spring.xsd + "> + + <bean id="damn" class="java.lang.IllegalArgumentException"> + <constructor-arg index="0" value="Damn"/> + </bean> + + <bean id="damnAgain" class="java.lang.IllegalArgumentException"> + <constructor-arg index="0" value="Damn Again"/> + </bean> + + <camelContext xmlns="http://camel.apache.org/schema/spring"> + + <!-- START SNIPPET: e1 --> + <route> + <from uri="direct:start"/> + <loadBalance> + <!-- failover using stateful round robin and sticky mode, + which will keep retrying forever those 4 endpoints until success. + You can set the maximumFailoverAttempt to break out after X attempts --> + <failover roundRobin="true" sticky="true"/> + <to uri="direct:bad"/> + <to uri="direct:bad2"/> + <to uri="direct:good"/> + <to uri="direct:good2"/> + </loadBalance> + </route> + <!-- END SNIPPET: e1 --> + + <route> + <from uri="direct:bad"/> + <to uri="mock:bad"/> + <throwException ref="damn"/> + </route> + + <route> + <from uri="direct:bad2"/> + <to uri="mock:bad2"/> + <throwException ref="damnAgain"/> + </route> + + <route> + <from uri="direct:good"/> + <to uri="mock:good"/> + </route> + + <route> + <from uri="direct:good2"/> + <to uri="mock:good2"/> + </route> + + </camelContext> + +</beans>