CAMEL-6670: Added the current number of throttled exchanges to the Throttler EIP JMX MBean. Thanks to Christian Posta for the patch.
Project: http://git-wip-us.apache.org/repos/asf/camel/repo Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/c698df6e Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/c698df6e Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/c698df6e Branch: refs/heads/master Commit: c698df6e7a679e8e391baa75f2ac63523e740bb6 Parents: 3506c3e Author: Claus Ibsen <davscl...@apache.org> Authored: Fri Oct 11 15:08:28 2013 +0200 Committer: Claus Ibsen <davscl...@apache.org> Committed: Fri Oct 11 15:08:28 2013 +0200 ---------------------------------------------------------------------- .../management/mbean/ManagedThrottlerMBean.java | 3 + .../management/mbean/ManagedThrottler.java | 4 + .../camel/processor/DelayProcessorSupport.java | 20 ++ .../org/apache/camel/processor/Throttler.java | 11 +- .../camel/management/ManagedThrottlerTest.java | 275 ++++++++++++++++++- 5 files changed, 304 insertions(+), 9 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/camel/blob/c698df6e/camel-core/src/main/java/org/apache/camel/api/management/mbean/ManagedThrottlerMBean.java ---------------------------------------------------------------------- diff --git a/camel-core/src/main/java/org/apache/camel/api/management/mbean/ManagedThrottlerMBean.java b/camel-core/src/main/java/org/apache/camel/api/management/mbean/ManagedThrottlerMBean.java index a0dc3a0..feec600 100644 --- a/camel-core/src/main/java/org/apache/camel/api/management/mbean/ManagedThrottlerMBean.java +++ b/camel-core/src/main/java/org/apache/camel/api/management/mbean/ManagedThrottlerMBean.java @@ -32,4 +32,7 @@ public interface ManagedThrottlerMBean extends ManagedProcessorMBean { @ManagedAttribute(description = "Time period in millis") void setTimePeriodMillis(long timePeriodMillis); + @ManagedAttribute(description = "Number of exchanges currently throttled") + int getThrottledCount(); + } \ No newline at end of file 
http://git-wip-us.apache.org/repos/asf/camel/blob/c698df6e/camel-core/src/main/java/org/apache/camel/management/mbean/ManagedThrottler.java ---------------------------------------------------------------------- diff --git a/camel-core/src/main/java/org/apache/camel/management/mbean/ManagedThrottler.java b/camel-core/src/main/java/org/apache/camel/management/mbean/ManagedThrottler.java index f80dfa1..99a5e95 100644 --- a/camel-core/src/main/java/org/apache/camel/management/mbean/ManagedThrottler.java +++ b/camel-core/src/main/java/org/apache/camel/management/mbean/ManagedThrottler.java @@ -55,4 +55,8 @@ public class ManagedThrottler extends ManagedProcessor implements ManagedThrottl public void setTimePeriodMillis(long timePeriodMillis) { getThrottler().setTimePeriodMillis(timePeriodMillis); } + + public int getThrottledCount() { + return getThrottler().getDelayedCount(); + } } http://git-wip-us.apache.org/repos/asf/camel/blob/c698df6e/camel-core/src/main/java/org/apache/camel/processor/DelayProcessorSupport.java ---------------------------------------------------------------------- diff --git a/camel-core/src/main/java/org/apache/camel/processor/DelayProcessorSupport.java b/camel-core/src/main/java/org/apache/camel/processor/DelayProcessorSupport.java index ef69759..ff81170 100644 --- a/camel-core/src/main/java/org/apache/camel/processor/DelayProcessorSupport.java +++ b/camel-core/src/main/java/org/apache/camel/processor/DelayProcessorSupport.java @@ -19,6 +19,7 @@ package org.apache.camel.processor; import java.util.concurrent.RejectedExecutionException; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; import org.apache.camel.AsyncCallback; import org.apache.camel.CamelContext; @@ -43,6 +44,7 @@ public abstract class DelayProcessorSupport extends DelegateAsyncProcessor { private final boolean shutdownExecutorService; private boolean asyncDelayed; private boolean 
callerRunsWhenRejected = true; + private final AtomicInteger delayedCount = new AtomicInteger(0); // TODO: Add option to cancel tasks on shutdown so we can stop fast @@ -56,6 +58,9 @@ public abstract class DelayProcessorSupport extends DelegateAsyncProcessor { } public void run() { + // we are running now so decrement the counter + delayedCount.decrementAndGet(); + log.trace("Delayed task woke up and continues routing for exchangeId: {}", exchange.getExchangeId()); if (!isRunAllowed()) { exchange.setException(new RejectedExecutionException("Run is not allowed")); @@ -123,6 +128,8 @@ public abstract class DelayProcessorSupport extends DelegateAsyncProcessor { } } else { // asynchronous delay so schedule a process call task + // and increment the counter (we decrement the counter when we run the ProcessCall) + delayedCount.incrementAndGet(); ProcessCall call = new ProcessCall(exchange, callback); try { log.trace("Scheduling delayed task to run in {} millis for exchangeId: {}", @@ -131,6 +138,8 @@ public abstract class DelayProcessorSupport extends DelegateAsyncProcessor { // tell Camel routing engine we continue routing asynchronous return false; } catch (RejectedExecutionException e) { + // we were not allowed to run the ProcessCall, so need to decrement the counter here + delayedCount.decrementAndGet(); if (isCallerRunsWhenRejected()) { if (!isRunAllowed()) { exchange.setException(new RejectedExecutionException()); @@ -174,6 +183,13 @@ public abstract class DelayProcessorSupport extends DelegateAsyncProcessor { protected abstract long calculateDelay(Exchange exchange); /** + * Gets the current number of {@link Exchange}s being delayed (hold back due throttle limit hit) + */ + public int getDelayedCount() { + return delayedCount.get(); + } + + /** * Delays the given time before continuing. 
* <p/> * This implementation will block while waiting @@ -191,9 +207,13 @@ public abstract class DelayProcessorSupport extends DelegateAsyncProcessor { return; } else { try { + // keep track on delayer counter while we sleep + delayedCount.incrementAndGet(); sleep(delay); } catch (InterruptedException e) { handleSleepInterruptedException(e, exchange); + } finally { + delayedCount.decrementAndGet(); } } } http://git-wip-us.apache.org/repos/asf/camel/blob/c698df6e/camel-core/src/main/java/org/apache/camel/processor/Throttler.java ---------------------------------------------------------------------- diff --git a/camel-core/src/main/java/org/apache/camel/processor/Throttler.java b/camel-core/src/main/java/org/apache/camel/processor/Throttler.java index 52989a4..ae6bc26 100644 --- a/camel-core/src/main/java/org/apache/camel/processor/Throttler.java +++ b/camel-core/src/main/java/org/apache/camel/processor/Throttler.java @@ -17,6 +17,7 @@ package org.apache.camel.processor; import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.atomic.AtomicLong; import org.apache.camel.CamelContext; import org.apache.camel.Exchange; @@ -40,7 +41,7 @@ import org.apache.camel.util.ObjectHelper; public class Throttler extends DelayProcessorSupport implements Traceable { private volatile long maximumRequestsPerPeriod; private Expression maxRequestsPerPeriodExpression; - private long timePeriodMillis = 1000; + private AtomicLong timePeriodMillis = new AtomicLong(1000); private volatile TimeSlot slot; public Throttler(CamelContext camelContext, Processor processor, Expression maxRequestsPerPeriodExpression, long timePeriodMillis, @@ -53,7 +54,7 @@ public class Throttler extends DelayProcessorSupport implements Traceable { if (timePeriodMillis <= 0) { throw new IllegalArgumentException("TimePeriodMillis should be a positive number, was: " + timePeriodMillis); } - this.timePeriodMillis = timePeriodMillis; + this.timePeriodMillis.set(timePeriodMillis); } @Override @@ 
-81,7 +82,7 @@ public class Throttler extends DelayProcessorSupport implements Traceable { } public long getTimePeriodMillis() { - return timePeriodMillis; + return timePeriodMillis.get(); } /** @@ -95,7 +96,7 @@ public class Throttler extends DelayProcessorSupport implements Traceable { * Sets the time period during which the maximum number of requests apply */ public void setTimePeriodMillis(long timePeriodMillis) { - this.timePeriodMillis = timePeriodMillis; + this.timePeriodMillis.set(timePeriodMillis); } // Implementation methods @@ -151,7 +152,7 @@ public class Throttler extends DelayProcessorSupport implements Traceable { protected class TimeSlot { private volatile long capacity = Throttler.this.maximumRequestsPerPeriod; - private final long duration = Throttler.this.timePeriodMillis; + private final long duration = Throttler.this.timePeriodMillis.get(); private final long startTime; protected TimeSlot() { http://git-wip-us.apache.org/repos/asf/camel/blob/c698df6e/camel-core/src/test/java/org/apache/camel/management/ManagedThrottlerTest.java ---------------------------------------------------------------------- diff --git a/camel-core/src/test/java/org/apache/camel/management/ManagedThrottlerTest.java b/camel-core/src/test/java/org/apache/camel/management/ManagedThrottlerTest.java index 2f90dfe..feb3e1c 100644 --- a/camel-core/src/test/java/org/apache/camel/management/ManagedThrottlerTest.java +++ b/camel-core/src/test/java/org/apache/camel/management/ManagedThrottlerTest.java @@ -16,14 +16,23 @@ */ package org.apache.camel.management; +import java.util.concurrent.RejectedExecutionException; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.ScheduledFuture; +import java.util.concurrent.ScheduledThreadPoolExecutor; +import java.util.concurrent.TimeUnit; import javax.management.Attribute; import javax.management.MBeanServer; import javax.management.ObjectName; +import org.apache.camel.Exchange; +import 
org.apache.camel.Processor; +import org.apache.camel.builder.NotifyBuilder; import org.apache.camel.builder.RouteBuilder; +import org.apache.camel.component.mock.MockEndpoint; /** - * @version + * @version */ public class ManagedThrottlerTest extends ManagementTestSupport { @@ -95,15 +104,273 @@ public class ManagedThrottlerTest extends ManagementTestSupport { assertTrue("Should be around 5 sec now: was " + total, total > 3500); } + public void testThrottleVisableViaJmx() throws Exception { + // JMX tests dont work well on AIX CI servers (hangs them) + if (isPlatform("aix")) { + return; + } + + // get the stats for the route + MBeanServer mbeanServer = getMBeanServer(); + // get the object name for the delayer + ObjectName throttlerName = ObjectName.getInstance("org.apache.camel:context=localhost/camel-1,type=processors,name=\"mythrottler2\""); + + // use route to get the total time + ObjectName routeName = ObjectName.getInstance("org.apache.camel:context=localhost/camel-1,type=routes,name=\"route2\""); + + // reset the counters + mbeanServer.invoke(routeName, "reset", null, null); + + getMockEndpoint("mock:end").expectedMessageCount(10); + + NotifyBuilder notifier = new NotifyBuilder(context). 
+ from("seda:throttleCount").whenReceived(5).create(); + + for (int i = 0; i < 10; i++) { + template.sendBody("seda:throttleCount", "Message " + i); + } + + assertTrue(notifier.matches(2, TimeUnit.SECONDS)); + Integer throttledMessages = (Integer) mbeanServer.getAttribute(throttlerName, "ThrottledCount"); + + // we are expecting this to be > 0 + assertTrue(throttledMessages.intValue() > 0); + + assertMockEndpointsSatisfied(); + + throttledMessages = (Integer) mbeanServer.getAttribute(throttlerName, "ThrottledCount"); + assertEquals("Should not be any throttled messages left, found: " + throttledMessages, (Integer) 0, throttledMessages); + + Long completed = (Long) mbeanServer.getAttribute(routeName, "ExchangesCompleted"); + assertEquals(10, completed.longValue()); + + } + + public void testThrottleAsyncVisableViaJmx() throws Exception { + // JMX tests dont work well on AIX CI servers (hangs them) + if (isPlatform("aix")) { + return; + } + + // get the stats for the route + MBeanServer mbeanServer = getMBeanServer(); + // get the object name for the delayer + ObjectName throttlerName = ObjectName.getInstance("org.apache.camel:context=localhost/camel-1,type=processors,name=\"mythrottler3\""); + + // use route to get the total time + ObjectName routeName = ObjectName.getInstance("org.apache.camel:context=localhost/camel-1,type=routes,name=\"route3\""); + + // reset the counters + mbeanServer.invoke(routeName, "reset", null, null); + + getMockEndpoint("mock:endAsync").expectedMessageCount(10); + + // we pick '5' because we are right in the middle of the number of messages + // that have been and reduces any race conditions to minimal... + NotifyBuilder notifier = new NotifyBuilder(context). 
+ from("seda:throttleCountAsync").whenReceived(5).create(); + + for (int i = 0; i < 10; i++) { + template.sendBody("seda:throttleCountAsync", "Message " + i); + } + + assertTrue(notifier.matches(2, TimeUnit.SECONDS)); + Integer throttledMessages = (Integer) mbeanServer.getAttribute(throttlerName, "ThrottledCount"); + + // we are expecting this to be > 0 + assertTrue(throttledMessages.intValue() > 0); + + assertMockEndpointsSatisfied(); + + throttledMessages = (Integer) mbeanServer.getAttribute(throttlerName, "ThrottledCount"); + assertEquals("Should not be any throttled messages left, found: " + throttledMessages, (Integer)0, throttledMessages); + + Long completed = (Long) mbeanServer.getAttribute(routeName, "ExchangesCompleted"); + assertEquals(10, completed.longValue()); + + } + + public void testThrottleAsyncExceptionVisableViaJmx() throws Exception { + // JMX tests dont work well on AIX CI servers (hangs them) + if (isPlatform("aix")) { + return; + } + + // get the stats for the route + MBeanServer mbeanServer = getMBeanServer(); + // get the object name for the delayer + ObjectName throttlerName = ObjectName.getInstance("org.apache.camel:context=localhost/camel-1,type=processors,name=\"mythrottler4\""); + + // use route to get the total time + ObjectName routeName = ObjectName.getInstance("org.apache.camel:context=localhost/camel-1,type=routes,name=\"route4\""); + + // reset the counters + mbeanServer.invoke(routeName, "reset", null, null); + + getMockEndpoint("mock:endAsyncException").expectedMessageCount(10); + + NotifyBuilder notifier = new NotifyBuilder(context). 
+ from("seda:throttleCountAsyncException").whenReceived(5).create(); + + for (int i = 0; i < 10; i++) { + template.sendBody("seda:throttleCountAsyncException", "Message " + i); + } + + assertTrue(notifier.matches(2, TimeUnit.SECONDS)); + Integer throttledMessages = (Integer) mbeanServer.getAttribute(throttlerName, "ThrottledCount"); + + // we are expecting this to be > 0 + assertTrue(throttledMessages.intValue() > 0); + + assertMockEndpointsSatisfied(); + + // give a sec for exception handling to finish.. + Thread.sleep(500); + + throttledMessages = (Integer) mbeanServer.getAttribute(throttlerName, "ThrottledCount"); + assertEquals("Should not be any throttled messages left, found: " + throttledMessages, (Integer)0, throttledMessages); + + // since all exchanges ended w/ exception, they are not completed + Long completed = (Long) mbeanServer.getAttribute(routeName, "ExchangesCompleted"); + assertEquals(0, completed.longValue()); + + } + + public void testRejectedExecution() throws Exception { + // when delaying async, we can possibly fill up the execution queue + //. which would through a RejectedExecutionException.. 
we need to make + // sure that the delayedCount/throttledCount doesn't leak + + // JMX tests don't work well on AIX CI servers (hangs them) + if (isPlatform("aix")) { + return; + } + + // get the stats for the route + MBeanServer mbeanServer = getMBeanServer(); + // get the object name for the delayer + ObjectName throttlerName = ObjectName.getInstance("org.apache.camel:context=localhost/camel-1,type=processors,name=\"mythrottler2\""); + + // use route to get the total time + ObjectName routeName = ObjectName.getInstance("org.apache.camel:context=localhost/camel-1,type=routes,name=\"route2\""); + + // reset the counters + mbeanServer.invoke(routeName, "reset", null, null); + + MockEndpoint mock = getMockEndpoint("mock:endAsyncReject"); + // only one message (the first one) should get through because the rest should get delayed + mock.expectedMessageCount(1); + + MockEndpoint exceptionMock = getMockEndpoint("mock:rejectedExceptionEndpoint1"); + exceptionMock.expectedMessageCount(9); + + + for (int i = 0; i < 10; i++) { + template.sendBody("seda:throttleCountRejectExecution", "Message " + i); + } + + assertMockEndpointsSatisfied(); + + // we shouldn't have any leaked throttler counts + Integer throttledMessages = (Integer) mbeanServer.getAttribute(throttlerName, "ThrottledCount"); + assertEquals("Should not be any throttled messages left, found: " + throttledMessages, (Integer) 0, throttledMessages); + + } + + public void testRejectedExecutionCallerRuns() throws Exception { + // when delaying async, we can possibly fill up the execution queue + //. which would throw a RejectedExecutionException.. 
we need to make + // sure that the delayedCount/throttledCount doesn't leak + + // JMX tests don't work well on AIX CI servers (hangs them) + if (isPlatform("aix")) { + return; + } + + // get the stats for the route + MBeanServer mbeanServer = getMBeanServer(); + // get the object name for the delayer + ObjectName throttlerName = ObjectName.getInstance("org.apache.camel:context=localhost/camel-1,type=processors,name=\"mythrottler2\""); + + // use route to get the total time + ObjectName routeName = ObjectName.getInstance("org.apache.camel:context=localhost/camel-1,type=routes,name=\"route2\""); + + // reset the counters + mbeanServer.invoke(routeName, "reset", null, null); + + MockEndpoint mock = getMockEndpoint("mock:endAsyncRejectCallerRuns"); + // all 10 messages should get through because the caller runs the task itself when scheduling is rejected + mock.expectedMessageCount(10); + + MockEndpoint exceptionMock = getMockEndpoint("mock:rejectedExceptionEndpoint"); + exceptionMock.expectedMessageCount(0); + + + for (int i = 0; i < 10; i++) { + template.sendBody("seda:throttleCountRejectExecutionCallerRuns", "Message " + i); + } + + assertMockEndpointsSatisfied(); + + // we shouldn't have any leaked throttler counts + Integer throttledMessages = (Integer) mbeanServer.getAttribute(throttlerName, "ThrottledCount"); + assertEquals("Should not be any throttled messages left, found: " + throttledMessages, (Integer) 0, throttledMessages); + } + @Override protected RouteBuilder createRouteBuilder() throws Exception { + final ScheduledExecutorService badService = new ScheduledThreadPoolExecutor(1) { + @Override + public ScheduledFuture<?> schedule(Runnable command, long delay, TimeUnit unit) { + throw new RejectedExecutionException(); + } + }; + return new RouteBuilder() { @Override public void configure() throws Exception { from("direct:start") - .to("log:foo") - .throttle(10).id("mythrottler") - .to("mock:result"); + .to("log:foo") + .throttle(10).id("mythrottler") + .to("mock:result"); 
+ + from("seda:throttleCount") + .throttle(1).timePeriodMillis(250).id("mythrottler2") + .to("mock:end"); + + from("seda:throttleCountAsync") + .throttle(1).asyncDelayed().timePeriodMillis(250).id("mythrottler3") + .to("mock:endAsync"); + + from("seda:throttleCountAsyncException") + .throttle(1).asyncDelayed().timePeriodMillis(250).id("mythrottler4") + .to("mock:endAsyncException") + .process(new Processor() { + + @Override + public void process(Exchange exchange) throws Exception { + throw new RuntimeException("Fail me"); + } + }); + from("seda:throttleCountRejectExecutionCallerRuns") + .onException(RejectedExecutionException.class).to("mock:rejectedExceptionEndpoint1").end() + .throttle(1) + .timePeriodMillis(250) + .asyncDelayed() + .executorService(badService) + .callerRunsWhenRejected(true) + .id("mythrottler5") + .to("mock:endAsyncRejectCallerRuns"); + + from("seda:throttleCountRejectExecution") + .onException(RejectedExecutionException.class).to("mock:rejectedExceptionEndpoint1").end() + .throttle(1) + .timePeriodMillis(250) + .asyncDelayed() + .executorService(badService) + .callerRunsWhenRejected(false) + .id("mythrottler6") + .to("mock:endAsyncReject"); } }; }