This is an automated email from the ASF dual-hosted git repository.

pdallig pushed a commit to branch branch-0.9
in repository https://gitbox.apache.org/repos/asf/zeppelin.git


The following commit(s) were added to refs/heads/branch-0.9 by this push:
     new 20eb09c  [ZEPPELIN-5089] ExecutorService shutdown
20eb09c is described below

commit 20eb09c4fd1e29614791524b773dcfc4922b00e7
Author: Philipp Dallig <philipp.dal...@gmail.com>
AuthorDate: Tue Oct 13 16:38:06 2020 +0200

    [ZEPPELIN-5089] ExecutorService shutdown
    
    ### What is this PR for?
    Included in this PR:
     - shutdown LifecylceManager and RM-Heartbeat
     - Shutdown the ParallelScheduler-Worker tasks
     - Use Initialization-on-demand_holder_idiom to create `SchedulerFactory` 
and `ExecutorFactory` singleton
     - Shutdown all scheduler tasks, when destroying the `SchedulerFactory`
     - Soft shutdown of ExecutorService with utility class `ExecutorUtil`, 
which makes visible which thread is not shut down nicely
    
    ### What type of PR is it?
     - Improvement
    
    ### What is the Jira issue?
    * https://issues.apache.org/jira/browse/ZEPPELIN-5089
    
    ### How should this be tested?
    * Travis-CI: https://travis-ci.org/github/Reamer/zeppelin/builds/735636051
    
    ### Questions:
    * Does the licenses files need update? No
    * Is there breaking changes for older versions? No
    * Does this needs documentation? No
    
    Author: Philipp Dallig <philipp.dal...@gmail.com>
    
    Closes #3937 from Reamer/executor_services and squashes the following 
commits:
    
    394363d01 [Philipp Dallig] Use ScheduledExecutorService to schedule 
RM-Heartbeat
    457e98199 [Philipp Dallig] final corrections
    f76b708a3 [Philipp Dallig] Use ExecutorFactory in TimeoutLifecycleManager
    81bbf6545 [Philipp Dallig] Add scheduledExecutors to ExecutorFactory
    ef8100a78 [Philipp Dallig] Soft shutdown of executor threads
    bc0f35845 [Philipp Dallig] stop all sub scheduler
    2be1f028e [Philipp Dallig] Use Initialization-on-demand holder idiom for 
static singleton
    f1e40e2ed [Philipp Dallig] style changes
    
    (cherry picked from commit 4fceaf55907d28454a5dff6c3c5c45c57bf63b53)
    Signed-off-by: Philipp Dallig <philipp.dal...@gmail.com>
---
 .../zeppelin/interpreter/InterpreterGroup.java     | 10 ++--
 .../lifecycle/TimeoutLifecycleManager.java         | 11 +++--
 .../remote/RemoteInterpreterServer.java            | 26 ++++------
 .../zeppelin/scheduler/AbstractScheduler.java      |  6 ++-
 .../apache/zeppelin/scheduler/ExecutorFactory.java | 57 ++++++++++++++++------
 .../apache/zeppelin/scheduler/FIFOScheduler.java   | 12 ++++-
 .../zeppelin/scheduler/ParallelScheduler.java      | 18 +++++++
 .../org/apache/zeppelin/scheduler/Scheduler.java   |  3 ++
 .../zeppelin/scheduler/SchedulerFactory.java       | 41 ++++++++--------
 .../org/apache/zeppelin/util/ExecutorUtil.java     | 54 ++++++++++++++++++++
 .../zeppelin/scheduler/FIFOSchedulerTest.java      | 30 ++++++------
 .../zeppelin/scheduler/ParallelSchedulerTest.java  | 26 +++++-----
 .../src/test/resources/log4j.properties            |  2 +-
 .../apache/zeppelin/scheduler/RemoteScheduler.java |  8 +++
 .../zeppelin/scheduler/RemoteSchedulerTest.java    |  4 +-
 15 files changed, 209 insertions(+), 99 deletions(-)

diff --git 
a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/InterpreterGroup.java
 
