Author: davsclaus Date: Sat Jan 14 17:25:26 2012 New Revision: 1231533 URL: http://svn.apache.org/viewvc?rev=1231533&view=rev Log: CAMEL-4577: Added a ScheduledBatchPollingConsumer to reuse logic. Thanks to Bilgin for the patch.
Added: camel/trunk/camel-core/src/main/java/org/apache/camel/impl/ScheduledBatchPollingConsumer.java Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/component/file/GenericFileConsumer.java camel/trunk/components/camel-aws/src/main/java/org/apache/camel/component/aws/s3/S3Consumer.java camel/trunk/components/camel-aws/src/main/java/org/apache/camel/component/aws/s3/S3Endpoint.java camel/trunk/components/camel-aws/src/main/java/org/apache/camel/component/aws/sqs/SqsConsumer.java camel/trunk/components/camel-aws/src/main/java/org/apache/camel/component/aws/sqs/SqsEndpoint.java camel/trunk/components/camel-ibatis/src/main/java/org/apache/camel/component/ibatis/IBatisConsumer.java camel/trunk/components/camel-jclouds/src/main/java/org/apache/camel/component/jclouds/JcloudsBlobStoreConsumer.java camel/trunk/components/camel-jpa/src/main/java/org/apache/camel/component/jpa/JpaConsumer.java camel/trunk/components/camel-krati/src/main/java/org/apache/camel/component/krati/KratiConsumer.java camel/trunk/components/camel-mail/src/main/java/org/apache/camel/component/mail/MailConsumer.java camel/trunk/components/camel-mybatis/src/main/java/org/apache/camel/component/mybatis/MyBatisConsumer.java Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/component/file/GenericFileConsumer.java URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/component/file/GenericFileConsumer.java?rev=1231533&r1=1231532&r2=1231533&view=diff ============================================================================== --- camel/trunk/camel-core/src/main/java/org/apache/camel/component/file/GenericFileConsumer.java (original) +++ camel/trunk/camel-core/src/main/java/org/apache/camel/component/file/GenericFileConsumer.java Sat Jan 14 17:25:26 2012 @@ -23,12 +23,10 @@ import java.util.List; import java.util.Queue; import org.apache.camel.AsyncCallback; -import org.apache.camel.BatchConsumer; import org.apache.camel.Exchange; import org.apache.camel.Processor; import org.apache.camel.ShutdownRunningTask; -import org.apache.camel.impl.ScheduledPollConsumer; -import org.apache.camel.spi.ShutdownAware; +import org.apache.camel.impl.ScheduledBatchPollingConsumer; import org.apache.camel.util.CastUtils; import org.apache.camel.util.ObjectHelper; import org.apache.camel.util.StopWatch; @@ -39,13 +37,12 @@ import org.slf4j.LoggerFactory; /** * Base class for file consumers. */ -public abstract class GenericFileConsumer<T> extends ScheduledPollConsumer implements BatchConsumer, ShutdownAware { +public abstract class GenericFileConsumer<T> extends ScheduledBatchPollingConsumer { protected final transient Logger log = LoggerFactory.getLogger(getClass()); protected GenericFileEndpoint<T> endpoint; protected GenericFileOperations<T> operations; protected boolean loggedIn; protected String fileExpressionResult; - protected int maxMessagesPerPoll; protected volatile ShutdownRunningTask shutdownRunningTask; protected volatile int pendingExchanges; protected Processor customProcessor; @@ -140,9 +137,6 @@ public abstract class GenericFileConsume return polledMessages; } - public void setMaxMessagesPerPoll(int maxMessagesPerPoll) { - this.maxMessagesPerPoll = maxMessagesPerPoll; - } @SuppressWarnings("unchecked") public int processBatch(Queue<Object> exchanges) { @@ -187,53 +181,6 @@ public abstract class GenericFileConsume return total; } - public boolean deferShutdown(ShutdownRunningTask shutdownRunningTask) { - // store a reference what to do in case when shutting down and we have pending messages - this.shutdownRunningTask = shutdownRunningTask; - // do not defer shutdown - return false; - } - - public int getPendingExchangesSize() { - int answer; - // only return the real pending size in case we are configured to complete all tasks - if (ShutdownRunningTask.CompleteAllTasks == shutdownRunningTask) { - answer = pendingExchanges; - } else { - answer = 0; - } - - if (answer == 0 && isPolling()) { - // force at least one pending exchange if we are polling as there is a little gap - // in the processBatch method and until an exchange gets enlisted as in-flight - // which happens later, so we need to signal back to the shutdown strategy that - // there is a pending exchange. When we are no longer polling, then we will return 0 - log.trace("Currently polling so returning 1 as pending exchanges"); - answer = 1; - } - - return answer; - } - - public void prepareShutdown() { - // noop - } - - public boolean isBatchAllowed() { - // stop if we are not running - boolean answer = isRunAllowed(); - if (!answer) { - return false; - } - - if (shutdownRunningTask == null) { - // we are not shutting down so continue to run - return true; - } - - // we are shutting down so only continue if we are configured to complete all tasks - return ShutdownRunningTask.CompleteAllTasks == shutdownRunningTask; - } /** * Whether or not we can continue polling for more files Added: camel/trunk/camel-core/src/main/java/org/apache/camel/impl/ScheduledBatchPollingConsumer.java URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/impl/ScheduledBatchPollingConsumer.java?rev=1231533&view=auto ============================================================================== --- camel/trunk/camel-core/src/main/java/org/apache/camel/impl/ScheduledBatchPollingConsumer.java (added) +++ camel/trunk/camel-core/src/main/java/org/apache/camel/impl/ScheduledBatchPollingConsumer.java Sat Jan 14 17:25:26 2012 @@ -0,0 +1,115 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.impl; + +import java.util.concurrent.ScheduledExecutorService; + +import org.apache.camel.BatchConsumer; +import org.apache.camel.Endpoint; +import org.apache.camel.Processor; +import org.apache.camel.ShutdownRunningTask; +import org.apache.camel.spi.ShutdownAware; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * A useful base class for any consumer which is polling batch based + */ +public abstract class ScheduledBatchPollingConsumer extends ScheduledPollConsumer implements BatchConsumer, ShutdownAware { + private static final transient Logger log = LoggerFactory.getLogger(ScheduledBatchPollingConsumer.class); + protected volatile ShutdownRunningTask shutdownRunningTask; + protected volatile int pendingExchanges; + protected int maxMessagesPerPoll; + + public ScheduledBatchPollingConsumer(Endpoint endpoint, Processor processor) { + super(endpoint, processor); + } + + public ScheduledBatchPollingConsumer(Endpoint endpoint, Processor processor, ScheduledExecutorService executor) { + super(endpoint, processor, executor); + } + + @Override + public boolean deferShutdown(ShutdownRunningTask shutdownRunningTask) { + // store a reference what to do in case when shutting down and we have pending messages + this.shutdownRunningTask = shutdownRunningTask; + // do not defer shutdown + return false; + } + + @Override + public int getPendingExchangesSize() { + int answer; + // only return the real pending size in case we are configured to complete all tasks + if (ShutdownRunningTask.CompleteAllTasks == shutdownRunningTask) { + answer = pendingExchanges; + } else { + answer = 0; + } + + if (answer == 0 && isPolling()) { + // force at least one pending exchange if we are polling as there is a little gap + // in the processBatch method and until an exchange gets enlisted as in-flight + // which happens later, so we need to signal back to the shutdown strategy that + // there is a pending exchange. When we are no longer polling, then we will return 0 + log.trace("Currently polling so returning 1 as pending exchanges"); + answer = 1; + } + + return answer; + } + + @Override + public void prepareShutdown() { + // reset task as the state of the task is not to be preserved + // which otherwise may cause isBatchAllowed() to return a wrong answer + this.shutdownRunningTask = null; + } + + @Override + public void setMaxMessagesPerPoll(int maxMessagesPerPoll) { + this.maxMessagesPerPoll = maxMessagesPerPoll; + } + + /** + * Gets the maximum number of messages as a limit to poll at each polling. + * <p/> + * Is default unlimited, but use 0 or negative number to disable it as unlimited. + * + * @return max messages to poll + */ + public int getMaxMessagesPerPoll() { + return maxMessagesPerPoll; + } + + @Override + public boolean isBatchAllowed() { + // stop if we are not running + boolean answer = isRunAllowed(); + if (!answer) { + return false; + } + + if (shutdownRunningTask == null) { + // we are not shutting down so continue to run + return true; + } + + // we are shutting down so only continue if we are configured to complete all tasks + return ShutdownRunningTask.CompleteAllTasks == shutdownRunningTask; + } +} Modified: camel/trunk/components/camel-aws/src/main/java/org/apache/camel/component/aws/s3/S3Consumer.java URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-aws/src/main/java/org/apache/camel/component/aws/s3/S3Consumer.java?rev=1231533&r1=1231532&r2=1231533&view=diff ============================================================================== --- camel/trunk/components/camel-aws/src/main/java/org/apache/camel/component/aws/s3/S3Consumer.java (original) +++ camel/trunk/components/camel-aws/src/main/java/org/apache/camel/component/aws/s3/S3Consumer.java Sat Jan 14 17:25:26 2012 @@ -27,13 +27,10 @@ import com.amazonaws.services.s3.model.O import com.amazonaws.services.s3.model.S3Object; import com.amazonaws.services.s3.model.S3ObjectSummary; -import org.apache.camel.BatchConsumer; import org.apache.camel.Exchange; import org.apache.camel.NoFactoryAvailableException; import org.apache.camel.Processor; -import org.apache.camel.ShutdownRunningTask; -import org.apache.camel.impl.ScheduledPollConsumer; -import org.apache.camel.spi.ShutdownAware; +import org.apache.camel.impl.ScheduledBatchPollingConsumer; import org.apache.camel.spi.Synchronization; import org.apache.camel.util.CastUtils; import org.apache.camel.util.ObjectHelper; @@ -46,12 +43,9 @@ import org.slf4j.LoggerFactory; * <a href="http://aws.amazon.com/s3/">AWS S3</a> * */ -public class S3Consumer extends ScheduledPollConsumer implements BatchConsumer, ShutdownAware { +public class S3Consumer extends ScheduledBatchPollingConsumer { private static final transient Logger LOG = LoggerFactory.getLogger(S3Consumer.class); - - private volatile ShutdownRunningTask shutdownRunningTask; - private volatile int pendingExchanges; public S3Consumer(S3Endpoint endpoint, Processor processor) throws NoFactoryAvailableException { super(endpoint, processor); @@ -68,7 +62,7 @@ public class S3Consumer extends Schedule ListObjectsRequest listObjectsRequest = new ListObjectsRequest(); listObjectsRequest.setBucketName(bucketName); - listObjectsRequest.setMaxKeys(getMaxMessagesPerPoll()); + listObjectsRequest.setMaxKeys(maxMessagesPerPoll); ObjectListing listObjects = getAmazonS3Client().listObjects(listObjectsRequest); @@ -165,55 +159,7 @@ public class S3Consumer extends Schedule LOG.warn("Exchange failed, so rolling back message status: {}", exchange); } } - - public boolean isBatchAllowed() { - // stop if we are not running - boolean answer = isRunAllowed(); - if (!answer) { - return false; - } - - if (shutdownRunningTask == null) { - // we are not shutting down so continue to run - return true; - } - - // we are shutting down so only continue if we are configured to complete all tasks - return ShutdownRunningTask.CompleteAllTasks == shutdownRunningTask; - } - - public boolean deferShutdown(ShutdownRunningTask shutdownRunningTask) { - // store a reference what to do in case when shutting down and we have pending messages - this.shutdownRunningTask = shutdownRunningTask; - // do not defer shutdown - return false; - } - - public int getPendingExchangesSize() { - int answer; - // only return the real pending size in case we are configured to complete all tasks - if (ShutdownRunningTask.CompleteAllTasks == shutdownRunningTask) { - answer = pendingExchanges; - } else { - answer = 0; - } - - if (answer == 0 && isPolling()) { - // force at least one pending exchange if we are polling as there is a little gap - // in the processBatch method and until an exchange gets enlisted as in-flight - // which happens later, so we need to signal back to the shutdown strategy that - // there is a pending exchange. When we are no longer polling, then we will return 0 - log.trace("Currently polling so returning 1 as pending exchanges"); - answer = 1; - } - return answer; - } - - public void prepareShutdown() { - // noop - } - protected S3Configuration getConfiguration() { return getEndpoint().getConfiguration(); } @@ -226,15 +172,7 @@ public class S3Consumer extends Schedule public S3Endpoint getEndpoint() { return (S3Endpoint) super.getEndpoint(); } - - public void setMaxMessagesPerPoll(int maxMessagesPerPoll) { - getEndpoint().setMaxMessagesPerPoll(maxMessagesPerPoll); - } - - public int getMaxMessagesPerPoll() { - return getEndpoint().getMaxMessagesPerPoll(); - } - + @Override public String toString() { return "S3Consumer[" + URISupport.sanitizeUri(getEndpoint().getEndpointUri()) + "]"; Modified: camel/trunk/components/camel-aws/src/main/java/org/apache/camel/component/aws/s3/S3Endpoint.java URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-aws/src/main/java/org/apache/camel/component/aws/s3/S3Endpoint.java?rev=1231533&r1=1231532&r2=1231533&view=diff ============================================================================== --- camel/trunk/components/camel-aws/src/main/java/org/apache/camel/component/aws/s3/S3Endpoint.java (original) +++ camel/trunk/components/camel-aws/src/main/java/org/apache/camel/component/aws/s3/S3Endpoint.java Sat Jan 14 17:25:26 2012 @@ -49,7 +49,7 @@ public class S3Endpoint extends Schedule private AmazonS3Client s3Client; private S3Configuration configuration; private int maxMessagesPerPoll = 10; - + @Deprecated public S3Endpoint(String uri, CamelContext context, S3Configuration configuration) { super(uri, context); @@ -63,6 +63,7 @@ public class S3Endpoint extends Schedule public Consumer createConsumer(Processor processor) throws Exception { S3Consumer s3Consumer = new S3Consumer(this, processor); configureConsumer(s3Consumer); + s3Consumer.setMaxMessagesPerPoll(maxMessagesPerPoll); return s3Consumer; } @@ -178,7 +179,7 @@ public class S3Endpoint extends Schedule } return client; } - + public int getMaxMessagesPerPoll() { return maxMessagesPerPoll; } Modified: camel/trunk/components/camel-aws/src/main/java/org/apache/camel/component/aws/sqs/SqsConsumer.java URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-aws/src/main/java/org/apache/camel/component/aws/sqs/SqsConsumer.java?rev=1231533&r1=1231532&r2=1231533&view=diff ============================================================================== --- camel/trunk/components/camel-aws/src/main/java/org/apache/camel/component/aws/sqs/SqsConsumer.java (original) +++ camel/trunk/components/camel-aws/src/main/java/org/apache/camel/component/aws/sqs/SqsConsumer.java Sat Jan 14 17:25:26 2012 @@ -27,13 +27,10 @@ import com.amazonaws.services.sqs.model. import com.amazonaws.services.sqs.model.ReceiveMessageRequest; import com.amazonaws.services.sqs.model.ReceiveMessageResult; -import org.apache.camel.BatchConsumer; import org.apache.camel.Exchange; import org.apache.camel.NoFactoryAvailableException; import org.apache.camel.Processor; -import org.apache.camel.ShutdownRunningTask; -import org.apache.camel.impl.ScheduledPollConsumer; -import org.apache.camel.spi.ShutdownAware; +import org.apache.camel.impl.ScheduledBatchPollingConsumer; import org.apache.camel.spi.Synchronization; import org.apache.camel.util.CastUtils; import org.apache.camel.util.ObjectHelper; @@ -47,12 +44,9 @@ import org.slf4j.LoggerFactory; * <a href="http://aws.amazon.com/sqs/">AWS SQS</a> * */ -public class SqsConsumer extends ScheduledPollConsumer implements BatchConsumer, ShutdownAware { +public class SqsConsumer extends ScheduledBatchPollingConsumer { private static final transient Logger LOG = LoggerFactory.getLogger(SqsConsumer.class); - - private volatile ShutdownRunningTask shutdownRunningTask; - private volatile int pendingExchanges; public SqsConsumer(SqsEndpoint endpoint, Processor processor) throws NoFactoryAvailableException { super(endpoint, processor); @@ -143,7 +137,7 @@ public class SqsConsumer extends Schedul LOG.trace("Deleting message with receipt handle {}...", receiptHandle); getClient().deleteMessage(deleteRequest); - + LOG.trace("Message deleted"); } } catch (AmazonClientException e) { @@ -165,55 +159,7 @@ public class SqsConsumer extends Schedul LOG.warn("Exchange failed, so rolling back message status: {}", exchange); } } - - public boolean isBatchAllowed() { - // stop if we are not running - boolean answer = isRunAllowed(); - if (!answer) { - return false; - } - - if (shutdownRunningTask == null) { - // we are not shutting down so continue to run - return true; - } - // we are shutting down so only continue if we are configured to complete all tasks - return ShutdownRunningTask.CompleteAllTasks == shutdownRunningTask; - } - - public boolean deferShutdown(ShutdownRunningTask shutdownRunningTask) { - // store a reference what to do in case when shutting down and we have pending messages - this.shutdownRunningTask = shutdownRunningTask; - // do not defer shutdown - return false; - } - - public int getPendingExchangesSize() { - int answer; - // only return the real pending size in case we are configured to complete all tasks - if (ShutdownRunningTask.CompleteAllTasks == shutdownRunningTask) { - answer = pendingExchanges; - } else { - answer = 0; - } - - if (answer == 0 && isPolling()) { - // force at least one pending exchange if we are polling as there is a little gap - // in the processBatch method and until an exchange gets enlisted as in-flight - // which happens later, so we need to signal back to the shutdown strategy that - // there is a pending exchange. When we are no longer polling, then we will return 0 - log.trace("Currently polling so returning 1 as pending exchanges"); - answer = 1; - } - - return answer; - } - - public void prepareShutdown() { - // noop - } - protected SqsConfiguration getConfiguration() { return getEndpoint().getConfiguration(); } @@ -230,15 +176,7 @@ public class SqsConsumer extends Schedul public SqsEndpoint getEndpoint() { return (SqsEndpoint) super.getEndpoint(); } - - public void setMaxMessagesPerPoll(int maxMessagesPerPoll) { - getEndpoint().setMaxMessagesPerPoll(maxMessagesPerPoll); - } - - public int getMaxMessagesPerPoll() { - return getEndpoint().getMaxMessagesPerPoll(); - } - + @Override public String toString() { return "SqsConsumer[" + URISupport.sanitizeUri(getEndpoint().getEndpointUri()) + "]"; Modified: camel/trunk/components/camel-aws/src/main/java/org/apache/camel/component/aws/sqs/SqsEndpoint.java URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-aws/src/main/java/org/apache/camel/component/aws/sqs/SqsEndpoint.java?rev=1231533&r1=1231532&r2=1231533&view=diff ============================================================================== --- camel/trunk/components/camel-aws/src/main/java/org/apache/camel/component/aws/sqs/SqsEndpoint.java (original) +++ camel/trunk/components/camel-aws/src/main/java/org/apache/camel/component/aws/sqs/SqsEndpoint.java Sat Jan 14 17:25:26 2012 @@ -63,6 +63,7 @@ public class SqsEndpoint extends Schedul public Consumer createConsumer(Processor processor) throws Exception { SqsConsumer sqsConsumer = new SqsConsumer(this, processor); configureConsumer(sqsConsumer); + sqsConsumer.setMaxMessagesPerPoll(maxMessagesPerPoll); return sqsConsumer; } Modified: camel/trunk/components/camel-ibatis/src/main/java/org/apache/camel/component/ibatis/IBatisConsumer.java URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-ibatis/src/main/java/org/apache/camel/component/ibatis/IBatisConsumer.java?rev=1231533&r1=1231532&r2=1231533&view=diff ============================================================================== --- camel/trunk/components/camel-ibatis/src/main/java/org/apache/camel/component/ibatis/IBatisConsumer.java (original) +++ camel/trunk/components/camel-ibatis/src/main/java/org/apache/camel/component/ibatis/IBatisConsumer.java Sat Jan 14 17:25:26 2012 @@ -20,14 +20,11 @@ import java.util.LinkedList; import java.util.List; import java.util.Queue; -import org.apache.camel.BatchConsumer; import org.apache.camel.Exchange; import org.apache.camel.ExchangePattern; import org.apache.camel.Message; import org.apache.camel.Processor; -import org.apache.camel.ShutdownRunningTask; -import org.apache.camel.impl.ScheduledPollConsumer; -import org.apache.camel.spi.ShutdownAware; +import org.apache.camel.impl.ScheduledBatchPollingConsumer; import org.apache.camel.util.CastUtils; import org.apache.camel.util.ObjectHelper; import org.slf4j.Logger; @@ -38,7 +35,7 @@ import org.slf4j.LoggerFactory; * * @see org.apache.camel.component.ibatis.strategy.IBatisProcessingStrategy */ -public class IBatisConsumer extends ScheduledPollConsumer implements BatchConsumer, ShutdownAware { +public class IBatisConsumer extends ScheduledBatchPollingConsumer { private static final Logger LOG = LoggerFactory.getLogger(IBatisConsumer.class); @@ -49,9 +46,6 @@ public class IBatisConsumer extends Sche } } - protected volatile ShutdownRunningTask shutdownRunningTask; - protected volatile int pendingExchanges; - /** * Statement to run after data has been processed in the route */ @@ -116,10 +110,6 @@ public class IBatisConsumer extends Sche return processBatch(CastUtils.cast(answer)); } - public void setMaxMessagesPerPoll(int maxMessagesPerPoll) { - this.maxMessagesPerPoll = maxMessagesPerPoll; - } - public int processBatch(Queue<Object> exchanges) throws Exception { final IBatisEndpoint endpoint = getEndpoint(); @@ -161,54 +151,6 @@ public class IBatisConsumer extends Sche return total; } - public boolean deferShutdown(ShutdownRunningTask shutdownRunningTask) { - // store a reference what to do in case when shutting down and we have pending messages - this.shutdownRunningTask = shutdownRunningTask; - // do not defer shutdown - return false; - } - - public int getPendingExchangesSize() { - int answer; - // only return the real pending size in case we are configured to complete all tasks - if (ShutdownRunningTask.CompleteAllTasks == shutdownRunningTask) { - answer = pendingExchanges; - } else { - answer = 0; - } - - if (answer == 0 && isPolling()) { - // force at least one pending exchange if we are polling as there is a little gap - // in the processBatch method and until an exchange gets enlisted as in-flight - // which happens later, so we need to signal back to the shutdown strategy that - // there is a pending exchange. When we are no longer polling, then we will return 0 - log.trace("Currently polling so returning 1 as pending exchanges"); - answer = 1; - } - - return answer; - } - - public void prepareShutdown() { - // noop - } - - public boolean isBatchAllowed() { - // stop if we are not running - boolean answer = isRunAllowed(); - if (!answer) { - return false; - } - - if (shutdownRunningTask == null) { - // we are not shutting down so continue to run - return true; - } - - // we are shutting down so only continue if we are configured to complete all tasks - return ShutdownRunningTask.CompleteAllTasks == shutdownRunningTask; - } - private Exchange createExchange(Object data) { final IBatisEndpoint endpoint = getEndpoint(); final Exchange exchange = endpoint.createExchange(ExchangePattern.InOnly); Modified: camel/trunk/components/camel-jclouds/src/main/java/org/apache/camel/component/jclouds/JcloudsBlobStoreConsumer.java URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-jclouds/src/main/java/org/apache/camel/component/jclouds/JcloudsBlobStoreConsumer.java?rev=1231533&r1=1231532&r2=1231533&view=diff ============================================================================== --- camel/trunk/components/camel-jclouds/src/main/java/org/apache/camel/component/jclouds/JcloudsBlobStoreConsumer.java (original) +++ camel/trunk/components/camel-jclouds/src/main/java/org/apache/camel/component/jclouds/JcloudsBlobStoreConsumer.java Sat Jan 14 17:25:26 2012 @@ -23,6 +23,8 @@ import org.apache.camel.BatchConsumer; import org.apache.camel.Exchange; import org.apache.camel.Processor; import org.apache.camel.ShutdownRunningTask; +import org.apache.camel.impl.ScheduledBatchPollingConsumer; +import org.apache.camel.impl.ScheduledPollConsumer; import org.apache.camel.spi.ShutdownAware; import org.apache.camel.spi.Synchronization; import org.apache.camel.util.CastUtils; @@ -33,21 +35,14 @@ import org.jclouds.blobstore.options.Lis import org.slf4j.Logger; import org.slf4j.LoggerFactory; -public class JcloudsBlobStoreConsumer extends JcloudsConsumer implements BatchConsumer, ShutdownAware { +public class JcloudsBlobStoreConsumer extends ScheduledBatchPollingConsumer { private static final Logger LOG = LoggerFactory.getLogger(JcloudsBlobStoreConsumer.class); - private final JcloudsBlobStoreEndpoint endpoint; - private final String container; private final BlobStore blobStore; - private int maxMessagesPerPoll = 10; - private volatile ShutdownRunningTask shutdownRunningTask; - private volatile int pendingExchanges; - - public JcloudsBlobStoreConsumer(JcloudsBlobStoreEndpoint endpoint, Processor processor, BlobStore blobStore) { super(endpoint, processor); this.blobStore = blobStore; @@ -75,11 +70,6 @@ public class JcloudsBlobStoreConsumer ex return queue.isEmpty() ? 0 : processBatch(CastUtils.cast(queue)); } - @Override - public void setMaxMessagesPerPoll(int maxMessagesPerPoll) { - this.maxMessagesPerPoll = maxMessagesPerPoll; - } - public int processBatch(Queue<Object> exchanges) throws Exception { int total = exchanges.size(); @@ -112,53 +102,4 @@ public class JcloudsBlobStoreConsumer ex return total; } - - public boolean isBatchAllowed() { - // stop if we are not running - boolean answer = isRunAllowed(); - if (!answer) { - return false; - } - - if (shutdownRunningTask == null) { - // we are not shutting down so continue to run - return true; - } - - // we are shutting down so only continue if we are configured to complete all tasks - return ShutdownRunningTask.CompleteAllTasks == shutdownRunningTask; - } - - public boolean deferShutdown(ShutdownRunningTask shutdownRunningTask) { - // store a reference what to do in case when shutting down and we have pending messages - this.shutdownRunningTask = shutdownRunningTask; - // do not defer shutdown - return false; - } - - public int getPendingExchangesSize() { - int answer; - // only return the real pending size in case we are configured to complete all tasks - if (ShutdownRunningTask.CompleteAllTasks == shutdownRunningTask) { - answer = pendingExchanges; - } else { - answer = 0; - } - - if (answer == 0 && isPolling()) { - // force at least one pending exchange if we are polling as there is a little gap - // in the processBatch method and until an exchange gets enlisted as in-flight - // which happens later, so we need to signal back to the shutdown strategy that - // there is a pending exchange. When we are no longer polling, then we will return 0 - log.trace("Currently polling so returning 1 as pending exchanges"); - answer = 1; - } - - return answer; - } - - @Override - public void prepareShutdown() { - //Empty method - } } Modified: camel/trunk/components/camel-jpa/src/main/java/org/apache/camel/component/jpa/JpaConsumer.java URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-jpa/src/main/java/org/apache/camel/component/jpa/JpaConsumer.java?rev=1231533&r1=1231532&r2=1231533&view=diff ============================================================================== --- camel/trunk/components/camel-jpa/src/main/java/org/apache/camel/component/jpa/JpaConsumer.java (original) +++ camel/trunk/components/camel-jpa/src/main/java/org/apache/camel/component/jpa/JpaConsumer.java Sat Jan 14 17:25:26 2012 @@ -27,12 +27,10 @@ import javax.persistence.LockModeType; import javax.persistence.PersistenceException; import javax.persistence.Query; -import org.apache.camel.BatchConsumer; import org.apache.camel.Exchange; import org.apache.camel.Processor; import org.apache.camel.ShutdownRunningTask; -import org.apache.camel.impl.ScheduledPollConsumer; -import org.apache.camel.spi.ShutdownAware; +import org.apache.camel.impl.ScheduledBatchPollingConsumer; import org.apache.camel.util.CastUtils; import org.apache.camel.util.ObjectHelper; import org.slf4j.Logger; @@ -42,7 +40,7 @@ import org.springframework.orm.jpa.JpaCa /** * @version */ -public class JpaConsumer extends ScheduledPollConsumer implements BatchConsumer, ShutdownAware { +public class JpaConsumer extends ScheduledBatchPollingConsumer { private static final transient Logger LOG = LoggerFactory.getLogger(JpaConsumer.class); private final JpaEndpoint endpoint; @@ -53,10 +51,7 @@ public class JpaConsumer extends Schedul private String namedQuery; private String nativeQuery; private Class<?> resultClass; - private int maxMessagesPerPoll; private boolean transacted; - private volatile ShutdownRunningTask shutdownRunningTask; - private volatile int pendingExchanges; private static final class DataHolder { private Exchange exchange; @@ -128,9 +123,6 @@ public class JpaConsumer extends Schedul return endpoint.getCamelContext().getTypeConverter().convertTo(int.class, messagePolled); } - public void setMaxMessagesPerPoll(int maxMessagesPerPoll) { - this.maxMessagesPerPoll = maxMessagesPerPoll; - } public int processBatch(Queue<Object> exchanges) throws Exception { int total = exchanges.size(); @@ -172,54 +164,6 @@ public class JpaConsumer extends Schedul return total; } - public boolean deferShutdown(ShutdownRunningTask shutdownRunningTask) { - // store a reference what to do in case when shutting down and we have pending messages - this.shutdownRunningTask = shutdownRunningTask; - // do not defer shutdown - return false; - } - - public int getPendingExchangesSize() { - int answer; - // only return the real pending size in case we are configured to complete all tasks - if (ShutdownRunningTask.CompleteAllTasks == shutdownRunningTask) { - answer = pendingExchanges; - } else { - answer = 0; - } - - if (answer == 0 && isPolling()) { - // force at least one pending exchange if we are polling as there is a little gap - // in the processBatch method and until an exchange gets enlisted as in-flight - // which happens later, so we need to signal back to the shutdown strategy that - // there is a pending exchange. When we are no longer polling, then we will return 0 - log.trace("Currently polling so returning 1 as pending exchanges"); - answer = 1; - } - - return answer; - } - - public void prepareShutdown() { - // noop - } - - public boolean isBatchAllowed() { - // stop if we are not running - boolean answer = isRunAllowed(); - if (!answer) { - return false; - } - - if (shutdownRunningTask == null) { - // we are not shutting down so continue to run - return true; - } - - // we are shutting down so only continue if we are configured to complete all tasks - return ShutdownRunningTask.CompleteAllTasks == shutdownRunningTask; - } - // Properties // ------------------------------------------------------------------------- @Override Modified: camel/trunk/components/camel-krati/src/main/java/org/apache/camel/component/krati/KratiConsumer.java URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-krati/src/main/java/org/apache/camel/component/krati/KratiConsumer.java?rev=1231533&r1=1231532&r2=1231533&view=diff ============================================================================== --- camel/trunk/components/camel-krati/src/main/java/org/apache/camel/component/krati/KratiConsumer.java (original) +++ camel/trunk/components/camel-krati/src/main/java/org/apache/camel/component/krati/KratiConsumer.java Sat Jan 14 17:25:26 2012 @@ -19,13 +19,11 @@ package org.apache.camel.component.krati import java.util.Iterator; import java.util.LinkedList; import java.util.Queue; + import krati.store.DataStore; -import org.apache.camel.BatchConsumer; import org.apache.camel.Exchange; import org.apache.camel.Processor; -import org.apache.camel.ShutdownRunningTask; -import org.apache.camel.impl.ScheduledPollConsumer; -import org.apache.camel.spi.ShutdownAware; +import org.apache.camel.impl.ScheduledBatchPollingConsumer; import org.apache.camel.spi.Synchronization; import org.apache.camel.util.CastUtils; import org.apache.camel.util.ObjectHelper; @@ -36,7 +34,7 @@ import org.slf4j.LoggerFactory; /** * The Krati consumer. */ -public class KratiConsumer extends ScheduledPollConsumer implements BatchConsumer, ShutdownAware { +public class KratiConsumer extends ScheduledBatchPollingConsumer { private static final transient Logger LOG = LoggerFactory.getLogger(KratiConsumer.class); @@ -44,9 +42,6 @@ public class KratiConsumer extends Sched protected DataStore<Object, Object> dataStore; protected int maxMessagesPerPoll = 10; - protected volatile ShutdownRunningTask shutdownRunningTask; - protected volatile int pendingExchanges; - public KratiConsumer(KratiEndpoint endpoint, Processor processor, DataStore<Object, Object> dataStore) { super(endpoint, processor); this.endpoint = endpoint; @@ -72,21 +67,6 @@ public class KratiConsumer extends Sched return queue.isEmpty() ? 0 : processBatch(CastUtils.cast(queue)); } - /** - * Sets a maximum number of messages as a limit to poll at each polling. - * <p/> - * Can be used to limit eg to 100 to avoid when starting and there are millions - * of messages for you in the first poll. - * <p/> - * Default value is 10. - * - * @param maxMessagesPerPoll maximum messages to poll. - */ - @Override - public void setMaxMessagesPerPoll(int maxMessagesPerPoll) { - this.maxMessagesPerPoll = maxMessagesPerPoll; - } - @Override public int processBatch(Queue<Object> exchanges) throws Exception { int total = exchanges.size(); @@ -123,55 +103,4 @@ public class KratiConsumer extends Sched return total; } - - @Override - public boolean isBatchAllowed() { - // stop if we are not running - boolean answer = isRunAllowed(); - if (!answer) { - return false; - } - - if (shutdownRunningTask == null) { - // we are not shutting down so continue to run - return true; - } - - // we are shutting down so only continue if we are configured to complete all tasks - return ShutdownRunningTask.CompleteAllTasks == shutdownRunningTask; - } - - @Override - public boolean deferShutdown(ShutdownRunningTask shutdownRunningTask) { - // store a reference what to do in case when shutting down and we have pending messages - this.shutdownRunningTask = shutdownRunningTask; - // do not defer shutdown - return false; - } - - @Override - public int getPendingExchangesSize() { - int answer; - // only return the real pending size in case we are configured to complete all tasks - if (ShutdownRunningTask.CompleteAllTasks == shutdownRunningTask) { - answer = pendingExchanges; - } else { - answer = 0; - } - - if (answer == 0 && isPolling()) { - // force at least one pending exchange if we are polling as there is a little gap - // in the processBatch method and until an exchange gets enlisted as in-flight - // which happens later, so we need to signal back to the shutdown strategy that - // there is a pending exchange. When we are no longer polling, then we will return 0 - log.trace("Currently polling so returning 1 as pending exchanges"); - answer = 1; - } - - return answer; - } - - @Override - public void prepareShutdown() { - } } Modified: camel/trunk/components/camel-mail/src/main/java/org/apache/camel/component/mail/MailConsumer.java URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-mail/src/main/java/org/apache/camel/component/mail/MailConsumer.java?rev=1231533&r1=1231532&r2=1231533&view=diff ============================================================================== --- camel/trunk/components/camel-mail/src/main/java/org/apache/camel/component/mail/MailConsumer.java (original) +++ camel/trunk/components/camel-mail/src/main/java/org/apache/camel/component/mail/MailConsumer.java Sat Jan 14 17:25:26 2012 @@ -29,12 +29,9 @@ import javax.mail.MessagingException; import javax.mail.Store; import javax.mail.search.FlagTerm; -import org.apache.camel.BatchConsumer; import org.apache.camel.Exchange; import org.apache.camel.Processor; -import org.apache.camel.ShutdownRunningTask; -import org.apache.camel.impl.ScheduledPollConsumer; -import org.apache.camel.spi.ShutdownAware; +import org.apache.camel.impl.ScheduledBatchPollingConsumer; import org.apache.camel.spi.Synchronization; import org.apache.camel.util.CastUtils; import org.apache.camel.util.ObjectHelper; @@ -45,7 +42,7 @@ import org.slf4j.LoggerFactory; * A {@link org.apache.camel.Consumer Consumer} which consumes messages from JavaMail using a * {@link javax.mail.Transport Transport} and dispatches them to the {@link Processor} */ -public class MailConsumer extends ScheduledPollConsumer implements BatchConsumer, ShutdownAware { +public class MailConsumer extends ScheduledBatchPollingConsumer { public static final String POP3_UID = "CamelPop3Uid"; public static final long DEFAULT_CONSUMER_DELAY = 60 * 1000L; private static final transient Logger LOG = LoggerFactory.getLogger(MailConsumer.class); @@ -53,9 +50,6 @@ public class MailConsumer extends Schedu private final JavaMailSender sender; private Folder folder; private Store store; - private int maxMessagesPerPoll; - private volatile ShutdownRunningTask shutdownRunningTask; - private volatile int pendingExchanges; public MailConsumer(MailEndpoint endpoint, Processor processor, JavaMailSender sender) { super(endpoint, processor); @@ -152,10 +146,6 @@ public class MailConsumer extends Schedu return polledMessages; } - public void setMaxMessagesPerPoll(int maxMessagesPerPoll) { - this.maxMessagesPerPoll = maxMessagesPerPoll; - } - public int processBatch(Queue<Object> exchanges) throws Exception { int total = exchanges.size(); @@ -202,54 +192,6 @@ public class MailConsumer extends Schedu return total; } - public boolean deferShutdown(ShutdownRunningTask shutdownRunningTask) { - // store a reference what to do in case when shutting down and we have pending messages - this.shutdownRunningTask = shutdownRunningTask; - // do not defer shutdown - return false; - } - - public int getPendingExchangesSize() { - int answer; - // only return the real pending size in case we are configured to complete all tasks - if (ShutdownRunningTask.CompleteAllTasks == shutdownRunningTask) { - answer = pendingExchanges; - } else { - answer = 0; - } - - if (answer == 0 && isPolling()) { - // force at least one pending exchange if we are polling as there is a little gap - // in the processBatch method and until an exchange gets enlisted as in-flight - // which happens later, so we need to signal back to the shutdown strategy that - // there is a pending exchange. When we are no longer polling, then we will return 0 - log.trace("Currently polling so returning 1 as pending exchanges"); - answer = 1; - } - - return answer; - } - - public void prepareShutdown() { - // noop - } - - public boolean isBatchAllowed() { - // stop if we are not running - boolean answer = isRunAllowed(); - if (!answer) { - return false; - } - - if (shutdownRunningTask == null) { - // we are not shutting down so continue to run - return true; - } - - // we are shutting down so only continue if we are configured to complete all tasks - return ShutdownRunningTask.CompleteAllTasks == shutdownRunningTask; - } - protected Queue<Exchange> createExchanges(Message[] messages) throws MessagingException { Queue<Exchange> answer = new LinkedList<Exchange>(); Modified: camel/trunk/components/camel-mybatis/src/main/java/org/apache/camel/component/mybatis/MyBatisConsumer.java URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-mybatis/src/main/java/org/apache/camel/component/mybatis/MyBatisConsumer.java?rev=1231533&r1=1231532&r2=1231533&view=diff ============================================================================== --- camel/trunk/components/camel-mybatis/src/main/java/org/apache/camel/component/mybatis/MyBatisConsumer.java (original) +++ camel/trunk/components/camel-mybatis/src/main/java/org/apache/camel/component/mybatis/MyBatisConsumer.java Sat Jan 14 17:25:26 2012 @@ -20,14 +20,12 @@ import java.util.LinkedList; import java.util.List; import java.util.Queue; -import org.apache.camel.BatchConsumer; import org.apache.camel.Exchange; import org.apache.camel.ExchangePattern; import org.apache.camel.Message; import org.apache.camel.Processor; import org.apache.camel.ShutdownRunningTask; -import org.apache.camel.impl.ScheduledPollConsumer; -import org.apache.camel.spi.ShutdownAware; +import org.apache.camel.impl.ScheduledBatchPollingConsumer; import org.apache.camel.util.CastUtils; import org.apache.camel.util.ObjectHelper; import org.slf4j.Logger; @@ -38,7 +36,7 @@ import org.slf4j.LoggerFactory; * * @version */ -public class MyBatisConsumer extends ScheduledPollConsumer implements BatchConsumer, ShutdownAware { +public class MyBatisConsumer extends ScheduledBatchPollingConsumer { private static final Logger LOG = LoggerFactory.getLogger(MyBatisConsumer.class); @@ -116,10 +114,6 @@ public class MyBatisConsumer extends Sch return processBatch(CastUtils.cast(answer)); } - public void setMaxMessagesPerPoll(int maxMessagesPerPoll) { - this.maxMessagesPerPoll = maxMessagesPerPoll; - } - public int processBatch(Queue<Object> exchanges) throws Exception { final MyBatisEndpoint endpoint = getEndpoint(); @@ -161,54 +155,6 @@ public class MyBatisConsumer extends Sch return total; } - public boolean deferShutdown(ShutdownRunningTask shutdownRunningTask) { - // store a reference what to do in case when shutting down and we have pending messages - this.shutdownRunningTask = shutdownRunningTask; - // do not defer shutdown - return false; - } - - public int getPendingExchangesSize() { - int answer; - // only return the real pending size in case we are configured to complete all tasks - if (ShutdownRunningTask.CompleteAllTasks == shutdownRunningTask) { - answer = pendingExchanges; - } else { - answer = 0; - } - - if (answer == 0 && isPolling()) { - // force at least one pending exchange if we are polling as there is a little gap - // in the processBatch method and until an exchange gets enlisted as in-flight - // which happens later, so we need to signal back to the shutdown strategy that - // there is a pending exchange. When we are no longer polling, then we will return 0 - log.trace("Currently polling so returning 1 as pending exchanges"); - answer = 1; - } - - return answer; - } - - public void prepareShutdown() { - // noop - } - - public boolean isBatchAllowed() { - // stop if we are not running - boolean answer = isRunAllowed(); - if (!answer) { - return false; - } - - if (shutdownRunningTask == null) { - // we are not shutting down so continue to run - return true; - } - - // we are shutting down so only continue if we are configured to complete all tasks - return ShutdownRunningTask.CompleteAllTasks == shutdownRunningTask; - } - private Exchange createExchange(Object data) { final MyBatisEndpoint endpoint = getEndpoint(); final Exchange exchange = endpoint.createExchange(ExchangePattern.InOnly);