Updated Branches: refs/heads/master 64f5a86a1 -> d9eb98ca8
CAMEL-6631: Introduce ScheduledPollConsumerScheduler SPI to plugin different schedulers for poll consumer components such as file/ftp etc. Project: http://git-wip-us.apache.org/repos/asf/camel/repo Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/6afe8575 Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/6afe8575 Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/6afe8575 Branch: refs/heads/master Commit: 6afe8575d495351290bd4859f1552c404dd11dd0 Parents: 64f5a86 Author: Claus Ibsen <davscl...@apache.org> Authored: Tue Aug 13 12:30:49 2013 +0200 Committer: Claus Ibsen <davscl...@apache.org> Committed: Tue Aug 13 12:30:49 2013 +0200 ---------------------------------------------------------------------- .../DefaultScheduledPollConsumerScheduler.java | 158 +++++++++++++++++++ .../camel/impl/ScheduledPollConsumer.java | 72 ++++----- .../camel/impl/ScheduledPollEndpoint.java | 7 + .../spi/ScheduledPollConsumerScheduler.java | 56 +++++++ .../LimitedPollingConsumerPollStrategyTest.java | 21 ++- .../camel/impl/MockScheduledPollConsumer.java | 5 +- .../camel/impl/ScheduledPollConsumerTest.java | 9 +- 7 files changed, 277 insertions(+), 51 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/camel/blob/6afe8575/camel-core/src/main/java/org/apache/camel/impl/DefaultScheduledPollConsumerScheduler.java ---------------------------------------------------------------------- diff --git a/camel-core/src/main/java/org/apache/camel/impl/DefaultScheduledPollConsumerScheduler.java b/camel-core/src/main/java/org/apache/camel/impl/DefaultScheduledPollConsumerScheduler.java new file mode 100644 index 0000000..28d1cc0 --- /dev/null +++ b/camel-core/src/main/java/org/apache/camel/impl/DefaultScheduledPollConsumerScheduler.java @@ -0,0 +1,158 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. 
See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.impl; + +import java.util.Locale; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.ScheduledFuture; +import java.util.concurrent.TimeUnit; + +import org.apache.camel.CamelContext; +import org.apache.camel.Consumer; +import org.apache.camel.spi.ScheduledPollConsumerScheduler; +import org.apache.camel.util.ObjectHelper; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class DefaultScheduledPollConsumerScheduler extends org.apache.camel.support.ServiceSupport implements ScheduledPollConsumerScheduler { + + private static final transient Logger LOG = LoggerFactory.getLogger(DefaultScheduledPollConsumerScheduler.class); + private CamelContext camelContext; + private Consumer consumer; + private ScheduledExecutorService scheduledExecutorService; + private boolean shutdownExecutor; + private volatile ScheduledFuture<?> future; + private Runnable task; + + private long initialDelay = 1000; + private long delay = 500; + private TimeUnit timeUnit = TimeUnit.MILLISECONDS; + private boolean useFixedDelay = true; + + public CamelContext getCamelContext() { + return camelContext; + } + + public void setCamelContext(CamelContext camelContext) { + this.camelContext = camelContext; + } + + public long 
getInitialDelay() { + return initialDelay; + } + + public void setInitialDelay(long initialDelay) { + this.initialDelay = initialDelay; + } + + public long getDelay() { + return delay; + } + + public void setDelay(long delay) { + this.delay = delay; + } + + public TimeUnit getTimeUnit() { + return timeUnit; + } + + public void setTimeUnit(TimeUnit timeUnit) { + this.timeUnit = timeUnit; + } + + public boolean isUseFixedDelay() { + return useFixedDelay; + } + + public void setUseFixedDelay(boolean useFixedDelay) { + this.useFixedDelay = useFixedDelay; + } + + public ScheduledExecutorService getScheduledExecutorService() { + return scheduledExecutorService; + } + + public void setScheduledExecutorService(ScheduledExecutorService scheduledExecutorService) { + this.scheduledExecutorService = scheduledExecutorService; + } + + @Override + public void scheduleTask(Consumer consumer, Runnable task) { + this.consumer = consumer; + this.task = task; + } + + @Override + public void startScheduler() { + // only schedule task if we have not already done that + if (future == null) { + if (isUseFixedDelay()) { + if (LOG.isDebugEnabled()) { + LOG.debug("Scheduling poll (fixed delay) with initialDelay: {}, delay: {} ({}) for: {}", + new Object[]{getInitialDelay(), getDelay(), getTimeUnit().name().toLowerCase(Locale.ENGLISH), consumer.getEndpoint()}); + } + future = scheduledExecutorService.scheduleWithFixedDelay(task, getInitialDelay(), getDelay(), getTimeUnit()); + } else { + if (LOG.isDebugEnabled()) { + LOG.debug("Scheduling poll (fixed rate) with initialDelay: {}, delay: {} ({}) for: {}", + new Object[]{getInitialDelay(), getDelay(), getTimeUnit().name().toLowerCase(Locale.ENGLISH), consumer.getEndpoint()}); + } + future = scheduledExecutorService.scheduleAtFixedRate(task, getInitialDelay(), getDelay(), getTimeUnit()); + } + } + } + + @Override + public boolean isSchedulerStarted() { + return future != null; + } + + @Override + protected void doStart() throws Exception { + 
ObjectHelper.notNull(consumer, "Consumer", this); + ObjectHelper.notNull(camelContext, "CamelContext", this); + ObjectHelper.notNull(task, "Task", this); + + // if no existing executor provided, then create a new thread pool ourselves + if (scheduledExecutorService == null) { + // we only need one thread in the pool to schedule this task + this.scheduledExecutorService = getCamelContext().getExecutorServiceManager() + .newScheduledThreadPool(this, consumer.getEndpoint().getEndpointUri(), 1); + // and we should shutdown the thread pool when no longer needed + this.shutdownExecutor = true; + } + } + + @Override + protected void doStop() throws Exception { + if (future != null) { + LOG.debug("This consumer is stopping, so cancelling scheduled task: " + future); + future.cancel(false); + future = null; + } + } + + @Override + protected void doShutdown() throws Exception { + if (shutdownExecutor && scheduledExecutorService != null) { + getCamelContext().getExecutorServiceManager().shutdownNow(scheduledExecutorService); + scheduledExecutorService = null; + future = null; + } + } +} http://git-wip-us.apache.org/repos/asf/camel/blob/6afe8575/camel-core/src/main/java/org/apache/camel/impl/ScheduledPollConsumer.java ---------------------------------------------------------------------- diff --git a/camel-core/src/main/java/org/apache/camel/impl/ScheduledPollConsumer.java b/camel-core/src/main/java/org/apache/camel/impl/ScheduledPollConsumer.java index 739c692..599045a 100644 --- a/camel-core/src/main/java/org/apache/camel/impl/ScheduledPollConsumer.java +++ b/camel-core/src/main/java/org/apache/camel/impl/ScheduledPollConsumer.java @@ -16,9 +16,9 @@ */ package org.apache.camel.impl; -import java.util.Locale; +import java.util.HashMap; +import java.util.Map; import java.util.concurrent.ScheduledExecutorService; -import java.util.concurrent.ScheduledFuture; import java.util.concurrent.TimeUnit; import org.apache.camel.Endpoint; @@ -28,7 +28,9 @@ import 
org.apache.camel.PollingConsumerPollingStrategy; import org.apache.camel.Processor; import org.apache.camel.SuspendableService; import org.apache.camel.spi.PollingConsumerPollStrategy; +import org.apache.camel.spi.ScheduledPollConsumerScheduler; import org.apache.camel.spi.UriParam; +import org.apache.camel.util.IntrospectionSupport; import org.apache.camel.util.ObjectHelper; import org.apache.camel.util.ServiceHelper; import org.slf4j.Logger; @@ -42,9 +44,8 @@ import org.slf4j.LoggerFactory; public abstract class ScheduledPollConsumer extends DefaultConsumer implements Runnable, SuspendableService, PollingConsumerPollingStrategy { private static final transient Logger LOG = LoggerFactory.getLogger(ScheduledPollConsumer.class); + private ScheduledPollConsumerScheduler scheduler; private ScheduledExecutorService scheduledExecutorService; - private boolean shutdownExecutor; - private volatile ScheduledFuture<?> future; // if adding more options then align with ScheduledPollEndpoint#configureScheduledPollConsumerProperties @UriParam @@ -232,6 +233,18 @@ public abstract class ScheduledPollConsumer extends DefaultConsumer implements R return polling; } + public ScheduledPollConsumerScheduler getScheduler() { + return scheduler; + } + + /** + * Sets a custom scheduler to use for scheduling running this task (poll). + * @param scheduler + */ + public void setScheduler(ScheduledPollConsumerScheduler scheduler) { + this.scheduler = scheduler; + } + public long getInitialDelay() { return initialDelay; } @@ -335,7 +348,7 @@ public abstract class ScheduledPollConsumer extends DefaultConsumer implements R * @return <tt>true</tt> if started, <tt>false</tt> if not. 
*/ public boolean isSchedulerStarted() { - return future != null; + return scheduler.isSchedulerStarted(); } /** @@ -366,18 +379,22 @@ public abstract class ScheduledPollConsumer extends DefaultConsumer implements R protected void doStart() throws Exception { super.doStart(); - // if no existing executor provided, then create a new thread pool ourselves - if (scheduledExecutorService == null) { - // we only need one thread in the pool to schedule this task - this.scheduledExecutorService = getEndpoint().getCamelContext().getExecutorServiceManager() - .newScheduledThreadPool(this, getEndpoint().getEndpointUri(), 1); - // and we should shutdown the thread pool when no longer needed - this.shutdownExecutor = true; + if (scheduler == null) { + scheduler = new DefaultScheduledPollConsumerScheduler(); } + scheduler.setCamelContext(getEndpoint().getCamelContext()); + scheduler.scheduleTask(this, this); - ObjectHelper.notNull(scheduledExecutorService, "scheduledExecutorService", this); + // configure scheduler with options from this consumer + Map<String, Object> properties = new HashMap<String, Object>(); + IntrospectionSupport.getProperties(this, properties, null); + IntrospectionSupport.setProperties(getEndpoint().getCamelContext().getTypeConverter(), scheduler, properties); + + ObjectHelper.notNull(scheduler, "scheduler", this); ObjectHelper.notNull(pollStrategy, "pollStrategy", this); + ServiceHelper.startService(scheduler); + if (isStartScheduler()) { startScheduler(); } @@ -389,41 +406,18 @@ public abstract class ScheduledPollConsumer extends DefaultConsumer implements R * If the scheduler is already started, then this is a noop method call. 
*/ public void startScheduler() { - // only schedule task if we have not already done that - if (future == null) { - if (isUseFixedDelay()) { - if (LOG.isDebugEnabled()) { - LOG.debug("Scheduling poll (fixed delay) with initialDelay: {}, delay: {} ({}) for: {}", - new Object[]{getInitialDelay(), getDelay(), getTimeUnit().name().toLowerCase(Locale.ENGLISH), getEndpoint()}); - } - future = scheduledExecutorService.scheduleWithFixedDelay(this, getInitialDelay(), getDelay(), getTimeUnit()); - } else { - if (LOG.isDebugEnabled()) { - LOG.debug("Scheduling poll (fixed rate) with initialDelay: {}, delay: {} ({}) for: {}", - new Object[]{getInitialDelay(), getDelay(), getTimeUnit().name().toLowerCase(Locale.ENGLISH), getEndpoint()}); - } - future = scheduledExecutorService.scheduleAtFixedRate(this, getInitialDelay(), getDelay(), getTimeUnit()); - } - } + scheduler.startScheduler(); } @Override protected void doStop() throws Exception { - if (future != null) { - LOG.debug("This consumer is stopping, so cancelling scheduled task: " + future); - future.cancel(false); - future = null; - } + ServiceHelper.stopService(scheduler); super.doStop(); } @Override protected void doShutdown() throws Exception { - if (shutdownExecutor && scheduledExecutorService != null) { - getEndpoint().getCamelContext().getExecutorServiceManager().shutdownNow(scheduledExecutorService); - scheduledExecutorService = null; - future = null; - } + ServiceHelper.stopAndShutdownServices(scheduler); super.doShutdown(); } http://git-wip-us.apache.org/repos/asf/camel/blob/6afe8575/camel-core/src/main/java/org/apache/camel/impl/ScheduledPollEndpoint.java ---------------------------------------------------------------------- diff --git a/camel-core/src/main/java/org/apache/camel/impl/ScheduledPollEndpoint.java b/camel-core/src/main/java/org/apache/camel/impl/ScheduledPollEndpoint.java index b3a0ed6..4de9110 100644 --- a/camel-core/src/main/java/org/apache/camel/impl/ScheduledPollEndpoint.java +++ 
b/camel-core/src/main/java/org/apache/camel/impl/ScheduledPollEndpoint.java @@ -64,6 +64,7 @@ public abstract class ScheduledPollEndpoint extends DefaultEndpoint { Object sendEmptyMessageWhenIdle = options.remove("sendEmptyMessageWhenIdle"); Object greedy = options.remove("greedy"); Object scheduledExecutorService = options.remove("scheduledExecutorService"); + Object scheduler = options.remove("scheduler"); boolean setConsumerProperties = false; // the following is split into two if statements to satisfy the checkstyle max complexity constraint @@ -73,6 +74,9 @@ public abstract class ScheduledPollEndpoint extends DefaultEndpoint { if (runLoggingLevel != null || startScheduler != null || sendEmptyMessageWhenIdle != null || greedy != null || scheduledExecutorService != null) { setConsumerProperties = true; } + if (scheduler != null) { + setConsumerProperties = true; + } if (setConsumerProperties) { @@ -109,6 +113,9 @@ public abstract class ScheduledPollEndpoint extends DefaultEndpoint { if (scheduledExecutorService != null) { consumerProperties.put("scheduledExecutorService", scheduledExecutorService); } + if (scheduler != null) { + consumerProperties.put("scheduler", scheduler); + } } } http://git-wip-us.apache.org/repos/asf/camel/blob/6afe8575/camel-core/src/main/java/org/apache/camel/spi/ScheduledPollConsumerScheduler.java ---------------------------------------------------------------------- diff --git a/camel-core/src/main/java/org/apache/camel/spi/ScheduledPollConsumerScheduler.java b/camel-core/src/main/java/org/apache/camel/spi/ScheduledPollConsumerScheduler.java new file mode 100644 index 0000000..fdf99d3 --- /dev/null +++ b/camel-core/src/main/java/org/apache/camel/spi/ScheduledPollConsumerScheduler.java @@ -0,0 +1,56 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.spi; + +import org.apache.camel.CamelContextAware; +import org.apache.camel.Consumer; +import org.apache.camel.ShutdownableService; + +/** + * A pluggable scheduler for {@link org.apache.camel.impl.ScheduledPollConsumer} consumers. + * <p/> + * The default implementation {@link org.apache.camel.impl.DefaultScheduledPollConsumerScheduler} is + * using the {@link java.util.concurrent.ScheduledExecutorService} from the JDK to schedule and run the poll task. + * <p/> + * An alternative implementation is in the <tt>camel-quartz</tt> component, which allows using a CRON expression + * to define when the scheduler should run. + */ +public interface ScheduledPollConsumerScheduler extends ShutdownableService, CamelContextAware { + + /** + * Schedules the task to run. + * + * @param consumer the consumer. + * @param task the task to run. + */ + void scheduleTask(Consumer consumer, Runnable task); + + /** + * Starts the scheduler. + * <p/> + * If the scheduler is already started, then this is a noop method call. + */ + void startScheduler(); + + /** + * Whether the scheduler has been started. + * + * @return <tt>true</tt> if started, <tt>false</tt> otherwise. 
+ */ + boolean isSchedulerStarted(); + +} http://git-wip-us.apache.org/repos/asf/camel/blob/6afe8575/camel-core/src/test/java/org/apache/camel/impl/LimitedPollingConsumerPollStrategyTest.java ---------------------------------------------------------------------- diff --git a/camel-core/src/test/java/org/apache/camel/impl/LimitedPollingConsumerPollStrategyTest.java b/camel-core/src/test/java/org/apache/camel/impl/LimitedPollingConsumerPollStrategyTest.java index ad0362c..3568385 100644 --- a/camel-core/src/test/java/org/apache/camel/impl/LimitedPollingConsumerPollStrategyTest.java +++ b/camel-core/src/test/java/org/apache/camel/impl/LimitedPollingConsumerPollStrategyTest.java @@ -17,6 +17,7 @@ package org.apache.camel.impl; import org.apache.camel.ContextTestSupport; +import org.apache.camel.Endpoint; import org.apache.camel.util.ServiceHelper; public class LimitedPollingConsumerPollStrategyTest extends ContextTestSupport { @@ -29,7 +30,8 @@ public class LimitedPollingConsumerPollStrategyTest extends ContextTestSupport { strategy = new LimitedPollingConsumerPollStrategy(); strategy.setLimit(3); - MockScheduledPollConsumer consumer = new MockScheduledPollConsumer(expectedException); + final Endpoint endpoint = getMockEndpoint("mock:foo"); + MockScheduledPollConsumer consumer = new MockScheduledPollConsumer(endpoint, expectedException); consumer.setPollStrategy(strategy); consumer.start(); @@ -50,7 +52,8 @@ public class LimitedPollingConsumerPollStrategyTest extends ContextTestSupport { strategy = new LimitedPollingConsumerPollStrategy(); strategy.setLimit(2); - MockScheduledPollConsumer consumer = new MockScheduledPollConsumer(expectedException); + final Endpoint endpoint = getMockEndpoint("mock:foo"); + MockScheduledPollConsumer consumer = new MockScheduledPollConsumer(endpoint, expectedException); consumer.setPollStrategy(strategy); consumer.start(); @@ -69,7 +72,8 @@ public class LimitedPollingConsumerPollStrategyTest extends ContextTestSupport { strategy = new 
LimitedPollingConsumerPollStrategy(); strategy.setLimit(3); - MockScheduledPollConsumer consumer = new MockScheduledPollConsumer(expectedException); + final Endpoint endpoint = getMockEndpoint("mock:foo"); + MockScheduledPollConsumer consumer = new MockScheduledPollConsumer(endpoint, expectedException); consumer.setPollStrategy(strategy); consumer.start(); @@ -95,7 +99,8 @@ public class LimitedPollingConsumerPollStrategyTest extends ContextTestSupport { strategy = new LimitedPollingConsumerPollStrategy(); strategy.setLimit(3); - MockScheduledPollConsumer consumer = new MockScheduledPollConsumer(expectedException); + final Endpoint endpoint = getMockEndpoint("mock:foo"); + MockScheduledPollConsumer consumer = new MockScheduledPollConsumer(endpoint, expectedException); consumer.setPollStrategy(strategy); consumer.start(); @@ -131,10 +136,11 @@ public class LimitedPollingConsumerPollStrategyTest extends ContextTestSupport { strategy = new LimitedPollingConsumerPollStrategy(); strategy.setLimit(3); - MockScheduledPollConsumer consumer = new MockScheduledPollConsumer(expectedException); + final Endpoint endpoint = getMockEndpoint("mock:foo"); + MockScheduledPollConsumer consumer = new MockScheduledPollConsumer(endpoint, expectedException); consumer.setPollStrategy(strategy); - MockScheduledPollConsumer consumer2 = new MockScheduledPollConsumer(null); + MockScheduledPollConsumer consumer2 = new MockScheduledPollConsumer(endpoint, null); consumer2.setPollStrategy(strategy); consumer.start(); @@ -163,7 +169,8 @@ public class LimitedPollingConsumerPollStrategyTest extends ContextTestSupport { strategy = new LimitedPollingConsumerPollStrategy(); strategy.setLimit(3); - MockScheduledPollConsumer consumer = new MockScheduledPollConsumer(expectedException); + final Endpoint endpoint = getMockEndpoint("mock:foo"); + MockScheduledPollConsumer consumer = new MockScheduledPollConsumer(endpoint, expectedException); consumer.setPollStrategy(strategy); consumer.start(); 
http://git-wip-us.apache.org/repos/asf/camel/blob/6afe8575/camel-core/src/test/java/org/apache/camel/impl/MockScheduledPollConsumer.java ---------------------------------------------------------------------- diff --git a/camel-core/src/test/java/org/apache/camel/impl/MockScheduledPollConsumer.java b/camel-core/src/test/java/org/apache/camel/impl/MockScheduledPollConsumer.java index 6a80019..be48d09 100644 --- a/camel-core/src/test/java/org/apache/camel/impl/MockScheduledPollConsumer.java +++ b/camel-core/src/test/java/org/apache/camel/impl/MockScheduledPollConsumer.java @@ -18,6 +18,7 @@ package org.apache.camel.impl; import java.util.concurrent.ScheduledThreadPoolExecutor; +import org.apache.camel.Endpoint; import org.apache.camel.Processor; public class MockScheduledPollConsumer extends ScheduledPollConsumer { @@ -29,8 +30,8 @@ public class MockScheduledPollConsumer extends ScheduledPollConsumer { } // dummy constructor here - we just want to test the run() method, which calls poll() - public MockScheduledPollConsumer(Exception exceptionToThrowOnPoll) { - super(null, null, new ScheduledThreadPoolExecutor(1)); + public MockScheduledPollConsumer(Endpoint endpoint, Exception exceptionToThrowOnPoll) { + super(endpoint, null, new ScheduledThreadPoolExecutor(1)); this.exceptionToThrowOnPoll = exceptionToThrowOnPoll; } http://git-wip-us.apache.org/repos/asf/camel/blob/6afe8575/camel-core/src/test/java/org/apache/camel/impl/ScheduledPollConsumerTest.java ---------------------------------------------------------------------- diff --git a/camel-core/src/test/java/org/apache/camel/impl/ScheduledPollConsumerTest.java b/camel-core/src/test/java/org/apache/camel/impl/ScheduledPollConsumerTest.java index ba7f2e2..7574069 100644 --- a/camel-core/src/test/java/org/apache/camel/impl/ScheduledPollConsumerTest.java +++ b/camel-core/src/test/java/org/apache/camel/impl/ScheduledPollConsumerTest.java @@ -30,7 +30,8 @@ public class ScheduledPollConsumerTest extends ContextTestSupport { 
public void testExceptionOnPollAndCanStartAgain() throws Exception { final Exception expectedException = new Exception("Hello, I should be thrown on shutdown only!"); - MockScheduledPollConsumer consumer = new MockScheduledPollConsumer(expectedException); + final Endpoint endpoint = getMockEndpoint("mock:foo"); + MockScheduledPollConsumer consumer = new MockScheduledPollConsumer(endpoint, expectedException); consumer.setPollStrategy(new PollingConsumerPollStrategy() { public boolean begin(Consumer consumer, Endpoint endpoint) { @@ -72,7 +73,8 @@ public class ScheduledPollConsumerTest extends ContextTestSupport { event = ""; final Exception expectedException = new Exception("Hello, I should be thrown on shutdown only!"); - MockScheduledPollConsumer consumer = new MockScheduledPollConsumer(expectedException); + final Endpoint endpoint = getMockEndpoint("mock:foo"); + MockScheduledPollConsumer consumer = new MockScheduledPollConsumer(endpoint, expectedException); consumer.setPollStrategy(new PollingConsumerPollStrategy() { public boolean begin(Consumer consumer, Endpoint endpoint) { @@ -106,7 +108,8 @@ public class ScheduledPollConsumerTest extends ContextTestSupport { } public void testNoExceptionOnPoll() throws Exception { - MockScheduledPollConsumer consumer = new MockScheduledPollConsumer(null); + final Endpoint endpoint = getMockEndpoint("mock:foo"); + MockScheduledPollConsumer consumer = new MockScheduledPollConsumer(endpoint, null); consumer.start(); consumer.run(); consumer.stop();