Updated Branches: refs/heads/master f099af733 -> 68b357d8b
CAMEL-6635: Darn, the polling consumer logic wasn't too well equipped for this kind of use case. Had to revert to the old way to allow picking up files that arrive late, after the poll has started. Project: http://git-wip-us.apache.org/repos/asf/camel/repo Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/68b357d8 Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/68b357d8 Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/68b357d8 Branch: refs/heads/master Commit: 68b357d8b792676c44b7b4c88ec72f73c6501e65 Parents: f099af7 Author: Claus Ibsen <davscl...@apache.org> Authored: Wed Aug 14 19:26:15 2013 +0200 Committer: Claus Ibsen <davscl...@apache.org> Committed: Wed Aug 14 19:26:15 2013 +0200 ---------------------------------------------------------------------- .../camel/impl/EventDrivenPollingConsumer.java | 4 +- .../camel/impl/ScheduledPollConsumer.java | 28 +++-- .../SingleScheduledPollConsumerScheduler.java | 104 ------------------- .../file/FileConsumePollEnrichFileTest.java | 11 +- 4 files changed, 22 insertions(+), 125 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/camel/blob/68b357d8/camel-core/src/main/java/org/apache/camel/impl/EventDrivenPollingConsumer.java ---------------------------------------------------------------------- diff --git a/camel-core/src/main/java/org/apache/camel/impl/EventDrivenPollingConsumer.java b/camel-core/src/main/java/org/apache/camel/impl/EventDrivenPollingConsumer.java index 296c8c4..fdfed6e 100644 --- a/camel-core/src/main/java/org/apache/camel/impl/EventDrivenPollingConsumer.java +++ b/camel-core/src/main/java/org/apache/camel/impl/EventDrivenPollingConsumer.java @@ -143,9 +143,9 @@ public class EventDrivenPollingConsumer extends PollingConsumerSupport implement if (consumer instanceof PollingConsumerPollingStrategy) { PollingConsumerPollingStrategy strategy = (PollingConsumerPollingStrategy) consumer; strategy.onInit(); + } else { + 
ServiceHelper.startService(consumer); } - - ServiceHelper.startService(consumer); } protected void doStop() throws Exception { http://git-wip-us.apache.org/repos/asf/camel/blob/68b357d8/camel-core/src/main/java/org/apache/camel/impl/ScheduledPollConsumer.java ---------------------------------------------------------------------- diff --git a/camel-core/src/main/java/org/apache/camel/impl/ScheduledPollConsumer.java b/camel-core/src/main/java/org/apache/camel/impl/ScheduledPollConsumer.java index ce015da..17bc72a 100644 --- a/camel-core/src/main/java/org/apache/camel/impl/ScheduledPollConsumer.java +++ b/camel-core/src/main/java/org/apache/camel/impl/ScheduledPollConsumer.java @@ -398,11 +398,7 @@ public abstract class ScheduledPollConsumer extends DefaultConsumer implements R } scheduler.setCamelContext(getEndpoint().getCamelContext()); scheduler.onInit(this); - - if (!(scheduler instanceof SingleScheduledPollConsumerScheduler)) { - // schedule task if its not the single scheduled - scheduler.scheduleTask(this); - } + scheduler.scheduleTask(this); // configure scheduler with options from this consumer Map<String, Object> properties = new HashMap<String, Object>(); @@ -458,29 +454,29 @@ public abstract class ScheduledPollConsumer extends DefaultConsumer implements R @Override public void onInit() throws Exception { - // use a single scheduler so we do not have it running it periodically when we use - // this consumer as a EventDrivenPollingConsumer - scheduler = new SingleScheduledPollConsumerScheduler(); + // make sure the scheduler is starter + startScheduler = true; } @Override public long beforePoll(long timeout) throws Exception { - if (LOG.isTraceEnabled()) { - LOG.trace("Before poll {}", getEndpoint()); + LOG.trace("Before poll {}", getEndpoint()); + // resume or start our self + if (!ServiceHelper.resumeService(this)) { + ServiceHelper.startService(this); } - scheduler.scheduleTask(this); - // ensure at least timeout is as long as one poll delay normally is - 
// to give the poll a chance to run once + // ensure at least timeout is as long as one poll delay return Math.max(timeout, getDelay()); } @Override public void afterPoll() throws Exception { - if (LOG.isTraceEnabled()) { - LOG.trace("After poll {}", getEndpoint()); + LOG.trace("After poll {}", getEndpoint()); + // suspend or stop our self + if (!ServiceHelper.suspendService(this)) { + ServiceHelper.stopService(this); } - scheduler.unscheduleTask(); } } http://git-wip-us.apache.org/repos/asf/camel/blob/68b357d8/camel-core/src/main/java/org/apache/camel/impl/SingleScheduledPollConsumerScheduler.java ---------------------------------------------------------------------- diff --git a/camel-core/src/main/java/org/apache/camel/impl/SingleScheduledPollConsumerScheduler.java b/camel-core/src/main/java/org/apache/camel/impl/SingleScheduledPollConsumerScheduler.java deleted file mode 100644 index 347e1d8..0000000 --- a/camel-core/src/main/java/org/apache/camel/impl/SingleScheduledPollConsumerScheduler.java +++ /dev/null @@ -1,104 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package org.apache.camel.impl; - -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Future; - -import org.apache.camel.CamelContext; -import org.apache.camel.Consumer; -import org.apache.camel.spi.ScheduledPollConsumerScheduler; - -/** - * A {@link ScheduledPollConsumerScheduler} which is <b>not</b> scheduled but uses a regular single-threaded {@link ExecutorService} - * to execute the task when {@link #scheduleTask(Runnable)} is invoked. - * <p/> - * This is used when the {@link org.apache.camel.PollingConsumer} EIP is implemented using the {@link EventDrivenPollingConsumer} - * bridging a {@link ScheduledPollConsumer} implementation. In this case we use this single threaded regular thread pool - * to execute the poll task on-demand, instead of using the usual scheduled thread pool which does not fit well with a - * on-demand poll attempt. - */ -public class SingleScheduledPollConsumerScheduler extends org.apache.camel.support.ServiceSupport implements ScheduledPollConsumerScheduler { - - private Consumer consumer; - private CamelContext camelContext; - private ExecutorService executorService; - private Future future; - - @Override - public void onInit(Consumer consumer) { - this.consumer = consumer; - } - - @Override - public void scheduleTask(Runnable task) { - if (isSchedulerStarted()) { - future = executorService.submit(task); - } - } - - @Override - public void unscheduleTask() { - if (future != null) { - future.cancel(false); - future = null; - } - } - - @Override - public void startScheduler() { - // noop - } - - @Override - public boolean isSchedulerStarted() { - return executorService != null && !executorService.isShutdown(); - } - - @Override - public void setCamelContext(CamelContext camelContext) { - this.camelContext = camelContext; - } - - @Override - public CamelContext getCamelContext() { - return camelContext; - } - - @Override - protected void doStart() throws Exception { - if (executorService == null) { - 
executorService = camelContext.getExecutorServiceManager().newSingleThreadExecutor(this, consumer.getEndpoint().getEndpointKey()); - } - } - - @Override - protected void doStop() throws Exception { - if (future != null) { - future.cancel(false); - future = null; - } - } - - @Override - protected void doShutdown() throws Exception { - if (executorService != null) { - camelContext.getExecutorServiceManager().shutdownNow(executorService); - executorService = null; - } - } -} http://git-wip-us.apache.org/repos/asf/camel/blob/68b357d8/camel-core/src/test/java/org/apache/camel/component/file/FileConsumePollEnrichFileTest.java ---------------------------------------------------------------------- diff --git a/camel-core/src/test/java/org/apache/camel/component/file/FileConsumePollEnrichFileTest.java b/camel-core/src/test/java/org/apache/camel/component/file/FileConsumePollEnrichFileTest.java index 20011af..726785a 100644 --- a/camel-core/src/test/java/org/apache/camel/component/file/FileConsumePollEnrichFileTest.java +++ b/camel-core/src/test/java/org/apache/camel/component/file/FileConsumePollEnrichFileTest.java @@ -36,10 +36,15 @@ public class FileConsumePollEnrichFileTest extends ContextTestSupport { MockEndpoint mock = getMockEndpoint("mock:result"); mock.expectedBodiesReceived("Big file"); - template.sendBodyAndHeader("file://target/enrichdata", "Big file", Exchange.FILE_NAME, "AAA.dat"); template.sendBodyAndHeader("file://target/enrich", "Start", Exchange.FILE_NAME, "AAA.fin"); - assertMockEndpointsSatisfied(); + log.info("Sleeping for 2 sec before writing enrichdata file"); + Thread.sleep(2000); + template.sendBodyAndHeader("file://target/enrichdata", "Big file", Exchange.FILE_NAME, "AAA.dat"); + log.info("... 
write done"); + + mock.assertIsSatisfied(); + // because the on completion is executed async, we should wait a bit to not fail on slow CI servers Thread.sleep(200); assertFileExists("target/enrich/.done/AAA.fin"); @@ -53,7 +58,7 @@ public class FileConsumePollEnrichFileTest extends ContextTestSupport { public void configure() throws Exception { from("file://target/enrich?move=.done") .to("mock:start") - .pollEnrich("file://target/enrichdata?move=.done", 5000) + .pollEnrich("file://target/enrichdata?move=.done", 20000) .to("mock:result"); } };