Author: davsclaus
Date: Sat Jan 14 17:25:26 2012
New Revision: 1231533

URL: http://svn.apache.org/viewvc?rev=1231533&view=rev
Log:
CAMEL-4577: Added a ScheduledBatchPollingConsumer to reuse logic. Thanks to 
Bilgin for the patch.

Added:
    
camel/trunk/camel-core/src/main/java/org/apache/camel/impl/ScheduledBatchPollingConsumer.java
Modified:
    
camel/trunk/camel-core/src/main/java/org/apache/camel/component/file/GenericFileConsumer.java
    
camel/trunk/components/camel-aws/src/main/java/org/apache/camel/component/aws/s3/S3Consumer.java
    
camel/trunk/components/camel-aws/src/main/java/org/apache/camel/component/aws/s3/S3Endpoint.java
    
camel/trunk/components/camel-aws/src/main/java/org/apache/camel/component/aws/sqs/SqsConsumer.java
    
camel/trunk/components/camel-aws/src/main/java/org/apache/camel/component/aws/sqs/SqsEndpoint.java
    
camel/trunk/components/camel-ibatis/src/main/java/org/apache/camel/component/ibatis/IBatisConsumer.java
    
camel/trunk/components/camel-jclouds/src/main/java/org/apache/camel/component/jclouds/JcloudsBlobStoreConsumer.java
    
camel/trunk/components/camel-jpa/src/main/java/org/apache/camel/component/jpa/JpaConsumer.java
    
camel/trunk/components/camel-krati/src/main/java/org/apache/camel/component/krati/KratiConsumer.java
    
camel/trunk/components/camel-mail/src/main/java/org/apache/camel/component/mail/MailConsumer.java
    
camel/trunk/components/camel-mybatis/src/main/java/org/apache/camel/component/mybatis/MyBatisConsumer.java

Modified: 
camel/trunk/camel-core/src/main/java/org/apache/camel/component/file/GenericFileConsumer.java
URL: 
http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/component/file/GenericFileConsumer.java?rev=1231533&r1=1231532&r2=1231533&view=diff
==============================================================================
--- 
camel/trunk/camel-core/src/main/java/org/apache/camel/component/file/GenericFileConsumer.java
 (original)
+++ 
camel/trunk/camel-core/src/main/java/org/apache/camel/component/file/GenericFileConsumer.java
 Sat Jan 14 17:25:26 2012
@@ -23,12 +23,10 @@ import java.util.List;
 import java.util.Queue;
 
 import org.apache.camel.AsyncCallback;
-import org.apache.camel.BatchConsumer;
 import org.apache.camel.Exchange;
 import org.apache.camel.Processor;
 import org.apache.camel.ShutdownRunningTask;
-import org.apache.camel.impl.ScheduledPollConsumer;
-import org.apache.camel.spi.ShutdownAware;
+import org.apache.camel.impl.ScheduledBatchPollingConsumer;
 import org.apache.camel.util.CastUtils;
 import org.apache.camel.util.ObjectHelper;
 import org.apache.camel.util.StopWatch;
@@ -39,13 +37,12 @@ import org.slf4j.LoggerFactory;
 /**
  * Base class for file consumers.
  */
