Updated Branches:
  refs/heads/master f099af733 -> 68b357d8b

CAMEL-6635: Darn, the polling consumer logic wasn't too well equipped for this
kind of thing. Had to revert to the old way to allow picking up late files
that arrive after the poll has started.


Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/68b357d8
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/68b357d8
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/68b357d8

Branch: refs/heads/master
Commit: 68b357d8b792676c44b7b4c88ec72f73c6501e65
Parents: f099af7
Author: Claus Ibsen <davscl...@apache.org>
Authored: Wed Aug 14 19:26:15 2013 +0200
Committer: Claus Ibsen <davscl...@apache.org>
Committed: Wed Aug 14 19:26:15 2013 +0200

----------------------------------------------------------------------
 .../camel/impl/EventDrivenPollingConsumer.java  |   4 +-
 .../camel/impl/ScheduledPollConsumer.java       |  28 +++--
 .../SingleScheduledPollConsumerScheduler.java   | 104 -------------------
 .../file/FileConsumePollEnrichFileTest.java     |  11 +-
 4 files changed, 22 insertions(+), 125 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/camel/blob/68b357d8/camel-core/src/main/java/org/apache/camel/impl/EventDrivenPollingConsumer.java
----------------------------------------------------------------------
diff --git 
a/camel-core/src/main/java/org/apache/camel/impl/EventDrivenPollingConsumer.java
 
b/camel-core/src/main/java/org/apache/camel/impl/EventDrivenPollingConsumer.java
index 296c8c4..fdfed6e 100644
--- 
a/camel-core/src/main/java/org/apache/camel/impl/EventDrivenPollingConsumer.java
+++ 
b/camel-core/src/main/java/org/apache/camel/impl/EventDrivenPollingConsumer.java
@@ -143,9 +143,9 @@ public class EventDrivenPollingConsumer extends 
PollingConsumerSupport implement
         if (consumer instanceof PollingConsumerPollingStrategy) {
             PollingConsumerPollingStrategy strategy = 
(PollingConsumerPollingStrategy) consumer;
             strategy.onInit();
+        } else {
+            ServiceHelper.startService(consumer);
         }
-
-        ServiceHelper.startService(consumer);
     }
 
     protected void doStop() throws Exception {

http://git-wip-us.apache.org/repos/asf/camel/blob/68b357d8/camel-core/src/main/java/org/apache/camel/impl/ScheduledPollConsumer.java
----------------------------------------------------------------------
diff --git 
a/camel-core/src/main/java/org/apache/camel/impl/ScheduledPollConsumer.java 
b/camel-core/src/main/java/org/apache/camel/impl/ScheduledPollConsumer.java
index ce015da..17bc72a 100644
--- a/camel-core/src/main/java/org/apache/camel/impl/ScheduledPollConsumer.java
+++ b/camel-core/src/main/java/org/apache/camel/impl/ScheduledPollConsumer.java
@@ -398,11 +398,7 @@ public abstract class ScheduledPollConsumer extends 
DefaultConsumer implements R
         }
         scheduler.setCamelContext(getEndpoint().getCamelContext());
         scheduler.onInit(this);
-
-        if (!(scheduler instanceof SingleScheduledPollConsumerScheduler)) {
-            // schedule task if its not the single scheduled
-            scheduler.scheduleTask(this);
-        }
+        scheduler.scheduleTask(this);
 
         // configure scheduler with options from this consumer
         Map<String, Object> properties = new HashMap<String, Object>();
