This is an automated email from the ASF dual-hosted git repository.

zjffdu pushed a commit to branch branch-0.9
in repository https://gitbox.apache.org/repos/asf/zeppelin.git


The following commit(s) were added to refs/heads/branch-0.9 by this push:
     new 99e23b1  [ZEPPELIN-4873]. Display rich duration info for insert into flink job
99e23b1 is described below

commit 99e23b181a1219498c98194ad711cbafa5c8c187
Author: Jeff Zhang <zjf...@apache.org>
AuthorDate: Mon Jun 15 15:39:16 2020 +0800

    [ZEPPELIN-4873]. Display rich duration info for insert into flink job
    
    Trivial PR which displays rich duration info (e.g. days, hours, minutes and
    seconds) instead of just x seconds. See the screenshot below.
    
    [ Improvement ]
    
    * [ ] - Task
    
    * https://issues.apache.org/jira/browse/ZEPPELIN-4873
    
    * CI pass
    
    
![image](https://user-images.githubusercontent.com/164491/84286308-19291b80-ab71-11ea-96ef-b237d2463b8c.png)
    
    * Do the license files need an update? No
    * Are there breaking changes for older versions? No
    * Does this need documentation? No
    
    Author: Jeff Zhang <zjf...@apache.org>
    
    Closes #3794 from zjffdu/ZEPPELIN-4873 and squashes the following commits:
    
    459f181c5 [Jeff Zhang] add java doc
    419818e4a [Jeff Zhang] address comment
    9691eae82 [Jeff Zhang] [ZEPPELIN-4873]. Display rich duration info for insert into flink job
    
    (cherry picked from commit 3139ed6adbbb97246dd4c36ee1505755ed2df0fc)
    Signed-off-by: Jeff Zhang <zjf...@apache.org>
---
 .../java/org/apache/zeppelin/flink/JobManager.java | 65 +++++++++++++++++-----
 .../org/apache/zeppelin/flink/JobManagerTest.java  | 43 ++++++++++++++
 2 files changed, 93 insertions(+), 15 deletions(-)

diff --git 
a/flink/interpreter/src/main/java/org/apache/zeppelin/flink/JobManager.java 
b/flink/interpreter/src/main/java/org/apache/zeppelin/flink/JobManager.java
index ff8e7a7..f2ce9dd 100644
--- a/flink/interpreter/src/main/java/org/apache/zeppelin/flink/JobManager.java
+++ b/flink/interpreter/src/main/java/org/apache/zeppelin/flink/JobManager.java
@@ -29,9 +29,11 @@ import org.json.JSONObject;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.time.Duration;
 import java.util.HashMap;
 import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 
 public class JobManager {
@@ -110,7 +112,7 @@ public class JobManager {
   }
 
   public void cancelJob(InterpreterContext context) throws 
InterpreterException {
-    LOGGER.info("Canceling job associated of paragraph: "+ 
context.getParagraphId());
+    LOGGER.info("Canceling job associated of paragraph: {}", 
context.getParagraphId());
     JobClient jobClient = this.jobs.get(context.getParagraphId());
     if (jobClient == null) {
       LOGGER.warn("Unable to remove Job from paragraph {} as no job associated 
to this paragraph",
@@ -178,8 +180,12 @@ public class JobManager {
     @Override
     public void run() {
       while (!Thread.currentThread().isInterrupted() && running.get()) {
+        JsonNode rootNode = null;
         try {
-          JsonNode rootNode = Unirest.get(flinkWebUI + "/jobs/" + 
jobId.toString())
+          synchronized (running) {
+            running.wait(1000);
+          }
+          rootNode = Unirest.get(flinkWebUI + "/jobs/" + jobId.toString())
                   .asJson().getBody();
           JSONArray vertices = rootNode.getObject().getJSONArray("vertices");
           int totalTasks = 0;
@@ -199,13 +205,12 @@ public class JobManager {
           if (jobState.equalsIgnoreCase("finished")) {
             break;
           }
-          synchronized (running) {
-            running.wait(1000);
-          }
+          long duration = rootNode.getObject().getLong("duration") / 1000;
+
           if (isStreamingInsertInto) {
             if (isFirstPoll) {
               StringBuilder builder = new StringBuilder("%angular ");
-              builder.append("<h1>Duration: {{duration}} seconds");
+              builder.append("<h1>Duration: {{duration}} </h1>");
               builder.append("\n%text ");
               context.out.clear(false);
               context.out.write(builder.toString());
@@ -213,7 +218,7 @@ public class JobManager {
               isFirstPoll = false;
             }
             context.getAngularObjectRegistry().add("duration",
-                    rootNode.getObject().getLong("duration") / 1000,
+                    toRichTimeDuration(duration),
                     context.getNoteId(),
                     context.getParagraphId());
           }
@@ -223,15 +228,45 @@ public class JobManager {
       }
     }
 
-      public void cancel () {
-        this.running.set(false);
-        synchronized (running) {
-          running.notify();
-        }
+    public void cancel() {
+      this.running.set(false);
+      synchronized (running) {
+        running.notify();
       }
+    }
 
-      public int getProgress () {
-        return progress;
-      }
+    public int getProgress() {
+      return progress;
+    }
+  }
+
+  /**
+   * Convert duration in seconds to rich time duration format. e.g. 2 days 3 
hours 4 minutes 5 seconds
+   *
+   * @param duration in second
+   * @return
+   */
+  static String toRichTimeDuration(long duration) {
+    long days = TimeUnit.SECONDS.toDays(duration);
+    duration -= TimeUnit.DAYS.toSeconds(days);
+    long hours = TimeUnit.SECONDS.toHours(duration);
+    duration -= TimeUnit.HOURS.toSeconds(hours);
+    long minutes = TimeUnit.SECONDS.toMinutes(duration);
+    duration -= TimeUnit.MINUTES.toSeconds(minutes);
+    long seconds = TimeUnit.SECONDS.toSeconds(duration);
+
+    StringBuilder builder = new StringBuilder();
+    if (days != 0) {
+      builder.append(days + " days ");
+    }
+    if (days != 0 || hours != 0) {
+      builder.append(hours + " hours ");
     }
+    if (days != 0 || hours != 0 || minutes != 0) {
+      builder.append(minutes + " minutes ");
+    }
+    builder.append(seconds + " seconds");
+    return builder.toString();
   }
+
+}
diff --git 
a/flink/interpreter/src/test/java/org/apache/zeppelin/flink/JobManagerTest.java 
b/flink/interpreter/src/test/java/org/apache/zeppelin/flink/JobManagerTest.java
new file mode 100644
index 0000000..fe196e9
--- /dev/null
+++ 
b/flink/interpreter/src/test/java/org/apache/zeppelin/flink/JobManagerTest.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.zeppelin.flink;
+
+import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
+
+public class JobManagerTest {
+
+  @Test
+  public void testRichDuration() {
+    String richDuration = JobManager.toRichTimeDuration(18);
+    assertEquals("18 seconds", richDuration);
+
+    richDuration = JobManager.toRichTimeDuration(120);
+    assertEquals("2 minutes 0 seconds", richDuration);
+
+    richDuration = JobManager.toRichTimeDuration(60 * 60 + 1);
+    assertEquals("1 hours 0 minutes 1 seconds", richDuration);
+
+    richDuration = JobManager.toRichTimeDuration(60 * 60 + 60 + 1);
+    assertEquals("1 hours 1 minutes 1 seconds", richDuration);
+
+    richDuration = JobManager.toRichTimeDuration(24 * 60 * 60 + 60 + 1);
+    assertEquals("1 days 0 hours 1 minutes 1 seconds", richDuration);
+  }
+}

Reply via email to