CAMEL-7491 Added an option in the throttler to throw a RejectedExecutionException instead of delaying the exchange
Project: http://git-wip-us.apache.org/repos/asf/camel/repo Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/86797a34 Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/86797a34 Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/86797a34 Branch: refs/heads/master Commit: 86797a341f82be858c93ac2778f0d31195e12c25 Parents: 20e6af8 Author: Willem Jiang <willem.ji...@gmail.com> Authored: Tue Jun 10 14:46:34 2014 +0800 Committer: Willem Jiang <willem.ji...@gmail.com> Committed: Tue Jun 10 14:46:34 2014 +0800 ---------------------------------------------------------------------- .../apache/camel/model/ThrottleDefinition.java | 29 ++++++++++- .../camel/processor/DelayProcessorSupport.java | 52 +++++++++++--------- .../org/apache/camel/processor/Throttler.java | 25 +++++++++- .../apache/camel/processor/ThrottlerTest.java | 34 +++++++++++-- 4 files changed, 109 insertions(+), 31 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/camel/blob/86797a34/camel-core/src/main/java/org/apache/camel/model/ThrottleDefinition.java ---------------------------------------------------------------------- diff --git a/camel-core/src/main/java/org/apache/camel/model/ThrottleDefinition.java b/camel-core/src/main/java/org/apache/camel/model/ThrottleDefinition.java index 191397a..b829052 100644 --- a/camel-core/src/main/java/org/apache/camel/model/ThrottleDefinition.java +++ b/camel-core/src/main/java/org/apache/camel/model/ThrottleDefinition.java @@ -51,6 +51,8 @@ public class ThrottleDefinition extends ExpressionNode implements ExecutorServic private Boolean asyncDelayed; @XmlAttribute private Boolean callerRunsWhenRejected; + @XmlAttribute + private Boolean rejectExecution; public ThrottleDefinition() { } @@ -84,7 +86,7 @@ public class ThrottleDefinition extends ExpressionNode implements ExecutorServic boolean shutdownThreadPool = ProcessorDefinitionHelper.willCreateNewThreadPool(routeContext, this, 
isAsyncDelayed()); ScheduledExecutorService threadPool = ProcessorDefinitionHelper.getConfiguredScheduledExecutorService(routeContext, "Throttle", this, isAsyncDelayed()); - + // should be default 1000 millis long period = getTimePeriodMillis() != null ? getTimePeriodMillis() : 1000L; @@ -94,7 +96,7 @@ public class ThrottleDefinition extends ExpressionNode implements ExecutorServic throw new IllegalArgumentException("MaxRequestsPerPeriod expression must be provided on " + this); } - Throttler answer = new Throttler(routeContext.getCamelContext(), childProcessor, maxRequestsExpression, period, threadPool, shutdownThreadPool); + Throttler answer = new Throttler(routeContext.getCamelContext(), childProcessor, maxRequestsExpression, period, threadPool, shutdownThreadPool, isRejectExecution()); if (getAsyncDelayed() != null) { answer.setAsyncDelayed(getAsyncDelayed()); @@ -164,6 +166,19 @@ public class ThrottleDefinition extends ExpressionNode implements ExecutorServic setAsyncDelayed(true); return this; } + + /** + * Whether or not throttler throws the RejectExceutionException when the exchange exceeds the request limit + * <p/> + * Is by default <tt>false</tt> + * + * @param throw the RejectExecutionException if the exchange exceeds the request limit + * @return the builder + */ + public ThrottleDefinition rejectExecution(boolean rejectExecution) { + setRejectExecution(rejectExecution); + return this; + } public ThrottleDefinition executorService(ExecutorService executorService) { setExecutorService(executorService); @@ -174,6 +189,8 @@ public class ThrottleDefinition extends ExpressionNode implements ExecutorServic setExecutorServiceRef(executorServiceRef); return this; } + + // Properties // ------------------------------------------------------------------------- @@ -221,4 +238,12 @@ public class ThrottleDefinition extends ExpressionNode implements ExecutorServic public void setExecutorServiceRef(String executorServiceRef) { this.executorServiceRef = 
executorServiceRef; } + + public boolean isRejectExecution() { + return rejectExecution != null ? rejectExecution : false; + } + + public void setRejectExecution(Boolean rejectExecution) { + this.rejectExecution = rejectExecution; + } } http://git-wip-us.apache.org/repos/asf/camel/blob/86797a34/camel-core/src/main/java/org/apache/camel/processor/DelayProcessorSupport.java ---------------------------------------------------------------------- diff --git a/camel-core/src/main/java/org/apache/camel/processor/DelayProcessorSupport.java b/camel-core/src/main/java/org/apache/camel/processor/DelayProcessorSupport.java index ff81170..05ba626 100644 --- a/camel-core/src/main/java/org/apache/camel/processor/DelayProcessorSupport.java +++ b/camel-core/src/main/java/org/apache/camel/processor/DelayProcessorSupport.java @@ -90,30 +90,8 @@ public abstract class DelayProcessorSupport extends DelegateAsyncProcessor { this.executorService = executorService; this.shutdownExecutorService = shutdownExecutorService; } - - @Override - public boolean process(Exchange exchange, AsyncCallback callback) { - if (!isRunAllowed()) { - exchange.setException(new RejectedExecutionException("Run is not allowed")); - callback.done(true); - return true; - } - - // calculate delay and wait - long delay; - try { - delay = calculateDelay(exchange); - if (delay <= 0) { - // no delay then continue routing - log.trace("No delay for exchangeId: {}", exchange.getExchangeId()); - return processor.process(exchange, callback); - } - } catch (Throwable e) { - exchange.setException(e); - callback.done(true); - return true; - } - + + protected boolean processDelay(Exchange exchange, AsyncCallback callback, long delay) { if (!isAsyncDelayed() || exchange.isTransacted()) { // use synchronous delay (also required if using transactions) try { @@ -164,6 +142,32 @@ public abstract class DelayProcessorSupport extends DelegateAsyncProcessor { } } + @Override + public boolean process(Exchange exchange, AsyncCallback 
callback) { + if (!isRunAllowed()) { + exchange.setException(new RejectedExecutionException("Run is not allowed")); + callback.done(true); + return true; + } + + // calculate delay and wait + long delay; + try { + delay = calculateDelay(exchange); + if (delay <= 0) { + // no delay then continue routing + log.trace("No delay for exchangeId: {}", exchange.getExchangeId()); + return processor.process(exchange, callback); + } + } catch (Throwable e) { + exchange.setException(e); + callback.done(true); + return true; + } + + return processDelay(exchange, callback, delay); + } + public boolean isAsyncDelayed() { return asyncDelayed; } http://git-wip-us.apache.org/repos/asf/camel/blob/86797a34/camel-core/src/main/java/org/apache/camel/processor/Throttler.java ---------------------------------------------------------------------- diff --git a/camel-core/src/main/java/org/apache/camel/processor/Throttler.java b/camel-core/src/main/java/org/apache/camel/processor/Throttler.java index 6b51a2c..a48f6a5 100644 --- a/camel-core/src/main/java/org/apache/camel/processor/Throttler.java +++ b/camel-core/src/main/java/org/apache/camel/processor/Throttler.java @@ -16,9 +16,11 @@ */ package org.apache.camel.processor; +import java.util.concurrent.RejectedExecutionException; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.atomic.AtomicLong; +import org.apache.camel.AsyncCallback; import org.apache.camel.CamelContext; import org.apache.camel.Exchange; import org.apache.camel.Expression; @@ -43,10 +45,12 @@ public class Throttler extends DelayProcessorSupport implements Traceable { private Expression maxRequestsPerPeriodExpression; private AtomicLong timePeriodMillis = new AtomicLong(1000); private volatile TimeSlot slot; + private boolean rejectExecution; public Throttler(CamelContext camelContext, Processor processor, Expression maxRequestsPerPeriodExpression, long timePeriodMillis, - ScheduledExecutorService executorService, boolean 
shutdownExecutorService) { + ScheduledExecutorService executorService, boolean shutdownExecutorService, boolean rejectExecution) { super(camelContext, processor, executorService, shutdownExecutorService); + this.rejectExecution = rejectExecution; ObjectHelper.notNull(maxRequestsPerPeriodExpression, "maxRequestsPerPeriodExpression"); this.maxRequestsPerPeriodExpression = maxRequestsPerPeriodExpression; @@ -196,4 +200,23 @@ public class Throttler extends DelayProcessorSupport implements Traceable { TimeSlot getSlot() { return this.slot; } + + public boolean isRejectExecution() { + return rejectExecution; + } + + public void setRejectExecution(boolean rejectExecution) { + this.rejectExecution = rejectExecution; + } + + @Override + protected boolean processDelay(Exchange exchange, AsyncCallback callback, long delay) { + if (isRejectExecution() && delay > 0) { + exchange.setException(new RejectedExecutionException("Exceed the max request limit!")); + callback.done(true); + return true; + } else { + return super.processDelay(exchange, callback, delay); + } + } } http://git-wip-us.apache.org/repos/asf/camel/blob/86797a34/camel-core/src/test/java/org/apache/camel/processor/ThrottlerTest.java ---------------------------------------------------------------------- diff --git a/camel-core/src/test/java/org/apache/camel/processor/ThrottlerTest.java b/camel-core/src/test/java/org/apache/camel/processor/ThrottlerTest.java index 29fdc45..c1ffbc1 100644 --- a/camel-core/src/test/java/org/apache/camel/processor/ThrottlerTest.java +++ b/camel-core/src/test/java/org/apache/camel/processor/ThrottlerTest.java @@ -18,13 +18,13 @@ package org.apache.camel.processor; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; +import java.util.concurrent.RejectedExecutionException; import org.apache.camel.ContextTestSupport; import org.apache.camel.builder.RouteBuilder; import org.apache.camel.component.mock.MockEndpoint; import 
org.apache.camel.impl.DefaultExchange; import org.apache.camel.processor.Throttler.TimeSlot; - import static org.apache.camel.builder.Builder.constant; /** @@ -37,7 +37,7 @@ public class ThrottlerTest extends ContextTestSupport { public void testSendLotsOfMessagesButOnly3GetThrough() throws Exception { MockEndpoint resultEndpoint = resolveMandatoryEndpoint("mock:result", MockEndpoint.class); resultEndpoint.expectedMessageCount(3); - resultEndpoint.setResultWaitTime(5000); + resultEndpoint.setResultWaitTime(2000); for (int i = 0; i < messageCount; i++) { template.sendBody("seda:a", "<message>" + i + "</message>"); @@ -47,6 +47,25 @@ public class ThrottlerTest extends ContextTestSupport { // to check that the throttle really does kick in resultEndpoint.assertIsSatisfied(); } + + public void testSendLotsOfMessagesWithRejctExecution() throws Exception { + MockEndpoint resultEndpoint = resolveMandatoryEndpoint("mock:result", MockEndpoint.class); + resultEndpoint.expectedMessageCount(2); + resultEndpoint.setResultWaitTime(2000); + + MockEndpoint errorEndpoint = resolveMandatoryEndpoint("mock:error", MockEndpoint.class); + errorEndpoint.expectedMessageCount(4); + errorEndpoint.setResultWaitTime(2000); + + for (int i = 0; i < 6; i++) { + template.sendBody("direct:start", "<message>" + i + "</message>"); + } + + // lets pause to give the requests time to be processed + // to check that the throttle really does kick in + resultEndpoint.assertIsSatisfied(); + errorEndpoint.assertIsSatisfied(); + } public void testSendLotsOfMessagesSimultaneouslyButOnly3GetThrough() throws Exception { MockEndpoint resultEndpoint = resolveMandatoryEndpoint("mock:result", MockEndpoint.class); @@ -75,7 +94,7 @@ public class ThrottlerTest extends ContextTestSupport { } public void testTimeSlotCalculus() throws Exception { - Throttler throttler = new Throttler(context, null, constant(3), 1000, null, false); + Throttler throttler = new Throttler(context, null, constant(3), 1000, null, false, false); 
// calculate will assign a new slot throttler.calculateDelay(new DefaultExchange(context)); TimeSlot slot = throttler.nextSlot(); @@ -93,7 +112,7 @@ public class ThrottlerTest extends ContextTestSupport { } public void testTimeSlotCalculusForPeriod() throws InterruptedException { - Throttler throttler = new Throttler(context, null, constant(3), 1000, null, false); + Throttler throttler = new Throttler(context, null, constant(3), 1000, null, false, false); throttler.calculateDelay(new DefaultExchange(context)); TimeSlot slot = throttler.getSlot(); @@ -202,6 +221,11 @@ public class ThrottlerTest extends ContextTestSupport { protected RouteBuilder createRouteBuilder() { return new RouteBuilder() { public void configure() { + + onException(RejectedExecutionException.class) + .handled(true) + .to("mock:error"); + // START SNIPPET: ex from("seda:a").throttle(3).timePeriodMillis(10000).to("log:result", "mock:result"); // END SNIPPET: ex @@ -211,6 +235,8 @@ public class ThrottlerTest extends ContextTestSupport { from("direct:expressionConstant").throttle(constant(1)).timePeriodMillis(INTERVAL).to("log:result", "mock:result"); from("direct:expressionHeader").throttle(header("throttleValue")).timePeriodMillis(INTERVAL).to("log:result", "mock:result"); + + from("direct:start").throttle(2).timePeriodMillis(10000).rejectExecution(true).to("log:result", "mock:result"); } }; }