Updated Branches:
  refs/heads/master 64f5a86a1 -> d9eb98ca8

CAMEL-6631: Introduce ScheduledPollConsumerScheduler SPI to plugin different 
schedulers for poll consumer components such as file/ftp etc.


Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/6afe8575
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/6afe8575
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/6afe8575

Branch: refs/heads/master
Commit: 6afe8575d495351290bd4859f1552c404dd11dd0
Parents: 64f5a86
Author: Claus Ibsen <davscl...@apache.org>
Authored: Tue Aug 13 12:30:49 2013 +0200
Committer: Claus Ibsen <davscl...@apache.org>
Committed: Tue Aug 13 12:30:49 2013 +0200

----------------------------------------------------------------------
 .../DefaultScheduledPollConsumerScheduler.java  | 158 +++++++++++++++++++
 .../camel/impl/ScheduledPollConsumer.java       |  72 ++++-----
 .../camel/impl/ScheduledPollEndpoint.java       |   7 +
 .../spi/ScheduledPollConsumerScheduler.java     |  56 +++++++
 .../LimitedPollingConsumerPollStrategyTest.java |  21 ++-
 .../camel/impl/MockScheduledPollConsumer.java   |   5 +-
 .../camel/impl/ScheduledPollConsumerTest.java   |   9 +-
 7 files changed, 277 insertions(+), 51 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/camel/blob/6afe8575/camel-core/src/main/java/org/apache/camel/impl/DefaultScheduledPollConsumerScheduler.java
----------------------------------------------------------------------
diff --git 
a/camel-core/src/main/java/org/apache/camel/impl/DefaultScheduledPollConsumerScheduler.java
 
