Author: davsclaus Date: Sat Nov 5 12:56:22 2011 New Revision: 1197951 URL: http://svn.apache.org/viewvc?rev=1197951&view=rev Log: CAMEL-4605: Fixed an issue when stopping Batch Consumer routes that are configured with ShutdownRunningTask.CompleteAllTasks: there was a slim chance that the route would be shut down too early, before the last batch message had a chance to be enlisted as an in-flight exchange.
Added: camel/branches/camel-2.8.x/tests/camel-itest/src/test/java/org/apache/camel/itest/quartz/ - copied from r1197948, camel/trunk/tests/camel-itest/src/test/java/org/apache/camel/itest/quartz/ camel/branches/camel-2.8.x/tests/camel-itest/src/test/java/org/apache/camel/itest/quartz/FtpCronScheduledRoutePolicyTest.java - copied unchanged from r1197948, camel/trunk/tests/camel-itest/src/test/java/org/apache/camel/itest/quartz/FtpCronScheduledRoutePolicyTest.java Modified: camel/branches/camel-2.8.x/ (props changed) camel/branches/camel-2.8.x/camel-core/src/main/java/org/apache/camel/component/file/GenericFileConsumer.java camel/branches/camel-2.8.x/camel-core/src/main/java/org/apache/camel/impl/ScheduledPollConsumer.java camel/branches/camel-2.8.x/components/camel-aws/src/main/java/org/apache/camel/component/aws/s3/S3Consumer.java camel/branches/camel-2.8.x/components/camel-aws/src/main/java/org/apache/camel/component/aws/sqs/SqsConsumer.java camel/branches/camel-2.8.x/components/camel-ibatis/src/main/java/org/apache/camel/component/ibatis/IBatisConsumer.java camel/branches/camel-2.8.x/components/camel-jpa/src/main/java/org/apache/camel/component/jpa/JpaConsumer.java camel/branches/camel-2.8.x/components/camel-mail/src/main/java/org/apache/camel/component/mail/MailConsumer.java camel/branches/camel-2.8.x/components/camel-mybatis/src/main/java/org/apache/camel/component/mybatis/MyBatisConsumer.java camel/branches/camel-2.8.x/tests/camel-itest/pom.xml Propchange: camel/branches/camel-2.8.x/ ------------------------------------------------------------------------------ --- svn:mergeinfo (original) +++ svn:mergeinfo Sat Nov 5 12:56:22 2011 @@ -1 +1 @@ -/camel/trunk:1186106,1186625,1186772,1187221,1187485,1187882,1187893,1188070-1188085,1188642,1188674,1188879,1188881,1189139,1189600,1189681,1189693,1189737,1190212-1190213,1190246,1190303,1195317,1195616,1196210,1197450,1197933 
+/camel/trunk:1186106,1186625,1186772,1187221,1187485,1187882,1187893,1188070-1188085,1188642,1188674,1188879,1188881,1189139,1189600,1189681,1189693,1189737,1190212-1190213,1190246,1190303,1195317,1195616,1196210,1197450,1197933,1197948 Propchange: camel/branches/camel-2.8.x/ ------------------------------------------------------------------------------ Binary property 'svnmerge-integrated' - no diff available. Modified: camel/branches/camel-2.8.x/camel-core/src/main/java/org/apache/camel/component/file/GenericFileConsumer.java URL: http://svn.apache.org/viewvc/camel/branches/camel-2.8.x/camel-core/src/main/java/org/apache/camel/component/file/GenericFileConsumer.java?rev=1197951&r1=1197950&r2=1197951&view=diff ============================================================================== --- camel/branches/camel-2.8.x/camel-core/src/main/java/org/apache/camel/component/file/GenericFileConsumer.java (original) +++ camel/branches/camel-2.8.x/camel-core/src/main/java/org/apache/camel/component/file/GenericFileConsumer.java Sat Nov 5 12:56:22 2011 @@ -196,12 +196,24 @@ public abstract class GenericFileConsume } public int getPendingExchangesSize() { + int answer; // only return the real pending size in case we are configured to complete all tasks if (ShutdownRunningTask.CompleteAllTasks == shutdownRunningTask) { - return pendingExchanges; + answer = pendingExchanges; } else { - return 0; + answer = 0; } + + if (answer == 0 && isPolling()) { + // force at least one pending exchange if we are polling as there is a little gap + // in the processBatch method and until an exchange gets enlisted as in-flight + // which happens later, so we need to signal back to the shutdown strategy that + // there is a pending exchange. 
When we are no longer polling, then we will return 0 + log.trace("Currently polling so returning 1 as pending exchanges"); + answer = 1; + } + + return answer; } public void prepareShutdown() { Modified: camel/branches/camel-2.8.x/camel-core/src/main/java/org/apache/camel/impl/ScheduledPollConsumer.java URL: http://svn.apache.org/viewvc/camel/branches/camel-2.8.x/camel-core/src/main/java/org/apache/camel/impl/ScheduledPollConsumer.java?rev=1197951&r1=1197950&r2=1197951&view=diff ============================================================================== --- camel/branches/camel-2.8.x/camel-core/src/main/java/org/apache/camel/impl/ScheduledPollConsumer.java (original) +++ camel/branches/camel-2.8.x/camel-core/src/main/java/org/apache/camel/impl/ScheduledPollConsumer.java Sat Nov 5 12:56:22 2011 @@ -50,6 +50,7 @@ public abstract class ScheduledPollConsu private boolean useFixedDelay = true; private PollingConsumerPollStrategy pollStrategy = new DefaultPollingConsumerPollStrategy(); private LoggingLevel runLoggingLevel = LoggingLevel.TRACE; + private volatile boolean polling; public ScheduledPollConsumer(Endpoint endpoint, Processor processor) { super(endpoint, processor); @@ -128,13 +129,20 @@ public abstract class ScheduledPollConsu LOG.debug("Retrying attempt {} to poll: {}", retryCounter, this.getEndpoint()); } - boolean begin = pollStrategy.begin(this, getEndpoint()); - if (begin) { - retryCounter++; - int polledMessages = poll(); - pollStrategy.commit(this, getEndpoint(), polledMessages); - } else { - LOG.debug("Cannot begin polling as pollStrategy returned false: {}", pollStrategy); + // mark we are polling which should also include the begin/poll/commit + polling = true; + try { + boolean begin = pollStrategy.begin(this, getEndpoint()); + if (begin) { + retryCounter++; + int polledMessages = poll(); + + pollStrategy.commit(this, getEndpoint(), polledMessages); + } else { + LOG.debug("Cannot begin polling as pollStrategy returned false: {}", pollStrategy); + 
} + } finally { + polling = false; } } @@ -171,6 +179,13 @@ public abstract class ScheduledPollConsu return isRunAllowed() && !isSuspended(); } + /** + * Whether polling is currently in progress + */ + protected boolean isPolling() { + return polling; + } + public long getInitialDelay() { return initialDelay; } Modified: camel/branches/camel-2.8.x/components/camel-aws/src/main/java/org/apache/camel/component/aws/s3/S3Consumer.java URL: http://svn.apache.org/viewvc/camel/branches/camel-2.8.x/components/camel-aws/src/main/java/org/apache/camel/component/aws/s3/S3Consumer.java?rev=1197951&r1=1197950&r2=1197951&view=diff ============================================================================== --- camel/branches/camel-2.8.x/components/camel-aws/src/main/java/org/apache/camel/component/aws/s3/S3Consumer.java (original) +++ camel/branches/camel-2.8.x/components/camel-aws/src/main/java/org/apache/camel/component/aws/s3/S3Consumer.java Sat Nov 5 12:56:22 2011 @@ -190,12 +190,24 @@ public class S3Consumer extends Schedule } public int getPendingExchangesSize() { + int answer; // only return the real pending size in case we are configured to complete all tasks if (ShutdownRunningTask.CompleteAllTasks == shutdownRunningTask) { - return pendingExchanges; + answer = pendingExchanges; } else { - return 0; + answer = 0; } + + if (answer == 0 && isPolling()) { + // force at least one pending exchange if we are polling as there is a little gap + // in the processBatch method and until an exchange gets enlisted as in-flight + // which happens later, so we need to signal back to the shutdown strategy that + // there is a pending exchange. 
When we are no longer polling, then we will return 0 + log.trace("Currently polling so returning 1 as pending exchanges"); + answer = 1; + } + + return answer; } public void prepareShutdown() { Modified: camel/branches/camel-2.8.x/components/camel-aws/src/main/java/org/apache/camel/component/aws/sqs/SqsConsumer.java URL: http://svn.apache.org/viewvc/camel/branches/camel-2.8.x/components/camel-aws/src/main/java/org/apache/camel/component/aws/sqs/SqsConsumer.java?rev=1197951&r1=1197950&r2=1197951&view=diff ============================================================================== --- camel/branches/camel-2.8.x/components/camel-aws/src/main/java/org/apache/camel/component/aws/sqs/SqsConsumer.java (original) +++ camel/branches/camel-2.8.x/components/camel-aws/src/main/java/org/apache/camel/component/aws/sqs/SqsConsumer.java Sat Nov 5 12:56:22 2011 @@ -190,12 +190,24 @@ public class SqsConsumer extends Schedul } public int getPendingExchangesSize() { + int answer; // only return the real pending size in case we are configured to complete all tasks if (ShutdownRunningTask.CompleteAllTasks == shutdownRunningTask) { - return pendingExchanges; + answer = pendingExchanges; } else { - return 0; + answer = 0; } + + if (answer == 0 && isPolling()) { + // force at least one pending exchange if we are polling as there is a little gap + // in the processBatch method and until an exchange gets enlisted as in-flight + // which happens later, so we need to signal back to the shutdown strategy that + // there is a pending exchange. 
When we are no longer polling, then we will return 0 + log.trace("Currently polling so returning 1 as pending exchanges"); + answer = 1; + } + + return answer; } public void prepareShutdown() { Modified: camel/branches/camel-2.8.x/components/camel-ibatis/src/main/java/org/apache/camel/component/ibatis/IBatisConsumer.java URL: http://svn.apache.org/viewvc/camel/branches/camel-2.8.x/components/camel-ibatis/src/main/java/org/apache/camel/component/ibatis/IBatisConsumer.java?rev=1197951&r1=1197950&r2=1197951&view=diff ============================================================================== --- camel/branches/camel-2.8.x/components/camel-ibatis/src/main/java/org/apache/camel/component/ibatis/IBatisConsumer.java (original) +++ camel/branches/camel-2.8.x/components/camel-ibatis/src/main/java/org/apache/camel/component/ibatis/IBatisConsumer.java Sat Nov 5 12:56:22 2011 @@ -169,12 +169,24 @@ public class IBatisConsumer extends Sche } public int getPendingExchangesSize() { + int answer; // only return the real pending size in case we are configured to complete all tasks if (ShutdownRunningTask.CompleteAllTasks == shutdownRunningTask) { - return pendingExchanges; + answer = pendingExchanges; } else { - return 0; + answer = 0; } + + if (answer == 0 && isPolling()) { + // force at least one pending exchange if we are polling as there is a little gap + // in the processBatch method and until an exchange gets enlisted as in-flight + // which happens later, so we need to signal back to the shutdown strategy that + // there is a pending exchange. 
When we are no longer polling, then we will return 0 + log.trace("Currently polling so returning 1 as pending exchanges"); + answer = 1; + } + + return answer; } public void prepareShutdown() { Modified: camel/branches/camel-2.8.x/components/camel-jpa/src/main/java/org/apache/camel/component/jpa/JpaConsumer.java URL: http://svn.apache.org/viewvc/camel/branches/camel-2.8.x/components/camel-jpa/src/main/java/org/apache/camel/component/jpa/JpaConsumer.java?rev=1197951&r1=1197950&r2=1197951&view=diff ============================================================================== --- camel/branches/camel-2.8.x/components/camel-jpa/src/main/java/org/apache/camel/component/jpa/JpaConsumer.java (original) +++ camel/branches/camel-2.8.x/components/camel-jpa/src/main/java/org/apache/camel/component/jpa/JpaConsumer.java Sat Nov 5 12:56:22 2011 @@ -158,12 +158,24 @@ public class JpaConsumer extends Schedul } public int getPendingExchangesSize() { + int answer; // only return the real pending size in case we are configured to complete all tasks if (ShutdownRunningTask.CompleteAllTasks == shutdownRunningTask) { - return pendingExchanges; + answer = pendingExchanges; } else { - return 0; + answer = 0; } + + if (answer == 0 && isPolling()) { + // force at least one pending exchange if we are polling as there is a little gap + // in the processBatch method and until an exchange gets enlisted as in-flight + // which happens later, so we need to signal back to the shutdown strategy that + // there is a pending exchange. 
When we are no longer polling, then we will return 0 + log.trace("Currently polling so returning 1 as pending exchanges"); + answer = 1; + } + + return answer; } public void prepareShutdown() { Modified: camel/branches/camel-2.8.x/components/camel-mail/src/main/java/org/apache/camel/component/mail/MailConsumer.java URL: http://svn.apache.org/viewvc/camel/branches/camel-2.8.x/components/camel-mail/src/main/java/org/apache/camel/component/mail/MailConsumer.java?rev=1197951&r1=1197950&r2=1197951&view=diff ============================================================================== --- camel/branches/camel-2.8.x/components/camel-mail/src/main/java/org/apache/camel/component/mail/MailConsumer.java (original) +++ camel/branches/camel-2.8.x/components/camel-mail/src/main/java/org/apache/camel/component/mail/MailConsumer.java Sat Nov 5 12:56:22 2011 @@ -198,12 +198,24 @@ public class MailConsumer extends Schedu } public int getPendingExchangesSize() { + int answer; // only return the real pending size in case we are configured to complete all tasks if (ShutdownRunningTask.CompleteAllTasks == shutdownRunningTask) { - return pendingExchanges; + answer = pendingExchanges; } else { - return 0; + answer = 0; } + + if (answer == 0 && isPolling()) { + // force at least one pending exchange if we are polling as there is a little gap + // in the processBatch method and until an exchange gets enlisted as in-flight + // which happens later, so we need to signal back to the shutdown strategy that + // there is a pending exchange. 
When we are no longer polling, then we will return 0 + log.trace("Currently polling so returning 1 as pending exchanges"); + answer = 1; + } + + return answer; } public void prepareShutdown() { Modified: camel/branches/camel-2.8.x/components/camel-mybatis/src/main/java/org/apache/camel/component/mybatis/MyBatisConsumer.java URL: http://svn.apache.org/viewvc/camel/branches/camel-2.8.x/components/camel-mybatis/src/main/java/org/apache/camel/component/mybatis/MyBatisConsumer.java?rev=1197951&r1=1197950&r2=1197951&view=diff ============================================================================== --- camel/branches/camel-2.8.x/components/camel-mybatis/src/main/java/org/apache/camel/component/mybatis/MyBatisConsumer.java (original) +++ camel/branches/camel-2.8.x/components/camel-mybatis/src/main/java/org/apache/camel/component/mybatis/MyBatisConsumer.java Sat Nov 5 12:56:22 2011 @@ -169,12 +169,24 @@ public class MyBatisConsumer extends Sch } public int getPendingExchangesSize() { + int answer; // only return the real pending size in case we are configured to complete all tasks if (ShutdownRunningTask.CompleteAllTasks == shutdownRunningTask) { - return pendingExchanges; + answer = pendingExchanges; } else { - return 0; + answer = 0; } + + if (answer == 0 && isPolling()) { + // force at least one pending exchange if we are polling as there is a little gap + // in the processBatch method and until an exchange gets enlisted as in-flight + // which happens later, so we need to signal back to the shutdown strategy that + // there is a pending exchange. 
When we are no longer polling, then we will return 0 + log.trace("Currently polling so returning 1 as pending exchanges"); + answer = 1; + } + + return answer; } public void prepareShutdown() { Modified: camel/branches/camel-2.8.x/tests/camel-itest/pom.xml URL: http://svn.apache.org/viewvc/camel/branches/camel-2.8.x/tests/camel-itest/pom.xml?rev=1197951&r1=1197950&r2=1197951&view=diff ============================================================================== --- camel/branches/camel-2.8.x/tests/camel-itest/pom.xml (original) +++ camel/branches/camel-2.8.x/tests/camel-itest/pom.xml Sat Nov 5 12:56:22 2011 @@ -100,6 +100,11 @@ </dependency> <dependency> <groupId>org.apache.camel</groupId> + <artifactId>camel-quartz</artifactId> + <scope>test</scope> + </dependency> + <dependency> + <groupId>org.apache.camel</groupId> <artifactId>camel-rss</artifactId> <scope>test</scope> <!-- conflicts with mockmail for unit testing, so we exclude this geronimo spec -->