CAMEL-6631: Introduce ScheduledPollConsumerScheduler SPI to plug in different schedulers for poll consumer components such as file/ftp, etc.
Project: http://git-wip-us.apache.org/repos/asf/camel/repo Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/fca7b242 Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/fca7b242 Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/fca7b242 Branch: refs/heads/master Commit: fca7b242df5961a43a8a605329e2f1ea39668a80 Parents: 38657b4 Author: Claus Ibsen <davscl...@apache.org> Authored: Tue Aug 13 14:31:42 2013 +0200 Committer: Claus Ibsen <davscl...@apache.org> Committed: Tue Aug 13 14:31:42 2013 +0200 ---------------------------------------------------------------------- camel-core/pom.xml | 1 + .../camel/impl/ScheduledPollEndpoint.java | 11 ++ .../file/FileConsumerCustomSchedulerTest.java | 4 +- components/camel-quartz2/pom.xml | 2 + .../quartz2/QuartzScheduledPollConsumerJob.java | 42 +++++ .../QuartzScheduledPollConsumerScheduler.java | 175 +++++++++++++++++++ .../FileConsumerQuartzSchedulerTest.java | 54 ++++++ .../src/test/resources/log4j.properties | 1 + 8 files changed, 288 insertions(+), 2 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/camel/blob/fca7b242/camel-core/pom.xml ---------------------------------------------------------------------- diff --git a/camel-core/pom.xml b/camel-core/pom.xml index d7e8aa8..612ae8e 100755 --- a/camel-core/pom.xml +++ b/camel-core/pom.xml @@ -89,6 +89,7 @@ <camel.osgi.dynamic> org.apache.camel.spring.util;${camel.osgi.import.strict.version}, org.apache.camel.processor.interceptor.jpa;${camel.osgi.import.strict.version}, + org.apache.camel.pollconsumer.quartz2;${camel.osgi.import.strict.version}, net.sf.saxon </camel.osgi.dynamic> <camel.osgi.activator> http://git-wip-us.apache.org/repos/asf/camel/blob/fca7b242/camel-core/src/main/java/org/apache/camel/impl/ScheduledPollEndpoint.java ---------------------------------------------------------------------- diff --git 
a/camel-core/src/main/java/org/apache/camel/impl/ScheduledPollEndpoint.java b/camel-core/src/main/java/org/apache/camel/impl/ScheduledPollEndpoint.java index 1d7e62f..5ccbedb 100644 --- a/camel-core/src/main/java/org/apache/camel/impl/ScheduledPollEndpoint.java +++ b/camel-core/src/main/java/org/apache/camel/impl/ScheduledPollEndpoint.java @@ -30,6 +30,8 @@ import org.apache.camel.util.IntrospectionSupport; */ public abstract class ScheduledPollEndpoint extends DefaultEndpoint { + private static final String QUARTZ_2_SCHEDULER = "org.apache.camel.pollconsumer.quartz2.QuartzScheduledPollConsumerScheduler"; + protected ScheduledPollEndpoint(String endpointUri, Component component) { super(endpointUri, component); } @@ -116,6 +118,15 @@ public abstract class ScheduledPollEndpoint extends DefaultEndpoint { consumerProperties.put("scheduledExecutorService", scheduledExecutorService); } if (scheduler != null) { + // special for scheduler if its "quartz2" + if ("quartz2".equals(scheduler)) { + try { + Class<?> clazz = getCamelContext().getClassResolver().resolveMandatoryClass(QUARTZ_2_SCHEDULER); + scheduler = getCamelContext().getInjector().newInstance(clazz); + } catch (ClassNotFoundException e) { + throw new IllegalArgumentException("Cannot load " + QUARTZ_2_SCHEDULER + " from classpath. 
Make sure camel-quarz2.jar is on the classpath.", e); + } + } consumerProperties.put("scheduler", scheduler); } if (!schedulerProperties.isEmpty()) { http://git-wip-us.apache.org/repos/asf/camel/blob/fca7b242/camel-core/src/test/java/org/apache/camel/component/file/FileConsumerCustomSchedulerTest.java ---------------------------------------------------------------------- diff --git a/camel-core/src/test/java/org/apache/camel/component/file/FileConsumerCustomSchedulerTest.java b/camel-core/src/test/java/org/apache/camel/component/file/FileConsumerCustomSchedulerTest.java index dc99af6..cfa6b03 100644 --- a/camel-core/src/test/java/org/apache/camel/component/file/FileConsumerCustomSchedulerTest.java +++ b/camel-core/src/test/java/org/apache/camel/component/file/FileConsumerCustomSchedulerTest.java @@ -40,7 +40,7 @@ public class FileConsumerCustomSchedulerTest extends ContextTestSupport { @Override protected void setUp() throws Exception { - deleteDirectory("target/file/customer"); + deleteDirectory("target/file/custom"); super.setUp(); } @@ -123,7 +123,7 @@ public class FileConsumerCustomSchedulerTest extends ContextTestSupport { @Override public void shutdown() throws Exception { - timerTask.cancel(); + timerTask.cancel(); } @Override http://git-wip-us.apache.org/repos/asf/camel/blob/fca7b242/components/camel-quartz2/pom.xml ---------------------------------------------------------------------- diff --git a/components/camel-quartz2/pom.xml b/components/camel-quartz2/pom.xml index 1c4e7bd..274c86a 100644 --- a/components/camel-quartz2/pom.xml +++ b/components/camel-quartz2/pom.xml @@ -32,10 +32,12 @@ <properties> <camel.osgi.export.pkg> org.apache.camel.component.quartz2.*;${camel.osgi.version}, + org.apache.camel.pollconsumer.quartz2.* org.apache.camel.routepolicy.quartz2.* </camel.osgi.export.pkg> <camel.osgi.import.pkg> !org.apache.camel.component.quartz2.*, + !org.apache.camel.pollconsumer.quartz2.*, !org.apache.camel.routepolicy.quartz2.*, 
${camel.osgi.import.defaults}, * http://git-wip-us.apache.org/repos/asf/camel/blob/fca7b242/components/camel-quartz2/src/main/java/org/apache/camel/pollconsumer/quartz2/QuartzScheduledPollConsumerJob.java ---------------------------------------------------------------------- diff --git a/components/camel-quartz2/src/main/java/org/apache/camel/pollconsumer/quartz2/QuartzScheduledPollConsumerJob.java b/components/camel-quartz2/src/main/java/org/apache/camel/pollconsumer/quartz2/QuartzScheduledPollConsumerJob.java new file mode 100644 index 0000000..cdc10b4 --- /dev/null +++ b/components/camel-quartz2/src/main/java/org/apache/camel/pollconsumer/quartz2/QuartzScheduledPollConsumerJob.java @@ -0,0 +1,42 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.camel.pollconsumer.quartz2; + +import org.quartz.Job; +import org.quartz.JobExecutionContext; +import org.quartz.JobExecutionException; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class QuartzScheduledPollConsumerJob implements Job { + + private static final transient Logger LOG = LoggerFactory.getLogger(QuartzScheduledPollConsumerJob.class); + + public QuartzScheduledPollConsumerJob() { + } + + @Override + public void execute(JobExecutionContext jobExecutionContext) throws JobExecutionException { + LOG.trace("Execute job: {}", jobExecutionContext); + + Runnable task = (Runnable) jobExecutionContext.getJobDetail().getJobDataMap().get("task"); + if (task != null) { + LOG.trace("Running task: {}", task); + task.run(); + } + } +} http://git-wip-us.apache.org/repos/asf/camel/blob/fca7b242/components/camel-quartz2/src/main/java/org/apache/camel/pollconsumer/quartz2/QuartzScheduledPollConsumerScheduler.java ---------------------------------------------------------------------- diff --git a/components/camel-quartz2/src/main/java/org/apache/camel/pollconsumer/quartz2/QuartzScheduledPollConsumerScheduler.java b/components/camel-quartz2/src/main/java/org/apache/camel/pollconsumer/quartz2/QuartzScheduledPollConsumerScheduler.java new file mode 100644 index 0000000..e4e3c05 --- /dev/null +++ b/components/camel-quartz2/src/main/java/org/apache/camel/pollconsumer/quartz2/QuartzScheduledPollConsumerScheduler.java @@ -0,0 +1,175 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.pollconsumer.quartz2; + +import java.util.TimeZone; + +import org.apache.camel.CamelContext; +import org.apache.camel.Consumer; +import org.apache.camel.component.quartz2.QuartzComponent; +import org.apache.camel.spi.ScheduledPollConsumerScheduler; +import org.apache.camel.support.ServiceSupport; +import org.apache.camel.util.ObjectHelper; +import org.quartz.CronScheduleBuilder; +import org.quartz.CronTrigger; +import org.quartz.Job; +import org.quartz.JobBuilder; +import org.quartz.JobDataMap; +import org.quartz.JobDetail; +import org.quartz.JobExecutionContext; +import org.quartz.JobExecutionException; +import org.quartz.Scheduler; +import org.quartz.SchedulerException; +import org.quartz.TriggerBuilder; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * A quartz based {@link ScheduledPollConsumerScheduler} which uses a {@link CronTrigger} to define when the + * poll should be triggered. 
+ */ +public class QuartzScheduledPollConsumerScheduler extends ServiceSupport implements ScheduledPollConsumerScheduler, Job { + + private static final transient Logger LOG = LoggerFactory.getLogger(QuartzScheduledPollConsumerScheduler.class); + private Scheduler quartzScheduler; + private CamelContext camelContext; + private Consumer consumer; + private Runnable runnable; + private String cron; + private String triggerId; + private String triggerGroup = "QuartzScheduledPollConsumerScheduler"; + private TimeZone timeZone = TimeZone.getDefault(); + private volatile CronTrigger trigger; + private volatile JobDetail job; + + @Override + public void scheduleTask(Consumer consumer, Runnable runnable) { + this.consumer = consumer; + this.runnable = runnable; + } + + @Override + public void startScheduler() { + // the quartz component starts the scheduler + } + + @Override + public boolean isSchedulerStarted() { + try { + return quartzScheduler != null && quartzScheduler.isStarted(); + } catch (SchedulerException e) { + return false; + } + } + + @Override + public void setCamelContext(CamelContext camelContext) { + this.camelContext = camelContext; + } + + @Override + public CamelContext getCamelContext() { + return camelContext; + } + + public Scheduler getQuartzScheduler() { + return quartzScheduler; + } + + public void setQuartzScheduler(Scheduler scheduler) { + this.quartzScheduler = scheduler; + } + + public String getCron() { + return cron; + } + + public void setCron(String cron) { + this.cron = cron; + } + + public TimeZone getTimeZone() { + return timeZone; + } + + public void setTimeZone(TimeZone timeZone) { + this.timeZone = timeZone; + } + + public String getTriggerId() { + return triggerId; + } + + public void setTriggerId(String triggerId) { + this.triggerId = triggerId; + } + + public String getTriggerGroup() { + return triggerGroup; + } + + public void setTriggerGroup(String triggerGroup) { + this.triggerGroup = triggerGroup; + } + + @Override + public 
void execute(JobExecutionContext jobExecutionContext) throws JobExecutionException { + runnable.run(); + } + + @Override + protected void doStart() throws Exception { + ObjectHelper.notEmpty(cron, "cron", this); + + if (quartzScheduler == null) { + // get the scheduler form the quartz component + QuartzComponent quartz = getCamelContext().getComponent("quartz2", QuartzComponent.class); + setQuartzScheduler(quartz.getScheduler()); + } + + JobDataMap map = new JobDataMap(); + map.put("task", runnable); + job = JobBuilder.newJob(QuartzScheduledPollConsumerJob.class) + .usingJobData(map) + .build(); + + String id = triggerId; + if (id == null) { + id = "trigger-" + getCamelContext().getUuidGenerator().generateUuid(); + } + + trigger = TriggerBuilder.newTrigger() + .withIdentity(id, triggerGroup) + .withSchedule(CronScheduleBuilder.cronSchedule(getCron()).inTimeZone(getTimeZone())) + .build(); + + LOG.debug("Scheduling job: {} with trigger: {}", job, trigger.getKey()); + quartzScheduler.scheduleJob(job, trigger); + } + + @Override + protected void doStop() throws Exception { + if (trigger != null) { + LOG.debug("Unscheduling trigger: {}", trigger.getKey()); + quartzScheduler.unscheduleJob(trigger.getKey()); + } + } + + @Override + protected void doShutdown() throws Exception { + } +} http://git-wip-us.apache.org/repos/asf/camel/blob/fca7b242/components/camel-quartz2/src/test/java/org/apache/camel/pollconsumer/quartz2/FileConsumerQuartzSchedulerTest.java ---------------------------------------------------------------------- diff --git a/components/camel-quartz2/src/test/java/org/apache/camel/pollconsumer/quartz2/FileConsumerQuartzSchedulerTest.java b/components/camel-quartz2/src/test/java/org/apache/camel/pollconsumer/quartz2/FileConsumerQuartzSchedulerTest.java new file mode 100644 index 0000000..be0c488 --- /dev/null +++ b/components/camel-quartz2/src/test/java/org/apache/camel/pollconsumer/quartz2/FileConsumerQuartzSchedulerTest.java @@ -0,0 +1,54 @@ +/** + * Licensed 
to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.pollconsumer.quartz2; + +import org.apache.camel.Exchange; +import org.apache.camel.builder.RouteBuilder; +import org.apache.camel.test.junit4.CamelTestSupport; +import org.junit.Test; + +public class FileConsumerQuartzSchedulerTest extends CamelTestSupport { + + @Override + public void setUp() throws Exception { + deleteDirectory("target/file/quartz"); + super.setUp(); + } + + @Test + public void testQuartzScheduler() throws Exception { + getMockEndpoint("mock:result").expectedMessageCount(1); + + template.sendBodyAndHeader("file:target/file/quartz", "Hello World", Exchange.FILE_NAME, "hello.txt"); + + context.startRoute("foo"); + + assertMockEndpointsSatisfied(); + } + + @Override + protected RouteBuilder createRouteBuilder() throws Exception { + return new RouteBuilder() { + @Override + public void configure() throws Exception { + from("file:target/file/quartz?scheduler=quartz2&scheduler.cron=0/2+*+*+*+*+?").routeId("foo").noAutoStartup() + .to("mock:result"); + } + }; + } + +} http://git-wip-us.apache.org/repos/asf/camel/blob/fca7b242/components/camel-quartz2/src/test/resources/log4j.properties 
---------------------------------------------------------------------- diff --git a/components/camel-quartz2/src/test/resources/log4j.properties b/components/camel-quartz2/src/test/resources/log4j.properties index cf13b39..1199657 100644 --- a/components/camel-quartz2/src/test/resources/log4j.properties +++ b/components/camel-quartz2/src/test/resources/log4j.properties @@ -23,6 +23,7 @@ log4j.rootLogger=INFO, file # uncomment the following to enable camel debugging #log4j.logger.org.apache.camel=DEBUG #log4j.logger.org.apache.camel.component.quartz2=DEBUG +#log4j.logger.org.apache.camel.pollconsumer.quartz2=TRACE #log4j.logger.org.apache.camel.routepolicy.quartz2=DEBUG # CONSOLE appender not used by default