b/camel-core/src/main/java/org/apache/camel/impl/DefaultScheduledPollConsumerScheduler.java
new file mode 100644
index 0000000..28d1cc0
--- /dev/null
+++ 
b/camel-core/src/main/java/org/apache/camel/impl/DefaultScheduledPollConsumerScheduler.java
@@ -0,0 +1,158 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.impl;
+
+import java.util.Locale;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.camel.CamelContext;
+import org.apache.camel.Consumer;
+import org.apache.camel.spi.ScheduledPollConsumerScheduler;
+import org.apache.camel.util.ObjectHelper;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class DefaultScheduledPollConsumerScheduler extends 
org.apache.camel.support.ServiceSupport implements 
ScheduledPollConsumerScheduler {
+
+    private static final transient Logger LOG = 
LoggerFactory.getLogger(DefaultScheduledPollConsumerScheduler.class);
+    private CamelContext camelContext;
+    private Consumer consumer;
+    private ScheduledExecutorService scheduledExecutorService;
+    private boolean shutdownExecutor;
+    private volatile ScheduledFuture<?> future;
+    private Runnable task;
+
+    private long initialDelay = 1000;
+    private long delay = 500;
+    private TimeUnit timeUnit = TimeUnit.MILLISECONDS;
+    private boolean useFixedDelay = true;
+
+    public CamelContext getCamelContext() {
+        return camelContext;
+    }
+
+    public void setCamelContext(CamelContext camelContext) {
+        this.camelContext = camelContext;
+    }
+
+    public long getInitialDelay() {
+        return initialDelay;
+    }
+
+    public void setInitialDelay(long initialDelay) {
+        this.initialDelay = initialDelay;
+    }
+
+    public long getDelay() {
+        return delay;
+    }
+
+    public void setDelay(long delay) {
+        this.delay = delay;
+    }
+
+    public TimeUnit getTimeUnit() {
+        return timeUnit;
+    }
+
+    public void setTimeUnit(TimeUnit timeUnit) {
+        this.timeUnit = timeUnit;
+    }
+
+    public boolean isUseFixedDelay() {
+        return useFixedDelay;
+    }
+
+    public void setUseFixedDelay(boolean useFixedDelay) {
+        this.useFixedDelay = useFixedDelay;
+    }
+
+    public ScheduledExecutorService getScheduledExecutorService() {
+        return scheduledExecutorService;
+    }
+
+    public void setScheduledExecutorService(ScheduledExecutorService 
scheduledExecutorService) {
+        this.scheduledExecutorService = scheduledExecutorService;
+    }
+
+    @Override
+    public void scheduleTask(Consumer consumer, Runnable task) {
+        this.consumer = consumer;
+        this.task = task;
+    }
+
+    @Override
+    public void startScheduler() {
+        // only schedule task if we have not already done that
+        if (future == null) {
+            if (isUseFixedDelay()) {
+                if (LOG.isDebugEnabled()) {
+                    LOG.debug("Scheduling poll (fixed delay) with 
initialDelay: {}, delay: {} ({}) for: {}",
+                            new Object[]{getInitialDelay(), getDelay(), 
getTimeUnit().name().toLowerCase(Locale.ENGLISH), consumer.getEndpoint()});
+                }
+                future = scheduledExecutorService.scheduleWithFixedDelay(task, 
getInitialDelay(), getDelay(), getTimeUnit());
+            } else {
+                if (LOG.isDebugEnabled()) {
+                    LOG.debug("Scheduling poll (fixed rate) with initialDelay: 
{}, delay: {} ({}) for: {}",
+                            new Object[]{getInitialDelay(), getDelay(), 
getTimeUnit().name().toLowerCase(Locale.ENGLISH), consumer.getEndpoint()});
+                }
+                future = scheduledExecutorService.scheduleAtFixedRate(task, 
getInitialDelay(), getDelay(), getTimeUnit());
+            }
+        }
+    }
+
+    @Override
+    public boolean isSchedulerStarted() {
+        return future != null;
+    }
+
+    @Override
+    protected void doStart() throws Exception {
+        ObjectHelper.notNull(consumer, "Consumer", this);
+        ObjectHelper.notNull(camelContext, "CamelContext", this);
+        ObjectHelper.notNull(task, "Task", this);
+
+        // if no existing executor provided, then create a new thread pool 
ourselves
+        if (scheduledExecutorService == null) {
+            // we only need one thread in the pool to schedule this task
+            this.scheduledExecutorService = 
getCamelContext().getExecutorServiceManager()
+                    .newScheduledThreadPool(this, 
consumer.getEndpoint().getEndpointUri(), 1);
+            // and we should shutdown the thread pool when no longer needed
+            this.shutdownExecutor = true;
+        }
+    }
+
+    @Override
+    protected void doStop() throws Exception {
+        if (future != null) {
+            LOG.debug("This consumer is stopping, so cancelling scheduled 
task: " + future);
+            future.cancel(false);
+            future = null;
+        }
+    }
+
+    @Override
+    protected void doShutdown() throws Exception {
+        if (shutdownExecutor && scheduledExecutorService != null) {
+            
getCamelContext().getExecutorServiceManager().shutdownNow(scheduledExecutorService);
+            scheduledExecutorService = null;
+            future = null;
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/camel/blob/6afe8575/camel-core/src/main/java/org/apache/camel/impl/ScheduledPollConsumer.java
----------------------------------------------------------------------
diff --git 
a/camel-core/src/main/java/org/apache/camel/impl/ScheduledPollConsumer.java 
b/camel-core/src/main/java/org/apache/camel/impl/ScheduledPollConsumer.java
index 739c692..599045a 100644
--- a/camel-core/src/main/java/org/apache/camel/impl/ScheduledPollConsumer.java
+++ b/camel-core/src/main/java/org/apache/camel/impl/ScheduledPollConsumer.java
@@ -16,9 +16,9 @@
  */
 package org.apache.camel.impl;
 
-import java.util.Locale;
+import java.util.HashMap;
+import java.util.Map;
 import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.ScheduledFuture;
 import java.util.concurrent.TimeUnit;
 
 import org.apache.camel.Endpoint;
@@ -28,7 +28,9 @@ import org.apache.camel.PollingConsumerPollingStrategy;
 import org.apache.camel.Processor;
 import org.apache.camel.SuspendableService;
 import org.apache.camel.spi.PollingConsumerPollStrategy;
+import org.apache.camel.spi.ScheduledPollConsumerScheduler;
 import org.apache.camel.spi.UriParam;
+import org.apache.camel.util.IntrospectionSupport;
 import org.apache.camel.util.ObjectHelper;
 import org.apache.camel.util.ServiceHelper;
 import org.slf4j.Logger;
@@ -42,9 +44,8 @@ import org.slf4j.LoggerFactory;
 public abstract class ScheduledPollConsumer extends DefaultConsumer implements 
Runnable, SuspendableService, PollingConsumerPollingStrategy {
     private static final transient Logger LOG = 
LoggerFactory.getLogger(ScheduledPollConsumer.class);
 
+    private ScheduledPollConsumerScheduler scheduler;
     private ScheduledExecutorService scheduledExecutorService;
-    private boolean shutdownExecutor;
-    private volatile ScheduledFuture<?> future;
 
     // if adding more options then align with 
ScheduledPollEndpoint#configureScheduledPollConsumerProperties
     @UriParam
@@ -232,6 +233,18 @@ public abstract class ScheduledPollConsumer extends 
DefaultConsumer implements R
         return polling;
     }
 
+    public ScheduledPollConsumerScheduler getScheduler() {
+        return scheduler;
+    }
+
+    /**
+     * Sets a cutom scheduler to use for scheduling running this task (poll).
+     * @param scheduler
+     */
+    public void setScheduler(ScheduledPollConsumerScheduler scheduler) {
+        this.scheduler = scheduler;
+    }
+
     public long getInitialDelay() {
         return initialDelay;
     }
@@ -335,7 +348,7 @@ public abstract class ScheduledPollConsumer extends 
DefaultConsumer implements R
      * @return <tt>true</tt> if started, <tt>false</tt> if not.
      */
     public boolean isSchedulerStarted() {
-        return future != null;
+        return scheduler.isSchedulerStarted();
     }
 
     /**
@@ -366,18 +379,22 @@ public abstract class ScheduledPollConsumer extends 
DefaultConsumer implements R
     protected void doStart() throws Exception {
         super.doStart();
 
-        // if no existing executor provided, then create a new thread pool 
ourselves
-        if (scheduledExecutorService == null) {
-            // we only need one thread in the pool to schedule this task
-            this.scheduledExecutorService = 
getEndpoint().getCamelContext().getExecutorServiceManager()
-                    .newScheduledThreadPool(this, 
getEndpoint().getEndpointUri(), 1);
-            // and we should shutdown the thread pool when no longer needed
-            this.shutdownExecutor = true;
+        if (scheduler == null) {
+            scheduler = new DefaultScheduledPollConsumerScheduler();
         }
+        scheduler.setCamelContext(getEndpoint().getCamelContext());
+        scheduler.scheduleTask(this, this);
 
-        ObjectHelper.notNull(scheduledExecutorService, 
"scheduledExecutorService", this);
+        // configure scheduler with options from this consumer
+        Map<String, Object> properties = new HashMap<String, Object>();
+        IntrospectionSupport.getProperties(this, properties, null);
+        
IntrospectionSupport.setProperties(getEndpoint().getCamelContext().getTypeConverter(),
 scheduler, properties);
+
+        ObjectHelper.notNull(scheduler, "scheduler", this);
         ObjectHelper.notNull(pollStrategy, "pollStrategy", this);
 
+        ServiceHelper.startService(scheduler);
+
         if (isStartScheduler()) {
             startScheduler();
         }
@@ -389,41 +406,18 @@ public abstract class ScheduledPollConsumer extends 
DefaultConsumer implements R
      * If the scheduler is already started, then this is a noop method call.
      */
     public void startScheduler() {
-        // only schedule task if we have not already done that
-        if (future == null) {
-            if (isUseFixedDelay()) {
-                if (LOG.isDebugEnabled()) {
-                    LOG.debug("Scheduling poll (fixed delay) with 
initialDelay: {}, delay: {} ({}) for: {}",
-                            new Object[]{getInitialDelay(), getDelay(), 
getTimeUnit().name().toLowerCase(Locale.ENGLISH), getEndpoint()});
-                }
-                future = scheduledExecutorService.scheduleWithFixedDelay(this, 
getInitialDelay(), getDelay(), getTimeUnit());
-            } else {
-                if (LOG.isDebugEnabled()) {
-                    LOG.debug("Scheduling poll (fixed rate) with initialDelay: 
{}, delay: {} ({}) for: {}",
-                            new Object[]{getInitialDelay(), getDelay(), 
getTimeUnit().name().toLowerCase(Locale.ENGLISH), getEndpoint()});
-                }
-                future = scheduledExecutorService.scheduleAtFixedRate(this, 
getInitialDelay(), getDelay(), getTimeUnit());
-            }
-        }
+        scheduler.startScheduler();
     }
 
     @Override
     protected void doStop() throws Exception {
-        if (future != null) {
-            LOG.debug("This consumer is stopping, so cancelling scheduled 
task: " + future);
-            future.cancel(false);
-            future = null;
-        }
+        ServiceHelper.stopService(scheduler);
         super.doStop();
     }
 
     @Override
     protected void doShutdown() throws Exception {
-        if (shutdownExecutor && scheduledExecutorService != null) {
-            
getEndpoint().getCamelContext().getExecutorServiceManager().shutdownNow(scheduledExecutorService);
-            scheduledExecutorService = null;
-            future = null;
-        }
+        ServiceHelper.stopAndShutdownServices(scheduler);
         super.doShutdown();
     }
 

http://git-wip-us.apache.org/repos/asf/camel/blob/6afe8575/camel-core/src/main/java/org/apache/camel/impl/ScheduledPollEndpoint.java
----------------------------------------------------------------------
diff --git 
a/camel-core/src/main/java/org/apache/camel/impl/ScheduledPollEndpoint.java 
b/camel-core/src/main/java/org/apache/camel/impl/ScheduledPollEndpoint.java
index b3a0ed6..4de9110 100644
--- a/camel-core/src/main/java/org/apache/camel/impl/ScheduledPollEndpoint.java
+++ b/camel-core/src/main/java/org/apache/camel/impl/ScheduledPollEndpoint.java
@@ -64,6 +64,7 @@ public abstract class ScheduledPollEndpoint extends 
DefaultEndpoint {
         Object sendEmptyMessageWhenIdle = 
options.remove("sendEmptyMessageWhenIdle");
         Object greedy = options.remove("greedy");
         Object scheduledExecutorService  = 
options.remove("scheduledExecutorService");
+        Object scheduler  = options.remove("scheduler");
         boolean setConsumerProperties = false;
         
         // the following is split into two if statements to satisfy the 
checkstyle max complexity constraint
@@ -73,6 +74,9 @@ public abstract class ScheduledPollEndpoint extends 
DefaultEndpoint {
         if (runLoggingLevel != null || startScheduler != null || 
sendEmptyMessageWhenIdle != null || greedy != null || scheduledExecutorService 
!= null) {
             setConsumerProperties = true;
         }
+        if (scheduler != null) {
+            setConsumerProperties = true;
+        }
         
         if (setConsumerProperties) {
         
@@ -109,6 +113,9 @@ public abstract class ScheduledPollEndpoint extends 
DefaultEndpoint {
             if (scheduledExecutorService != null) {
                 consumerProperties.put("scheduledExecutorService", 
scheduledExecutorService);
             }
+            if (scheduler != null) {
+                consumerProperties.put("scheduler", scheduler);
+            }
         }
     }
     

http://git-wip-us.apache.org/repos/asf/camel/blob/6afe8575/camel-core/src/main/java/org/apache/camel/spi/ScheduledPollConsumerScheduler.java
----------------------------------------------------------------------
diff --git 
a/camel-core/src/main/java/org/apache/camel/spi/ScheduledPollConsumerScheduler.java
 
b/camel-core/src/main/java/org/apache/camel/spi/ScheduledPollConsumerScheduler.java
new file mode 100644
index 0000000..fdf99d3
--- /dev/null
+++ 
b/camel-core/src/main/java/org/apache/camel/spi/ScheduledPollConsumerScheduler.java
@@ -0,0 +1,56 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.spi;
+
+import org.apache.camel.CamelContextAware;
+import org.apache.camel.Consumer;
+import org.apache.camel.ShutdownableService;
+
+/**
+ * A pluggable scheduler for {@link 
org.apache.camel.impl.ScheduledPollConsumer} consumers.
+ * <p/>
+ * The default implementation {@link 
org.apache.camel.impl.DefaultScheduledPollConsumerScheduler} is
+ * using the {@link java.util.concurrent.ScheduledExecutorService} from the 
JDK to schedule and run the poll task.
+ * <p/>
+ * An alternative implementation is in <tt>camel-quartz</tt> component that 
allows to use CRON expression
+ * to define when the scheduler should run.
+ */
+public interface ScheduledPollConsumerScheduler extends ShutdownableService, 
CamelContextAware {
+
+    /**
+     * Schedules the task to run.
+     *
+     * @param consumer the consumer.
+     * @param task the task to run.
+     */
+    void scheduleTask(Consumer consumer, Runnable task);
+
+    /**
+     * Starts the scheduler.
+     * <p/>
+     * If the scheduler is already started, then this is a noop method call.
+     */
+    void startScheduler();
+
+    /**
+     * Whether the scheduler has been started.
+     *
+     * @return <tt>true</tt> if started, <tt>false</tt> otherwise.
+     */
+    boolean isSchedulerStarted();
+
+}

http://git-wip-us.apache.org/repos/asf/camel/blob/6afe8575/camel-core/src/test/java/org/apache/camel/impl/LimitedPollingConsumerPollStrategyTest.java
----------------------------------------------------------------------
diff --git 
a/camel-core/src/test/java/org/apache/camel/impl/LimitedPollingConsumerPollStrategyTest.java
 
b/camel-core/src/test/java/org/apache/camel/impl/LimitedPollingConsumerPollStrategyTest.java
index ad0362c..3568385 100644
--- 
a/camel-core/src/test/java/org/apache/camel/impl/LimitedPollingConsumerPollStrategyTest.java
+++ 
b/camel-core/src/test/java/org/apache/camel/impl/LimitedPollingConsumerPollStrategyTest.java
@@ -17,6 +17,7 @@
 package org.apache.camel.impl;
 
 import org.apache.camel.ContextTestSupport;
+import org.apache.camel.Endpoint;
 import org.apache.camel.util.ServiceHelper;
 
 public class LimitedPollingConsumerPollStrategyTest extends ContextTestSupport 
{
@@ -29,7 +30,8 @@ public class LimitedPollingConsumerPollStrategyTest extends 
ContextTestSupport {
         strategy = new LimitedPollingConsumerPollStrategy();
         strategy.setLimit(3);
 
-        MockScheduledPollConsumer consumer = new 
MockScheduledPollConsumer(expectedException);
+        final Endpoint endpoint = getMockEndpoint("mock:foo");
+        MockScheduledPollConsumer consumer = new 
MockScheduledPollConsumer(endpoint, expectedException);
         consumer.setPollStrategy(strategy);
 
         consumer.start();
@@ -50,7 +52,8 @@ public class LimitedPollingConsumerPollStrategyTest extends 
ContextTestSupport {
         strategy = new LimitedPollingConsumerPollStrategy();
         strategy.setLimit(2);
 
-        MockScheduledPollConsumer consumer = new 
MockScheduledPollConsumer(expectedException);
+        final Endpoint endpoint = getMockEndpoint("mock:foo");
+        MockScheduledPollConsumer consumer = new 
MockScheduledPollConsumer(endpoint, expectedException);
         consumer.setPollStrategy(strategy);
 
         consumer.start();
@@ -69,7 +72,8 @@ public class LimitedPollingConsumerPollStrategyTest extends 
ContextTestSupport {
         strategy = new LimitedPollingConsumerPollStrategy();
         strategy.setLimit(3);
 
-        MockScheduledPollConsumer consumer = new 
MockScheduledPollConsumer(expectedException);
+        final Endpoint endpoint = getMockEndpoint("mock:foo");
+        MockScheduledPollConsumer consumer = new 
MockScheduledPollConsumer(endpoint, expectedException);
         consumer.setPollStrategy(strategy);
 
         consumer.start();
@@ -95,7 +99,8 @@ public class LimitedPollingConsumerPollStrategyTest extends 
ContextTestSupport {
         strategy = new LimitedPollingConsumerPollStrategy();
         strategy.setLimit(3);
 
-        MockScheduledPollConsumer consumer = new 
MockScheduledPollConsumer(expectedException);
+        final Endpoint endpoint = getMockEndpoint("mock:foo");
+        MockScheduledPollConsumer consumer = new 
MockScheduledPollConsumer(endpoint, expectedException);
         consumer.setPollStrategy(strategy);
 
         consumer.start();
@@ -131,10 +136,11 @@ public class LimitedPollingConsumerPollStrategyTest 
extends ContextTestSupport {
         strategy = new LimitedPollingConsumerPollStrategy();
         strategy.setLimit(3);
 
-        MockScheduledPollConsumer consumer = new 
MockScheduledPollConsumer(expectedException);
+        final Endpoint endpoint = getMockEndpoint("mock:foo");
+        MockScheduledPollConsumer consumer = new 
MockScheduledPollConsumer(endpoint, expectedException);
         consumer.setPollStrategy(strategy);
 
-        MockScheduledPollConsumer consumer2 = new 
MockScheduledPollConsumer(null);
+        MockScheduledPollConsumer consumer2 = new 
MockScheduledPollConsumer(endpoint, null);
         consumer2.setPollStrategy(strategy);
 
         consumer.start();
@@ -163,7 +169,8 @@ public class LimitedPollingConsumerPollStrategyTest extends 
ContextTestSupport {
         strategy = new LimitedPollingConsumerPollStrategy();
         strategy.setLimit(3);
 
-        MockScheduledPollConsumer consumer = new 
MockScheduledPollConsumer(expectedException);
+        final Endpoint endpoint = getMockEndpoint("mock:foo");
+        MockScheduledPollConsumer consumer = new 
MockScheduledPollConsumer(endpoint, expectedException);
         consumer.setPollStrategy(strategy);
 
         consumer.start();

http://git-wip-us.apache.org/repos/asf/camel/blob/6afe8575/camel-core/src/test/java/org/apache/camel/impl/MockScheduledPollConsumer.java
----------------------------------------------------------------------
diff --git 
a/camel-core/src/test/java/org/apache/camel/impl/MockScheduledPollConsumer.java 
b/camel-core/src/test/java/org/apache/camel/impl/MockScheduledPollConsumer.java
index 6a80019..be48d09 100644
--- 
a/camel-core/src/test/java/org/apache/camel/impl/MockScheduledPollConsumer.java
+++ 
b/camel-core/src/test/java/org/apache/camel/impl/MockScheduledPollConsumer.java
@@ -18,6 +18,7 @@ package org.apache.camel.impl;
 
 import java.util.concurrent.ScheduledThreadPoolExecutor;
 
+import org.apache.camel.Endpoint;
 import org.apache.camel.Processor;
 
 public class MockScheduledPollConsumer extends ScheduledPollConsumer {
@@ -29,8 +30,8 @@ public class MockScheduledPollConsumer extends 
ScheduledPollConsumer {
     }
 
     // dummy constructor here - we just want to test the run() method, which 
calls poll()   
-    public MockScheduledPollConsumer(Exception exceptionToThrowOnPoll) {
-        super(null, null, new ScheduledThreadPoolExecutor(1));
+    public MockScheduledPollConsumer(Endpoint endpoint, Exception 
exceptionToThrowOnPoll) {
+        super(endpoint, null, new ScheduledThreadPoolExecutor(1));
         this.exceptionToThrowOnPoll = exceptionToThrowOnPoll;
     }
     

http://git-wip-us.apache.org/repos/asf/camel/blob/6afe8575/camel-core/src/test/java/org/apache/camel/impl/ScheduledPollConsumerTest.java
----------------------------------------------------------------------
diff --git 
a/camel-core/src/test/java/org/apache/camel/impl/ScheduledPollConsumerTest.java 
b/camel-core/src/test/java/org/apache/camel/impl/ScheduledPollConsumerTest.java
index ba7f2e2..7574069 100644
--- 
a/camel-core/src/test/java/org/apache/camel/impl/ScheduledPollConsumerTest.java
+++ 
b/camel-core/src/test/java/org/apache/camel/impl/ScheduledPollConsumerTest.java
@@ -30,7 +30,8 @@ public class ScheduledPollConsumerTest extends 
ContextTestSupport {
     public void testExceptionOnPollAndCanStartAgain() throws Exception {
 
         final Exception expectedException = new Exception("Hello, I should be 
thrown on shutdown only!");
-        MockScheduledPollConsumer consumer = new 
MockScheduledPollConsumer(expectedException);
+        final Endpoint endpoint = getMockEndpoint("mock:foo");
+        MockScheduledPollConsumer consumer = new 
MockScheduledPollConsumer(endpoint, expectedException);
 
         consumer.setPollStrategy(new PollingConsumerPollStrategy() {
             public boolean begin(Consumer consumer, Endpoint endpoint) {
@@ -72,7 +73,8 @@ public class ScheduledPollConsumerTest extends 
ContextTestSupport {
         event = "";
 
         final Exception expectedException = new Exception("Hello, I should be 
thrown on shutdown only!");
-        MockScheduledPollConsumer consumer = new 
MockScheduledPollConsumer(expectedException);
+        final Endpoint endpoint = getMockEndpoint("mock:foo");
+        MockScheduledPollConsumer consumer = new 
MockScheduledPollConsumer(endpoint, expectedException);
 
         consumer.setPollStrategy(new PollingConsumerPollStrategy() {
             public boolean begin(Consumer consumer, Endpoint endpoint) {
@@ -106,7 +108,8 @@ public class ScheduledPollConsumerTest extends 
ContextTestSupport {
     }
 
     public void testNoExceptionOnPoll() throws Exception {
-        MockScheduledPollConsumer consumer = new 
MockScheduledPollConsumer(null);
+        final Endpoint endpoint = getMockEndpoint("mock:foo");
+        MockScheduledPollConsumer consumer = new 
MockScheduledPollConsumer(endpoint, null);
         consumer.start();
         consumer.run(); 
         consumer.stop();

Reply via email to