CAMEL-8526: Add more EIP as specialized mbeans

Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/85209dca
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/85209dca
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/85209dca

Branch: refs/heads/master
Commit: 85209dca80c4bc06d6ef6a5fc2164e67ef772582
Parents: 9c1a79f
Author: Claus Ibsen <davscl...@apache.org>
Authored: Fri Jul 24 13:48:39 2015 +0200
Committer: Claus Ibsen <davscl...@apache.org>
Committed: Fri Jul 24 13:48:39 2015 +0200

----------------------------------------------------------------------
 .../ManagedCircuitBreakerLoadBalancerMBean.java |  42 ++++++++
 .../DefaultManagementObjectStrategy.java        |   4 +
 .../ManagedCircuitBreakerLoadBalancer.java      |  91 ++++++++++++++++
 .../CircuitBreakerLoadBalancer.java             |  35 +++++-
 .../ManagedCircuitBreakerLoadBalancerTest.java  | 106 +++++++++++++++++++
 5 files changed, 276 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/camel/blob/85209dca/camel-core/src/main/java/org/apache/camel/api/management/mbean/ManagedCircuitBreakerLoadBalancerMBean.java
----------------------------------------------------------------------
diff --git 
a/camel-core/src/main/java/org/apache/camel/api/management/mbean/ManagedCircuitBreakerLoadBalancerMBean.java
 
