Updated Branches: refs/heads/master 546c3fd4a -> 2f75b2f1b
CAMEL-6631: Introduce ScheduledPollConsumerScheduler SPI to plug in different schedulers for poll consumer components such as file/ftp etc. Project: http://git-wip-us.apache.org/repos/asf/camel/repo Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/2f75b2f1 Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/2f75b2f1 Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/2f75b2f1 Branch: refs/heads/master Commit: 2f75b2f1bd2f23a864c2dc3c31b1badfd40cc743 Parents: 546c3fd Author: Claus Ibsen <davscl...@apache.org> Authored: Fri Aug 16 08:47:47 2013 +0200 Committer: Claus Ibsen <davscl...@apache.org> Committed: Fri Aug 16 08:47:47 2013 +0200 ---------------------------------------------------------------------- .../mbean/ManagedSchedulePollConsumerMBean.java | 5 ++ .../DefaultScheduledPollConsumerScheduler.java | 2 +- .../mbean/ManagedScheduledPollConsumer.java | 5 ++ .../SpringScheduledPollConsumerScheduler.java | 6 +-- .../util/CamelThreadPoolTaskScheduler.java | 53 ++++++++++++++++++++ 5 files changed, 66 insertions(+), 5 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/camel/blob/2f75b2f1/camel-core/src/main/java/org/apache/camel/api/management/mbean/ManagedSchedulePollConsumerMBean.java ---------------------------------------------------------------------- diff --git a/camel-core/src/main/java/org/apache/camel/api/management/mbean/ManagedSchedulePollConsumerMBean.java b/camel-core/src/main/java/org/apache/camel/api/management/mbean/ManagedSchedulePollConsumerMBean.java index 831748a..3179ee6 100644 --- a/camel-core/src/main/java/org/apache/camel/api/management/mbean/ManagedSchedulePollConsumerMBean.java +++ b/camel-core/src/main/java/org/apache/camel/api/management/mbean/ManagedSchedulePollConsumerMBean.java @@ -16,6 +16,8 @@ */ package org.apache.camel.api.management.mbean; +import java.util.Map; + import org.apache.camel.api.management.ManagedAttribute; import 
org.apache.camel.api.management.ManagedOperation; @@ -51,4 +53,7 @@ public interface ManagedSchedulePollConsumerMBean extends ManagedConsumerMBean { @ManagedOperation(description = "Starts the scheduler") void startScheduler(); + @ManagedAttribute(description = "Scheduler classname") + String getSchedulerClassName(); + } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/camel/blob/2f75b2f1/camel-core/src/main/java/org/apache/camel/impl/DefaultScheduledPollConsumerScheduler.java ---------------------------------------------------------------------- diff --git a/camel-core/src/main/java/org/apache/camel/impl/DefaultScheduledPollConsumerScheduler.java b/camel-core/src/main/java/org/apache/camel/impl/DefaultScheduledPollConsumerScheduler.java index 325667c..13ed058 100644 --- a/camel-core/src/main/java/org/apache/camel/impl/DefaultScheduledPollConsumerScheduler.java +++ b/camel-core/src/main/java/org/apache/camel/impl/DefaultScheduledPollConsumerScheduler.java @@ -143,7 +143,7 @@ public class DefaultScheduledPollConsumerScheduler extends org.apache.camel.supp if (scheduledExecutorService == null) { // we only need one thread in the pool to schedule this task this.scheduledExecutorService = getCamelContext().getExecutorServiceManager() - .newScheduledThreadPool(this, consumer.getEndpoint().getEndpointUri(), 1); + .newSingleThreadScheduledExecutor(consumer, consumer.getEndpoint().getEndpointUri()); // and we should shutdown the thread pool when no longer needed this.shutdownExecutor = true; } http://git-wip-us.apache.org/repos/asf/camel/blob/2f75b2f1/camel-core/src/main/java/org/apache/camel/management/mbean/ManagedScheduledPollConsumer.java ---------------------------------------------------------------------- diff --git a/camel-core/src/main/java/org/apache/camel/management/mbean/ManagedScheduledPollConsumer.java b/camel-core/src/main/java/org/apache/camel/management/mbean/ManagedScheduledPollConsumer.java index cddfbb9..ad341a3 100644 --- 
a/camel-core/src/main/java/org/apache/camel/management/mbean/ManagedScheduledPollConsumer.java +++ b/camel-core/src/main/java/org/apache/camel/management/mbean/ManagedScheduledPollConsumer.java @@ -78,4 +78,9 @@ public class ManagedScheduledPollConsumer extends ManagedConsumer implements Man public void startScheduler() { getConsumer().startScheduler(); } + + public String getSchedulerClassName() { + return getConsumer().getScheduler().getClass().getName(); + } + } http://git-wip-us.apache.org/repos/asf/camel/blob/2f75b2f1/components/camel-spring/src/main/java/org/apache/camel/spring/pollingconsumer/SpringScheduledPollConsumerScheduler.java ---------------------------------------------------------------------- diff --git a/components/camel-spring/src/main/java/org/apache/camel/spring/pollingconsumer/SpringScheduledPollConsumerScheduler.java b/components/camel-spring/src/main/java/org/apache/camel/spring/pollingconsumer/SpringScheduledPollConsumerScheduler.java index 50aa3be..a9f1e55 100644 --- a/components/camel-spring/src/main/java/org/apache/camel/spring/pollingconsumer/SpringScheduledPollConsumerScheduler.java +++ b/components/camel-spring/src/main/java/org/apache/camel/spring/pollingconsumer/SpringScheduledPollConsumerScheduler.java @@ -22,9 +22,9 @@ import java.util.concurrent.ScheduledFuture; import org.apache.camel.CamelContext; import org.apache.camel.Consumer; import org.apache.camel.spi.ScheduledPollConsumerScheduler; +import org.apache.camel.spring.util.CamelThreadPoolTaskScheduler; import org.apache.camel.support.ServiceSupport; import org.apache.camel.util.ObjectHelper; -import org.apache.camel.util.concurrent.CamelThreadFactory; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.scheduling.concurrent.ThreadPoolTaskScheduler; @@ -116,9 +116,7 @@ public class SpringScheduledPollConsumerScheduler extends ServiceSupport impleme trigger = new CronTrigger(getCron(), getTimeZone()); if (taskScheduler == null) { - 
taskScheduler = new ThreadPoolTaskScheduler(); - CamelThreadFactory tf = new CamelThreadFactory(getCamelContext().getExecutorServiceManager().getThreadNamePattern(), "SpringScheduledPollConsumerSchedulerTask", true); - taskScheduler.setThreadFactory(tf); + taskScheduler = new CamelThreadPoolTaskScheduler(getCamelContext(), consumer, consumer.getEndpoint().getEndpointUri()); taskScheduler.afterPropertiesSet(); destroyTaskScheduler = true; } http://git-wip-us.apache.org/repos/asf/camel/blob/2f75b2f1/components/camel-spring/src/main/java/org/apache/camel/spring/util/CamelThreadPoolTaskScheduler.java ---------------------------------------------------------------------- diff --git a/components/camel-spring/src/main/java/org/apache/camel/spring/util/CamelThreadPoolTaskScheduler.java b/components/camel-spring/src/main/java/org/apache/camel/spring/util/CamelThreadPoolTaskScheduler.java new file mode 100644 index 0000000..5067e3d --- /dev/null +++ b/components/camel-spring/src/main/java/org/apache/camel/spring/util/CamelThreadPoolTaskScheduler.java @@ -0,0 +1,53 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.camel.spring.util; + +import java.util.concurrent.RejectedExecutionHandler; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.ThreadFactory; + +import org.apache.camel.CamelContext; +import org.springframework.scheduling.concurrent.ThreadPoolTaskScheduler; + +/** + * A Camel extension of Spring's {@link ThreadPoolTaskScheduler} which uses the + * {@link org.apache.camel.spi.ExecutorServiceManager} to create and destroy the + * thread pool, which ensures the thread pool is also managed and consistent with + * other usages of thread pools in Camel. + */ +public class CamelThreadPoolTaskScheduler extends ThreadPoolTaskScheduler { + + private final CamelContext camelContext; + private final Object source; + private final String name; + + public CamelThreadPoolTaskScheduler(CamelContext camelContext, Object source, String name) { + this.camelContext = camelContext; + this.source = source; + this.name = name; + } + + @Override + protected ScheduledExecutorService createExecutor(int poolSize, ThreadFactory threadFactory, RejectedExecutionHandler rejectedExecutionHandler) { + return camelContext.getExecutorServiceManager().newScheduledThreadPool(source, name, poolSize); + } + + @Override + public void shutdown() { + camelContext.getExecutorServiceManager().shutdownNow(getScheduledExecutor()); + } +}