Repository: camel Updated Branches: refs/heads/master de2a95ece -> 15e0393c6
CAMEL-9184: Throttler should quickly reject exchanges if configured to do so. Also fixed the isPast method, whose time comparison was inverted. Thanks to Arno Noordover for the patch. Project: http://git-wip-us.apache.org/repos/asf/camel/repo Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/6a6061b5 Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/6a6061b5 Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/6a6061b5 Branch: refs/heads/master Commit: 6a6061b500fde97892f5f4cd6a6f72ae95e9fc6c Parents: be2bde4 Author: Claus Ibsen <davscl...@apache.org> Authored: Sun Oct 4 11:47:25 2015 +0200 Committer: Claus Ibsen <davscl...@apache.org> Committed: Sun Oct 4 11:47:34 2015 +0200 ---------------------------------------------------------------------- .../org/apache/camel/processor/Throttler.java | 14 ++-- .../apache/camel/processor/ThrottlerTest.java | 67 ++++++++++++++++++-- 2 files changed, 72 insertions(+), 9 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/camel/blob/6a6061b5/camel-core/src/main/java/org/apache/camel/processor/Throttler.java ---------------------------------------------------------------------- diff --git a/camel-core/src/main/java/org/apache/camel/processor/Throttler.java b/camel-core/src/main/java/org/apache/camel/processor/Throttler.java index a711f6d..339da9d 100644 --- a/camel-core/src/main/java/org/apache/camel/processor/Throttler.java +++ b/camel-core/src/main/java/org/apache/camel/processor/Throttler.java @@ -151,12 +151,16 @@ public class Throttler extends DelayProcessorSupport implements Traceable, IdAwa /* * Determine what the next available time slot is for handling an Exchange */ - protected synchronized TimeSlot nextSlot() { + protected synchronized TimeSlot nextSlot() throws ThrottlerRejectedExecutionException { if (slot == null) { slot = new TimeSlot(); - } - if (slot.isFull() || !slot.isPast()) { - slot = slot.next(); + } else { + if (rejectExecution && slot.isFull() && 
!slot.isPast()) { + throw new ThrottlerRejectedExecutionException("Exceed the max request limit!"); + } + if (slot.isFull() || slot.isPast()) { + slot = slot.next(); + } } slot.assign(); return slot; @@ -193,7 +197,7 @@ public class Throttler extends DelayProcessorSupport implements Traceable, IdAwa protected boolean isPast() { long current = System.currentTimeMillis(); - return current < (startTime + duration); + return current > (startTime + duration); } protected boolean isActive() { http://git-wip-us.apache.org/repos/asf/camel/blob/6a6061b5/camel-core/src/test/java/org/apache/camel/processor/ThrottlerTest.java ---------------------------------------------------------------------- diff --git a/camel-core/src/test/java/org/apache/camel/processor/ThrottlerTest.java b/camel-core/src/test/java/org/apache/camel/processor/ThrottlerTest.java index 4931db4..3c4358e 100644 --- a/camel-core/src/test/java/org/apache/camel/processor/ThrottlerTest.java +++ b/camel-core/src/test/java/org/apache/camel/processor/ThrottlerTest.java @@ -19,6 +19,7 @@ package org.apache.camel.processor; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; +import org.apache.camel.AsyncCallback; import org.apache.camel.ContextTestSupport; import org.apache.camel.builder.RouteBuilder; import org.apache.camel.component.mock.MockEndpoint; @@ -27,10 +28,6 @@ import org.apache.camel.processor.Throttler.TimeSlot; import static org.apache.camel.builder.Builder.constant; - -/** - * @version - */ public class ThrottlerTest extends ContextTestSupport { private static final int INTERVAL = 500; protected int messageCount = 9; @@ -257,6 +254,68 @@ public class ThrottlerTest extends ContextTestSupport { assertTrue("Should take at most " + maxTime + "ms, was: " + delta, delta <= maxTime); } + /* + Given: you have a throttler which rejects messages + Then: + Throttler should return an exception when calculating the delay or set the exception + on the exchange when processing the 
delay. + */ + public void testWhenTimeSlotIsFullShouldReturnThrottlerRejectedExecutionException() { + if (!canTest()) { + return; + } + Throttler throttler = new Throttler(context, null, constant(1), 1000, null, false, true); + AsyncCallback callback = new AsyncCallback() { + @Override + public void done(boolean doneSync) { + + } + }; + throttler.calculateDelay(new DefaultExchange(context)); + + boolean exceptionThrown = false; + DefaultExchange exchange = null; + try { + long delay = throttler.calculateDelay(new DefaultExchange(context)); + exchange = new DefaultExchange(context); + throttler.processDelay(exchange, + callback, + delay); + } catch (ThrottlerRejectedExecutionException e) { + exceptionThrown = true; + } + assertTrue(exceptionThrown || exchange.getException().getClass() == ThrottlerRejectedExecutionException.class); + } + + /* + Given: you have a throttler which rejects messages after the first message + Then: the timeslot should be the original timeslot or the new timeslot should not be full + */ + public void testRejectionOfExecutionShouldNotFillNextTimeSlot() { + if (!canTest()) { + return; + } + Throttler throttler = new Throttler(context, null, constant(1), 10000, null, false, true); + AsyncCallback callback = new AsyncCallback() { + @Override + public void done(boolean doneSync) { + + } + }; + throttler.calculateDelay(new DefaultExchange(context)); + TimeSlot currentSlot = throttler.getSlot(); + DefaultExchange exchange; + try { + long delay = throttler.calculateDelay(new DefaultExchange(context)); + exchange = new DefaultExchange(context); + throttler.processDelay(exchange, + callback, + delay); + } catch (ThrottlerRejectedExecutionException ignore) { + } + assertTrue(currentSlot == throttler.getSlot() || !throttler.getSlot().isFull()); + } + protected RouteBuilder createRouteBuilder() { return new RouteBuilder() { public void configure() {