@@ -458,29 +454,29 @@ public abstract class ScheduledPollConsumer extends 
DefaultConsumer implements R
 
     @Override
     public void onInit() throws Exception {
-        // use a single scheduler so we do not have it running it periodically 
when we use
-        // this consumer as a EventDrivenPollingConsumer
-        scheduler = new SingleScheduledPollConsumerScheduler();
+        // make sure the scheduler is starter
+        startScheduler = true;
     }
 
     @Override
     public long beforePoll(long timeout) throws Exception {
-        if (LOG.isTraceEnabled()) {
-            LOG.trace("Before poll {}", getEndpoint());
+        LOG.trace("Before poll {}", getEndpoint());
+        // resume or start our self
+        if (!ServiceHelper.resumeService(this)) {
+            ServiceHelper.startService(this);
         }
-        scheduler.scheduleTask(this);
 
-        // ensure at least timeout is as long as one poll delay normally is
-        // to give the poll a chance to run once
+        // ensure at least timeout is as long as one poll delay
         return Math.max(timeout, getDelay());
     }
 
     @Override
     public void afterPoll() throws Exception {
-        if (LOG.isTraceEnabled()) {
-            LOG.trace("After poll {}", getEndpoint());
+        LOG.trace("After poll {}", getEndpoint());
+        // suspend or stop our self
+        if (!ServiceHelper.suspendService(this)) {
+            ServiceHelper.stopService(this);
         }
-        scheduler.unscheduleTask();
     }
 
 }

http://git-wip-us.apache.org/repos/asf/camel/blob/68b357d8/camel-core/src/main/java/org/apache/camel/impl/SingleScheduledPollConsumerScheduler.java
----------------------------------------------------------------------
diff --git 
a/camel-core/src/main/java/org/apache/camel/impl/SingleScheduledPollConsumerScheduler.java
 
b/camel-core/src/main/java/org/apache/camel/impl/SingleScheduledPollConsumerScheduler.java
deleted file mode 100644
index 347e1d8..0000000
--- 
a/camel-core/src/main/java/org/apache/camel/impl/SingleScheduledPollConsumerScheduler.java
+++ /dev/null
@@ -1,104 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.camel.impl;
-
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Future;
-
-import org.apache.camel.CamelContext;
-import org.apache.camel.Consumer;
-import org.apache.camel.spi.ScheduledPollConsumerScheduler;
-
-/**
- * A {@link ScheduledPollConsumerScheduler} which is <b>not</b> scheduled but 
uses a regular single-threaded {@link ExecutorService}
- * to execute the task when {@link #scheduleTask(Runnable)} is invoked.
- * <p/>
- * This is used when the {@link org.apache.camel.PollingConsumer} EIP is 
implemented using the {@link EventDrivenPollingConsumer}
- * bridging a {@link ScheduledPollConsumer} implementation. In this case we 
use this single threaded regular thread pool
- * to execute the poll task on-demand, instead of using the usual scheduled 
thread pool which does not fit well with a
- * on-demand poll attempt.
- */
-public class SingleScheduledPollConsumerScheduler extends 
org.apache.camel.support.ServiceSupport implements 
ScheduledPollConsumerScheduler {
-
-    private Consumer consumer;
-    private CamelContext camelContext;
-    private ExecutorService executorService;
-    private Future future;
-
-    @Override
-    public void onInit(Consumer consumer) {
-        this.consumer = consumer;
-    }
-
-    @Override
-    public void scheduleTask(Runnable task) {
-        if (isSchedulerStarted()) {
-            future = executorService.submit(task);
-        }
-    }
-
-    @Override
-    public void unscheduleTask() {
-        if (future != null) {
-            future.cancel(false);
-            future = null;
-        }
-    }
-
-    @Override
-    public void startScheduler() {
-        // noop
-    }
-
-    @Override
-    public boolean isSchedulerStarted() {
-        return executorService != null && !executorService.isShutdown();
-    }
-
-    @Override
-    public void setCamelContext(CamelContext camelContext) {
-        this.camelContext = camelContext;
-    }
-
-    @Override
-    public CamelContext getCamelContext() {
-        return camelContext;
-    }
-
-    @Override
-    protected void doStart() throws Exception {
-        if (executorService == null) {
-            executorService = 
camelContext.getExecutorServiceManager().newSingleThreadExecutor(this, 
consumer.getEndpoint().getEndpointKey());
-        }
-    }
-
-    @Override
-    protected void doStop() throws Exception {
-        if (future != null) {
-            future.cancel(false);
-            future = null;
-        }
-    }
-
-    @Override
-    protected void doShutdown() throws Exception {
-        if (executorService != null) {
-            
camelContext.getExecutorServiceManager().shutdownNow(executorService);
-            executorService = null;
-        }
-    }
-}

http://git-wip-us.apache.org/repos/asf/camel/blob/68b357d8/camel-core/src/test/java/org/apache/camel/component/file/FileConsumePollEnrichFileTest.java
----------------------------------------------------------------------
diff --git 
a/camel-core/src/test/java/org/apache/camel/component/file/FileConsumePollEnrichFileTest.java
 
b/camel-core/src/test/java/org/apache/camel/component/file/FileConsumePollEnrichFileTest.java
index 20011af..726785a 100644
--- 
a/camel-core/src/test/java/org/apache/camel/component/file/FileConsumePollEnrichFileTest.java
+++ 
b/camel-core/src/test/java/org/apache/camel/component/file/FileConsumePollEnrichFileTest.java
@@ -36,10 +36,15 @@ public class FileConsumePollEnrichFileTest extends 
ContextTestSupport {
         MockEndpoint mock = getMockEndpoint("mock:result");
         mock.expectedBodiesReceived("Big file");
 
-        template.sendBodyAndHeader("file://target/enrichdata", "Big file", 
Exchange.FILE_NAME, "AAA.dat");
         template.sendBodyAndHeader("file://target/enrich", "Start", 
Exchange.FILE_NAME, "AAA.fin");
 
-        assertMockEndpointsSatisfied();
+        log.info("Sleeping for 2 sec before writing enrichdata file");
+        Thread.sleep(2000);
+        template.sendBodyAndHeader("file://target/enrichdata", "Big file", 
Exchange.FILE_NAME, "AAA.dat");
+        log.info("... write done");
+
+        mock.assertIsSatisfied();
+
         // because the on completion is executed async, we should wait a bit 
to not fail on slow CI servers
         Thread.sleep(200);
         assertFileExists("target/enrich/.done/AAA.fin");
@@ -53,7 +58,7 @@ public class FileConsumePollEnrichFileTest extends 
ContextTestSupport {
             public void configure() throws Exception {
                 from("file://target/enrich?move=.done")
                     .to("mock:start")
-                    .pollEnrich("file://target/enrichdata?move=.done", 5000)
+                    .pollEnrich("file://target/enrichdata?move=.done", 20000)
                     .to("mock:result");
             }
         };

Reply via email to