This is an automated email from the ASF dual-hosted git repository. pdallig pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/zeppelin.git
The following commit(s) were added to refs/heads/master by this push: new d4f9e3eabd [ZEPPELIN-6066] Cleanups around Job and Scheduler (#4804) d4f9e3eabd is described below commit d4f9e3eabd08b14819d1f5290bbeaa81137a6c0b Author: Philipp Dallig <philipp.dal...@gmail.com> AuthorDate: Wed Aug 28 07:22:58 2024 +0200 [ZEPPELIN-6066] Cleanups around Job and Scheduler (#4804) * Simple changes * Rename SchedulerThreadFactory to NamedThreadFactory * some small change in job - SimpleDateFormat is not thread-safe, and there is no need to keep the object in memory * Remove change in InterpreterContext --- .../zeppelin/interpreter/InterpreterContext.java | 2 +- .../remote/RemoteInterpreterServer.java | 4 +-- .../zeppelin/scheduler/AbstractScheduler.java | 3 +- .../apache/zeppelin/scheduler/ExecutorFactory.java | 34 ++++++++------------- .../apache/zeppelin/scheduler/FIFOScheduler.java | 2 +- .../java/org/apache/zeppelin/scheduler/Job.java | 9 +++--- ...rThreadFactory.java => NamedThreadFactory.java} | 15 +++++----- .../zeppelin/scheduler/ParallelScheduler.java | 6 +--- .../zeppelin/scheduler/SchedulerFactory.java | 35 ++++++++++------------ .../interpreter/ManagedInterpreterGroup.java | 2 +- .../zeppelin/interpreter/YarnAppMonitor.java | 4 +-- .../zeppelin/notebook/NoteEventAsyncListener.java | 4 +-- .../org/apache/zeppelin/notebook/Notebook.java | 4 +-- .../apache/zeppelin/scheduler/RemoteScheduler.java | 2 +- 14 files changed, 56 insertions(+), 70 deletions(-) diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/InterpreterContext.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/InterpreterContext.java index 89b1e542af..347e5bc57e 100644 --- a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/InterpreterContext.java +++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/InterpreterContext.java @@ -295,7 +295,7 @@ public class InterpreterContext { if (progressMap != null) { n = Math.max(n, 
0); n = Math.min(n, 100); - progressMap.put(paragraphId, new Integer(n)); + progressMap.put(paragraphId, Integer.valueOf(n)); } } } diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterServer.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterServer.java index f84aceb9bc..6ef35eec74 100644 --- a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterServer.java +++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterServer.java @@ -941,7 +941,7 @@ public class RemoteInterpreterServer extends Thread LOGGER.info("cancel {} {}", className, interpreterContext.getParagraphId()); Interpreter intp = getInterpreter(sessionId, className); String jobId = interpreterContext.getParagraphId(); - Job job = intp.getScheduler().getJob(jobId); + Job<?> job = intp.getScheduler().getJob(jobId); if (job != null && job.getStatus() == Status.PENDING) { job.setStatus(Status.ABORT); @@ -1105,7 +1105,7 @@ public class RemoteInterpreterServer extends Thread for (Interpreter intp : interpreters) { Scheduler scheduler = intp.getScheduler(); if (scheduler != null) { - Job job = scheduler.getJob(jobId); + Job<?> job = scheduler.getJob(jobId); if (job != null) { return job.getStatus().name(); } diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/scheduler/AbstractScheduler.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/scheduler/AbstractScheduler.java index ad21c3c63f..7e99095b7f 100644 --- a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/scheduler/AbstractScheduler.java +++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/scheduler/AbstractScheduler.java @@ -39,10 +39,11 @@ public abstract class AbstractScheduler implements Scheduler { protected final String name; protected volatile boolean terminate = false; protected BlockingQueue<Job<?>> queue = new LinkedBlockingQueue<>(); 
+ // JobId -> Job protected Map<String, Job<?>> jobs = new ConcurrentHashMap<>(); private Thread schedulerThread; - public AbstractScheduler(String name) { + protected AbstractScheduler(String name) { this.name = name; } diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/scheduler/ExecutorFactory.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/scheduler/ExecutorFactory.java index b20ccc75c4..00e1976bec 100644 --- a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/scheduler/ExecutorFactory.java +++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/scheduler/ExecutorFactory.java @@ -47,25 +47,19 @@ public class ExecutorFactory { return InstanceHolder.INSTANCE; } - public ExecutorService createOrGet(String name, int numThread) { + public ExecutorService createOrGet(final String name, final int numThread) { synchronized (executors) { - if (!executors.containsKey(name)) { - executors.put(name, Executors.newScheduledThreadPool( - numThread, - new SchedulerThreadFactory(name))); - } - return executors.get(name); + return executors.computeIfAbsent(name, k -> (Executors.newScheduledThreadPool( + numThread, + new NamedThreadFactory(k)))); } } - public ScheduledExecutorService createOrGetScheduled(String name, int numThread) { + public ScheduledExecutorService createOrGetScheduled(final String name, final int numThread) { synchronized (scheduledExecutors) { - if (!scheduledExecutors.containsKey(name)) { - scheduledExecutors.put(name, Executors.newScheduledThreadPool( - numThread, - new SchedulerThreadFactory(name))); - } - return scheduledExecutors.get(name); + return scheduledExecutors.computeIfAbsent(name, k -> (Executors.newScheduledThreadPool( + numThread, + new NamedThreadFactory(k)))); } } @@ -75,22 +69,20 @@ public class ExecutorFactory { * @return */ public ExecutorService getNoteJobExecutor() { - return createOrGet("NoteJobThread-", 50); + return createOrGet("NoteJobThread", 50); } public void shutdown(String name) { 
synchronized (executors) { - if (executors.containsKey(name)) { - ExecutorService e = executors.get(name); + ExecutorService e = executors.remove(name); + if (e != null) { ExecutorUtil.softShutdown(name, e, 1, TimeUnit.MINUTES); - executors.remove(name); } } synchronized (scheduledExecutors) { - if (scheduledExecutors.containsKey(name)) { - ExecutorService e = scheduledExecutors.get(name); + ExecutorService e = scheduledExecutors.remove(name); + if (e != null) { ExecutorUtil.softShutdown(name, e, 1, TimeUnit.MINUTES); - scheduledExecutors.remove(name); } } } diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/scheduler/FIFOScheduler.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/scheduler/FIFOScheduler.java index 05519c3a49..f6c5dd5a29 100644 --- a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/scheduler/FIFOScheduler.java +++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/scheduler/FIFOScheduler.java @@ -33,7 +33,7 @@ public class FIFOScheduler extends AbstractScheduler { FIFOScheduler(String name) { super(name); this.executor = Executors.newSingleThreadExecutor( - new SchedulerThreadFactory("FIFOScheduler-" + name + "-Worker-")); + new NamedThreadFactory("FIFOScheduler-" + name + "-Worker")); } @Override diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/scheduler/Job.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/scheduler/Job.java index 44014ad8c3..b0ed600f45 100644 --- a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/scheduler/Job.java +++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/scheduler/Job.java @@ -38,8 +38,9 @@ import java.util.Map; * Changing/adding/deleting non transitive field name need consideration of that. 
*/ public abstract class Job<T> { + private static final Logger LOGGER = LoggerFactory.getLogger(Job.class); - private static SimpleDateFormat JOB_DATE_FORMAT = new SimpleDateFormat("yyyyMMdd-HHmmss"); + private static final String DATE_FORMAT = "yyyyMMdd-HHmmss"; /** * Job status. @@ -93,15 +94,15 @@ public abstract class Job<T> { private transient volatile Throwable exception; private transient JobListener listener; - public Job(String jobName, JobListener listener) { + protected Job(String jobName, JobListener listener) { this.jobName = jobName; this.listener = listener; dateCreated = new Date(); - id = JOB_DATE_FORMAT.format(dateCreated) + "_" + jobName; + id = new SimpleDateFormat(DATE_FORMAT).format(dateCreated) + "_" + jobName; setStatus(Status.READY); } - public Job(String jobId, String jobName, JobListener listener) { + protected Job(String jobId, String jobName, JobListener listener) { this.jobName = jobName; this.listener = listener; dateCreated = new Date(); diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/scheduler/SchedulerThreadFactory.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/scheduler/NamedThreadFactory.java similarity index 74% rename from zeppelin-interpreter/src/main/java/org/apache/zeppelin/scheduler/SchedulerThreadFactory.java rename to zeppelin-interpreter/src/main/java/org/apache/zeppelin/scheduler/NamedThreadFactory.java index fe0711e685..cd53160a7b 100644 --- a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/scheduler/SchedulerThreadFactory.java +++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/scheduler/NamedThreadFactory.java @@ -21,19 +21,18 @@ package org.apache.zeppelin.scheduler; import java.util.concurrent.ThreadFactory; import java.util.concurrent.atomic.AtomicLong; -public class SchedulerThreadFactory implements ThreadFactory { +public class NamedThreadFactory implements ThreadFactory { - private String namePrefix; - private AtomicLong count = new AtomicLong(1); + private 
final String name; + private final AtomicLong count = new AtomicLong(1); - public SchedulerThreadFactory(String namePrefix) { - this.namePrefix = namePrefix; + public NamedThreadFactory(String name) { + this.name = name; } @Override public Thread newThread(Runnable r) { - Thread thread = new Thread(r); - thread.setName(namePrefix + count.getAndIncrement()); - return thread; + return new Thread(r, name + "-" + count.getAndIncrement()); + } } diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/scheduler/ParallelScheduler.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/scheduler/ParallelScheduler.java index a58e349f90..9f0c27cccc 100644 --- a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/scheduler/ParallelScheduler.java +++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/scheduler/ParallelScheduler.java @@ -22,22 +22,18 @@ import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; import org.apache.zeppelin.util.ExecutorUtil; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; /** * Parallel scheduler runs submitted job concurrently. 
*/ public class ParallelScheduler extends AbstractScheduler { - private static final Logger LOGGER = LoggerFactory.getLogger(ParallelScheduler.class); - private ExecutorService executor; ParallelScheduler(String name, int maxConcurrency) { super(name); this.executor = Executors.newFixedThreadPool(maxConcurrency, - new SchedulerThreadFactory("ParallelScheduler-Worker-")); + new NamedThreadFactory("ParallelScheduler-Worker")); } @Override diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/scheduler/SchedulerFactory.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/scheduler/SchedulerFactory.java index 34609b4916..e3213467fc 100644 --- a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/scheduler/SchedulerFactory.java +++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/scheduler/SchedulerFactory.java @@ -69,38 +69,35 @@ public class SchedulerFactory { ExecutorUtil.softShutdown("SchedulerFactoryExecutor", executor, 60, TimeUnit.SECONDS); } - public Scheduler createOrGetFIFOScheduler(String name) { + public Scheduler createOrGetFIFOScheduler(final String name) { synchronized (schedulers) { - if (!schedulers.containsKey(name)) { - LOGGER.info("Create FIFOScheduler: {}", name); - FIFOScheduler s = new FIFOScheduler(name); - schedulers.put(name, s); + return schedulers.computeIfAbsent(name, k -> { + LOGGER.info("Create FIFOScheduler: {}", k); + FIFOScheduler s = new FIFOScheduler(k); executor.execute(s); - } - return schedulers.get(name); + return s; + }); } } - public Scheduler createOrGetParallelScheduler(String name, int maxConcurrency) { + public Scheduler createOrGetParallelScheduler(final String name, final int maxConcurrency) { synchronized (schedulers) { - if (!schedulers.containsKey(name)) { - LOGGER.info("Create ParallelScheduler: {} with maxConcurrency: {}", name, maxConcurrency); - ParallelScheduler s = new ParallelScheduler(name, maxConcurrency); - schedulers.put(name, s); + return schedulers.computeIfAbsent(name, k 
-> { + LOGGER.info("Create ParallelScheduler: {} with maxConcurrency: {}", k, maxConcurrency); + ParallelScheduler s = new ParallelScheduler(k, maxConcurrency); executor.execute(s); - } - return schedulers.get(name); + return s; + }); } } - public Scheduler createOrGetScheduler(Scheduler scheduler) { + public Scheduler createOrGetScheduler(final Scheduler scheduler) { synchronized (schedulers) { - if (!schedulers.containsKey(scheduler.getName())) { - schedulers.put(scheduler.getName(), scheduler); + return schedulers.computeIfAbsent(scheduler.getName(), k -> { executor.execute(scheduler); - } - return schedulers.get(scheduler.getName()); + return scheduler; + }); } } diff --git a/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/ManagedInterpreterGroup.java b/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/ManagedInterpreterGroup.java index c1b5076167..8f2c16c074 100644 --- a/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/ManagedInterpreterGroup.java +++ b/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/ManagedInterpreterGroup.java @@ -150,7 +150,7 @@ public class ManagedInterpreterGroup extends InterpreterGroup { try { if (Boolean.parseBoolean( interpreter.getProperty("zeppelin.interpreter.close.cancel_job", "true"))) { - for (final Job job : scheduler.getAllJobs()) { + for (final Job<?> job : scheduler.getAllJobs()) { if (!job.isTerminated()) { job.abort(); job.setStatus(Job.Status.ABORT); diff --git a/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/YarnAppMonitor.java b/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/YarnAppMonitor.java index 56dc7ea912..48956b337b 100644 --- a/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/YarnAppMonitor.java +++ b/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/YarnAppMonitor.java @@ -25,7 +25,7 @@ import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.zeppelin.conf.ZeppelinConfiguration; 
import org.apache.zeppelin.conf.ZeppelinConfiguration.ConfVars; import org.apache.zeppelin.interpreter.remote.RemoteInterpreterManagedProcess; -import org.apache.zeppelin.scheduler.SchedulerThreadFactory; +import org.apache.zeppelin.scheduler.NamedThreadFactory; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -65,7 +65,7 @@ public class YarnAppMonitor { yarnConf.set("yarn.timeline-service.enabled", "false"); yarnClient.init(yarnConf); yarnClient.start(); - this.executor = Executors.newSingleThreadScheduledExecutor(new SchedulerThreadFactory("YarnAppsMonitor-Thread")); + this.executor = Executors.newSingleThreadScheduledExecutor(new NamedThreadFactory("YarnAppsMonitor-Thread")); this.apps = new ConcurrentHashMap<>(); this.executor.scheduleAtFixedRate(() -> { try { diff --git a/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/NoteEventAsyncListener.java b/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/NoteEventAsyncListener.java index dabc5dde40..a565b02ec3 100644 --- a/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/NoteEventAsyncListener.java +++ b/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/NoteEventAsyncListener.java @@ -18,7 +18,7 @@ package org.apache.zeppelin.notebook; import org.apache.zeppelin.scheduler.Job; -import org.apache.zeppelin.scheduler.SchedulerThreadFactory; +import org.apache.zeppelin.scheduler.NamedThreadFactory; import org.apache.zeppelin.user.AuthenticationInfo; import org.apache.zeppelin.util.ExecutorUtil; import org.slf4j.Logger; @@ -43,7 +43,7 @@ public abstract class NoteEventAsyncListener implements NoteEventListener, Close protected NoteEventAsyncListener(String name) { this.name = name; executor = new ThreadPoolExecutor(0, 1, 1, TimeUnit.MINUTES, - new LinkedBlockingQueue<>(), new SchedulerThreadFactory(name)); + new LinkedBlockingQueue<>(), new NamedThreadFactory(name)); } public abstract void handleNoteCreateEvent(NoteCreateEvent noteCreateEvent); diff --git 
a/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/Notebook.java b/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/Notebook.java index 33d468931b..1f98c716f1 100644 --- a/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/Notebook.java +++ b/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/Notebook.java @@ -57,7 +57,7 @@ import org.apache.zeppelin.notebook.repo.NotebookRepoSync; import org.apache.zeppelin.notebook.repo.NotebookRepoWithVersionControl; import org.apache.zeppelin.notebook.repo.NotebookRepoWithVersionControl.Revision; import org.apache.zeppelin.scheduler.Job; -import org.apache.zeppelin.scheduler.SchedulerThreadFactory; +import org.apache.zeppelin.scheduler.NamedThreadFactory; import org.apache.zeppelin.user.AuthenticationInfo; import org.apache.zeppelin.user.Credentials; import org.apache.zeppelin.util.ExecutorUtil; @@ -138,7 +138,7 @@ public class Notebook { public void initNotebook() { if (initExecutor == null || initExecutor.isShutdown() || initExecutor.isTerminated()) { initExecutor = new ThreadPoolExecutor(0, Runtime.getRuntime().availableProcessors(), 1, TimeUnit.MINUTES, - new LinkedBlockingQueue<>(), new SchedulerThreadFactory("NotebookInit")); + new LinkedBlockingQueue<>(), new NamedThreadFactory("NotebookInit")); } for (NoteInfo noteInfo : getNotesInfo()) { initExecutor.execute(() -> { diff --git a/zeppelin-zengine/src/main/java/org/apache/zeppelin/scheduler/RemoteScheduler.java b/zeppelin-zengine/src/main/java/org/apache/zeppelin/scheduler/RemoteScheduler.java index 6836eb33d1..e5807877f9 100644 --- a/zeppelin-zengine/src/main/java/org/apache/zeppelin/scheduler/RemoteScheduler.java +++ b/zeppelin-zengine/src/main/java/org/apache/zeppelin/scheduler/RemoteScheduler.java @@ -43,7 +43,7 @@ public class RemoteScheduler extends AbstractScheduler { RemoteInterpreter remoteInterpreter) { super(name); this.executor = - Executors.newSingleThreadExecutor(new SchedulerThreadFactory("FIFO-" + name + "-")); + 
Executors.newSingleThreadExecutor(new NamedThreadFactory("FIFO-" + name)); this.remoteInterpreter = remoteInterpreter; }