Updated Branches: refs/heads/camel-2.11.x d37b8fa19 -> 51369fbc5 refs/heads/master d3c07055a -> 96270b365
CAMEL-6377: Optimize routing engine for delayer to avoid wrapping. Project: http://git-wip-us.apache.org/repos/asf/camel/repo Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/357f6deb Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/357f6deb Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/357f6deb Branch: refs/heads/master Commit: 357f6deb4f9b4a891b5446e3942ac5b842127fcc Parents: d3c0705 Author: Claus Ibsen <davscl...@apache.org> Authored: Sun Jul 28 10:16:55 2013 +0200 Committer: Claus Ibsen <davscl...@apache.org> Committed: Sun Jul 28 10:16:55 2013 +0200 ---------------------------------------------------------------------- .../apache/camel/impl/DefaultCamelContext.java | 7 +---- .../org/apache/camel/model/RouteDefinition.java | 1 - .../camel/processor/CamelInternalProcessor.java | 30 ++++++++++++++++++++ .../processor/interceptor/DefaultChannel.java | 4 +++ .../processor/interceptor/DelayInterceptor.java | 3 ++ .../camel/processor/interceptor/Delayer.java | 3 ++ .../camel/processor/DelayInterceptorTest.java | 8 ++---- 7 files changed, 44 insertions(+), 12 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/camel/blob/357f6deb/camel-core/src/main/java/org/apache/camel/impl/DefaultCamelContext.java ---------------------------------------------------------------------- diff --git a/camel-core/src/main/java/org/apache/camel/impl/DefaultCamelContext.java b/camel-core/src/main/java/org/apache/camel/impl/DefaultCamelContext.java index 8a3091f..c7e7851 100644 --- a/camel-core/src/main/java/org/apache/camel/impl/DefaultCamelContext.java +++ b/camel-core/src/main/java/org/apache/camel/impl/DefaultCamelContext.java @@ -1579,12 +1579,7 @@ public class DefaultCamelContext extends ServiceSupport implements ModelCamelCon } if (getDelayer() != null && getDelayer() > 0) { - // only add a new delayer if not already configured - if (Delayer.getDelayer(this) == null) { - long millis = getDelayer(); - log.info("Delayer is enabled with: {} ms. on CamelContext: {}", millis, getName()); - addInterceptStrategy(new Delayer(millis)); - } + log.info("Delayer is enabled with: {} ms. on CamelContext: {}", getDelayer(), getName()); } // register debugger http://git-wip-us.apache.org/repos/asf/camel/blob/357f6deb/camel-core/src/main/java/org/apache/camel/model/RouteDefinition.java ---------------------------------------------------------------------- diff --git a/camel-core/src/main/java/org/apache/camel/model/RouteDefinition.java b/camel-core/src/main/java/org/apache/camel/model/RouteDefinition.java index 9af2d2e..4022b0b 100644 --- a/camel-core/src/main/java/org/apache/camel/model/RouteDefinition.java +++ b/camel-core/src/main/java/org/apache/camel/model/RouteDefinition.java @@ -844,7 +844,6 @@ public class RouteDefinition extends ProcessorDefinition<RouteDefinition> { routeContext.setDelayer(delayer); if (delayer > 0) { log.debug("Delayer is enabled with: {} ms. on route: {}", delayer, getId()); - addInterceptStrategy(new Delayer(delayer)); } else { log.debug("Delayer is disabled on route: {}", getId()); } http://git-wip-us.apache.org/repos/asf/camel/blob/357f6deb/camel-core/src/main/java/org/apache/camel/processor/CamelInternalProcessor.java ---------------------------------------------------------------------- diff --git a/camel-core/src/main/java/org/apache/camel/processor/CamelInternalProcessor.java b/camel-core/src/main/java/org/apache/camel/processor/CamelInternalProcessor.java index b93496e..343eed6 100644 --- a/camel-core/src/main/java/org/apache/camel/processor/CamelInternalProcessor.java +++ b/camel-core/src/main/java/org/apache/camel/processor/CamelInternalProcessor.java @@ -759,4 +759,34 @@ public class CamelInternalProcessor extends DelegateAsyncProcessor { } } + /** + * Advice for delaying + */ + public static class DelayerAdvice implements CamelInternalProcessorAdvice { + + private final long delay; + + public DelayerAdvice(long delay) { + this.delay = delay; + } + + @Override + public Object before(Exchange exchange) throws Exception { + try { + LOG.trace("Sleeping for: {} millis", delay); + Thread.sleep(delay); + } catch (InterruptedException e) { + LOG.debug("Sleep interrupted"); + Thread.currentThread().interrupt(); + throw e; + } + return null; + } + + @Override + public void after(Exchange exchange, Object data) throws Exception { + // noop + } + } + } http://git-wip-us.apache.org/repos/asf/camel/blob/357f6deb/camel-core/src/main/java/org/apache/camel/processor/interceptor/DefaultChannel.java ---------------------------------------------------------------------- diff --git a/camel-core/src/main/java/org/apache/camel/processor/interceptor/DefaultChannel.java b/camel-core/src/main/java/org/apache/camel/processor/interceptor/DefaultChannel.java index 6af5e96..7ebe5b0 100644 --- a/camel-core/src/main/java/org/apache/camel/processor/interceptor/DefaultChannel.java +++ b/camel-core/src/main/java/org/apache/camel/processor/interceptor/DefaultChannel.java @@ -278,6 +278,10 @@ public class DefaultChannel extends CamelInternalProcessor implements ModelChann addAdvice(new StreamCachingAdvice(camelContext.getStreamCachingStrategy())); } + if (routeContext.getDelayer() != null && routeContext.getDelayer() > 0) { + addAdvice(new DelayerAdvice(routeContext.getDelayer())); + } + // sets the delegate to our wrapped output output = target; } http://git-wip-us.apache.org/repos/asf/camel/blob/357f6deb/camel-core/src/main/java/org/apache/camel/processor/interceptor/DelayInterceptor.java ---------------------------------------------------------------------- diff --git a/camel-core/src/main/java/org/apache/camel/processor/interceptor/DelayInterceptor.java b/camel-core/src/main/java/org/apache/camel/processor/interceptor/DelayInterceptor.java index b65a4d8..a5a65a1 100644 --- a/camel-core/src/main/java/org/apache/camel/processor/interceptor/DelayInterceptor.java +++ b/camel-core/src/main/java/org/apache/camel/processor/interceptor/DelayInterceptor.java @@ -24,7 +24,10 @@ import org.apache.camel.processor.DelayProcessorSupport; /** * An interceptor for delaying routes. + * + * @deprecated no longer in use, will be removed in next Camel release. */ +@Deprecated public class DelayInterceptor extends DelayProcessorSupport { private final ProcessorDefinition<?> node; http://git-wip-us.apache.org/repos/asf/camel/blob/357f6deb/camel-core/src/main/java/org/apache/camel/processor/interceptor/Delayer.java ---------------------------------------------------------------------- diff --git a/camel-core/src/main/java/org/apache/camel/processor/interceptor/Delayer.java b/camel-core/src/main/java/org/apache/camel/processor/interceptor/Delayer.java index 6e833ad..8e8bf4e 100644 --- a/camel-core/src/main/java/org/apache/camel/processor/interceptor/Delayer.java +++ b/camel-core/src/main/java/org/apache/camel/processor/interceptor/Delayer.java @@ -25,7 +25,10 @@ import org.apache.camel.spi.InterceptStrategy; /** * An interceptor strategy for delaying routes. + * + * @deprecated no longer in use, will be removed in next Camel release. */ +@Deprecated public class Delayer implements InterceptStrategy { private volatile boolean enabled = true; http://git-wip-us.apache.org/repos/asf/camel/blob/357f6deb/camel-core/src/test/java/org/apache/camel/processor/DelayInterceptorTest.java ---------------------------------------------------------------------- diff --git a/camel-core/src/test/java/org/apache/camel/processor/DelayInterceptorTest.java b/camel-core/src/test/java/org/apache/camel/processor/DelayInterceptorTest.java index cc2ef09..1ea6eea 100644 --- a/camel-core/src/test/java/org/apache/camel/processor/DelayInterceptorTest.java +++ b/camel-core/src/test/java/org/apache/camel/processor/DelayInterceptorTest.java @@ -20,7 +20,6 @@ import org.apache.camel.ContextTestSupport; import org.apache.camel.Exchange; import org.apache.camel.Processor; import org.apache.camel.builder.RouteBuilder; -import org.apache.camel.processor.interceptor.Delayer; /** * Delay interceptor unit test. @@ -50,11 +49,10 @@ public class DelayInterceptorTest extends ContextTestSupport { return new RouteBuilder() { // START SNIPPET: e1 public void configure() throws Exception { - // add the delay interceptor to delay each step 200 millis - getContext().addInterceptStrategy(new Delayer(200)); - + // configure delayer for each step 200 millis + getContext().setDelayer(200L); + // regular routes here - // END SNIPPET: e1 from("direct:start"). process(new Processor() {