Author: davsclaus Date: Fri Jul 2 13:14:59 2010 New Revision: 959977 URL: http://svn.apache.org/viewvc?rev=959977&view=rev Log: CAMEL-2876: Delayer and throttle EIP supports non blocking delays using .asyncDelayed() option.
Added: camel/trunk/camel-core/src/test/java/org/apache/camel/processor/DelayerAsyncDelayedTest.java - copied, changed from r959949, camel/trunk/camel-core/src/test/java/org/apache/camel/processor/DelayerTest.java camel/trunk/camel-core/src/test/java/org/apache/camel/processor/ThrottlerAsyncDelayedTest.java - copied, changed from r959949, camel/trunk/camel-core/src/test/java/org/apache/camel/processor/ThrottlerTest.java Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultExecutorServiceStrategy.java camel/trunk/camel-core/src/main/java/org/apache/camel/model/DelayDefinition.java camel/trunk/camel-core/src/main/java/org/apache/camel/model/ThrottleDefinition.java camel/trunk/camel-core/src/main/java/org/apache/camel/processor/DelayProcessorSupport.java camel/trunk/camel-core/src/main/java/org/apache/camel/processor/Delayer.java camel/trunk/camel-core/src/main/java/org/apache/camel/processor/ThreadsProcessor.java camel/trunk/camel-core/src/main/java/org/apache/camel/processor/Throttler.java camel/trunk/camel-core/src/main/java/org/apache/camel/processor/interceptor/DelayInterceptor.java camel/trunk/camel-core/src/main/java/org/apache/camel/spi/ExecutorServiceStrategy.java camel/trunk/camel-core/src/main/java/org/apache/camel/util/ObjectHelper.java camel/trunk/camel-core/src/main/java/org/apache/camel/util/concurrent/ExecutorServiceHelper.java camel/trunk/camel-core/src/test/java/org/apache/camel/processor/ThrottlerTest.java camel/trunk/camel-core/src/test/resources/log4j.properties Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultExecutorServiceStrategy.java URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultExecutorServiceStrategy.java?rev=959977&r1=959976&r2=959977&view=diff ============================================================================== --- camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultExecutorServiceStrategy.java (original) +++ 
camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultExecutorServiceStrategy.java Fri Jul 2 13:14:59 2010 @@ -155,6 +155,29 @@ public class DefaultExecutorServiceStrat return answer; } + public ScheduledExecutorService lookupScheduled(Object source, String name, String executorServiceRef) { + ScheduledExecutorService answer = camelContext.getRegistry().lookup(executorServiceRef, ScheduledExecutorService.class); + if (answer != null && LOG.isDebugEnabled()) { + if (LOG.isDebugEnabled()) { + LOG.debug("Looking up ScheduledExecutorService with ref: " + executorServiceRef + " and found it from Registry: " + answer); + } + } + + if (answer == null) { + ThreadPoolProfile profile = getThreadPoolProfile(name); + if (profile != null) { + int poolSize = profile.getPoolSize(); + answer = newScheduledThreadPool(source, name, poolSize); + if (answer != null && LOG.isDebugEnabled()) { + LOG.debug("Looking up ScheduledExecutorService with ref: " + executorServiceRef + + " and found a matching ThreadPoolProfile to create the ScheduledExecutorService: " + answer); + } + } + } + + return answer; + } + public ExecutorService newDefaultThreadPool(Object source, String name) { ThreadPoolProfile profile = getDefaultThreadPoolProfile(); ObjectHelper.notNull(profile, "DefaultThreadPoolProfile"); @@ -194,6 +217,11 @@ public class DefaultExecutorServiceStrat return answer; } + public ScheduledExecutorService newScheduledThreadPool(Object source, String name) { + int poolSize = getDefaultThreadPoolProfile().getPoolSize(); + return newScheduledThreadPool(source, name, poolSize); + } + public ScheduledExecutorService newScheduledThreadPool(Object source, String name, int poolSize) { ScheduledExecutorService answer = ExecutorServiceHelper.newScheduledThreadPool(poolSize, threadNamePattern, name, true); onThreadPoolCreated(answer); Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/model/DelayDefinition.java URL: 
http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/model/DelayDefinition.java?rev=959977&r1=959976&r2=959977&view=diff ============================================================================== --- camel/trunk/camel-core/src/main/java/org/apache/camel/model/DelayDefinition.java (original) +++ camel/trunk/camel-core/src/main/java/org/apache/camel/model/DelayDefinition.java Fri Jul 2 13:14:59 2010 @@ -16,9 +16,13 @@ */ package org.apache.camel.model; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.ScheduledExecutorService; import javax.xml.bind.annotation.XmlAccessType; import javax.xml.bind.annotation.XmlAccessorType; +import javax.xml.bind.annotation.XmlAttribute; import javax.xml.bind.annotation.XmlRootElement; +import javax.xml.bind.annotation.XmlTransient; import org.apache.camel.Expression; import org.apache.camel.Processor; @@ -27,6 +31,7 @@ import org.apache.camel.model.language.E import org.apache.camel.processor.Delayer; import org.apache.camel.spi.RouteContext; import org.apache.camel.util.ObjectHelper; +import org.apache.camel.util.concurrent.ExecutorServiceHelper; /** * Represents an XML <delay/> element @@ -35,7 +40,16 @@ import org.apache.camel.util.ObjectHelpe */ @XmlRootElement(name = "delay") @XmlAccessorType(XmlAccessType.FIELD) -public class DelayDefinition extends ExpressionNode { +public class DelayDefinition extends ExpressionNode implements ExecutorServiceAwareDefinition<DelayDefinition> { + + @XmlTransient + private ExecutorService executorService; + @XmlAttribute(required = false) + private String executorServiceRef; + @XmlAttribute + private Boolean asyncDelayed; + @XmlAttribute + private Boolean callerRunsWhenRejected = Boolean.TRUE; public DelayDefinition() { } @@ -59,6 +73,39 @@ public class DelayDefinition extends Exp return "Delay[" + getExpression() + " -> " + getOutputs() + "]"; } + @Override + public Processor createProcessor(RouteContext routeContext) throws Exception { 
+ Processor childProcessor = this.createChildProcessor(routeContext, false); + Expression delay = createAbsoluteTimeDelayExpression(routeContext); + + ScheduledExecutorService scheduled = null; + if (getAsyncDelayed() != null && getAsyncDelayed()) { + scheduled = ExecutorServiceHelper.getConfiguredScheduledExecutorService(routeContext, "Delay", this); + if (scheduled == null) { + scheduled = routeContext.getCamelContext().getExecutorServiceStrategy().newScheduledThreadPool(this, "Delay"); + } + } + + Delayer answer = new Delayer(childProcessor, delay, scheduled); + if (getAsyncDelayed() != null) { + answer.setAsyncDelayed(getAsyncDelayed()); + } + if (getCallerRunsWhenRejected() != null) { + answer.setCallerRunsWhenRejected(getCallerRunsWhenRejected()); + } + return answer; + } + + private Expression createAbsoluteTimeDelayExpression(RouteContext routeContext) { + ExpressionDefinition expr = getExpression(); + if (expr != null) { + if (ObjectHelper.isNotEmpty(expr.getExpression()) || expr.getExpressionValue() != null) { + return expr.createExpression(routeContext); + } + } + return null; + } + // Fluent API // ------------------------------------------------------------------------- @@ -72,21 +119,72 @@ public class DelayDefinition extends Exp setExpression(new ExpressionDefinition(ExpressionBuilder.constantExpression(delay))); return this; } - - @Override - public Processor createProcessor(RouteContext routeContext) throws Exception { - Processor childProcessor = this.createChildProcessor(routeContext, false); - Expression delay = createAbsoluteTimeDelayExpression(routeContext); - return new Delayer(childProcessor, delay); + + /** + * Whether or not the caller should run the task when it was rejected by the thread pool. 
+ * <p/> + * Is by default <tt>true</tt> + * + * @param callerRunsWhenRejected whether or not the caller should run + * @return the builder + */ + public DelayDefinition callerRunsWhenRejected(boolean callerRunsWhenRejected) { + setCallerRunsWhenRejected(callerRunsWhenRejected); + return this; } - private Expression createAbsoluteTimeDelayExpression(RouteContext routeContext) { - ExpressionDefinition expr = getExpression(); - if (expr != null) { - if (ObjectHelper.isNotEmpty(expr.getExpression()) || expr.getExpressionValue() != null) { - return expr.createExpression(routeContext); - } - } - return null; + /** + * Enables asynchronous delay which means the thread will <b>noy</b> block while delaying. + * + * @return the builder + */ + public DelayDefinition asyncDelayed() { + setAsyncDelayed(true); + return this; + } + + public DelayDefinition executorService(ExecutorService executorService) { + setExecutorService(executorService); + return this; + } + + public DelayDefinition executorServiceRef(String executorServiceRef) { + setExecutorServiceRef(executorServiceRef); + return this; + } + + // Properties + // ------------------------------------------------------------------------- + + public Boolean getAsyncDelayed() { + return asyncDelayed; + } + + public void setAsyncDelayed(Boolean asyncDelayed) { + this.asyncDelayed = asyncDelayed; + } + + public Boolean getCallerRunsWhenRejected() { + return callerRunsWhenRejected; + } + + public void setCallerRunsWhenRejected(Boolean callerRunsWhenRejected) { + this.callerRunsWhenRejected = callerRunsWhenRejected; + } + + public ExecutorService getExecutorService() { + return executorService; + } + + public void setExecutorService(ExecutorService executorService) { + this.executorService = executorService; + } + + public String getExecutorServiceRef() { + return executorServiceRef; + } + + public void setExecutorServiceRef(String executorServiceRef) { + this.executorServiceRef = executorServiceRef; } } Modified: 
camel/trunk/camel-core/src/main/java/org/apache/camel/model/ThrottleDefinition.java URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/model/ThrottleDefinition.java?rev=959977&r1=959976&r2=959977&view=diff ============================================================================== --- camel/trunk/camel-core/src/main/java/org/apache/camel/model/ThrottleDefinition.java (original) +++ camel/trunk/camel-core/src/main/java/org/apache/camel/model/ThrottleDefinition.java Fri Jul 2 13:14:59 2010 @@ -16,16 +16,18 @@ */ package org.apache.camel.model; -import java.util.List; - +import java.util.concurrent.ExecutorService; +import java.util.concurrent.ScheduledExecutorService; import javax.xml.bind.annotation.XmlAccessType; import javax.xml.bind.annotation.XmlAccessorType; import javax.xml.bind.annotation.XmlAttribute; import javax.xml.bind.annotation.XmlRootElement; +import javax.xml.bind.annotation.XmlTransient; import org.apache.camel.Processor; import org.apache.camel.processor.Throttler; import org.apache.camel.spi.RouteContext; +import org.apache.camel.util.concurrent.ExecutorServiceHelper; /** * Represents an XML <throttle/> element @@ -34,11 +36,19 @@ import org.apache.camel.spi.RouteContext */ @XmlRootElement(name = "throttle") @XmlAccessorType(XmlAccessType.FIELD) -public class ThrottleDefinition extends OutputDefinition<ThrottleDefinition> { +public class ThrottleDefinition extends OutputDefinition<ThrottleDefinition> implements ExecutorServiceAwareDefinition<ThrottleDefinition> { + @XmlTransient + private ExecutorService executorService; + @XmlAttribute(required = false) + private String executorServiceRef; @XmlAttribute private Long maximumRequestsPerPeriod; @XmlAttribute private long timePeriodMillis = 1000; + @XmlAttribute + private Boolean asyncDelayed; + @XmlAttribute + private Boolean callerRunsWhenRejected = Boolean.TRUE; public ThrottleDefinition() { } @@ -66,7 +76,23 @@ public class ThrottleDefinition extends 
@Override public Processor createProcessor(RouteContext routeContext) throws Exception { Processor childProcessor = this.createChildProcessor(routeContext, true); - return new Throttler(childProcessor, maximumRequestsPerPeriod, timePeriodMillis); + + ScheduledExecutorService scheduled = null; + if (getAsyncDelayed() != null && getAsyncDelayed()) { + scheduled = ExecutorServiceHelper.getConfiguredScheduledExecutorService(routeContext, "Throttle", this); + if (scheduled == null) { + scheduled = routeContext.getCamelContext().getExecutorServiceStrategy().newScheduledThreadPool(this, "Throttle"); + } + } + + Throttler answer = new Throttler(childProcessor, maximumRequestsPerPeriod, timePeriodMillis, scheduled); + if (getAsyncDelayed() != null) { + answer.setAsyncDelayed(getAsyncDelayed()); + } + if (getCallerRunsWhenRejected() != null) { + answer.setCallerRunsWhenRejected(getCallerRunsWhenRejected()); + } + return answer; } // Fluent API @@ -94,6 +120,39 @@ public class ThrottleDefinition extends return this; } + /** + * Whether or not the caller should run the task when it was rejected by the thread pool. + * <p/> + * Is by default <tt>true</tt> + * + * @param callerRunsWhenRejected whether or not the caller should run + * @return the builder + */ + public ThrottleDefinition callerRunsWhenRejected(boolean callerRunsWhenRejected) { + setCallerRunsWhenRejected(callerRunsWhenRejected); + return this; + } + + /** + * Enables asynchronous delay which means the thread will <b>noy</b> block while delaying. 
+ * + * @return the builder + */ + public ThrottleDefinition asyncDelayed() { + setAsyncDelayed(true); + return this; + } + + public ThrottleDefinition executorService(ExecutorService executorService) { + setExecutorService(executorService); + return this; + } + + public ThrottleDefinition executorServiceRef(String executorServiceRef) { + setExecutorServiceRef(executorServiceRef); + return this; + } + // Properties // ------------------------------------------------------------------------- @@ -113,4 +172,35 @@ public class ThrottleDefinition extends this.timePeriodMillis = timePeriodMillis; } + public Boolean getAsyncDelayed() { + return asyncDelayed; + } + + public void setAsyncDelayed(Boolean asyncDelayed) { + this.asyncDelayed = asyncDelayed; + } + + public Boolean getCallerRunsWhenRejected() { + return callerRunsWhenRejected; + } + + public void setCallerRunsWhenRejected(Boolean callerRunsWhenRejected) { + this.callerRunsWhenRejected = callerRunsWhenRejected; + } + + public ExecutorService getExecutorService() { + return executorService; + } + + public void setExecutorService(ExecutorService executorService) { + this.executorService = executorService; + } + + public String getExecutorServiceRef() { + return executorServiceRef; + } + + public void setExecutorServiceRef(String executorServiceRef) { + this.executorServiceRef = executorServiceRef; + } } Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/processor/DelayProcessorSupport.java URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/DelayProcessorSupport.java?rev=959977&r1=959976&r2=959977&view=diff ============================================================================== --- camel/trunk/camel-core/src/main/java/org/apache/camel/processor/DelayProcessorSupport.java (original) +++ camel/trunk/camel-core/src/main/java/org/apache/camel/processor/DelayProcessorSupport.java Fri Jul 2 13:14:59 2010 @@ -16,13 +16,14 @@ */ package 
org.apache.camel.processor; -import java.util.concurrent.CountDownLatch; +import java.util.concurrent.RejectedExecutionException; +import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; -import org.apache.camel.AlreadyStoppedException; import org.apache.camel.AsyncCallback; import org.apache.camel.Exchange; import org.apache.camel.Processor; +import org.apache.camel.util.ObjectHelper; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; @@ -36,83 +37,140 @@ import org.apache.commons.logging.LogFac */ public abstract class DelayProcessorSupport extends DelegateAsyncProcessor { protected final transient Log log = LogFactory.getLog(getClass()); - private final CountDownLatch stoppedLatch = new CountDownLatch(1); - private boolean fastStop = true; + private final ScheduledExecutorService executorService; + private boolean asyncDelayed; + private boolean callerRunsWhenRejected = true; + + // TODO: Add option to cancel tasks on shutdown so we can stop fast + + private final class ProcessCall implements Runnable { + private final Exchange exchange; + private final AsyncCallback callback; + + public ProcessCall(Exchange exchange, AsyncCallback callback) { + this.exchange = exchange; + this.callback = callback; + } + + public void run() { + if (log.isTraceEnabled()) { + log.trace("Delayed task woke up and continues routing for exchangeId: " + exchange.getExchangeId()); + } + if (!isRunAllowed()) { + exchange.setException(new RejectedExecutionException("Run is not allowed")); + } + DelayProcessorSupport.super.process(exchange, callback); + // signal callback we are done async + callback.done(false); + } + } public DelayProcessorSupport(Processor processor) { + this(processor, null); + } + + public DelayProcessorSupport(Processor processor, ScheduledExecutorService executorService) { super(processor); + this.executorService = executorService; } @Override public boolean process(Exchange exchange, 
AsyncCallback callback) { - try { - delay(exchange); - } catch (Exception e) { - // exception occurred so we are done - exchange.setException(e); + if (!isRunAllowed()) { + exchange.setException(new RejectedExecutionException("Run is not allowed")); callback.done(true); return true; } - return super.process(exchange, callback); + + // calculate delay and wait + long delay = calculateDelay(exchange); + if (delay <= 0) { + // no delay then continue routing + return super.process(exchange, callback); + } + + if (!isAsyncDelayed() || exchange.isTransacted()) { + // use synchronous delay (also required if using transactions) + try { + delay(delay, exchange); + // then continue routing + return super.process(exchange, callback); + } catch (Exception e) { + // exception occurred so we are done + exchange.setException(e); + callback.done(true); + return true; + } + } else { + // asynchronous delay so schedule a process call task + ProcessCall call = new ProcessCall(exchange, callback); + try { + if (log.isTraceEnabled()) { + log.trace("Scheduling delayed task to run in " + delay + " millis for exchangeId: " + exchange.getExchangeId()); + } + executorService.schedule(call, delay, TimeUnit.MILLISECONDS); + // tell Camel routing engine we continue routing asynchronous + return false; + } catch (RejectedExecutionException e) { + if (isCallerRunsWhenRejected()) { + if (!isRunAllowed()) { + exchange.setException(new RejectedExecutionException()); + } else { + // let caller run by processing + delay(delay, exchange); + // then continue routing + return super.process(exchange, callback); + } + } else { + exchange.setException(e); + } + // caller don't run the task so we are done + callback.done(true); + return true; + } + } } - public boolean isFastStop() { - return fastStop; + public boolean isAsyncDelayed() { + return asyncDelayed; } - /** - * Enables & disables a fast stop; basically to avoid waiting a possibly - * long time for delays to complete before the context shuts down; 
instead - * the current processing method throws - * {@link org.apache.camel.AlreadyStoppedException} to terminate processing. - */ - public void setFastStop(boolean fastStop) { - this.fastStop = fastStop; + public void setAsyncDelayed(boolean asyncDelayed) { + this.asyncDelayed = asyncDelayed; + } + + public boolean isCallerRunsWhenRejected() { + return callerRunsWhenRejected; } - protected void doStop() throws Exception { - stoppedLatch.countDown(); - super.doStop(); + public void setCallerRunsWhenRejected(boolean callerRunsWhenRejected) { + this.callerRunsWhenRejected = callerRunsWhenRejected; } - protected abstract void delay(Exchange exchange) throws Exception; + protected abstract long calculateDelay(Exchange exchange); /** - * Wait until the given system time before continuing + * Delays the given time before continuing. + * <p/> + * This implementation will block while waiting * - * @param time the system time to wait for + * @param delay the delay time in millis * @param exchange the exchange being processed */ - protected void waitUntil(long time, Exchange exchange) throws Exception { + protected void delay(long delay, Exchange exchange) { // only run is we are started - while (isRunAllowed()) { - long delay = time - currentSystemTime(); - if (delay < 0) { - return; - } else { - if (isFastStop() && !isRunAllowed()) { - throw new AlreadyStoppedException(); - } - try { - sleep(delay); - } catch (InterruptedException e) { - handleSleepInterruptedException(e); - } - } + if (!isRunAllowed()) { + return; } - } - protected void sleep(long delay) throws InterruptedException { - if (delay <= 0) { + if (delay < 0) { return; - } - if (log.isTraceEnabled()) { - log.trace("Sleeping for: " + delay + " millis"); - } - if (isFastStop()) { - stoppedLatch.await(delay, TimeUnit.MILLISECONDS); } else { - Thread.sleep(delay); + try { + sleep(delay); + } catch (InterruptedException e) { + handleSleepInterruptedException(e); + } } } @@ -129,4 +187,22 @@ public abstract class 
DelayProcessorSupp protected long currentSystemTime() { return System.currentTimeMillis(); } + + private void sleep(long delay) throws InterruptedException { + if (delay <= 0) { + return; + } + if (log.isTraceEnabled()) { + log.trace("Sleeping for: " + delay + " millis"); + } + Thread.sleep(delay); + } + + @Override + protected void doStart() throws Exception { + if (isAsyncDelayed()) { + ObjectHelper.notNull(executorService, "executorService", this); + } + super.doStart(); + } } Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/processor/Delayer.java URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/Delayer.java?rev=959977&r1=959976&r2=959977&view=diff ============================================================================== --- camel/trunk/camel-core/src/main/java/org/apache/camel/processor/Delayer.java (original) +++ camel/trunk/camel-core/src/main/java/org/apache/camel/processor/Delayer.java Fri Jul 2 13:14:59 2010 @@ -16,6 +16,8 @@ */ package org.apache.camel.processor; +import java.util.concurrent.ScheduledExecutorService; + import org.apache.camel.Exchange; import org.apache.camel.Expression; import org.apache.camel.Processor; @@ -33,8 +35,8 @@ public class Delayer extends DelayProces private Expression delay; private long delayValue; - public Delayer(Processor processor, Expression delay) { - super(processor); + public Delayer(Processor processor, Expression delay, ScheduledExecutorService executorService) { + super(processor, executorService); this.delay = delay; } @@ -62,11 +64,7 @@ public class Delayer extends DelayProces // Implementation methods // ------------------------------------------------------------------------- - /** - * Waits for an optional time period before continuing to process the - * exchange - */ - protected void delay(Exchange exchange) throws Exception { + protected long calculateDelay(Exchange exchange) { long time = 0; if (delay != null) { Long longValue = 
delay.evaluate(exchange, Long.class); @@ -79,21 +77,10 @@ public class Delayer extends DelayProces } if (time <= 0) { // no delay - return; + return 0; } - // now add the current time - time += defaultProcessTime(exchange); - - waitUntil(time, exchange); - } - - /** - * A Strategy Method to allow derived implementations to decide the current - * system time or some other default exchange property - */ - protected long defaultProcessTime(Exchange exchange) { - return currentSystemTime(); + return time; } } Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/processor/ThreadsProcessor.java URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/ThreadsProcessor.java?rev=959977&r1=959976&r2=959977&view=diff ============================================================================== --- camel/trunk/camel-core/src/main/java/org/apache/camel/processor/ThreadsProcessor.java (original) +++ camel/trunk/camel-core/src/main/java/org/apache/camel/processor/ThreadsProcessor.java Fri Jul 2 13:14:59 2010 @@ -77,6 +77,7 @@ public class ThreadsProcessor extends Se ProcessCall call = new ProcessCall(exchange, callback); try { executorService.submit(call); + // tell Camel routing engine we continue routing asynchronous return false; } catch (RejectedExecutionException e) { if (isCallerRunsWhenRejected()) { Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/processor/Throttler.java URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/Throttler.java?rev=959977&r1=959976&r2=959977&view=diff ============================================================================== --- camel/trunk/camel-core/src/main/java/org/apache/camel/processor/Throttler.java (original) +++ camel/trunk/camel-core/src/main/java/org/apache/camel/processor/Throttler.java Fri Jul 2 13:14:59 2010 @@ -16,6 +16,8 @@ */ package org.apache.camel.processor; +import 
java.util.concurrent.ScheduledExecutorService; + import org.apache.camel.Exchange; import org.apache.camel.Processor; @@ -33,14 +35,14 @@ import org.apache.camel.Processor; public class Throttler extends DelayProcessorSupport implements Traceable { private long maximumRequestsPerPeriod; private long timePeriodMillis; - private TimeSlot slot; + private volatile TimeSlot slot; public Throttler(Processor processor, long maximumRequestsPerPeriod) { - this(processor, maximumRequestsPerPeriod, 1000); + this(processor, maximumRequestsPerPeriod, 1000, null); } - public Throttler(Processor processor, long maximumRequestsPerPeriod, long timePeriodMillis) { - super(processor); + public Throttler(Processor processor, long maximumRequestsPerPeriod, long timePeriodMillis, ScheduledExecutorService executorService) { + super(processor, executorService); this.maximumRequestsPerPeriod = maximumRequestsPerPeriod; this.timePeriodMillis = timePeriodMillis; } @@ -81,10 +83,14 @@ public class Throttler extends DelayProc // Implementation methods // ----------------------------------------------------------------------- - protected void delay(Exchange exchange) throws Exception { + + protected long calculateDelay(Exchange exchange) { TimeSlot slot = nextSlot(); if (!slot.isActive()) { - waitUntil(slot.startTime, exchange); + long delay = slot.startTime - currentSystemTime(); + return delay; + } else { + return 0; } } @@ -107,7 +113,7 @@ public class Throttler extends DelayProc */ protected class TimeSlot { - private long capacity = Throttler.this.maximumRequestsPerPeriod; + private volatile long capacity = Throttler.this.maximumRequestsPerPeriod; private final long duration = Throttler.this.timePeriodMillis; private final long startTime; Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/processor/interceptor/DelayInterceptor.java URL: 
http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/interceptor/DelayInterceptor.java?rev=959977&r1=959976&r2=959977&view=diff ============================================================================== --- camel/trunk/camel-core/src/main/java/org/apache/camel/processor/interceptor/DelayInterceptor.java (original) +++ camel/trunk/camel-core/src/main/java/org/apache/camel/processor/interceptor/DelayInterceptor.java Fri Jul 2 13:14:59 2010 @@ -40,10 +40,11 @@ public class DelayInterceptor extends De return "DelayInterceptor[delay: " + delayer.getDelay() + " on: " + node + "]"; } - public void delay(Exchange exchange) throws Exception { + public long calculateDelay(Exchange exchange) { if (delayer.isEnabled()) { - long time = currentSystemTime() + delayer.getDelay(); - waitUntil(time, exchange); + return delayer.getDelay(); + } else { + return 0; } } } Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/spi/ExecutorServiceStrategy.java URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/spi/ExecutorServiceStrategy.java?rev=959977&r1=959976&r2=959977&view=diff ============================================================================== --- camel/trunk/camel-core/src/main/java/org/apache/camel/spi/ExecutorServiceStrategy.java (original) +++ camel/trunk/camel-core/src/main/java/org/apache/camel/spi/ExecutorServiceStrategy.java Fri Jul 2 13:14:59 2010 @@ -112,6 +112,17 @@ public interface ExecutorServiceStrategy ExecutorService lookup(Object source, String name, String executorServiceRef); /** + * Lookup a {@link java.util.concurrent.ScheduledExecutorService} from the {@link org.apache.camel.spi.Registry} + * and from known list of {@link org.apache.camel.spi.ThreadPoolProfile ThreadPoolProfile(s)}. 
+ * + * @param source the source object, usually it should be <tt>this</tt> passed in as parameter + * @param name name which is appended to the thread name + * @param executorServiceRef reference to lookup + * @return the {@link java.util.concurrent.ScheduledExecutorService} or <tt>null</tt> if not found + */ + ScheduledExecutorService lookupScheduled(Object source, String name, String executorServiceRef); + + /** * Creates a new thread pool using the default thread pool profile. * * @param source the source object, usually it should be <tt>this</tt> passed in as parameter @@ -141,6 +152,8 @@ public interface ExecutorServiceStrategy /** * Creates a new scheduled thread pool. + * <p/> + * Will use the pool size from the default thread pool profile * * @param source the source object, usually it should be <tt>this</tt> passed in as parameter * @param name name which is appended to the thread name @@ -150,6 +163,15 @@ public interface ExecutorServiceStrategy ScheduledExecutorService newScheduledThreadPool(Object source, String name, int poolSize); /** + * Creates a new scheduled thread pool. + * + * @param source the source object, usually it should be <tt>this</tt> passed in as parameter + * @param name name which is appended to the thread name + * @return the created thread pool + */ + ScheduledExecutorService newScheduledThreadPool(Object source, String name); + + /** * Creates a new fixed thread pool. 
* * @param source the source object, usually it should be <tt>this</tt> passed in as parameter Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/util/ObjectHelper.java URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/util/ObjectHelper.java?rev=959977&r1=959976&r2=959977&view=diff ============================================================================== --- camel/trunk/camel-core/src/main/java/org/apache/camel/util/ObjectHelper.java (original) +++ camel/trunk/camel-core/src/main/java/org/apache/camel/util/ObjectHelper.java Fri Jul 2 13:14:59 2010 @@ -1256,7 +1256,7 @@ public final class ObjectHelper { return "0x" + Integer.toHexString(System.identityHashCode(object)); } - private static class ExceptionIterator implements Iterator<Throwable> { + private static final class ExceptionIterator implements Iterator<Throwable> { private List<Throwable> tree = new ArrayList<Throwable>(); private Iterator<Throwable> it; Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/util/concurrent/ExecutorServiceHelper.java URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/util/concurrent/ExecutorServiceHelper.java?rev=959977&r1=959976&r2=959977&view=diff ============================================================================== --- camel/trunk/camel-core/src/main/java/org/apache/camel/util/concurrent/ExecutorServiceHelper.java (original) +++ camel/trunk/camel-core/src/main/java/org/apache/camel/util/concurrent/ExecutorServiceHelper.java Fri Jul 2 13:14:59 2010 @@ -261,4 +261,48 @@ public final class ExecutorServiceHelper return null; } + /** + * Will lookup and get the configured {@link java.util.concurrent.ScheduledExecutorService} from the given definition. 
+ * <p/> + * This method will lookup for configured thread pool in the following order + * <ul> + * <li>from the definition if any explicit configured executor service.</li> + * <li>from the {@link org.apache.camel.spi.Registry} if found</li> + * <li>from the known list of {@link org.apache.camel.spi.ThreadPoolProfile ThreadPoolProfile(s)}.</li> + * <li>if none found, then <tt>null</tt> is returned.</li> + * </ul> + * The various {@link ExecutorServiceAwareDefinition} should use this helper method to ensure they support + * configured executor services in the same coherent way. + * + * @param routeContext the route context + * @param name name which is appended to the thread name, when the {@link java.util.concurrent.ExecutorService} + * is created based on a {@link org.apache.camel.spi.ThreadPoolProfile}. + * @param definition the node definition which may leverage executor service. + * @return the configured executor service, or <tt>null</tt> if none was configured. + * @throws IllegalArgumentException is thrown if lookup of executor service in {@link org.apache.camel.spi.Registry} was not found + * or the found instance is not a ScheduledExecutorService type. 
+ */ + public static ScheduledExecutorService getConfiguredScheduledExecutorService(RouteContext routeContext, String name, + ExecutorServiceAwareDefinition definition) throws IllegalArgumentException { + ExecutorServiceStrategy strategy = routeContext.getCamelContext().getExecutorServiceStrategy(); + ObjectHelper.notNull(strategy, "ExecutorServiceStrategy", routeContext.getCamelContext()); + + // prefer to use explicit configured executor on the definition + if (definition.getExecutorService() != null) { + ExecutorService executorService = definition.getExecutorService(); + if (executorService instanceof ScheduledExecutorService) { + return (ScheduledExecutorService) executorService; + } + throw new IllegalArgumentException("ExecutorServiceRef " + definition.getExecutorServiceRef() + " is not an ScheduledExecutorService instance"); + } else if (definition.getExecutorServiceRef() != null) { + ScheduledExecutorService answer = strategy.lookupScheduled(definition, name, definition.getExecutorServiceRef()); + if (answer == null) { + throw new IllegalArgumentException("ExecutorServiceRef " + definition.getExecutorServiceRef() + " not found in registry."); + } + return answer; + } + + return null; + } + } Copied: camel/trunk/camel-core/src/test/java/org/apache/camel/processor/DelayerAsyncDelayedTest.java (from r959949, camel/trunk/camel-core/src/test/java/org/apache/camel/processor/DelayerTest.java) URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/DelayerAsyncDelayedTest.java?p2=camel/trunk/camel-core/src/test/java/org/apache/camel/processor/DelayerAsyncDelayedTest.java&p1=camel/trunk/camel-core/src/test/java/org/apache/camel/processor/DelayerTest.java&r1=959949&r2=959977&rev=959977&view=diff ============================================================================== --- camel/trunk/camel-core/src/test/java/org/apache/camel/processor/DelayerTest.java (original) +++ 
camel/trunk/camel-core/src/test/java/org/apache/camel/processor/DelayerAsyncDelayedTest.java Fri Jul 2 13:14:59 2010 @@ -23,7 +23,7 @@ import org.apache.camel.component.mock.M /** * @version $Revision$ */ -public class DelayerTest extends ContextTestSupport { +public class DelayerAsyncDelayedTest extends ContextTestSupport { public void testSendingMessageGetsDelayed() throws Exception { MockEndpoint resultEndpoint = resolveMandatoryEndpoint("mock:result", MockEndpoint.class); @@ -54,13 +54,13 @@ public class DelayerTest extends Context return new RouteBuilder() { public void configure() { // START SNIPPET: ex - from("seda:a").delay().header("MyDelay").to("mock:result"); + from("seda:a").delay().header("MyDelay").asyncDelayed().to("mock:result"); // END SNIPPET: ex // START SNIPPET: ex2 - from("seda:b").delay(1000).to("mock:result"); + from("seda:b").delay(1000).asyncDelayed().to("mock:result"); // END SNIPPET: ex2 } }; } -} +} \ No newline at end of file Copied: camel/trunk/camel-core/src/test/java/org/apache/camel/processor/ThrottlerAsyncDelayedTest.java (from r959949, camel/trunk/camel-core/src/test/java/org/apache/camel/processor/ThrottlerTest.java) URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/ThrottlerAsyncDelayedTest.java?p2=camel/trunk/camel-core/src/test/java/org/apache/camel/processor/ThrottlerAsyncDelayedTest.java&p1=camel/trunk/camel-core/src/test/java/org/apache/camel/processor/ThrottlerTest.java&r1=959949&r2=959977&rev=959977&view=diff ============================================================================== --- camel/trunk/camel-core/src/test/java/org/apache/camel/processor/ThrottlerTest.java (original) +++ camel/trunk/camel-core/src/test/java/org/apache/camel/processor/ThrottlerAsyncDelayedTest.java Fri Jul 2 13:14:59 2010 @@ -22,14 +22,13 @@ import java.util.concurrent.Executors; import org.apache.camel.ContextTestSupport; import org.apache.camel.builder.RouteBuilder; import 
org.apache.camel.component.mock.MockEndpoint; -import org.apache.camel.processor.Throttler.TimeSlot; /** * @version $Revision$ */ -public class ThrottlerTest extends ContextTestSupport { +public class ThrottlerAsyncDelayedTest extends ContextTestSupport { private static final int INTERVAL = 500; - protected int messageCount = 6; + protected int messageCount = 9; public void testSendLotsOfMessagesButOnly3GetThrough() throws Exception { MockEndpoint resultEndpoint = resolveMandatoryEndpoint("mock:result", MockEndpoint.class); @@ -44,7 +43,7 @@ public class ThrottlerTest extends Conte // to check that the throttle really does kick in resultEndpoint.assertIsSatisfied(); } - + public void testSendLotsOfMessagesSimultaneouslyButOnly3GetThrough() throws Exception { long start = System.currentTimeMillis(); MockEndpoint resultEndpoint = resolveMandatoryEndpoint("mock:result", MockEndpoint.class); @@ -55,41 +54,26 @@ public class ThrottlerTest extends Conte executor.execute(new Runnable() { public void run() { template.sendBody("direct:a", "<message>payload</message>"); - } + } }); } - + // let's wait for the exchanges to arrive resultEndpoint.assertIsSatisfied(); - + // now assert that they have actually been throttled long minimumTime = (messageCount - 1) * INTERVAL; assertTrue("Should take at least " + minimumTime + "ms", System.currentTimeMillis() - start >= minimumTime); } - - public void testTimeSlotCalculus() throws Exception { - Throttler throttler = new Throttler(null, 2, 1000); - TimeSlot slot = throttler.nextSlot(); - // start a new time slot - assertNotNull(slot); - // make sure the same slot is used (2 exchanges per slot) - assertSame(slot, throttler.nextSlot()); - assertTrue(slot.isFull()); - - TimeSlot next = throttler.nextSlot(); - // now we should have a new slot that starts somewhere in the future - assertNotSame(slot, next); - assertFalse(next.isActive()); - } protected RouteBuilder createRouteBuilder() { return new RouteBuilder() { public void configure() 
{ // START SNIPPET: ex - from("seda:a").throttle(3).timePeriodMillis(10000).to("mock:result"); + from("seda:a").throttle(3).timePeriodMillis(10000).asyncDelayed().to("log:result", "mock:result"); // END SNIPPET: ex - - from("direct:a").throttle(1).timePeriodMillis(INTERVAL).to("mock:result"); + + from("direct:a").throttle(1).timePeriodMillis(INTERVAL).asyncDelayed().to("log:result", "mock:result"); } }; } Modified: camel/trunk/camel-core/src/test/java/org/apache/camel/processor/ThrottlerTest.java URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/ThrottlerTest.java?rev=959977&r1=959976&r2=959977&view=diff ============================================================================== --- camel/trunk/camel-core/src/test/java/org/apache/camel/processor/ThrottlerTest.java (original) +++ camel/trunk/camel-core/src/test/java/org/apache/camel/processor/ThrottlerTest.java Fri Jul 2 13:14:59 2010 @@ -29,7 +29,7 @@ import org.apache.camel.processor.Thrott */ public class ThrottlerTest extends ContextTestSupport { private static final int INTERVAL = 500; - protected int messageCount = 6; + protected int messageCount = 9; public void testSendLotsOfMessagesButOnly3GetThrough() throws Exception { MockEndpoint resultEndpoint = resolveMandatoryEndpoint("mock:result", MockEndpoint.class); @@ -68,7 +68,7 @@ public class ThrottlerTest extends Conte } public void testTimeSlotCalculus() throws Exception { - Throttler throttler = new Throttler(null, 2, 1000); + Throttler throttler = new Throttler(null, 2, 1000, null); TimeSlot slot = throttler.nextSlot(); // start a new time slot assertNotNull(slot); @@ -86,10 +86,10 @@ public class ThrottlerTest extends Conte return new RouteBuilder() { public void configure() { // START SNIPPET: ex - from("seda:a").throttle(3).timePeriodMillis(10000).to("mock:result"); + from("seda:a").throttle(3).timePeriodMillis(10000).to("log:result", "mock:result"); // END SNIPPET: ex - 
from("direct:a").throttle(1).timePeriodMillis(INTERVAL).to("mock:result"); + from("direct:a").throttle(1).timePeriodMillis(INTERVAL).to("log:result", "mock:result"); } }; } Modified: camel/trunk/camel-core/src/test/resources/log4j.properties URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/test/resources/log4j.properties?rev=959977&r1=959976&r2=959977&view=diff ============================================================================== --- camel/trunk/camel-core/src/test/resources/log4j.properties (original) +++ camel/trunk/camel-core/src/test/resources/log4j.properties Fri Jul 2 13:14:59 2010 @@ -34,6 +34,8 @@ log4j.logger.org.apache.activemq.spring= #log4j.logger.org.apache.camel.processor.RoutingSlip=TRACE #log4j.logger.org.apache.camel.processor.TryProcessor=TRACE #log4j.logger.org.apache.camel.processor.loadbalancer=TRACE +#log4j.logger.org.apache.camel.processor.Delayer=TRACE +#log4j.logger.org.apache.camel.processor.Throttler=TRACE log4j.logger.org.apache.camel.impl.converter=WARN log4j.logger.org.apache.camel.management=WARN log4j.logger.org.apache.camel.impl.DefaultPackageScanClassResolver=WARN