Patch to support dynamic job execution based on message headers for the spring-batch component https://issues.apache.org/jira/browse/CAMEL-9733
Project: http://git-wip-us.apache.org/repos/asf/camel/repo Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/d1c6ddcf Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/d1c6ddcf Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/d1c6ddcf Branch: refs/heads/master Commit: d1c6ddcfa993344bf41028f7e95787257b90a246 Parents: 7275b33 Author: Joseluis Pedrosa <joseluis.pedr...@elephanttalk.com> Authored: Thu Apr 28 16:11:21 2016 +0200 Committer: Claus Ibsen <davscl...@apache.org> Committed: Sat May 28 08:48:19 2016 +0200 ---------------------------------------------------------------------- .../spring/batch/SpringBatchComponent.java | 1 + .../spring/batch/SpringBatchEndpoint.java | 6 +- .../spring/batch/SpringBatchProducer.java | 29 +++++++++- .../spring/batch/SpringBatchEndpointTest.java | 61 ++++++++++++++++++++ 4 files changed, 94 insertions(+), 3 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/camel/blob/d1c6ddcf/components/camel-spring-batch/src/main/java/org/apache/camel/component/spring/batch/SpringBatchComponent.java ---------------------------------------------------------------------- diff --git a/components/camel-spring-batch/src/main/java/org/apache/camel/component/spring/batch/SpringBatchComponent.java b/components/camel-spring-batch/src/main/java/org/apache/camel/component/spring/batch/SpringBatchComponent.java index 9f7d10c..bd1fbf9 100644 --- a/components/camel-spring-batch/src/main/java/org/apache/camel/component/spring/batch/SpringBatchComponent.java +++ b/components/camel-spring-batch/src/main/java/org/apache/camel/component/spring/batch/SpringBatchComponent.java @@ -25,6 +25,7 @@ import org.springframework.batch.core.launch.JobLauncher; public class SpringBatchComponent extends UriEndpointComponent { private static final String DEFAULT_JOB_LAUNCHER_REF_NAME = "jobLauncher"; + public static final String DYNAMIC_JOBNAME = "DYNAMIC_JOBNAME_HEADER"; private 
JobLauncher jobLauncher; private JobLauncher defaultResolvedJobLauncher; http://git-wip-us.apache.org/repos/asf/camel/blob/d1c6ddcf/components/camel-spring-batch/src/main/java/org/apache/camel/component/spring/batch/SpringBatchEndpoint.java ---------------------------------------------------------------------- diff --git a/components/camel-spring-batch/src/main/java/org/apache/camel/component/spring/batch/SpringBatchEndpoint.java b/components/camel-spring-batch/src/main/java/org/apache/camel/component/spring/batch/SpringBatchEndpoint.java index 569e089..72df88d 100644 --- a/components/camel-spring-batch/src/main/java/org/apache/camel/component/spring/batch/SpringBatchEndpoint.java +++ b/components/camel-spring-batch/src/main/java/org/apache/camel/component/spring/batch/SpringBatchEndpoint.java @@ -37,7 +37,8 @@ import org.springframework.batch.core.launch.JobLauncher; @UriEndpoint(scheme = "spring-batch", title = "Spring Batch", syntax = "spring-batch:jobName", producerOnly = true, label = "spring,batch,scheduling") public class SpringBatchEndpoint extends DefaultEndpoint { - @UriPath @Metadata(required = "true") + @UriPath() + @Metadata(required = "true") private String jobName; /** @@ -46,6 +47,7 @@ public class SpringBatchEndpoint extends DefaultEndpoint { */ @Deprecated private String jobLauncherRef; + @UriParam private JobLauncher jobLauncher; @@ -83,7 +85,7 @@ public class SpringBatchEndpoint extends DefaultEndpoint { if (jobLauncher == null) { jobLauncher = resolveJobLauncher(); } - if (job == null && jobName != null) { + if (job == null && jobName != null && jobName.compareTo("dynamic")!=0) { job = CamelContextHelper.mandatoryLookup(getCamelContext(), jobName, Job.class); } } http://git-wip-us.apache.org/repos/asf/camel/blob/d1c6ddcf/components/camel-spring-batch/src/main/java/org/apache/camel/component/spring/batch/SpringBatchProducer.java ---------------------------------------------------------------------- diff --git 
a/components/camel-spring-batch/src/main/java/org/apache/camel/component/spring/batch/SpringBatchProducer.java b/components/camel-spring-batch/src/main/java/org/apache/camel/component/spring/batch/SpringBatchProducer.java index 4fc5395..a192193 100644 --- a/components/camel-spring-batch/src/main/java/org/apache/camel/component/spring/batch/SpringBatchProducer.java +++ b/components/camel-spring-batch/src/main/java/org/apache/camel/component/spring/batch/SpringBatchProducer.java @@ -19,8 +19,10 @@ package org.apache.camel.component.spring.batch; import java.util.Date; import java.util.Map; +import org.apache.camel.CamelExchangeException; import org.apache.camel.Exchange; import org.apache.camel.impl.DefaultProducer; +import org.apache.camel.util.CamelContextHelper; import org.springframework.batch.core.Job; import org.springframework.batch.core.JobExecution; import org.springframework.batch.core.JobParameters; @@ -44,8 +46,33 @@ public class SpringBatchProducer extends DefaultProducer { @Override public void process(Exchange exchange) throws Exception { + JobParameters jobParameters = prepareJobParameters(exchange.getIn().getHeaders()); - JobExecution jobExecution = jobLauncher.run(job, jobParameters); + String messageJobName = jobParameters.getString(SpringBatchComponent.DYNAMIC_JOBNAME); + + Job job2run = this.job; + + if (messageJobName != null) + { + Job dynamicJob = CamelContextHelper.mandatoryLookup(getEndpoint().getCamelContext(), messageJobName, Job.class); + + job2run = dynamicJob; + + if (job2run == null) + { + exchange.setException(new CamelExchangeException("Found header " + SpringBatchComponent.DYNAMIC_JOBNAME + + " with value " +messageJobName + " but could not find a Job in camel context", exchange)); + return; + } + } + + if (job2run == null) { + exchange.setException( new CamelExchangeException("jobName was not specified in the endpoint construction " + + " and header "+ SpringBatchComponent.DYNAMIC_JOBNAME + " could not be found", exchange)); + 
return; + } + + JobExecution jobExecution = jobLauncher.run(job2run, jobParameters); exchange.getOut().getHeaders().putAll(exchange.getIn().getHeaders()); exchange.getOut().setBody(jobExecution); } http://git-wip-us.apache.org/repos/asf/camel/blob/d1c6ddcf/components/camel-spring-batch/src/test/java/org/apache/camel/component/spring/batch/SpringBatchEndpointTest.java ---------------------------------------------------------------------- diff --git a/components/camel-spring-batch/src/test/java/org/apache/camel/component/spring/batch/SpringBatchEndpointTest.java b/components/camel-spring-batch/src/test/java/org/apache/camel/component/spring/batch/SpringBatchEndpointTest.java index cb962ab..00e6c52 100644 --- a/components/camel-spring-batch/src/test/java/org/apache/camel/component/spring/batch/SpringBatchEndpointTest.java +++ b/components/camel-spring-batch/src/test/java/org/apache/camel/component/spring/batch/SpringBatchEndpointTest.java @@ -16,9 +16,13 @@ */ package org.apache.camel.component.spring.batch; +import java.time.Instant; import java.util.Date; +import java.util.HashMap; +import java.util.Map; import org.apache.camel.CamelContext; +import org.apache.camel.CamelExchangeException; import org.apache.camel.EndpointInject; import org.apache.camel.FailedToCreateRouteException; import org.apache.camel.builder.RouteBuilder; @@ -55,32 +59,89 @@ public class SpringBatchEndpointTest extends CamelTestSupport { @Mock Job job; + @Mock + Job dynamicMockjob; + // Camel fixtures @EndpointInject(uri = "mock:test") MockEndpoint mockEndpoint; + @EndpointInject(uri = "mock:error") + MockEndpoint errorEndpoint; + + @Override protected RouteBuilder createRouteBuilder() throws Exception { return new RouteBuilder() { @Override public void configure() throws Exception { from("direct:start").to("spring-batch:mockJob").to("mock:test"); + from("direct:dynamic"). + to("spring-batch:dynamic"). + errorHandler(deadLetterChannel("mock:error")). 
+ to("mock:test"); } }; } + + @Override public JndiRegistry createRegistry() throws Exception { JndiRegistry registry = super.createRegistry(); registry.bind("jobLauncher", jobLauncher); registry.bind("alternativeJobLauncher", alternativeJobLauncher); registry.bind("mockJob", job); + registry.bind("dynamicMockjob", dynamicMockjob); return registry; } // Tests + @Test + public void dynamicJobFailsIfHeaderNotPressent() throws Exception { + + mockEndpoint.expectedMessageCount(0); + errorEndpoint.expectedMessageCount(1); + + //dynamic job should fail as header is not present and the job is dynamic + sendBody("direct:dyanmic", "Start the job, please."); + mockEndpoint.assertIsSatisfied(); + mockEndpoint.assertIsSatisfied(); + } + + @Test + public void dynamicJobWorksIfHeaderWithInvalidJobName() throws Exception { + + mockEndpoint.expectedMessageCount(0); + errorEndpoint.expectedMessageCount(1); + + //dynamic job should fail as header is present but the job does not exists + header(SpringBatchComponent.DYNAMIC_JOBNAME).append("thisJobDoesNotExsistAtAll" +Date.from(Instant.now())); + sendBody("direct:dyanmic", "Start the job, please."); + + mockEndpoint.assertIsSatisfied(); + mockEndpoint.assertIsSatisfied(); + } + + @Test + public void dynamicJobWorksIfHeaderPressentWithvalidJob() throws Exception { + + mockEndpoint.expectedMessageCount(1); + errorEndpoint.expectedMessageCount(0); + Thread.sleep(5000); + //dynamic job work if header is present and the job exists + final Map<String, Object> headers = new HashMap<>(); + headers.put(SpringBatchComponent.DYNAMIC_JOBNAME, "dynamicMockjob"); + + sendBody("direct:dynamic", "Start the job, please.", headers); + + mockEndpoint.assertIsSatisfied(); + errorEndpoint.assertIsSatisfied(); + } + + @Test public void shouldInjectJobToEndpoint() throws IllegalAccessException { SpringBatchEndpoint batchEndpoint = getMandatoryEndpoint("spring-batch:mockJob", SpringBatchEndpoint.class);