This is an automated email from the ASF dual-hosted git repository. davsclaus pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/camel.git
The following commit(s) were added to refs/heads/master by this push: new e76bf0f CAMEL-15653: camel-sjms - Improve logging for sjms batch recovery task. Thanks to Brad Harvey for the patch. e76bf0f is described below commit e76bf0ffd1738386d580782f3c0c71deda4b2fa3 Author: Claus Ibsen <claus.ib...@gmail.com> AuthorDate: Sun Dec 6 18:21:29 2020 +0100 CAMEL-15653: camel-sjms - Improve logging for sjms batch recovery task. Thanks to Brad Harvey for the patch. --- .../component/sjms/batch/SjmsBatchComponent.java | 22 ----------- .../component/sjms/batch/SjmsBatchConsumer.java | 44 +++++++++++++--------- 2 files changed, 27 insertions(+), 39 deletions(-) diff --git a/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/batch/SjmsBatchComponent.java b/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/batch/SjmsBatchComponent.java index 8bdb635..ddb0dce 100644 --- a/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/batch/SjmsBatchComponent.java +++ b/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/batch/SjmsBatchComponent.java @@ -17,7 +17,6 @@ package org.apache.camel.component.sjms.batch; import java.util.Map; -import java.util.concurrent.ExecutorService; import javax.jms.ConnectionFactory; @@ -30,8 +29,6 @@ import org.apache.camel.util.ObjectHelper; @Component("sjms-batch") public class SjmsBatchComponent extends HeaderFilterStrategyComponent { - private ExecutorService asyncStartStopExecutorService; - @Metadata(label = "advanced") private ConnectionFactory connectionFactory; @Metadata(label = "advanced") @@ -97,23 +94,4 @@ public class SjmsBatchComponent extends HeaderFilterStrategyComponent { this.recoveryInterval = recoveryInterval; } - @Override - protected void doShutdown() throws Exception { - if (asyncStartStopExecutorService != null) { - getCamelContext().getExecutorServiceManager().shutdownNow(asyncStartStopExecutorService); - asyncStartStopExecutorService = null; - } - super.doShutdown(); - } - - protected synchronized ExecutorService getAsyncStartStopExecutorService() { - if (asyncStartStopExecutorService == null) { - // use a cached thread pool for async start tasks as they can run for a while, and we need a dedicated thread - // for each task, and the thread pool will shrink when no more tasks running - asyncStartStopExecutorService - = getCamelContext().getExecutorServiceManager().newCachedThreadPool(this, "AsyncStartStopListener"); - } - return asyncStartStopExecutorService; - } - } diff --git a/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/batch/SjmsBatchConsumer.java b/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/batch/SjmsBatchConsumer.java index a625877..b8c7e95 100644 --- a/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/batch/SjmsBatchConsumer.java +++ b/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/batch/SjmsBatchConsumer.java @@ -42,6 +42,7 @@ import org.apache.camel.ExtendedExchange; import org.apache.camel.Predicate; import org.apache.camel.Processor; import org.apache.camel.RuntimeCamelException; +import org.apache.camel.spi.ExecutorServiceManager; import org.apache.camel.support.DefaultConsumer; import org.apache.camel.util.ObjectHelper; import org.apache.camel.util.StringHelper; @@ -138,8 +139,11 @@ public class SjmsBatchConsumer extends DefaultConsumer { = new StartConsumerTask(recovery, getEndpoint().getRecoveryInterval(), getEndpoint().getKeepAliveDelay()); if (recovery) { - // use a background thread to keep starting the consumer until - getEndpoint().getComponent().getAsyncStartStopExecutorService().submit(task); + // use a background thread to keep starting the consumer until it can connect successfully + String threadNameSuffix = "AsyncStartStopListener[" + destinationName + "]"; + ExecutorServiceManager executorServiceManager = getEndpoint().getCamelContext().getExecutorServiceManager(); + Thread thread = executorServiceManager.newThread(threadNameSuffix, task); + thread.start(); } else { task.run(); } @@ -168,7 +172,7 @@ public class SjmsBatchConsumer extends DefaultConsumer { consumersShutdownLatchRef.set(new CountDownLatch(consumerCount)); if (completionInterval > 0) { - LOG.info("Using CompletionInterval to run every {} millis.", completionInterval); + LOG.info("Using CompletionInterval to run every {} millis for {}.", completionInterval, destinationName); if (timeoutCheckerExecutorService == null) { setTimeoutCheckerExecutorService(getEndpoint().getCamelContext().getExecutorServiceManager() .newScheduledThreadPool(this, SJMS_BATCH_TIMEOUT_CHECKER, 1)); @@ -208,7 +212,7 @@ public class SjmsBatchConsumer extends DefaultConsumer { } if (attempt > 1) { - LOG.info("Successfully refreshed connection after {} attempts.", attempt); + LOG.info("Successfully refreshed connection to {} after {} attempts.", destinationName, attempt); } LOG.info("Started {} consumer(s) for {}:{}", consumerCount, destinationName, completionSize); @@ -234,7 +238,8 @@ public class SjmsBatchConsumer extends DefaultConsumer { // sleeping before next attempt try { - LOG.debug("Attempt #{}. Sleeping {} before next attempt to recover", attempt, recoveryInterval); + LOG.debug("Attempt #{}. Sleeping {} before next attempt to recover {}", attempt, recoveryInterval, + destinationName); Thread.sleep(recoveryInterval); } catch (InterruptedException e) { Thread.currentThread().interrupt(); @@ -251,18 +256,19 @@ public class SjmsBatchConsumer extends DefaultConsumer { CountDownLatch consumersShutdownLatch = consumersShutdownLatchRef.get(); if (consumersShutdownLatch != null) { - LOG.info("Stop signalled, waiting on consumers to shut down"); + LOG.info("Stop signalled, waiting on consumers for {} to shut down", destinationName); if (consumersShutdownLatch.await(60, TimeUnit.SECONDS)) { - LOG.warn("Timeout waiting on consumer threads to signal completion - shutting down"); + LOG.warn("Timeout waiting on consumer threads for {} to signal completion - shutting down", destinationName); } else { - LOG.info("All consumers have been shutdown"); + LOG.info("All consumers for {} have been shutdown", destinationName); } } else { - LOG.info("Stop signalled while there are no consumers yet, so no need to wait for consumers"); + LOG.info("Stop signalled while there are no consumers for {} yet, so no need to wait for consumers", + destinationName); } try { - LOG.debug("Shutting down JMS connection"); + LOG.debug("Shutting down JMS connection for {}", destinationName); connection.close(); } catch (Exception e) { // ignore @@ -374,7 +380,8 @@ public class SjmsBatchConsumer extends DefaultConsumer { if (LOG.isDebugEnabled()) { LOG.debug("Exception caught closing consumer", ex2); } - LOG.warn("Exception caught closing consumer: {}. This exception is ignored.", ex2.getMessage()); + LOG.warn("Exception caught closing consumer for : {}. This exception is ignored.", destinationName, + ex2.getMessage()); } } @@ -388,7 +395,8 @@ public class SjmsBatchConsumer extends DefaultConsumer { if (LOG.isDebugEnabled()) { LOG.debug("Exception caught closing session", ex2); } - LOG.warn("Exception caught closing session: {}. This exception is ignored.", ex2.getMessage()); + LOG.warn("Exception caught closing session for {}: {}. This exception is ignored.", destinationName, + ex2.getMessage()); } } @@ -473,8 +481,9 @@ public class SjmsBatchConsumer extends DefaultConsumer { reset(); } } catch (Exception e) { - LOG.warn("Error during evaluation of completion predicate {}. This exception is ignored.", - e.getMessage(), e); + LOG.warn("Error during evaluation of completion predicate " + e.getMessage() + + ". This exception is ignored.", + e); } } } @@ -494,7 +503,8 @@ public class SjmsBatchConsumer extends DefaultConsumer { } } else { - LOG.info("Shutdown signal received - rolling back batch"); + LOG.info("Shutdown signal received - rolling back {} pending in batch from destination {}", + messageCount, destinationName); session.rollback(); } } @@ -569,7 +579,7 @@ public class SjmsBatchConsumer extends DefaultConsumer { int batchSize = exchange.getProperty(Exchange.BATCH_SIZE, Integer.class); if (LOG.isDebugEnabled()) { long total = MESSAGE_RECEIVED.get() + batchSize; - LOG.debug("Processing batch[{}]:size={}:total={}", id, batchSize, total); + LOG.debug("Processing batch[" + id + "]:size=" + batchSize + ":total=" + total); } if ("timeout".equals(completedBy)) { @@ -587,7 +597,7 @@ public class SjmsBatchConsumer extends DefaultConsumer { long total = MESSAGE_PROCESSED.addAndGet(batchSize); LOG.debug("Completed processing[{}]:total={}", id, total); } catch (Exception e) { - getExceptionHandler().handleException("Error processing exchange", exchange, e); + getExceptionHandler().handleException("Error processing exchange from " + destinationName, exchange, e); } }