This is an automated email from the ASF dual-hosted git repository.

pdallig pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/zeppelin.git


The following commit(s) were added to refs/heads/master by this push:
     new d4f9e3eabd [ZEPPELIN-6066] Cleanups around Job and Scheduler (#4804)
d4f9e3eabd is described below

commit d4f9e3eabd08b14819d1f5290bbeaa81137a6c0b
Author: Philipp Dallig <philipp.dal...@gmail.com>
AuthorDate: Wed Aug 28 07:22:58 2024 +0200

    [ZEPPELIN-6066] Cleanups around Job and Scheduler (#4804)
    
    * Simple changes
    
    * Rename SchedulerThreadFactory to NamedThreadFactory
    
    * some small change in job - SimpleDateFormat is not thread-safe, and there 
is no need to keep the object in memory
    
    * Remove change in InterpreterContext
---
 .../zeppelin/interpreter/InterpreterContext.java   |  2 +-
 .../remote/RemoteInterpreterServer.java            |  4 +--
 .../zeppelin/scheduler/AbstractScheduler.java      |  3 +-
 .../apache/zeppelin/scheduler/ExecutorFactory.java | 34 ++++++++-------------
 .../apache/zeppelin/scheduler/FIFOScheduler.java   |  2 +-
 .../java/org/apache/zeppelin/scheduler/Job.java    |  9 +++---
 ...rThreadFactory.java => NamedThreadFactory.java} | 15 +++++-----
 .../zeppelin/scheduler/ParallelScheduler.java      |  6 +---
 .../zeppelin/scheduler/SchedulerFactory.java       | 35 ++++++++++------------
 .../interpreter/ManagedInterpreterGroup.java       |  2 +-
 .../zeppelin/interpreter/YarnAppMonitor.java       |  4 +--
 .../zeppelin/notebook/NoteEventAsyncListener.java  |  4 +--
 .../org/apache/zeppelin/notebook/Notebook.java     |  4 +--
 .../apache/zeppelin/scheduler/RemoteScheduler.java |  2 +-
 14 files changed, 56 insertions(+), 70 deletions(-)

diff --git 
a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/InterpreterContext.java
 
b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/InterpreterContext.java
index 89b1e542af..347e5bc57e 100644
--- 
a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/InterpreterContext.java
+++ 
b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/InterpreterContext.java
@@ -295,7 +295,7 @@ public class InterpreterContext {
     if (progressMap != null) {
       n = Math.max(n, 0);
       n = Math.min(n, 100);
-      progressMap.put(paragraphId, new Integer(n));
+      progressMap.put(paragraphId, Integer.valueOf(n));
     }
   }
 }
diff --git 
a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterServer.java
 
b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterServer.java
index f84aceb9bc..6ef35eec74 100644
--- 
a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterServer.java
+++ 
b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterServer.java
@@ -941,7 +941,7 @@ public class RemoteInterpreterServer extends Thread
     LOGGER.info("cancel {} {}", className, 
interpreterContext.getParagraphId());
     Interpreter intp = getInterpreter(sessionId, className);
     String jobId = interpreterContext.getParagraphId();
