This is an automated email from the ASF dual-hosted git repository. zjffdu pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/zeppelin.git
The following commit(s) were added to refs/heads/master by this push: new 174892d [ZEPPELIN-4922]. Use original flink web url for job progress polling 174892d is described below commit 174892d86471d9ca05885597bc6ceaf5abe96eee Author: Jeff Zhang <zjf...@apache.org> AuthorDate: Sun Jun 28 10:44:58 2020 +0800 [ZEPPELIN-4922]. Use original flink web url for job progress polling ### What is this PR for? For flink job progress polling, we should still use the original web url. Otherwise the replaced flink job url won't work. ### What type of PR is it? [Bug Fix] ### Todos * [ ] - Task ### What is the Jira issue? * https://issues.apache.org/jira/browse/ZEPPELIN-4922 ### How should this be tested? * CI pass ### Screenshots (if appropriate) ### Questions: * Does the licenses files need update? No * Is there breaking changes for older versions? No * Does this needs documentation? No Author: Jeff Zhang <zjf...@apache.org> Closes #3826 from zjffdu/ZEPPELIN-4922 and squashes the following commits: 9228433bf [Jeff Zhang] [ZEPPELIN-4922]. Use original flink web url for job progress polling --- .../java/org/apache/zeppelin/flink/JobManager.java | 27 ++++++++++++++-------- .../zeppelin/flink/FlinkScalaInterpreter.scala | 5 ++-- 2 files changed, 20 insertions(+), 12 deletions(-) diff --git a/flink/interpreter/src/main/java/org/apache/zeppelin/flink/JobManager.java b/flink/interpreter/src/main/java/org/apache/zeppelin/flink/JobManager.java index 914d8a6..0c553ba 100644 --- a/flink/interpreter/src/main/java/org/apache/zeppelin/flink/JobManager.java +++ b/flink/interpreter/src/main/java/org/apache/zeppelin/flink/JobManager.java @@ -29,7 +29,6 @@ import org.json.JSONObject; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.time.Duration; import java.util.HashMap; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; @@ -44,18 +43,21 @@ public class JobManager { private ConcurrentHashMap<JobID, FlinkJobProgressPoller> jobProgressPollerMap = new ConcurrentHashMap<>(); private FlinkZeppelinContext z; - private String flinkWebUI; + private String flinkWebUrl; + private String replacedFlinkWebUrl; public JobManager(FlinkZeppelinContext z, - String flinkWebUI) { + String flinkWebUrl, + String replacedFlinkWebUrl) { this.z = z; - this.flinkWebUI = flinkWebUI; + this.flinkWebUrl = flinkWebUrl; + this.replacedFlinkWebUrl = replacedFlinkWebUrl; } public void addJob(InterpreterContext context, JobClient jobClient) { String paragraphId = context.getParagraphId(); JobClient previousJobClient = this.jobs.put(paragraphId, jobClient); - FlinkJobProgressPoller thread = new FlinkJobProgressPoller(flinkWebUI, jobClient.getJobID(), context); + FlinkJobProgressPoller thread = new FlinkJobProgressPoller(flinkWebUrl, jobClient.getJobID(), context); thread.setName("JobProgressPoller-Thread-" + paragraphId); thread.start(); this.jobProgressPollerMap.put(jobClient.getJobID(), thread); @@ -82,7 +84,12 @@ public class JobManager { public void sendFlinkJobUrl(InterpreterContext context) { JobClient jobClient = jobs.get(context.getParagraphId()); if (jobClient != null) { - String jobUrl = flinkWebUI + "#/job/" + jobClient.getJobID(); + String jobUrl = null; + if (replacedFlinkWebUrl != null) { + jobUrl = replacedFlinkWebUrl + "#/job/" + jobClient.getJobID(); + } else { + jobUrl = flinkWebUrl + "#/job/" + jobClient.getJobID(); + } Map<String, String> infos = new HashMap<>(); infos.put("jobUrl", jobUrl); infos.put("label", "FLINK JOB"); @@ -162,7 +169,7 @@ public class JobManager { class FlinkJobProgressPoller extends Thread { - private String flinkWebUI; + private String flinkWebUrl; private JobID jobId; private InterpreterContext context; private boolean isStreamingInsertInto; @@ -170,8 +177,8 @@ public class JobManager { private AtomicBoolean running = new AtomicBoolean(true); private boolean isFirstPoll = true; - FlinkJobProgressPoller(String flinkWebUI, JobID jobId, InterpreterContext context) { - this.flinkWebUI = flinkWebUI; + FlinkJobProgressPoller(String flinkWebUrl, JobID jobId, InterpreterContext context) { + this.flinkWebUrl = flinkWebUrl; this.jobId = jobId; this.context = context; this.isStreamingInsertInto = context.getLocalProperties().containsKey("flink.streaming.insert_into"); @@ -186,7 +193,7 @@ public class JobManager { synchronized (running) { running.wait(1000); } - rootNode = Unirest.get(flinkWebUI + "/jobs/" + jobId.toString()) + rootNode = Unirest.get(flinkWebUrl + "/jobs/" + jobId.toString()) .asJson().getBody(); JSONArray vertices = rootNode.getObject().getJSONArray("vertices"); int totalTasks = 0; diff --git a/flink/interpreter/src/main/scala/org/apache/zeppelin/flink/FlinkScalaInterpreter.scala b/flink/interpreter/src/main/scala/org/apache/zeppelin/flink/FlinkScalaInterpreter.scala index ec87206..2f544b4 100644 --- a/flink/interpreter/src/main/scala/org/apache/zeppelin/flink/FlinkScalaInterpreter.scala +++ b/flink/interpreter/src/main/scala/org/apache/zeppelin/flink/FlinkScalaInterpreter.scala @@ -102,6 +102,7 @@ class FlinkScalaInterpreter(val properties: Properties) { private var flinkVersion: FlinkVersion = _ private var flinkShims: FlinkShims = _ private var jmWebUrl: String = _ + private var replacedJMWebUrl: String = _ private var jobManager: JobManager = _ private var defaultParallelism = 1 private var defaultSqlParallelism = 1 @@ -120,7 +121,7 @@ class FlinkScalaInterpreter(val properties: Properties) { modifiers.add("@transient") this.bind("z", z.getClass().getCanonicalName(), z, modifiers); - this.jobManager = new JobManager(this.z, jmWebUrl) + this.jobManager = new JobManager(this.z, jmWebUrl, replacedJMWebUrl) // register JobListener val jobListener = new FlinkJobListener() @@ -268,7 +269,7 @@ class FlinkScalaInterpreter(val properties: Properties) { // for some cloud vender, the yarn address may be mapped to some other address. val yarnAddress = properties.getProperty("flink.webui.yarn.address") if (!StringUtils.isBlank(yarnAddress)) { - this.jmWebUrl = replaceYarnAddress(this.jmWebUrl, yarnAddress) + this.replacedJMWebUrl = replaceYarnAddress(this.jmWebUrl, yarnAddress) } } else { this.jmWebUrl = clusterClient.getWebInterfaceURL