This is an automated email from the ASF dual-hosted git repository. davsclaus pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/camel.git
commit 9cc0eedfd62be8809db4c9c97ad42eddda121d0f Author: CodeSmell <mbarlo...@gmail.com> AuthorDate: Thu Jan 4 17:57:13 2018 -0500 [CAMEL-12125] add keepOpen to endpoint circuit breaker --- .../camel/impl/ThrottlingExceptionRoutePolicy.java | 168 +++++++++++++-------- ...lingExceptionRoutePolicyKeepOpenOnInitTest.java | 98 ++++++++++++ ...tlingExceptionRoutePolicyOpenViaConfigTest.java | 104 +++++++++++++ 3 files changed, 309 insertions(+), 61 deletions(-) diff --git a/camel-core/src/main/java/org/apache/camel/impl/ThrottlingExceptionRoutePolicy.java b/camel-core/src/main/java/org/apache/camel/impl/ThrottlingExceptionRoutePolicy.java index 92aba77..0ee7b83 100644 --- a/camel-core/src/main/java/org/apache/camel/impl/ThrottlingExceptionRoutePolicy.java +++ b/camel-core/src/main/java/org/apache/camel/impl/ThrottlingExceptionRoutePolicy.java @@ -16,13 +16,6 @@ */ package org.apache.camel.impl; -import java.util.List; -import java.util.Timer; -import java.util.TimerTask; -import java.util.concurrent.atomic.AtomicInteger; -import java.util.concurrent.locks.Lock; -import java.util.concurrent.locks.ReentrantLock; - import org.apache.camel.CamelContext; import org.apache.camel.CamelContextAware; import org.apache.camel.Exchange; @@ -33,38 +26,46 @@ import org.apache.camel.support.RoutePolicySupport; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.util.List; +import java.util.Timer; +import java.util.TimerTask; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.locks.Lock; +import java.util.concurrent.locks.ReentrantLock; + /** * Modeled after the {@link CircuitBreakerLoadBalancer} and {@link ThrottlingInflightRoutePolicy} * this {@link RoutePolicy} will stop consuming from an endpoint based on the type of exceptions that are - * thrown and the threshold setting. - * + * thrown and the threshold setting. + * * the scenario: if a route cannot process data from an endpoint due to problems with resources used by the route - * (ie database down) then it will stop consuming new messages from the endpoint by stopping the consumer. - * The implementation is comparable to the Circuit Breaker pattern. After a set amount of time, it will move + * (ie database down) then it will stop consuming new messages from the endpoint by stopping the consumer. + * The implementation is comparable to the Circuit Breaker pattern. After a set amount of time, it will move * to a half open state and attempt to determine if the consumer can be started. * There are two ways to determine if a route can be closed after being opened * (1) start the consumer and check the failure threshold - * (2) call the {@link ThrottlingExceptionHalfOpenHandler} + * (2) call the {@link ThrottlingExceptionHalfOpenHandler} * The second option allows a custom check to be performed without having to take on the possibility of * multiple messages from the endpoint. The idea is that a handler could run a simple test (ie select 1 from dual) - * to determine if the processes that cause the route to be open are now available + * to determine if the processes that cause the route to be open are now available */ public class ThrottlingExceptionRoutePolicy extends RoutePolicySupport implements CamelContextAware { private static final Logger LOG = LoggerFactory.getLogger(ThrottlingExceptionRoutePolicy.class); - + private static final int STATE_CLOSED = 0; private static final int STATE_HALF_OPEN = 1; private static final int STATE_OPEN = 2; - + private CamelContext camelContext; private final Lock lock = new ReentrantLock(); - + // configuration private int failureThreshold; private long failureWindow; private long halfOpenAfter; private final List<Class<?>> throttledExceptions; - + // handler for half open circuit // can be used instead of resuming route // to check on resources @@ -73,17 +74,27 @@ public class ThrottlingExceptionRoutePolicy extends RoutePolicySupport implement // stateful information private final AtomicInteger failures = new AtomicInteger(); private final AtomicInteger state = new AtomicInteger(STATE_CLOSED); + private AtomicBoolean keepOpen = new AtomicBoolean(false); private volatile Timer halfOpenTimer; private volatile long lastFailure; private volatile long openedAt; - + public ThrottlingExceptionRoutePolicy(int threshold, long failureWindow, long halfOpenAfter, List<Class<?>> handledExceptions) { this.throttledExceptions = handledExceptions; this.failureWindow = failureWindow; this.halfOpenAfter = halfOpenAfter; this.failureThreshold = threshold; + this.keepOpen.set(false); + } + + public ThrottlingExceptionRoutePolicy(int threshold, long failureWindow, long halfOpenAfter, List<Class<?>> handledExceptions, boolean keepOpen) { + this.throttledExceptions = handledExceptions; + this.failureWindow = failureWindow; + this.halfOpenAfter = halfOpenAfter; + this.failureThreshold = threshold; + this.keepOpen.set(keepOpen); } - + @Override public void setCamelContext(CamelContext camelContext) { this.camelContext = camelContext; @@ -99,22 +110,37 @@ public class ThrottlingExceptionRoutePolicy extends RoutePolicySupport implement LOG.debug("Initializing ThrottlingExceptionRoutePolicy route policy..."); logState(); } - + + @Override + public void onStart(Route route) { + // if keepOpen then start w/ the circuit open + if (keepOpen.get()) { + openCircuit(route); + } + } + @Override public void onExchangeDone(Route route, Exchange exchange) { - if (hasFailed(exchange)) { - // record the failure - failures.incrementAndGet(); - lastFailure = System.currentTimeMillis(); - } - - // check for state change - calculateState(route); + if (keepOpen.get()) { + if (state.get() != STATE_OPEN) { + LOG.debug("opening circuit b/c keepOpen is on"); + openCircuit(route); + } + } else { + if (hasFailed(exchange)) { + // record the failure + failures.incrementAndGet(); + lastFailure = System.currentTimeMillis(); + } + + // check for state change + calculateState(route); + } } - + /** * uses similar approach as {@link CircuitBreakerLoadBalancer} - * if the exchange has an exception that we are watching + * if the exchange has an exception that we are watching * then we count that as a failure otherwise we ignore it */ private boolean hasFailed(Exchange exchange) { @@ -126,7 +152,7 @@ public class ThrottlingExceptionRoutePolicy extends RoutePolicySupport implement if (exchange.getException() != null) { if (throttledExceptions == null || throttledExceptions.isEmpty()) { - // if no exceptions defined then always fail + // if no exceptions defined then always fail // (ie) assume we throttle on all exceptions answer = true; } else { @@ -148,10 +174,10 @@ public class ThrottlingExceptionRoutePolicy extends RoutePolicySupport implement } private void calculateState(Route route) { - + // have we reached the failure limit? boolean failureLimitReached = isThresholdExceeded(); - + if (state.get() == STATE_CLOSED) { if (failureLimitReached) { LOG.debug("Opening circuit..."); @@ -166,46 +192,52 @@ public class ThrottlingExceptionRoutePolicy extends RoutePolicySupport implement closeCircuit(route); } } else if (state.get() == STATE_OPEN) { - long elapsedTimeSinceOpened = System.currentTimeMillis() - openedAt; - if (halfOpenAfter <= elapsedTimeSinceOpened) { - LOG.debug("Checking an open circuit..."); - if (halfOpenHandler != null) { - if (halfOpenHandler.isReadyToBeClosed()) { - LOG.debug("Closing circuit..."); - closeCircuit(route); + if (!keepOpen.get()) { + long elapsedTimeSinceOpened = System.currentTimeMillis() - openedAt; + if (halfOpenAfter <= elapsedTimeSinceOpened) { + LOG.debug("Checking an open circuit..."); + if (halfOpenHandler != null) { + if (halfOpenHandler.isReadyToBeClosed()) { + LOG.debug("Closing circuit..."); + closeCircuit(route); + } else { + LOG.debug("Opening circuit..."); + openCircuit(route); + } } else { - LOG.debug("Opening circuit..."); - openCircuit(route); + LOG.debug("Half opening circuit..."); + halfOpenCircuit(route); } } else { - LOG.debug("Half opening circuit..."); - halfOpenCircuit(route); + log.debug("keeping circuit open (time not elapsed)..."); } - } + } else { + log.debug("keeping circuit open (keepOpen is true)..."); + this.addHalfOpenTimer(route); + } } - + } - + protected boolean isThresholdExceeded() { boolean output = false; logState(); - // failures exceed the threshold + // failures exceed the threshold // AND the last of those failures occurred within window if ((failures.get() >= failureThreshold) && (lastFailure >= System.currentTimeMillis() - failureWindow)) { output = true; } - + return output; } - + protected void openCircuit(Route route) { try { lock.lock(); suspendOrStopConsumer(route.getConsumer()); state.set(STATE_OPEN); openedAt = System.currentTimeMillis(); - halfOpenTimer = new Timer(); - halfOpenTimer.schedule(new HalfOpenTask(route), halfOpenAfter); + this.addHalfOpenTimer(route); logState(); } catch (Exception e) { handleException(e); @@ -214,6 +246,11 @@ public class ThrottlingExceptionRoutePolicy extends RoutePolicySupport implement } } + protected void addHalfOpenTimer(Route route) { + halfOpenTimer = new Timer(); + halfOpenTimer.schedule(new HalfOpenTask(route), halfOpenAfter); + } + protected void halfOpenCircuit(Route route) { try { lock.lock(); @@ -226,7 +263,7 @@ public class ThrottlingExceptionRoutePolicy extends RoutePolicySupport implement lock.unlock(); } } - + protected void closeCircuit(Route route) { try { lock.lock(); @@ -242,13 +279,13 @@ public class ThrottlingExceptionRoutePolicy extends RoutePolicySupport implement lock.unlock(); } } - + private void logState() { if (LOG.isDebugEnabled()) { LOG.debug(dumpState()); } } - + public String dumpState() { int num = state.get(); String routeState = stateAsString(num); @@ -258,7 +295,7 @@ public class ThrottlingExceptionRoutePolicy extends RoutePolicySupport implement return String.format("State %s, failures %d", routeState, failures.get()); } } - + private static String stateAsString(int num) { if (num == STATE_CLOSED) { return "closed"; @@ -268,21 +305,21 @@ public class ThrottlingExceptionRoutePolicy extends RoutePolicySupport implement return "opened"; } } - + class HalfOpenTask extends TimerTask { private final Route route; - + HalfOpenTask(Route route) { this.route = route; } - + @Override public void run() { - calculateState(route); halfOpenTimer.cancel(); + calculateState(route); } } - + public ThrottlingExceptionHalfOpenHandler getHalfOpenHandler() { return halfOpenHandler; } @@ -291,6 +328,15 @@ public class ThrottlingExceptionRoutePolicy extends RoutePolicySupport implement this.halfOpenHandler = halfOpenHandler; } + public boolean getKeepOpen() { + return this.keepOpen.get(); + } + + public void setKeepOpen(boolean keepOpen) { + log.debug("keep open:" + keepOpen); + this.keepOpen.set(keepOpen); + } + public int getFailureThreshold() { return failureThreshold; } diff --git a/camel-core/src/test/java/org/apache/camel/processor/ThrottlingExceptionRoutePolicyKeepOpenOnInitTest.java b/camel-core/src/test/java/org/apache/camel/processor/ThrottlingExceptionRoutePolicyKeepOpenOnInitTest.java new file mode 100644 index 0000000..25134d5 --- /dev/null +++ b/camel-core/src/test/java/org/apache/camel/processor/ThrottlingExceptionRoutePolicyKeepOpenOnInitTest.java @@ -0,0 +1,98 @@ +package org.apache.camel.processor; + +import org.apache.camel.ContextTestSupport; +import org.apache.camel.builder.RouteBuilder; +import org.apache.camel.component.mock.MockEndpoint; +import org.apache.camel.impl.ThrottlingExceptionRoutePolicy; +import org.junit.Before; +import org.junit.Test; + +public class ThrottlingExceptionRoutePolicyKeepOpenOnInitTest extends ContextTestSupport { + + private String url = "seda:foo?concurrentConsumers=20"; + private MockEndpoint result; + private int size = 5; + + private ThrottlingExceptionRoutePolicy policy; + + @Override + @Before + public void setUp() throws Exception { + this.createPolicy(); + + super.setUp(); + this.setUseRouteBuilder(true); + result = getMockEndpoint("mock:result"); + context.getShutdownStrategy().setTimeout(1); + } + + protected void createPolicy() { + int threshold = 2; + long failureWindow = 30; + long halfOpenAfter = 1000; + boolean keepOpen = true; + policy = new ThrottlingExceptionRoutePolicy(threshold, failureWindow, halfOpenAfter, null, keepOpen); + } + + @Test + public void testThrottlingRoutePolicyStartWithAlwaysOpenOn() throws Exception { + + log.debug("---- sending some messages"); + for (int i = 0; i < size; i++) { + template.sendBody(url, "Message " + i); + Thread.sleep(3); + } + + // gives time for policy half open check to run every second + // and should not close b/c keepOpen is true + Thread.sleep(2000); + + // gives time for policy half open check to run every second + // but it should never close b/c keepOpen is true + result.expectedMessageCount(0); + result.setResultWaitTime(1000); + assertMockEndpointsSatisfied(); + } + + @Test + public void testThrottlingRoutePolicyStartWithAlwaysOpenOnThenClose() throws Exception { + + for (int i = 0; i < size; i++) { + template.sendBody(url, "Message " + i); + Thread.sleep(3); + } + + // gives time for policy half open check to run every second + // and should not close b/c keepOpen is true + Thread.sleep(2000); + + result.expectedMessageCount(0); + result.setResultWaitTime(1500); + assertMockEndpointsSatisfied(); + + // set keepOpen to false + // now half open check will succeed + policy.setKeepOpen(false); + + // gives time for policy half open check to run every second + // and should close and get all the messages + result.expectedMessageCount(5); + result.setResultWaitTime(1500); + assertMockEndpointsSatisfied(); + } + + @Override + protected RouteBuilder createRouteBuilder() throws Exception { + return new RouteBuilder() { + @Override + public void configure() throws Exception { + from(url) + .routePolicy(policy) + .log("${body}") + .to("log:foo?groupSize=10") + .to("mock:result"); + } + }; + } + +} diff --git a/camel-core/src/test/java/org/apache/camel/processor/ThrottlingExceptionRoutePolicyOpenViaConfigTest.java b/camel-core/src/test/java/org/apache/camel/processor/ThrottlingExceptionRoutePolicyOpenViaConfigTest.java new file mode 100644 index 0000000..650d08c --- /dev/null +++ b/camel-core/src/test/java/org/apache/camel/processor/ThrottlingExceptionRoutePolicyOpenViaConfigTest.java @@ -0,0 +1,104 @@ +package org.apache.camel.processor; + +import org.apache.camel.ContextTestSupport; +import org.apache.camel.builder.RouteBuilder; +import org.apache.camel.component.mock.MockEndpoint; +import org.apache.camel.impl.ThrottlingExceptionRoutePolicy; +import org.junit.Before; +import org.junit.Test; + +public class ThrottlingExceptionRoutePolicyOpenViaConfigTest extends ContextTestSupport { + + private String url = "seda:foo?concurrentConsumers=20"; + private MockEndpoint result; + private int size = 5; + + private ThrottlingExceptionRoutePolicy policy; + + @Override + @Before + public void setUp() throws Exception { + this.createPolicy(); + + super.setUp(); + this.setUseRouteBuilder(true); + result = getMockEndpoint("mock:result"); + context.getShutdownStrategy().setTimeout(1); + } + + protected void createPolicy() { + int threshold = 2; + long failureWindow = 30; + long halfOpenAfter = 1000; + boolean keepOpen = false; + policy = new ThrottlingExceptionRoutePolicy(threshold, failureWindow, halfOpenAfter, null, keepOpen); + } + + @Test + public void testThrottlingRoutePolicyStartWithAlwaysOpenOffThenToggle() throws Exception { + + // send first set of messages + // should go through b/c circuit is closed + for (int i = 0; i < size; i++) { + template.sendBody(url, "MessageRound1 " + i); + Thread.sleep(3); + } + result.expectedMessageCount(size); + result.setResultWaitTime(2000); + assertMockEndpointsSatisfied(); + + // set keepOpen to true + policy.setKeepOpen(true); + + // trigger opening circuit + // by sending another message + template.sendBody(url, "MessageTrigger"); + + // give time for circuit to open + Thread.sleep(1000); + + // send next set of messages + // should NOT go through b/c circuit is open + for (int i = 0; i < size; i++) { + template.sendBody(url, "MessageRound2 " + i); + Thread.sleep(3); + } + + // gives time for policy half open check to run every second + // and should not close b/c keepOpen is true + Thread.sleep(2000); + + result.expectedMessageCount(size + 1); + result.setResultWaitTime(2000); + assertMockEndpointsSatisfied(); + + // set keepOpen to false + policy.setKeepOpen(false); + + // gives time for policy half open check to run every second + // and it should close b/c keepOpen is false + result.expectedMessageCount(size * 2 + 1); + result.setResultWaitTime(2000); + assertMockEndpointsSatisfied(); + } + + @Override + protected RouteBuilder createRouteBuilder() throws Exception { + return new RouteBuilder() { + @Override + public void configure() throws Exception { + int threshold = 2; + long failureWindow = 30; + long halfOpenAfter = 1000; + policy = new ThrottlingExceptionRoutePolicy(threshold, failureWindow, halfOpenAfter, null); + + from(url) + .routePolicy(policy) + .log("${body}") + .to("log:foo?groupSize=10") + .to("mock:result"); + } + }; + } + +} \ No newline at end of file -- To stop receiving notification emails like this one, please contact "commits@camel.apache.org" <commits@camel.apache.org>.