This is an automated email from the ASF dual-hosted git repository. zjffdu pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/zeppelin.git
The following commit(s) were added to refs/heads/master by this push: new 3139ed6 [ZEPPELIN-4873]. Display rich duration info for insert into flink job 3139ed6 is described below commit 3139ed6adbbb97246dd4c36ee1505755ed2df0fc Author: Jeff Zhang <zjf...@apache.org> AuthorDate: Mon Jun 15 15:39:16 2020 +0800 [ZEPPELIN-4873]. Display rich duration info for insert into flink job ### What is this PR for? Trivial PR which displays rich duration info instead of just x seconds. See screenshot below. ### What type of PR is it? [ Improvement ] ### Todos * [ ] - Task ### What is the Jira issue? * https://issues.apache.org/jira/browse/ZEPPELIN-4873 ### How should this be tested? * CI pass ### Screenshots (if appropriate)  ### Questions: * Do the license files need an update? No * Are there breaking changes for older versions? No * Does this need documentation? No Author: Jeff Zhang <zjf...@apache.org> Closes #3794 from zjffdu/ZEPPELIN-4873 and squashes the following commits: 459f181c5 [Jeff Zhang] add java doc 419818e4a [Jeff Zhang] address comment 9691eae82 [Jeff Zhang] [ZEPPELIN-4873]. 
Display rich duration info for insert into flink job --- .../java/org/apache/zeppelin/flink/JobManager.java | 63 +++++++++++++++++----- .../org/apache/zeppelin/flink/JobManagerTest.java | 43 +++++++++++++++ 2 files changed, 92 insertions(+), 14 deletions(-) diff --git a/flink/interpreter/src/main/java/org/apache/zeppelin/flink/JobManager.java b/flink/interpreter/src/main/java/org/apache/zeppelin/flink/JobManager.java index ccbfe39..914d8a6 100644 --- a/flink/interpreter/src/main/java/org/apache/zeppelin/flink/JobManager.java +++ b/flink/interpreter/src/main/java/org/apache/zeppelin/flink/JobManager.java @@ -29,9 +29,11 @@ import org.json.JSONObject; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.time.Duration; import java.util.HashMap; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; public class JobManager { @@ -110,7 +112,7 @@ public class JobManager { } public void cancelJob(InterpreterContext context) throws InterpreterException { - LOGGER.info("Canceling job associated of paragraph: "+ context.getParagraphId()); + LOGGER.info("Canceling job associated of paragraph: {}", context.getParagraphId()); JobClient jobClient = this.jobs.get(context.getParagraphId()); if (jobClient == null) { LOGGER.warn("Unable to remove Job from paragraph {} as no job associated to this paragraph", @@ -178,8 +180,12 @@ public class JobManager { @Override public void run() { while (!Thread.currentThread().isInterrupted() && running.get()) { + JsonNode rootNode = null; try { + synchronized (running) { + running.wait(1000); + } rootNode = Unirest.get(flinkWebUI + "/jobs/" + jobId.toString()) .asJson().getBody(); JSONArray vertices = rootNode.getObject().getJSONArray("vertices"); @@ -200,13 +206,12 @@ public class JobManager { if (jobState.equalsIgnoreCase("finished")) { break; } - synchronized (running) { - running.wait(1000); - } + long duration = 
rootNode.getObject().getLong("duration") / 1000; + if (isStreamingInsertInto) { if (isFirstPoll) { StringBuilder builder = new StringBuilder("%angular "); - builder.append("<h1>Duration: {{duration}} seconds"); + builder.append("<h1>Duration: {{duration}} </h1>"); builder.append("\n%text "); context.out.clear(false); context.out.write(builder.toString()); @@ -214,7 +219,7 @@ public class JobManager { isFirstPoll = false; } context.getAngularObjectRegistry().add("duration", - rootNode.getObject().getLong("duration") / 1000, + toRichTimeDuration(duration), context.getNoteId(), context.getParagraphId()); } @@ -224,15 +229,45 @@ public class JobManager { } } - public void cancel () { - this.running.set(false); - synchronized (running) { - running.notify(); - } + public void cancel() { + this.running.set(false); + synchronized (running) { + running.notify(); } + } - public int getProgress () { - return progress; - } + public int getProgress() { + return progress; } } + + /** + * Convert duration in seconds to rich time duration format. e.g. 
2 days 3 hours 4 minutes 5 seconds + * + * @param duration in second + * @return + */ + static String toRichTimeDuration(long duration) { + long days = TimeUnit.SECONDS.toDays(duration); + duration -= TimeUnit.DAYS.toSeconds(days); + long hours = TimeUnit.SECONDS.toHours(duration); + duration -= TimeUnit.HOURS.toSeconds(hours); + long minutes = TimeUnit.SECONDS.toMinutes(duration); + duration -= TimeUnit.MINUTES.toSeconds(minutes); + long seconds = TimeUnit.SECONDS.toSeconds(duration); + + StringBuilder builder = new StringBuilder(); + if (days != 0) { + builder.append(days + " days "); + } + if (days != 0 || hours != 0) { + builder.append(hours + " hours "); + } + if (days != 0 || hours != 0 || minutes != 0) { + builder.append(minutes + " minutes "); + } + builder.append(seconds + " seconds"); + return builder.toString(); + } + +} diff --git a/flink/interpreter/src/test/java/org/apache/zeppelin/flink/JobManagerTest.java b/flink/interpreter/src/test/java/org/apache/zeppelin/flink/JobManagerTest.java new file mode 100644 index 0000000..fe196e9 --- /dev/null +++ b/flink/interpreter/src/test/java/org/apache/zeppelin/flink/JobManagerTest.java @@ -0,0 +1,43 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.zeppelin.flink; + +import org.junit.Test; + +import static org.junit.Assert.assertEquals; + +public class JobManagerTest { + + @Test + public void testRichDuration() { + String richDuration = JobManager.toRichTimeDuration(18); + assertEquals("18 seconds", richDuration); + + richDuration = JobManager.toRichTimeDuration(120); + assertEquals("2 minutes 0 seconds", richDuration); + + richDuration = JobManager.toRichTimeDuration(60 * 60 + 1); + assertEquals("1 hours 0 minutes 1 seconds", richDuration); + + richDuration = JobManager.toRichTimeDuration(60 * 60 + 60 + 1); + assertEquals("1 hours 1 minutes 1 seconds", richDuration); + + richDuration = JobManager.toRichTimeDuration(24 * 60 * 60 + 60 + 1); + assertEquals("1 days 0 hours 1 minutes 1 seconds", richDuration); + } +}