Updated Branches:
  refs/heads/master 99365506f -> 12fd86170

CAMEL-6635: PollingConsumer from a scheduled consumer such as file/ftp can use 
a regular thread pool instead of being scheduled


Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/36f48fb0
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/36f48fb0
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/36f48fb0

Branch: refs/heads/master
Commit: 36f48fb0942630cb1179b2eb0bb474affb5b3742
Parents: 9936550
Author: Claus Ibsen <davscl...@apache.org>
Authored: Wed Aug 14 11:43:10 2013 +0200
Committer: Claus Ibsen <davscl...@apache.org>
Committed: Wed Aug 14 11:58:10 2013 +0200

----------------------------------------------------------------------
 .../DefaultScheduledPollConsumerScheduler.java  |   7 ++
 .../camel/impl/EventDrivenPollingConsumer.java  |   5 +-
 .../camel/impl/ScheduledPollConsumer.java       |  29 +++---
 .../camel/impl/ScheduledPollEndpoint.java       |   1 +
 .../SingleScheduledPollConsumerScheduler.java   | 103 +++++++++++++++++++
 .../spi/ScheduledPollConsumerScheduler.java     |   7 ++
 .../file/FileConsumerCustomSchedulerTest.java   |   5 +
 .../QuartzScheduledPollConsumerScheduler.java   |  12 +++
 8 files changed, 152 insertions(+), 17 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/camel/blob/36f48fb0/camel-core/src/main/java/org/apache/camel/impl/DefaultScheduledPollConsumerScheduler.java
----------------------------------------------------------------------
diff --git 
a/camel-core/src/main/java/org/apache/camel/impl/DefaultScheduledPollConsumerScheduler.java
 
b/camel-core/src/main/java/org/apache/camel/impl/DefaultScheduledPollConsumerScheduler.java
index 28d1cc0..8c0af76 100644
--- 
a/camel-core/src/main/java/org/apache/camel/impl/DefaultScheduledPollConsumerScheduler.java
+++ 
b/camel-core/src/main/java/org/apache/camel/impl/DefaultScheduledPollConsumerScheduler.java
@@ -98,6 +98,13 @@ public class DefaultScheduledPollConsumerScheduler extends 
org.apache.camel.supp
     }
 
     @Override
