CAMEL-6631: Introduce ScheduledPollConsumerScheduler SPI to plug in different schedulers for poll consumer components such as file/ftp etc.
Project: http://git-wip-us.apache.org/repos/asf/camel/repo Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/0fb78122 Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/0fb78122 Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/0fb78122 Branch: refs/heads/master Commit: 0fb781227d276d9beb82a690d8e604cddf69e12b Parents: 36f48fb Author: Claus Ibsen <davscl...@apache.org> Authored: Wed Aug 14 12:06:42 2013 +0200 Committer: Claus Ibsen <davscl...@apache.org> Committed: Wed Aug 14 12:06:42 2013 +0200 ---------------------------------------------------------------------- .../camel/impl/DefaultScheduledPollConsumerScheduler.java | 6 +++++- .../java/org/apache/camel/impl/ScheduledPollConsumer.java | 7 ++++--- .../camel/impl/SingleScheduledPollConsumerScheduler.java | 9 +++++---- .../apache/camel/spi/ScheduledPollConsumerScheduler.java | 10 ++++++++-- .../component/file/FileConsumerCustomSchedulerTest.java | 7 ++++++- .../quartz2/QuartzScheduledPollConsumerScheduler.java | 6 +++++- 6 files changed, 33 insertions(+), 12 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/camel/blob/0fb78122/camel-core/src/main/java/org/apache/camel/impl/DefaultScheduledPollConsumerScheduler.java ---------------------------------------------------------------------- diff --git a/camel-core/src/main/java/org/apache/camel/impl/DefaultScheduledPollConsumerScheduler.java b/camel-core/src/main/java/org/apache/camel/impl/DefaultScheduledPollConsumerScheduler.java index 8c0af76..83a40cb 100644 --- a/camel-core/src/main/java/org/apache/camel/impl/DefaultScheduledPollConsumerScheduler.java +++ b/camel-core/src/main/java/org/apache/camel/impl/DefaultScheduledPollConsumerScheduler.java @@ -92,8 +92,12 @@ public class DefaultScheduledPollConsumerScheduler extends org.apache.camel.supp } @Override - public void scheduleTask(Consumer consumer, Runnable task) { + public void onInit(Consumer consumer) { this.consumer 
= consumer; + } + + @Override + public void scheduleTask(Runnable task) { this.task = task; } http://git-wip-us.apache.org/repos/asf/camel/blob/0fb78122/camel-core/src/main/java/org/apache/camel/impl/ScheduledPollConsumer.java ---------------------------------------------------------------------- diff --git a/camel-core/src/main/java/org/apache/camel/impl/ScheduledPollConsumer.java b/camel-core/src/main/java/org/apache/camel/impl/ScheduledPollConsumer.java index 1670354..546b83a 100644 --- a/camel-core/src/main/java/org/apache/camel/impl/ScheduledPollConsumer.java +++ b/camel-core/src/main/java/org/apache/camel/impl/ScheduledPollConsumer.java @@ -397,10 +397,11 @@ public abstract class ScheduledPollConsumer extends DefaultConsumer implements R scheduler = new DefaultScheduledPollConsumerScheduler(); } scheduler.setCamelContext(getEndpoint().getCamelContext()); + scheduler.onInit(this); if (!(scheduler instanceof SingleScheduledPollConsumerScheduler)) { // schedule task if its not the single scheduled - scheduler.scheduleTask(this, this); + scheduler.scheduleTask(this); } // configure scheduler with options from this consumer @@ -459,7 +460,7 @@ public abstract class ScheduledPollConsumer extends DefaultConsumer implements R public void onInit() throws Exception { // use a single scheduler so we do not have it running it periodically when we use // this consumer as a EventDrivenPollingConsumer - scheduler = new SingleScheduledPollConsumerScheduler(this); + scheduler = new SingleScheduledPollConsumerScheduler(); } @Override @@ -467,7 +468,7 @@ public abstract class ScheduledPollConsumer extends DefaultConsumer implements R if (LOG.isTraceEnabled()) { LOG.trace("Before poll {}", getEndpoint()); } - scheduler.scheduleTask(this, this); + scheduler.scheduleTask(this); return timeout; } http://git-wip-us.apache.org/repos/asf/camel/blob/0fb78122/camel-core/src/main/java/org/apache/camel/impl/SingleScheduledPollConsumerScheduler.java 
---------------------------------------------------------------------- diff --git a/camel-core/src/main/java/org/apache/camel/impl/SingleScheduledPollConsumerScheduler.java b/camel-core/src/main/java/org/apache/camel/impl/SingleScheduledPollConsumerScheduler.java index 3d9e22e..347e1d8 100644 --- a/camel-core/src/main/java/org/apache/camel/impl/SingleScheduledPollConsumerScheduler.java +++ b/camel-core/src/main/java/org/apache/camel/impl/SingleScheduledPollConsumerScheduler.java @@ -25,7 +25,7 @@ import org.apache.camel.spi.ScheduledPollConsumerScheduler; /** * A {@link ScheduledPollConsumerScheduler} which is <b>not</b> scheduled but uses a regular single-threaded {@link ExecutorService} - * to execute the task when {@link #scheduleTask(org.apache.camel.Consumer, Runnable)} is invoked. + * to execute the task when {@link #scheduleTask(Runnable)} is invoked. * <p/> * This is used when the {@link org.apache.camel.PollingConsumer} EIP is implemented using the {@link EventDrivenPollingConsumer} * bridging a {@link ScheduledPollConsumer} implementation. 
In this case we use this single threaded regular thread pool @@ -34,17 +34,18 @@ import org.apache.camel.spi.ScheduledPollConsumerScheduler; */ public class SingleScheduledPollConsumerScheduler extends org.apache.camel.support.ServiceSupport implements ScheduledPollConsumerScheduler { - private final Consumer consumer; + private Consumer consumer; private CamelContext camelContext; private ExecutorService executorService; private Future future; - public SingleScheduledPollConsumerScheduler(Consumer consumer) { + @Override + public void onInit(Consumer consumer) { this.consumer = consumer; } @Override - public void scheduleTask(Consumer consumer, Runnable task) { + public void scheduleTask(Runnable task) { if (isSchedulerStarted()) { future = executorService.submit(task); } http://git-wip-us.apache.org/repos/asf/camel/blob/0fb78122/camel-core/src/main/java/org/apache/camel/spi/ScheduledPollConsumerScheduler.java ---------------------------------------------------------------------- diff --git a/camel-core/src/main/java/org/apache/camel/spi/ScheduledPollConsumerScheduler.java b/camel-core/src/main/java/org/apache/camel/spi/ScheduledPollConsumerScheduler.java index db9b41f..784676a 100644 --- a/camel-core/src/main/java/org/apache/camel/spi/ScheduledPollConsumerScheduler.java +++ b/camel-core/src/main/java/org/apache/camel/spi/ScheduledPollConsumerScheduler.java @@ -32,12 +32,18 @@ import org.apache.camel.ShutdownableService; public interface ScheduledPollConsumerScheduler extends ShutdownableService, CamelContextAware { /** - * Schedules the task to run. + * Initializes this {@link ScheduledPollConsumerScheduler} with the associated {@link Consumer}. * * @param consumer the consumer. + */ + void onInit(Consumer consumer); + + /** + * Schedules the task to run. + * * @param task the task to run. */ - void scheduleTask(Consumer consumer, Runnable task); + void scheduleTask(Runnable task); /** * Attempts to unschedules the last task which was scheduled. 
http://git-wip-us.apache.org/repos/asf/camel/blob/0fb78122/camel-core/src/test/java/org/apache/camel/component/file/FileConsumerCustomSchedulerTest.java ---------------------------------------------------------------------- diff --git a/camel-core/src/test/java/org/apache/camel/component/file/FileConsumerCustomSchedulerTest.java b/camel-core/src/test/java/org/apache/camel/component/file/FileConsumerCustomSchedulerTest.java index ca22bf0..a466086 100644 --- a/camel-core/src/test/java/org/apache/camel/component/file/FileConsumerCustomSchedulerTest.java +++ b/camel-core/src/test/java/org/apache/camel/component/file/FileConsumerCustomSchedulerTest.java @@ -78,7 +78,12 @@ public class FileConsumerCustomSchedulerTest extends ContextTestSupport { private String foo; @Override - public void scheduleTask(final Consumer consumer, final Runnable task) { + public void onInit(Consumer consumer) { + // noop + } + + @Override + public void scheduleTask(final Runnable task) { this.timerTask = new TimerTask() { @Override public void run() { http://git-wip-us.apache.org/repos/asf/camel/blob/0fb78122/components/camel-quartz2/src/main/java/org/apache/camel/pollconsumer/quartz2/QuartzScheduledPollConsumerScheduler.java ---------------------------------------------------------------------- diff --git a/components/camel-quartz2/src/main/java/org/apache/camel/pollconsumer/quartz2/QuartzScheduledPollConsumerScheduler.java b/components/camel-quartz2/src/main/java/org/apache/camel/pollconsumer/quartz2/QuartzScheduledPollConsumerScheduler.java index fec05c5..5b460e0 100644 --- a/components/camel-quartz2/src/main/java/org/apache/camel/pollconsumer/quartz2/QuartzScheduledPollConsumerScheduler.java +++ b/components/camel-quartz2/src/main/java/org/apache/camel/pollconsumer/quartz2/QuartzScheduledPollConsumerScheduler.java @@ -57,8 +57,12 @@ public class QuartzScheduledPollConsumerScheduler extends ServiceSupport impleme private volatile JobDetail job; @Override - public void scheduleTask(Consumer 
consumer, Runnable runnable) { + public void onInit(Consumer consumer) { this.consumer = consumer; + } + + @Override + public void scheduleTask(Runnable runnable) { this.runnable = runnable; }