CAMEL-6631: Introduce ScheduledPollConsumerScheduler SPI to plugin different schedulers for poll consumer components such as file/ftp etc.
Project: http://git-wip-us.apache.org/repos/asf/camel/repo Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/38657b4b Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/38657b4b Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/38657b4b Branch: refs/heads/master Commit: 38657b4bff920b86082877e5f8894b4aafb61b97 Parents: de9ab76 Author: Claus Ibsen <davscl...@apache.org> Authored: Tue Aug 13 12:57:18 2013 +0200 Committer: Claus Ibsen <davscl...@apache.org> Committed: Tue Aug 13 12:57:18 2013 +0200 ---------------------------------------------------------------------- .../camel/FailedToCreateConsumerException.java | 5 + .../camel/impl/ScheduledPollConsumer.java | 30 +++- .../camel/impl/ScheduledPollEndpoint.java | 7 +- .../file/FileConsumerCustomSchedulerTest.java | 137 +++++++++++++++++++ 4 files changed, 176 insertions(+), 3 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/camel/blob/38657b4b/camel-core/src/main/java/org/apache/camel/FailedToCreateConsumerException.java ---------------------------------------------------------------------- diff --git a/camel-core/src/main/java/org/apache/camel/FailedToCreateConsumerException.java b/camel-core/src/main/java/org/apache/camel/FailedToCreateConsumerException.java index 821e7e2..b89ec82 100644 --- a/camel-core/src/main/java/org/apache/camel/FailedToCreateConsumerException.java +++ b/camel-core/src/main/java/org/apache/camel/FailedToCreateConsumerException.java @@ -41,6 +41,11 @@ public class FailedToCreateConsumerException extends RuntimeCamelException { this.uri = endpoint.getEndpointUri(); } + public FailedToCreateConsumerException(Endpoint endpoint, String message) { + super("Failed to create Consumer for endpoint: " + endpoint + ". Reason: " + message); + this.uri = endpoint.getEndpointUri(); + } + public String getUri() { return uri; } http://git-wip-us.apache.org/repos/asf/camel/blob/38657b4b/camel-core/src/main/java/org/apache/camel/impl/ScheduledPollConsumer.java ---------------------------------------------------------------------- diff --git a/camel-core/src/main/java/org/apache/camel/impl/ScheduledPollConsumer.java b/camel-core/src/main/java/org/apache/camel/impl/ScheduledPollConsumer.java index 599045a..2bb28b3 100644 --- a/camel-core/src/main/java/org/apache/camel/impl/ScheduledPollConsumer.java +++ b/camel-core/src/main/java/org/apache/camel/impl/ScheduledPollConsumer.java @@ -23,9 +23,11 @@ import java.util.concurrent.TimeUnit; import org.apache.camel.Endpoint; import org.apache.camel.Exchange; +import org.apache.camel.FailedToCreateConsumerException; import org.apache.camel.LoggingLevel; import org.apache.camel.PollingConsumerPollingStrategy; import org.apache.camel.Processor; +import org.apache.camel.ResolveEndpointFailedException; import org.apache.camel.SuspendableService; import org.apache.camel.spi.PollingConsumerPollStrategy; import org.apache.camel.spi.ScheduledPollConsumerScheduler; @@ -67,6 +69,7 @@ public abstract class ScheduledPollConsumer extends DefaultConsumer implements R @UriParam private boolean greedy; private volatile boolean polling; + private Map<String, Object> schedulerProperties; public ScheduledPollConsumer(Endpoint endpoint, Processor processor) { super(endpoint, processor); @@ -238,13 +241,25 @@ public abstract class ScheduledPollConsumer extends DefaultConsumer implements R } /** - * Sets a cutom scheduler to use for scheduling running this task (poll). - * @param scheduler + * Sets a custom scheduler to use for scheduling running this task (poll). + * + * @param scheduler the custom scheduler */ public void setScheduler(ScheduledPollConsumerScheduler scheduler) { this.scheduler = scheduler; } + public Map<String, Object> getSchedulerProperties() { + return schedulerProperties; + } + + /** + * Additional properties to configure on the custom scheduler. + */ + public void setSchedulerProperties(Map<String, Object> schedulerProperties) { + this.schedulerProperties = schedulerProperties; + } + public long getInitialDelay() { return initialDelay; } @@ -389,6 +404,17 @@ public abstract class ScheduledPollConsumer extends DefaultConsumer implements R Map<String, Object> properties = new HashMap<String, Object>(); IntrospectionSupport.getProperties(this, properties, null); IntrospectionSupport.setProperties(getEndpoint().getCamelContext().getTypeConverter(), scheduler, properties); + if (schedulerProperties != null && !schedulerProperties.isEmpty()) { + // need to use a copy in case the consumer is restarted so we keep the properties + Map<String, Object> copy = new HashMap<String, Object>(schedulerProperties); + IntrospectionSupport.setProperties(getEndpoint().getCamelContext().getTypeConverter(), scheduler, copy); + if (copy.size() > 0) { + throw new FailedToCreateConsumerException(getEndpoint(), "There are " + copy.size() + + " scheduler parameters that couldn't be set on the endpoint." + + " Check the uri if the parameters are spelt correctly and that they are properties of the endpoint." + + " Unknown parameters=[" + copy + "]"); + } + } ObjectHelper.notNull(scheduler, "scheduler", this); ObjectHelper.notNull(pollStrategy, "pollStrategy", this); http://git-wip-us.apache.org/repos/asf/camel/blob/38657b4b/camel-core/src/main/java/org/apache/camel/impl/ScheduledPollEndpoint.java ---------------------------------------------------------------------- diff --git a/camel-core/src/main/java/org/apache/camel/impl/ScheduledPollEndpoint.java b/camel-core/src/main/java/org/apache/camel/impl/ScheduledPollEndpoint.java index 4de9110..1d7e62f 100644 --- a/camel-core/src/main/java/org/apache/camel/impl/ScheduledPollEndpoint.java +++ b/camel-core/src/main/java/org/apache/camel/impl/ScheduledPollEndpoint.java @@ -21,6 +21,7 @@ import java.util.Map; import org.apache.camel.CamelContext; import org.apache.camel.Component; +import org.apache.camel.util.IntrospectionSupport; /** * A base class for {@link org.apache.camel.Endpoint} which creates a {@link ScheduledPollConsumer} @@ -54,6 +55,7 @@ public abstract class ScheduledPollEndpoint extends DefaultEndpoint { private void configureScheduledPollConsumerProperties(Map<String, Object> options, Map<String, Object> consumerProperties) { // special for scheduled poll consumers as we want to allow end users to configure its options // from the URI parameters without the consumer. prefix + Map<String, Object> schedulerProperties = IntrospectionSupport.extractProperties(options, "scheduler."); Object startScheduler = options.remove("startScheduler"); Object initialDelay = options.remove("initialDelay"); Object delay = options.remove("delay"); @@ -74,7 +76,7 @@ public abstract class ScheduledPollEndpoint extends DefaultEndpoint { if (runLoggingLevel != null || startScheduler != null || sendEmptyMessageWhenIdle != null || greedy != null || scheduledExecutorService != null) { setConsumerProperties = true; } - if (scheduler != null) { + if (scheduler != null || !schedulerProperties.isEmpty()) { setConsumerProperties = true; } @@ -116,6 +118,9 @@ public abstract class ScheduledPollEndpoint extends DefaultEndpoint { if (scheduler != null) { consumerProperties.put("scheduler", scheduler); } + if (!schedulerProperties.isEmpty()) { + consumerProperties.put("schedulerProperties", schedulerProperties); + } } } http://git-wip-us.apache.org/repos/asf/camel/blob/38657b4b/camel-core/src/test/java/org/apache/camel/component/file/FileConsumerCustomSchedulerTest.java ---------------------------------------------------------------------- diff --git a/camel-core/src/test/java/org/apache/camel/component/file/FileConsumerCustomSchedulerTest.java b/camel-core/src/test/java/org/apache/camel/component/file/FileConsumerCustomSchedulerTest.java new file mode 100644 index 0000000..dc99af6 --- /dev/null +++ b/camel-core/src/test/java/org/apache/camel/component/file/FileConsumerCustomSchedulerTest.java @@ -0,0 +1,137 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.component.file; + +import java.util.Timer; +import java.util.TimerTask; + +import org.apache.camel.CamelContext; +import org.apache.camel.Consumer; +import org.apache.camel.ContextTestSupport; +import org.apache.camel.Exchange; +import org.apache.camel.builder.RouteBuilder; +import org.apache.camel.impl.JndiRegistry; +import org.apache.camel.spi.ScheduledPollConsumerScheduler; + +public class FileConsumerCustomSchedulerTest extends ContextTestSupport { + + private MyScheduler scheduler = new MyScheduler(); + + @Override + protected JndiRegistry createRegistry() throws Exception { + JndiRegistry jndi = super.createRegistry(); + jndi.bind("myScheduler", scheduler); + return jndi; + } + + @Override + protected void setUp() throws Exception { + deleteDirectory("target/file/customer"); + super.setUp(); + } + + public void testCustomScheduler() throws Exception { + getMockEndpoint("mock:result").expectedMessageCount(1); + + template.sendBodyAndHeader("file:target/file/custom", "Hello World", Exchange.FILE_NAME, "hello.txt"); + + context.startRoute("foo"); + + assertMockEndpointsSatisfied(); + + // the scheduler is only run once, and we can configure its properties + assertEquals(1, scheduler.getCounter()); + assertEquals("bar", scheduler.getFoo()); + } + + @Override + protected RouteBuilder createRouteBuilder() throws Exception { + return new RouteBuilder() { + @Override + public void configure() throws Exception { + from("file:target/file/custom?scheduler=#myScheduler&scheduler.foo=bar").routeId("foo").noAutoStartup() + .to("mock:result"); + } + }; + } + + private static final class MyScheduler implements ScheduledPollConsumerScheduler { + + private CamelContext camelContext; + private Timer timer; + private TimerTask timerTask; + private volatile int counter; + private String foo; + + @Override + public void scheduleTask(final Consumer consumer, final Runnable task) { + this.timerTask = new TimerTask() { + @Override + public void run() { + counter++; + task.run(); + } + }; + } + + public int getCounter() { + return counter; + } + + public String getFoo() { + return foo; + } + + public void setFoo(String foo) { + this.foo = foo; + } + + @Override + public void startScheduler() { + timer = new Timer(); + timer.schedule(timerTask, 100); + } + + @Override + public boolean isSchedulerStarted() { + return true; + } + + @Override + public void setCamelContext(CamelContext camelContext) { + this.camelContext = camelContext; + } + + @Override + public CamelContext getCamelContext() { + return camelContext; + } + + @Override + public void shutdown() throws Exception { + timerTask.cancel(); + } + + @Override + public void start() throws Exception { + } + + @Override + public void stop() throws Exception { + } + } +}