This is an automated email from the ASF dual-hosted git repository.

jongyoul pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/zeppelin.git


The following commit(s) were added to refs/heads/master by this push:
     new eaa9200cf0 Use wildcard generic for job and remove generic for JobListener (#4592)
eaa9200cf0 is described below

commit eaa9200cf05e0ee040afd705d4099d7cfbfb1bc1
Author: Philipp Dallig <philipp.dal...@gmail.com>
AuthorDate: Fri May 5 15:36:51 2023 +0200

    Use wildcard generic for job and remove generic for JobListener (#4592)
---
 .../apache/zeppelin/kotlin/KotlinInterpreter.java  |  6 +-
 .../remote/RemoteInterpreterServer.java            |  4 +-
 .../zeppelin/scheduler/AbstractScheduler.java      | 22 +++---
 .../apache/zeppelin/scheduler/FIFOScheduler.java   |  2 +-
 .../java/org/apache/zeppelin/scheduler/Job.java    | 10 ++-
 .../org/apache/zeppelin/scheduler/JobListener.java |  9 ++-
 .../zeppelin/scheduler/JobProgressPoller.java      |  4 +-
 .../zeppelin/scheduler/JobWithProgressPoller.java  |  1 -
 .../zeppelin/scheduler/ParallelScheduler.java      |  2 +-
 .../org/apache/zeppelin/scheduler/Scheduler.java   |  8 +--
 .../org/apache/zeppelin/socket/NotebookServer.java | 83 ++++++++++++----------
 .../org/apache/zeppelin/notebook/Paragraph.java    |  5 +-
 .../zeppelin/notebook/ParagraphJobListener.java    |  2 +-
 .../apache/zeppelin/scheduler/RemoteScheduler.java | 14 ++--
 .../org/apache/zeppelin/notebook/NotebookTest.java |  4 +-
 15 files changed, 96 insertions(+), 80 deletions(-)

diff --git a/kotlin/src/main/java/org/apache/zeppelin/kotlin/KotlinInterpreter.java b/kotlin/src/main/java/org/apache/zeppelin/kotlin/KotlinInterpreter.java
index 8276f5fbbf..26af7eef6c 100644
--- a/kotlin/src/main/java/org/apache/zeppelin/kotlin/KotlinInterpreter.java
+++ b/kotlin/src/main/java/org/apache/zeppelin/kotlin/KotlinInterpreter.java
@@ -141,9 +141,9 @@ public class KotlinInterpreter extends Interpreter {
   }
 
   private Job<?> getRunningJob(String paragraphId) {
-    Job foundJob = null;
-    Collection<Job> jobsRunning = getScheduler().getAllJobs();
-    for (Job job : jobsRunning) {
+    Job<?> foundJob = null;
+    Collection<Job<?>> jobsRunning = getScheduler().getAllJobs();
+    for (Job<?> job : jobsRunning) {
       if (job.getId().equals(paragraphId)) {
         foundJob = job;
       }
diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterServer.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterServer.java
index 1388347f57..dad3074c51 100644
--- a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterServer.java
+++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterServer.java
@@ -755,11 +755,11 @@ public class RemoteInterpreterServer extends Thread
   class InterpretJobListener implements JobListener {
 
     @Override
-    public void onProgressUpdate(Job job, int progress) {
+    public void onProgressUpdate(Job<?> job, int progress) {
     }
 
     @Override
-    public void onStatusChange(Job job, Status before, Status after) {
+    public void onStatusChange(Job<?> job, Status before, Status after) {
       synchronized (this) {
         notifyAll();
       }
diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/scheduler/AbstractScheduler.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/scheduler/AbstractScheduler.java
index 0f95088df3..ad21c3c63f 100644
--- a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/scheduler/AbstractScheduler.java
+++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/scheduler/AbstractScheduler.java
@@ -38,8 +38,8 @@ public abstract class AbstractScheduler implements Scheduler {
 
   protected final String name;
   protected volatile boolean terminate = false;
-  protected BlockingQueue<Job> queue = new LinkedBlockingQueue<>();
-  protected Map<String, Job> jobs = new ConcurrentHashMap<>();
+  protected BlockingQueue<Job<?>> queue = new LinkedBlockingQueue<>();
+  protected Map<String, Job<?>> jobs = new ConcurrentHashMap<>();
   private Thread schedulerThread;
 
   public AbstractScheduler(String name) {
@@ -52,17 +52,17 @@ public abstract class AbstractScheduler implements Scheduler {
   }
 
   @Override
-  public List<Job> getAllJobs() {
+  public List<Job<?>> getAllJobs() {
     return new ArrayList<>(jobs.values());
   }
 
   @Override
-  public Job getJob(String jobId) {
+  public Job<?> getJob(String jobId) {
     return jobs.get(jobId);
   }
 
   @Override
-  public void submit(Job job) {
+  public void submit(Job<?> job) {
     job.setStatus(Job.Status.PENDING);
     try {
       queue.put(job);
@@ -74,8 +74,8 @@ public abstract class AbstractScheduler implements Scheduler {
   }
 
   @Override
-  public Job cancel(String jobId) {
-    Job job = jobs.remove(jobId);
+  public Job<?> cancel(String jobId) {
+    Job<?> job = jobs.remove(jobId);
     job.abort();
     return job;
   }
@@ -84,7 +84,7 @@ public abstract class AbstractScheduler implements Scheduler {
   public void run() {
     schedulerThread = Thread.currentThread();
     while (!terminate && !schedulerThread.isInterrupted()) {
-      Job runningJob = null;
+      Job<?> runningJob = null;
       try {
         runningJob = queue.take();
       } catch (InterruptedException e) {
@@ -99,12 +99,12 @@ public abstract class AbstractScheduler implements Scheduler {
     stop();
   }
 
-  public abstract void runJobInScheduler(Job job);
+  public abstract void runJobInScheduler(Job<?> job);
 
   @Override
   public void stop() {
     terminate = true;
-    for (Job job : queue) {
+    for (Job<?> job : queue) {
       job.aborted = true;
       job.jobAbort();
     }
@@ -119,7 +119,7 @@ public abstract class AbstractScheduler implements Scheduler {
    *
    * @param runningJob
    */
-  protected void runJob(Job runningJob) {
+  protected void runJob(Job<?> runningJob) {
     if (runningJob.isAborted()) {
       LOGGER.info("Job {} is aborted", runningJob.getId());
       runningJob.setStatus(Job.Status.ABORT);
diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/scheduler/FIFOScheduler.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/scheduler/FIFOScheduler.java
index 3448636ee9..05519c3a49 100644
--- a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/scheduler/FIFOScheduler.java
+++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/scheduler/FIFOScheduler.java
@@ -37,7 +37,7 @@ public class FIFOScheduler extends AbstractScheduler {
   }
 
   @Override
-  public void runJobInScheduler(final Job job) {
+  public void runJobInScheduler(final Job<?> job) {
     // run job in the SingleThreadExecutor since this is FIFO.
     executor.execute(() -> runJob(job));
   }
diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/scheduler/Job.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/scheduler/Job.java
index 55d76d8e94..44014ad8c3 100644
--- a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/scheduler/Job.java
+++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/scheduler/Job.java
@@ -123,8 +123,14 @@ public abstract class Job<T> {
   }
 
   @Override
-  public boolean equals(Object o) {
-    return ((Job) o).id.equals(id);
+  public boolean equals(Object obj) {
+    if (obj == null)
+      return false;
+
+    if (!(obj instanceof Job))
+      return false;
+
+    return ((Job<?>) obj).id.equals(id);
   }
 
   public Status getStatus() {
diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/scheduler/JobListener.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/scheduler/JobListener.java
index dba20040a6..cdfc201fda 100644
--- a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/scheduler/JobListener.java
+++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/scheduler/JobListener.java
@@ -17,11 +17,14 @@
 
 package org.apache.zeppelin.scheduler;
 
+import org.apache.zeppelin.scheduler.Job.Status;
+
 /**
  * Listener for job execution.
  */
-public interface JobListener<T extends Job> {
-  void onProgressUpdate(T job, int progress);
+public interface JobListener {
+  void onProgressUpdate(Job<?> job, int progress);
+
+  void onStatusChange(Job<?> job, Status before, Status after);
 
-  void onStatusChange(T job, Job.Status before, Job.Status after);
 }
diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/scheduler/JobProgressPoller.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/scheduler/JobProgressPoller.java
index 3d6ce12f48..e7ff7cd675 100644
--- a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/scheduler/JobProgressPoller.java
+++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/scheduler/JobProgressPoller.java
@@ -32,10 +32,10 @@ public class JobProgressPoller extends Thread {
   public static final long DEFAULT_INTERVAL_MSEC = 500;
   private static final Logger logger = LoggerFactory.getLogger(JobProgressPoller.class);
 
-  private Job job;
+  private Job<?> job;
   private long intervalMs;
 
-  public JobProgressPoller(Job job, long intervalMs) {
+  public JobProgressPoller(Job<?> job, long intervalMs) {
     super("JobProgressPoller, jobId=" + job.getId());
     this.job = job;
     if (intervalMs < 0) {
diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/scheduler/JobWithProgressPoller.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/scheduler/JobWithProgressPoller.java
index a10432f758..668985d92b 100644
--- a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/scheduler/JobWithProgressPoller.java
+++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/scheduler/JobWithProgressPoller.java
@@ -17,7 +17,6 @@
 
 package org.apache.zeppelin.scheduler;
 
-
 public abstract class JobWithProgressPoller<T> extends Job<T> {
 
   private transient JobProgressPoller progressPoller;
diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/scheduler/ParallelScheduler.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/scheduler/ParallelScheduler.java
index 979435a9e7..a58e349f90 100644
--- a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/scheduler/ParallelScheduler.java
+++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/scheduler/ParallelScheduler.java
@@ -41,7 +41,7 @@ public class ParallelScheduler extends AbstractScheduler {
   }
 
   @Override
-  public void runJobInScheduler(final Job runningJob) {
+  public void runJobInScheduler(final Job<?> runningJob) {
     // submit this job to a FixedThreadPool so that at most maxConcurrencyJobs running
     executor.execute(() -> runJob(runningJob));
   }
diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/scheduler/Scheduler.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/scheduler/Scheduler.java
index 820495bb4f..16aaedcb93 100644
--- a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/scheduler/Scheduler.java
+++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/scheduler/Scheduler.java
@@ -31,13 +31,13 @@ public interface Scheduler extends Runnable {
 
   String getName();
 
-  List<Job> getAllJobs();
+  List<Job<?>> getAllJobs();
 
-  Job getJob(String jobId);
+  Job<?> getJob(String jobId);
 
-  void submit(Job job);
+  void submit(Job<?> job);
 
-  Job cancel(String jobId);
+  Job<?> cancel(String jobId);
 
   void stop();
 
diff --git a/zeppelin-server/src/main/java/org/apache/zeppelin/socket/NotebookServer.java b/zeppelin-server/src/main/java/org/apache/zeppelin/socket/NotebookServer.java
index 6ef9cb293c..ef3cff10ca 100644
--- a/zeppelin-server/src/main/java/org/apache/zeppelin/socket/NotebookServer.java
+++ b/zeppelin-server/src/main/java/org/apache/zeppelin/socket/NotebookServer.java
@@ -86,6 +86,7 @@ import org.apache.zeppelin.notebook.Paragraph;
 import org.apache.zeppelin.notebook.ParagraphJobListener;
 import org.apache.zeppelin.notebook.repo.NotebookRepoWithVersionControl.Revision;
 import org.apache.zeppelin.rest.exception.ForbiddenException;
+import org.apache.zeppelin.scheduler.Job;
 import org.apache.zeppelin.scheduler.Job.Status;
 import org.apache.zeppelin.service.ConfigurationService;
 import org.apache.zeppelin.service.JobManagerService;
@@ -2046,56 +2047,64 @@ public class NotebookServer implements AngularObjectRegistryListener,
   }
 
   @Override
-  public void onProgressUpdate(Paragraph p, int progress) {
-    if (!sendParagraphStatusToFrontend) {
-      return;
+  public void onProgressUpdate(Job<?> job, int progress) {
+    if (job instanceof Paragraph) {
+      final Paragraph p = (Paragraph) job;
+      if (!sendParagraphStatusToFrontend) {
+        return;
+      }
+      connectionManager.broadcast(p.getNote().getId(),
+          new Message(OP.PROGRESS).put("id", p.getId()).put("progress", progress));
     }
-    connectionManager.broadcast(p.getNote().getId(),
-        new Message(OP.PROGRESS).put("id", p.getId()).put("progress", progress));
   }
 
   @Override
-  public void onStatusChange(Paragraph p, Status before, Status after) {
-    if (after == Status.ERROR) {
-      if (p.getException() != null) {
-        LOG.error("Error", p.getException());
+  public void onStatusChange(Job<?> job, Status before, Status after) {
+    if (job instanceof Paragraph) {
+      final Paragraph p = (Paragraph) job;
+
+      if (after == Status.ERROR) {
+        if (p.getException() != null) {
+          LOG.error("Error", p.getException());
+        }
       }
-    }
 
-    if (p.isTerminated() || after == Status.RUNNING) {
-      if (p.getStatus() == Status.FINISHED) {
-        LOG.info("Job {} is finished successfully, status: {}", p.getId(), p.getStatus());
-      } else if (p.isTerminated()) {
-        LOG.warn("Job {} is finished, status: {}, exception: {}, result: {}", p.getId(),
-            p.getStatus(), p.getException(), p.getReturn());
-      } else {
-        LOG.info("Job {} starts to RUNNING", p.getId());
+      if (p.isTerminated() || after == Status.RUNNING) {
+        if (p.getStatus() == Status.FINISHED) {
+          LOG.info("Job {} is finished successfully, status: {}", p.getId(), p.getStatus());
+        } else if (p.isTerminated()) {
+          LOG.warn("Job {} is finished, status: {}, exception: {}, result: {}", p.getId(),
+              p.getStatus(), p.getException(), p.getReturn());
+        } else {
+          LOG.info("Job {} starts to RUNNING", p.getId());
+        }
+
+        try {
+          String noteId = p.getNote().getId();
+          getNotebook().processNote(noteId,
+              note -> {
+                if (note == null) {
+                  LOG.warn("Note {} doesn't existed.", noteId);
+                  return null;
+                } else {
+                  getNotebook().saveNote(p.getNote(), p.getAuthenticationInfo());
+                }
+                return null;
+              });
+        } catch (IOException e) {
+          LOG.error(e.toString(), e);
+        }
       }
 
+      p.setStatusToUserParagraph(p.getStatus());
+      broadcastParagraph(p.getNote(), p, MSG_ID_NOT_DEFINED);
       try {
-        String noteId = p.getNote().getId();
-        getNotebook().processNote(noteId,
-          note -> {
-            if (note == null) {
-              LOG.warn("Note {} doesn't existed.", noteId);
-              return null;
-            } else {
-              getNotebook().saveNote(p.getNote(), p.getAuthenticationInfo());
-            }
-            return null;
-          });
+        broadcastUpdateNoteJobInfo(p.getNote(), System.currentTimeMillis() - 5000);
       } catch (IOException e) {
-        LOG.error(e.toString(), e);
+        LOG.error("can not broadcast for job manager", e);
       }
     }
 
-    p.setStatusToUserParagraph(p.getStatus());
-    broadcastParagraph(p.getNote(), p, MSG_ID_NOT_DEFINED);
-    try {
-      broadcastUpdateNoteJobInfo(p.getNote(), System.currentTimeMillis() - 5000);
-    } catch (IOException e) {
-      LOG.error("can not broadcast for job manager", e);
-    }
   }
 
   @Override
diff --git a/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/Paragraph.java b/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/Paragraph.java
index e88111ff53..1faa9d42ce 100644
--- a/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/Paragraph.java
+++ b/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/Paragraph.java
@@ -53,7 +53,6 @@ import org.apache.zeppelin.interpreter.remote.RemoteInterpreter;
 import org.apache.zeppelin.interpreter.thrift.InterpreterCompletion;
 import org.apache.zeppelin.resource.ResourcePool;
 import org.apache.zeppelin.scheduler.Job;
-import org.apache.zeppelin.scheduler.JobListener;
 import org.apache.zeppelin.scheduler.JobWithProgressPoller;
 import org.apache.zeppelin.user.AuthenticationInfo;
 import org.apache.zeppelin.user.Credentials;
@@ -105,12 +104,12 @@ public class Paragraph extends JobWithProgressPoller<InterpreterResult> implemen
     super(generateId(), null);
   }
 
-  public Paragraph(String paragraphId, Note note, JobListener listener) {
+  public Paragraph(String paragraphId, Note note, ParagraphJobListener listener) {
     super(paragraphId, generateId(), listener);
     this.note = note;
   }
 
-  public Paragraph(Note note, JobListener listener) {
+  public Paragraph(Note note, ParagraphJobListener listener) {
     super(generateId(), listener);
     this.note = note;
   }
diff --git a/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/ParagraphJobListener.java b/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/ParagraphJobListener.java
index d0c99f37f1..de71165def 100644
--- a/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/ParagraphJobListener.java
+++ b/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/ParagraphJobListener.java
@@ -23,7 +23,7 @@ import org.apache.zeppelin.scheduler.JobListener;
 /**
  * Listener for Paragraph Job.
  */
-public interface ParagraphJobListener extends JobListener<Paragraph> {
+public interface ParagraphJobListener extends JobListener {
   //TODO(savalek) Temporary solution. Need to refactor cron to be able to notify frontend directly.
   void noteRunningStatusChange(String noteId, boolean newStatus);
 }
diff --git a/zeppelin-zengine/src/main/java/org/apache/zeppelin/scheduler/RemoteScheduler.java b/zeppelin-zengine/src/main/java/org/apache/zeppelin/scheduler/RemoteScheduler.java
index 8c068f9f60..ac1e237dd9 100644
--- a/zeppelin-zengine/src/main/java/org/apache/zeppelin/scheduler/RemoteScheduler.java
+++ b/zeppelin-zengine/src/main/java/org/apache/zeppelin/scheduler/RemoteScheduler.java
@@ -48,7 +48,7 @@ public class RemoteScheduler extends AbstractScheduler {
   }
 
   @Override
-  public void runJobInScheduler(Job job) {
+  public void runJobInScheduler(Job<?> job) {
     JobRunner jobRunner = new JobRunner(this, job);
     executor.execute(jobRunner);
     String executionMode =
@@ -92,10 +92,10 @@ public class RemoteScheduler extends AbstractScheduler {
     private final long checkIntervalMsec;
     private final AtomicBoolean terminate;
     private final JobListener listener;
-    private final Job job;
+    private final Job<?> job;
     private volatile Status lastStatus;
 
-    public JobStatusPoller(Job job,
+    public JobStatusPoller(Job<?> job,
                            JobListener listener,
                            long checkIntervalMsec) {
       setName("JobStatusPoller-" + job.getId());
@@ -160,11 +160,11 @@ public class RemoteScheduler extends AbstractScheduler {
   private class JobRunner implements Runnable, JobListener {
     private final Logger logger = LoggerFactory.getLogger(JobRunner.class);
     private final RemoteScheduler scheduler;
-    private final Job job;
+    private final Job<?> job;
     private volatile boolean jobExecuted;
     private volatile boolean jobSubmittedRemotely;
 
-    public JobRunner(RemoteScheduler scheduler, Job job) {
+    public JobRunner(RemoteScheduler scheduler, Job<?> job) {
       this.scheduler = scheduler;
       this.job = job;
       jobExecuted = false;
@@ -197,12 +197,12 @@ public class RemoteScheduler extends AbstractScheduler {
     }
 
     @Override
-    public void onProgressUpdate(Job job, int progress) {
+    public void onProgressUpdate(Job<?> job, int progress) {
     }
 
     // Call by JobStatusPoller thread, update status when JobStatusPoller get new status.
     @Override
-    public void onStatusChange(Job job, Status before, Status after) {
+    public void onStatusChange(Job<?> job, Status before, Status after) {
       if (!job.equals(this.job)) {
         logger.error("StatusChange for an unkown job. {} != {}", this.job.getId(), job.getId());
         return;
diff --git a/zeppelin-zengine/src/test/java/org/apache/zeppelin/notebook/NotebookTest.java b/zeppelin-zengine/src/test/java/org/apache/zeppelin/notebook/NotebookTest.java
index 5869f3b5dd..b53d780ebb 100644
--- a/zeppelin-zengine/src/test/java/org/apache/zeppelin/notebook/NotebookTest.java
+++ b/zeppelin-zengine/src/test/java/org/apache/zeppelin/notebook/NotebookTest.java
@@ -1893,11 +1893,11 @@ public class NotebookTest extends AbstractInterpreterTest implements ParagraphJo
   }
 
   @Override
-  public void onProgressUpdate(Paragraph paragraph, int progress) {
+  public void onProgressUpdate(Job<?> paragraph, int progress) {
   }
 
   @Override
-  public void onStatusChange(Paragraph paragraph, Status before, Status after) {
+  public void onStatusChange(Job<?> paragraph, Status before, Status after) {
     if (afterStatusChangedListener != null) {
       afterStatusChangedListener.onStatusChanged(paragraph, before, after);
     }

Reply via email to