CAMEL-8526: Add more EIP as specialized mbeans
Project: http://git-wip-us.apache.org/repos/asf/camel/repo Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/85209dca Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/85209dca Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/85209dca Branch: refs/heads/master Commit: 85209dca80c4bc06d6ef6a5fc2164e67ef772582 Parents: 9c1a79f Author: Claus Ibsen <davscl...@apache.org> Authored: Fri Jul 24 13:48:39 2015 +0200 Committer: Claus Ibsen <davscl...@apache.org> Committed: Fri Jul 24 13:48:39 2015 +0200 ---------------------------------------------------------------------- .../ManagedCircuitBreakerLoadBalancerMBean.java | 42 ++++++++ .../DefaultManagementObjectStrategy.java | 4 + .../ManagedCircuitBreakerLoadBalancer.java | 91 ++++++++++++++++ .../CircuitBreakerLoadBalancer.java | 35 +++++- .../ManagedCircuitBreakerLoadBalancerTest.java | 106 +++++++++++++++++++ 5 files changed, 276 insertions(+), 2 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/camel/blob/85209dca/camel-core/src/main/java/org/apache/camel/api/management/mbean/ManagedCircuitBreakerLoadBalancerMBean.java ---------------------------------------------------------------------- diff --git a/camel-core/src/main/java/org/apache/camel/api/management/mbean/ManagedCircuitBreakerLoadBalancerMBean.java b/camel-core/src/main/java/org/apache/camel/api/management/mbean/ManagedCircuitBreakerLoadBalancerMBean.java new file mode 100644 index 0000000..87682db --- /dev/null +++ b/camel-core/src/main/java/org/apache/camel/api/management/mbean/ManagedCircuitBreakerLoadBalancerMBean.java @@ -0,0 +1,42 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.api.management.mbean; + +import org.apache.camel.api.management.ManagedAttribute; +import org.apache.camel.api.management.ManagedOperation; + +public interface ManagedCircuitBreakerLoadBalancerMBean extends ManagedProcessorMBean { + + @ManagedAttribute(description = "Number of processors in the load balancer") + Integer getSize(); + + @ManagedAttribute(description = "The timeout in millis to use as threshold to move state from closed to half-open or open state") + Long getHalfOpenAfter(); + + @ManagedAttribute(description = "Number of previous failed messages to use as threshold to move state from closed to half-open or open state") + Integer getThreshold(); + + @ManagedAttribute(description = "The class names of the exceptions the load balancer uses (separated by comma)") + String getExceptions(); + + @ManagedAttribute(description = "The current state of the circuit breaker") + String getCircuitBreakerState(); + + @ManagedOperation(description = "Dumps the state of the load balancer") + String dumpState(); + +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/camel/blob/85209dca/camel-core/src/main/java/org/apache/camel/management/DefaultManagementObjectStrategy.java ---------------------------------------------------------------------- diff --git a/camel-core/src/main/java/org/apache/camel/management/DefaultManagementObjectStrategy.java b/camel-core/src/main/java/org/apache/camel/management/DefaultManagementObjectStrategy.java index 20a0f33..faaaa87 100644 --- a/camel-core/src/main/java/org/apache/camel/management/DefaultManagementObjectStrategy.java +++ b/camel-core/src/main/java/org/apache/camel/management/DefaultManagementObjectStrategy.java @@ -37,6 +37,7 @@ import org.apache.camel.management.mbean.ManagedBeanProcessor; import org.apache.camel.management.mbean.ManagedBrowsableEndpoint; import org.apache.camel.management.mbean.ManagedCamelContext; import org.apache.camel.management.mbean.ManagedChoice; +import org.apache.camel.management.mbean.ManagedCircuitBreakerLoadBalancer; import org.apache.camel.management.mbean.ManagedComponent; import org.apache.camel.management.mbean.ManagedConsumer; import org.apache.camel.management.mbean.ManagedConvertBody; @@ -139,6 +140,7 @@ import org.apache.camel.processor.UnmarshalProcessor; import org.apache.camel.processor.WireTapProcessor; import org.apache.camel.processor.aggregate.AggregateProcessor; import org.apache.camel.processor.idempotent.IdempotentConsumer; +import org.apache.camel.processor.loadbalancer.CircuitBreakerLoadBalancer; import org.apache.camel.processor.loadbalancer.FailOverLoadBalancer; import org.apache.camel.processor.loadbalancer.RandomLoadBalancer; import org.apache.camel.processor.loadbalancer.RoundRobinLoadBalancer; @@ -297,6 +299,8 @@ public class DefaultManagementObjectStrategy implements ManagementObjectStrategy answer = new ManagedMarshal(context, (MarshalProcessor) target, (org.apache.camel.model.MarshalDefinition) definition); } else if (target instanceof UnmarshalProcessor) { answer = new ManagedUnmarshal(context, (UnmarshalProcessor) target, (org.apache.camel.model.UnmarshalDefinition) definition); + } else if (target instanceof CircuitBreakerLoadBalancer) { + answer = new ManagedCircuitBreakerLoadBalancer(context, (CircuitBreakerLoadBalancer) target, (org.apache.camel.model.LoadBalanceDefinition) definition); } else if (target instanceof FailOverLoadBalancer) { answer = new ManagedFailoverLoadBalancer(context, (FailOverLoadBalancer) target, (org.apache.camel.model.LoadBalanceDefinition) definition); } else if (target instanceof RandomLoadBalancer) { http://git-wip-us.apache.org/repos/asf/camel/blob/85209dca/camel-core/src/main/java/org/apache/camel/management/mbean/ManagedCircuitBreakerLoadBalancer.java ---------------------------------------------------------------------- diff --git a/camel-core/src/main/java/org/apache/camel/management/mbean/ManagedCircuitBreakerLoadBalancer.java b/camel-core/src/main/java/org/apache/camel/management/mbean/ManagedCircuitBreakerLoadBalancer.java new file mode 100644 index 0000000..89c7078 --- /dev/null +++ b/camel-core/src/main/java/org/apache/camel/management/mbean/ManagedCircuitBreakerLoadBalancer.java @@ -0,0 +1,91 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.management.mbean; + +import java.util.List; + +import org.apache.camel.CamelContext; +import org.apache.camel.api.management.ManagedResource; +import org.apache.camel.api.management.mbean.ManagedCircuitBreakerLoadBalancerMBean; +import org.apache.camel.model.LoadBalanceDefinition; +import org.apache.camel.processor.loadbalancer.CircuitBreakerLoadBalancer; +import org.apache.camel.util.CollectionStringBuffer; + +/** + * @version + */ +@ManagedResource(description = "Managed CircuitBreaker LoadBalancer") +public class ManagedCircuitBreakerLoadBalancer extends ManagedProcessor implements ManagedCircuitBreakerLoadBalancerMBean { + private final CircuitBreakerLoadBalancer processor; + private String exceptions; + + public ManagedCircuitBreakerLoadBalancer(CamelContext context, CircuitBreakerLoadBalancer processor, LoadBalanceDefinition definition) { + super(context, processor, definition); + this.processor = processor; + } + + @Override + public Integer getSize() { + return processor.getProcessors().size(); + } + + @Override + public Long getHalfOpenAfter() { + return processor.getHalfOpenAfter(); + } + + @Override + public Integer getThreshold() { + return processor.getThreshold(); + } + + @Override + public String getExceptions() { + if (exceptions != null) { + return exceptions; + } + + List<Class<?>> classes = processor.getExceptions(); + if (classes == null || classes.isEmpty()) { + exceptions = ""; + } else { + CollectionStringBuffer csb = new CollectionStringBuffer(","); + for (Class<?> clazz : classes) { + csb.append(clazz.getCanonicalName()); + } + exceptions = csb.toString(); + } + return exceptions; + } + + @Override + public String getCircuitBreakerState() { + int num = processor.getState(); + if (num == 0) { + return "closed"; + } else if (num == 1) { + return "half open"; + } else { + return "open"; + } + } + + @Override + public String dumpState() { + return processor.dumpState(); + } +} http://git-wip-us.apache.org/repos/asf/camel/blob/85209dca/camel-core/src/main/java/org/apache/camel/processor/loadbalancer/CircuitBreakerLoadBalancer.java ---------------------------------------------------------------------- diff --git a/camel-core/src/main/java/org/apache/camel/processor/loadbalancer/CircuitBreakerLoadBalancer.java b/camel-core/src/main/java/org/apache/camel/processor/loadbalancer/CircuitBreakerLoadBalancer.java index 645b477..39f3efc 100644 --- a/camel-core/src/main/java/org/apache/camel/processor/loadbalancer/CircuitBreakerLoadBalancer.java +++ b/camel-core/src/main/java/org/apache/camel/processor/loadbalancer/CircuitBreakerLoadBalancer.java @@ -54,10 +54,22 @@ public class CircuitBreakerLoadBalancer extends LoadBalancerSupport implements T this.halfOpenAfter = halfOpenAfter; } + public long getHalfOpenAfter() { + return halfOpenAfter; + } + public void setThreshold(int threshold) { this.threshold = threshold; } + public int getThreshold() { + return threshold; + } + + public int getState() { + return state.get(); + } + @Override public CamelContext getCamelContext() { return camelContext; @@ -115,7 +127,7 @@ public class CircuitBreakerLoadBalancer extends LoadBalancerSupport implements T } private boolean calculateState(final Exchange exchange, final AsyncCallback callback) { - boolean output = false; + boolean output; if (state.get() == STATE_HALF_OPEN) { if (failures.get() == 0) { output = closeCircuit(exchange, callback); @@ -164,7 +176,26 @@ public class CircuitBreakerLoadBalancer extends LoadBalancerSupport implements T } private void logState() { - log.debug("State {}, failures {}, closed since {}", new Object[]{state.get(), failures.get(), System.currentTimeMillis() - lastFailure}); + if (log.isDebugEnabled()) { + log.debug(dumpState()); + } + } + + public String dumpState() { + int num = state.get(); + String state; + if (num == 0) { + state = "closed"; + } else if (num == 1) { + state = "half open"; + } else { + state = "open"; + } + if (lastFailure > 0) { + return String.format("State %s, failures %d, closed since %d", state, failures.get(), System.currentTimeMillis() - lastFailure); + } else { + return String.format("State %s, failures %d", state, failures.get()); + } } private boolean executeProcessor(final Exchange exchange, final AsyncCallback callback) { http://git-wip-us.apache.org/repos/asf/camel/blob/85209dca/camel-core/src/test/java/org/apache/camel/management/ManagedCircuitBreakerLoadBalancerTest.java ---------------------------------------------------------------------- diff --git a/camel-core/src/test/java/org/apache/camel/management/ManagedCircuitBreakerLoadBalancerTest.java b/camel-core/src/test/java/org/apache/camel/management/ManagedCircuitBreakerLoadBalancerTest.java new file mode 100644 index 0000000..d8f06e1 --- /dev/null +++ b/camel-core/src/test/java/org/apache/camel/management/ManagedCircuitBreakerLoadBalancerTest.java @@ -0,0 +1,106 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.management; + +import java.io.IOException; +import java.sql.SQLException; +import javax.management.MBeanServer; +import javax.management.ObjectName; +import javax.management.openmbean.TabularData; + +import org.apache.camel.ServiceStatus; +import org.apache.camel.builder.RouteBuilder; +import org.apache.camel.component.mock.MockEndpoint; + +/** + * @version + */ +public class ManagedCircuitBreakerLoadBalancerTest extends ManagementTestSupport { + + public void testManageCircuitBreakerLoadBalancer() throws Exception { + // JMX tests dont work well on AIX CI servers (hangs them) + if (isPlatform("aix")) { + return; + } + + MockEndpoint foo = getMockEndpoint("mock:foo"); + foo.expectedMessageCount(1); + + template.sendBodyAndHeader("direct:start", "Hello World", "foo", "123"); + + assertMockEndpointsSatisfied(); + + // get the stats for the route + MBeanServer mbeanServer = getMBeanServer(); + + // get the object name for the delayer + ObjectName on = ObjectName.getInstance("org.apache.camel:context=camel-1,type=processors,name=\"mysend\""); + + // should be on route1 + String routeId = (String) mbeanServer.getAttribute(on, "RouteId"); + assertEquals("route1", routeId); + + String camelId = (String) mbeanServer.getAttribute(on, "CamelId"); + assertEquals("camel-1", camelId); + + String state = (String) mbeanServer.getAttribute(on, "State"); + assertEquals(ServiceStatus.Started.name(), state); + + Integer size = (Integer) mbeanServer.getAttribute(on, "Size"); + assertEquals(1, size.intValue()); + + Long half = (Long) mbeanServer.getAttribute(on, "HalfOpenAfter"); + assertEquals(5000, half.longValue()); + + Integer attempts = (Integer) mbeanServer.getAttribute(on, "Threshold"); + assertEquals(2, attempts.intValue()); + + String exceptions = (String) mbeanServer.getAttribute(on, "Exceptions"); + assertEquals("java.io.IOException,java.sql.SQLException", exceptions); + + String cbState = (String) mbeanServer.getAttribute(on, "CircuitBreakerState"); + assertEquals("closed", cbState); + + String dump = (String) mbeanServer.invoke(on, "dumpState", null, null); + assertEquals("State closed, failures 0", dump); + + TabularData data = (TabularData) mbeanServer.invoke(on, "explain", new Object[]{false}, new String[]{"boolean"}); + assertNotNull(data); + assertEquals(2, data.size()); + + data = (TabularData) mbeanServer.invoke(on, "explain", new Object[]{true}, new String[]{"boolean"}); + assertNotNull(data); + assertEquals(5, data.size()); + + String json = (String) mbeanServer.invoke(on, "informationJson", null, null); + assertNotNull(json); + assertTrue(json.contains("\"description\": \"Balances message processing among a number of nodes")); + } + + @Override + protected RouteBuilder createRouteBuilder() throws Exception { + return new RouteBuilder() { + @Override + public void configure() throws Exception { + from("direct:start") + .loadBalance().circuitBreaker(2, 5000, IOException.class, SQLException.class).id("mysend") + .to("mock:foo"); + } + }; + } + +}