batch consumer will create new sessions instead of bailing out now
Project: http://git-wip-us.apache.org/repos/asf/camel/repo Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/a5646bbf Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/a5646bbf Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/a5646bbf Branch: refs/heads/master Commit: a5646bbf2ff78dc87036a2c319159db962764128 Parents: 0dc847e Author: Bryan Love <bryan.l...@iovation.com> Authored: Wed Mar 22 16:43:33 2017 -0700 Committer: Claus Ibsen <davscl...@apache.org> Committed: Tue Mar 28 10:03:54 2017 +0200 ---------------------------------------------------------------------- .../component/sjms/batch/SjmsBatchConsumer.java | 34 +++++++++++++------- 1 file changed, 22 insertions(+), 12 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/camel/blob/a5646bbf/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/batch/SjmsBatchConsumer.java ---------------------------------------------------------------------- diff --git a/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/batch/SjmsBatchConsumer.java b/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/batch/SjmsBatchConsumer.java index 630bdfb..2f2440d 100644 --- a/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/batch/SjmsBatchConsumer.java +++ b/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/batch/SjmsBatchConsumer.java @@ -298,24 +298,34 @@ public class SjmsBatchConsumer extends DefaultConsumer { @Override public void run() { try { - // a batch corresponds to a single session that will be committed or rolled back by a background thread - final Session session = connection.createSession(TRANSACTED, Session.CLIENT_ACKNOWLEDGE); - try { - // only batch consumption from queues is supported - it makes no sense to transactionally consume - // from a topic as you don't car about message loss, users can just use a regular aggregator instead - Queue queue = session.createQueue(destinationName); - MessageConsumer consumer = session.createConsumer(queue); - + // this loop is intended to keep the consumer up and running as long as it's supposed to be, but allow it to bail if signaled + while (running.get()) { + // a batch corresponds to a single session that will be committed or rolled back by a background thread + final Session session = connection.createSession(TRANSACTED, Session.CLIENT_ACKNOWLEDGE); try { - task.consumeBatchesOnLoop(session, consumer); + // only batch consumption from queues is supported - it makes no sense to transactionally consume + // from a topic as you don't car about message loss, users can just use a regular aggregator instead + Queue queue = session.createQueue(destinationName); + MessageConsumer consumer = session.createConsumer(queue); + + try { + task.consumeBatchesOnLoop(session, consumer); + } finally { + closeJmsConsumer(consumer); + } + } catch (javax.jms.IllegalStateException ex) { + // from consumeBatchesOnLoop + // this will log the exception and the parent loop will create a new session + getExceptionHandler().handleException("Exception caught consuming from " + destinationName, ex); + //rest a minute to avoid destroying the logs + Thread.sleep(2000); } finally { - closeJmsConsumer(consumer); + closeJmsSession(session); } - } finally { - closeJmsSession(session); } } catch (Throwable ex) { // from consumeBatchesOnLoop + // catch anything besides the IllegalStateException and exit the application getExceptionHandler().handleException("Exception caught consuming from " + destinationName, ex); } finally { // indicate that we have shut down