Author: davsclaus Date: Wed Apr 11 08:43:21 2012 New Revision: 1324638 URL: http://svn.apache.org/viewvc?rev=1324638&view=rev Log: CAMEL-5126: Improved error message if invalid configuration of throttler EIP. CAMEL-5163: Fixed issue if throttler/delayer expression evalution threw exception, then error handler does not react.
Added: camel/trunk/camel-core/src/test/java/org/apache/camel/processor/ThrottlerInvalidConfiguredTest.java - copied, changed from r1324578, camel/trunk/camel-core/src/test/java/org/apache/camel/processor/ThrottlerTest.java camel/trunk/camel-core/src/test/java/org/apache/camel/processor/ThrottlerNullEvalTest.java Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/model/ExpressionNode.java camel/trunk/camel-core/src/main/java/org/apache/camel/model/ThrottleDefinition.java camel/trunk/camel-core/src/main/java/org/apache/camel/processor/DelayProcessorSupport.java camel/trunk/camel-core/src/main/java/org/apache/camel/processor/Throttler.java Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/model/ExpressionNode.java URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/model/ExpressionNode.java?rev=1324638&r1=1324637&r2=1324638&view=diff ============================================================================== --- camel/trunk/camel-core/src/main/java/org/apache/camel/model/ExpressionNode.java (original) +++ camel/trunk/camel-core/src/main/java/org/apache/camel/model/ExpressionNode.java Wed Apr 11 08:43:21 2012 @@ -129,7 +129,7 @@ public class ExpressionNode extends Proc @Override protected void preCreateProcessor() { Expression exp = expression; - if (expression.getExpressionValue() != null) { + if (expression != null && expression.getExpressionValue() != null) { exp = expression.getExpressionValue(); } Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/model/ThrottleDefinition.java URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/model/ThrottleDefinition.java?rev=1324638&r1=1324637&r2=1324638&view=diff ============================================================================== --- camel/trunk/camel-core/src/main/java/org/apache/camel/model/ThrottleDefinition.java (original) +++ camel/trunk/camel-core/src/main/java/org/apache/camel/model/ThrottleDefinition.java Wed Apr 11 08:43:21 2012 @@ -88,7 +88,12 @@ public class ThrottleDefinition extends // should be default 1000 millis long period = getTimePeriodMillis() != null ? getTimePeriodMillis() : 1000L; + + // max requests per period is mandatory Expression maxRequestsExpression = createMaxRequestsPerPeriodExpression(routeContext); + if (maxRequestsExpression == null) { + throw new IllegalArgumentException("MaxRequestsPerPeriod expression must be provided on " + this); + } Throttler answer = new Throttler(routeContext.getCamelContext(), childProcessor, maxRequestsExpression, period, threadPool, shutdownThreadPool); Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/processor/DelayProcessorSupport.java URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/DelayProcessorSupport.java?rev=1324638&r1=1324637&r2=1324638&view=diff ============================================================================== --- camel/trunk/camel-core/src/main/java/org/apache/camel/processor/DelayProcessorSupport.java (original) +++ camel/trunk/camel-core/src/main/java/org/apache/camel/processor/DelayProcessorSupport.java Wed Apr 11 08:43:21 2012 @@ -95,11 +95,18 @@ public abstract class DelayProcessorSupp } // calculate delay and wait - long delay = calculateDelay(exchange); - if (delay <= 0) { - // no delay then continue routing - log.trace("No delay for exchangeId: {}", exchange.getExchangeId()); - return super.process(exchange, callback); + long delay; + try { + delay = calculateDelay(exchange); + if (delay <= 0) { + // no delay then continue routing + log.trace("No delay for exchangeId: {}", exchange.getExchangeId()); + return super.process(exchange, callback); + } + } catch (Throwable e) { + exchange.setException(e); + callback.done(true); + return true; } if (!isAsyncDelayed() || exchange.isTransacted()) { Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/processor/Throttler.java URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/Throttler.java?rev=1324638&r1=1324637&r2=1324638&view=diff ============================================================================== --- camel/trunk/camel-core/src/main/java/org/apache/camel/processor/Throttler.java (original) +++ camel/trunk/camel-core/src/main/java/org/apache/camel/processor/Throttler.java Wed Apr 11 08:43:21 2012 @@ -22,6 +22,7 @@ import org.apache.camel.CamelContext; import org.apache.camel.Exchange; import org.apache.camel.Expression; import org.apache.camel.Processor; +import org.apache.camel.RuntimeExchangeException; import org.apache.camel.Traceable; import org.apache.camel.util.ObjectHelper; @@ -37,7 +38,7 @@ import org.apache.camel.util.ObjectHelpe * @version */ public class Throttler extends DelayProcessorSupport implements Traceable { - private long maximumRequestsPerPeriod; + private volatile long maximumRequestsPerPeriod; private Expression maxRequestsPerPeriodExpression; private long timePeriodMillis = 1000; private volatile TimeSlot slot; @@ -101,7 +102,14 @@ public class Throttler extends DelayProc // ----------------------------------------------------------------------- protected long calculateDelay(Exchange exchange) { - Long longValue = maxRequestsPerPeriodExpression.evaluate(exchange, Long.class); + // evaluate as Object first to see if we get any result at all + Object result = maxRequestsPerPeriodExpression.evaluate(exchange, Object.class); + if (result == null) { + throw new RuntimeExchangeException("The max requests per period expression was evaluated as null: " + maxRequestsPerPeriodExpression, exchange); + } + + // then must convert value to long + Long longValue = exchange.getContext().getTypeConverter().convertTo(Long.class, result); if (longValue != null) { // log if we changed max period after initial setting if (maximumRequestsPerPeriod > 0 && longValue.longValue() != maximumRequestsPerPeriod) { Copied: camel/trunk/camel-core/src/test/java/org/apache/camel/processor/ThrottlerInvalidConfiguredTest.java (from r1324578, camel/trunk/camel-core/src/test/java/org/apache/camel/processor/ThrottlerTest.java) URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/ThrottlerInvalidConfiguredTest.java?p2=camel/trunk/camel-core/src/test/java/org/apache/camel/processor/ThrottlerInvalidConfiguredTest.java&p1=camel/trunk/camel-core/src/test/java/org/apache/camel/processor/ThrottlerTest.java&r1=1324578&r2=1324638&rev=1324638&view=diff ============================================================================== --- camel/trunk/camel-core/src/test/java/org/apache/camel/processor/ThrottlerTest.java (original) +++ camel/trunk/camel-core/src/test/java/org/apache/camel/processor/ThrottlerInvalidConfiguredTest.java Wed Apr 11 08:43:21 2012 @@ -16,146 +16,36 @@ */ package org.apache.camel.processor; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; - import org.apache.camel.ContextTestSupport; +import org.apache.camel.FailedToCreateRouteException; import org.apache.camel.builder.RouteBuilder; -import org.apache.camel.component.mock.MockEndpoint; -import org.apache.camel.impl.DefaultExchange; -import org.apache.camel.processor.Throttler.TimeSlot; - -import static org.apache.camel.builder.Builder.constant; /** * @version */ -public class ThrottlerTest extends ContextTestSupport { - private static final int INTERVAL = 500; - protected int messageCount = 9; - - public void testSendLotsOfMessagesButOnly3GetThrough() throws Exception { - MockEndpoint resultEndpoint = resolveMandatoryEndpoint("mock:result", MockEndpoint.class); - resultEndpoint.expectedMessageCount(3); - resultEndpoint.setResultWaitTime(5000); - - for (int i = 0; i < messageCount; i++) { - template.sendBody("seda:a", "<message>" + i + "</message>"); - } - - // lets pause to give the requests time to be processed - // to check that the throttle really does kick in - resultEndpoint.assertIsSatisfied(); - } - - public void testSendLotsOfMessagesSimultaneouslyButOnly3GetThrough() throws Exception { - MockEndpoint resultEndpoint = resolveMandatoryEndpoint("mock:result", MockEndpoint.class); - resultEndpoint.expectedMessageCount(messageCount); +public class ThrottlerInvalidConfiguredTest extends ContextTestSupport { - ExecutorService executor = Executors.newFixedThreadPool(messageCount); - - long start = System.currentTimeMillis(); - for (int i = 0; i < messageCount; i++) { - executor.execute(new Runnable() { - public void run() { - template.sendBody("direct:a", "<message>payload</message>"); - } - }); + public void testInvalid() throws Exception { + context.addRoutes(new RouteBuilder() { + @Override + public void configure() throws Exception { + // null is invalid + from("seda:a").throttle(null).to("mock:result"); + } + }); + try { + context.start(); + fail("Should have thrown exception"); + } catch (FailedToCreateRouteException e) { + IllegalArgumentException cause = assertIsInstanceOf(IllegalArgumentException.class, e.getCause()); + assertTrue(cause.getMessage().startsWith("MaxRequestsPerPeriod expression must be provided")); } - - // let's wait for the exchanges to arrive - resultEndpoint.assertIsSatisfied(); - - // now assert that they have actually been throttled - long minimumTime = (messageCount - 1) * INTERVAL; - // add a little slack - long delta = System.currentTimeMillis() - start + 200; - assertTrue("Should take at least " + minimumTime + "ms, was: " + delta, delta >= minimumTime); - executor.shutdownNow(); - } - - public void testTimeSlotCalculus() throws Exception { - Throttler throttler = new Throttler(context, null, constant(3), 1000, null, false); - // calculate will assign a new slot - throttler.calculateDelay(new DefaultExchange(context)); - TimeSlot slot = throttler.nextSlot(); - // start a new time slot - assertNotNull(slot); - // make sure the same slot is used (3 exchanges per slot) - assertSame(slot, throttler.nextSlot()); - assertTrue(slot.isFull()); - - TimeSlot next = throttler.nextSlot(); - // now we should have a new slot that starts somewhere in the future - assertNotSame(slot, next); - assertFalse(next.isActive()); + context.stop(); } - public void testConfigurationWithConstantExpression() throws Exception { - MockEndpoint resultEndpoint = resolveMandatoryEndpoint("mock:result", MockEndpoint.class); - resultEndpoint.expectedMessageCount(messageCount); - - ExecutorService executor = Executors.newFixedThreadPool(messageCount); - - long start = System.currentTimeMillis(); - for (int i = 0; i < messageCount; i++) { - executor.execute(new Runnable() { - public void run() { - template.sendBody("direct:expressionConstant", "<message>payload</message>"); - } - }); - } - - // let's wait for the exchanges to arrive - resultEndpoint.assertIsSatisfied(); - - // now assert that they have actually been throttled - long minimumTime = (messageCount - 1) * INTERVAL; - // add a little slack - long delta = System.currentTimeMillis() - start + 200; - assertTrue("Should take at least " + minimumTime + "ms, was: " + delta, delta >= minimumTime); - executor.shutdownNow(); + @Override + public boolean isUseRouteBuilder() { + return false; } - public void testConfigurationWithHeaderExpression() throws Exception { - MockEndpoint resultEndpoint = resolveMandatoryEndpoint("mock:result", MockEndpoint.class); - resultEndpoint.expectedMessageCount(messageCount); - - ExecutorService executor = Executors.newFixedThreadPool(messageCount); - - long start = System.currentTimeMillis(); - for (int i = 0; i < messageCount; i++) { - executor.execute(new Runnable() { - public void run() { - template.sendBodyAndHeader("direct:expressionHeader", "<message>payload</message>", "throttleValue", 1); - } - }); - } - - // let's wait for the exchanges to arrive - resultEndpoint.assertIsSatisfied(); - - // now assert that they have actually been throttled - long minimumTime = (messageCount - 1) * INTERVAL; - // add a little slack - long delta = System.currentTimeMillis() - start + 200; - assertTrue("Should take at least " + minimumTime + "ms, was: " + delta, delta >= minimumTime); - executor.shutdownNow(); - } - - protected RouteBuilder createRouteBuilder() { - return new RouteBuilder() { - public void configure() { - // START SNIPPET: ex - from("seda:a").throttle(3).timePeriodMillis(10000).to("log:result", "mock:result"); - // END SNIPPET: ex - - from("direct:a").throttle(1).timePeriodMillis(INTERVAL).to("log:result", "mock:result"); - - from("direct:expressionConstant").throttle(constant(1)).timePeriodMillis(INTERVAL).to("log:result", "mock:result"); - - from("direct:expressionHeader").throttle(header("throttleValue")).timePeriodMillis(INTERVAL).to("log:result", "mock:result"); - } - }; - } } \ No newline at end of file Added: camel/trunk/camel-core/src/test/java/org/apache/camel/processor/ThrottlerNullEvalTest.java URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/ThrottlerNullEvalTest.java?rev=1324638&view=auto ============================================================================== --- camel/trunk/camel-core/src/test/java/org/apache/camel/processor/ThrottlerNullEvalTest.java (added) +++ camel/trunk/camel-core/src/test/java/org/apache/camel/processor/ThrottlerNullEvalTest.java Wed Apr 11 08:43:21 2012 @@ -0,0 +1,60 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.processor; + +import org.apache.camel.ContextTestSupport; +import org.apache.camel.builder.RouteBuilder; + +/** + * @version + */ +public class ThrottlerNullEvalTest extends ContextTestSupport { + + public void testNullEvalTest() throws Exception { + getMockEndpoint("mock:result").expectedBodiesReceived("Hello World", "Bye World"); + getMockEndpoint("mock:dead").expectedBodiesReceived("Kaboom"); + + template.sendBodyAndHeader("seda:a", "Hello World", "max", 2); + template.sendBodyAndHeader("seda:a", "Kaboom", "max", null); + template.sendBodyAndHeader("seda:a", "Bye World", "max", 2); + + assertMockEndpointsSatisfied(); + } + + public void testNoHeaderTest() throws Exception { + getMockEndpoint("mock:result").expectedBodiesReceived("Hello World", "Bye World"); + getMockEndpoint("mock:dead").expectedBodiesReceived("Kaboom"); + + template.sendBodyAndHeader("seda:a", "Hello World", "max", 2); + template.sendBody("seda:a", "Kaboom"); + template.sendBodyAndHeader("seda:a", "Bye World", "max", 2); + + assertMockEndpointsSatisfied(); + } + + @Override + protected RouteBuilder createRouteBuilder() throws Exception { + return new RouteBuilder() { + @Override + public void configure() throws Exception { + errorHandler(deadLetterChannel("mock:dead")); + + from("seda:a").throttle(header("max")).to("mock:result"); + } + }; + } +} \ No newline at end of file