Author: davsclaus Date: Fri Sep 7 07:51:31 2012 New Revision: 1381918 URL: http://svn.apache.org/viewvc?rev=1381918&view=rev Log: CAMEL-5563: API for shutdown thread pool in ExecutorServiceManager aligned so they use the API of the JDK as well the new shutdownGraceful. This gives end user full power what to use.
Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultExecutorServiceManager.java camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultShutdownStrategy.java camel/trunk/camel-core/src/main/java/org/apache/camel/spi/ExecutorServiceManager.java Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultExecutorServiceManager.java URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultExecutorServiceManager.java?rev=1381918&r1=1381917&r2=1381918&view=diff ============================================================================== --- camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultExecutorServiceManager.java (original) +++ camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultExecutorServiceManager.java Fri Sep 7 07:51:31 2012 @@ -59,7 +59,7 @@ public class DefaultExecutorServiceManag private ThreadPoolFactory threadPoolFactory = new DefaultThreadPoolFactory(); private final List<ExecutorService> executorServices = new ArrayList<ExecutorService>(); private String threadNamePattern; - private long shutdownAwaitTermination = 30000; + private long shutdownAwaitTermination = 10000; private String defaultThreadPoolProfileId = "defaultThreadPoolProfile"; private final Map<String, ThreadPoolProfile> threadPoolProfiles = new HashMap<String, ThreadPoolProfile>(); private ThreadPoolProfile builtIndefaultProfile; @@ -256,41 +256,50 @@ public class DefaultExecutorServiceManag @Override public void shutdown(ExecutorService executorService) { - ObjectHelper.notNull(executorService, "executorService"); - shutdown(executorService, shutdownAwaitTermination); + doShutdown(executorService, 0); } @Override - public void shutdown(ExecutorService executorService, long shutdownAwaitTermination) { - ObjectHelper.notNull(executorService, "executorService"); - if (shutdownAwaitTermination <= 0) { - throw new IllegalArgumentException("ShutdownAwaitTermination must be a positive number, was: " + shutdownAwaitTermination); - } + public void shutdownGraceful(ExecutorService executorService) { + doShutdown(executorService, getShutdownAwaitTermination()); + } + + @Override + public void shutdownGraceful(ExecutorService executorService, long shutdownAwaitTermination) { + doShutdown(executorService, shutdownAwaitTermination); + } + private void doShutdown(ExecutorService executorService, long shutdownAwaitTermination) { + ObjectHelper.notNull(executorService, "executorService"); // shutting down a thread pool is a 2 step process. First we try graceful, and if that fails, then we go more aggressively // and try shutting down again. In both cases we wait at most the given shutdown timeout value given - // (total wait could then be 2 x shutdownAwaitTermination) - boolean warned = false; - StopWatch watch = new StopWatch(); + // (total wait could then be 2 x shutdownAwaitTermination, but when we shutdown the 2nd time we are aggressive and thus + // we ought to shutdown much faster) if (!executorService.isShutdown()) { + boolean warned = false; + StopWatch watch = new StopWatch(); + LOG.trace("Shutdown of ExecutorService: {} with await termination: {} millis", executorService, shutdownAwaitTermination); executorService.shutdown(); - try { - if (!awaitTermination(executorService, shutdownAwaitTermination)) { - warned = true; - LOG.warn("Forcing shutdown of ExecutorService: {} due first await termination elapsed.", executorService); - executorService.shutdownNow(); - // we are now shutting down aggressively, so wait to see if we can completely shutdown or not + + if (shutdownAwaitTermination > 0) { + try { if (!awaitTermination(executorService, shutdownAwaitTermination)) { - LOG.warn("Cannot completely force shutdown of ExecutorService: {} due second await termination elapsed.", executorService); + warned = true; + LOG.warn("Forcing shutdown of ExecutorService: {} due first await termination elapsed.", executorService); + executorService.shutdownNow(); + // we are now shutting down aggressively, so wait to see if we can completely shutdown or not + if (!awaitTermination(executorService, shutdownAwaitTermination)) { + LOG.warn("Cannot completely force shutdown of ExecutorService: {} due second await termination elapsed.", executorService); + } } + } catch (InterruptedException e) { + warned = true; + LOG.warn("Forcing shutdown of ExecutorService: {} due interrupted.", executorService); + // we were interrupted during shutdown, so force shutdown + executorService.shutdownNow(); } - } catch (InterruptedException e) { - warned = true; - LOG.warn("Forcing shutdown of ExecutorService: {} due interrupted.", executorService); - // we were interrupted during shutdown, so force shutdown - executorService.shutdownNow(); } // if we logged at WARN level, then report at INFO level when we are complete so the end user can see this in the log @@ -314,35 +323,6 @@ public class DefaultExecutorServiceManag executorServices.remove(executorService); } - /** - * Awaits the termination of the thread pool. - * <p/> - * This implementation will log every 5th second at INFO level that we are waiting, so the end user - * can see we are not hanging in case it takes longer time to shutdown the pool. - * - * @param executorService the thread pool - * @param shutdownAwaitTermination time in millis to use as timeout - * @return <tt>true</tt> if the pool is terminated, or <tt>false</tt> if we timed out - * @throws InterruptedException is thrown if we are interrupted during the waiting - */ - private static boolean awaitTermination(ExecutorService executorService, long shutdownAwaitTermination) throws InterruptedException { - // log progress every 5th second so end user is aware of we are shutting down - StopWatch watch = new StopWatch(); - long interval = Math.min(5000, shutdownAwaitTermination); - boolean done = false; - while (!done && interval > 0) { - if (executorService.awaitTermination(interval, TimeUnit.MILLISECONDS)) { - done = true; - } else { - LOG.info("Waited {} for ExecutorService: {} to shutdown...", TimeUtils.printDuration(watch.taken()), executorService); - // recalculate interval - interval = Math.min(5000, shutdownAwaitTermination - watch.taken()); - } - } - - return done; - } - @Override public List<Runnable> shutdownNow(ExecutorService executorService) { return doShutdownNow(executorService, false); @@ -373,7 +353,7 @@ public class DefaultExecutorServiceManag } } - // remove reference as its shutdown + // remove reference as its shutdown (do not remove if fail-safe) if (!failSafe) { executorServices.remove(executorService); } @@ -381,6 +361,25 @@ public class DefaultExecutorServiceManag return answer; } + @Override + public boolean awaitTermination(ExecutorService executorService, long shutdownAwaitTermination) throws InterruptedException { + // log progress every 2nd second so end user is aware of we are shutting down + StopWatch watch = new StopWatch(); + long interval = Math.min(2000, shutdownAwaitTermination); + boolean done = false; + while (!done && interval > 0) { + if (executorService.awaitTermination(interval, TimeUnit.MILLISECONDS)) { + done = true; + } else { + LOG.info("Waited {} for ExecutorService: {} to terminate...", TimeUtils.printDuration(watch.taken()), executorService); + // recalculate interval + interval = Math.min(2000, shutdownAwaitTermination - watch.taken()); + } + } + + return done; + } + /** * Strategy callback when a new {@link java.util.concurrent.ExecutorService} have been created. * Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultShutdownStrategy.java URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultShutdownStrategy.java?rev=1381918&r1=1381917&r2=1381918&view=diff ============================================================================== --- camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultShutdownStrategy.java (original) +++ camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultShutdownStrategy.java Fri Sep 7 07:51:31 2012 @@ -355,7 +355,8 @@ public class DefaultShutdownStrategy ext @Override protected void doShutdown() throws Exception { if (executor != null) { - camelContext.getExecutorServiceManager().shutdown(executor); + // force shutting down as we are shutting down Camel + camelContext.getExecutorServiceManager().shutdownNow(executor); // should clear executor so we can restart by creating a new thread pool executor = null; } Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/spi/ExecutorServiceManager.java URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/spi/ExecutorServiceManager.java?rev=1381918&r1=1381917&r2=1381918&view=diff ============================================================================== --- camel/trunk/camel-core/src/main/java/org/apache/camel/spi/ExecutorServiceManager.java (original) +++ camel/trunk/camel-core/src/main/java/org/apache/camel/spi/ExecutorServiceManager.java Fri Sep 7 07:51:31 2012 @@ -129,7 +129,7 @@ public interface ExecutorServiceManager * Sets the time to wait for thread pools to shutdown orderly, when invoking the * {@link #shutdown()} method. * <p/> - * The default value is <tt>30000</tt> millis. + * The default value is <tt>10000</tt> millis. * * @param timeInMillis time in millis. */ @@ -139,7 +139,7 @@ public interface ExecutorServiceManager * Gets the time to wait for thread pools to shutdown orderly, when invoking the * {@link #shutdown()} method. * <p/> - * The default value is <tt>30000</tt> millis. + * The default value is <tt>10000</tt> millis. * * @return the timeout value */ @@ -271,6 +271,17 @@ public interface ExecutorServiceManager ScheduledExecutorService newScheduledThreadPool(Object source, String name, String profileId); /** + * Shutdown the given executor service (<b>not</b> graceful). + * <p/> + * This implementation will issues a regular shutdown of the executor service, + * ie calling {@link java.util.concurrent.ExecutorService#shutdown()} and return. + * + * @param executorService the executor service to shutdown + * @see java.util.concurrent.ExecutorService#shutdown() + */ + void shutdown(ExecutorService executorService); + + /** * Shutdown the given executor service graceful at first, and then aggressively * if the await termination timeout was hit. * <p/> @@ -285,7 +296,7 @@ public interface ExecutorServiceManager * @see java.util.concurrent.ExecutorService#shutdown() * @see #getShutdownAwaitTermination() */ - void shutdown(ExecutorService executorService); + void shutdownGraceful(ExecutorService executorService); /** * Shutdown the given executor service graceful at first, and then aggressively @@ -302,10 +313,13 @@ public interface ExecutorServiceManager * @param shutdownAwaitTermination timeout in millis to wait for orderly shutdown * @see java.util.concurrent.ExecutorService#shutdown() */ - void shutdown(ExecutorService executorService, long shutdownAwaitTermination); + void shutdownGraceful(ExecutorService executorService, long shutdownAwaitTermination); /** * Shutdown now the given executor service aggressively. + * <p/> + * This implementation will issues a regular shutdownNow of the executor service, + * ie calling {@link java.util.concurrent.ExecutorService#shutdownNow()} and return. * * @param executorService the executor service to shutdown now * @return list of tasks that never commenced execution @@ -313,4 +327,17 @@ public interface ExecutorServiceManager */ List<Runnable> shutdownNow(ExecutorService executorService); + /** + * Awaits the termination of the thread pool. + * <p/> + * This implementation will log every 2nd second at INFO level that we are waiting, so the end user + * can see we are not hanging in case it takes longer time to shutdown the pool. + * + * @param executorService the thread pool + * @param shutdownAwaitTermination time in millis to use as timeout + * @return <tt>true</tt> if the pool is terminated, or <tt>false</tt> if we timed out + * @throws InterruptedException is thrown if we are interrupted during waiting + */ + boolean awaitTermination(ExecutorService executorService, long shutdownAwaitTermination) throws InterruptedException; + }