This is an automated email from the ASF dual-hosted git repository. zjffdu pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/zeppelin.git
The following commit(s) were added to refs/heads/master by this push: new 34561b5 [ZEPPELIN-5415] Fix flink.webui.yarn.useProxy not working on yarn-application mode 34561b5 is described below commit 34561b5485c6b73dc635073f46be37cd1c547c20 Author: jiabao.sun <jiabao....@xtransfer.cn> AuthorDate: Mon Jun 21 10:39:03 2021 +0800 [ZEPPELIN-5415] Fix flink.webui.yarn.useProxy not working on yarn-application mode ### What is this PR for? Fix flink.webui.yarn.useProxy not working on yarn-application mode ### What type of PR is it? [Bug Fix] ### Todos * [ ] - Task ### What is the Jira issue? * [ZEPPELIN-5415] ### How should this be tested? * Strongly recommended: add automated unit tests for any new or changed behavior * Outline any manual steps to test the PR here. ### Screenshots (if appropriate) ### Questions: * Do the license files need an update? * Are there breaking changes for older versions? * Does this need documentation? Author: jiabao.sun <jiabao....@xtransfer.cn> Closes #4142 from Jiabao-Sun/fix-use-proxy-of-yarn-application and squashes the following commits: 37b37eca20 [jiabao.sun] Return None if yarn address is null or empty. 
3bd2508aae [jiabao.sun] reduce duplicate code da41ed444b [jiabao.sun] Fix flink.webui.yarn.useProxy not works on yarn-application mode --- .../zeppelin/flink/FlinkScalaInterpreter.scala | 32 ++++++++++++++++------ 1 file changed, 23 insertions(+), 9 deletions(-) diff --git a/flink/flink-scala-parent/src/main/scala/org/apache/zeppelin/flink/FlinkScalaInterpreter.scala b/flink/flink-scala-parent/src/main/scala/org/apache/zeppelin/flink/FlinkScalaInterpreter.scala index e0e97ba..9a5645c 100644 --- a/flink/flink-scala-parent/src/main/scala/org/apache/zeppelin/flink/FlinkScalaInterpreter.scala +++ b/flink/flink-scala-parent/src/main/scala/org/apache/zeppelin/flink/FlinkScalaInterpreter.scala @@ -298,13 +298,9 @@ abstract class FlinkScalaInterpreter(val properties: Properties, this.jmWebUrl = clusterClient.getWebInterfaceURL } else if (mode == ExecutionMode.YARN) { LOGGER.info("Starting FlinkCluster in yarn mode") - if (properties.getProperty("flink.webui.yarn.useProxy", "false").toBoolean) { + if (isYarnUseProxy()) { this.jmWebUrl = HadoopUtils.getYarnAppTrackingUrl(clusterClient) - // for some cloud vender, the yarn address may be mapped to some other address. 
- val yarnAddress = properties.getProperty("flink.webui.yarn.address") - if (!StringUtils.isBlank(yarnAddress)) { - this.displayedJMWebUrl = FlinkScalaInterpreter.replaceYarnAddress(this.jmWebUrl, yarnAddress) - } + this.displayedJMWebUrl = getJmWebUrlUnderProxy().getOrElse(this.jmWebUrl) } else { this.jmWebUrl = clusterClient.getWebInterfaceURL } @@ -314,10 +310,14 @@ abstract class FlinkScalaInterpreter(val properties: Properties, case None => // remote mode if (mode == ExecutionMode.YARN_APPLICATION) { - val yarnAppId = System.getenv("_APP_ID"); + val yarnAppId = System.getenv("_APP_ID") LOGGER.info("Use FlinkCluster in yarn application mode, appId: {}", yarnAppId) - this.jmWebUrl = "http://localhost:" + HadoopUtils.getFlinkRestPort(yarnAppId) - this.displayedJMWebUrl = HadoopUtils.getYarnAppTrackingUrl(yarnAppId) + if (isYarnUseProxy()) { + this.jmWebUrl = HadoopUtils.getYarnAppTrackingUrl(yarnAppId) + this.displayedJMWebUrl = getJmWebUrlUnderProxy().getOrElse(this.jmWebUrl) + } else { + this.jmWebUrl = "http://localhost:" + HadoopUtils.getFlinkRestPort(yarnAppId) + } } else { LOGGER.info("Use FlinkCluster in remote mode") this.jmWebUrl = "http://" + config.host.get + ":" + config.port.get @@ -843,6 +843,20 @@ abstract class FlinkScalaInterpreter(val properties: Properties, }) } + private def isYarnUseProxy(): Boolean = { + properties.getProperty("flink.webui.yarn.useProxy", "false").toBoolean + } + + private def getJmWebUrlUnderProxy(): Option[String] = { + // for some cloud vender, the yarn address may be mapped to some other address. + val yarnAddress = properties.getProperty("flink.webui.yarn.address") + if (StringUtils.isNotBlank(yarnAddress)) { + Some(FlinkScalaInterpreter.replaceYarnAddress(this.jmWebUrl, yarnAddress)) + } else { + None + } + } + def getJobManager = this.jobManager def getFlinkScalaShellLoader: ClassLoader = {