-    Job job = intp.getScheduler().getJob(jobId);
+    Job<?> job = intp.getScheduler().getJob(jobId);
 
     if (job != null && job.getStatus() == Status.PENDING) {
       job.setStatus(Status.ABORT);
@@ -1105,7 +1105,7 @@ public class RemoteInterpreterServer extends Thread
       for (Interpreter intp : interpreters) {
         Scheduler scheduler = intp.getScheduler();
         if (scheduler != null) {
-          Job job = scheduler.getJob(jobId);
+          Job<?> job = scheduler.getJob(jobId);
           if (job != null) {
             return job.getStatus().name();
           }
diff --git 
a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/scheduler/AbstractScheduler.java
 
b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/scheduler/AbstractScheduler.java
index ad21c3c63f..7e99095b7f 100644
--- 
a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/scheduler/AbstractScheduler.java
+++ 
b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/scheduler/AbstractScheduler.java
@@ -39,10 +39,11 @@ public abstract class AbstractScheduler implements 
Scheduler {
   protected final String name;
   protected volatile boolean terminate = false;
   protected BlockingQueue<Job<?>> queue = new LinkedBlockingQueue<>();
+  // JobId -> Job
   protected Map<String, Job<?>> jobs = new ConcurrentHashMap<>();
   private Thread schedulerThread;
 
-  public AbstractScheduler(String name) {
+  protected AbstractScheduler(String name) {
     this.name = name;
   }
 
diff --git 
a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/scheduler/ExecutorFactory.java
 
b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/scheduler/ExecutorFactory.java
index b20ccc75c4..00e1976bec 100644
--- 
a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/scheduler/ExecutorFactory.java
+++ 
b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/scheduler/ExecutorFactory.java
@@ -47,25 +47,19 @@ public class ExecutorFactory {
     return InstanceHolder.INSTANCE;
   }
 
-  public ExecutorService createOrGet(String name, int numThread) {
+  public ExecutorService createOrGet(final String name, final int numThread) {
     synchronized (executors) {
-      if (!executors.containsKey(name)) {
-        executors.put(name, Executors.newScheduledThreadPool(
-            numThread,
-            new SchedulerThreadFactory(name)));
-      }
-      return executors.get(name);
+      return executors.computeIfAbsent(name, k -> 
(Executors.newScheduledThreadPool(
+          numThread,
+          new NamedThreadFactory(k))));
     }
   }
 
-  public ScheduledExecutorService createOrGetScheduled(String name, int 
numThread) {
+  public ScheduledExecutorService createOrGetScheduled(final String name, 
final int numThread) {
     synchronized (scheduledExecutors) {
-      if (!scheduledExecutors.containsKey(name)) {
-        scheduledExecutors.put(name, Executors.newScheduledThreadPool(
-            numThread,
-            new SchedulerThreadFactory(name)));
-      }
-      return scheduledExecutors.get(name);
+      return scheduledExecutors.computeIfAbsent(name, k -> 
(Executors.newScheduledThreadPool(
+          numThread,
+          new NamedThreadFactory(k))));
     }
   }
 
@@ -75,22 +69,20 @@ public class ExecutorFactory {
    * @return
    */
   public ExecutorService getNoteJobExecutor() {
-    return createOrGet("NoteJobThread-", 50);
+    return createOrGet("NoteJobThread", 50);
   }
 
   public void shutdown(String name) {
     synchronized (executors) {
-      if (executors.containsKey(name)) {
-        ExecutorService e = executors.get(name);
+      ExecutorService e = executors.remove(name);
+      if (e != null) {
         ExecutorUtil.softShutdown(name, e, 1, TimeUnit.MINUTES);
-        executors.remove(name);
       }
     }
     synchronized (scheduledExecutors) {
-      if (scheduledExecutors.containsKey(name)) {
-        ExecutorService e = scheduledExecutors.get(name);
+      ExecutorService e = scheduledExecutors.remove(name);
+      if (e != null) {
         ExecutorUtil.softShutdown(name, e, 1, TimeUnit.MINUTES);
-        scheduledExecutors.remove(name);
       }
     }
   }
diff --git 
a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/scheduler/FIFOScheduler.java
 
b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/scheduler/FIFOScheduler.java
index 05519c3a49..f6c5dd5a29 100644
--- 
a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/scheduler/FIFOScheduler.java
+++ 
b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/scheduler/FIFOScheduler.java
@@ -33,7 +33,7 @@ public class FIFOScheduler extends AbstractScheduler {
   FIFOScheduler(String name) {
     super(name);
     this.executor = Executors.newSingleThreadExecutor(
-        new SchedulerThreadFactory("FIFOScheduler-" + name + "-Worker-"));
+        new NamedThreadFactory("FIFOScheduler-" + name + "-Worker"));
   }
 
   @Override
diff --git 
a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/scheduler/Job.java 
b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/scheduler/Job.java
index 44014ad8c3..b0ed600f45 100644
--- a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/scheduler/Job.java
+++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/scheduler/Job.java
@@ -38,8 +38,9 @@ import java.util.Map;
  * Changing/adding/deleting non transitive field name need consideration of 
that.
  */
 public abstract class Job<T> {
+
   private static final Logger LOGGER = LoggerFactory.getLogger(Job.class);
-  private static SimpleDateFormat JOB_DATE_FORMAT = new 
SimpleDateFormat("yyyyMMdd-HHmmss");
+  private static final String DATE_FORMAT = "yyyyMMdd-HHmmss";
 
   /**
    * Job status.
@@ -93,15 +94,15 @@ public abstract class Job<T> {
   private transient volatile Throwable exception;
   private transient JobListener listener;
 
-  public Job(String jobName, JobListener listener) {
+  protected Job(String jobName, JobListener listener) {
     this.jobName = jobName;
     this.listener = listener;
     dateCreated = new Date();
-    id = JOB_DATE_FORMAT.format(dateCreated) + "_" + jobName;
+    id = new SimpleDateFormat(DATE_FORMAT).format(dateCreated) + "_" + jobName;
     setStatus(Status.READY);
   }
 
-  public Job(String jobId, String jobName, JobListener listener) {
+  protected Job(String jobId, String jobName, JobListener listener) {
     this.jobName = jobName;
     this.listener = listener;
     dateCreated = new Date();
diff --git 
a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/scheduler/SchedulerThreadFactory.java
 
b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/scheduler/NamedThreadFactory.java
similarity index 74%
rename from 
zeppelin-interpreter/src/main/java/org/apache/zeppelin/scheduler/SchedulerThreadFactory.java
rename to 
zeppelin-interpreter/src/main/java/org/apache/zeppelin/scheduler/NamedThreadFactory.java
index fe0711e685..cd53160a7b 100644
--- 
a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/scheduler/SchedulerThreadFactory.java
+++ 
b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/scheduler/NamedThreadFactory.java
@@ -21,19 +21,18 @@ package org.apache.zeppelin.scheduler;
 import java.util.concurrent.ThreadFactory;
 import java.util.concurrent.atomic.AtomicLong;
 
-public class SchedulerThreadFactory implements ThreadFactory {
+public class NamedThreadFactory implements ThreadFactory {
 
-  private String namePrefix;
-  private AtomicLong count = new AtomicLong(1);
+  private final String name;
+  private final AtomicLong count = new AtomicLong(1);
 
-  public SchedulerThreadFactory(String namePrefix) {
-    this.namePrefix = namePrefix;
+  public NamedThreadFactory(String name) {
+    this.name = name;
   }
 
   @Override
   public Thread newThread(Runnable r) {
-    Thread thread = new Thread(r);
-    thread.setName(namePrefix + count.getAndIncrement());
-    return thread;
+    return new Thread(r, name + "-" + count.getAndIncrement());
+
   }
 }
diff --git 
a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/scheduler/ParallelScheduler.java
 
b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/scheduler/ParallelScheduler.java
index a58e349f90..9f0c27cccc 100644
--- 
a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/scheduler/ParallelScheduler.java
+++ 
b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/scheduler/ParallelScheduler.java
@@ -22,22 +22,18 @@ import java.util.concurrent.Executors;
 import java.util.concurrent.TimeUnit;
 
 import org.apache.zeppelin.util.ExecutorUtil;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
 /**
  * Parallel scheduler runs submitted job concurrently.
  */
 public class ParallelScheduler extends AbstractScheduler {
 
-  private static final Logger LOGGER = 
LoggerFactory.getLogger(ParallelScheduler.class);
-
   private ExecutorService executor;
 
   ParallelScheduler(String name, int maxConcurrency) {
     super(name);
     this.executor = Executors.newFixedThreadPool(maxConcurrency,
-        new SchedulerThreadFactory("ParallelScheduler-Worker-"));
+        new NamedThreadFactory("ParallelScheduler-Worker"));
   }
 
   @Override
diff --git 
a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/scheduler/SchedulerFactory.java
 
b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/scheduler/SchedulerFactory.java
index 34609b4916..e3213467fc 100644
--- 
a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/scheduler/SchedulerFactory.java
+++ 
b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/scheduler/SchedulerFactory.java
@@ -69,38 +69,35 @@ public class SchedulerFactory {
     ExecutorUtil.softShutdown("SchedulerFactoryExecutor", executor, 60, 
TimeUnit.SECONDS);
   }
 
-  public Scheduler createOrGetFIFOScheduler(String name) {
+  public Scheduler createOrGetFIFOScheduler(final String name) {
     synchronized (schedulers) {
-      if (!schedulers.containsKey(name)) {
-        LOGGER.info("Create FIFOScheduler: {}", name);
-        FIFOScheduler s = new FIFOScheduler(name);
-        schedulers.put(name, s);
+      return schedulers.computeIfAbsent(name, k -> {
+        LOGGER.info("Create FIFOScheduler: {}", k);
+        FIFOScheduler s = new FIFOScheduler(k);
         executor.execute(s);
-      }
-      return schedulers.get(name);
+        return s;
+      });
     }
   }
 
-  public Scheduler createOrGetParallelScheduler(String name, int 
maxConcurrency) {
+  public Scheduler createOrGetParallelScheduler(final String name, final int 
maxConcurrency) {
     synchronized (schedulers) {
-      if (!schedulers.containsKey(name)) {
-        LOGGER.info("Create ParallelScheduler: {} with maxConcurrency: {}", 
name, maxConcurrency);
-        ParallelScheduler s = new ParallelScheduler(name, maxConcurrency);
-        schedulers.put(name, s);
+      return schedulers.computeIfAbsent(name, k -> {
+        LOGGER.info("Create ParallelScheduler: {} with maxConcurrency: {}", k, 
maxConcurrency);
+        ParallelScheduler s = new ParallelScheduler(k, maxConcurrency);
         executor.execute(s);
-      }
-      return schedulers.get(name);
+        return s;
+      });
     }
   }
 
 
-  public Scheduler createOrGetScheduler(Scheduler scheduler) {
+  public Scheduler createOrGetScheduler(final Scheduler scheduler) {
     synchronized (schedulers) {
-      if (!schedulers.containsKey(scheduler.getName())) {
-        schedulers.put(scheduler.getName(), scheduler);
+      return schedulers.computeIfAbsent(scheduler.getName(), k -> {
         executor.execute(scheduler);
-      }
-      return schedulers.get(scheduler.getName());
+        return scheduler;
+      });
     }
   }
 
diff --git 
a/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/ManagedInterpreterGroup.java
 
b/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/ManagedInterpreterGroup.java
index c1b5076167..8f2c16c074 100644
--- 
a/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/ManagedInterpreterGroup.java
+++ 
b/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/ManagedInterpreterGroup.java
@@ -150,7 +150,7 @@ public class ManagedInterpreterGroup extends 
InterpreterGroup {
     try {
       if (Boolean.parseBoolean(
               interpreter.getProperty("zeppelin.interpreter.close.cancel_job", 
"true"))) {
-        for (final Job job : scheduler.getAllJobs()) {
+        for (final Job<?> job : scheduler.getAllJobs()) {
           if (!job.isTerminated()) {
             job.abort();
             job.setStatus(Job.Status.ABORT);
diff --git 
a/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/YarnAppMonitor.java
 
b/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/YarnAppMonitor.java
index 56dc7ea912..48956b337b 100644
--- 
a/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/YarnAppMonitor.java
+++ 
b/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/YarnAppMonitor.java
@@ -25,7 +25,7 @@ import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.zeppelin.conf.ZeppelinConfiguration;
 import org.apache.zeppelin.conf.ZeppelinConfiguration.ConfVars;
 import org.apache.zeppelin.interpreter.remote.RemoteInterpreterManagedProcess;
-import org.apache.zeppelin.scheduler.SchedulerThreadFactory;
+import org.apache.zeppelin.scheduler.NamedThreadFactory;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -65,7 +65,7 @@ public class YarnAppMonitor {
       yarnConf.set("yarn.timeline-service.enabled", "false");
       yarnClient.init(yarnConf);
       yarnClient.start();
-      this.executor = Executors.newSingleThreadScheduledExecutor(new 
SchedulerThreadFactory("YarnAppsMonitor-Thread"));
+      this.executor = Executors.newSingleThreadScheduledExecutor(new 
NamedThreadFactory("YarnAppsMonitor-Thread"));
       this.apps = new ConcurrentHashMap<>();
       this.executor.scheduleAtFixedRate(() -> {
                 try {
diff --git 
a/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/NoteEventAsyncListener.java
 
b/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/NoteEventAsyncListener.java
index dabc5dde40..a565b02ec3 100644
--- 
a/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/NoteEventAsyncListener.java
+++ 
b/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/NoteEventAsyncListener.java
@@ -18,7 +18,7 @@
 package org.apache.zeppelin.notebook;
 
 import org.apache.zeppelin.scheduler.Job;
-import org.apache.zeppelin.scheduler.SchedulerThreadFactory;
+import org.apache.zeppelin.scheduler.NamedThreadFactory;
 import org.apache.zeppelin.user.AuthenticationInfo;
 import org.apache.zeppelin.util.ExecutorUtil;
 import org.slf4j.Logger;
@@ -43,7 +43,7 @@ public abstract class NoteEventAsyncListener implements 
NoteEventListener, Close
   protected NoteEventAsyncListener(String name) {
     this.name = name;
     executor = new ThreadPoolExecutor(0, 1, 1, TimeUnit.MINUTES,
-            new LinkedBlockingQueue<>(), new SchedulerThreadFactory(name));
+            new LinkedBlockingQueue<>(), new NamedThreadFactory(name));
   }
 
   public abstract void handleNoteCreateEvent(NoteCreateEvent noteCreateEvent);
diff --git 
a/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/Notebook.java 
b/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/Notebook.java
index 33d468931b..1f98c716f1 100644
--- a/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/Notebook.java
+++ b/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/Notebook.java
@@ -57,7 +57,7 @@ import org.apache.zeppelin.notebook.repo.NotebookRepoSync;
 import org.apache.zeppelin.notebook.repo.NotebookRepoWithVersionControl;
 import 
org.apache.zeppelin.notebook.repo.NotebookRepoWithVersionControl.Revision;
 import org.apache.zeppelin.scheduler.Job;
-import org.apache.zeppelin.scheduler.SchedulerThreadFactory;
+import org.apache.zeppelin.scheduler.NamedThreadFactory;
 import org.apache.zeppelin.user.AuthenticationInfo;
 import org.apache.zeppelin.user.Credentials;
 import org.apache.zeppelin.util.ExecutorUtil;
@@ -138,7 +138,7 @@ public class Notebook {
   public void initNotebook() {
     if (initExecutor == null || initExecutor.isShutdown() || 
initExecutor.isTerminated()) {
       initExecutor = new ThreadPoolExecutor(0, 
Runtime.getRuntime().availableProcessors(), 1, TimeUnit.MINUTES,
-                     new LinkedBlockingQueue<>(), new 
SchedulerThreadFactory("NotebookInit"));
+                     new LinkedBlockingQueue<>(), new 
NamedThreadFactory("NotebookInit"));
     }
     for (NoteInfo noteInfo : getNotesInfo()) {
       initExecutor.execute(() -> {
diff --git 
a/zeppelin-zengine/src/main/java/org/apache/zeppelin/scheduler/RemoteScheduler.java
 
b/zeppelin-zengine/src/main/java/org/apache/zeppelin/scheduler/RemoteScheduler.java
index 6836eb33d1..e5807877f9 100644
--- 
a/zeppelin-zengine/src/main/java/org/apache/zeppelin/scheduler/RemoteScheduler.java
+++ 
b/zeppelin-zengine/src/main/java/org/apache/zeppelin/scheduler/RemoteScheduler.java
@@ -43,7 +43,7 @@ public class RemoteScheduler extends AbstractScheduler {
                          RemoteInterpreter remoteInterpreter) {
     super(name);
     this.executor =
-        Executors.newSingleThreadExecutor(new SchedulerThreadFactory("FIFO-" + 
name + "-"));
+        Executors.newSingleThreadExecutor(new NamedThreadFactory("FIFO-" + 
name));
     this.remoteInterpreter = remoteInterpreter;
   }
 

Reply via email to