b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/InterpreterGroup.java
index a569621..1497559 100644
--- 
a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/InterpreterGroup.java
+++ 
b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/InterpreterGroup.java
@@ -49,7 +49,7 @@ public class InterpreterGroup {
   protected String id;
   private String webUrl;
   // sessionId --> interpreters
-  protected Map<String, List<Interpreter>> sessions = new ConcurrentHashMap();
+  protected Map<String, List<Interpreter>> sessions = new 
ConcurrentHashMap<>();
   private AngularObjectRegistry angularObjectRegistry;
   private InterpreterHookRegistry hookRegistry;
   private ResourcePool resourcePool;
@@ -115,15 +115,15 @@ public class InterpreterGroup {
   public AngularObjectRegistry getAngularObjectRegistry() {
     return angularObjectRegistry;
   }
-  
+
   public void setAngularObjectRegistry(AngularObjectRegistry 
angularObjectRegistry) {
     this.angularObjectRegistry = angularObjectRegistry;
   }
-  
+
   public InterpreterHookRegistry getInterpreterHookRegistry() {
     return hookRegistry;
   }
-  
+
   public void setInterpreterHookRegistry(InterpreterHookRegistry hookRegistry) 
{
     this.hookRegistry = hookRegistry;
   }
@@ -178,7 +178,7 @@ public class InterpreterGroup {
           interpreter.close();
           interpreter.getScheduler().stop();
         } catch (InterpreterException e) {
-          LOGGER.warn("Fail to close interpreter: " + 
interpreter.getClassName(), e);
+          LOGGER.warn("Fail to close interpreter: {}", 
interpreter.getClassName(), e);
         }
       }
     }
diff --git 
a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/lifecycle/TimeoutLifecycleManager.java
 
b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/lifecycle/TimeoutLifecycleManager.java
index 9cbb42f..dd4eaf9 100644
--- 
a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/lifecycle/TimeoutLifecycleManager.java
+++ 
b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/lifecycle/TimeoutLifecycleManager.java
@@ -21,12 +21,11 @@ import org.apache.thrift.TException;
 import org.apache.zeppelin.conf.ZeppelinConfiguration;
 import org.apache.zeppelin.interpreter.LifecycleManager;
 import org.apache.zeppelin.interpreter.remote.RemoteInterpreterServer;
+import org.apache.zeppelin.scheduler.ExecutorFactory;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.util.concurrent.Executors;
 import java.util.concurrent.ScheduledExecutorService;
-
 import static java.util.concurrent.TimeUnit.MILLISECONDS;
 
 
@@ -51,7 +50,8 @@ public class TimeoutLifecycleManager extends LifecycleManager 
{
             .ZEPPELIN_INTERPRETER_LIFECYCLE_MANAGER_TIMEOUT_CHECK_INTERVAL);
     long timeoutThreshold = zConf.getLong(
         
ZeppelinConfiguration.ConfVars.ZEPPELIN_INTERPRETER_LIFECYCLE_MANAGER_TIMEOUT_THRESHOLD);
-    ScheduledExecutorService checkScheduler = 
Executors.newScheduledThreadPool(1);
+    ScheduledExecutorService checkScheduler = ExecutorFactory.singleton()
+        .createOrGetScheduled("TimeoutLifecycleManager", 1);
     checkScheduler.scheduleAtFixedRate(() -> {
       if ((System.currentTimeMillis() - lastBusyTimeInMillis) > 
timeoutThreshold) {
         LOGGER.info("Interpreter process idle time exceed threshold, try to 
stop it");
@@ -64,8 +64,8 @@ public class TimeoutLifecycleManager extends LifecycleManager 
{
         LOGGER.debug("Check idle time of interpreter");
       }
     }, checkInterval, checkInterval, MILLISECONDS);
-    LOGGER.info("TimeoutLifecycleManager is started with checkInterval: " + 
checkInterval
-        + ", timeoutThreshold: " + timeoutThreshold);
+    LOGGER.info("TimeoutLifecycleManager is started with checkInterval: {}, 
timeoutThreshold: ΒΈ{}", checkInterval,
+        timeoutThreshold);
   }
 
   @Override
@@ -79,4 +79,5 @@ public class TimeoutLifecycleManager extends LifecycleManager 
{
     LOGGER.debug("Interpreter process: {} is used", interpreterGroupId);
     lastBusyTimeInMillis = System.currentTimeMillis();
   }
+
 }
diff --git 
a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterServer.java
 
b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterServer.java
index 4051dac..2bf9adf 100644
--- 
a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterServer.java
+++ 
b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterServer.java
@@ -64,6 +64,7 @@ import org.apache.zeppelin.resource.DistributedResourcePool;
 import org.apache.zeppelin.resource.Resource;
 import org.apache.zeppelin.resource.ResourcePool;
 import org.apache.zeppelin.resource.ResourceSet;
