Author: davsclaus Date: Sat Nov 5 12:46:13 2011 New Revision: 1197948 URL: http://svn.apache.org/viewvc?rev=1197948&view=rev Log: CAMEL-4605: Fixed an issue when stopping Batch Consumer routes that are configured with CompleteAllTasks: there was a slim chance the route would be shut down too early, before the last batch message had a chance to be enlisted as an in-flight exchange.
Added: camel/trunk/tests/camel-itest/src/test/java/org/apache/camel/itest/quartz/ camel/trunk/tests/camel-itest/src/test/java/org/apache/camel/itest/quartz/FtpCronScheduledRoutePolicyTest.java Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/component/file/GenericFileConsumer.java camel/trunk/camel-core/src/main/java/org/apache/camel/impl/ScheduledPollConsumer.java camel/trunk/components/camel-aws/src/main/java/org/apache/camel/component/aws/s3/S3Consumer.java camel/trunk/components/camel-aws/src/main/java/org/apache/camel/component/aws/sqs/SqsConsumer.java camel/trunk/components/camel-ibatis/src/main/java/org/apache/camel/component/ibatis/IBatisConsumer.java camel/trunk/components/camel-jclouds/src/main/java/org/apache/camel/component/jclouds/JcloudsBlobStoreConsumer.java camel/trunk/components/camel-jpa/src/main/java/org/apache/camel/component/jpa/JpaConsumer.java camel/trunk/components/camel-krati/src/main/java/org/apache/camel/component/krati/KratiConsumer.java camel/trunk/components/camel-mail/src/main/java/org/apache/camel/component/mail/MailConsumer.java camel/trunk/components/camel-mybatis/src/main/java/org/apache/camel/component/mybatis/MyBatisConsumer.java camel/trunk/tests/camel-itest/pom.xml Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/component/file/GenericFileConsumer.java URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/component/file/GenericFileConsumer.java?rev=1197948&r1=1197947&r2=1197948&view=diff ============================================================================== --- camel/trunk/camel-core/src/main/java/org/apache/camel/component/file/GenericFileConsumer.java (original) +++ camel/trunk/camel-core/src/main/java/org/apache/camel/component/file/GenericFileConsumer.java Sat Nov 5 12:46:13 2011 @@ -195,12 +195,24 @@ public abstract class GenericFileConsume } public int getPendingExchangesSize() { + int answer; // only return the real pending size in case we are 
configured to complete all tasks if (ShutdownRunningTask.CompleteAllTasks == shutdownRunningTask) { - return pendingExchanges; + answer = pendingExchanges; } else { - return 0; + answer = 0; } + + if (answer == 0 && isPolling()) { + // force at least one pending exchange if we are polling as there is a little gap + // in the processBatch method and until an exchange gets enlisted as in-flight + // which happens later, so we need to signal back to the shutdown strategy that + // there is a pending exchange. When we are no longer polling, then we will return 0 + log.trace("Currently polling so returning 1 as pending exchanges"); + answer = 1; + } + + return answer; } public void prepareShutdown() { Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/impl/ScheduledPollConsumer.java URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/impl/ScheduledPollConsumer.java?rev=1197948&r1=1197947&r2=1197948&view=diff ============================================================================== --- camel/trunk/camel-core/src/main/java/org/apache/camel/impl/ScheduledPollConsumer.java (original) +++ camel/trunk/camel-core/src/main/java/org/apache/camel/impl/ScheduledPollConsumer.java Sat Nov 5 12:46:13 2011 @@ -52,6 +52,7 @@ public abstract class ScheduledPollConsu private PollingConsumerPollStrategy pollStrategy = new DefaultPollingConsumerPollStrategy(); private LoggingLevel runLoggingLevel = LoggingLevel.TRACE; private boolean sendEmptyMessageWhenIdle; + private volatile boolean polling; public ScheduledPollConsumer(Endpoint endpoint, Processor processor) { super(endpoint, processor); @@ -130,19 +131,25 @@ public abstract class ScheduledPollConsu LOG.debug("Retrying attempt {} to poll: {}", retryCounter, this.getEndpoint()); } - boolean begin = pollStrategy.begin(this, getEndpoint()); - if (begin) { - retryCounter++; - int polledMessages = poll(); - - if (polledMessages == 0 && isSendEmptyMessageWhenIdle()) { - // send an "empty" 
exchange - processEmptyMessage(); + // mark we are polling which should also include the begin/poll/commit + polling = true; + try { + boolean begin = pollStrategy.begin(this, getEndpoint()); + if (begin) { + retryCounter++; + int polledMessages = poll(); + + if (polledMessages == 0 && isSendEmptyMessageWhenIdle()) { + // send an "empty" exchange + processEmptyMessage(); + } + + pollStrategy.commit(this, getEndpoint(), polledMessages); + } else { + LOG.debug("Cannot begin polling as pollStrategy returned false: {}", pollStrategy); } - - pollStrategy.commit(this, getEndpoint(), polledMessages); - } else { - LOG.debug("Cannot begin polling as pollStrategy returned false: {}", pollStrategy); + } finally { + polling = false; } } @@ -190,6 +197,13 @@ public abstract class ScheduledPollConsu return isRunAllowed() && !isSuspended(); } + /** + * Whether polling is currently in progress + */ + protected boolean isPolling() { + return polling; + } + public long getInitialDelay() { return initialDelay; } Modified: camel/trunk/components/camel-aws/src/main/java/org/apache/camel/component/aws/s3/S3Consumer.java URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-aws/src/main/java/org/apache/camel/component/aws/s3/S3Consumer.java?rev=1197948&r1=1197947&r2=1197948&view=diff ============================================================================== --- camel/trunk/components/camel-aws/src/main/java/org/apache/camel/component/aws/s3/S3Consumer.java (original) +++ camel/trunk/components/camel-aws/src/main/java/org/apache/camel/component/aws/s3/S3Consumer.java Sat Nov 5 12:46:13 2011 @@ -190,12 +190,24 @@ public class S3Consumer extends Schedule } public int getPendingExchangesSize() { + int answer; // only return the real pending size in case we are configured to complete all tasks if (ShutdownRunningTask.CompleteAllTasks == shutdownRunningTask) { - return pendingExchanges; + answer = pendingExchanges; } else { - return 0; + answer = 0; } + + if (answer == 0 && 
isPolling()) { + // force at least one pending exchange if we are polling as there is a little gap + // in the processBatch method and until an exchange gets enlisted as in-flight + // which happens later, so we need to signal back to the shutdown strategy that + // there is a pending exchange. When we are no longer polling, then we will return 0 + log.trace("Currently polling so returning 1 as pending exchanges"); + answer = 1; + } + + return answer; } public void prepareShutdown() { Modified: camel/trunk/components/camel-aws/src/main/java/org/apache/camel/component/aws/sqs/SqsConsumer.java URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-aws/src/main/java/org/apache/camel/component/aws/sqs/SqsConsumer.java?rev=1197948&r1=1197947&r2=1197948&view=diff ============================================================================== --- camel/trunk/components/camel-aws/src/main/java/org/apache/camel/component/aws/sqs/SqsConsumer.java (original) +++ camel/trunk/components/camel-aws/src/main/java/org/apache/camel/component/aws/sqs/SqsConsumer.java Sat Nov 5 12:46:13 2011 @@ -190,12 +190,24 @@ public class SqsConsumer extends Schedul } public int getPendingExchangesSize() { + int answer; // only return the real pending size in case we are configured to complete all tasks if (ShutdownRunningTask.CompleteAllTasks == shutdownRunningTask) { - return pendingExchanges; + answer = pendingExchanges; } else { - return 0; + answer = 0; } + + if (answer == 0 && isPolling()) { + // force at least one pending exchange if we are polling as there is a little gap + // in the processBatch method and until an exchange gets enlisted as in-flight + // which happens later, so we need to signal back to the shutdown strategy that + // there is a pending exchange. 
When we are no longer polling, then we will return 0 + log.trace("Currently polling so returning 1 as pending exchanges"); + answer = 1; + } + + return answer; } public void prepareShutdown() { Modified: camel/trunk/components/camel-ibatis/src/main/java/org/apache/camel/component/ibatis/IBatisConsumer.java URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-ibatis/src/main/java/org/apache/camel/component/ibatis/IBatisConsumer.java?rev=1197948&r1=1197947&r2=1197948&view=diff ============================================================================== --- camel/trunk/components/camel-ibatis/src/main/java/org/apache/camel/component/ibatis/IBatisConsumer.java (original) +++ camel/trunk/components/camel-ibatis/src/main/java/org/apache/camel/component/ibatis/IBatisConsumer.java Sat Nov 5 12:46:13 2011 @@ -169,12 +169,24 @@ public class IBatisConsumer extends Sche } public int getPendingExchangesSize() { + int answer; // only return the real pending size in case we are configured to complete all tasks if (ShutdownRunningTask.CompleteAllTasks == shutdownRunningTask) { - return pendingExchanges; + answer = pendingExchanges; } else { - return 0; + answer = 0; } + + if (answer == 0 && isPolling()) { + // force at least one pending exchange if we are polling as there is a little gap + // in the processBatch method and until an exchange gets enlisted as in-flight + // which happens later, so we need to signal back to the shutdown strategy that + // there is a pending exchange. 
When we are no longer polling, then we will return 0 + log.trace("Currently polling so returning 1 as pending exchanges"); + answer = 1; + } + + return answer; } public void prepareShutdown() { Modified: camel/trunk/components/camel-jclouds/src/main/java/org/apache/camel/component/jclouds/JcloudsBlobStoreConsumer.java URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-jclouds/src/main/java/org/apache/camel/component/jclouds/JcloudsBlobStoreConsumer.java?rev=1197948&r1=1197947&r2=1197948&view=diff ============================================================================== --- camel/trunk/components/camel-jclouds/src/main/java/org/apache/camel/component/jclouds/JcloudsBlobStoreConsumer.java (original) +++ camel/trunk/components/camel-jclouds/src/main/java/org/apache/camel/component/jclouds/JcloudsBlobStoreConsumer.java Sat Nov 5 12:46:13 2011 @@ -141,12 +141,24 @@ public class JcloudsBlobStoreConsumer ex } public int getPendingExchangesSize() { + int answer; // only return the real pending size in case we are configured to complete all tasks if (ShutdownRunningTask.CompleteAllTasks == shutdownRunningTask) { - return pendingExchanges; + answer = pendingExchanges; } else { - return 0; + answer = 0; } + + if (answer == 0 && isPolling()) { + // force at least one pending exchange if we are polling as there is a little gap + // in the processBatch method and until an exchange gets enlisted as in-flight + // which happens later, so we need to signal back to the shutdown strategy that + // there is a pending exchange. 
When we are no longer polling, then we will return 0 + log.trace("Currently polling so returning 1 as pending exchanges"); + answer = 1; + } + + return answer; } @Override Modified: camel/trunk/components/camel-jpa/src/main/java/org/apache/camel/component/jpa/JpaConsumer.java URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-jpa/src/main/java/org/apache/camel/component/jpa/JpaConsumer.java?rev=1197948&r1=1197947&r2=1197948&view=diff ============================================================================== --- camel/trunk/components/camel-jpa/src/main/java/org/apache/camel/component/jpa/JpaConsumer.java (original) +++ camel/trunk/components/camel-jpa/src/main/java/org/apache/camel/component/jpa/JpaConsumer.java Sat Nov 5 12:46:13 2011 @@ -158,12 +158,24 @@ public class JpaConsumer extends Schedul } public int getPendingExchangesSize() { + int answer; // only return the real pending size in case we are configured to complete all tasks if (ShutdownRunningTask.CompleteAllTasks == shutdownRunningTask) { - return pendingExchanges; + answer = pendingExchanges; } else { - return 0; + answer = 0; } + + if (answer == 0 && isPolling()) { + // force at least one pending exchange if we are polling as there is a little gap + // in the processBatch method and until an exchange gets enlisted as in-flight + // which happens later, so we need to signal back to the shutdown strategy that + // there is a pending exchange. 
When we are no longer polling, then we will return 0 + log.trace("Currently polling so returning 1 as pending exchanges"); + answer = 1; + } + + return answer; } public void prepareShutdown() { Modified: camel/trunk/components/camel-krati/src/main/java/org/apache/camel/component/krati/KratiConsumer.java URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-krati/src/main/java/org/apache/camel/component/krati/KratiConsumer.java?rev=1197948&r1=1197947&r2=1197948&view=diff ============================================================================== --- camel/trunk/components/camel-krati/src/main/java/org/apache/camel/component/krati/KratiConsumer.java (original) +++ camel/trunk/components/camel-krati/src/main/java/org/apache/camel/component/krati/KratiConsumer.java Sat Nov 5 12:46:13 2011 @@ -151,12 +151,24 @@ public class KratiConsumer extends Sched @Override public int getPendingExchangesSize() { + int answer; // only return the real pending size in case we are configured to complete all tasks if (ShutdownRunningTask.CompleteAllTasks == shutdownRunningTask) { - return pendingExchanges; + answer = pendingExchanges; } else { - return 0; + answer = 0; } + + if (answer == 0 && isPolling()) { + // force at least one pending exchange if we are polling as there is a little gap + // in the processBatch method and until an exchange gets enlisted as in-flight + // which happens later, so we need to signal back to the shutdown strategy that + // there is a pending exchange. 
When we are no longer polling, then we will return 0 + log.trace("Currently polling so returning 1 as pending exchanges"); + answer = 1; + } + + return answer; } @Override Modified: camel/trunk/components/camel-mail/src/main/java/org/apache/camel/component/mail/MailConsumer.java URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-mail/src/main/java/org/apache/camel/component/mail/MailConsumer.java?rev=1197948&r1=1197947&r2=1197948&view=diff ============================================================================== --- camel/trunk/components/camel-mail/src/main/java/org/apache/camel/component/mail/MailConsumer.java (original) +++ camel/trunk/components/camel-mail/src/main/java/org/apache/camel/component/mail/MailConsumer.java Sat Nov 5 12:46:13 2011 @@ -211,12 +211,24 @@ public class MailConsumer extends Schedu } public int getPendingExchangesSize() { + int answer; // only return the real pending size in case we are configured to complete all tasks if (ShutdownRunningTask.CompleteAllTasks == shutdownRunningTask) { - return pendingExchanges; + answer = pendingExchanges; } else { - return 0; + answer = 0; } + + if (answer == 0 && isPolling()) { + // force at least one pending exchange if we are polling as there is a little gap + // in the processBatch method and until an exchange gets enlisted as in-flight + // which happens later, so we need to signal back to the shutdown strategy that + // there is a pending exchange. 
When we are no longer polling, then we will return 0 + log.trace("Currently polling so returning 1 as pending exchanges"); + answer = 1; + } + + return answer; } public void prepareShutdown() { Modified: camel/trunk/components/camel-mybatis/src/main/java/org/apache/camel/component/mybatis/MyBatisConsumer.java URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-mybatis/src/main/java/org/apache/camel/component/mybatis/MyBatisConsumer.java?rev=1197948&r1=1197947&r2=1197948&view=diff ============================================================================== --- camel/trunk/components/camel-mybatis/src/main/java/org/apache/camel/component/mybatis/MyBatisConsumer.java (original) +++ camel/trunk/components/camel-mybatis/src/main/java/org/apache/camel/component/mybatis/MyBatisConsumer.java Sat Nov 5 12:46:13 2011 @@ -169,12 +169,24 @@ public class MyBatisConsumer extends Sch } public int getPendingExchangesSize() { + int answer; // only return the real pending size in case we are configured to complete all tasks if (ShutdownRunningTask.CompleteAllTasks == shutdownRunningTask) { - return pendingExchanges; + answer = pendingExchanges; } else { - return 0; + answer = 0; } + + if (answer == 0 && isPolling()) { + // force at least one pending exchange if we are polling as there is a little gap + // in the processBatch method and until an exchange gets enlisted as in-flight + // which happens later, so we need to signal back to the shutdown strategy that + // there is a pending exchange. 
When we are no longer polling, then we will return 0 + log.trace("Currently polling so returning 1 as pending exchanges"); + answer = 1; + } + + return answer; } public void prepareShutdown() { Modified: camel/trunk/tests/camel-itest/pom.xml URL: http://svn.apache.org/viewvc/camel/trunk/tests/camel-itest/pom.xml?rev=1197948&r1=1197947&r2=1197948&view=diff ============================================================================== --- camel/trunk/tests/camel-itest/pom.xml (original) +++ camel/trunk/tests/camel-itest/pom.xml Sat Nov 5 12:46:13 2011 @@ -100,6 +100,11 @@ </dependency> <dependency> <groupId>org.apache.camel</groupId> + <artifactId>camel-quartz</artifactId> + <scope>test</scope> + </dependency> + <dependency> + <groupId>org.apache.camel</groupId> <artifactId>camel-rss</artifactId> <scope>test</scope> <!-- conflicts with mockmail for unit testing, so we exclude this geronimo spec --> Added: camel/trunk/tests/camel-itest/src/test/java/org/apache/camel/itest/quartz/FtpCronScheduledRoutePolicyTest.java URL: http://svn.apache.org/viewvc/camel/trunk/tests/camel-itest/src/test/java/org/apache/camel/itest/quartz/FtpCronScheduledRoutePolicyTest.java?rev=1197948&view=auto ============================================================================== --- camel/trunk/tests/camel-itest/src/test/java/org/apache/camel/itest/quartz/FtpCronScheduledRoutePolicyTest.java (added) +++ camel/trunk/tests/camel-itest/src/test/java/org/apache/camel/itest/quartz/FtpCronScheduledRoutePolicyTest.java Sat Nov 5 12:46:13 2011 @@ -0,0 +1,112 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.itest.quartz; + +import java.io.File; +import java.util.concurrent.TimeUnit; + +import org.apache.camel.Exchange; +import org.apache.camel.ShutdownRunningTask; +import org.apache.camel.builder.RouteBuilder; +import org.apache.camel.builder.ThreadPoolProfileBuilder; +import org.apache.camel.routepolicy.quartz.CronScheduledRoutePolicy; +import org.apache.camel.spi.ThreadPoolProfile; +import org.apache.camel.test.junit4.CamelTestSupport; +import org.apache.ftpserver.FtpServer; +import org.apache.ftpserver.FtpServerFactory; +import org.apache.ftpserver.filesystem.nativefs.NativeFileSystemFactory; +import org.apache.ftpserver.ftplet.UserManager; +import org.apache.ftpserver.listener.ListenerFactory; +import org.apache.ftpserver.usermanager.ClearTextPasswordEncryptor; +import org.apache.ftpserver.usermanager.impl.PropertiesUserManager; +import org.junit.Ignore; +import org.junit.Test; + +/** + * + */ +@Ignore("Manual test") +public class FtpCronScheduledRoutePolicyTest extends CamelTestSupport { + + protected FtpServer ftpServer; + private String ftp = "ftp:localhost:20128/myapp?password=admin&username=admin&delay=5s&idempotent=false&localWorkDirectory=target/tmp"; + + @Test + public void testFtpCronScheduledRoutePolicyTest() throws Exception { + template.sendBodyAndHeader("file:res/home/myapp", "Hello World", Exchange.FILE_NAME, "hello.txt"); + + Thread.sleep(10 * 1000 * 60); + } + + @Override + protected RouteBuilder createRouteBuilder() throws Exception { + return new RouteBuilder() { + @Override + public void configure() 
throws Exception { + CronScheduledRoutePolicy policy = new CronScheduledRoutePolicy(); + policy.setRouteStartTime("* 0/2 * * * ?"); + policy.setRouteStopTime("* 1/2 * * * ?"); + policy.setRouteStopGracePeriod(250); + policy.setTimeUnit(TimeUnit.SECONDS); + + ThreadPoolProfile profile = new ThreadPoolProfileBuilder("foo") + .poolSize(2).maxPoolSize(2).maxPoolSize(-1).build(); + + context.getExecutorServiceManager().registerThreadPoolProfile(profile); + + from(ftp) + .noAutoStartup().routePolicy(policy).shutdownRunningTask(ShutdownRunningTask.CompleteAllTasks) + .log("Processing ${file:name}") + .to("log:done"); + } + }; + } + + public void setUp() throws Exception { + super.setUp(); + deleteDirectory("res"); + createDirectory("res/home/myapp"); + initFtpServer(); + ftpServer.start(); + } + + public void tearDown() throws Exception { + super.tearDown(); + ftpServer.stop(); + ftpServer = null; + } + + protected void initFtpServer() throws Exception { + FtpServerFactory serverFactory = new FtpServerFactory(); + + // setup user management to read our users.properties and use clear text passwords + File file = new File("./src/test/resources/users.properties").getAbsoluteFile(); + UserManager uman = new PropertiesUserManager(new ClearTextPasswordEncryptor(), file, "admin"); + serverFactory.setUserManager(uman); + + NativeFileSystemFactory fsf = new NativeFileSystemFactory(); + fsf.setCreateHome(true); + serverFactory.setFileSystem(fsf); + + ListenerFactory factory = new ListenerFactory(); + factory.setPort(20128); + serverFactory.addListener("default", factory.createListener()); + + ftpServer = serverFactory.createServer(); + } + +}