CAMEL-8076 Fixed the issue that CamelJob cannot find the right QuartzEndpoint when recovering the job
Project: http://git-wip-us.apache.org/repos/asf/camel/repo Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/ff59faef Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/ff59faef Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/ff59faef Branch: refs/heads/master Commit: ff59faefa559d6ffe22020b51b668b647ef01041 Parents: aa6e20a Author: Willem Jiang <willem.ji...@gmail.com> Authored: Wed Dec 3 21:43:21 2014 +0800 Committer: Willem Jiang <willem.ji...@gmail.com> Committed: Wed Dec 3 21:57:52 2014 +0800 ---------------------------------------------------------------------- .../camel/component/quartz2/CamelJob.java | 9 +++-- ...rtzConsumerTwoAppsClusteredRecoveryTest.java | 35 ++++++++++++++++---- ...ingQuartzConsumerRecoveryClusteredAppOne.xml | 6 ++-- 3 files changed, 38 insertions(+), 12 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/camel/blob/ff59faef/components/camel-quartz2/src/main/java/org/apache/camel/component/quartz2/CamelJob.java ---------------------------------------------------------------------- diff --git a/components/camel-quartz2/src/main/java/org/apache/camel/component/quartz2/CamelJob.java b/components/camel-quartz2/src/main/java/org/apache/camel/component/quartz2/CamelJob.java index 407b93b..3e74abc 100644 --- a/components/camel-quartz2/src/main/java/org/apache/camel/component/quartz2/CamelJob.java +++ b/components/camel-quartz2/src/main/java/org/apache/camel/component/quartz2/CamelJob.java @@ -23,8 +23,10 @@ import org.apache.camel.Endpoint; import org.apache.camel.Exchange; import org.apache.camel.Route; import org.quartz.Job; +import org.quartz.JobDetail; import org.quartz.JobExecutionContext; import org.quartz.JobExecutionException; +import org.quartz.JobKey; import org.quartz.SchedulerContext; import org.quartz.SchedulerException; import org.quartz.TriggerKey; @@ -91,10 +93,12 @@ public class CamelJob implements Job { protected QuartzEndpoint 
lookupQuartzEndpoint(CamelContext camelContext, JobExecutionContext quartzContext) throws JobExecutionException { TriggerKey triggerKey = quartzContext.getTrigger().getKey(); + JobDetail jobDetail = quartzContext.getJobDetail(); + JobKey jobKey = jobDetail.getKey(); if (LOG.isDebugEnabled()) { LOG.debug("Looking up existing QuartzEndpoint with triggerKey={}", triggerKey); } - + // check all active routes for the quartz endpoint this task matches // as we prefer to use the existing endpoint from the routes for (Route route : camelContext.getRoutes()) { @@ -108,7 +112,8 @@ public class CamelJob implements Job { if (LOG.isTraceEnabled()) { LOG.trace("Checking route endpoint={} with checkTriggerKey={}", quartzEndpoint, checkTriggerKey); } - if (triggerKey.equals(checkTriggerKey)) { + if (triggerKey.equals(checkTriggerKey) + || (jobDetail.requestsRecovery() && jobKey.getGroup().equals(checkTriggerKey.getGroup()) && jobKey.getName().equals(checkTriggerKey.getName()))) { return quartzEndpoint; } } http://git-wip-us.apache.org/repos/asf/camel/blob/ff59faef/components/camel-quartz2/src/test/java/org/apache/camel/component/quartz2/SpringQuartzConsumerTwoAppsClusteredRecoveryTest.java ---------------------------------------------------------------------- diff --git a/components/camel-quartz2/src/test/java/org/apache/camel/component/quartz2/SpringQuartzConsumerTwoAppsClusteredRecoveryTest.java b/components/camel-quartz2/src/test/java/org/apache/camel/component/quartz2/SpringQuartzConsumerTwoAppsClusteredRecoveryTest.java index 4fe1ab9..9758097 100644 --- a/components/camel-quartz2/src/test/java/org/apache/camel/component/quartz2/SpringQuartzConsumerTwoAppsClusteredRecoveryTest.java +++ b/components/camel-quartz2/src/test/java/org/apache/camel/component/quartz2/SpringQuartzConsumerTwoAppsClusteredRecoveryTest.java @@ -19,10 +19,14 @@ package org.apache.camel.component.quartz2; import org.apache.camel.CamelContext; import org.apache.camel.Exchange; import 
org.apache.camel.Predicate; +import org.apache.camel.Processor; import org.apache.camel.component.mock.MockEndpoint; import org.apache.camel.test.junit4.TestSupport; import org.apache.camel.util.IOHelper; import org.junit.Test; +import org.springframework.beans.BeansException; +import org.springframework.context.ApplicationContext; +import org.springframework.context.ApplicationContextAware; import org.springframework.context.support.AbstractXmlApplicationContext; import org.springframework.context.support.ClassPathXmlApplicationContext; @@ -42,19 +46,20 @@ public class SpringQuartzConsumerTwoAppsClusteredRecoveryTest extends TestSuppor // now launch the first clustered app which will acquire the quartz database lock and become the master AbstractXmlApplicationContext app = new ClassPathXmlApplicationContext("org/apache/camel/component/quartz2/SpringQuartzConsumerRecoveryClusteredAppOne.xml"); app.start(); - - // as well as the second one which will run in slave mode as it will not be able to acquire the same lock - AbstractXmlApplicationContext app2 = new ClassPathXmlApplicationContext("org/apache/camel/component/quartz2/SpringQuartzConsumerRecoveryClusteredAppTwo.xml"); - app2.start(); - + // now let's simulate a crash of the first app (the quartz instance 'app-one') log.warn("The first app is going to crash NOW!"); - IOHelper.close(app); log.warn("Crashed..."); log.warn("Crashed..."); log.warn("Crashed..."); - + + Thread.sleep(2000); + + // as well as the second one which will run in slave mode as it will not be able to acquire the same lock + AbstractXmlApplicationContext app2 = new ClassPathXmlApplicationContext("org/apache/camel/component/quartz2/SpringQuartzConsumerRecoveryClusteredAppTwo.xml"); + app2.start(); + // wait long enough until the second app takes it over... 
Thread.sleep(20000); // inside the logs one can then clearly see how the route of the second app ('app-two') starts consuming: @@ -91,5 +96,21 @@ public class SpringQuartzConsumerTwoAppsClusteredRecoveryTest extends TestSuppor } } + + public static class MyProcessor implements Processor, ApplicationContextAware { + ApplicationContext applicationContext; + + @Override + public void process(Exchange exchange) throws Exception { + // shutdown the application context; + ((AbstractXmlApplicationContext)applicationContext).close(); + } + + @Override + public void setApplicationContext(ApplicationContext applicationContext) throws BeansException { + this.applicationContext = applicationContext; + } + + } } http://git-wip-us.apache.org/repos/asf/camel/blob/ff59faef/components/camel-quartz2/src/test/resources/org/apache/camel/component/quartz2/SpringQuartzConsumerRecoveryClusteredAppOne.xml ---------------------------------------------------------------------- diff --git a/components/camel-quartz2/src/test/resources/org/apache/camel/component/quartz2/SpringQuartzConsumerRecoveryClusteredAppOne.xml b/components/camel-quartz2/src/test/resources/org/apache/camel/component/quartz2/SpringQuartzConsumerRecoveryClusteredAppOne.xml index a01b1c0..91c07b1 100644 --- a/components/camel-quartz2/src/test/resources/org/apache/camel/component/quartz2/SpringQuartzConsumerRecoveryClusteredAppOne.xml +++ b/components/camel-quartz2/src/test/resources/org/apache/camel/component/quartz2/SpringQuartzConsumerRecoveryClusteredAppOne.xml @@ -58,6 +58,8 @@ </props> </property> </bean> + + <bean id="myProcessor" class="org.apache.camel.component.quartz2.SpringQuartzConsumerTwoAppsClusteredRecoveryTest$MyProcessor" /> <camelContext id="camelContext" shutdownEager="false" xmlns="http://camel.apache.org/schema/spring"> <template id="template" /> @@ -67,9 +69,7 @@ <simple>clustering PINGS!</simple> </transform> <to uri="log:triggered" /> - <!--delay> - <constant>10000</constant> - </delay--> + <process 
ref="myProcessor"/> <to uri="mock:result" /> </route> </camelContext>