Repository: camel Updated Branches: refs/heads/master 6482bf78b -> ddf9e0be5
CAMEL-7663: QuartzScheduledPollConsumerScheduler supports jdbc datastore and is cluster aware. Project: http://git-wip-us.apache.org/repos/asf/camel/repo Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/ddf9e0be Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/ddf9e0be Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/ddf9e0be Branch: refs/heads/master Commit: ddf9e0be50858c7ed43fc401e4417b62a73152db Parents: 6482bf7 Author: Claus Ibsen <davscl...@apache.org> Authored: Mon Aug 25 10:01:03 2014 +0200 Committer: Claus Ibsen <davscl...@apache.org> Committed: Mon Aug 25 10:01:03 2014 +0200 ---------------------------------------------------------------------- .../camel/component/quartz2/CamelJob.java | 6 ++-- .../camel/component/quartz2/QuartzEndpoint.java | 12 +------ .../camel/component/quartz2/QuartzHelper.java | 15 +++++++++ .../quartz2/QuartzScheduledPollConsumerJob.java | 33 +++++++++++++++++--- .../QuartzScheduledPollConsumerScheduler.java | 21 ++++++++++++- 5 files changed, 67 insertions(+), 20 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/camel/blob/ddf9e0be/components/camel-quartz2/src/main/java/org/apache/camel/component/quartz2/CamelJob.java ---------------------------------------------------------------------- diff --git a/components/camel-quartz2/src/main/java/org/apache/camel/component/quartz2/CamelJob.java b/components/camel-quartz2/src/main/java/org/apache/camel/component/quartz2/CamelJob.java index df580a0..1d5d88d 100644 --- a/components/camel-quartz2/src/main/java/org/apache/camel/component/quartz2/CamelJob.java +++ b/components/camel-quartz2/src/main/java/org/apache/camel/component/quartz2/CamelJob.java @@ -69,7 +69,7 @@ public class CamelJob implements Job { } } - private CamelContext getCamelContext(JobExecutionContext context) throws JobExecutionException { + protected CamelContext getCamelContext(JobExecutionContext context) throws JobExecutionException { SchedulerContext schedulerContext = getSchedulerContext(context); String camelContextName = context.getMergedJobDataMap().getString(QuartzConstants.QUARTZ_CAMEL_CONTEXT_NAME); CamelContext result = (CamelContext)schedulerContext.get(QuartzConstants.QUARTZ_CAMEL_CONTEXT + "-" + camelContextName); @@ -79,7 +79,7 @@ public class CamelJob implements Job { return result; } - private SchedulerContext getSchedulerContext(JobExecutionContext context) throws JobExecutionException { + protected SchedulerContext getSchedulerContext(JobExecutionContext context) throws JobExecutionException { try { return context.getScheduler().getContext(); } catch (SchedulerException e) { @@ -87,7 +87,7 @@ public class CamelJob implements Job { } } - private QuartzEndpoint lookupQuartzEndpoint(CamelContext camelContext, JobExecutionContext quartzContext) throws JobExecutionException { + protected QuartzEndpoint lookupQuartzEndpoint(CamelContext camelContext, JobExecutionContext quartzContext) throws JobExecutionException { TriggerKey triggerKey = quartzContext.getTrigger().getKey(); if (LOG.isDebugEnabled()) { LOG.debug("Looking up existing QuartzEndpoint with triggerKey={}", triggerKey); http://git-wip-us.apache.org/repos/asf/camel/blob/ddf9e0be/components/camel-quartz2/src/main/java/org/apache/camel/component/quartz2/QuartzEndpoint.java ---------------------------------------------------------------------- diff --git a/components/camel-quartz2/src/main/java/org/apache/camel/component/quartz2/QuartzEndpoint.java b/components/camel-quartz2/src/main/java/org/apache/camel/component/quartz2/QuartzEndpoint.java index 6d2167d..c4cb731 100644 --- a/components/camel-quartz2/src/main/java/org/apache/camel/component/quartz2/QuartzEndpoint.java +++ b/components/camel-quartz2/src/main/java/org/apache/camel/component/quartz2/QuartzEndpoint.java @@ -240,7 +240,7 @@ public class QuartzEndpoint extends DefaultEndpoint { jobDetail = createJobDetail(); trigger = createTrigger(jobDetail); - updateJobDataMap(jobDetail); + QuartzHelper.updateJobDataMap(getCamelContext(), jobDetail, getEndpointUri()); // Schedule it now. Remember that scheduler might not be started it, but we can schedule now. Date nextFireDate = scheduler.scheduleJob(jobDetail, trigger); @@ -274,16 +274,6 @@ public class QuartzEndpoint extends DefaultEndpoint { } } - private void updateJobDataMap(JobDetail jobDetail) { - // Store this camelContext name into the job data - JobDataMap jobDataMap = jobDetail.getJobDataMap(); - String camelContextName = QuartzHelper.getQuartzContextName(getCamelContext()); - String endpointUri = getEndpointUri(); - LOG.debug("Adding camelContextName={}, endpointUri={} into job data map.", camelContextName, endpointUri); - jobDataMap.put(QuartzConstants.QUARTZ_CAMEL_CONTEXT_NAME, camelContextName); - jobDataMap.put(QuartzConstants.QUARTZ_ENDPOINT_URI, endpointUri); - } - private Trigger createTrigger(JobDetail jobDetail) throws Exception { Trigger result; Date startTime = new Date(); http://git-wip-us.apache.org/repos/asf/camel/blob/ddf9e0be/components/camel-quartz2/src/main/java/org/apache/camel/component/quartz2/QuartzHelper.java ---------------------------------------------------------------------- diff --git a/components/camel-quartz2/src/main/java/org/apache/camel/component/quartz2/QuartzHelper.java b/components/camel-quartz2/src/main/java/org/apache/camel/component/quartz2/QuartzHelper.java index 0d5c5bc..bb6db52 100644 --- a/components/camel-quartz2/src/main/java/org/apache/camel/component/quartz2/QuartzHelper.java +++ b/components/camel-quartz2/src/main/java/org/apache/camel/component/quartz2/QuartzHelper.java @@ -17,9 +17,15 @@ package org.apache.camel.component.quartz2; import org.apache.camel.CamelContext; +import org.quartz.JobDataMap; +import org.quartz.JobDetail; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; public final class QuartzHelper { + public static final Logger LOG = LoggerFactory.getLogger(QuartzEndpoint.class); + private QuartzHelper() { } @@ -32,4 +38,13 @@ public final class QuartzHelper { } } + public static void updateJobDataMap(CamelContext camelContext, JobDetail jobDetail, String endpointUri) { + // Store this camelContext name into the job data + JobDataMap jobDataMap = jobDetail.getJobDataMap(); + String camelContextName = QuartzHelper.getQuartzContextName(camelContext); + LOG.debug("Adding camelContextName={}, endpointUri={} into job data map.", camelContextName, endpointUri); + jobDataMap.put(QuartzConstants.QUARTZ_CAMEL_CONTEXT_NAME, camelContextName); + jobDataMap.put(QuartzConstants.QUARTZ_ENDPOINT_URI, endpointUri); + } + } http://git-wip-us.apache.org/repos/asf/camel/blob/ddf9e0be/components/camel-quartz2/src/main/java/org/apache/camel/pollconsumer/quartz2/QuartzScheduledPollConsumerJob.java ---------------------------------------------------------------------- diff --git a/components/camel-quartz2/src/main/java/org/apache/camel/pollconsumer/quartz2/QuartzScheduledPollConsumerJob.java b/components/camel-quartz2/src/main/java/org/apache/camel/pollconsumer/quartz2/QuartzScheduledPollConsumerJob.java index c2ddde0..afaaa7e 100644 --- a/components/camel-quartz2/src/main/java/org/apache/camel/pollconsumer/quartz2/QuartzScheduledPollConsumerJob.java +++ b/components/camel-quartz2/src/main/java/org/apache/camel/pollconsumer/quartz2/QuartzScheduledPollConsumerJob.java @@ -16,13 +16,16 @@ */ package org.apache.camel.pollconsumer.quartz2; -import org.quartz.Job; +import org.apache.camel.CamelContext; +import org.apache.camel.Consumer; +import org.apache.camel.Route; +import org.apache.camel.component.quartz2.CamelJob; import org.quartz.JobExecutionContext; import org.quartz.JobExecutionException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -public class QuartzScheduledPollConsumerJob implements Job { +public class QuartzScheduledPollConsumerJob extends CamelJob { private static final Logger LOG = LoggerFactory.getLogger(QuartzScheduledPollConsumerJob.class); @@ -30,10 +33,30 @@ public class QuartzScheduledPollConsumerJob implements Job { } @Override - public void execute(JobExecutionContext jobExecutionContext) throws JobExecutionException { - LOG.trace("Execute job: {}", jobExecutionContext); + public void execute(JobExecutionContext context) throws JobExecutionException { + LOG.trace("Execute job: {}", context); + + CamelContext camelContext = getCamelContext(context); + + Runnable task = (Runnable) context.getJobDetail().getJobDataMap().get("task"); + + if (task == null) { + // if not task then use the route id to lookup the consumer to be used as the task + String routeId = (String) context.getJobDetail().getJobDataMap().get("routeId"); + if (routeId != null && camelContext != null) { + // find the consumer + for (Route route : camelContext.getRoutes()) { + if (route.getId().equals(routeId)) { + Consumer consumer = route.getConsumer(); + if (consumer instanceof Runnable) { + task = (Runnable) consumer; + break; + } + } + } + } + } - Runnable task = (Runnable) jobExecutionContext.getJobDetail().getJobDataMap().get("task"); if (task != null) { LOG.trace("Running task: {}", task); task.run(); http://git-wip-us.apache.org/repos/asf/camel/blob/ddf9e0be/components/camel-quartz2/src/main/java/org/apache/camel/pollconsumer/quartz2/QuartzScheduledPollConsumerScheduler.java ---------------------------------------------------------------------- diff --git a/components/camel-quartz2/src/main/java/org/apache/camel/pollconsumer/quartz2/QuartzScheduledPollConsumerScheduler.java b/components/camel-quartz2/src/main/java/org/apache/camel/pollconsumer/quartz2/QuartzScheduledPollConsumerScheduler.java index 0e65438..6ddf670 100644 --- a/components/camel-quartz2/src/main/java/org/apache/camel/pollconsumer/quartz2/QuartzScheduledPollConsumerScheduler.java +++ b/components/camel-quartz2/src/main/java/org/apache/camel/pollconsumer/quartz2/QuartzScheduledPollConsumerScheduler.java @@ -20,8 +20,10 @@ import java.util.TimeZone; import org.apache.camel.CamelContext; import org.apache.camel.Consumer; +import org.apache.camel.Route; import org.apache.camel.component.quartz2.QuartzComponent; import org.apache.camel.component.quartz2.QuartzConstants; +import org.apache.camel.component.quartz2.QuartzHelper; import org.apache.camel.spi.ScheduledPollConsumerScheduler; import org.apache.camel.support.ServiceSupport; import org.apache.camel.util.ObjectHelper; @@ -45,6 +47,7 @@ public class QuartzScheduledPollConsumerScheduler extends ServiceSupport impleme private static final Logger LOG = LoggerFactory.getLogger(QuartzScheduledPollConsumerScheduler.class); private Scheduler quartzScheduler; private CamelContext camelContext; + private String routeId; private Consumer consumer; private Runnable runnable; private String cron; @@ -57,6 +60,13 @@ public class QuartzScheduledPollConsumerScheduler extends ServiceSupport impleme @Override public void onInit(Consumer consumer) { this.consumer = consumer; + // find the route of the consumer + for (Route route : consumer.getEndpoint().getCamelContext().getRoutes()) { + if (route.getConsumer() == consumer) { + this.routeId = route.getId(); + break; + } + } } @Override @@ -151,7 +161,12 @@ public class QuartzScheduledPollConsumerScheduler extends ServiceSupport impleme } JobDataMap map = new JobDataMap(); - map.put("task", runnable); + // do not store task as its not serializable, if we have route id + if (routeId != null) { + map.put("routeId", routeId); + } else { + map.put("task", runnable); + } map.put(QuartzConstants.QUARTZ_TRIGGER_TYPE, "cron"); map.put(QuartzConstants.QUARTZ_TRIGGER_CRON_EXPRESSION, getCron()); map.put(QuartzConstants.QUARTZ_TRIGGER_CRON_TIMEZONE, getTimeZone().getID()); @@ -160,6 +175,9 @@ public class QuartzScheduledPollConsumerScheduler extends ServiceSupport impleme .usingJobData(map) .build(); + // store additional information on job such as camel context etc + QuartzHelper.updateJobDataMap(getCamelContext(), job, null); + String id = triggerId; if (id == null) { id = "trigger-" + getCamelContext().getUuidGenerator().generateUuid(); @@ -185,4 +203,5 @@ public class QuartzScheduledPollConsumerScheduler extends ServiceSupport impleme @Override protected void doShutdown() throws Exception { } + }