This is an automated email from the ASF dual-hosted git repository. zjffdu pushed a commit to branch branch-0.9 in repository https://gitbox.apache.org/repos/asf/zeppelin.git
The following commit(s) were added to refs/heads/branch-0.9 by this push: new 99e23b1 [ZEPPELIN-4873]. Display rich duration info for insert into flink job 99e23b1 is described below commit 99e23b181a1219498c98194ad711cbafa5c8c187 Author: Jeff Zhang <zjf...@apache.org> AuthorDate: Mon Jun 15 15:39:16 2020 +0800 [ZEPPELIN-4873]. Display rich duration info for insert into flink job Trivial PR which displays rich duration info instead of just x seconds. See screenshot below. [ Improvement ] * [ ] - Task * https://issues.apache.org/jira/browse/ZEPPELIN-4873 * CI pass  * Do the license files need an update? No * Are there breaking changes for older versions? No * Does this need documentation? No Author: Jeff Zhang <zjf...@apache.org> Closes #3794 from zjffdu/ZEPPELIN-4873 and squashes the following commits: 459f181c5 [Jeff Zhang] add java doc 419818e4a [Jeff Zhang] address comment 9691eae82 [Jeff Zhang] [ZEPPELIN-4873]. Display rich duration info for insert into flink job (cherry picked from commit 3139ed6adbbb97246dd4c36ee1505755ed2df0fc) Signed-off-by: Jeff Zhang <zjf...@apache.org> --- .../java/org/apache/zeppelin/flink/JobManager.java | 65 +++++++++++++++++----- .../org/apache/zeppelin/flink/JobManagerTest.java | 43 ++++++++++++++ 2 files changed, 93 insertions(+), 15 deletions(-) diff --git a/flink/interpreter/src/main/java/org/apache/zeppelin/flink/JobManager.java b/flink/interpreter/src/main/java/org/apache/zeppelin/flink/JobManager.java index ff8e7a7..f2ce9dd 100644 --- a/flink/interpreter/src/main/java/org/apache/zeppelin/flink/JobManager.java +++ b/flink/interpreter/src/main/java/org/apache/zeppelin/flink/JobManager.java @@ -29,9 +29,11 @@ import org.json.JSONObject; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.time.Duration; import java.util.HashMap; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; public class JobManager { @@ 
-110,7 +112,7 @@ public class JobManager { } public void cancelJob(InterpreterContext context) throws InterpreterException { - LOGGER.info("Canceling job associated of paragraph: "+ context.getParagraphId()); + LOGGER.info("Canceling job associated of paragraph: {}", context.getParagraphId()); JobClient jobClient = this.jobs.get(context.getParagraphId()); if (jobClient == null) { LOGGER.warn("Unable to remove Job from paragraph {} as no job associated to this paragraph", @@ -178,8 +180,12 @@ public class JobManager { @Override public void run() { while (!Thread.currentThread().isInterrupted() && running.get()) { + JsonNode rootNode = null; try { - JsonNode rootNode = Unirest.get(flinkWebUI + "/jobs/" + jobId.toString()) + synchronized (running) { + running.wait(1000); + } + rootNode = Unirest.get(flinkWebUI + "/jobs/" + jobId.toString()) .asJson().getBody(); JSONArray vertices = rootNode.getObject().getJSONArray("vertices"); int totalTasks = 0; @@ -199,13 +205,12 @@ public class JobManager { if (jobState.equalsIgnoreCase("finished")) { break; } - synchronized (running) { - running.wait(1000); - } + long duration = rootNode.getObject().getLong("duration") / 1000; + if (isStreamingInsertInto) { if (isFirstPoll) { StringBuilder builder = new StringBuilder("%angular "); - builder.append("<h1>Duration: {{duration}} seconds"); + builder.append("<h1>Duration: {{duration}} </h1>"); builder.append("\n%text "); context.out.clear(false); context.out.write(builder.toString()); @@ -213,7 +218,7 @@ public class JobManager { isFirstPoll = false; } context.getAngularObjectRegistry().add("duration", - rootNode.getObject().getLong("duration") / 1000, + toRichTimeDuration(duration), context.getNoteId(), context.getParagraphId()); } @@ -223,15 +228,45 @@ public class JobManager { } } - public void cancel () { - this.running.set(false); - synchronized (running) { - running.notify(); - } + public void cancel() { + this.running.set(false); + synchronized (running) { + running.notify(); } 
+ } - public int getProgress () { - return progress; - } + public int getProgress() { + return progress; + } + } + + /** + * Convert duration in seconds to rich time duration format. e.g. 2 days 3 hours 4 minutes 5 seconds + * + * @param duration in second + * @return + */ + static String toRichTimeDuration(long duration) { + long days = TimeUnit.SECONDS.toDays(duration); + duration -= TimeUnit.DAYS.toSeconds(days); + long hours = TimeUnit.SECONDS.toHours(duration); + duration -= TimeUnit.HOURS.toSeconds(hours); + long minutes = TimeUnit.SECONDS.toMinutes(duration); + duration -= TimeUnit.MINUTES.toSeconds(minutes); + long seconds = TimeUnit.SECONDS.toSeconds(duration); + + StringBuilder builder = new StringBuilder(); + if (days != 0) { + builder.append(days + " days "); + } + if (days != 0 || hours != 0) { + builder.append(hours + " hours "); } + if (days != 0 || hours != 0 || minutes != 0) { + builder.append(minutes + " minutes "); + } + builder.append(seconds + " seconds"); + return builder.toString(); } + +} diff --git a/flink/interpreter/src/test/java/org/apache/zeppelin/flink/JobManagerTest.java b/flink/interpreter/src/test/java/org/apache/zeppelin/flink/JobManagerTest.java new file mode 100644 index 0000000..fe196e9 --- /dev/null +++ b/flink/interpreter/src/test/java/org/apache/zeppelin/flink/JobManagerTest.java @@ -0,0 +1,43 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.zeppelin.flink; + +import org.junit.Test; + +import static org.junit.Assert.assertEquals; + +public class JobManagerTest { + + @Test + public void testRichDuration() { + String richDuration = JobManager.toRichTimeDuration(18); + assertEquals("18 seconds", richDuration); + + richDuration = JobManager.toRichTimeDuration(120); + assertEquals("2 minutes 0 seconds", richDuration); + + richDuration = JobManager.toRichTimeDuration(60 * 60 + 1); + assertEquals("1 hours 0 minutes 1 seconds", richDuration); + + richDuration = JobManager.toRichTimeDuration(60 * 60 + 60 + 1); + assertEquals("1 hours 1 minutes 1 seconds", richDuration); + + richDuration = JobManager.toRichTimeDuration(24 * 60 * 60 + 60 + 1); + assertEquals("1 days 0 hours 1 minutes 1 seconds", richDuration); + } +}