Repository: camel Updated Branches: refs/heads/master a4f82a2c9 -> 0243960c5
Add unit tests for ThrottlingExceptionRoutePolicy and include in LC strategy Project: http://git-wip-us.apache.org/repos/asf/camel/repo Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/5351baa5 Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/5351baa5 Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/5351baa5 Branch: refs/heads/master Commit: 5351baa5b2957a75e74432490ad29641e6f169f7 Parents: 69b716b Author: CodeSmell <mbarlo...@gmail.com> Authored: Wed Jan 18 13:59:23 2017 -0500 Committer: Claus Ibsen <davscl...@apache.org> Committed: Fri Jan 20 09:44:37 2017 +0100 ---------------------------------------------------------------------- ...agedThrottlingExceptionRoutePolicyMBean.java | 41 ++--- .../impl/ThrottlingExceptionRoutePolicy.java | 4 +- .../DefaultManagementLifecycleStrategy.java | 4 + .../ManagedThrottlingExceptionRoutePolicy.java | 32 ++-- .../management/ManagedRoutePolicyTest.java | 77 --------- ...nagedThrottlingExceptionRoutePolicyTest.java | 169 +++++++++++++++++++ ...anagedThrottlingInflightRoutePolicyTest.java | 77 +++++++++ 7 files changed, 293 insertions(+), 111 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/camel/blob/5351baa5/camel-core/src/main/java/org/apache/camel/api/management/mbean/ManagedThrottlingExceptionRoutePolicyMBean.java ---------------------------------------------------------------------- diff --git a/camel-core/src/main/java/org/apache/camel/api/management/mbean/ManagedThrottlingExceptionRoutePolicyMBean.java b/camel-core/src/main/java/org/apache/camel/api/management/mbean/ManagedThrottlingExceptionRoutePolicyMBean.java index 86e394c..a83b690 100644 --- a/camel-core/src/main/java/org/apache/camel/api/management/mbean/ManagedThrottlingExceptionRoutePolicyMBean.java +++ b/camel-core/src/main/java/org/apache/camel/api/management/mbean/ManagedThrottlingExceptionRoutePolicyMBean.java @@ -17,39 +17,40 @@ package org.apache.camel.api.management.mbean; import org.apache.camel.api.management.ManagedAttribute; +import org.apache.camel.api.management.ManagedOperation; public interface ManagedThrottlingExceptionRoutePolicyMBean extends ManagedServiceMBean { - @ManagedAttribute(description = "how long to wait before moving open circuit to half open") - long getHalfOpenAfter(); + @ManagedAttribute(description = "How long to wait before moving open circuit to half open") + Long getHalfOpenAfter(); - @ManagedAttribute(description = "how long to wait before moving open circuit to half open") - void setHalfOpenAfter(long milliseconds); + @ManagedAttribute(description = "How long to wait before moving open circuit to half open") + void setHalfOpenAfter(Long milliseconds); - @ManagedAttribute(description = "the range of time that failures should occur within") - long getFailureWindow(); + @ManagedAttribute(description = "The range of time that failures should occur within") + Long getFailureWindow(); - @ManagedAttribute(description = "the range of time that failures should occur within") - void setFailureWindow(long milliseconds); + @ManagedAttribute(description = "The range of time that failures should occur within") + void setFailureWindow(Long milliseconds); - @ManagedAttribute(description = "number of failures before opening circuit") - int getFailureThreshold(); + @ManagedAttribute(description = "Number of failures before opening circuit") + Integer getFailureThreshold(); - @ManagedAttribute(description = "number of failures before opening circuit") - void setFailureThreshold(int numberOfFailures); + @ManagedAttribute(description = "Number of failures before opening circuit") + void setFailureThreshold(Integer numberOfFailures); - @ManagedAttribute(description = "State") + @ManagedOperation(description = "The current state of the circuit") String currentState(); @ManagedAttribute(description = "The half open handler registered (if any)") - String hasHalfOpenHandler(); + String getHalfOpenHandlerName(); - @ManagedAttribute(description = "the number of failures caught") - int currentFailures(); + @ManagedAttribute(description = "The number of failures caught") + Integer getCurrentFailures(); - @ManagedAttribute(description = "number of ms since the last failure was recorded") - long getLastFailure(); + @ManagedAttribute(description = "Number of ms since the last failure was recorded") + Long getLastFailure(); - @ManagedAttribute(description = "number ms since the circuit was opened") - long getOpenAt(); + @ManagedAttribute(description = "Number ms since the circuit was opened") + Long getOpenAt(); } http://git-wip-us.apache.org/repos/asf/camel/blob/5351baa5/camel-core/src/main/java/org/apache/camel/impl/ThrottlingExceptionRoutePolicy.java ---------------------------------------------------------------------- diff --git a/camel-core/src/main/java/org/apache/camel/impl/ThrottlingExceptionRoutePolicy.java b/camel-core/src/main/java/org/apache/camel/impl/ThrottlingExceptionRoutePolicy.java index 34b755a..e23e1bc 100644 --- a/camel-core/src/main/java/org/apache/camel/impl/ThrottlingExceptionRoutePolicy.java +++ b/camel-core/src/main/java/org/apache/camel/impl/ThrottlingExceptionRoutePolicy.java @@ -257,9 +257,9 @@ public class ThrottlingExceptionRoutePolicy extends RoutePolicySupport implement int num = state.get(); String routeState = stateAsString(num); if (failures.get() > 0) { - return String.format("*** State %s, failures %d, last failure %d ms ago", routeState, failures.get(), System.currentTimeMillis() - lastFailure); + return String.format("State %s, failures %d, last failure %d ms ago", routeState, failures.get(), System.currentTimeMillis() - lastFailure); } else { - return String.format("*** State %s, failures %d", routeState, failures.get()); + return String.format("State %s, failures %d", routeState, failures.get()); } } http://git-wip-us.apache.org/repos/asf/camel/blob/5351baa5/camel-core/src/main/java/org/apache/camel/management/DefaultManagementLifecycleStrategy.java ---------------------------------------------------------------------- diff --git a/camel-core/src/main/java/org/apache/camel/management/DefaultManagementLifecycleStrategy.java b/camel-core/src/main/java/org/apache/camel/management/DefaultManagementLifecycleStrategy.java index 6a2b58d..7c3784e 100644 --- a/camel-core/src/main/java/org/apache/camel/management/DefaultManagementLifecycleStrategy.java +++ b/camel-core/src/main/java/org/apache/camel/management/DefaultManagementLifecycleStrategy.java @@ -52,6 +52,7 @@ import org.apache.camel.impl.DefaultEndpointRegistry; import org.apache.camel.impl.DefaultTransformerRegistry; import org.apache.camel.impl.EventDrivenConsumerRoute; import org.apache.camel.impl.ProducerCache; +import org.apache.camel.impl.ThrottlingExceptionRoutePolicy; import org.apache.camel.impl.ThrottlingInflightRoutePolicy; import org.apache.camel.management.mbean.ManagedAsyncProcessorAwaitManager; import org.apache.camel.management.mbean.ManagedBacklogDebugger; @@ -67,6 +68,7 @@ import org.apache.camel.management.mbean.ManagedRoute; import org.apache.camel.management.mbean.ManagedRuntimeEndpointRegistry; import org.apache.camel.management.mbean.ManagedService; import org.apache.camel.management.mbean.ManagedStreamCachingStrategy; +import org.apache.camel.management.mbean.ManagedThrottlingExceptionRoutePolicy; import org.apache.camel.management.mbean.ManagedThrottlingInflightRoutePolicy; import org.apache.camel.management.mbean.ManagedTracer; import org.apache.camel.management.mbean.ManagedTransformerRegistry; @@ -469,6 +471,8 @@ public class DefaultManagementLifecycleStrategy extends ServiceSupport implement return getManagedObjectForProcessor(context, (Processor) service, route); } else if (service instanceof ThrottlingInflightRoutePolicy) { answer = new ManagedThrottlingInflightRoutePolicy(context, (ThrottlingInflightRoutePolicy) service); + } else if (service instanceof ThrottlingExceptionRoutePolicy) { + answer = new ManagedThrottlingExceptionRoutePolicy(context, (ThrottlingExceptionRoutePolicy) service); } else if (service instanceof ConsumerCache) { answer = new ManagedConsumerCache(context, (ConsumerCache) service); } else if (service instanceof ProducerCache) { http://git-wip-us.apache.org/repos/asf/camel/blob/5351baa5/camel-core/src/main/java/org/apache/camel/management/mbean/ManagedThrottlingExceptionRoutePolicy.java ---------------------------------------------------------------------- diff --git a/camel-core/src/main/java/org/apache/camel/management/mbean/ManagedThrottlingExceptionRoutePolicy.java b/camel-core/src/main/java/org/apache/camel/management/mbean/ManagedThrottlingExceptionRoutePolicy.java index bd2af1f..8030eae 100644 --- a/camel-core/src/main/java/org/apache/camel/management/mbean/ManagedThrottlingExceptionRoutePolicy.java +++ b/camel-core/src/main/java/org/apache/camel/management/mbean/ManagedThrottlingExceptionRoutePolicy.java @@ -37,32 +37,32 @@ public class ManagedThrottlingExceptionRoutePolicy extends ManagedService implem } @Override - public long getHalfOpenAfter() { + public Long getHalfOpenAfter() { return getPolicy().getHalfOpenAfter(); } @Override - public void setHalfOpenAfter(long milliseconds) { + public void setHalfOpenAfter(Long milliseconds) { getPolicy().setHalfOpenAfter(milliseconds); } @Override - public long getFailureWindow() { + public Long getFailureWindow() { return getPolicy().getFailureWindow(); } @Override - public void setFailureWindow(long milliseconds) { + public void setFailureWindow(Long milliseconds) { getPolicy().setFailureWindow(milliseconds); } @Override - public int getFailureThreshold() { + public Integer getFailureThreshold() { return getPolicy().getFailureThreshold(); } @Override - public void setFailureThreshold(int numberOfFailures) { + public void setFailureThreshold(Integer numberOfFailures) { getPolicy().setFailureThreshold(numberOfFailures); } @@ -72,7 +72,7 @@ public class ManagedThrottlingExceptionRoutePolicy extends ManagedService implem } @Override - public String hasHalfOpenHandler() { + public String getHalfOpenHandlerName() { ThrottlingExceptionHalfOpenHandler obj = getPolicy().getHalfOpenHandler(); if (obj != null) { return obj.getClass().getSimpleName(); @@ -82,18 +82,26 @@ public class ManagedThrottlingExceptionRoutePolicy extends ManagedService implem } @Override - public int currentFailures() { + public Integer getCurrentFailures() { return getPolicy().getFailures(); } @Override - public long getLastFailure() { - return System.currentTimeMillis() - getPolicy().getLastFailure(); + public Long getLastFailure() { + if (getPolicy().getLastFailure() == 0) { + return 0L; + } else { + return System.currentTimeMillis() - getPolicy().getLastFailure(); + } } @Override - public long getOpenAt() { - return System.currentTimeMillis() - getPolicy().getOpenedAt(); + public Long getOpenAt() { + if (getPolicy().getOpenedAt() == 0) { + return 0L; + } else { + return System.currentTimeMillis() - getPolicy().getOpenedAt(); + } } } http://git-wip-us.apache.org/repos/asf/camel/blob/5351baa5/camel-core/src/test/java/org/apache/camel/management/ManagedRoutePolicyTest.java ---------------------------------------------------------------------- diff --git a/camel-core/src/test/java/org/apache/camel/management/ManagedRoutePolicyTest.java b/camel-core/src/test/java/org/apache/camel/management/ManagedRoutePolicyTest.java deleted file mode 100644 index 62e1987..0000000 --- a/camel-core/src/test/java/org/apache/camel/management/ManagedRoutePolicyTest.java +++ /dev/null @@ -1,77 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.camel.management; - -import java.util.Set; -import javax.management.MBeanServer; -import javax.management.ObjectName; - -import org.apache.camel.ServiceStatus; -import org.apache.camel.builder.RouteBuilder; -import org.apache.camel.impl.ThrottlingInflightRoutePolicy; - -/** - * @version - */ -public class ManagedRoutePolicyTest extends ManagementTestSupport { - - public void testRoutes() throws Exception { - // JMX tests dont work well on AIX CI servers (hangs them) - if (isPlatform("aix")) { - return; - } - - MBeanServer mbeanServer = getMBeanServer(); - - Set<ObjectName> set = mbeanServer.queryNames(new ObjectName("*:type=routes,*"), null); - assertEquals(1, set.size()); - - ObjectName on = set.iterator().next(); - - boolean registered = mbeanServer.isRegistered(on); - assertEquals("Should be registered", true, registered); - - String uri = (String) mbeanServer.getAttribute(on, "EndpointUri"); - // the route has this starting endpoint uri - assertEquals("direct://start", uri); - - Integer val = (Integer) mbeanServer.getAttribute(on, "InflightExchanges"); - // the route has no inflight exchanges - assertEquals(0, val.intValue()); - - // should be started - String state = (String) mbeanServer.getAttribute(on, "State"); - assertEquals("Should be started", ServiceStatus.Started.name(), state); - - // should have route policy - String policy = (String) mbeanServer.getAttribute(on, "RoutePolicyList"); - assertNotNull(policy); - assertTrue("Should be a throttling, was: " + policy, policy.startsWith("ThrottlingInflightRoutePolicy")); - } - - @Override - protected RouteBuilder createRouteBuilder() throws Exception { - return new RouteBuilder() { - @Override - public void configure() throws Exception { - from("direct:start").routePolicy(new ThrottlingInflightRoutePolicy()) - .to("log:foo").to("mock:result"); - } - }; - } - -} http://git-wip-us.apache.org/repos/asf/camel/blob/5351baa5/camel-core/src/test/java/org/apache/camel/management/ManagedThrottlingExceptionRoutePolicyTest.java ---------------------------------------------------------------------- diff --git a/camel-core/src/test/java/org/apache/camel/management/ManagedThrottlingExceptionRoutePolicyTest.java b/camel-core/src/test/java/org/apache/camel/management/ManagedThrottlingExceptionRoutePolicyTest.java new file mode 100644 index 0000000..0d41938 --- /dev/null +++ b/camel-core/src/test/java/org/apache/camel/management/ManagedThrottlingExceptionRoutePolicyTest.java @@ -0,0 +1,169 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.management; + +import java.util.Set; + +import javax.management.JMX; +import javax.management.MBeanServer; +import javax.management.ObjectName; + +import org.apache.camel.Exchange; +import org.apache.camel.Processor; +import org.apache.camel.ServiceStatus; +import org.apache.camel.api.management.mbean.ManagedThrottlingExceptionRoutePolicyMBean; +import org.apache.camel.builder.RouteBuilder; +import org.apache.camel.impl.ThrottlingExceptionHalfOpenHandler; +import org.apache.camel.impl.ThrottlingExceptionRoutePolicy; +import org.junit.Test; + +public class ManagedThrottlingExceptionRoutePolicyTest extends ManagementTestSupport { + + @Test + public void testRoutes() throws Exception { + // JMX tests dont work well on AIX CI servers (hangs them) + if (isPlatform("aix")) { + return; + } + + MBeanServer mbeanServer = getMBeanServer(); + + // get the Camel route + Set<ObjectName> set = mbeanServer.queryNames(new ObjectName("*:type=routes,*"), null); + assertEquals(1, set.size()); + ObjectName on = set.iterator().next(); + boolean registered = mbeanServer.isRegistered(on); + assertEquals("Should be registered", true, registered); + + // check the starting endpoint uri + String uri = (String) mbeanServer.getAttribute(on, "EndpointUri"); + assertEquals("direct://start", uri); + + // should be started + String state = (String) mbeanServer.getAttribute(on, "State"); + assertEquals(ServiceStatus.Started.name(), state); + + // should have ThrottlingExceptionRoutePolicy route policy + String policy = (String) mbeanServer.getAttribute(on, "RoutePolicyList"); + assertNotNull(policy); + assertTrue(policy.startsWith("ThrottlingExceptionRoutePolicy")); + + // get the RoutePolicy + String mbeanName = String.format("org.apache.camel:context=camel-1,name=%s,type=services", policy); + set = mbeanServer.queryNames(new ObjectName(mbeanName), null); + assertEquals(1, set.size()); + on = set.iterator().next(); + assertTrue(mbeanServer.isRegistered(on)); + + // the route has no failures + String myType = (String) mbeanServer.getAttribute(on, "ServiceType"); + assertEquals("ThrottlingExceptionRoutePolicy", myType); + + ManagedThrottlingExceptionRoutePolicyMBean proxy = JMX.newMBeanProxy(mbeanServer, on, ManagedThrottlingExceptionRoutePolicyMBean.class); + assertNotNull(proxy); + + // state should be closed w/ no failures + String myState = proxy.currentState(); + assertEquals("State closed, failures 0", myState); + + // the route has no failures + Integer val = proxy.getCurrentFailures(); + assertEquals(0, val.intValue()); + + // the route has no failures + Long lastFail = proxy.getLastFailure(); + assertEquals(0L, lastFail.longValue()); + + // the route is closed + Long openAt = proxy.getOpenAt(); + assertEquals(0L, openAt.longValue()); + + // the route has a handler + String handlerClass = proxy.getHalfOpenHandlerName(); + assertEquals("DummyHandler", handlerClass); + + // values set during construction of class + Integer threshold = proxy.getFailureThreshold(); + assertEquals(10, threshold.intValue()); + + Long window = proxy.getFailureWindow(); + assertEquals(1000L, window.longValue()); + + Long halfOpenAfter = proxy.getHalfOpenAfter(); + assertEquals(5000L, halfOpenAfter.longValue()); + + // change value + proxy.setHalfOpenAfter(10000L); + halfOpenAfter = proxy.getHalfOpenAfter(); + assertEquals(10000L, halfOpenAfter.longValue()); + + try { + getMockEndpoint("mock:result").expectedMessageCount(0); + template.sendBody("direct:start", "Hello World"); + assertMockEndpointsSatisfied(); + } catch (Exception e) { + // expected + } + + // state should be closed w/ no failures + myState = proxy.currentState(); + assertTrue(myState.contains("State closed, failures 1, last failure")); + + // the route has no failures + val = proxy.getCurrentFailures(); + assertEquals(1, val.intValue()); + + // the route has no failures + lastFail = proxy.getLastFailure(); + assertTrue(lastFail.longValue() > 0); + } + + @Override + protected RouteBuilder createRouteBuilder() throws Exception { + ThrottlingExceptionRoutePolicy policy = new ThrottlingExceptionRoutePolicy(10, 1000, 5000, null); + policy.setHalfOpenHandler(new DummyHandler()); + + return new RouteBuilder() { + @Override + public void configure() throws Exception { + from("direct:start").routeId("testRoute") + .routePolicy(policy) + .to("log:foo") + .process(new BoomProcess()) + .to("mock:result"); + } + }; + } + + class BoomProcess implements Processor { + + @Override + public void process(Exchange exchange) throws Exception { + throw new RuntimeException("boom!"); + } + + } + + class DummyHandler implements ThrottlingExceptionHalfOpenHandler { + + @Override + public boolean isReadyToBeClosed() { + return false; + } + + } +} http://git-wip-us.apache.org/repos/asf/camel/blob/5351baa5/camel-core/src/test/java/org/apache/camel/management/ManagedThrottlingInflightRoutePolicyTest.java ---------------------------------------------------------------------- diff --git a/camel-core/src/test/java/org/apache/camel/management/ManagedThrottlingInflightRoutePolicyTest.java b/camel-core/src/test/java/org/apache/camel/management/ManagedThrottlingInflightRoutePolicyTest.java new file mode 100644 index 0000000..bab3a3c --- /dev/null +++ b/camel-core/src/test/java/org/apache/camel/management/ManagedThrottlingInflightRoutePolicyTest.java @@ -0,0 +1,77 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.management; + +import java.util.Set; +import javax.management.MBeanServer; +import javax.management.ObjectName; + +import org.apache.camel.ServiceStatus; +import org.apache.camel.builder.RouteBuilder; +import org.apache.camel.impl.ThrottlingInflightRoutePolicy; + +/** + * @version + */ +public class ManagedThrottlingInflightRoutePolicyTest extends ManagementTestSupport { + + public void testRoutes() throws Exception { + // JMX tests dont work well on AIX CI servers (hangs them) + if (isPlatform("aix")) { + return; + } + + MBeanServer mbeanServer = getMBeanServer(); + + Set<ObjectName> set = mbeanServer.queryNames(new ObjectName("*:type=routes,*"), null); + assertEquals(1, set.size()); + + ObjectName on = set.iterator().next(); + + boolean registered = mbeanServer.isRegistered(on); + assertEquals("Should be registered", true, registered); + + String uri = (String) mbeanServer.getAttribute(on, "EndpointUri"); + // the route has this starting endpoint uri + assertEquals("direct://start", uri); + + Integer val = (Integer) mbeanServer.getAttribute(on, "InflightExchanges"); + // the route has no inflight exchanges + assertEquals(0, val.intValue()); + + // should be started + String state = (String) mbeanServer.getAttribute(on, "State"); + assertEquals("Should be started", ServiceStatus.Started.name(), state); + + // should have route policy + String policy = (String) mbeanServer.getAttribute(on, "RoutePolicyList"); + assertNotNull(policy); + assertTrue("Should be a throttling, was: " + policy, policy.startsWith("ThrottlingInflightRoutePolicy")); + } + + @Override + protected RouteBuilder createRouteBuilder() throws Exception { + return new RouteBuilder() { + @Override + public void configure() throws Exception { + from("direct:start").routePolicy(new ThrottlingInflightRoutePolicy()) + .to("log:foo").to("mock:result"); + } + }; + } + +}