-public abstract class GenericFileConsumer<T> extends ScheduledPollConsumer 
implements BatchConsumer, ShutdownAware {
+public abstract class GenericFileConsumer<T> extends 
ScheduledBatchPollingConsumer {
     protected final transient Logger log = LoggerFactory.getLogger(getClass());
     protected GenericFileEndpoint<T> endpoint;
     protected GenericFileOperations<T> operations;
     protected boolean loggedIn;
     protected String fileExpressionResult;
-    protected int maxMessagesPerPoll;
     protected volatile ShutdownRunningTask shutdownRunningTask;
     protected volatile int pendingExchanges;
     protected Processor customProcessor;
@@ -140,9 +137,6 @@ public abstract class GenericFileConsume
         return polledMessages;
     }
 
-    public void setMaxMessagesPerPoll(int maxMessagesPerPoll) {
-        this.maxMessagesPerPoll = maxMessagesPerPoll;
-    }
 
     @SuppressWarnings("unchecked")
     public int processBatch(Queue<Object> exchanges) {
@@ -187,53 +181,6 @@ public abstract class GenericFileConsume
         return total;
     }
 
-    public boolean deferShutdown(ShutdownRunningTask shutdownRunningTask) {
-        // store a reference what to do in case when shutting down and we have 
pending messages
-        this.shutdownRunningTask = shutdownRunningTask;
-        // do not defer shutdown
-        return false;
-    }
-
-    public int getPendingExchangesSize() {
-        int answer;
-        // only return the real pending size in case we are configured to 
complete all tasks
-        if (ShutdownRunningTask.CompleteAllTasks == shutdownRunningTask) {
-            answer = pendingExchanges;
-        } else {
-            answer = 0;
-        }
-
-        if (answer == 0 && isPolling()) {
-            // force at least one pending exchange if we are polling as there 
is a little gap
-            // in the processBatch method and until an exchange gets enlisted 
as in-flight
-            // which happens later, so we need to signal back to the shutdown 
strategy that
-            // there is a pending exchange. When we are no longer polling, 
then we will return 0
-            log.trace("Currently polling so returning 1 as pending exchanges");
-            answer = 1;
-        }
-
-        return answer;
-    }
-
-    public void prepareShutdown() {
-        // noop
-    }
-
-    public boolean isBatchAllowed() {
-        // stop if we are not running
-        boolean answer = isRunAllowed();
-        if (!answer) {
-            return false;
-        }
-
-        if (shutdownRunningTask == null) {
-            // we are not shutting down so continue to run
-            return true;
-        }
-
-        // we are shutting down so only continue if we are configured to 
complete all tasks
-        return ShutdownRunningTask.CompleteAllTasks == shutdownRunningTask;
-    }
 
     /**
      * Whether or not we can continue polling for more files

Added: 
camel/trunk/camel-core/src/main/java/org/apache/camel/impl/ScheduledBatchPollingConsumer.java
URL: 
http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/impl/ScheduledBatchPollingConsumer.java?rev=1231533&view=auto
==============================================================================
--- 
camel/trunk/camel-core/src/main/java/org/apache/camel/impl/ScheduledBatchPollingConsumer.java
 (added)
+++ 
camel/trunk/camel-core/src/main/java/org/apache/camel/impl/ScheduledBatchPollingConsumer.java
 Sat Jan 14 17:25:26 2012
@@ -0,0 +1,115 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.impl;
+
+import java.util.concurrent.ScheduledExecutorService;
+
+import org.apache.camel.BatchConsumer;
+import org.apache.camel.Endpoint;
+import org.apache.camel.Processor;
+import org.apache.camel.ShutdownRunningTask;
+import org.apache.camel.spi.ShutdownAware;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * A useful base class for any consumer that polls in batches
+ */
+public abstract class ScheduledBatchPollingConsumer extends 
ScheduledPollConsumer implements BatchConsumer, ShutdownAware {
+    private static final transient Logger log = 
LoggerFactory.getLogger(ScheduledBatchPollingConsumer.class);
+    protected volatile ShutdownRunningTask shutdownRunningTask;
+    protected volatile int pendingExchanges;
+    protected int maxMessagesPerPoll;
+
+    public ScheduledBatchPollingConsumer(Endpoint endpoint, Processor 
processor) {
+        super(endpoint, processor);
+    }
+
+    public ScheduledBatchPollingConsumer(Endpoint endpoint, Processor 
processor, ScheduledExecutorService executor) {
+        super(endpoint, processor, executor);
+    }
+
+    @Override
+    public boolean deferShutdown(ShutdownRunningTask shutdownRunningTask) {
+        // store a reference to what to do when shutting down while we still have pending messages
+        this.shutdownRunningTask = shutdownRunningTask;
+        // do not defer shutdown
+        return false;
+    }
+
+    @Override
+    public int getPendingExchangesSize() {
+        int answer;
+        // only return the real pending size in case we are configured to 
complete all tasks
+        if (ShutdownRunningTask.CompleteAllTasks == shutdownRunningTask) {
+            answer = pendingExchanges;
+        } else {
+            answer = 0;
+        }
+
+        if (answer == 0 && isPolling()) {
+            // force at least one pending exchange if we are polling as there 
is a little gap
+            // in the processBatch method and until an exchange gets enlisted 
as in-flight
+            // which happens later, so we need to signal back to the shutdown 
strategy that
+            // there is a pending exchange. When we are no longer polling, 
then we will return 0
+            log.trace("Currently polling so returning 1 as pending exchanges");
+            answer = 1;
+        }
+
+        return answer;
+    }
+
+    @Override
+    public void prepareShutdown() {
+        // reset task as the state of the task is not to be preserved
+        // which otherwise may cause isBatchAllowed() to return a wrong answer
+        this.shutdownRunningTask = null;
+    }
+
+    @Override
+    public void setMaxMessagesPerPoll(int maxMessagesPerPoll) {
+        this.maxMessagesPerPoll = maxMessagesPerPoll;
+    }
+
+    /**
+     * Gets the maximum number of messages as a limit to poll at each polling.
+     * <p/>
+     * By default unlimited; use 0 or a negative number to disable the limit.
+     *
+     * @return max messages to poll
+     */
+    public int getMaxMessagesPerPoll() {
+        return maxMessagesPerPoll;
+    }
+
+    @Override
+    public boolean isBatchAllowed() {
+        // stop if we are not running
+        boolean answer = isRunAllowed();
+        if (!answer) {
+            return false;
+        }
+
+        if (shutdownRunningTask == null) {
+            // we are not shutting down so continue to run
+            return true;
+        }
+
+        // we are shutting down so only continue if we are configured to 
complete all tasks
+        return ShutdownRunningTask.CompleteAllTasks == shutdownRunningTask;
+    }
+}

Modified: 
camel/trunk/components/camel-aws/src/main/java/org/apache/camel/component/aws/s3/S3Consumer.java
URL: 
http://svn.apache.org/viewvc/camel/trunk/components/camel-aws/src/main/java/org/apache/camel/component/aws/s3/S3Consumer.java?rev=1231533&r1=1231532&r2=1231533&view=diff
==============================================================================
--- 
camel/trunk/components/camel-aws/src/main/java/org/apache/camel/component/aws/s3/S3Consumer.java
 (original)
+++ 
camel/trunk/components/camel-aws/src/main/java/org/apache/camel/component/aws/s3/S3Consumer.java
 Sat Jan 14 17:25:26 2012
@@ -27,13 +27,10 @@ import com.amazonaws.services.s3.model.O
 import com.amazonaws.services.s3.model.S3Object;
 import com.amazonaws.services.s3.model.S3ObjectSummary;
 
-import org.apache.camel.BatchConsumer;
 import org.apache.camel.Exchange;
 import org.apache.camel.NoFactoryAvailableException;
 import org.apache.camel.Processor;
-import org.apache.camel.ShutdownRunningTask;
-import org.apache.camel.impl.ScheduledPollConsumer;
-import org.apache.camel.spi.ShutdownAware;
+import org.apache.camel.impl.ScheduledBatchPollingConsumer;
 import org.apache.camel.spi.Synchronization;
 import org.apache.camel.util.CastUtils;
 import org.apache.camel.util.ObjectHelper;
@@ -46,12 +43,9 @@ import org.slf4j.LoggerFactory;
  * <a href="http://aws.amazon.com/s3/">AWS S3</a>
  * 
  */
-public class S3Consumer extends ScheduledPollConsumer implements 
BatchConsumer, ShutdownAware {
+public class S3Consumer extends ScheduledBatchPollingConsumer {
     
     private static final transient Logger LOG = 
LoggerFactory.getLogger(S3Consumer.class);
-    
-    private volatile ShutdownRunningTask shutdownRunningTask;
-    private volatile int pendingExchanges;
 
     public S3Consumer(S3Endpoint endpoint, Processor processor) throws 
NoFactoryAvailableException {
         super(endpoint, processor);
@@ -68,7 +62,7 @@ public class S3Consumer extends Schedule
         
         ListObjectsRequest listObjectsRequest = new ListObjectsRequest();
         listObjectsRequest.setBucketName(bucketName);
-        listObjectsRequest.setMaxKeys(getMaxMessagesPerPoll());
+        listObjectsRequest.setMaxKeys(maxMessagesPerPoll);
         
         ObjectListing listObjects = 
getAmazonS3Client().listObjects(listObjectsRequest);
         
@@ -165,55 +159,7 @@ public class S3Consumer extends Schedule
             LOG.warn("Exchange failed, so rolling back message status: {}", 
exchange);
         }
     }
-    
-    public boolean isBatchAllowed() {
-        // stop if we are not running
-        boolean answer = isRunAllowed();
-        if (!answer) {
-            return false;
-        }
-
-        if (shutdownRunningTask == null) {
-            // we are not shutting down so continue to run
-            return true;
-        }
-
-        // we are shutting down so only continue if we are configured to 
complete all tasks
-        return ShutdownRunningTask.CompleteAllTasks == shutdownRunningTask;
-    }
-
-    public boolean deferShutdown(ShutdownRunningTask shutdownRunningTask) {
-        // store a reference what to do in case when shutting down and we have 
pending messages
-        this.shutdownRunningTask = shutdownRunningTask;
-        // do not defer shutdown
-        return false;
-    }
-
-    public int getPendingExchangesSize() {
-        int answer;
-        // only return the real pending size in case we are configured to 
complete all tasks
-        if (ShutdownRunningTask.CompleteAllTasks == shutdownRunningTask) {
-            answer = pendingExchanges;
-        } else {
-            answer = 0;
-        }
-
-        if (answer == 0 && isPolling()) {
-            // force at least one pending exchange if we are polling as there 
is a little gap
-            // in the processBatch method and until an exchange gets enlisted 
as in-flight
-            // which happens later, so we need to signal back to the shutdown 
strategy that
-            // there is a pending exchange. When we are no longer polling, 
then we will return 0
-            log.trace("Currently polling so returning 1 as pending exchanges");
-            answer = 1;
-        }
 
-        return answer;
-    }
-
-    public void prepareShutdown() {
-     // noop
-    }
-    
     protected S3Configuration getConfiguration() {
         return getEndpoint().getConfiguration();
     }
@@ -226,15 +172,7 @@ public class S3Consumer extends Schedule
     public S3Endpoint getEndpoint() {
         return (S3Endpoint) super.getEndpoint();
     }
-    
-    public void setMaxMessagesPerPoll(int maxMessagesPerPoll) {
-        getEndpoint().setMaxMessagesPerPoll(maxMessagesPerPoll);
-    }
-    
-    public int getMaxMessagesPerPoll() {
-        return getEndpoint().getMaxMessagesPerPoll();
-    }
-    
+
     @Override
     public String toString() {
         return "S3Consumer[" + 
URISupport.sanitizeUri(getEndpoint().getEndpointUri()) + "]";

Modified: 
camel/trunk/components/camel-aws/src/main/java/org/apache/camel/component/aws/s3/S3Endpoint.java
URL: 
http://svn.apache.org/viewvc/camel/trunk/components/camel-aws/src/main/java/org/apache/camel/component/aws/s3/S3Endpoint.java?rev=1231533&r1=1231532&r2=1231533&view=diff
==============================================================================
--- 
camel/trunk/components/camel-aws/src/main/java/org/apache/camel/component/aws/s3/S3Endpoint.java
 (original)
+++ 
camel/trunk/components/camel-aws/src/main/java/org/apache/camel/component/aws/s3/S3Endpoint.java
 Sat Jan 14 17:25:26 2012
@@ -49,7 +49,7 @@ public class S3Endpoint extends Schedule
     private AmazonS3Client s3Client;
     private S3Configuration configuration;
     private int maxMessagesPerPoll = 10;
-    
+
     @Deprecated
     public S3Endpoint(String uri, CamelContext context, S3Configuration 
configuration) {
         super(uri, context);
@@ -63,6 +63,7 @@ public class S3Endpoint extends Schedule
     public Consumer createConsumer(Processor processor) throws Exception {
         S3Consumer s3Consumer = new S3Consumer(this, processor);
         configureConsumer(s3Consumer);
+        s3Consumer.setMaxMessagesPerPoll(maxMessagesPerPoll);
         return s3Consumer;
     }
 
@@ -178,7 +179,7 @@ public class S3Endpoint extends Schedule
         }
         return client;
     }
-    
+
     public int getMaxMessagesPerPoll() {
         return maxMessagesPerPoll;
     }

Modified: 
camel/trunk/components/camel-aws/src/main/java/org/apache/camel/component/aws/sqs/SqsConsumer.java
URL: 
http://svn.apache.org/viewvc/camel/trunk/components/camel-aws/src/main/java/org/apache/camel/component/aws/sqs/SqsConsumer.java?rev=1231533&r1=1231532&r2=1231533&view=diff
==============================================================================
--- 
camel/trunk/components/camel-aws/src/main/java/org/apache/camel/component/aws/sqs/SqsConsumer.java
 (original)
+++ 
camel/trunk/components/camel-aws/src/main/java/org/apache/camel/component/aws/sqs/SqsConsumer.java
 Sat Jan 14 17:25:26 2012
@@ -27,13 +27,10 @@ import com.amazonaws.services.sqs.model.
 import com.amazonaws.services.sqs.model.ReceiveMessageRequest;
 import com.amazonaws.services.sqs.model.ReceiveMessageResult;
 
-import org.apache.camel.BatchConsumer;
 import org.apache.camel.Exchange;
 import org.apache.camel.NoFactoryAvailableException;
 import org.apache.camel.Processor;
-import org.apache.camel.ShutdownRunningTask;
-import org.apache.camel.impl.ScheduledPollConsumer;
-import org.apache.camel.spi.ShutdownAware;
+import org.apache.camel.impl.ScheduledBatchPollingConsumer;
 import org.apache.camel.spi.Synchronization;
 import org.apache.camel.util.CastUtils;
 import org.apache.camel.util.ObjectHelper;
@@ -47,12 +44,9 @@ import org.slf4j.LoggerFactory;
  * <a href="http://aws.amazon.com/sqs/">AWS SQS</a>
  * 
  */
-public class SqsConsumer extends ScheduledPollConsumer implements 
BatchConsumer, ShutdownAware {
+public class SqsConsumer extends ScheduledBatchPollingConsumer {
     
     private static final transient Logger LOG = 
LoggerFactory.getLogger(SqsConsumer.class);
-    
-    private volatile ShutdownRunningTask shutdownRunningTask;
-    private volatile int pendingExchanges;
 
     public SqsConsumer(SqsEndpoint endpoint, Processor processor) throws 
NoFactoryAvailableException {
         super(endpoint, processor);
@@ -143,7 +137,7 @@ public class SqsConsumer extends Schedul
                 LOG.trace("Deleting message with receipt handle {}...", 
receiptHandle);
                 
                 getClient().deleteMessage(deleteRequest);
-                
+
                 LOG.trace("Message deleted");
             }
         } catch (AmazonClientException e) {
@@ -165,55 +159,7 @@ public class SqsConsumer extends Schedul
             LOG.warn("Exchange failed, so rolling back message status: {}", 
exchange);
         }
     }
-    
-    public boolean isBatchAllowed() {
-        // stop if we are not running
-        boolean answer = isRunAllowed();
-        if (!answer) {
-            return false;
-        }
-
-        if (shutdownRunningTask == null) {
-            // we are not shutting down so continue to run
-            return true;
-        }
 
-        // we are shutting down so only continue if we are configured to 
complete all tasks
-        return ShutdownRunningTask.CompleteAllTasks == shutdownRunningTask;
-    }
-
-    public boolean deferShutdown(ShutdownRunningTask shutdownRunningTask) {
-        // store a reference what to do in case when shutting down and we have 
pending messages
-        this.shutdownRunningTask = shutdownRunningTask;
-        // do not defer shutdown
-        return false;
-    }
-
-    public int getPendingExchangesSize() {
-        int answer;
-        // only return the real pending size in case we are configured to 
complete all tasks
-        if (ShutdownRunningTask.CompleteAllTasks == shutdownRunningTask) {
-            answer = pendingExchanges;
-        } else {
-            answer = 0;
-        }
-
-        if (answer == 0 && isPolling()) {
-            // force at least one pending exchange if we are polling as there 
is a little gap
-            // in the processBatch method and until an exchange gets enlisted 
as in-flight
-            // which happens later, so we need to signal back to the shutdown 
strategy that
-            // there is a pending exchange. When we are no longer polling, 
then we will return 0
-            log.trace("Currently polling so returning 1 as pending exchanges");
-            answer = 1;
-        }
-
-        return answer;
-    }
-
-    public void prepareShutdown() {
-     // noop
-    }
-    
     protected SqsConfiguration getConfiguration() {
         return getEndpoint().getConfiguration();
     }
@@ -230,15 +176,7 @@ public class SqsConsumer extends Schedul
     public SqsEndpoint getEndpoint() {
         return (SqsEndpoint) super.getEndpoint();
     }
-    
-    public void setMaxMessagesPerPoll(int maxMessagesPerPoll) {
-        getEndpoint().setMaxMessagesPerPoll(maxMessagesPerPoll);
-    }
-    
-    public int getMaxMessagesPerPoll() {
-        return getEndpoint().getMaxMessagesPerPoll();
-    }
-    
+
     @Override
     public String toString() {
         return "SqsConsumer[" + 
URISupport.sanitizeUri(getEndpoint().getEndpointUri()) + "]";

Modified: 
camel/trunk/components/camel-aws/src/main/java/org/apache/camel/component/aws/sqs/SqsEndpoint.java
URL: 
http://svn.apache.org/viewvc/camel/trunk/components/camel-aws/src/main/java/org/apache/camel/component/aws/sqs/SqsEndpoint.java?rev=1231533&r1=1231532&r2=1231533&view=diff
==============================================================================
--- 
camel/trunk/components/camel-aws/src/main/java/org/apache/camel/component/aws/sqs/SqsEndpoint.java
 (original)
+++ 
camel/trunk/components/camel-aws/src/main/java/org/apache/camel/component/aws/sqs/SqsEndpoint.java
 Sat Jan 14 17:25:26 2012
@@ -63,6 +63,7 @@ public class SqsEndpoint extends Schedul
     public Consumer createConsumer(Processor processor) throws Exception {
         SqsConsumer sqsConsumer = new SqsConsumer(this, processor);
         configureConsumer(sqsConsumer);
+        sqsConsumer.setMaxMessagesPerPoll(maxMessagesPerPoll);
         return sqsConsumer;
     }
 

Modified: 
camel/trunk/components/camel-ibatis/src/main/java/org/apache/camel/component/ibatis/IBatisConsumer.java
URL: 
http://svn.apache.org/viewvc/camel/trunk/components/camel-ibatis/src/main/java/org/apache/camel/component/ibatis/IBatisConsumer.java?rev=1231533&r1=1231532&r2=1231533&view=diff
==============================================================================
--- 
camel/trunk/components/camel-ibatis/src/main/java/org/apache/camel/component/ibatis/IBatisConsumer.java
 (original)
+++ 
camel/trunk/components/camel-ibatis/src/main/java/org/apache/camel/component/ibatis/IBatisConsumer.java
 Sat Jan 14 17:25:26 2012
@@ -20,14 +20,11 @@ import java.util.LinkedList;
 import java.util.List;
 import java.util.Queue;
 
-import org.apache.camel.BatchConsumer;
 import org.apache.camel.Exchange;
 import org.apache.camel.ExchangePattern;
 import org.apache.camel.Message;
 import org.apache.camel.Processor;
-import org.apache.camel.ShutdownRunningTask;
-import org.apache.camel.impl.ScheduledPollConsumer;
-import org.apache.camel.spi.ShutdownAware;
+import org.apache.camel.impl.ScheduledBatchPollingConsumer;
 import org.apache.camel.util.CastUtils;
 import org.apache.camel.util.ObjectHelper;
 import org.slf4j.Logger;
@@ -38,7 +35,7 @@ import org.slf4j.LoggerFactory;
  *
  * @see org.apache.camel.component.ibatis.strategy.IBatisProcessingStrategy
  */
-public class IBatisConsumer extends ScheduledPollConsumer implements 
BatchConsumer, ShutdownAware {
+public class IBatisConsumer extends ScheduledBatchPollingConsumer {
 
     private static final Logger LOG = 
LoggerFactory.getLogger(IBatisConsumer.class);
 
@@ -49,9 +46,6 @@ public class IBatisConsumer extends Sche
         }
     }
     
-    protected volatile ShutdownRunningTask shutdownRunningTask;
-    protected volatile int pendingExchanges;
-
     /**
      * Statement to run after data has been processed in the route
      */
@@ -116,10 +110,6 @@ public class IBatisConsumer extends Sche
         return processBatch(CastUtils.cast(answer));
     }
 
-    public void setMaxMessagesPerPoll(int maxMessagesPerPoll) {
-        this.maxMessagesPerPoll = maxMessagesPerPoll;
-    }
-
     public int processBatch(Queue<Object> exchanges) throws Exception {
         final IBatisEndpoint endpoint = getEndpoint();
 
@@ -161,54 +151,6 @@ public class IBatisConsumer extends Sche
         return total;
     }
 
-    public boolean deferShutdown(ShutdownRunningTask shutdownRunningTask) {
-        // store a reference what to do in case when shutting down and we have 
pending messages
-        this.shutdownRunningTask = shutdownRunningTask;
-        // do not defer shutdown
-        return false;
-    }
-
-    public int getPendingExchangesSize() {
-        int answer;
-        // only return the real pending size in case we are configured to 
complete all tasks
-        if (ShutdownRunningTask.CompleteAllTasks == shutdownRunningTask) {
-            answer = pendingExchanges;
-        } else {
-            answer = 0;
-        }
-
-        if (answer == 0 && isPolling()) {
-            // force at least one pending exchange if we are polling as there 
is a little gap
-            // in the processBatch method and until an exchange gets enlisted 
as in-flight
-            // which happens later, so we need to signal back to the shutdown 
strategy that
-            // there is a pending exchange. When we are no longer polling, 
then we will return 0
-            log.trace("Currently polling so returning 1 as pending exchanges");
-            answer = 1;
-        }
-
-        return answer;
-    }
-
-    public void prepareShutdown() {
-        // noop
-    }
-
-    public boolean isBatchAllowed() {
-        // stop if we are not running
-        boolean answer = isRunAllowed();
-        if (!answer) {
-            return false;
-        }
-
-        if (shutdownRunningTask == null) {
-            // we are not shutting down so continue to run
-            return true;
-        }
-
-        // we are shutting down so only continue if we are configured to 
complete all tasks
-        return ShutdownRunningTask.CompleteAllTasks == shutdownRunningTask;
-    }
-
     private Exchange createExchange(Object data) {
         final IBatisEndpoint endpoint = getEndpoint();
         final Exchange exchange = 
endpoint.createExchange(ExchangePattern.InOnly);

Modified: 
camel/trunk/components/camel-jclouds/src/main/java/org/apache/camel/component/jclouds/JcloudsBlobStoreConsumer.java
URL: 
http://svn.apache.org/viewvc/camel/trunk/components/camel-jclouds/src/main/java/org/apache/camel/component/jclouds/JcloudsBlobStoreConsumer.java?rev=1231533&r1=1231532&r2=1231533&view=diff
==============================================================================
--- 
camel/trunk/components/camel-jclouds/src/main/java/org/apache/camel/component/jclouds/JcloudsBlobStoreConsumer.java
 (original)
+++ 
camel/trunk/components/camel-jclouds/src/main/java/org/apache/camel/component/jclouds/JcloudsBlobStoreConsumer.java
 Sat Jan 14 17:25:26 2012
@@ -23,6 +23,8 @@ import org.apache.camel.BatchConsumer;
 import org.apache.camel.Exchange;
 import org.apache.camel.Processor;
 import org.apache.camel.ShutdownRunningTask;
+import org.apache.camel.impl.ScheduledBatchPollingConsumer;
+import org.apache.camel.impl.ScheduledPollConsumer;
 import org.apache.camel.spi.ShutdownAware;
 import org.apache.camel.spi.Synchronization;
 import org.apache.camel.util.CastUtils;
@@ -33,21 +35,14 @@ import org.jclouds.blobstore.options.Lis
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-public class JcloudsBlobStoreConsumer extends JcloudsConsumer implements 
BatchConsumer, ShutdownAware {
+public class JcloudsBlobStoreConsumer extends ScheduledBatchPollingConsumer {
 
     private static final Logger LOG = 
LoggerFactory.getLogger(JcloudsBlobStoreConsumer.class);
-
     private final JcloudsBlobStoreEndpoint endpoint;
-
     private final String container;
     private final BlobStore blobStore;
-
     private int maxMessagesPerPoll = 10;
 
-    private volatile ShutdownRunningTask shutdownRunningTask;
-    private volatile int pendingExchanges;
-
-
     public JcloudsBlobStoreConsumer(JcloudsBlobStoreEndpoint endpoint, 
Processor processor, BlobStore blobStore) {
         super(endpoint, processor);
         this.blobStore = blobStore;
@@ -75,11 +70,6 @@ public class JcloudsBlobStoreConsumer ex
         return queue.isEmpty() ? 0 : processBatch(CastUtils.cast(queue));
     }
 
-    @Override
-    public void setMaxMessagesPerPoll(int maxMessagesPerPoll) {
-        this.maxMessagesPerPoll = maxMessagesPerPoll;
-    }
-
     public int processBatch(Queue<Object> exchanges) throws Exception {
         int total = exchanges.size();
 
@@ -112,53 +102,4 @@ public class JcloudsBlobStoreConsumer ex
 
         return total;
     }
-
-    public boolean isBatchAllowed() {
-        // stop if we are not running
-        boolean answer = isRunAllowed();
-        if (!answer) {
-            return false;
-        }
-
-        if (shutdownRunningTask == null) {
-            // we are not shutting down so continue to run
-            return true;
-        }
-
-        // we are shutting down so only continue if we are configured to 
complete all tasks
-        return ShutdownRunningTask.CompleteAllTasks == shutdownRunningTask;
-    }
-
-    public boolean deferShutdown(ShutdownRunningTask shutdownRunningTask) {
-        // store a reference what to do in case when shutting down and we have 
pending messages
-        this.shutdownRunningTask = shutdownRunningTask;
-        // do not defer shutdown
-        return false;
-    }
-
-    public int getPendingExchangesSize() {
-        int answer;
-        // only return the real pending size in case we are configured to 
complete all tasks
-        if (ShutdownRunningTask.CompleteAllTasks == shutdownRunningTask) {
-            answer = pendingExchanges;
-        } else {
-            answer = 0;
-        }
-
-        if (answer == 0 && isPolling()) {
-            // force at least one pending exchange if we are polling as there 
is a little gap
-            // in the processBatch method and until an exchange gets enlisted 
as in-flight
-            // which happens later, so we need to signal back to the shutdown 
strategy that
-            // there is a pending exchange. When we are no longer polling, 
then we will return 0
-            log.trace("Currently polling so returning 1 as pending exchanges");
-            answer = 1;
-        }
-
-        return answer;
-    }
-
-    @Override
-    public void prepareShutdown() {
-     //Empty method
-    }
 }

Modified: 
camel/trunk/components/camel-jpa/src/main/java/org/apache/camel/component/jpa/JpaConsumer.java
URL: 
http://svn.apache.org/viewvc/camel/trunk/components/camel-jpa/src/main/java/org/apache/camel/component/jpa/JpaConsumer.java?rev=1231533&r1=1231532&r2=1231533&view=diff
==============================================================================
--- 
camel/trunk/components/camel-jpa/src/main/java/org/apache/camel/component/jpa/JpaConsumer.java
 (original)
+++ 
camel/trunk/components/camel-jpa/src/main/java/org/apache/camel/component/jpa/JpaConsumer.java
 Sat Jan 14 17:25:26 2012
@@ -27,12 +27,10 @@ import javax.persistence.LockModeType;
 import javax.persistence.PersistenceException;
 import javax.persistence.Query;
 
-import org.apache.camel.BatchConsumer;
 import org.apache.camel.Exchange;
 import org.apache.camel.Processor;
 import org.apache.camel.ShutdownRunningTask;
-import org.apache.camel.impl.ScheduledPollConsumer;
-import org.apache.camel.spi.ShutdownAware;
+import org.apache.camel.impl.ScheduledBatchPollingConsumer;
 import org.apache.camel.util.CastUtils;
 import org.apache.camel.util.ObjectHelper;
 import org.slf4j.Logger;
@@ -42,7 +40,7 @@ import org.springframework.orm.jpa.JpaCa
 /**
  * @version 
  */
-public class JpaConsumer extends ScheduledPollConsumer implements BatchConsumer, ShutdownAware {
+public class JpaConsumer extends ScheduledBatchPollingConsumer {
 
     private static final transient Logger LOG = LoggerFactory.getLogger(JpaConsumer.class);
     private final JpaEndpoint endpoint;
@@ -53,10 +51,7 @@ public class JpaConsumer extends Schedul
     private String namedQuery;
     private String nativeQuery;
     private Class<?> resultClass;
-    private int maxMessagesPerPoll;
     private boolean transacted;
-    private volatile ShutdownRunningTask shutdownRunningTask;
-    private volatile int pendingExchanges;
 
     private static final class DataHolder {
         private Exchange exchange;
@@ -128,9 +123,6 @@ public class JpaConsumer extends Schedul
         return endpoint.getCamelContext().getTypeConverter().convertTo(int.class, messagePolled);
     }
 
-    public void setMaxMessagesPerPoll(int maxMessagesPerPoll) {
-        this.maxMessagesPerPoll = maxMessagesPerPoll;
-    }
 
     public int processBatch(Queue<Object> exchanges) throws Exception {
         int total = exchanges.size();
@@ -172,54 +164,6 @@ public class JpaConsumer extends Schedul
         return total;
     }
 
-    public boolean deferShutdown(ShutdownRunningTask shutdownRunningTask) {
-        // store a reference what to do in case when shutting down and we have pending messages
-        this.shutdownRunningTask = shutdownRunningTask;
-        // do not defer shutdown
-        return false;
-    }
-
-    public int getPendingExchangesSize() {
-        int answer;
-        // only return the real pending size in case we are configured to complete all tasks
-        if (ShutdownRunningTask.CompleteAllTasks == shutdownRunningTask) {
-            answer = pendingExchanges;
-        } else {
-            answer = 0;
-        }
-
-        if (answer == 0 && isPolling()) {
-            // force at least one pending exchange if we are polling as there is a little gap
-            // in the processBatch method and until an exchange gets enlisted as in-flight
-            // which happens later, so we need to signal back to the shutdown strategy that
-            // there is a pending exchange. When we are no longer polling, then we will return 0
-            log.trace("Currently polling so returning 1 as pending exchanges");
-            answer = 1;
-        }
-
-        return answer;
-    }
-
-    public void prepareShutdown() {
-        // noop
-    }
-
-    public boolean isBatchAllowed() {
-        // stop if we are not running
-        boolean answer = isRunAllowed();
-        if (!answer) {
-            return false;
-        }
-
-        if (shutdownRunningTask == null) {
-            // we are not shutting down so continue to run
-            return true;
-        }
-
-        // we are shutting down so only continue if we are configured to complete all tasks
-        return ShutdownRunningTask.CompleteAllTasks == shutdownRunningTask;
-    }
-
     // Properties
     // -------------------------------------------------------------------------
     @Override

Modified: 
camel/trunk/components/camel-krati/src/main/java/org/apache/camel/component/krati/KratiConsumer.java
URL: 
http://svn.apache.org/viewvc/camel/trunk/components/camel-krati/src/main/java/org/apache/camel/component/krati/KratiConsumer.java?rev=1231533&r1=1231532&r2=1231533&view=diff
==============================================================================
--- 
camel/trunk/components/camel-krati/src/main/java/org/apache/camel/component/krati/KratiConsumer.java
 (original)
+++ 
camel/trunk/components/camel-krati/src/main/java/org/apache/camel/component/krati/KratiConsumer.java
 Sat Jan 14 17:25:26 2012
@@ -19,13 +19,11 @@ package org.apache.camel.component.krati
 import java.util.Iterator;
 import java.util.LinkedList;
 import java.util.Queue;
+
 import krati.store.DataStore;
-import org.apache.camel.BatchConsumer;
 import org.apache.camel.Exchange;
 import org.apache.camel.Processor;
-import org.apache.camel.ShutdownRunningTask;
-import org.apache.camel.impl.ScheduledPollConsumer;
-import org.apache.camel.spi.ShutdownAware;
+import org.apache.camel.impl.ScheduledBatchPollingConsumer;
 import org.apache.camel.spi.Synchronization;
 import org.apache.camel.util.CastUtils;
 import org.apache.camel.util.ObjectHelper;
@@ -36,7 +34,7 @@ import org.slf4j.LoggerFactory;
 /**
  * The Krati consumer.
  */
-public class KratiConsumer extends ScheduledPollConsumer implements BatchConsumer, ShutdownAware {
+public class KratiConsumer extends ScheduledBatchPollingConsumer {
 
     private static final transient Logger LOG = LoggerFactory.getLogger(KratiConsumer.class);
 
@@ -44,9 +42,6 @@ public class KratiConsumer extends Sched
     protected DataStore<Object, Object> dataStore;
     protected int maxMessagesPerPoll = 10;
 
-    protected volatile ShutdownRunningTask shutdownRunningTask;
-    protected volatile int pendingExchanges;
-
     public KratiConsumer(KratiEndpoint endpoint, Processor processor, DataStore<Object, Object> dataStore) {
         super(endpoint, processor);
         this.endpoint = endpoint;
@@ -72,21 +67,6 @@ public class KratiConsumer extends Sched
         return queue.isEmpty() ? 0 : processBatch(CastUtils.cast(queue));
     }
 
-    /**
-     * Sets a maximum number of messages as a limit to poll at each polling.
-     * <p/>
-     * Can be used to limit eg to 100 to avoid when starting and there are millions
-     * of messages for you in the first poll.
-     * <p/>
-     * Default value is 10.
-     *
-     * @param maxMessagesPerPoll maximum messages to poll.
-     */
-    @Override
-    public void setMaxMessagesPerPoll(int maxMessagesPerPoll) {
-        this.maxMessagesPerPoll = maxMessagesPerPoll;
-    }
-
     @Override
     public int processBatch(Queue<Object> exchanges) throws Exception {
         int total = exchanges.size();
@@ -123,55 +103,4 @@ public class KratiConsumer extends Sched
 
         return total;
     }
-
-    @Override
-    public boolean isBatchAllowed() {
-        // stop if we are not running
-        boolean answer = isRunAllowed();
-        if (!answer) {
-            return false;
-        }
-
-        if (shutdownRunningTask == null) {
-            // we are not shutting down so continue to run
-            return true;
-        }
-
-        // we are shutting down so only continue if we are configured to complete all tasks
-        return ShutdownRunningTask.CompleteAllTasks == shutdownRunningTask;
-    }
-
-    @Override
-    public boolean deferShutdown(ShutdownRunningTask shutdownRunningTask) {
-        // store a reference what to do in case when shutting down and we have pending messages
-        this.shutdownRunningTask = shutdownRunningTask;
-        // do not defer shutdown
-        return false;
-    }
-
-    @Override
-    public int getPendingExchangesSize() {
-        int answer;
-        // only return the real pending size in case we are configured to complete all tasks
-        if (ShutdownRunningTask.CompleteAllTasks == shutdownRunningTask) {
-            answer = pendingExchanges;
-        } else {
-            answer = 0;
-        }
-
-        if (answer == 0 && isPolling()) {
-            // force at least one pending exchange if we are polling as there is a little gap
-            // in the processBatch method and until an exchange gets enlisted as in-flight
-            // which happens later, so we need to signal back to the shutdown strategy that
-            // there is a pending exchange. When we are no longer polling, then we will return 0
-            log.trace("Currently polling so returning 1 as pending exchanges");
-            answer = 1;
-        }
-
-        return answer;
-    }
-
-    @Override
-    public void prepareShutdown() {
-    }
 }

Modified: 
camel/trunk/components/camel-mail/src/main/java/org/apache/camel/component/mail/MailConsumer.java
URL: 
http://svn.apache.org/viewvc/camel/trunk/components/camel-mail/src/main/java/org/apache/camel/component/mail/MailConsumer.java?rev=1231533&r1=1231532&r2=1231533&view=diff
==============================================================================
--- 
camel/trunk/components/camel-mail/src/main/java/org/apache/camel/component/mail/MailConsumer.java
 (original)
+++ 
camel/trunk/components/camel-mail/src/main/java/org/apache/camel/component/mail/MailConsumer.java
 Sat Jan 14 17:25:26 2012
@@ -29,12 +29,9 @@ import javax.mail.MessagingException;
 import javax.mail.Store;
 import javax.mail.search.FlagTerm;
 
-import org.apache.camel.BatchConsumer;
 import org.apache.camel.Exchange;
 import org.apache.camel.Processor;
-import org.apache.camel.ShutdownRunningTask;
-import org.apache.camel.impl.ScheduledPollConsumer;
-import org.apache.camel.spi.ShutdownAware;
+import org.apache.camel.impl.ScheduledBatchPollingConsumer;
 import org.apache.camel.spi.Synchronization;
 import org.apache.camel.util.CastUtils;
 import org.apache.camel.util.ObjectHelper;
@@ -45,7 +42,7 @@ import org.slf4j.LoggerFactory;
  * A {@link org.apache.camel.Consumer Consumer} which consumes messages from JavaMail using a
  * {@link javax.mail.Transport Transport} and dispatches them to the {@link Processor}
  */
-public class MailConsumer extends ScheduledPollConsumer implements BatchConsumer, ShutdownAware {
+public class MailConsumer extends ScheduledBatchPollingConsumer {
     public static final String POP3_UID = "CamelPop3Uid";
     public static final long DEFAULT_CONSUMER_DELAY = 60 * 1000L;
     private static final transient Logger LOG = LoggerFactory.getLogger(MailConsumer.class);
@@ -53,9 +50,6 @@ public class MailConsumer extends Schedu
     private final JavaMailSender sender;
     private Folder folder;
     private Store store;
-    private int maxMessagesPerPoll;
-    private volatile ShutdownRunningTask shutdownRunningTask;
-    private volatile int pendingExchanges;
 
     public MailConsumer(MailEndpoint endpoint, Processor processor, JavaMailSender sender) {
         super(endpoint, processor);
@@ -152,10 +146,6 @@ public class MailConsumer extends Schedu
         return polledMessages;
     }
 
-    public void setMaxMessagesPerPoll(int maxMessagesPerPoll) {
-        this.maxMessagesPerPoll = maxMessagesPerPoll;
-    }
-
     public int processBatch(Queue<Object> exchanges) throws Exception {
         int total = exchanges.size();
 
@@ -202,54 +192,6 @@ public class MailConsumer extends Schedu
         return total;
     }
 
-    public boolean deferShutdown(ShutdownRunningTask shutdownRunningTask) {
-        // store a reference what to do in case when shutting down and we have pending messages
-        this.shutdownRunningTask = shutdownRunningTask;
-        // do not defer shutdown
-        return false;
-    }
-
-    public int getPendingExchangesSize() {
-        int answer;
-        // only return the real pending size in case we are configured to complete all tasks
-        if (ShutdownRunningTask.CompleteAllTasks == shutdownRunningTask) {
-            answer = pendingExchanges;
-        } else {
-            answer = 0;
-        }
-
-        if (answer == 0 && isPolling()) {
-            // force at least one pending exchange if we are polling as there is a little gap
-            // in the processBatch method and until an exchange gets enlisted as in-flight
-            // which happens later, so we need to signal back to the shutdown strategy that
-            // there is a pending exchange. When we are no longer polling, then we will return 0
-            log.trace("Currently polling so returning 1 as pending exchanges");
-            answer = 1;
-        }
-
-        return answer;
-    }
-
-    public void prepareShutdown() {
-        // noop
-    }
-
-    public boolean isBatchAllowed() {
-        // stop if we are not running
-        boolean answer = isRunAllowed();
-        if (!answer) {
-            return false;
-        }
-
-        if (shutdownRunningTask == null) {
-            // we are not shutting down so continue to run
-            return true;
-        }
-
-        // we are shutting down so only continue if we are configured to complete all tasks
-        return ShutdownRunningTask.CompleteAllTasks == shutdownRunningTask;
-    }
-
     protected Queue<Exchange> createExchanges(Message[] messages) throws MessagingException {
         Queue<Exchange> answer = new LinkedList<Exchange>();
 

Modified: 
camel/trunk/components/camel-mybatis/src/main/java/org/apache/camel/component/mybatis/MyBatisConsumer.java
URL: 
http://svn.apache.org/viewvc/camel/trunk/components/camel-mybatis/src/main/java/org/apache/camel/component/mybatis/MyBatisConsumer.java?rev=1231533&r1=1231532&r2=1231533&view=diff
==============================================================================
--- 
camel/trunk/components/camel-mybatis/src/main/java/org/apache/camel/component/mybatis/MyBatisConsumer.java
 (original)
+++ 
camel/trunk/components/camel-mybatis/src/main/java/org/apache/camel/component/mybatis/MyBatisConsumer.java
 Sat Jan 14 17:25:26 2012
@@ -20,14 +20,12 @@ import java.util.LinkedList;
 import java.util.List;
 import java.util.Queue;
 
-import org.apache.camel.BatchConsumer;
 import org.apache.camel.Exchange;
 import org.apache.camel.ExchangePattern;
 import org.apache.camel.Message;
 import org.apache.camel.Processor;
 import org.apache.camel.ShutdownRunningTask;
-import org.apache.camel.impl.ScheduledPollConsumer;
-import org.apache.camel.spi.ShutdownAware;
+import org.apache.camel.impl.ScheduledBatchPollingConsumer;
 import org.apache.camel.util.CastUtils;
 import org.apache.camel.util.ObjectHelper;
 import org.slf4j.Logger;
@@ -38,7 +36,7 @@ import org.slf4j.LoggerFactory;
  *
  * @version 
  */
-public class MyBatisConsumer extends ScheduledPollConsumer implements BatchConsumer, ShutdownAware {
+public class MyBatisConsumer extends ScheduledBatchPollingConsumer {
 
     private static final Logger LOG = LoggerFactory.getLogger(MyBatisConsumer.class);
 
@@ -116,10 +114,6 @@ public class MyBatisConsumer extends Sch
         return processBatch(CastUtils.cast(answer));
     }
 
-    public void setMaxMessagesPerPoll(int maxMessagesPerPoll) {
-        this.maxMessagesPerPoll = maxMessagesPerPoll;
-    }
-
     public int processBatch(Queue<Object> exchanges) throws Exception {
         final MyBatisEndpoint endpoint = getEndpoint();
 
@@ -161,54 +155,6 @@ public class MyBatisConsumer extends Sch
         return total;
     }
 
-    public boolean deferShutdown(ShutdownRunningTask shutdownRunningTask) {
-        // store a reference what to do in case when shutting down and we have pending messages
-        this.shutdownRunningTask = shutdownRunningTask;
-        // do not defer shutdown
-        return false;
-    }
-
-    public int getPendingExchangesSize() {
-        int answer;
-        // only return the real pending size in case we are configured to complete all tasks
-        if (ShutdownRunningTask.CompleteAllTasks == shutdownRunningTask) {
-            answer = pendingExchanges;
-        } else {
-            answer = 0;
-        }
-
-        if (answer == 0 && isPolling()) {
-            // force at least one pending exchange if we are polling as there is a little gap
-            // in the processBatch method and until an exchange gets enlisted as in-flight
-            // which happens later, so we need to signal back to the shutdown strategy that
-            // there is a pending exchange. When we are no longer polling, then we will return 0
-            log.trace("Currently polling so returning 1 as pending exchanges");
-            answer = 1;
-        }
-
-        return answer;
-    }
-
-    public void prepareShutdown() {
-        // noop
-    }
-
-    public boolean isBatchAllowed() {
-        // stop if we are not running
-        boolean answer = isRunAllowed();
-        if (!answer) {
-            return false;
-        }
-
-        if (shutdownRunningTask == null) {
-            // we are not shutting down so continue to run
-            return true;
-        }
-
-        // we are shutting down so only continue if we are configured to complete all tasks
-        return ShutdownRunningTask.CompleteAllTasks == shutdownRunningTask;
-    }
-
     private Exchange createExchange(Object data) {
         final MyBatisEndpoint endpoint = getEndpoint();
         final Exchange exchange = endpoint.createExchange(ExchangePattern.InOnly);


Reply via email to