This is an automated email from the ASF dual-hosted git repository. jongyoul pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/zeppelin.git
The following commit(s) were added to refs/heads/master by this push: new eaa9200cf0 Use wildcard generic for job and remove generic for JobListener (#4592) eaa9200cf0 is described below commit eaa9200cf05e0ee040afd705d4099d7cfbfb1bc1 Author: Philipp Dallig <philipp.dal...@gmail.com> AuthorDate: Fri May 5 15:36:51 2023 +0200 Use wildcard generic for job and remove generic for JobListener (#4592) --- .../apache/zeppelin/kotlin/KotlinInterpreter.java | 6 +- .../remote/RemoteInterpreterServer.java | 4 +- .../zeppelin/scheduler/AbstractScheduler.java | 22 +++--- .../apache/zeppelin/scheduler/FIFOScheduler.java | 2 +- .../java/org/apache/zeppelin/scheduler/Job.java | 10 ++- .../org/apache/zeppelin/scheduler/JobListener.java | 9 ++- .../zeppelin/scheduler/JobProgressPoller.java | 4 +- .../zeppelin/scheduler/JobWithProgressPoller.java | 1 - .../zeppelin/scheduler/ParallelScheduler.java | 2 +- .../org/apache/zeppelin/scheduler/Scheduler.java | 8 +-- .../org/apache/zeppelin/socket/NotebookServer.java | 83 ++++++++++++---------- .../org/apache/zeppelin/notebook/Paragraph.java | 5 +- .../zeppelin/notebook/ParagraphJobListener.java | 2 +- .../apache/zeppelin/scheduler/RemoteScheduler.java | 14 ++-- .../org/apache/zeppelin/notebook/NotebookTest.java | 4 +- 15 files changed, 96 insertions(+), 80 deletions(-) diff --git a/kotlin/src/main/java/org/apache/zeppelin/kotlin/KotlinInterpreter.java b/kotlin/src/main/java/org/apache/zeppelin/kotlin/KotlinInterpreter.java index 8276f5fbbf..26af7eef6c 100644 --- a/kotlin/src/main/java/org/apache/zeppelin/kotlin/KotlinInterpreter.java +++ b/kotlin/src/main/java/org/apache/zeppelin/kotlin/KotlinInterpreter.java @@ -141,9 +141,9 @@ public class KotlinInterpreter extends Interpreter { } private Job<?> getRunningJob(String paragraphId) { - Job foundJob = null; - Collection<Job> jobsRunning = getScheduler().getAllJobs(); - for (Job job : jobsRunning) { + Job<?> foundJob = null; + Collection<Job<?>> jobsRunning = getScheduler().getAllJobs(); + for (Job<?> job : jobsRunning) { if (job.getId().equals(paragraphId)) { foundJob = job; } diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterServer.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterServer.java index 1388347f57..dad3074c51 100644 --- a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterServer.java +++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterServer.java @@ -755,11 +755,11 @@ public class RemoteInterpreterServer extends Thread class InterpretJobListener implements JobListener { @Override - public void onProgressUpdate(Job job, int progress) { + public void onProgressUpdate(Job<?> job, int progress) { } @Override - public void onStatusChange(Job job, Status before, Status after) { + public void onStatusChange(Job<?> job, Status before, Status after) { synchronized (this) { notifyAll(); } diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/scheduler/AbstractScheduler.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/scheduler/AbstractScheduler.java index 0f95088df3..ad21c3c63f 100644 --- a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/scheduler/AbstractScheduler.java +++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/scheduler/AbstractScheduler.java @@ -38,8 +38,8 @@ public abstract class AbstractScheduler implements Scheduler { protected final String name; protected volatile boolean terminate = false; - protected BlockingQueue<Job> queue = new LinkedBlockingQueue<>(); - protected Map<String, Job> jobs = new ConcurrentHashMap<>(); + protected BlockingQueue<Job<?>> queue = new LinkedBlockingQueue<>(); + protected Map<String, Job<?>> jobs = new ConcurrentHashMap<>(); private Thread schedulerThread; public AbstractScheduler(String name) { @@ -52,17 +52,17 @@ public abstract class AbstractScheduler implements Scheduler { } @Override - public List<Job> getAllJobs() { + public List<Job<?>> getAllJobs() { return new ArrayList<>(jobs.values()); } @Override - public Job getJob(String jobId) { + public Job<?> getJob(String jobId) { return jobs.get(jobId); } @Override - public void submit(Job job) { + public void submit(Job<?> job) { job.setStatus(Job.Status.PENDING); try { queue.put(job); @@ -74,8 +74,8 @@ public abstract class AbstractScheduler implements Scheduler { } @Override - public Job cancel(String jobId) { - Job job = jobs.remove(jobId); + public Job<?> cancel(String jobId) { + Job<?> job = jobs.remove(jobId); job.abort(); return job; } @@ -84,7 +84,7 @@ public abstract class AbstractScheduler implements Scheduler { public void run() { schedulerThread = Thread.currentThread(); while (!terminate && !schedulerThread.isInterrupted()) { - Job runningJob = null; + Job<?> runningJob = null; try { runningJob = queue.take(); } catch (InterruptedException e) { @@ -99,12 +99,12 @@ public abstract class AbstractScheduler implements Scheduler { stop(); } - public abstract void runJobInScheduler(Job job); + public abstract void runJobInScheduler(Job<?> job); @Override public void stop() { terminate = true; - for (Job job : queue) { + for (Job<?> job : queue) { job.aborted = true; job.jobAbort(); } @@ -119,7 +119,7 @@ public abstract class AbstractScheduler implements Scheduler { * * @param runningJob */ - protected void runJob(Job runningJob) { + protected void runJob(Job<?> runningJob) { if (runningJob.isAborted()) { LOGGER.info("Job {} is aborted", runningJob.getId()); runningJob.setStatus(Job.Status.ABORT); diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/scheduler/FIFOScheduler.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/scheduler/FIFOScheduler.java index 3448636ee9..05519c3a49 100644 --- a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/scheduler/FIFOScheduler.java +++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/scheduler/FIFOScheduler.java @@ -37,7 +37,7 @@ public class FIFOScheduler extends AbstractScheduler { } @Override - public void runJobInScheduler(final Job job) { + public void runJobInScheduler(final Job<?> job) { // run job in the SingleThreadExecutor since this is FIFO. executor.execute(() -> runJob(job)); } diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/scheduler/Job.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/scheduler/Job.java index 55d76d8e94..44014ad8c3 100644 --- a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/scheduler/Job.java +++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/scheduler/Job.java @@ -123,8 +123,14 @@ public abstract class Job<T> { } @Override - public boolean equals(Object o) { - return ((Job) o).id.equals(id); + public boolean equals(Object obj) { + if (obj == null) + return false; + + if (!(obj instanceof Job)) + return false; + + return ((Job<?>) obj).id.equals(id); } public Status getStatus() { diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/scheduler/JobListener.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/scheduler/JobListener.java index dba20040a6..cdfc201fda 100644 --- a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/scheduler/JobListener.java +++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/scheduler/JobListener.java @@ -17,11 +17,14 @@ package org.apache.zeppelin.scheduler; +import org.apache.zeppelin.scheduler.Job.Status; + /** * Listener for job execution. */ -public interface JobListener<T extends Job> { - void onProgressUpdate(T job, int progress); +public interface JobListener { + void onProgressUpdate(Job<?> job, int progress); + + void onStatusChange(Job<?> job, Status before, Status after); - void onStatusChange(T job, Job.Status before, Job.Status after); } diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/scheduler/JobProgressPoller.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/scheduler/JobProgressPoller.java index 3d6ce12f48..e7ff7cd675 100644 --- a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/scheduler/JobProgressPoller.java +++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/scheduler/JobProgressPoller.java @@ -32,10 +32,10 @@ public class JobProgressPoller extends Thread { public static final long DEFAULT_INTERVAL_MSEC = 500; private static final Logger logger = LoggerFactory.getLogger(JobProgressPoller.class); - private Job job; + private Job<?> job; private long intervalMs; - public JobProgressPoller(Job job, long intervalMs) { + public JobProgressPoller(Job<?> job, long intervalMs) { super("JobProgressPoller, jobId=" + job.getId()); this.job = job; if (intervalMs < 0) { diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/scheduler/JobWithProgressPoller.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/scheduler/JobWithProgressPoller.java index a10432f758..668985d92b 100644 --- a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/scheduler/JobWithProgressPoller.java +++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/scheduler/JobWithProgressPoller.java @@ -17,7 +17,6 @@ package org.apache.zeppelin.scheduler; - public abstract class JobWithProgressPoller<T> extends Job<T> { private transient JobProgressPoller progressPoller; diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/scheduler/ParallelScheduler.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/scheduler/ParallelScheduler.java index 979435a9e7..a58e349f90 100644 --- a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/scheduler/ParallelScheduler.java +++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/scheduler/ParallelScheduler.java @@ -41,7 +41,7 @@ public class ParallelScheduler extends AbstractScheduler { } @Override - public void runJobInScheduler(final Job runningJob) { + public void runJobInScheduler(final Job<?> runningJob) { // submit this job to a FixedThreadPool so that at most maxConcurrencyJobs running executor.execute(() -> runJob(runningJob)); } diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/scheduler/Scheduler.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/scheduler/Scheduler.java index 820495bb4f..16aaedcb93 100644 --- a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/scheduler/Scheduler.java +++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/scheduler/Scheduler.java @@ -31,13 +31,13 @@ public interface Scheduler extends Runnable { String getName(); - List<Job> getAllJobs(); + List<Job<?>> getAllJobs(); - Job getJob(String jobId); + Job<?> getJob(String jobId); - void submit(Job job); + void submit(Job<?> job); - Job cancel(String jobId); + Job<?> cancel(String jobId); void stop(); diff --git a/zeppelin-server/src/main/java/org/apache/zeppelin/socket/NotebookServer.java b/zeppelin-server/src/main/java/org/apache/zeppelin/socket/NotebookServer.java index 6ef9cb293c..ef3cff10ca 100644 --- a/zeppelin-server/src/main/java/org/apache/zeppelin/socket/NotebookServer.java +++ b/zeppelin-server/src/main/java/org/apache/zeppelin/socket/NotebookServer.java @@ -86,6 +86,7 @@ import org.apache.zeppelin.notebook.Paragraph; import org.apache.zeppelin.notebook.ParagraphJobListener; import org.apache.zeppelin.notebook.repo.NotebookRepoWithVersionControl.Revision; import org.apache.zeppelin.rest.exception.ForbiddenException; +import org.apache.zeppelin.scheduler.Job; import org.apache.zeppelin.scheduler.Job.Status; import org.apache.zeppelin.service.ConfigurationService; import org.apache.zeppelin.service.JobManagerService; @@ -2046,56 +2047,64 @@ public class NotebookServer implements AngularObjectRegistryListener, } @Override - public void onProgressUpdate(Paragraph p, int progress) { - if (!sendParagraphStatusToFrontend) { - return; + public void onProgressUpdate(Job<?> job, int progress) { + if (job instanceof Paragraph) { + final Paragraph p = (Paragraph) job; + if (!sendParagraphStatusToFrontend) { + return; + } + connectionManager.broadcast(p.getNote().getId(), + new Message(OP.PROGRESS).put("id", p.getId()).put("progress", progress)); } - connectionManager.broadcast(p.getNote().getId(), - new Message(OP.PROGRESS).put("id", p.getId()).put("progress", progress)); } @Override - public void onStatusChange(Paragraph p, Status before, Status after) { - if (after == Status.ERROR) { - if (p.getException() != null) { - LOG.error("Error", p.getException()); + public void onStatusChange(Job<?> job, Status before, Status after) { + if (job instanceof Paragraph) { + final Paragraph p = (Paragraph) job; + + if (after == Status.ERROR) { + if (p.getException() != null) { + LOG.error("Error", p.getException()); + } } - } - if (p.isTerminated() || after == Status.RUNNING) { - if (p.getStatus() == Status.FINISHED) { - LOG.info("Job {} is finished successfully, status: {}", p.getId(), p.getStatus()); - } else if (p.isTerminated()) { - LOG.warn("Job {} is finished, status: {}, exception: {}, result: {}", p.getId(), - p.getStatus(), p.getException(), p.getReturn()); - } else { - LOG.info("Job {} starts to RUNNING", p.getId()); + if (p.isTerminated() || after == Status.RUNNING) { + if (p.getStatus() == Status.FINISHED) { + LOG.info("Job {} is finished successfully, status: {}", p.getId(), p.getStatus()); + } else if (p.isTerminated()) { + LOG.warn("Job {} is finished, status: {}, exception: {}, result: {}", p.getId(), + p.getStatus(), p.getException(), p.getReturn()); + } else { + LOG.info("Job {} starts to RUNNING", p.getId()); + } + + try { + String noteId = p.getNote().getId(); + getNotebook().processNote(noteId, + note -> { + if (note == null) { + LOG.warn("Note {} doesn't existed.", noteId); + return null; + } else { + getNotebook().saveNote(p.getNote(), p.getAuthenticationInfo()); + } + return null; + }); + } catch (IOException e) { + LOG.error(e.toString(), e); + } } + p.setStatusToUserParagraph(p.getStatus()); + broadcastParagraph(p.getNote(), p, MSG_ID_NOT_DEFINED); try { - String noteId = p.getNote().getId(); - getNotebook().processNote(noteId, - note -> { - if (note == null) { - LOG.warn("Note {} doesn't existed.", noteId); - return null; - } else { - getNotebook().saveNote(p.getNote(), p.getAuthenticationInfo()); - } - return null; - }); + broadcastUpdateNoteJobInfo(p.getNote(), System.currentTimeMillis() - 5000); } catch (IOException e) { - LOG.error(e.toString(), e); + LOG.error("can not broadcast for job manager", e); } } - p.setStatusToUserParagraph(p.getStatus()); - broadcastParagraph(p.getNote(), p, MSG_ID_NOT_DEFINED); - try { - broadcastUpdateNoteJobInfo(p.getNote(), System.currentTimeMillis() - 5000); - } catch (IOException e) { - LOG.error("can not broadcast for job manager", e); - } } @Override diff --git a/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/Paragraph.java b/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/Paragraph.java index e88111ff53..1faa9d42ce 100644 --- a/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/Paragraph.java +++ b/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/Paragraph.java @@ -53,7 +53,6 @@ import org.apache.zeppelin.interpreter.remote.RemoteInterpreter; import org.apache.zeppelin.interpreter.thrift.InterpreterCompletion; import org.apache.zeppelin.resource.ResourcePool; import org.apache.zeppelin.scheduler.Job; -import org.apache.zeppelin.scheduler.JobListener; import org.apache.zeppelin.scheduler.JobWithProgressPoller; import org.apache.zeppelin.user.AuthenticationInfo; import org.apache.zeppelin.user.Credentials; @@ -105,12 +104,12 @@ public class Paragraph extends JobWithProgressPoller<InterpreterResult> implemen super(generateId(), null); } - public Paragraph(String paragraphId, Note note, JobListener listener) { + public Paragraph(String paragraphId, Note note, ParagraphJobListener listener) { super(paragraphId, generateId(), listener); this.note = note; } - public Paragraph(Note note, JobListener listener) { + public Paragraph(Note note, ParagraphJobListener listener) { super(generateId(), listener); this.note = note; } diff --git a/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/ParagraphJobListener.java b/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/ParagraphJobListener.java index d0c99f37f1..de71165def 100644 --- a/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/ParagraphJobListener.java +++ b/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/ParagraphJobListener.java @@ -23,7 +23,7 @@ import org.apache.zeppelin.scheduler.JobListener; /** * Listener for Paragraph Job. */ -public interface ParagraphJobListener extends JobListener<Paragraph> { +public interface ParagraphJobListener extends JobListener { //TODO(savalek) Temporary solution. Need to refactor cron to be able to notify frontend directly. void noteRunningStatusChange(String noteId, boolean newStatus); } diff --git a/zeppelin-zengine/src/main/java/org/apache/zeppelin/scheduler/RemoteScheduler.java b/zeppelin-zengine/src/main/java/org/apache/zeppelin/scheduler/RemoteScheduler.java index 8c068f9f60..ac1e237dd9 100644 --- a/zeppelin-zengine/src/main/java/org/apache/zeppelin/scheduler/RemoteScheduler.java +++ b/zeppelin-zengine/src/main/java/org/apache/zeppelin/scheduler/RemoteScheduler.java @@ -48,7 +48,7 @@ public class RemoteScheduler extends AbstractScheduler { } @Override - public void runJobInScheduler(Job job) { + public void runJobInScheduler(Job<?> job) { JobRunner jobRunner = new JobRunner(this, job); executor.execute(jobRunner); String executionMode = @@ -92,10 +92,10 @@ public class RemoteScheduler extends AbstractScheduler { private final long checkIntervalMsec; private final AtomicBoolean terminate; private final JobListener listener; - private final Job job; + private final Job<?> job; private volatile Status lastStatus; - public JobStatusPoller(Job job, + public JobStatusPoller(Job<?> job, JobListener listener, long checkIntervalMsec) { setName("JobStatusPoller-" + job.getId()); @@ -160,11 +160,11 @@ public class RemoteScheduler extends AbstractScheduler { private class JobRunner implements Runnable, JobListener { private final Logger logger = LoggerFactory.getLogger(JobRunner.class); private final RemoteScheduler scheduler; - private final Job job; + private final Job<?> job; private volatile boolean jobExecuted; private volatile boolean jobSubmittedRemotely; - public JobRunner(RemoteScheduler scheduler, Job job) { + public JobRunner(RemoteScheduler scheduler, Job<?> job) { this.scheduler = scheduler; this.job = job; jobExecuted = false; @@ -197,12 +197,12 @@ public class RemoteScheduler extends AbstractScheduler { } @Override - public void onProgressUpdate(Job job, int progress) { + public void onProgressUpdate(Job<?> job, int progress) { } // Call by JobStatusPoller thread, update status when JobStatusPoller get new status. @Override - public void onStatusChange(Job job, Status before, Status after) { + public void onStatusChange(Job<?> job, Status before, Status after) { if (!job.equals(this.job)) { logger.error("StatusChange for an unkown job. {} != {}", this.job.getId(), job.getId()); return; diff --git a/zeppelin-zengine/src/test/java/org/apache/zeppelin/notebook/NotebookTest.java b/zeppelin-zengine/src/test/java/org/apache/zeppelin/notebook/NotebookTest.java index 5869f3b5dd..b53d780ebb 100644 --- a/zeppelin-zengine/src/test/java/org/apache/zeppelin/notebook/NotebookTest.java +++ b/zeppelin-zengine/src/test/java/org/apache/zeppelin/notebook/NotebookTest.java @@ -1893,11 +1893,11 @@ public class NotebookTest extends AbstractInterpreterTest implements ParagraphJo } @Override - public void onProgressUpdate(Paragraph paragraph, int progress) { + public void onProgressUpdate(Job<?> paragraph, int progress) { } @Override - public void onStatusChange(Paragraph paragraph, Status before, Status after) { + public void onStatusChange(Job<?> paragraph, Status before, Status after) { if (afterStatusChangedListener != null) { afterStatusChangedListener.onStatusChanged(paragraph, before, after); }