+    public void unscheduleTask() {
+        if (future != null) {
+            future.cancel(false);
+        }
+    }
+
+    @Override
     public void startScheduler() {
         // only schedule task if we have not already done that
         if (future == null) {

http://git-wip-us.apache.org/repos/asf/camel/blob/36f48fb0/camel-core/src/main/java/org/apache/camel/impl/EventDrivenPollingConsumer.java
----------------------------------------------------------------------
diff --git 
a/camel-core/src/main/java/org/apache/camel/impl/EventDrivenPollingConsumer.java
 
b/camel-core/src/main/java/org/apache/camel/impl/EventDrivenPollingConsumer.java
index b257f28..296c8c4 100644
--- 
a/camel-core/src/main/java/org/apache/camel/impl/EventDrivenPollingConsumer.java
+++ 
b/camel-core/src/main/java/org/apache/camel/impl/EventDrivenPollingConsumer.java
@@ -143,10 +143,9 @@ public class EventDrivenPollingConsumer extends 
PollingConsumerSupport implement
         if (consumer instanceof PollingConsumerPollingStrategy) {
             PollingConsumerPollingStrategy strategy = 
(PollingConsumerPollingStrategy) consumer;
             strategy.onInit();
-        } else {
-            // for regular consumers start it
-            ServiceHelper.startService(consumer);
         }
+
+        ServiceHelper.startService(consumer);
     }
 
     protected void doStop() throws Exception {

http://git-wip-us.apache.org/repos/asf/camel/blob/36f48fb0/camel-core/src/main/java/org/apache/camel/impl/ScheduledPollConsumer.java
----------------------------------------------------------------------
diff --git 
a/camel-core/src/main/java/org/apache/camel/impl/ScheduledPollConsumer.java 
b/camel-core/src/main/java/org/apache/camel/impl/ScheduledPollConsumer.java
index 2bb28b3..1670354 100644
--- a/camel-core/src/main/java/org/apache/camel/impl/ScheduledPollConsumer.java
+++ b/camel-core/src/main/java/org/apache/camel/impl/ScheduledPollConsumer.java
@@ -27,7 +27,6 @@ import org.apache.camel.FailedToCreateConsumerException;
 import org.apache.camel.LoggingLevel;
 import org.apache.camel.PollingConsumerPollingStrategy;
 import org.apache.camel.Processor;
-import org.apache.camel.ResolveEndpointFailedException;
 import org.apache.camel.SuspendableService;
 import org.apache.camel.spi.PollingConsumerPollStrategy;
 import org.apache.camel.spi.ScheduledPollConsumerScheduler;
@@ -398,7 +397,11 @@ public abstract class ScheduledPollConsumer extends 
DefaultConsumer implements R
             scheduler = new DefaultScheduledPollConsumerScheduler();
         }
         scheduler.setCamelContext(getEndpoint().getCamelContext());
-        scheduler.scheduleTask(this, this);
+
+        if (!(scheduler instanceof SingleScheduledPollConsumerScheduler)) {
+            // schedule task if its not the single scheduled
+            scheduler.scheduleTask(this, this);
+        }
 
         // configure scheduler with options from this consumer
         Map<String, Object> properties = new HashMap<String, Object>();
@@ -454,28 +457,26 @@ public abstract class ScheduledPollConsumer extends 
DefaultConsumer implements R
 
     @Override
     public void onInit() throws Exception {
-        // noop
+        // use a single scheduler so we do not have it running it periodically 
when we use
+        // this consumer as a EventDrivenPollingConsumer
+        scheduler = new SingleScheduledPollConsumerScheduler(this);
     }
 
     @Override
     public long beforePoll(long timeout) throws Exception {
-        LOG.trace("Before poll {}", getEndpoint());
-        // resume or start our self
-        if (!ServiceHelper.resumeService(this)) {
-            ServiceHelper.startService(this);
+        if (LOG.isTraceEnabled()) {
+            LOG.trace("Before poll {}", getEndpoint());
         }
-
-        // ensure at least timeout is as long as one poll delay
-        return Math.max(timeout, getDelay());
+        scheduler.scheduleTask(this, this);
+        return timeout;
     }
 
     @Override
     public void afterPoll() throws Exception {
-        LOG.trace("After poll {}", getEndpoint());
-        // suspend or stop our self
-        if (!ServiceHelper.suspendService(this)) {
-            ServiceHelper.stopService(this);
+        if (LOG.isTraceEnabled()) {
+            LOG.trace("After poll {}", getEndpoint());
         }
+        scheduler.unscheduleTask();
     }
 
 }

http://git-wip-us.apache.org/repos/asf/camel/blob/36f48fb0/camel-core/src/main/java/org/apache/camel/impl/ScheduledPollEndpoint.java
----------------------------------------------------------------------
diff --git 
a/camel-core/src/main/java/org/apache/camel/impl/ScheduledPollEndpoint.java 
b/camel-core/src/main/java/org/apache/camel/impl/ScheduledPollEndpoint.java
index 5ccbedb..191f9d1 100644
--- a/camel-core/src/main/java/org/apache/camel/impl/ScheduledPollEndpoint.java
+++ b/camel-core/src/main/java/org/apache/camel/impl/ScheduledPollEndpoint.java
@@ -21,6 +21,7 @@ import java.util.Map;
 
 import org.apache.camel.CamelContext;
 import org.apache.camel.Component;
+import org.apache.camel.PollingConsumer;
 import org.apache.camel.util.IntrospectionSupport;
 
 /**

http://git-wip-us.apache.org/repos/asf/camel/blob/36f48fb0/camel-core/src/main/java/org/apache/camel/impl/SingleScheduledPollConsumerScheduler.java
----------------------------------------------------------------------
diff --git 
a/camel-core/src/main/java/org/apache/camel/impl/SingleScheduledPollConsumerScheduler.java
 
b/camel-core/src/main/java/org/apache/camel/impl/SingleScheduledPollConsumerScheduler.java
new file mode 100644
index 0000000..3d9e22e
--- /dev/null
+++ 
b/camel-core/src/main/java/org/apache/camel/impl/SingleScheduledPollConsumerScheduler.java
@@ -0,0 +1,103 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.impl;
+
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Future;
+
+import org.apache.camel.CamelContext;
+import org.apache.camel.Consumer;
+import org.apache.camel.spi.ScheduledPollConsumerScheduler;
+
+/**
+ * A {@link ScheduledPollConsumerScheduler} which is <b>not</b> scheduled but 
uses a regular single-threaded {@link ExecutorService}
+ * to execute the task when {@link #scheduleTask(org.apache.camel.Consumer, 
Runnable)} is invoked.
+ * <p/>
+ * This is used when the {@link org.apache.camel.PollingConsumer} EIP is 
implemented using the {@link EventDrivenPollingConsumer}
+ * bridging a {@link ScheduledPollConsumer} implementation. In this case we 
use this single threaded regular thread pool
+ * to execute the poll task on-demand, instead of using the usual scheduled 
thread pool which does not fit well with a
+ * on-demand poll attempt.
+ */
+public class SingleScheduledPollConsumerScheduler extends 
org.apache.camel.support.ServiceSupport implements 
ScheduledPollConsumerScheduler {
+
+    private final Consumer consumer;
+    private CamelContext camelContext;
+    private ExecutorService executorService;
+    private Future future;
+
+    public SingleScheduledPollConsumerScheduler(Consumer consumer) {
+        this.consumer = consumer;
+    }
+
+    @Override
+    public void scheduleTask(Consumer consumer, Runnable task) {
+        if (isSchedulerStarted()) {
+            future = executorService.submit(task);
+        }
+    }
+
+    @Override
+    public void unscheduleTask() {
+        if (future != null) {
+            future.cancel(false);
+            future = null;
+        }
+    }
+
+    @Override
+    public void startScheduler() {
+        // noop
+    }
+
+    @Override
+    public boolean isSchedulerStarted() {
+        return executorService != null && !executorService.isShutdown();
+    }
+
+    @Override
+    public void setCamelContext(CamelContext camelContext) {
+        this.camelContext = camelContext;
+    }
+
+    @Override
+    public CamelContext getCamelContext() {
+        return camelContext;
+    }
+
+    @Override
+    protected void doStart() throws Exception {
+        if (executorService == null) {
+            executorService = 
camelContext.getExecutorServiceManager().newSingleThreadExecutor(this, 
consumer.getEndpoint().getEndpointKey());
+        }
+    }
+
+    @Override
+    protected void doStop() throws Exception {
+        if (future != null) {
+            future.cancel(false);
+            future = null;
+        }
+    }
+
+    @Override
+    protected void doShutdown() throws Exception {
+        if (executorService != null) {
+            
camelContext.getExecutorServiceManager().shutdownNow(executorService);
+            executorService = null;
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/camel/blob/36f48fb0/camel-core/src/main/java/org/apache/camel/spi/ScheduledPollConsumerScheduler.java
----------------------------------------------------------------------
diff --git 
a/camel-core/src/main/java/org/apache/camel/spi/ScheduledPollConsumerScheduler.java
 
b/camel-core/src/main/java/org/apache/camel/spi/ScheduledPollConsumerScheduler.java
index fdf99d3..db9b41f 100644
--- 
a/camel-core/src/main/java/org/apache/camel/spi/ScheduledPollConsumerScheduler.java
+++ 
b/camel-core/src/main/java/org/apache/camel/spi/ScheduledPollConsumerScheduler.java
@@ -40,6 +40,13 @@ public interface ScheduledPollConsumerScheduler extends 
ShutdownableService, Cam
     void scheduleTask(Consumer consumer, Runnable task);
 
     /**
+     * Attempts to unschedules the last task which was scheduled.
+     * <p/>
+     * An implementation may not implement this method.
+     */
+    void unscheduleTask();
+
+    /**
      * Starts the scheduler.
      * <p/>
      * If the scheduler is already started, then this is a noop method call.

http://git-wip-us.apache.org/repos/asf/camel/blob/36f48fb0/camel-core/src/test/java/org/apache/camel/component/file/FileConsumerCustomSchedulerTest.java
----------------------------------------------------------------------
diff --git 
a/camel-core/src/test/java/org/apache/camel/component/file/FileConsumerCustomSchedulerTest.java
 
b/camel-core/src/test/java/org/apache/camel/component/file/FileConsumerCustomSchedulerTest.java
index cfa6b03..ca22bf0 100644
--- 
a/camel-core/src/test/java/org/apache/camel/component/file/FileConsumerCustomSchedulerTest.java
+++ 
b/camel-core/src/test/java/org/apache/camel/component/file/FileConsumerCustomSchedulerTest.java
@@ -88,6 +88,11 @@ public class FileConsumerCustomSchedulerTest extends 
ContextTestSupport {
             };
         }
 
+        @Override
+        public void unscheduleTask() {
+            // noop
+        }
+
         public int getCounter() {
             return counter;
         }

http://git-wip-us.apache.org/repos/asf/camel/blob/36f48fb0/components/camel-quartz2/src/main/java/org/apache/camel/pollconsumer/quartz2/QuartzScheduledPollConsumerScheduler.java
----------------------------------------------------------------------
diff --git 
a/components/camel-quartz2/src/main/java/org/apache/camel/pollconsumer/quartz2/QuartzScheduledPollConsumerScheduler.java
 
b/components/camel-quartz2/src/main/java/org/apache/camel/pollconsumer/quartz2/QuartzScheduledPollConsumerScheduler.java
index e4e3c05..fec05c5 100644
--- 
a/components/camel-quartz2/src/main/java/org/apache/camel/pollconsumer/quartz2/QuartzScheduledPollConsumerScheduler.java
+++ 
b/components/camel-quartz2/src/main/java/org/apache/camel/pollconsumer/quartz2/QuartzScheduledPollConsumerScheduler.java
@@ -63,6 +63,18 @@ public class QuartzScheduledPollConsumerScheduler extends 
ServiceSupport impleme
     }
 
     @Override
+    public void unscheduleTask() {
+        if (trigger != null) {
+            LOG.debug("Unscheduling trigger: {}", trigger.getKey());
+            try {
+                quartzScheduler.unscheduleJob(trigger.getKey());
+            } catch (SchedulerException e) {
+                throw ObjectHelper.wrapRuntimeCamelException(e);
+            }
+        }
+    }
+
+    @Override
     public void startScheduler() {
         // the quartz component starts the scheduler
     }

Reply via email to