Batch consumer now creates new sessions instead of bailing out

Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/a5646bbf
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/a5646bbf
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/a5646bbf

Branch: refs/heads/master
Commit: a5646bbf2ff78dc87036a2c319159db962764128
Parents: 0dc847e
Author: Bryan Love <bryan.l...@iovation.com>
Authored: Wed Mar 22 16:43:33 2017 -0700
Committer: Claus Ibsen <davscl...@apache.org>
Committed: Tue Mar 28 10:03:54 2017 +0200

----------------------------------------------------------------------
 .../component/sjms/batch/SjmsBatchConsumer.java | 34 +++++++++++++-------
 1 file changed, 22 insertions(+), 12 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/camel/blob/a5646bbf/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/batch/SjmsBatchConsumer.java
----------------------------------------------------------------------
diff --git a/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/batch/SjmsBatchConsumer.java b/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/batch/SjmsBatchConsumer.java
index 630bdfb..2f2440d 100644
--- a/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/batch/SjmsBatchConsumer.java
+++ b/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/batch/SjmsBatchConsumer.java
@@ -298,24 +298,34 @@ public class SjmsBatchConsumer extends DefaultConsumer {
         @Override
         public void run() {
             try {
-                // a batch corresponds to a single session that will be committed or rolled back by a background thread
-                final Session session = connection.createSession(TRANSACTED, Session.CLIENT_ACKNOWLEDGE);
-                try {
-                    // only batch consumption from queues is supported - it makes no sense to transactionally consume
-                    // from a topic as you don't car about message loss, users can just use a regular aggregator instead
-                    Queue queue = session.createQueue(destinationName);
-                    MessageConsumer consumer = session.createConsumer(queue);
-
+                // this loop is intended to keep the consumer up and running as long as it's supposed to be, but allow it to bail if signaled
+                while (running.get()) {
+                    // a batch corresponds to a single session that will be committed or rolled back by a background thread
+                    final Session session = connection.createSession(TRANSACTED, Session.CLIENT_ACKNOWLEDGE);
                     try {
-                        task.consumeBatchesOnLoop(session, consumer);
+                        // only batch consumption from queues is supported - it makes no sense to transactionally consume
+                        // from a topic as you don't care about message loss, users can just use a regular aggregator instead
+                        Queue queue = session.createQueue(destinationName);
+                        MessageConsumer consumer = session.createConsumer(queue);
+
+                        try {
+                            task.consumeBatchesOnLoop(session, consumer);
+                        } finally {
+                            closeJmsConsumer(consumer);
+                        }
+                    } catch (javax.jms.IllegalStateException ex) {
+                        // from consumeBatchesOnLoop
+                        // this will log the exception and the parent loop will create a new session
+                        getExceptionHandler().handleException("Exception caught consuming from " + destinationName, ex);
+                        //rest a minute to avoid destroying the logs
+                        Thread.sleep(2000);
                     } finally {
-                        closeJmsConsumer(consumer);
+                        closeJmsSession(session);
                     }
-                } finally {
-                    closeJmsSession(session);
                 }
             } catch (Throwable ex) {
                 // from consumeBatchesOnLoop
+                // catch anything besides the IllegalStateException and exit the application
                 getExceptionHandler().handleException("Exception caught consuming from " + destinationName, ex);
             } finally {
                 // indicate that we have shut down

Reply via email to