+import org.apache.zeppelin.scheduler.ExecutorFactory;
 import org.apache.zeppelin.scheduler.Job;
 import org.apache.zeppelin.scheduler.Job.Status;
 import org.apache.zeppelin.scheduler.JobListener;
@@ -161,12 +162,13 @@ public class RemoteInterpreterServer extends Thread
                                  String portRange,
                                  String interpreterGroupId,
                                  boolean isTest) throws Exception {
-    LOGGER.info("Starting remote interpreter server on port {}, 
intpEventServerAddress: {}:{}", port,
-            intpEventServerHost, intpEventServerPort);
+    super("RemoteInterpreterServer-Thread");
     if (null != intpEventServerHost) {
       this.intpEventServerHost = intpEventServerHost;
       this.intpEventServerPort = intpEventServerPort;
       if (!isTest) {
+        LOGGER.info("Starting remote interpreter server on port {}, 
intpEventServerAddress: {}:{}", port,
+          intpEventServerHost, intpEventServerPort);
         intpEventClient = new 
RemoteInterpreterEventClient(intpEventServerHost, intpEventServerPort);
       }
     } else {
@@ -227,18 +229,9 @@ public class RemoteInterpreterServer extends Thread
           if (launcherEnv != null && "yarn".endsWith(launcherEnv)) {
             try {
               YarnUtils.register(host, port);
-              Thread thread = new Thread(() -> {
-                while(!Thread.interrupted() && server.isServing()) {
-                  YarnUtils.heartbeat();
-                  try {
-                    Thread.sleep(60 * 1000);
-                  } catch (InterruptedException e) {
-                    LOGGER.warn(e.getMessage(), e);
-                  }
-                }
-              });
-              thread.setName("RM-Heartbeat-Thread");
-              thread.start();
+              ScheduledExecutorService yarnHeartbeat = 
ExecutorFactory.singleton()
+                .createOrGetScheduled("RM-Heartbeat", 1);
+              yarnHeartbeat.scheduleAtFixedRate(YarnUtils::heartbeat, 0, 1, 
TimeUnit.MINUTES);
             } catch (Exception e) {
               LOGGER.error("Fail to register yarn app", e);
             }
@@ -311,6 +304,7 @@ public class RemoteInterpreterServer extends Thread
       }
       if (!isTest) {
         SchedulerFactory.singleton().destroy();
+        ExecutorFactory.singleton().shutdownAll();
       }
 
       if ("yarn".equals(launcherEnv)) {
@@ -370,8 +364,8 @@ public class RemoteInterpreterServer extends Thread
 
   private LifecycleManager createLifecycleManager() throws Exception {
     String lifecycleManagerClass = zConf.getLifecycleManagerClass();
-    Class clazz = Class.forName(lifecycleManagerClass);
-    LOGGER.info("Creating interpreter lifecycle manager: " + 
lifecycleManagerClass);
+    Class<?> clazz = Class.forName(lifecycleManagerClass);
+    LOGGER.info("Creating interpreter lifecycle manager: {}", 
lifecycleManagerClass);
     return (LifecycleManager) 
clazz.getConstructor(ZeppelinConfiguration.class, RemoteInterpreterServer.class)
             .newInstance(zConf, this);
   }
diff --git 
a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/scheduler/AbstractScheduler.java
 
b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/scheduler/AbstractScheduler.java
index 079be3b..fdd456b 100644
--- 
a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/scheduler/AbstractScheduler.java
+++ 
b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/scheduler/AbstractScheduler.java
@@ -46,6 +46,7 @@ public abstract class AbstractScheduler implements Scheduler {
     this.name = name;
   }
 
+  @Override
   public String getName() {
     return this.name;
   }
@@ -66,6 +67,7 @@ public abstract class AbstractScheduler implements Scheduler {
     try {
       queue.put(job);
     } catch (InterruptedException e) {
+      Thread.currentThread().interrupt();
       throw new RuntimeException(String.format("Unable to submit job %s", 
job.getId()), e);
     }
     jobs.put(job.getId(), job);
@@ -122,7 +124,7 @@ public abstract class AbstractScheduler implements 
Scheduler {
       return;
     }
 
-    LOGGER.info("Job " + runningJob.getId() + " started by scheduler " + name);
+    LOGGER.info("Job {} started by scheduler {}",runningJob.getId(), name);
     // Don't set RUNNING status when it is RemoteScheduler, update it via 
JobStatusPoller
     if (!getClass().getSimpleName().equals("RemoteScheduler")) {
       runningJob.setStatus(Job.Status.RUNNING);
@@ -149,7 +151,7 @@ public abstract class AbstractScheduler implements 
Scheduler {
         runningJob.setStatus(Job.Status.FINISHED);
       }
     }
-    LOGGER.info("Job " + runningJob.getId() + " finished by scheduler " + 
name);
+    LOGGER.info("Job {} finished by scheduler {} with status {}", 
runningJob.getId(), name, runningJob.getStatus());
     // reset aborted flag to allow retry
     runningJob.aborted = false;
     jobs.remove(runningJob.getId());
diff --git 
a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/scheduler/ExecutorFactory.java
 
b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/scheduler/ExecutorFactory.java
index 70c7e29..b20ccc7 100644
--- 
a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/scheduler/ExecutorFactory.java
+++ 
b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/scheduler/ExecutorFactory.java
@@ -18,43 +18,57 @@ package org.apache.zeppelin.scheduler;
 
 import java.util.HashMap;
 import java.util.Map;
+import java.util.Map.Entry;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.zeppelin.util.ExecutorUtil;
 
 /**
  * Factory class for Executor
  */
 public class ExecutorFactory {
-  private static ExecutorFactory instance;
-  private static Long _executorLock = new Long(0);
 
   private Map<String, ExecutorService> executors = new HashMap<>();
+  private Map<String, ScheduledExecutorService> scheduledExecutors = new 
HashMap<>();
 
   private ExecutorFactory() {
 
   }
 
+  //Using the Initialization-on-demand holder idiom 
(https://en.wikipedia.org/wiki/Initialization-on-demand_holder_idiom)
+  private static final class InstanceHolder {
+    private static final ExecutorFactory INSTANCE = new ExecutorFactory();
+  }
+
   public static ExecutorFactory singleton() {
-    if (instance == null) {
-      synchronized (_executorLock) {
-        if (instance == null) {
-          instance = new ExecutorFactory();
-        }
-      }
-    }
-    return instance;
+    return InstanceHolder.INSTANCE;
   }
 
   public ExecutorService createOrGet(String name, int numThread) {
     synchronized (executors) {
       if (!executors.containsKey(name)) {
-        executors.put(name, Executors.newScheduledThreadPool(numThread,
+        executors.put(name, Executors.newScheduledThreadPool(
+            numThread,
             new SchedulerThreadFactory(name)));
       }
       return executors.get(name);
     }
   }
 
+  public ScheduledExecutorService createOrGetScheduled(String name, int 
numThread) {
+    synchronized (scheduledExecutors) {
+      if (!scheduledExecutors.containsKey(name)) {
+        scheduledExecutors.put(name, Executors.newScheduledThreadPool(
+            numThread,
+            new SchedulerThreadFactory(name)));
+      }
+      return scheduledExecutors.get(name);
+    }
+  }
+
   /**
    * ThreadPool created for running note via rest api.
    * TODO(zjffdu) Should use property to configure the thread pool size.
@@ -68,18 +82,31 @@ public class ExecutorFactory {
     synchronized (executors) {
       if (executors.containsKey(name)) {
         ExecutorService e = executors.get(name);
-        e.shutdown();
+        ExecutorUtil.softShutdown(name, e, 1, TimeUnit.MINUTES);
         executors.remove(name);
       }
     }
+    synchronized (scheduledExecutors) {
+      if (scheduledExecutors.containsKey(name)) {
+        ExecutorService e = scheduledExecutors.get(name);
+        ExecutorUtil.softShutdown(name, e, 1, TimeUnit.MINUTES);
+        scheduledExecutors.remove(name);
+      }
+    }
   }
 
-
   public void shutdownAll() {
     synchronized (executors) {
-      for (String name : executors.keySet()) {
-        shutdown(name);
+      for (Entry<String, ExecutorService> executor : executors.entrySet()) {
+        ExecutorUtil.softShutdown(executor.getKey(), executor.getValue(), 1, 
TimeUnit.MINUTES);
+      }
+      executors.clear();
+    }
+    synchronized (scheduledExecutors) {
+      for (Entry<String, ScheduledExecutorService> scheduledExecutor : 
scheduledExecutors.entrySet()) {
+        ExecutorUtil.softShutdown(scheduledExecutor.getKey(), 
scheduledExecutor.getValue(), 1, TimeUnit.MINUTES);
       }
+      scheduledExecutors.clear();
     }
   }
 }
diff --git 
a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/scheduler/FIFOScheduler.java
 
b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/scheduler/FIFOScheduler.java
index b9d5e82..3448636 100644
--- 
a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/scheduler/FIFOScheduler.java
+++ 
b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/scheduler/FIFOScheduler.java
@@ -19,6 +19,9 @@ package org.apache.zeppelin.scheduler;
 
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.zeppelin.util.ExecutorUtil;
 
 /**
  * FIFOScheduler runs submitted job sequentially
@@ -29,7 +32,7 @@ public class FIFOScheduler extends AbstractScheduler {
 
   FIFOScheduler(String name) {
     super(name);
-    executor = Executors.newSingleThreadExecutor(
+    this.executor = Executors.newSingleThreadExecutor(
         new SchedulerThreadFactory("FIFOScheduler-" + name + "-Worker-"));
   }
 
@@ -41,7 +44,12 @@ public class FIFOScheduler extends AbstractScheduler {
 
   @Override
   public void stop() {
+    stop(2, TimeUnit.MINUTES);
+  }
+
+  @Override
+  public void stop(int stopTimeoutVal, TimeUnit stopTimeoutUnit) {
     super.stop();
-    executor.shutdownNow();
+    ExecutorUtil.softShutdown(name, executor, stopTimeoutVal, stopTimeoutUnit);
   }
 }
diff --git 
a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/scheduler/ParallelScheduler.java
 
b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/scheduler/ParallelScheduler.java
index 1c12c03..979435a 100644
--- 
a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/scheduler/ParallelScheduler.java
+++ 
b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/scheduler/ParallelScheduler.java
@@ -19,12 +19,19 @@ package org.apache.zeppelin.scheduler;
 
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.zeppelin.util.ExecutorUtil;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 /**
  * Parallel scheduler runs submitted job concurrently.
  */
 public class ParallelScheduler extends AbstractScheduler {
 
+  private static final Logger LOGGER = 
LoggerFactory.getLogger(ParallelScheduler.class);
+
   private ExecutorService executor;
 
   ParallelScheduler(String name, int maxConcurrency) {
@@ -38,4 +45,15 @@ public class ParallelScheduler extends AbstractScheduler {
     // submit this job to a FixedThreadPool so that at most maxConcurrencyJobs 
running
     executor.execute(() -> runJob(runningJob));
   }
+
+  @Override
+  public void stop() {
+    stop(2, TimeUnit.MINUTES);
+  }
+
+  @Override
+  public void stop(int stopTimeoutVal, TimeUnit stopTimeoutUnit) {
+    super.stop();
+    ExecutorUtil.softShutdown(name, executor, stopTimeoutVal, stopTimeoutUnit);
+  }
 }
diff --git 
a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/scheduler/Scheduler.java
 
b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/scheduler/Scheduler.java
index d2b68b3..820495b 100644
--- 
a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/scheduler/Scheduler.java
+++ 
b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/scheduler/Scheduler.java
@@ -18,6 +18,7 @@
 package org.apache.zeppelin.scheduler;
 
 import java.util.List;
+import java.util.concurrent.TimeUnit;
 
 /**
  * Interface for scheduler. Scheduler is used for manage the lifecycle of job.
@@ -40,4 +41,6 @@ public interface Scheduler extends Runnable {
 
   void stop();
 
+  void stop(int stopTimeoutVal, TimeUnit stopTimeoutUnit);
+
 }
diff --git 
a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/scheduler/SchedulerFactory.java
 
b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/scheduler/SchedulerFactory.java
index 16242be..8e76c0f 100644
--- 
a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/scheduler/SchedulerFactory.java
+++ 
b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/scheduler/SchedulerFactory.java
@@ -18,12 +18,15 @@
 package org.apache.zeppelin.scheduler;
 
 import org.apache.zeppelin.conf.ZeppelinConfiguration;
+import org.apache.zeppelin.util.ExecutorUtil;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.util.HashMap;
 import java.util.Map;
+import java.util.Map.Entry;
 import java.util.concurrent.ExecutorService;
+import java.util.concurrent.TimeUnit;
 
 /**
  * Factory class for creating schedulers except RemoteScheduler as 
RemoteScheduler runs in
@@ -37,38 +40,34 @@ public class SchedulerFactory {
   protected ExecutorService executor;
   protected Map<String, Scheduler> schedulers = new HashMap<>();
 
-  private static SchedulerFactory singleton;
-  private static Long singletonLock = new Long(0);
+  // Using the Initialization-on-demand holder idiom 
(https://en.wikipedia.org/wiki/Initialization-on-demand_holder_idiom)
+  private static final class InstanceHolder {
+    private static final SchedulerFactory INSTANCE = new SchedulerFactory();
+  }
 
   public static SchedulerFactory singleton() {
-    if (singleton == null) {
-      synchronized (singletonLock) {
-        if (singleton == null) {
-          try {
-            singleton = new SchedulerFactory();
-          } catch (Exception e) {
-            LOGGER.error(e.toString(), e);
-          }
-        }
-      }
-    }
-    return singleton;
+    return InstanceHolder.INSTANCE;
   }
 
-  SchedulerFactory() {
+  private SchedulerFactory() {
     ZeppelinConfiguration zConf = ZeppelinConfiguration.create();
     int threadPoolSize =
         
zConf.getInt(ZeppelinConfiguration.ConfVars.ZEPPELIN_INTERPRETER_SCHEDULER_POOL_SIZE);
-    LOGGER.info("Scheduler Thread Pool Size: " + threadPoolSize);
+    LOGGER.info("Scheduler Thread Pool Size: {}", threadPoolSize);
     executor = 
ExecutorFactory.singleton().createOrGet(SCHEDULER_EXECUTOR_NAME, 
threadPoolSize);
   }
 
   public void destroy() {
     LOGGER.info("Destroy all executors");
-    ExecutorFactory.singleton().shutdown(SCHEDULER_EXECUTOR_NAME);
-    this.executor.shutdownNow();
-    this.executor = null;
-    singleton = null;
+    synchronized (schedulers) {
+      // stop all child thread of schedulers
+      for (Entry<String, Scheduler> scheduler : schedulers.entrySet()) {
+        LOGGER.info("Stopping Scheduler {}", scheduler.getKey());
+        scheduler.getValue().stop();
+      }
+      schedulers.clear();
+    }
+    ExecutorUtil.softShutdown("SchedulerFactoryExecutor", executor, 60, 
TimeUnit.SECONDS);
   }
 
   public Scheduler createOrGetFIFOScheduler(String name) {
@@ -93,7 +92,7 @@ public class SchedulerFactory {
     }
   }
 
-  
+
   public Scheduler createOrGetScheduler(Scheduler scheduler) {
     synchronized (schedulers) {
       if (!schedulers.containsKey(scheduler.getName())) {
diff --git 
a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/util/ExecutorUtil.java 
b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/util/ExecutorUtil.java
new file mode 100644
index 0000000..1e81877
--- /dev/null
+++ 
b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/util/ExecutorUtil.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.zeppelin.util;
+
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.TimeUnit;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class ExecutorUtil {
+
+  private static final Logger LOGGER = 
LoggerFactory.getLogger(ExecutorUtil.class);
+
+  private ExecutorUtil() {
+    // Util class
+  }
+
+  // This softshutdown is based on 
https://docs.oracle.com/javase/7/docs/api/java/util/concurrent/ExecutorService.html
+  public static void softShutdown(String name, ExecutorService executor, int 
stopTimeoutVal, TimeUnit stopTimeoutUnit) {
+    executor.shutdown(); // Disable new tasks from being submitted
+    try {
+      // Wait a while for existing tasks to terminate
+      if (!executor.awaitTermination(stopTimeoutVal, stopTimeoutUnit)) {
+        LOGGER.warn("{} was not shut down in the given time {} {} - 
interrupting now", name, stopTimeoutVal, stopTimeoutUnit);
+        executor.shutdownNow(); // Cancel currently executing tasks
+        // Wait a while for tasks to respond to being cancelled
+        if (!executor.awaitTermination(stopTimeoutVal, stopTimeoutUnit)) {
+          LOGGER.error("executor {} did not terminate", name);
+        }
+      }
+    } catch (InterruptedException ie) {
+      // (Re-)Cancel if current thread also interrupted
+      executor.shutdownNow();
+      // Preserve interrupt status
+      Thread.currentThread().interrupt();
+    }
+  }
+}
diff --git 
a/zeppelin-interpreter/src/test/java/org/apache/zeppelin/scheduler/FIFOSchedulerTest.java
 
b/zeppelin-interpreter/src/test/java/org/apache/zeppelin/scheduler/FIFOSchedulerTest.java
index e2d91ad..f383d41 100644
--- 
a/zeppelin-interpreter/src/test/java/org/apache/zeppelin/scheduler/FIFOSchedulerTest.java
+++ 
b/zeppelin-interpreter/src/test/java/org/apache/zeppelin/scheduler/FIFOSchedulerTest.java
@@ -17,30 +17,28 @@
 
 package org.apache.zeppelin.scheduler;
 
-import junit.framework.TestCase;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
 import org.apache.zeppelin.scheduler.Job.Status;
+import org.junit.Before;
 import org.junit.Test;
 
-public class FIFOSchedulerTest extends TestCase {
+public class FIFOSchedulerTest {
 
   private SchedulerFactory schedulerSvc;
 
-  @Override
-  public void setUp() throws Exception {
-    schedulerSvc = new SchedulerFactory();
-  }
-
-  @Override
-  public void tearDown() {
-    schedulerSvc.destroy();
+  @Before
+  public void setUp() {
+    schedulerSvc = SchedulerFactory.singleton();
   }
 
   @Test
   public void testRun() throws InterruptedException {
     Scheduler s = schedulerSvc.createOrGetFIFOScheduler("test");
 
-    Job job1 = new SleepingJob("job1", null, 500);
-    Job job2 = new SleepingJob("job2", null, 500);
+    Job<?> job1 = new SleepingJob("job1", null, 500);
+    Job<?> job2 = new SleepingJob("job2", null, 500);
 
     s.submit(job1);
     s.submit(job2);
@@ -53,15 +51,15 @@ public class FIFOSchedulerTest extends TestCase {
     assertEquals(Status.FINISHED, job1.getStatus());
     assertEquals(Status.RUNNING, job2.getStatus());
     assertTrue((500 < (Long) job1.getReturn()));
-    s.stop();
+    schedulerSvc.removeScheduler(s.getName());
   }
 
   @Test
   public void testAbort() throws InterruptedException {
     Scheduler s = schedulerSvc.createOrGetFIFOScheduler("test");
 
-    Job job1 = new SleepingJob("job1", null, 500);
-    Job job2 = new SleepingJob("job2", null, 500);
+    Job<?> job1 = new SleepingJob("job1", null, 500);
+    Job<?> job2 = new SleepingJob("job2", null, 500);
 
     s.submit(job1);
     s.submit(job2);
@@ -78,6 +76,6 @@ public class FIFOSchedulerTest extends TestCase {
 
     assertTrue((500 > (Long) job1.getReturn()));
     assertEquals(null, job2.getReturn());
-    s.stop();
+    schedulerSvc.removeScheduler(s.getName());
   }
 }
diff --git 
a/zeppelin-interpreter/src/test/java/org/apache/zeppelin/scheduler/ParallelSchedulerTest.java
 
b/zeppelin-interpreter/src/test/java/org/apache/zeppelin/scheduler/ParallelSchedulerTest.java
index 9b9b9ba..fc8fa88 100644
--- 
a/zeppelin-interpreter/src/test/java/org/apache/zeppelin/scheduler/ParallelSchedulerTest.java
+++ 
b/zeppelin-interpreter/src/test/java/org/apache/zeppelin/scheduler/ParallelSchedulerTest.java
@@ -17,32 +17,28 @@
 
 package org.apache.zeppelin.scheduler;
 
+import static org.junit.Assert.assertEquals;
 
-import junit.framework.TestCase;
 import org.apache.zeppelin.scheduler.Job.Status;
+import org.junit.BeforeClass;
 import org.junit.Test;
 
-public class ParallelSchedulerTest extends TestCase {
+public class ParallelSchedulerTest {
 
-  private SchedulerFactory schedulerSvc;
+  private static SchedulerFactory schedulerSvc;
 
-  @Override
-  public void setUp() throws Exception {
-    schedulerSvc = new SchedulerFactory();
-  }
-
-  @Override
-  public void tearDown() {
-    schedulerSvc.destroy();
+  @BeforeClass
+  public static void setUp() {
+    schedulerSvc = SchedulerFactory.singleton();
   }
 
   @Test
   public void testRun() throws InterruptedException {
     Scheduler s = schedulerSvc.createOrGetParallelScheduler("test", 2);
 
-    Job job1 = new SleepingJob("job1", null, 500);
-    Job job2 = new SleepingJob("job2", null, 500);
-    Job job3 = new SleepingJob("job3", null, 500);
+    Job<?> job1 = new SleepingJob("job1", null, 500);
+    Job<?> job2 = new SleepingJob("job2", null, 500);
+    Job<?> job3 = new SleepingJob("job3", null, 500);
 
     s.submit(job1);
     s.submit(job2);
@@ -58,6 +54,6 @@ public class ParallelSchedulerTest extends TestCase {
     assertEquals(Status.FINISHED, job1.getStatus());
     assertEquals(Status.FINISHED, job2.getStatus());
     assertEquals(Status.RUNNING, job3.getStatus());
+    schedulerSvc.removeScheduler(s.getName());
   }
-
 }
diff --git a/zeppelin-interpreter/src/test/resources/log4j.properties 
b/zeppelin-interpreter/src/test/resources/log4j.properties
index b724845..4f78acf 100644
--- a/zeppelin-interpreter/src/test/resources/log4j.properties
+++ b/zeppelin-interpreter/src/test/resources/log4j.properties
@@ -19,7 +19,7 @@
 log4j.appender.stdout=org.apache.log4j.ConsoleAppender
 log4j.appender.stdout.Target=System.out
 log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
-log4j.appender.stdout.layout.ConversionPattern=%d{ABSOLUTE} %5p %c:%L - %m%n
+log4j.appender.stdout.layout.ConversionPattern=%d{ABSOLUTE} %5p [%t] %c:%L - 
%m%n
 #log4j.appender.stdout.layout.ConversionPattern=
 #%5p [%t] (%F:%L) - %m%n
 #%-4r [%t] %-5p %c %x - %m%n
diff --git 
a/zeppelin-zengine/src/main/java/org/apache/zeppelin/scheduler/RemoteScheduler.java
 
b/zeppelin-zengine/src/main/java/org/apache/zeppelin/scheduler/RemoteScheduler.java
index 3797c8b..d8797ff 100644
--- 
a/zeppelin-zengine/src/main/java/org/apache/zeppelin/scheduler/RemoteScheduler.java
+++ 
b/zeppelin-zengine/src/main/java/org/apache/zeppelin/scheduler/RemoteScheduler.java
@@ -19,10 +19,12 @@ package org.apache.zeppelin.scheduler;
 
 import org.apache.zeppelin.interpreter.remote.RemoteInterpreter;
 import org.apache.zeppelin.scheduler.Job.Status;
+import org.apache.zeppelin.util.ExecutorUtil;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.util.concurrent.ExecutorService;
+import java.util.concurrent.TimeUnit;
 
 /**
  * RemoteScheduler runs in ZeppelinServer and proxies Scheduler running on 
RemoteInterpreter.
@@ -219,4 +221,10 @@ public class RemoteScheduler extends AbstractScheduler {
       }
     }
   }
+
+  @Override
+  public void stop(int stopTimeoutVal, TimeUnit stopTimeoutUnit) {
+    super.stop();
+  }
+
 }
diff --git 
a/zeppelin-zengine/src/test/java/org/apache/zeppelin/scheduler/RemoteSchedulerTest.java
 
b/zeppelin-zengine/src/test/java/org/apache/zeppelin/scheduler/RemoteSchedulerTest.java
index 84e7fbe..8b84243 100644
--- 
a/zeppelin-zengine/src/test/java/org/apache/zeppelin/scheduler/RemoteSchedulerTest.java
+++ 
b/zeppelin-zengine/src/test/java/org/apache/zeppelin/scheduler/RemoteSchedulerTest.java
@@ -48,13 +48,15 @@ public class RemoteSchedulerTest extends 
AbstractInterpreterTest
   private static final int TICK_WAIT = 100;
   private static final int MAX_WAIT_CYCLES = 100;
 
+  @Override
   @Before
   public void setUp() throws Exception {
     super.setUp();
-    schedulerSvc = new SchedulerFactory();
+    schedulerSvc = SchedulerFactory.singleton();
     interpreterSetting = 
interpreterSettingManager.getInterpreterSettingByName("test");
   }
 
+  @Override
   @After
   public void tearDown() {
     interpreterSetting.close();

Reply via email to