This is an automated email from the ASF dual-hosted git repository. pdallig pushed a commit to branch branch-0.9 in repository https://gitbox.apache.org/repos/asf/zeppelin.git
The following commit(s) were added to refs/heads/branch-0.9 by this push: new 20eb09c [ZEPPELIN-5089] ExecutorService shutdown 20eb09c is described below commit 20eb09c4fd1e29614791524b773dcfc4922b00e7 Author: Philipp Dallig <philipp.dal...@gmail.com> AuthorDate: Tue Oct 13 16:38:06 2020 +0200 [ZEPPELIN-5089] ExecutorService shutdown ### What is this PR for? Included in this PR: - shutdown LifecycleManager and RM-Heartbeat - Shutdown the ParallelScheduler-Worker tasks - Use Initialization-on-demand_holder_idiom to create `SchedulerFactory` and `ExecutorFactory` singleton - Shutdown all scheduler tasks, when destroying the `SchedulerFactory` - Soft shutdown of ExecutorService with utility class `ExecutorUtil`, which makes visible which thread is not shut down nicely ### What type of PR is it? - Improvement ### What is the Jira issue? * https://issues.apache.org/jira/browse/ZEPPELIN-5089 ### How should this be tested? * Travis-CI: https://travis-ci.org/github/Reamer/zeppelin/builds/735636051 ### Questions: * Do the license files need an update? No * Are there breaking changes for older versions? No * Does this need documentation? 
No Author: Philipp Dallig <philipp.dal...@gmail.com> Closes #3937 from Reamer/executor_services and squashes the following commits: 394363d01 [Philipp Dallig] Use ScheduledExecutorService to schedule RM-Heartbeat 457e98199 [Philipp Dallig] final corrections f76b708a3 [Philipp Dallig] Use ExecutorFactory in TimeoutLifecycleManager 81bbf6545 [Philipp Dallig] Add scheduledExecutors to ExecutorFactory ef8100a78 [Philipp Dallig] Soft shutdown of executor threads bc0f35845 [Philipp Dallig] stop all sub scheduler 2be1f028e [Philipp Dallig] Use Initialization-on-demand holder idiom for static singleton f1e40e2ed [Philipp Dallig] style changes (cherry picked from commit 4fceaf55907d28454a5dff6c3c5c45c57bf63b53) Signed-off-by: Philipp Dallig <philipp.dal...@gmail.com> --- .../zeppelin/interpreter/InterpreterGroup.java | 10 ++-- .../lifecycle/TimeoutLifecycleManager.java | 11 +++-- .../remote/RemoteInterpreterServer.java | 26 ++++------ .../zeppelin/scheduler/AbstractScheduler.java | 6 ++- .../apache/zeppelin/scheduler/ExecutorFactory.java | 57 ++++++++++++++++------ .../apache/zeppelin/scheduler/FIFOScheduler.java | 12 ++++- .../zeppelin/scheduler/ParallelScheduler.java | 18 +++++++ .../org/apache/zeppelin/scheduler/Scheduler.java | 3 ++ .../zeppelin/scheduler/SchedulerFactory.java | 41 ++++++++-------- .../org/apache/zeppelin/util/ExecutorUtil.java | 54 ++++++++++++++++++++ .../zeppelin/scheduler/FIFOSchedulerTest.java | 30 ++++++------ .../zeppelin/scheduler/ParallelSchedulerTest.java | 26 +++++----- .../src/test/resources/log4j.properties | 2 +- .../apache/zeppelin/scheduler/RemoteScheduler.java | 8 +++ .../zeppelin/scheduler/RemoteSchedulerTest.java | 4 +- 15 files changed, 209 insertions(+), 99 deletions(-) diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/InterpreterGroup.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/InterpreterGroup.java index a569621..1497559 100644 --- 
a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/InterpreterGroup.java +++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/InterpreterGroup.java @@ -49,7 +49,7 @@ public class InterpreterGroup { protected String id; private String webUrl; // sessionId --> interpreters - protected Map<String, List<Interpreter>> sessions = new ConcurrentHashMap(); + protected Map<String, List<Interpreter>> sessions = new ConcurrentHashMap<>(); private AngularObjectRegistry angularObjectRegistry; private InterpreterHookRegistry hookRegistry; private ResourcePool resourcePool; @@ -115,15 +115,15 @@ public class InterpreterGroup { public AngularObjectRegistry getAngularObjectRegistry() { return angularObjectRegistry; } - + public void setAngularObjectRegistry(AngularObjectRegistry angularObjectRegistry) { this.angularObjectRegistry = angularObjectRegistry; } - + public InterpreterHookRegistry getInterpreterHookRegistry() { return hookRegistry; } - + public void setInterpreterHookRegistry(InterpreterHookRegistry hookRegistry) { this.hookRegistry = hookRegistry; } @@ -178,7 +178,7 @@ public class InterpreterGroup { interpreter.close(); interpreter.getScheduler().stop(); } catch (InterpreterException e) { - LOGGER.warn("Fail to close interpreter: " + interpreter.getClassName(), e); + LOGGER.warn("Fail to close interpreter: {}", interpreter.getClassName(), e); } } } diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/lifecycle/TimeoutLifecycleManager.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/lifecycle/TimeoutLifecycleManager.java index 9cbb42f..dd4eaf9 100644 --- a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/lifecycle/TimeoutLifecycleManager.java +++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/lifecycle/TimeoutLifecycleManager.java @@ -21,12 +21,11 @@ import org.apache.thrift.TException; import org.apache.zeppelin.conf.ZeppelinConfiguration; 
import org.apache.zeppelin.interpreter.LifecycleManager; import org.apache.zeppelin.interpreter.remote.RemoteInterpreterServer; +import org.apache.zeppelin.scheduler.ExecutorFactory; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; - import static java.util.concurrent.TimeUnit.MILLISECONDS; @@ -51,7 +50,8 @@ public class TimeoutLifecycleManager extends LifecycleManager { .ZEPPELIN_INTERPRETER_LIFECYCLE_MANAGER_TIMEOUT_CHECK_INTERVAL); long timeoutThreshold = zConf.getLong( ZeppelinConfiguration.ConfVars.ZEPPELIN_INTERPRETER_LIFECYCLE_MANAGER_TIMEOUT_THRESHOLD); - ScheduledExecutorService checkScheduler = Executors.newScheduledThreadPool(1); + ScheduledExecutorService checkScheduler = ExecutorFactory.singleton() + .createOrGetScheduled("TimeoutLifecycleManager", 1); checkScheduler.scheduleAtFixedRate(() -> { if ((System.currentTimeMillis() - lastBusyTimeInMillis) > timeoutThreshold) { LOGGER.info("Interpreter process idle time exceed threshold, try to stop it"); @@ -64,8 +64,8 @@ public class TimeoutLifecycleManager extends LifecycleManager { LOGGER.debug("Check idle time of interpreter"); } }, checkInterval, checkInterval, MILLISECONDS); - LOGGER.info("TimeoutLifecycleManager is started with checkInterval: " + checkInterval - + ", timeoutThreshold: " + timeoutThreshold); + LOGGER.info("TimeoutLifecycleManager is started with checkInterval: {}, timeoutThreshold: ¸{}", checkInterval, + timeoutThreshold); } @Override @@ -79,4 +79,5 @@ public class TimeoutLifecycleManager extends LifecycleManager { LOGGER.debug("Interpreter process: {} is used", interpreterGroupId); lastBusyTimeInMillis = System.currentTimeMillis(); } + } diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterServer.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterServer.java index 4051dac..2bf9adf 100644 
--- a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterServer.java +++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterServer.java @@ -64,6 +64,7 @@ import org.apache.zeppelin.resource.DistributedResourcePool; import org.apache.zeppelin.resource.Resource; import org.apache.zeppelin.resource.ResourcePool; import org.apache.zeppelin.resource.ResourceSet; +import org.apache.zeppelin.scheduler.ExecutorFactory; import org.apache.zeppelin.scheduler.Job; import org.apache.zeppelin.scheduler.Job.Status; import org.apache.zeppelin.scheduler.JobListener; @@ -161,12 +162,13 @@ public class RemoteInterpreterServer extends Thread String portRange, String interpreterGroupId, boolean isTest) throws Exception { - LOGGER.info("Starting remote interpreter server on port {}, intpEventServerAddress: {}:{}", port, - intpEventServerHost, intpEventServerPort); + super("RemoteInterpreterServer-Thread"); if (null != intpEventServerHost) { this.intpEventServerHost = intpEventServerHost; this.intpEventServerPort = intpEventServerPort; if (!isTest) { + LOGGER.info("Starting remote interpreter server on port {}, intpEventServerAddress: {}:{}", port, + intpEventServerHost, intpEventServerPort); intpEventClient = new RemoteInterpreterEventClient(intpEventServerHost, intpEventServerPort); } } else { @@ -227,18 +229,9 @@ public class RemoteInterpreterServer extends Thread if (launcherEnv != null && "yarn".endsWith(launcherEnv)) { try { YarnUtils.register(host, port); - Thread thread = new Thread(() -> { - while(!Thread.interrupted() && server.isServing()) { - YarnUtils.heartbeat(); - try { - Thread.sleep(60 * 1000); - } catch (InterruptedException e) { - LOGGER.warn(e.getMessage(), e); - } - } - }); - thread.setName("RM-Heartbeat-Thread"); - thread.start(); + ScheduledExecutorService yarnHeartbeat = ExecutorFactory.singleton() + .createOrGetScheduled("RM-Heartbeat", 1); + 
yarnHeartbeat.scheduleAtFixedRate(YarnUtils::heartbeat, 0, 1, TimeUnit.MINUTES); } catch (Exception e) { LOGGER.error("Fail to register yarn app", e); } @@ -311,6 +304,7 @@ public class RemoteInterpreterServer extends Thread } if (!isTest) { SchedulerFactory.singleton().destroy(); + ExecutorFactory.singleton().shutdownAll(); } if ("yarn".equals(launcherEnv)) { @@ -370,8 +364,8 @@ public class RemoteInterpreterServer extends Thread private LifecycleManager createLifecycleManager() throws Exception { String lifecycleManagerClass = zConf.getLifecycleManagerClass(); - Class clazz = Class.forName(lifecycleManagerClass); - LOGGER.info("Creating interpreter lifecycle manager: " + lifecycleManagerClass); + Class<?> clazz = Class.forName(lifecycleManagerClass); + LOGGER.info("Creating interpreter lifecycle manager: {}", lifecycleManagerClass); return (LifecycleManager) clazz.getConstructor(ZeppelinConfiguration.class, RemoteInterpreterServer.class) .newInstance(zConf, this); } diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/scheduler/AbstractScheduler.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/scheduler/AbstractScheduler.java index 079be3b..fdd456b 100644 --- a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/scheduler/AbstractScheduler.java +++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/scheduler/AbstractScheduler.java @@ -46,6 +46,7 @@ public abstract class AbstractScheduler implements Scheduler { this.name = name; } + @Override public String getName() { return this.name; } @@ -66,6 +67,7 @@ public abstract class AbstractScheduler implements Scheduler { try { queue.put(job); } catch (InterruptedException e) { + Thread.currentThread().interrupt(); throw new RuntimeException(String.format("Unable to submit job %s", job.getId()), e); } jobs.put(job.getId(), job); @@ -122,7 +124,7 @@ public abstract class AbstractScheduler implements Scheduler { return; } - LOGGER.info("Job " + runningJob.getId() + " started by 
scheduler " + name); + LOGGER.info("Job {} started by scheduler {}",runningJob.getId(), name); // Don't set RUNNING status when it is RemoteScheduler, update it via JobStatusPoller if (!getClass().getSimpleName().equals("RemoteScheduler")) { runningJob.setStatus(Job.Status.RUNNING); @@ -149,7 +151,7 @@ public abstract class AbstractScheduler implements Scheduler { runningJob.setStatus(Job.Status.FINISHED); } } - LOGGER.info("Job " + runningJob.getId() + " finished by scheduler " + name); + LOGGER.info("Job {} finished by scheduler {} with status {}", runningJob.getId(), name, runningJob.getStatus()); // reset aborted flag to allow retry runningJob.aborted = false; jobs.remove(runningJob.getId()); diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/scheduler/ExecutorFactory.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/scheduler/ExecutorFactory.java index 70c7e29..b20ccc7 100644 --- a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/scheduler/ExecutorFactory.java +++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/scheduler/ExecutorFactory.java @@ -18,43 +18,57 @@ package org.apache.zeppelin.scheduler; import java.util.HashMap; import java.util.Map; +import java.util.Map.Entry; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; + +import org.apache.zeppelin.util.ExecutorUtil; /** * Factory class for Executor */ public class ExecutorFactory { - private static ExecutorFactory instance; - private static Long _executorLock = new Long(0); private Map<String, ExecutorService> executors = new HashMap<>(); + private Map<String, ScheduledExecutorService> scheduledExecutors = new HashMap<>(); private ExecutorFactory() { } + //Using the Initialization-on-demand holder idiom (https://en.wikipedia.org/wiki/Initialization-on-demand_holder_idiom) + private static final class InstanceHolder { + private static 
final ExecutorFactory INSTANCE = new ExecutorFactory(); + } + public static ExecutorFactory singleton() { - if (instance == null) { - synchronized (_executorLock) { - if (instance == null) { - instance = new ExecutorFactory(); - } - } - } - return instance; + return InstanceHolder.INSTANCE; } public ExecutorService createOrGet(String name, int numThread) { synchronized (executors) { if (!executors.containsKey(name)) { - executors.put(name, Executors.newScheduledThreadPool(numThread, + executors.put(name, Executors.newScheduledThreadPool( + numThread, new SchedulerThreadFactory(name))); } return executors.get(name); } } + public ScheduledExecutorService createOrGetScheduled(String name, int numThread) { + synchronized (scheduledExecutors) { + if (!scheduledExecutors.containsKey(name)) { + scheduledExecutors.put(name, Executors.newScheduledThreadPool( + numThread, + new SchedulerThreadFactory(name))); + } + return scheduledExecutors.get(name); + } + } + /** * ThreadPool created for running note via rest api. * TODO(zjffdu) Should use property to configure the thread pool size. 
@@ -68,18 +82,31 @@ public class ExecutorFactory { synchronized (executors) { if (executors.containsKey(name)) { ExecutorService e = executors.get(name); - e.shutdown(); + ExecutorUtil.softShutdown(name, e, 1, TimeUnit.MINUTES); executors.remove(name); } } + synchronized (scheduledExecutors) { + if (scheduledExecutors.containsKey(name)) { + ExecutorService e = scheduledExecutors.get(name); + ExecutorUtil.softShutdown(name, e, 1, TimeUnit.MINUTES); + scheduledExecutors.remove(name); + } + } } - public void shutdownAll() { synchronized (executors) { - for (String name : executors.keySet()) { - shutdown(name); + for (Entry<String, ExecutorService> executor : executors.entrySet()) { + ExecutorUtil.softShutdown(executor.getKey(), executor.getValue(), 1, TimeUnit.MINUTES); + } + executors.clear(); + } + synchronized (scheduledExecutors) { + for (Entry<String, ScheduledExecutorService> scheduledExecutor : scheduledExecutors.entrySet()) { + ExecutorUtil.softShutdown(scheduledExecutor.getKey(), scheduledExecutor.getValue(), 1, TimeUnit.MINUTES); } + scheduledExecutors.clear(); } } } diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/scheduler/FIFOScheduler.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/scheduler/FIFOScheduler.java index b9d5e82..3448636 100644 --- a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/scheduler/FIFOScheduler.java +++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/scheduler/FIFOScheduler.java @@ -19,6 +19,9 @@ package org.apache.zeppelin.scheduler; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; + +import org.apache.zeppelin.util.ExecutorUtil; /** * FIFOScheduler runs submitted job sequentially @@ -29,7 +32,7 @@ public class FIFOScheduler extends AbstractScheduler { FIFOScheduler(String name) { super(name); - executor = Executors.newSingleThreadExecutor( + this.executor = Executors.newSingleThreadExecutor( new 
SchedulerThreadFactory("FIFOScheduler-" + name + "-Worker-")); } @@ -41,7 +44,12 @@ public class FIFOScheduler extends AbstractScheduler { @Override public void stop() { + stop(2, TimeUnit.MINUTES); + } + + @Override + public void stop(int stopTimeoutVal, TimeUnit stopTimeoutUnit) { super.stop(); - executor.shutdownNow(); + ExecutorUtil.softShutdown(name, executor, stopTimeoutVal, stopTimeoutUnit); } } diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/scheduler/ParallelScheduler.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/scheduler/ParallelScheduler.java index 1c12c03..979435a 100644 --- a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/scheduler/ParallelScheduler.java +++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/scheduler/ParallelScheduler.java @@ -19,12 +19,19 @@ package org.apache.zeppelin.scheduler; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; + +import org.apache.zeppelin.util.ExecutorUtil; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; /** * Parallel scheduler runs submitted job concurrently. 
*/ public class ParallelScheduler extends AbstractScheduler { + private static final Logger LOGGER = LoggerFactory.getLogger(ParallelScheduler.class); + private ExecutorService executor; ParallelScheduler(String name, int maxConcurrency) { @@ -38,4 +45,15 @@ public class ParallelScheduler extends AbstractScheduler { // submit this job to a FixedThreadPool so that at most maxConcurrencyJobs running executor.execute(() -> runJob(runningJob)); } + + @Override + public void stop() { + stop(2, TimeUnit.MINUTES); + } + + @Override + public void stop(int stopTimeoutVal, TimeUnit stopTimeoutUnit) { + super.stop(); + ExecutorUtil.softShutdown(name, executor, stopTimeoutVal, stopTimeoutUnit); + } } diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/scheduler/Scheduler.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/scheduler/Scheduler.java index d2b68b3..820495b 100644 --- a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/scheduler/Scheduler.java +++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/scheduler/Scheduler.java @@ -18,6 +18,7 @@ package org.apache.zeppelin.scheduler; import java.util.List; +import java.util.concurrent.TimeUnit; /** * Interface for scheduler. Scheduler is used for manage the lifecycle of job. 
@@ -40,4 +41,6 @@ public interface Scheduler extends Runnable { void stop(); + void stop(int stopTimeoutVal, TimeUnit stopTimeoutUnit); + } diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/scheduler/SchedulerFactory.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/scheduler/SchedulerFactory.java index 16242be..8e76c0f 100644 --- a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/scheduler/SchedulerFactory.java +++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/scheduler/SchedulerFactory.java @@ -18,12 +18,15 @@ package org.apache.zeppelin.scheduler; import org.apache.zeppelin.conf.ZeppelinConfiguration; +import org.apache.zeppelin.util.ExecutorUtil; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.util.HashMap; import java.util.Map; +import java.util.Map.Entry; import java.util.concurrent.ExecutorService; +import java.util.concurrent.TimeUnit; /** * Factory class for creating schedulers except RemoteScheduler as RemoteScheduler runs in @@ -37,38 +40,34 @@ public class SchedulerFactory { protected ExecutorService executor; protected Map<String, Scheduler> schedulers = new HashMap<>(); - private static SchedulerFactory singleton; - private static Long singletonLock = new Long(0); + // Using the Initialization-on-demand holder idiom (https://en.wikipedia.org/wiki/Initialization-on-demand_holder_idiom) + private static final class InstanceHolder { + private static final SchedulerFactory INSTANCE = new SchedulerFactory(); + } public static SchedulerFactory singleton() { - if (singleton == null) { - synchronized (singletonLock) { - if (singleton == null) { - try { - singleton = new SchedulerFactory(); - } catch (Exception e) { - LOGGER.error(e.toString(), e); - } - } - } - } - return singleton; + return InstanceHolder.INSTANCE; } - SchedulerFactory() { + private SchedulerFactory() { ZeppelinConfiguration zConf = ZeppelinConfiguration.create(); int threadPoolSize = 
zConf.getInt(ZeppelinConfiguration.ConfVars.ZEPPELIN_INTERPRETER_SCHEDULER_POOL_SIZE); - LOGGER.info("Scheduler Thread Pool Size: " + threadPoolSize); + LOGGER.info("Scheduler Thread Pool Size: {}", threadPoolSize); executor = ExecutorFactory.singleton().createOrGet(SCHEDULER_EXECUTOR_NAME, threadPoolSize); } public void destroy() { LOGGER.info("Destroy all executors"); - ExecutorFactory.singleton().shutdown(SCHEDULER_EXECUTOR_NAME); - this.executor.shutdownNow(); - this.executor = null; - singleton = null; + synchronized (schedulers) { + // stop all child thread of schedulers + for (Entry<String, Scheduler> scheduler : schedulers.entrySet()) { + LOGGER.info("Stopping Scheduler {}", scheduler.getKey()); + scheduler.getValue().stop(); + } + schedulers.clear(); + } + ExecutorUtil.softShutdown("SchedulerFactoryExecutor", executor, 60, TimeUnit.SECONDS); } public Scheduler createOrGetFIFOScheduler(String name) { @@ -93,7 +92,7 @@ public class SchedulerFactory { } } - + public Scheduler createOrGetScheduler(Scheduler scheduler) { synchronized (schedulers) { if (!schedulers.containsKey(scheduler.getName())) { diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/util/ExecutorUtil.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/util/ExecutorUtil.java new file mode 100644 index 0000000..1e81877 --- /dev/null +++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/util/ExecutorUtil.java @@ -0,0 +1,54 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.zeppelin.util; + +import java.util.concurrent.ExecutorService; +import java.util.concurrent.TimeUnit; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class ExecutorUtil { + + private static final Logger LOGGER = LoggerFactory.getLogger(ExecutorUtil.class); + + private ExecutorUtil() { + // Util class + } + + // This softshutdown is based on https://docs.oracle.com/javase/7/docs/api/java/util/concurrent/ExecutorService.html + public static void softShutdown(String name, ExecutorService executor, int stopTimeoutVal, TimeUnit stopTimeoutUnit) { + executor.shutdown(); // Disable new tasks from being submitted + try { + // Wait a while for existing tasks to terminate + if (!executor.awaitTermination(stopTimeoutVal, stopTimeoutUnit)) { + LOGGER.warn("{} was not shut down in the given time {} {} - interrupting now", name, stopTimeoutVal, stopTimeoutUnit); + executor.shutdownNow(); // Cancel currently executing tasks + // Wait a while for tasks to respond to being cancelled + if (!executor.awaitTermination(stopTimeoutVal, stopTimeoutUnit)) { + LOGGER.error("executor {} did not terminate", name); + } + } + } catch (InterruptedException ie) { + // (Re-)Cancel if current thread also interrupted + executor.shutdownNow(); + // Preserve interrupt status + Thread.currentThread().interrupt(); + } + } +} diff --git a/zeppelin-interpreter/src/test/java/org/apache/zeppelin/scheduler/FIFOSchedulerTest.java b/zeppelin-interpreter/src/test/java/org/apache/zeppelin/scheduler/FIFOSchedulerTest.java index e2d91ad..f383d41 
100644 --- a/zeppelin-interpreter/src/test/java/org/apache/zeppelin/scheduler/FIFOSchedulerTest.java +++ b/zeppelin-interpreter/src/test/java/org/apache/zeppelin/scheduler/FIFOSchedulerTest.java @@ -17,30 +17,28 @@ package org.apache.zeppelin.scheduler; -import junit.framework.TestCase; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; + import org.apache.zeppelin.scheduler.Job.Status; +import org.junit.Before; import org.junit.Test; -public class FIFOSchedulerTest extends TestCase { +public class FIFOSchedulerTest { private SchedulerFactory schedulerSvc; - @Override - public void setUp() throws Exception { - schedulerSvc = new SchedulerFactory(); - } - - @Override - public void tearDown() { - schedulerSvc.destroy(); + @Before + public void setUp() { + schedulerSvc = SchedulerFactory.singleton(); } @Test public void testRun() throws InterruptedException { Scheduler s = schedulerSvc.createOrGetFIFOScheduler("test"); - Job job1 = new SleepingJob("job1", null, 500); - Job job2 = new SleepingJob("job2", null, 500); + Job<?> job1 = new SleepingJob("job1", null, 500); + Job<?> job2 = new SleepingJob("job2", null, 500); s.submit(job1); s.submit(job2); @@ -53,15 +51,15 @@ public class FIFOSchedulerTest extends TestCase { assertEquals(Status.FINISHED, job1.getStatus()); assertEquals(Status.RUNNING, job2.getStatus()); assertTrue((500 < (Long) job1.getReturn())); - s.stop(); + schedulerSvc.removeScheduler(s.getName()); } @Test public void testAbort() throws InterruptedException { Scheduler s = schedulerSvc.createOrGetFIFOScheduler("test"); - Job job1 = new SleepingJob("job1", null, 500); - Job job2 = new SleepingJob("job2", null, 500); + Job<?> job1 = new SleepingJob("job1", null, 500); + Job<?> job2 = new SleepingJob("job2", null, 500); s.submit(job1); s.submit(job2); @@ -78,6 +76,6 @@ public class FIFOSchedulerTest extends TestCase { assertTrue((500 > (Long) job1.getReturn())); assertEquals(null, job2.getReturn()); - s.stop(); + 
schedulerSvc.removeScheduler(s.getName()); } } diff --git a/zeppelin-interpreter/src/test/java/org/apache/zeppelin/scheduler/ParallelSchedulerTest.java b/zeppelin-interpreter/src/test/java/org/apache/zeppelin/scheduler/ParallelSchedulerTest.java index 9b9b9ba..fc8fa88 100644 --- a/zeppelin-interpreter/src/test/java/org/apache/zeppelin/scheduler/ParallelSchedulerTest.java +++ b/zeppelin-interpreter/src/test/java/org/apache/zeppelin/scheduler/ParallelSchedulerTest.java @@ -17,32 +17,28 @@ package org.apache.zeppelin.scheduler; +import static org.junit.Assert.assertEquals; -import junit.framework.TestCase; import org.apache.zeppelin.scheduler.Job.Status; +import org.junit.BeforeClass; import org.junit.Test; -public class ParallelSchedulerTest extends TestCase { +public class ParallelSchedulerTest { - private SchedulerFactory schedulerSvc; + private static SchedulerFactory schedulerSvc; - @Override - public void setUp() throws Exception { - schedulerSvc = new SchedulerFactory(); - } - - @Override - public void tearDown() { - schedulerSvc.destroy(); + @BeforeClass + public static void setUp() { + schedulerSvc = SchedulerFactory.singleton(); } @Test public void testRun() throws InterruptedException { Scheduler s = schedulerSvc.createOrGetParallelScheduler("test", 2); - Job job1 = new SleepingJob("job1", null, 500); - Job job2 = new SleepingJob("job2", null, 500); - Job job3 = new SleepingJob("job3", null, 500); + Job<?> job1 = new SleepingJob("job1", null, 500); + Job<?> job2 = new SleepingJob("job2", null, 500); + Job<?> job3 = new SleepingJob("job3", null, 500); s.submit(job1); s.submit(job2); @@ -58,6 +54,6 @@ public class ParallelSchedulerTest extends TestCase { assertEquals(Status.FINISHED, job1.getStatus()); assertEquals(Status.FINISHED, job2.getStatus()); assertEquals(Status.RUNNING, job3.getStatus()); + schedulerSvc.removeScheduler(s.getName()); } - } diff --git a/zeppelin-interpreter/src/test/resources/log4j.properties 
b/zeppelin-interpreter/src/test/resources/log4j.properties index b724845..4f78acf 100644 --- a/zeppelin-interpreter/src/test/resources/log4j.properties +++ b/zeppelin-interpreter/src/test/resources/log4j.properties @@ -19,7 +19,7 @@ log4j.appender.stdout=org.apache.log4j.ConsoleAppender log4j.appender.stdout.Target=System.out log4j.appender.stdout.layout=org.apache.log4j.PatternLayout -log4j.appender.stdout.layout.ConversionPattern=%d{ABSOLUTE} %5p %c:%L - %m%n +log4j.appender.stdout.layout.ConversionPattern=%d{ABSOLUTE} %5p [%t] %c:%L - %m%n #log4j.appender.stdout.layout.ConversionPattern= #%5p [%t] (%F:%L) - %m%n #%-4r [%t] %-5p %c %x - %m%n diff --git a/zeppelin-zengine/src/main/java/org/apache/zeppelin/scheduler/RemoteScheduler.java b/zeppelin-zengine/src/main/java/org/apache/zeppelin/scheduler/RemoteScheduler.java index 3797c8b..d8797ff 100644 --- a/zeppelin-zengine/src/main/java/org/apache/zeppelin/scheduler/RemoteScheduler.java +++ b/zeppelin-zengine/src/main/java/org/apache/zeppelin/scheduler/RemoteScheduler.java @@ -19,10 +19,12 @@ package org.apache.zeppelin.scheduler; import org.apache.zeppelin.interpreter.remote.RemoteInterpreter; import org.apache.zeppelin.scheduler.Job.Status; +import org.apache.zeppelin.util.ExecutorUtil; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.util.concurrent.ExecutorService; +import java.util.concurrent.TimeUnit; /** * RemoteScheduler runs in ZeppelinServer and proxies Scheduler running on RemoteInterpreter. 
@@ -219,4 +221,10 @@ public class RemoteScheduler extends AbstractScheduler { } } } + + @Override + public void stop(int stopTimeoutVal, TimeUnit stopTimeoutUnit) { + super.stop(); + } + } diff --git a/zeppelin-zengine/src/test/java/org/apache/zeppelin/scheduler/RemoteSchedulerTest.java b/zeppelin-zengine/src/test/java/org/apache/zeppelin/scheduler/RemoteSchedulerTest.java index 84e7fbe..8b84243 100644 --- a/zeppelin-zengine/src/test/java/org/apache/zeppelin/scheduler/RemoteSchedulerTest.java +++ b/zeppelin-zengine/src/test/java/org/apache/zeppelin/scheduler/RemoteSchedulerTest.java @@ -48,13 +48,15 @@ public class RemoteSchedulerTest extends AbstractInterpreterTest private static final int TICK_WAIT = 100; private static final int MAX_WAIT_CYCLES = 100; + @Override @Before public void setUp() throws Exception { super.setUp(); - schedulerSvc = new SchedulerFactory(); + schedulerSvc = SchedulerFactory.singleton(); interpreterSetting = interpreterSettingManager.getInterpreterSettingByName("test"); } + @Override @After public void tearDown() { interpreterSetting.close();