This is an automated email from the ASF dual-hosted git repository.

zjffdu pushed a commit to branch branch-0.9
in repository https://gitbox.apache.org/repos/asf/zeppelin.git


The following commit(s) were added to refs/heads/branch-0.9 by this push:
     new 8deaaef  [ZEPPELIN-4851]. Add property to flink interpreter to allow 
replace the yarn address
8deaaef is described below

commit 8deaaef07e400149513bf55777fcf81a8862b27a
Author: Jeff Zhang <zjf...@apache.org>
AuthorDate: Thu Jun 11 11:55:54 2020 +0800

    [ZEPPELIN-4851]. Add property to flink interpreter to allow replace the 
yarn address
    
    ### What is this PR for?
    
    Trivial PR which adds the property `flink.webui.yarn.yarnAddress`, which allows 
users to replace the yarn address. This is for cases where a cloud vendor 
maps the internal resource manager address to another url, e.g. some cloud 
vendors will map `http://resource-manager:8088` to 
`https://xxx-yarn.yy.cn/gateway/kkk/yarn`
    
    ### What type of PR is it?
    [Improvement]
    
    ### Todos
    * [ ] - Task
    
    ### What is the Jira issue?
    * https://issues.apache.org/jira/browse/ZEPPELIN-4851
    
    ### How should this be tested?
    * CI pass
    
    ### Screenshots (if appropriate)
    
    ### Questions:
    * Does the licenses files need update? no
    * Is there breaking changes for older versions? no
    * Does this needs documentation? no
    
    Author: Jeff Zhang <zjf...@apache.org>
    
    Closes #3797 from zjffdu/ZEPPELIN-4851 and squashes the following commits:
    
    c1be76217 [Jeff Zhang] [ZEPPELIN-4851]. Add property to flink interpreter 
to allow replace the yarn address
    
    (cherry picked from commit 2b505dcd2f1e880408bba4ea64dd4cca56924fe5)
    Signed-off-by: Jeff Zhang <zjf...@apache.org>
---
 docs/interpreter/flink.md                          |  7 +++-
 flink/interpreter/pom.xml                          | 13 +++++++
 .../src/main/resources/interpreter-setting.json    |  7 ++++
 .../zeppelin/flink/FlinkScalaInterpreter.scala     | 12 +++++++
 .../zeppelin/flink/FlinkScalaInterpreterTest.scala | 42 ++++++++++++++++++++++
 5 files changed, 80 insertions(+), 1 deletion(-)

diff --git a/docs/interpreter/flink.md b/docs/interpreter/flink.md
index 23973a8..dc79969 100644
--- a/docs/interpreter/flink.md
+++ b/docs/interpreter/flink.md
@@ -139,7 +139,12 @@ You can also set other flink properties which are not 
listed in the table. For a
   <tr>
     <td>flink.webui.yarn.useProxy</td>
     <td>false</td>
-    <td>whether use yarn proxy url as flink weburl, e.g. 
http://localhost:8088/proxy/application_1583396598068_0004</td>
+    <td>whether use yarn proxy url as flink weburl, e.g. 
http://resource-manager:8088/proxy/application_1583396598068_0004</td>
+  </tr>
+  <tr>
+    <td>flink.webui.yarn.yarnAddress</td>
+    <td></td>
+    <td>Set this value only when your yarn address is mapped to some other 
address, e.g. some cloud vendor will map `http://resource-manager:8088` to 
`https://xxx-yarn.yy.cn/gateway/kkk/yarn`</td>
   </tr>
   <tr>
     <td>flink.udf.jars</td>
diff --git a/flink/interpreter/pom.xml b/flink/interpreter/pom.xml
index bc6704a..3adc45b 100644
--- a/flink/interpreter/pom.xml
+++ b/flink/interpreter/pom.xml
@@ -625,6 +625,13 @@
       <scope>test</scope>
     </dependency>
 
+    <dependency>
+      <groupId>org.scalatest</groupId>
+      <artifactId>scalatest_2.11</artifactId>
+      <version>3.0.8</version>
+      <scope>test</scope>
+    </dependency>
+
   </dependencies>
 
   <build>
@@ -723,6 +730,12 @@
         </configuration>
       </plugin>
 
+      <!-- Scalatest runs all Scala tests -->
+      <plugin>
+        <groupId>org.scalatest</groupId>
+        <artifactId>scalatest-maven-plugin</artifactId>
+      </plugin>
+
       <!-- Eclipse Integration -->
       <plugin>
         <groupId>org.apache.maven.plugins</groupId>
diff --git a/flink/interpreter/src/main/resources/interpreter-setting.json 
b/flink/interpreter/src/main/resources/interpreter-setting.json
index cba12c3..eea51a4 100644
--- a/flink/interpreter/src/main/resources/interpreter-setting.json
+++ b/flink/interpreter/src/main/resources/interpreter-setting.json
@@ -96,6 +96,13 @@
         "description": "Whether use yarn proxy url as flink weburl, e.g. 
http://localhost:8088/proxy/application_1583396598068_0004",
         "type": "checkbox"
       },
