Repository: camel Updated Branches: refs/heads/camel-2.18.x 28f4bfe01 -> 5eb6868f6 refs/heads/master 15256c88a -> 25423af4f
CAMEL-10677: Fixed NPE in camel-sjms batch consumer. Also use submit instead of execute for the consumer task, so that it does not risk running on the starter thread, which execute can do. Project: http://git-wip-us.apache.org/repos/asf/camel/repo Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/25423af4 Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/25423af4 Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/25423af4 Branch: refs/heads/master Commit: 25423af4ff459ea261e1f081bc810bb921bd054f Parents: 15256c8 Author: Claus Ibsen <davscl...@apache.org> Authored: Mon Jan 9 13:58:44 2017 +0100 Committer: Claus Ibsen <davscl...@apache.org> Committed: Mon Jan 9 13:58:44 2017 +0100 ---------------------------------------------------------------------- .../camel/component/sjms/batch/SjmsBatchConsumer.java | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/camel/blob/25423af4/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/batch/SjmsBatchConsumer.java ---------------------------------------------------------------------- diff --git a/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/batch/SjmsBatchConsumer.java b/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/batch/SjmsBatchConsumer.java index 48f9ff7..630bdfb 100644 --- a/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/batch/SjmsBatchConsumer.java +++ b/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/batch/SjmsBatchConsumer.java @@ -73,7 +73,7 @@ public class SjmsBatchConsumer extends DefaultConsumer { private ExecutorService jmsConsumerExecutors; private final AtomicBoolean running = new AtomicBoolean(false); private final AtomicReference<CountDownLatch> consumersShutdownLatchRef = new AtomicReference<>(); - private Connection connection; + private volatile Connection connection; public 
SjmsBatchConsumer(SjmsBatchEndpoint sjmsBatchEndpoint, Processor processor) { super(sjmsBatchEndpoint, processor); @@ -177,16 +177,16 @@ public class SjmsBatchConsumer extends DefaultConsumer { localConnection = connectionFactory.createConnection(); localConnection.start(); + // its success so prepare for exit + connection = localConnection; + final List<AtomicBoolean> triggers = new ArrayList<>(); for (int i = 0; i < consumerCount; i++) { BatchConsumptionLoop loop = new BatchConsumptionLoop(); triggers.add(loop.getCompletionTimeoutTrigger()); - jmsConsumerExecutors.execute(loop); + jmsConsumerExecutors.submit(loop); } - // its success so prepare for exit - connection = localConnection; - if (completionInterval > 0) { // trigger completion based on interval timeoutCheckerExecutorService.scheduleAtFixedRate(new CompletionIntervalTask(triggers), completionInterval, completionInterval, TimeUnit.MILLISECONDS);