This is an automated email from the ASF dual-hosted git repository.
pdallig pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/zeppelin.git
The following commit(s) were added to refs/heads/master by this push:
new d4f9e3eabd [ZEPPELIN-6066] Cleanups around Job and Scheduler (#4804)
d4f9e3eabd is described below
commit d4f9e3eabd08b14819d1f5290bbeaa81137a6c0b
Author: Philipp Dallig <[email protected]>
AuthorDate: Wed Aug 28 07:22:58 2024 +0200
[ZEPPELIN-6066] Cleanups around Job and Scheduler (#4804)
* Simple changes
* Rename SchedulerThreadFactory to NamedThreadFactory
* Small change in Job: SimpleDateFormat is not thread-safe, and there
is no need to keep the object in memory
* Remove change in InterpreterContext
---
.../zeppelin/interpreter/InterpreterContext.java | 2 +-
.../remote/RemoteInterpreterServer.java | 4 +--
.../zeppelin/scheduler/AbstractScheduler.java | 3 +-
.../apache/zeppelin/scheduler/ExecutorFactory.java | 34 ++++++++-------------
.../apache/zeppelin/scheduler/FIFOScheduler.java | 2 +-
.../java/org/apache/zeppelin/scheduler/Job.java | 9 +++---
...rThreadFactory.java => NamedThreadFactory.java} | 15 +++++-----
.../zeppelin/scheduler/ParallelScheduler.java | 6 +---
.../zeppelin/scheduler/SchedulerFactory.java | 35 ++++++++++------------
.../interpreter/ManagedInterpreterGroup.java | 2 +-
.../zeppelin/interpreter/YarnAppMonitor.java | 4 +--
.../zeppelin/notebook/NoteEventAsyncListener.java | 4 +--
.../org/apache/zeppelin/notebook/Notebook.java | 4 +--
.../apache/zeppelin/scheduler/RemoteScheduler.java | 2 +-
14 files changed, 56 insertions(+), 70 deletions(-)
diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/InterpreterContext.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/InterpreterContext.java
index 89b1e542af..347e5bc57e 100644
--- a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/InterpreterContext.java
+++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/InterpreterContext.java
@@ -295,7 +295,7 @@ public class InterpreterContext {
if (progressMap != null) {
n = Math.max(n, 0);
n = Math.min(n, 100);
- progressMap.put(paragraphId, new Integer(n));
+ progressMap.put(paragraphId, Integer.valueOf(n));
}
}
}
diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterServer.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterServer.java
index f84aceb9bc..6ef35eec74 100644
--- a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterServer.java
+++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterServer.java
@@ -941,7 +941,7 @@ public class RemoteInterpreterServer extends Thread
LOGGER.info("cancel {} {}", className,
interpreterContext.getParagraphId());
Interpreter intp = getInterpreter(sessionId, className);
String jobId = interpreterContext.getParagraphId();
- Job job = intp.getScheduler().getJob(jobId);
+ Job<?> job = intp.getScheduler().getJob(jobId);
if (job != null && job.getStatus() == Status.PENDING) {
job.setStatus(Status.ABORT);
@@ -1105,7 +1105,7 @@ public class RemoteInterpreterServer extends Thread
for (Interpreter intp : interpreters) {
Scheduler scheduler = intp.getScheduler();
if (scheduler != null) {
- Job job = scheduler.getJob(jobId);
+ Job<?> job = scheduler.getJob(jobId);
if (job != null) {
return job.getStatus().name();
}
diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/scheduler/AbstractScheduler.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/scheduler/AbstractScheduler.java
index ad21c3c63f..7e99095b7f 100644
--- a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/scheduler/AbstractScheduler.java
+++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/scheduler/AbstractScheduler.java
@@ -39,10 +39,11 @@ public abstract class AbstractScheduler implements
Scheduler {
protected final String name;
protected volatile boolean terminate = false;
protected BlockingQueue<Job<?>> queue = new LinkedBlockingQueue<>();
+ // JobId -> Job
protected Map<String, Job<?>> jobs = new ConcurrentHashMap<>();
private Thread schedulerThread;
- public AbstractScheduler(String name) {
+ protected AbstractScheduler(String name) {
this.name = name;
}
diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/scheduler/ExecutorFactory.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/scheduler/ExecutorFactory.java
index b20ccc75c4..00e1976bec 100644
--- a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/scheduler/ExecutorFactory.java
+++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/scheduler/ExecutorFactory.java
@@ -47,25 +47,19 @@ public class ExecutorFactory {
return InstanceHolder.INSTANCE;
}
- public ExecutorService createOrGet(String name, int numThread) {
+ public ExecutorService createOrGet(final String name, final int numThread) {
synchronized (executors) {
- if (!executors.containsKey(name)) {
- executors.put(name, Executors.newScheduledThreadPool(
- numThread,
- new SchedulerThreadFactory(name)));
- }
- return executors.get(name);
+ return executors.computeIfAbsent(name, k ->
(Executors.newScheduledThreadPool(
+ numThread,
+ new NamedThreadFactory(k))));
}
}
- public ScheduledExecutorService createOrGetScheduled(String name, int
numThread) {
+ public ScheduledExecutorService createOrGetScheduled(final String name,
final int numThread) {
synchronized (scheduledExecutors) {
- if (!scheduledExecutors.containsKey(name)) {
- scheduledExecutors.put(name, Executors.newScheduledThreadPool(
- numThread,
- new SchedulerThreadFactory(name)));
- }
- return scheduledExecutors.get(name);
+ return scheduledExecutors.computeIfAbsent(name, k ->
(Executors.newScheduledThreadPool(
+ numThread,
+ new NamedThreadFactory(k))));
}
}
@@ -75,22 +69,20 @@ public class ExecutorFactory {
* @return
*/
public ExecutorService getNoteJobExecutor() {
- return createOrGet("NoteJobThread-", 50);
+ return createOrGet("NoteJobThread", 50);
}
public void shutdown(String name) {
synchronized (executors) {
- if (executors.containsKey(name)) {
- ExecutorService e = executors.get(name);
+ ExecutorService e = executors.remove(name);
+ if (e != null) {
ExecutorUtil.softShutdown(name, e, 1, TimeUnit.MINUTES);
- executors.remove(name);
}
}
synchronized (scheduledExecutors) {
- if (scheduledExecutors.containsKey(name)) {
- ExecutorService e = scheduledExecutors.get(name);
+ ExecutorService e = scheduledExecutors.remove(name);
+ if (e != null) {
ExecutorUtil.softShutdown(name, e, 1, TimeUnit.MINUTES);
- scheduledExecutors.remove(name);
}
}
}
diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/scheduler/FIFOScheduler.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/scheduler/FIFOScheduler.java
index 05519c3a49..f6c5dd5a29 100644
--- a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/scheduler/FIFOScheduler.java
+++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/scheduler/FIFOScheduler.java
@@ -33,7 +33,7 @@ public class FIFOScheduler extends AbstractScheduler {
FIFOScheduler(String name) {
super(name);
this.executor = Executors.newSingleThreadExecutor(
- new SchedulerThreadFactory("FIFOScheduler-" + name + "-Worker-"));
+ new NamedThreadFactory("FIFOScheduler-" + name + "-Worker"));
}
@Override
diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/scheduler/Job.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/scheduler/Job.java
index 44014ad8c3..b0ed600f45 100644
--- a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/scheduler/Job.java
+++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/scheduler/Job.java
@@ -38,8 +38,9 @@ import java.util.Map;
* Changing/adding/deleting non transitive field name need consideration of
that.
*/
public abstract class Job<T> {
+
private static final Logger LOGGER = LoggerFactory.getLogger(Job.class);
- private static SimpleDateFormat JOB_DATE_FORMAT = new
SimpleDateFormat("yyyyMMdd-HHmmss");
+ private static final String DATE_FORMAT = "yyyyMMdd-HHmmss";
/**
* Job status.
@@ -93,15 +94,15 @@ public abstract class Job<T> {
private transient volatile Throwable exception;
private transient JobListener listener;
- public Job(String jobName, JobListener listener) {
+ protected Job(String jobName, JobListener listener) {
this.jobName = jobName;
this.listener = listener;
dateCreated = new Date();
- id = JOB_DATE_FORMAT.format(dateCreated) + "_" + jobName;
+ id = new SimpleDateFormat(DATE_FORMAT).format(dateCreated) + "_" + jobName;
setStatus(Status.READY);
}
- public Job(String jobId, String jobName, JobListener listener) {
+ protected Job(String jobId, String jobName, JobListener listener) {
this.jobName = jobName;
this.listener = listener;
dateCreated = new Date();
diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/scheduler/SchedulerThreadFactory.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/scheduler/NamedThreadFactory.java
similarity index 74%
rename from zeppelin-interpreter/src/main/java/org/apache/zeppelin/scheduler/SchedulerThreadFactory.java
rename to zeppelin-interpreter/src/main/java/org/apache/zeppelin/scheduler/NamedThreadFactory.java
index fe0711e685..cd53160a7b 100644
--- a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/scheduler/SchedulerThreadFactory.java
+++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/scheduler/NamedThreadFactory.java
@@ -21,19 +21,18 @@ package org.apache.zeppelin.scheduler;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.atomic.AtomicLong;
-public class SchedulerThreadFactory implements ThreadFactory {
+public class NamedThreadFactory implements ThreadFactory {
- private String namePrefix;
- private AtomicLong count = new AtomicLong(1);
+ private final String name;
+ private final AtomicLong count = new AtomicLong(1);
- public SchedulerThreadFactory(String namePrefix) {
- this.namePrefix = namePrefix;
+ public NamedThreadFactory(String name) {
+ this.name = name;
}
@Override
public Thread newThread(Runnable r) {
- Thread thread = new Thread(r);
- thread.setName(namePrefix + count.getAndIncrement());
- return thread;
+ return new Thread(r, name + "-" + count.getAndIncrement());
+
}
}
diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/scheduler/ParallelScheduler.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/scheduler/ParallelScheduler.java
index a58e349f90..9f0c27cccc 100644
--- a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/scheduler/ParallelScheduler.java
+++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/scheduler/ParallelScheduler.java
@@ -22,22 +22,18 @@ import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import org.apache.zeppelin.util.ExecutorUtil;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
/**
* Parallel scheduler runs submitted job concurrently.
*/
public class ParallelScheduler extends AbstractScheduler {
- private static final Logger LOGGER =
LoggerFactory.getLogger(ParallelScheduler.class);
-
private ExecutorService executor;
ParallelScheduler(String name, int maxConcurrency) {
super(name);
this.executor = Executors.newFixedThreadPool(maxConcurrency,
- new SchedulerThreadFactory("ParallelScheduler-Worker-"));
+ new NamedThreadFactory("ParallelScheduler-Worker"));
}
@Override
diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/scheduler/SchedulerFactory.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/scheduler/SchedulerFactory.java
index 34609b4916..e3213467fc 100644
--- a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/scheduler/SchedulerFactory.java
+++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/scheduler/SchedulerFactory.java
@@ -69,38 +69,35 @@ public class SchedulerFactory {
ExecutorUtil.softShutdown("SchedulerFactoryExecutor", executor, 60,
TimeUnit.SECONDS);
}
- public Scheduler createOrGetFIFOScheduler(String name) {
+ public Scheduler createOrGetFIFOScheduler(final String name) {
synchronized (schedulers) {
- if (!schedulers.containsKey(name)) {
- LOGGER.info("Create FIFOScheduler: {}", name);
- FIFOScheduler s = new FIFOScheduler(name);
- schedulers.put(name, s);
+ return schedulers.computeIfAbsent(name, k -> {
+ LOGGER.info("Create FIFOScheduler: {}", k);
+ FIFOScheduler s = new FIFOScheduler(k);
executor.execute(s);
- }
- return schedulers.get(name);
+ return s;
+ });
}
}
- public Scheduler createOrGetParallelScheduler(String name, int
maxConcurrency) {
+ public Scheduler createOrGetParallelScheduler(final String name, final int
maxConcurrency) {
synchronized (schedulers) {
- if (!schedulers.containsKey(name)) {
- LOGGER.info("Create ParallelScheduler: {} with maxConcurrency: {}",
name, maxConcurrency);
- ParallelScheduler s = new ParallelScheduler(name, maxConcurrency);
- schedulers.put(name, s);
+ return schedulers.computeIfAbsent(name, k -> {
+ LOGGER.info("Create ParallelScheduler: {} with maxConcurrency: {}", k,
maxConcurrency);
+ ParallelScheduler s = new ParallelScheduler(k, maxConcurrency);
executor.execute(s);
- }
- return schedulers.get(name);
+ return s;
+ });
}
}
- public Scheduler createOrGetScheduler(Scheduler scheduler) {
+ public Scheduler createOrGetScheduler(final Scheduler scheduler) {
synchronized (schedulers) {
- if (!schedulers.containsKey(scheduler.getName())) {
- schedulers.put(scheduler.getName(), scheduler);
+ return schedulers.computeIfAbsent(scheduler.getName(), k -> {
executor.execute(scheduler);
- }
- return schedulers.get(scheduler.getName());
+ return scheduler;
+ });
}
}
diff --git a/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/ManagedInterpreterGroup.java b/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/ManagedInterpreterGroup.java
index c1b5076167..8f2c16c074 100644
--- a/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/ManagedInterpreterGroup.java
+++ b/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/ManagedInterpreterGroup.java
@@ -150,7 +150,7 @@ public class ManagedInterpreterGroup extends
InterpreterGroup {
try {
if (Boolean.parseBoolean(
interpreter.getProperty("zeppelin.interpreter.close.cancel_job",
"true"))) {
- for (final Job job : scheduler.getAllJobs()) {
+ for (final Job<?> job : scheduler.getAllJobs()) {
if (!job.isTerminated()) {
job.abort();
job.setStatus(Job.Status.ABORT);
diff --git a/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/YarnAppMonitor.java b/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/YarnAppMonitor.java
index 56dc7ea912..48956b337b 100644
--- a/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/YarnAppMonitor.java
+++ b/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/YarnAppMonitor.java
@@ -25,7 +25,7 @@ import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.zeppelin.conf.ZeppelinConfiguration;
import org.apache.zeppelin.conf.ZeppelinConfiguration.ConfVars;
import org.apache.zeppelin.interpreter.remote.RemoteInterpreterManagedProcess;
-import org.apache.zeppelin.scheduler.SchedulerThreadFactory;
+import org.apache.zeppelin.scheduler.NamedThreadFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -65,7 +65,7 @@ public class YarnAppMonitor {
yarnConf.set("yarn.timeline-service.enabled", "false");
yarnClient.init(yarnConf);
yarnClient.start();
- this.executor = Executors.newSingleThreadScheduledExecutor(new
SchedulerThreadFactory("YarnAppsMonitor-Thread"));
+ this.executor = Executors.newSingleThreadScheduledExecutor(new
NamedThreadFactory("YarnAppsMonitor-Thread"));
this.apps = new ConcurrentHashMap<>();
this.executor.scheduleAtFixedRate(() -> {
try {
diff --git a/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/NoteEventAsyncListener.java b/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/NoteEventAsyncListener.java
index dabc5dde40..a565b02ec3 100644
--- a/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/NoteEventAsyncListener.java
+++ b/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/NoteEventAsyncListener.java
@@ -18,7 +18,7 @@
package org.apache.zeppelin.notebook;
import org.apache.zeppelin.scheduler.Job;
-import org.apache.zeppelin.scheduler.SchedulerThreadFactory;
+import org.apache.zeppelin.scheduler.NamedThreadFactory;
import org.apache.zeppelin.user.AuthenticationInfo;
import org.apache.zeppelin.util.ExecutorUtil;
import org.slf4j.Logger;
@@ -43,7 +43,7 @@ public abstract class NoteEventAsyncListener implements
NoteEventListener, Close
protected NoteEventAsyncListener(String name) {
this.name = name;
executor = new ThreadPoolExecutor(0, 1, 1, TimeUnit.MINUTES,
- new LinkedBlockingQueue<>(), new SchedulerThreadFactory(name));
+ new LinkedBlockingQueue<>(), new NamedThreadFactory(name));
}
public abstract void handleNoteCreateEvent(NoteCreateEvent noteCreateEvent);
diff --git a/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/Notebook.java b/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/Notebook.java
index 33d468931b..1f98c716f1 100644
--- a/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/Notebook.java
+++ b/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/Notebook.java
@@ -57,7 +57,7 @@ import org.apache.zeppelin.notebook.repo.NotebookRepoSync;
import org.apache.zeppelin.notebook.repo.NotebookRepoWithVersionControl;
import
org.apache.zeppelin.notebook.repo.NotebookRepoWithVersionControl.Revision;
import org.apache.zeppelin.scheduler.Job;
-import org.apache.zeppelin.scheduler.SchedulerThreadFactory;
+import org.apache.zeppelin.scheduler.NamedThreadFactory;
import org.apache.zeppelin.user.AuthenticationInfo;
import org.apache.zeppelin.user.Credentials;
import org.apache.zeppelin.util.ExecutorUtil;
@@ -138,7 +138,7 @@ public class Notebook {
public void initNotebook() {
if (initExecutor == null || initExecutor.isShutdown() ||
initExecutor.isTerminated()) {
initExecutor = new ThreadPoolExecutor(0,
Runtime.getRuntime().availableProcessors(), 1, TimeUnit.MINUTES,
- new LinkedBlockingQueue<>(), new
SchedulerThreadFactory("NotebookInit"));
+ new LinkedBlockingQueue<>(), new
NamedThreadFactory("NotebookInit"));
}
for (NoteInfo noteInfo : getNotesInfo()) {
initExecutor.execute(() -> {
diff --git a/zeppelin-zengine/src/main/java/org/apache/zeppelin/scheduler/RemoteScheduler.java b/zeppelin-zengine/src/main/java/org/apache/zeppelin/scheduler/RemoteScheduler.java
index 6836eb33d1..e5807877f9 100644
--- a/zeppelin-zengine/src/main/java/org/apache/zeppelin/scheduler/RemoteScheduler.java
+++ b/zeppelin-zengine/src/main/java/org/apache/zeppelin/scheduler/RemoteScheduler.java
@@ -43,7 +43,7 @@ public class RemoteScheduler extends AbstractScheduler {
RemoteInterpreter remoteInterpreter) {
super(name);
this.executor =
- Executors.newSingleThreadExecutor(new SchedulerThreadFactory("FIFO-" +
name + "-"));
+ Executors.newSingleThreadExecutor(new NamedThreadFactory("FIFO-" +
name));
this.remoteInterpreter = remoteInterpreter;
}