+      "flink.webui.yarn.yarnAddress": {
+        "envName": null,
+        "propertyName": null,
+        "defaultValue": "",
+        "description": "Set this value only when your yarn address is mapped 
to some other address, e.g. some cloud vendor will map 
`http://resource-manager:8088` to `https://xxx-yarn.yy.cn/gateway/kkk/yarn`",
+        "type": "string"
+      },
       "flink.udf.jars": {
         "envName": null,
         "propertyName": null,
diff --git 
a/flink/interpreter/src/main/scala/org/apache/zeppelin/flink/FlinkScalaInterpreter.scala
 
b/flink/interpreter/src/main/scala/org/apache/zeppelin/flink/FlinkScalaInterpreter.scala
index 3d12597..2431d01 100644
--- 
a/flink/interpreter/src/main/scala/org/apache/zeppelin/flink/FlinkScalaInterpreter.scala
+++ 
b/flink/interpreter/src/main/scala/org/apache/zeppelin/flink/FlinkScalaInterpreter.scala
@@ -24,6 +24,7 @@ import java.nio.file.Files
 import java.util.Properties
 import java.util.concurrent.TimeUnit
 import java.util.jar.JarFile
+import java.util.regex.Pattern
 
 import org.apache.commons.lang3.StringUtils
 import org.apache.commons.lang3.exception.ExceptionUtils
@@ -264,6 +265,11 @@ class FlinkScalaInterpreter(val properties: Properties) {
             LOGGER.info("Starting FlinkCluster in yarn mode")
             if (properties.getProperty("flink.webui.yarn.useProxy", 
"false").toBoolean) {
               this.jmWebUrl = HadoopUtils.getYarnAppTrackingUrl(clusterClient)
+              // For some cloud vendors the yarn address is mapped to another address.
+              // Prefer the documented key `flink.webui.yarn.yarnAddress`; fall back to
+              // `flink.webui.yarn.address` so settings using that key keep working.
+              val yarnAddress = Seq(
+                "flink.webui.yarn.yarnAddress",
+                "flink.webui.yarn.address"
+              ).map(key => properties.getProperty(key))
+                .find(value => !StringUtils.isBlank(value))
+              // Only rewrite the tracking url when one of the keys holds a non-blank value.
+              yarnAddress.foreach { address =>
+                this.jmWebUrl = replaceYarnAddress(this.jmWebUrl, address)
+              }
             } else {
               this.jmWebUrl = clusterClient.getWebInterfaceURL
             }
@@ -837,6 +843,12 @@ class FlinkScalaInterpreter(val properties: Properties) {
       }
     }
   }
+
+  /**
+   * Replace the `http(s)://host[:port]` prefix of `webURL` with `yarnAddress`, keeping
+   * everything after it (path, query string, fragment) untouched.
+   *
+   * @param webURL      url to rewrite, e.g. `http://resource-manager:8088/proxy/app_1`
+   * @param yarnAddress address that takes the place of the `http(s)://host[:port]` prefix
+   * @return `yarnAddress` followed by the remainder of `webURL`; `webURL` itself when it
+   *         is null or does not start with an http(s) scheme and host
+   */
+  def replaceYarnAddress(webURL: String, yarnAddress: String): String = {
+    // `[^/?#]+` ends the host[:port] part at the first `/`, `?` or `#`, so a `:<digits>`
+    // occurring later in the path is never mistaken for the port; the port is optional.
+    val pattern = "(https?://[^/?#]+)(.*)".r
+    // Option(...) guards against a null url, and a url that does not match is returned
+    // unchanged instead of raising a MatchError.
+    Option(webURL)
+      .collect { case pattern(_, remaining) => yarnAddress + remaining }
+      .getOrElse(webURL)
+  }
 }
 
 
diff --git 
a/flink/interpreter/src/test/scala/org/apache/zeppelin/flink/FlinkScalaInterpreterTest.scala
 
b/flink/interpreter/src/test/scala/org/apache/zeppelin/flink/FlinkScalaInterpreterTest.scala
new file mode 100644
index 0000000..6c084c5
--- /dev/null
+++ 
b/flink/interpreter/src/test/scala/org/apache/zeppelin/flink/FlinkScalaInterpreterTest.scala
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.zeppelin.flink
+
+
+import java.util.Properties
+
+import org.junit.Assert.assertEquals
+import org.scalatest.FunSuite
+
+/** Unit tests for [[FlinkScalaInterpreter]]. */
+class FlinkScalaInterpreterTest extends FunSuite {
+
+  test("testReplaceYarnAddress") {
+    val flinkScalaInterpreter = new FlinkScalaInterpreter(new Properties())
+
+    // Nothing follows host:port, so the result is exactly the replacement address.
+    val withoutPath = flinkScalaInterpreter.replaceYarnAddress(
+      "http://localhost:8081", "http://my-server:9090/gateway")
+    assertEquals("http://my-server:9090/gateway", withoutPath)
+
+    // A trailing slash is part of the remainder and is kept.
+    val withTrailingSlash = flinkScalaInterpreter.replaceYarnAddress(
+      "https://localhost:8081/", "https://my-server:9090/gateway")
+    assertEquals("https://my-server:9090/gateway/", withTrailingSlash)
+
+    // The path after host:port is appended to the replacement address.
+    val withPath = flinkScalaInterpreter.replaceYarnAddress(
+      "https://localhost:8081/proxy/app_1", "https://my-server:9090/gateway")
+    assertEquals("https://my-server:9090/gateway/proxy/app_1", withPath)
+  }
+}

Reply via email to