b/camel-core/src/main/java/org/apache/camel/api/management/mbean/ManagedCircuitBreakerLoadBalancerMBean.java
new file mode 100644
index 0000000..87682db
--- /dev/null
+++ 
b/camel-core/src/main/java/org/apache/camel/api/management/mbean/ManagedCircuitBreakerLoadBalancerMBean.java
@@ -0,0 +1,42 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.api.management.mbean;
+
+import org.apache.camel.api.management.ManagedAttribute;
+import org.apache.camel.api.management.ManagedOperation;
+
+public interface ManagedCircuitBreakerLoadBalancerMBean extends 
ManagedProcessorMBean {
+
+    @ManagedAttribute(description = "Number of processors in the load 
balancer")
+    Integer getSize();
+
+    @ManagedAttribute(description = "The timeout in millis to use as threshold 
to move state from closed to half-open or open state")
+    Long getHalfOpenAfter();
+
+    @ManagedAttribute(description = "Number of previous failed messages to use 
as threshold to move state from closed to half-open or open state")
+    Integer getThreshold();
+
+    @ManagedAttribute(description = "The class names of the exceptions the 
load balancer uses (separated by comma)")
+    String getExceptions();
+
+    @ManagedAttribute(description = "The current state of the circuit breaker")
+    String getCircuitBreakerState();
+
+    @ManagedOperation(description = "Dumps the state of the load balancer")
+    String dumpState();
+
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/camel/blob/85209dca/camel-core/src/main/java/org/apache/camel/management/DefaultManagementObjectStrategy.java
----------------------------------------------------------------------
diff --git 
a/camel-core/src/main/java/org/apache/camel/management/DefaultManagementObjectStrategy.java
 
b/camel-core/src/main/java/org/apache/camel/management/DefaultManagementObjectStrategy.java
index 20a0f33..faaaa87 100644
--- 
a/camel-core/src/main/java/org/apache/camel/management/DefaultManagementObjectStrategy.java
+++ 
b/camel-core/src/main/java/org/apache/camel/management/DefaultManagementObjectStrategy.java
@@ -37,6 +37,7 @@ import org.apache.camel.management.mbean.ManagedBeanProcessor;
 import org.apache.camel.management.mbean.ManagedBrowsableEndpoint;
 import org.apache.camel.management.mbean.ManagedCamelContext;
 import org.apache.camel.management.mbean.ManagedChoice;
+import org.apache.camel.management.mbean.ManagedCircuitBreakerLoadBalancer;
 import org.apache.camel.management.mbean.ManagedComponent;
 import org.apache.camel.management.mbean.ManagedConsumer;
 import org.apache.camel.management.mbean.ManagedConvertBody;
@@ -139,6 +140,7 @@ import org.apache.camel.processor.UnmarshalProcessor;
 import org.apache.camel.processor.WireTapProcessor;
 import org.apache.camel.processor.aggregate.AggregateProcessor;
 import org.apache.camel.processor.idempotent.IdempotentConsumer;
+import org.apache.camel.processor.loadbalancer.CircuitBreakerLoadBalancer;
 import org.apache.camel.processor.loadbalancer.FailOverLoadBalancer;
 import org.apache.camel.processor.loadbalancer.RandomLoadBalancer;
 import org.apache.camel.processor.loadbalancer.RoundRobinLoadBalancer;
@@ -297,6 +299,8 @@ public class DefaultManagementObjectStrategy implements 
ManagementObjectStrategy
                 answer = new ManagedMarshal(context, (MarshalProcessor) 
target, (org.apache.camel.model.MarshalDefinition) definition);
             } else if (target instanceof UnmarshalProcessor) {
                 answer = new ManagedUnmarshal(context, (UnmarshalProcessor) 
target, (org.apache.camel.model.UnmarshalDefinition) definition);
+            } else if (target instanceof CircuitBreakerLoadBalancer) {
+                answer = new ManagedCircuitBreakerLoadBalancer(context, 
(CircuitBreakerLoadBalancer) target, 
(org.apache.camel.model.LoadBalanceDefinition) definition);
             } else if (target instanceof FailOverLoadBalancer) {
                 answer = new ManagedFailoverLoadBalancer(context, 
(FailOverLoadBalancer) target, (org.apache.camel.model.LoadBalanceDefinition) 
definition);
             } else if (target instanceof RandomLoadBalancer) {

http://git-wip-us.apache.org/repos/asf/camel/blob/85209dca/camel-core/src/main/java/org/apache/camel/management/mbean/ManagedCircuitBreakerLoadBalancer.java
----------------------------------------------------------------------
diff --git 
a/camel-core/src/main/java/org/apache/camel/management/mbean/ManagedCircuitBreakerLoadBalancer.java
 
b/camel-core/src/main/java/org/apache/camel/management/mbean/ManagedCircuitBreakerLoadBalancer.java
new file mode 100644
index 0000000..89c7078
--- /dev/null
+++ 
b/camel-core/src/main/java/org/apache/camel/management/mbean/ManagedCircuitBreakerLoadBalancer.java
@@ -0,0 +1,91 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.management.mbean;
+
+import java.util.List;
+
+import org.apache.camel.CamelContext;
+import org.apache.camel.api.management.ManagedResource;
+import 
org.apache.camel.api.management.mbean.ManagedCircuitBreakerLoadBalancerMBean;
+import org.apache.camel.model.LoadBalanceDefinition;
+import org.apache.camel.processor.loadbalancer.CircuitBreakerLoadBalancer;
+import org.apache.camel.util.CollectionStringBuffer;
+
+/**
+ * @version 
+ */
+@ManagedResource(description = "Managed CircuitBreaker LoadBalancer")
+public class ManagedCircuitBreakerLoadBalancer extends ManagedProcessor 
implements ManagedCircuitBreakerLoadBalancerMBean {
+    private final CircuitBreakerLoadBalancer processor;
+    private String exceptions;
+
+    public ManagedCircuitBreakerLoadBalancer(CamelContext context, 
CircuitBreakerLoadBalancer processor, LoadBalanceDefinition definition) {
+        super(context, processor, definition);
+        this.processor = processor;
+    }
+
+    @Override
+    public Integer getSize() {
+        return processor.getProcessors().size();
+    }
+
+    @Override
+    public Long getHalfOpenAfter() {
+        return processor.getHalfOpenAfter();
+    }
+
+    @Override
+    public Integer getThreshold() {
+        return processor.getThreshold();
+    }
+
+    @Override
+    public String getExceptions() {
+        if (exceptions != null) {
+            return exceptions;
+        }
+
+        List<Class<?>> classes = processor.getExceptions();
+        if (classes == null || classes.isEmpty()) {
+            exceptions = "";
+        } else {
+            CollectionStringBuffer csb = new CollectionStringBuffer(",");
+            for (Class<?> clazz : classes) {
+                csb.append(clazz.getCanonicalName());
+            }
+            exceptions = csb.toString();
+        }
+        return exceptions;
+    }
+
+    @Override
+    public String getCircuitBreakerState() {
+        int num = processor.getState();
+        if (num == 0) {
+            return "closed";
+        } else if (num == 1) {
+            return "half open";
+        } else {
+            return "open";
+        }
+    }
+
+    @Override
+    public String dumpState() {
+        return processor.dumpState();
+    }
+}

http://git-wip-us.apache.org/repos/asf/camel/blob/85209dca/camel-core/src/main/java/org/apache/camel/processor/loadbalancer/CircuitBreakerLoadBalancer.java
----------------------------------------------------------------------
diff --git 
a/camel-core/src/main/java/org/apache/camel/processor/loadbalancer/CircuitBreakerLoadBalancer.java
 
b/camel-core/src/main/java/org/apache/camel/processor/loadbalancer/CircuitBreakerLoadBalancer.java
index 645b477..39f3efc 100644
--- 
a/camel-core/src/main/java/org/apache/camel/processor/loadbalancer/CircuitBreakerLoadBalancer.java
+++ 
b/camel-core/src/main/java/org/apache/camel/processor/loadbalancer/CircuitBreakerLoadBalancer.java
@@ -54,10 +54,22 @@ public class CircuitBreakerLoadBalancer extends 
LoadBalancerSupport implements T
         this.halfOpenAfter = halfOpenAfter;
     }
 
+    public long getHalfOpenAfter() {
+        return halfOpenAfter;
+    }
+
     public void setThreshold(int threshold) {
         this.threshold = threshold;
     }
 
+    public int getThreshold() {
+        return threshold;
+    }
+
+    public int getState() {
+        return state.get();
+    }
+
     @Override
     public CamelContext getCamelContext() {
         return camelContext;
@@ -115,7 +127,7 @@ public class CircuitBreakerLoadBalancer extends 
LoadBalancerSupport implements T
     }
 
     private boolean calculateState(final Exchange exchange, final 
AsyncCallback callback) {
-        boolean output = false;
+        boolean output;
         if (state.get() == STATE_HALF_OPEN) {
             if (failures.get() == 0) {
                 output = closeCircuit(exchange, callback);
@@ -164,7 +176,26 @@ public class CircuitBreakerLoadBalancer extends 
LoadBalancerSupport implements T
     }
 
     private void logState() {
-        log.debug("State {}, failures {}, closed since {}", new 
Object[]{state.get(), failures.get(), System.currentTimeMillis() - 
lastFailure});
+        if (log.isDebugEnabled()) {
+            log.debug(dumpState());
+        }
+    }
+
+    public String dumpState() {
+        int num = state.get();
+        String state;
+        if (num == 0) {
+            state = "closed";
+        } else if (num == 1) {
+            state = "half open";
+        } else {
+            state = "open";
+        }
+        if (lastFailure > 0) {
+            return String.format("State %s, failures %d, closed since %d", 
state, failures.get(), System.currentTimeMillis() - lastFailure);
+        } else {
+            return String.format("State %s, failures %d", state, 
failures.get());
+        }
     }
 
     private boolean executeProcessor(final Exchange exchange, final 
AsyncCallback callback) {

http://git-wip-us.apache.org/repos/asf/camel/blob/85209dca/camel-core/src/test/java/org/apache/camel/management/ManagedCircuitBreakerLoadBalancerTest.java
----------------------------------------------------------------------
diff --git 
a/camel-core/src/test/java/org/apache/camel/management/ManagedCircuitBreakerLoadBalancerTest.java
 
b/camel-core/src/test/java/org/apache/camel/management/ManagedCircuitBreakerLoadBalancerTest.java
new file mode 100644
index 0000000..d8f06e1
--- /dev/null
+++ 
b/camel-core/src/test/java/org/apache/camel/management/ManagedCircuitBreakerLoadBalancerTest.java
@@ -0,0 +1,106 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.management;
+
+import java.io.IOException;
+import java.sql.SQLException;
+import javax.management.MBeanServer;
+import javax.management.ObjectName;
+import javax.management.openmbean.TabularData;
+
+import org.apache.camel.ServiceStatus;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.component.mock.MockEndpoint;
+
+/**
+ * @version 
+ */
+public class ManagedCircuitBreakerLoadBalancerTest extends 
ManagementTestSupport {
+
+    public void testManageCircuitBreakerLoadBalancer() throws Exception {
+        // JMX tests don't work well on AIX CI servers (they hang)
+        if (isPlatform("aix")) {
+            return;
+        }
+
+        MockEndpoint foo = getMockEndpoint("mock:foo");
+        foo.expectedMessageCount(1);
+
+        template.sendBodyAndHeader("direct:start", "Hello World", "foo", 
"123");
+
+        assertMockEndpointsSatisfied();
+
+        // get the MBean server used to query the processor
+        MBeanServer mbeanServer = getMBeanServer();
+
+        // get the object name for the circuit breaker load balancer
+        ObjectName on = 
ObjectName.getInstance("org.apache.camel:context=camel-1,type=processors,name=\"mysend\"");
+
+        // should be on route1
+        String routeId = (String) mbeanServer.getAttribute(on, "RouteId");
+        assertEquals("route1", routeId);
+
+        String camelId = (String) mbeanServer.getAttribute(on, "CamelId");
+        assertEquals("camel-1", camelId);
+
+        String state = (String) mbeanServer.getAttribute(on, "State");
+        assertEquals(ServiceStatus.Started.name(), state);
+
+        Integer size = (Integer) mbeanServer.getAttribute(on, "Size");
+        assertEquals(1, size.intValue());
+
+        Long half = (Long) mbeanServer.getAttribute(on, "HalfOpenAfter");
+        assertEquals(5000, half.longValue());
+
+        Integer attempts = (Integer) mbeanServer.getAttribute(on, "Threshold");
+        assertEquals(2, attempts.intValue());
+
+        String exceptions = (String) mbeanServer.getAttribute(on, 
"Exceptions");
+        assertEquals("java.io.IOException,java.sql.SQLException", exceptions);
+
+        String cbState = (String) mbeanServer.getAttribute(on, 
"CircuitBreakerState");
+        assertEquals("closed", cbState);
+
+        String dump = (String) mbeanServer.invoke(on, "dumpState", null, null);
+        assertEquals("State closed, failures 0", dump);
+
+        TabularData data = (TabularData) mbeanServer.invoke(on, "explain", new 
Object[]{false}, new String[]{"boolean"});
+        assertNotNull(data);
+        assertEquals(2, data.size());
+
+        data = (TabularData) mbeanServer.invoke(on, "explain", new 
Object[]{true}, new String[]{"boolean"});
+        assertNotNull(data);
+        assertEquals(5, data.size());
+
+        String json = (String) mbeanServer.invoke(on, "informationJson", null, 
null);
+        assertNotNull(json);
+        assertTrue(json.contains("\"description\": \"Balances message 
processing among a number of nodes"));
+    }
+
+    @Override
+    protected RouteBuilder createRouteBuilder() throws Exception {
+        return new RouteBuilder() {
+            @Override
+            public void configure() throws Exception {
+                from("direct:start")
+                    .loadBalance().circuitBreaker(2, 5000, IOException.class, 
SQLException.class).id("mysend")
+                        .to("mock:foo");
+            }
+        };
+    }
+
+}

Reply via email to