Updated Branches: refs/heads/master 99365506f -> 12fd86170
CAMEL-6635: PollingConsumer from a scheduled consumer such as file/ftp can use a regular thread pool instead of being scheduled Project: http://git-wip-us.apache.org/repos/asf/camel/repo Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/36f48fb0 Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/36f48fb0 Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/36f48fb0 Branch: refs/heads/master Commit: 36f48fb0942630cb1179b2eb0bb474affb5b3742 Parents: 9936550 Author: Claus Ibsen <davscl...@apache.org> Authored: Wed Aug 14 11:43:10 2013 +0200 Committer: Claus Ibsen <davscl...@apache.org> Committed: Wed Aug 14 11:58:10 2013 +0200 ---------------------------------------------------------------------- .../DefaultScheduledPollConsumerScheduler.java | 7 ++ .../camel/impl/EventDrivenPollingConsumer.java | 5 +- .../camel/impl/ScheduledPollConsumer.java | 29 +++--- .../camel/impl/ScheduledPollEndpoint.java | 1 + .../SingleScheduledPollConsumerScheduler.java | 103 +++++++++++++++++++ .../spi/ScheduledPollConsumerScheduler.java | 7 ++ .../file/FileConsumerCustomSchedulerTest.java | 5 + .../QuartzScheduledPollConsumerScheduler.java | 12 +++ 8 files changed, 152 insertions(+), 17 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/camel/blob/36f48fb0/camel-core/src/main/java/org/apache/camel/impl/DefaultScheduledPollConsumerScheduler.java ---------------------------------------------------------------------- diff --git a/camel-core/src/main/java/org/apache/camel/impl/DefaultScheduledPollConsumerScheduler.java b/camel-core/src/main/java/org/apache/camel/impl/DefaultScheduledPollConsumerScheduler.java index 28d1cc0..8c0af76 100644 --- a/camel-core/src/main/java/org/apache/camel/impl/DefaultScheduledPollConsumerScheduler.java +++ b/camel-core/src/main/java/org/apache/camel/impl/DefaultScheduledPollConsumerScheduler.java @@ -98,6 +98,13 @@ public class DefaultScheduledPollConsumerScheduler extends org.apache.camel.supp } @Override + public void unscheduleTask() { + if (future != null) { + future.cancel(false); + } + } + + @Override public void startScheduler() { // only schedule task if we have not already done that if (future == null) { http://git-wip-us.apache.org/repos/asf/camel/blob/36f48fb0/camel-core/src/main/java/org/apache/camel/impl/EventDrivenPollingConsumer.java ---------------------------------------------------------------------- diff --git a/camel-core/src/main/java/org/apache/camel/impl/EventDrivenPollingConsumer.java b/camel-core/src/main/java/org/apache/camel/impl/EventDrivenPollingConsumer.java index b257f28..296c8c4 100644 --- a/camel-core/src/main/java/org/apache/camel/impl/EventDrivenPollingConsumer.java +++ b/camel-core/src/main/java/org/apache/camel/impl/EventDrivenPollingConsumer.java @@ -143,10 +143,9 @@ public class EventDrivenPollingConsumer extends PollingConsumerSupport implement if (consumer instanceof PollingConsumerPollingStrategy) { PollingConsumerPollingStrategy strategy = (PollingConsumerPollingStrategy) consumer; strategy.onInit(); - } else { - // for regular consumers start it - ServiceHelper.startService(consumer); } + + ServiceHelper.startService(consumer); } protected void doStop() throws Exception { http://git-wip-us.apache.org/repos/asf/camel/blob/36f48fb0/camel-core/src/main/java/org/apache/camel/impl/ScheduledPollConsumer.java ---------------------------------------------------------------------- diff --git a/camel-core/src/main/java/org/apache/camel/impl/ScheduledPollConsumer.java b/camel-core/src/main/java/org/apache/camel/impl/ScheduledPollConsumer.java index 2bb28b3..1670354 100644 --- a/camel-core/src/main/java/org/apache/camel/impl/ScheduledPollConsumer.java +++ b/camel-core/src/main/java/org/apache/camel/impl/ScheduledPollConsumer.java @@ -27,7 +27,6 @@ import org.apache.camel.FailedToCreateConsumerException; import org.apache.camel.LoggingLevel; import org.apache.camel.PollingConsumerPollingStrategy; import org.apache.camel.Processor; -import org.apache.camel.ResolveEndpointFailedException; import org.apache.camel.SuspendableService; import org.apache.camel.spi.PollingConsumerPollStrategy; import org.apache.camel.spi.ScheduledPollConsumerScheduler; @@ -398,7 +397,11 @@ public abstract class ScheduledPollConsumer extends DefaultConsumer implements R scheduler = new DefaultScheduledPollConsumerScheduler(); } scheduler.setCamelContext(getEndpoint().getCamelContext()); - scheduler.scheduleTask(this, this); + + if (!(scheduler instanceof SingleScheduledPollConsumerScheduler)) { + // schedule task if its not the single scheduled + scheduler.scheduleTask(this, this); + } // configure scheduler with options from this consumer Map<String, Object> properties = new HashMap<String, Object>(); @@ -454,28 +457,26 @@ public abstract class ScheduledPollConsumer extends DefaultConsumer implements R @Override public void onInit() throws Exception { - // noop + // use a single scheduler so we do not have it running it periodically when we use + // this consumer as a EventDrivenPollingConsumer + scheduler = new SingleScheduledPollConsumerScheduler(this); } @Override public long beforePoll(long timeout) throws Exception { - LOG.trace("Before poll {}", getEndpoint()); - // resume or start our self - if (!ServiceHelper.resumeService(this)) { - ServiceHelper.startService(this); + if (LOG.isTraceEnabled()) { + LOG.trace("Before poll {}", getEndpoint()); } - - // ensure at least timeout is as long as one poll delay - return Math.max(timeout, getDelay()); + scheduler.scheduleTask(this, this); + return timeout; } @Override public void afterPoll() throws Exception { - LOG.trace("After poll {}", getEndpoint()); - // suspend or stop our self - if (!ServiceHelper.suspendService(this)) { - ServiceHelper.stopService(this); + if (LOG.isTraceEnabled()) { + LOG.trace("After poll {}", getEndpoint()); } + scheduler.unscheduleTask(); } } http://git-wip-us.apache.org/repos/asf/camel/blob/36f48fb0/camel-core/src/main/java/org/apache/camel/impl/ScheduledPollEndpoint.java ---------------------------------------------------------------------- diff --git a/camel-core/src/main/java/org/apache/camel/impl/ScheduledPollEndpoint.java b/camel-core/src/main/java/org/apache/camel/impl/ScheduledPollEndpoint.java index 5ccbedb..191f9d1 100644 --- a/camel-core/src/main/java/org/apache/camel/impl/ScheduledPollEndpoint.java +++ b/camel-core/src/main/java/org/apache/camel/impl/ScheduledPollEndpoint.java @@ -21,6 +21,7 @@ import java.util.Map; import org.apache.camel.CamelContext; import org.apache.camel.Component; +import org.apache.camel.PollingConsumer; import org.apache.camel.util.IntrospectionSupport; /** http://git-wip-us.apache.org/repos/asf/camel/blob/36f48fb0/camel-core/src/main/java/org/apache/camel/impl/SingleScheduledPollConsumerScheduler.java ---------------------------------------------------------------------- diff --git a/camel-core/src/main/java/org/apache/camel/impl/SingleScheduledPollConsumerScheduler.java b/camel-core/src/main/java/org/apache/camel/impl/SingleScheduledPollConsumerScheduler.java new file mode 100644 index 0000000..3d9e22e --- /dev/null +++ b/camel-core/src/main/java/org/apache/camel/impl/SingleScheduledPollConsumerScheduler.java @@ -0,0 +1,103 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.impl; + +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Future; + +import org.apache.camel.CamelContext; +import org.apache.camel.Consumer; +import org.apache.camel.spi.ScheduledPollConsumerScheduler; + +/** + * A {@link ScheduledPollConsumerScheduler} which is <b>not</b> scheduled but uses a regular single-threaded {@link ExecutorService} + * to execute the task when {@link #scheduleTask(org.apache.camel.Consumer, Runnable)} is invoked. + * <p/> + * This is used when the {@link org.apache.camel.PollingConsumer} EIP is implemented using the {@link EventDrivenPollingConsumer} + * bridging a {@link ScheduledPollConsumer} implementation. In this case we use this single threaded regular thread pool + * to execute the poll task on-demand, instead of using the usual scheduled thread pool which does not fit well with a + * on-demand poll attempt. + */ +public class SingleScheduledPollConsumerScheduler extends org.apache.camel.support.ServiceSupport implements ScheduledPollConsumerScheduler { + + private final Consumer consumer; + private CamelContext camelContext; + private ExecutorService executorService; + private Future future; + + public SingleScheduledPollConsumerScheduler(Consumer consumer) { + this.consumer = consumer; + } + + @Override + public void scheduleTask(Consumer consumer, Runnable task) { + if (isSchedulerStarted()) { + future = executorService.submit(task); + } + } + + @Override + public void unscheduleTask() { + if (future != null) { + future.cancel(false); + future = null; + } + } + + @Override + public void startScheduler() { + // noop + } + + @Override + public boolean isSchedulerStarted() { + return executorService != null && !executorService.isShutdown(); + } + + @Override + public void setCamelContext(CamelContext camelContext) { + this.camelContext = camelContext; + } + + @Override + public CamelContext getCamelContext() { + return camelContext; + } + + @Override + protected void doStart() throws Exception { + if (executorService == null) { + executorService = camelContext.getExecutorServiceManager().newSingleThreadExecutor(this, consumer.getEndpoint().getEndpointKey()); + } + } + + @Override + protected void doStop() throws Exception { + if (future != null) { + future.cancel(false); + future = null; + } + } + + @Override + protected void doShutdown() throws Exception { + if (executorService != null) { + camelContext.getExecutorServiceManager().shutdownNow(executorService); + executorService = null; + } + } +} http://git-wip-us.apache.org/repos/asf/camel/blob/36f48fb0/camel-core/src/main/java/org/apache/camel/spi/ScheduledPollConsumerScheduler.java ---------------------------------------------------------------------- diff --git a/camel-core/src/main/java/org/apache/camel/spi/ScheduledPollConsumerScheduler.java b/camel-core/src/main/java/org/apache/camel/spi/ScheduledPollConsumerScheduler.java index fdf99d3..db9b41f 100644 --- a/camel-core/src/main/java/org/apache/camel/spi/ScheduledPollConsumerScheduler.java +++ b/camel-core/src/main/java/org/apache/camel/spi/ScheduledPollConsumerScheduler.java @@ -40,6 +40,13 @@ public interface ScheduledPollConsumerScheduler extends ShutdownableService, Cam void scheduleTask(Consumer consumer, Runnable task); /** + * Attempts to unschedules the last task which was scheduled. + * <p/> + * An implementation may not implement this method. + */ + void unscheduleTask(); + + /** * Starts the scheduler. * <p/> * If the scheduler is already started, then this is a noop method call. http://git-wip-us.apache.org/repos/asf/camel/blob/36f48fb0/camel-core/src/test/java/org/apache/camel/component/file/FileConsumerCustomSchedulerTest.java ---------------------------------------------------------------------- diff --git a/camel-core/src/test/java/org/apache/camel/component/file/FileConsumerCustomSchedulerTest.java b/camel-core/src/test/java/org/apache/camel/component/file/FileConsumerCustomSchedulerTest.java index cfa6b03..ca22bf0 100644 --- a/camel-core/src/test/java/org/apache/camel/component/file/FileConsumerCustomSchedulerTest.java +++ b/camel-core/src/test/java/org/apache/camel/component/file/FileConsumerCustomSchedulerTest.java @@ -88,6 +88,11 @@ public class FileConsumerCustomSchedulerTest extends ContextTestSupport { }; } + @Override + public void unscheduleTask() { + // noop + } + public int getCounter() { return counter; } http://git-wip-us.apache.org/repos/asf/camel/blob/36f48fb0/components/camel-quartz2/src/main/java/org/apache/camel/pollconsumer/quartz2/QuartzScheduledPollConsumerScheduler.java ---------------------------------------------------------------------- diff --git a/components/camel-quartz2/src/main/java/org/apache/camel/pollconsumer/quartz2/QuartzScheduledPollConsumerScheduler.java b/components/camel-quartz2/src/main/java/org/apache/camel/pollconsumer/quartz2/QuartzScheduledPollConsumerScheduler.java index e4e3c05..fec05c5 100644 --- a/components/camel-quartz2/src/main/java/org/apache/camel/pollconsumer/quartz2/QuartzScheduledPollConsumerScheduler.java +++ b/components/camel-quartz2/src/main/java/org/apache/camel/pollconsumer/quartz2/QuartzScheduledPollConsumerScheduler.java @@ -63,6 +63,18 @@ public class QuartzScheduledPollConsumerScheduler extends ServiceSupport impleme } @Override + public void unscheduleTask() { + if (trigger != null) { + LOG.debug("Unscheduling trigger: {}", trigger.getKey()); + try { + quartzScheduler.unscheduleJob(trigger.getKey()); + } catch (SchedulerException e) { + throw ObjectHelper.wrapRuntimeCamelException(e); + } + } + } + + @Override public void startScheduler() { // the quartz component starts the scheduler }