Repository: camel Updated Branches: refs/heads/master 80b6e2477 -> 7563d570e
CAMEL-8526: Add more EIP as specialized mbeans Project: http://git-wip-us.apache.org/repos/asf/camel/repo Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/487d7d7b Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/487d7d7b Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/487d7d7b Branch: refs/heads/master Commit: 487d7d7bc18f7d4a2cd514d8d4c34c6e0eb275b9 Parents: 80b6e24 Author: Claus Ibsen <davscl...@apache.org> Authored: Sat Jul 25 08:55:40 2015 +0200 Committer: Claus Ibsen <davscl...@apache.org> Committed: Sat Jul 25 10:18:16 2015 +0200 ---------------------------------------------------------------------- .../mbean/ManagedFailoverLoadBalancerMBean.java | 3 ++ .../mbean/ManagedFailoverLoadBalancer.java | 26 +++++++++++++++ .../loadbalancer/FailOverLoadBalancer.java | 33 ++++++++++++++++++-- .../ManagedFailoverLoadBalancerTest.java | 13 +++++++- 4 files changed, 72 insertions(+), 3 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/camel/blob/487d7d7b/camel-core/src/main/java/org/apache/camel/api/management/mbean/ManagedFailoverLoadBalancerMBean.java ---------------------------------------------------------------------- diff --git a/camel-core/src/main/java/org/apache/camel/api/management/mbean/ManagedFailoverLoadBalancerMBean.java b/camel-core/src/main/java/org/apache/camel/api/management/mbean/ManagedFailoverLoadBalancerMBean.java index 2eab3f4..fec3f4e 100644 --- a/camel-core/src/main/java/org/apache/camel/api/management/mbean/ManagedFailoverLoadBalancerMBean.java +++ b/camel-core/src/main/java/org/apache/camel/api/management/mbean/ManagedFailoverLoadBalancerMBean.java @@ -35,4 +35,7 @@ public interface ManagedFailoverLoadBalancerMBean extends ManagedProcessorMBean @ManagedAttribute(description = "The class names of the exceptions the load balancer uses (separated by comma)") String getExceptions(); + @ManagedAttribute(description = "Processor id of the last known good processor that succeed processing the exchange") + String getLastGoodProcessorId(); + } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/camel/blob/487d7d7b/camel-core/src/main/java/org/apache/camel/management/mbean/ManagedFailoverLoadBalancer.java ---------------------------------------------------------------------- diff --git a/camel-core/src/main/java/org/apache/camel/management/mbean/ManagedFailoverLoadBalancer.java b/camel-core/src/main/java/org/apache/camel/management/mbean/ManagedFailoverLoadBalancer.java index 2b96229..34c03c7 100644 --- a/camel-core/src/main/java/org/apache/camel/management/mbean/ManagedFailoverLoadBalancer.java +++ b/camel-core/src/main/java/org/apache/camel/management/mbean/ManagedFailoverLoadBalancer.java @@ -22,6 +22,7 @@ import org.apache.camel.CamelContext; import org.apache.camel.api.management.ManagedResource; import org.apache.camel.api.management.mbean.ManagedFailoverLoadBalancerMBean; import org.apache.camel.model.LoadBalanceDefinition; +import org.apache.camel.model.ProcessorDefinition; import org.apache.camel.processor.loadbalancer.FailOverLoadBalancer; import org.apache.camel.util.CollectionStringBuffer; @@ -39,6 +40,17 @@ public class ManagedFailoverLoadBalancer extends ManagedProcessor implements Man } @Override + public LoadBalanceDefinition getDefinition() { + return (LoadBalanceDefinition) super.getDefinition(); + } + + @Override + public synchronized void reset() { + super.reset(); + processor.reset(); + } + + @Override public Integer getSize() { return processor.getProcessors().size(); } @@ -76,4 +88,18 @@ public class ManagedFailoverLoadBalancer extends ManagedProcessor implements Man } return exceptions; } + + @Override + public String getLastGoodProcessorId() { + int idx = processor.getLastGoodIndex(); + if (idx != -1) { + LoadBalanceDefinition def = getDefinition(); + ProcessorDefinition<?> output = def.getOutputs().get(idx); + if (output != null) { + return output.getId(); + } + } + return null; + } + } http://git-wip-us.apache.org/repos/asf/camel/blob/487d7d7b/camel-core/src/main/java/org/apache/camel/processor/loadbalancer/FailOverLoadBalancer.java ---------------------------------------------------------------------- diff --git a/camel-core/src/main/java/org/apache/camel/processor/loadbalancer/FailOverLoadBalancer.java b/camel-core/src/main/java/org/apache/camel/processor/loadbalancer/FailOverLoadBalancer.java index 76bfa74..5eb5d8b 100644 --- a/camel-core/src/main/java/org/apache/camel/processor/loadbalancer/FailOverLoadBalancer.java +++ b/camel-core/src/main/java/org/apache/camel/processor/loadbalancer/FailOverLoadBalancer.java @@ -48,7 +48,7 @@ public class FailOverLoadBalancer extends LoadBalancerSupport implements Traceab // stateful counter private final AtomicInteger counter = new AtomicInteger(-1); - private final AtomicInteger lastGoodIndex = new AtomicInteger(); + private final AtomicInteger lastGoodIndex = new AtomicInteger(-1); public FailOverLoadBalancer() { this.exceptions = null; @@ -75,6 +75,10 @@ public class FailOverLoadBalancer extends LoadBalancerSupport implements Traceab this.camelContext = camelContext; } + public int getLastGoodIndex() { + return lastGoodIndex.get(); + } + public List<Class<?>> getExceptions() { return exceptions; } @@ -158,7 +162,11 @@ public class FailOverLoadBalancer extends LoadBalancerSupport implements Traceab // get the next processor if (isSticky()) { - index.set(lastGoodIndex.get()); + int idx = lastGoodIndex.get(); + if (idx == -1) { + idx = 0; + } + index.set(idx); } else if (isRoundRobin()) { if (counter.incrementAndGet() >= processors.size()) { counter.set(0); @@ -358,4 +366,25 @@ public class FailOverLoadBalancer extends LoadBalancerSupport implements Traceab public String getTraceLabel() { return "failover"; } + + public void reset() { + // reset state + lastGoodIndex.set(-1); + counter.set(-1); + } + + @Override + protected void doStart() throws Exception { + super.doStart(); + + // reset state + reset(); + } + + @Override + protected void doStop() throws Exception { + super.doStop(); + // noop + } + } http://git-wip-us.apache.org/repos/asf/camel/blob/487d7d7b/camel-core/src/test/java/org/apache/camel/management/ManagedFailoverLoadBalancerTest.java ---------------------------------------------------------------------- diff --git a/camel-core/src/test/java/org/apache/camel/management/ManagedFailoverLoadBalancerTest.java b/camel-core/src/test/java/org/apache/camel/management/ManagedFailoverLoadBalancerTest.java index c7ee461..5720afd 100644 --- a/camel-core/src/test/java/org/apache/camel/management/ManagedFailoverLoadBalancerTest.java +++ b/camel-core/src/test/java/org/apache/camel/management/ManagedFailoverLoadBalancerTest.java @@ -24,6 +24,7 @@ import javax.management.openmbean.TabularData; import org.apache.camel.ServiceStatus; import org.apache.camel.builder.RouteBuilder; +import org.apache.camel.component.mock.MockEndpoint; /** * @version @@ -36,6 +37,13 @@ public class ManagedFailoverLoadBalancerTest extends ManagementTestSupport { return; } + MockEndpoint foo = getMockEndpoint("mock:foo"); + foo.expectedMessageCount(1); + + template.sendBodyAndHeader("direct:start", "Hello World", "foo", "123"); + + assertMockEndpointsSatisfied(); + // get the stats for the route MBeanServer mbeanServer = getMBeanServer(); @@ -67,6 +75,9 @@ public class ManagedFailoverLoadBalancerTest extends ManagementTestSupport { String exceptions = (String) mbeanServer.getAttribute(on, "Exceptions"); assertEquals("java.io.IOException,java.sql.SQLException", exceptions); + String id = (String) mbeanServer.getAttribute(on, "LastGoodProcessorId"); + assertEquals("foo", id); + TabularData data = (TabularData) mbeanServer.invoke(on, "explain", new Object[]{false}, new String[]{"boolean"}); assertNotNull(data); assertEquals(3, data.size()); @@ -87,7 +98,7 @@ public class ManagedFailoverLoadBalancerTest extends ManagementTestSupport { public void configure() throws Exception { from("direct:start") .loadBalance().failover(3, false, true, true, IOException.class, SQLException.class).id("mysend") - .to("mock:foo", "mock:bar"); + .to("mock:foo").id("foo").to("mock:bar").id("bar"); } }; }