CAMEL-6461: Fixed camel-quartz with stateful jobs may change endpoint uri during redeployments. Use trigger to match instead of endpoint uri is safer. Thanks to Zemian Deng for the patch.
Project: http://git-wip-us.apache.org/repos/asf/camel/repo Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/ff5cf8ba Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/ff5cf8ba Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/ff5cf8ba Branch: refs/heads/camel-2.10.x Commit: ff5cf8baf06e1bd22e43e6b1c9193a4067eba662 Parents: a9db81c Author: Claus Ibsen <davscl...@apache.org> Authored: Mon Jul 22 17:31:07 2013 +0200 Committer: Claus Ibsen <davscl...@apache.org> Committed: Mon Jul 22 17:31:58 2013 +0200 ---------------------------------------------------------------------- .../apache/camel/component/quartz/CamelJob.java | 26 ++++++++++++-------- .../camel/component/quartz/QuartzComponent.java | 20 +++++++++------ 2 files changed, 29 insertions(+), 17 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/camel/blob/ff5cf8ba/components/camel-quartz/src/main/java/org/apache/camel/component/quartz/CamelJob.java ---------------------------------------------------------------------- diff --git a/components/camel-quartz/src/main/java/org/apache/camel/component/quartz/CamelJob.java b/components/camel-quartz/src/main/java/org/apache/camel/component/quartz/CamelJob.java index fcf59cd..acbcdf0 100644 --- a/components/camel-quartz/src/main/java/org/apache/camel/component/quartz/CamelJob.java +++ b/components/camel-quartz/src/main/java/org/apache/camel/component/quartz/CamelJob.java @@ -25,18 +25,17 @@ import org.quartz.JobExecutionContext; import org.quartz.JobExecutionException; import org.quartz.SchedulerContext; import org.quartz.SchedulerException; +import org.quartz.Trigger; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import static org.apache.camel.util.URISupport.normalizeUri; - /** * @version */ public class CamelJob implements Job, Serializable { private static final transient Logger LOG = LoggerFactory.getLogger(CamelJob.class); - private static final long serialVersionUID = 26L; + private static final long serialVersionUID = 27L; public void execute(JobExecutionContext context) throws JobExecutionException { String camelContextName = (String) context.getJobDetail().getJobDataMap().get(QuartzConstants.QUARTZ_CAMEL_CONTEXT_NAME); @@ -54,28 +53,35 @@ public class CamelJob implements Job, Serializable { throw new JobExecutionException("No CamelContext could be found with name: " + camelContextName); } - QuartzEndpoint endpoint = lookupQuartzEndpoint(camelContext, endpointUri); + Trigger trigger = context.getTrigger(); + QuartzEndpoint endpoint = lookupQuartzEndpoint(camelContext, endpointUri, trigger); if (endpoint == null) { - throw new JobExecutionException("No QuartzEndpoint could be found with uri: " + endpointUri); + throw new JobExecutionException("No QuartzEndpoint could be found with endpointUri: " + endpointUri); } endpoint.onJobExecute(context); } - private QuartzEndpoint lookupQuartzEndpoint(CamelContext camelContext, String endpointUri) throws JobExecutionException { - try { - String targetUri = normalizeUri(endpointUri); + private QuartzEndpoint lookupQuartzEndpoint(CamelContext camelContext, String endpointUri, Trigger trigger) throws JobExecutionException { + String targetTriggerName = trigger.getName(); + String targetTriggerGroup = trigger.getGroup(); + LOG.debug("Looking up existing QuartzEndpoint with trigger {}.{}", targetTriggerName, targetTriggerGroup); + try { // check all active routes for the quartz endpoint this task matches // as we prefer to use the existing endpoint from the routes for (Route route : camelContext.getRoutes()) { if (route.getEndpoint() instanceof QuartzEndpoint) { - if (normalizeUri(route.getEndpoint().getEndpointUri()).equals(targetUri)) { + QuartzEndpoint quartzEndpoint = (QuartzEndpoint) route.getEndpoint(); + String triggerName = quartzEndpoint.getTrigger().getName(); + String triggerGroup = quartzEndpoint.getTrigger().getGroup(); + LOG.trace("Checking route trigger {}.{}", triggerName, triggerGroup); + if (triggerName.equals(targetTriggerName) && triggerGroup.equals(targetTriggerGroup)) { return (QuartzEndpoint) route.getEndpoint(); } } } } catch (Exception e) { - throw new JobExecutionException("Error lookup up existing QuartzEndpoint with uri: " + endpointUri, e); + throw new JobExecutionException("Error lookup up existing QuartzEndpoint with trigger: " + trigger, e); } // fallback and lookup existing from registry (eg maybe a @Consume POJO with a quartz endpoint, and thus not from a route) http://git-wip-us.apache.org/repos/asf/camel/blob/ff5cf8ba/components/camel-quartz/src/main/java/org/apache/camel/component/quartz/QuartzComponent.java ---------------------------------------------------------------------- diff --git a/components/camel-quartz/src/main/java/org/apache/camel/component/quartz/QuartzComponent.java b/components/camel-quartz/src/main/java/org/apache/camel/component/quartz/QuartzComponent.java index 901d91b..5dcc34a 100644 --- a/components/camel-quartz/src/main/java/org/apache/camel/component/quartz/QuartzComponent.java +++ b/components/camel-quartz/src/main/java/org/apache/camel/component/quartz/QuartzComponent.java @@ -49,7 +49,7 @@ import org.slf4j.LoggerFactory; * For a brief tutorial on setting cron expression see * <a href="http://www.opensymphony.com/quartz/wikidocs/CronTriggers%20Tutorial.html">Quartz cron tutorial</a>. * - * @version + * @version */ public class QuartzComponent extends DefaultComponent implements StartupListener { private static final transient Logger LOG = LoggerFactory.getLogger(QuartzComponent.class); @@ -234,19 +234,25 @@ public class QuartzComponent extends DefaultComponent implements StartupListener LOG.debug("Trigger: {}/{} already exists and will be updated by Quartz.", trigger.getGroup(), trigger.getName()); // fast forward start time to now, as we do not want any misfire to kick in trigger.setStartTime(new Date()); - // replace job, and relate trigger to previous job name, which is needed to reschedule job + + // To ensure trigger uses the same job (the job name might change!) we will remove old trigger then re-add. + scheduler.unscheduleJob(trigger.getName(), trigger.getGroup()); scheduler.addJob(job, true); - trigger.setJobName(existingTrigger.getJobName()); - scheduler.rescheduleJob(trigger.getName(), trigger.getGroup(), trigger); + trigger.setJobName(job.getName()); + trigger.setJobGroup(job.getGroup()); + scheduler.scheduleJob(trigger); } else { if (!isClustered()) { LOG.debug("Trigger: {}/{} already exists and will be resumed by Quartz.", trigger.getGroup(), trigger.getName()); // fast forward start time to now, as we do not want any misfire to kick in trigger.setStartTime(new Date()); - // replace job, and relate trigger to previous job name, which is needed to reschedule job + + // To ensure trigger uses the same job (the job name might change!) we will remove old trigger then re-add. + scheduler.unscheduleJob(trigger.getName(), trigger.getGroup()); scheduler.addJob(job, true); - trigger.setJobName(existingTrigger.getJobName()); - scheduler.rescheduleJob(trigger.getName(), trigger.getGroup(), trigger); + trigger.setJobName(job.getName()); + trigger.setJobGroup(job.getGroup()); + scheduler.scheduleJob(trigger); } else { LOG.debug("Trigger: {}/{} already exists and is already scheduled by clustered JobStore.", trigger.getGroup(), trigger.getName()); }