This is an automated email from the ASF dual-hosted git repository. zjffdu pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/zeppelin.git
The following commit(s) were added to refs/heads/master by this push: new 951da42 [ZEPPELIN-4878]. Unable to run flink 1.10.1 in yarn mode due to FLINK-17788 951da42 is described below commit 951da42941b6e0e6bdbb8628160b08564b033955 Author: Jeff Zhang <zjf...@apache.org> AuthorDate: Tue Jun 16 13:38:52 2020 +0800 [ZEPPELIN-4878]. Unable to run flink 1.10.1 in yarn mode due to FLINK-17788 ### What is this PR for? The Flink interpreter doesn't work with Flink 1.10.1 in yarn mode due to FLINK-17788; this PR fixes it on the Zeppelin side. Because Flink only supports yarn session mode in the scala shell, when the execution mode is yarn we can set the deployment target to yarn-session directly. ### What type of PR is it? [Bug Fix] ### Todos * [ ] - Task ### What is the Jira issue? * https://issues.apache.org/jira/browse/ZEPPELIN-4878 ### How should this be tested? * CI passes and manually tested ### Screenshots (if appropriate) ### Questions: * Do the license files need an update? No * Are there breaking changes for older versions? No * Does this need documentation? No Author: Jeff Zhang <zjf...@apache.org> Closes #3795 from zjffdu/ZEPPELIN-4878 and squashes the following commits: 498325cc8 [Jeff Zhang] use flink1.10.version & flink1.11.version fb1a761b5 [Jeff Zhang] [ZEPPELIN-4878]. 
Unable to run flink 1.10.1 in yarn mode due to FLINK-17788 --- flink/flink1.10-shims/pom.xml | 2 +- flink/flink1.11-shims/pom.xml | 2 +- flink/interpreter/pom.xml | 7 +++---- .../scala/org/apache/zeppelin/flink/FlinkScalaInterpreter.scala | 5 +++++ flink/pom.xml | 5 +++++ 5 files changed, 15 insertions(+), 6 deletions(-) diff --git a/flink/flink1.10-shims/pom.xml b/flink/flink1.10-shims/pom.xml index 8a60436..b1a448e 100644 --- a/flink/flink1.10-shims/pom.xml +++ b/flink/flink1.10-shims/pom.xml @@ -33,7 +33,7 @@ <name>Zeppelin: Flink1.10 Shims</name> <properties> - <flink.version>1.10.0</flink.version> + <flink.version>${flink1.10.version}</flink.version> <scala.binary.version>2.11</scala.binary.version> <scala.version>2.11.12</scala.version> </properties> diff --git a/flink/flink1.11-shims/pom.xml b/flink/flink1.11-shims/pom.xml index 458e560..43260ec 100644 --- a/flink/flink1.11-shims/pom.xml +++ b/flink/flink1.11-shims/pom.xml @@ -33,7 +33,7 @@ <name>Zeppelin: Flink1.11 Shims</name> <properties> - <flink.version>1.11-SNAPSHOT</flink.version> + <flink.version>${flink1.11.version}</flink.version> <scala.binary.version>2.11</scala.binary.version> <scala.version>2.11.12</scala.version> </properties> diff --git a/flink/interpreter/pom.xml b/flink/interpreter/pom.xml index bb991a3..bc6704a 100644 --- a/flink/interpreter/pom.xml +++ b/flink/interpreter/pom.xml @@ -37,8 +37,7 @@ <properties> <!--library versions--> <interpreter.name>flink</interpreter.name> -<!-- <flink.version>1.11-SNAPSHOT</flink.version>--> - <flink.version>1.10.0</flink.version> + <flink.version>${flink1.10.version}</flink.version> <flink.hadoop.version>2.6.5</flink.hadoop.version> <hive.version>2.3.4</hive.version> <hiverunner.version>4.0.0</hiverunner.version> @@ -876,14 +875,14 @@ <profile> <id>flink-1.10</id> <properties> - <flink.version>1.10.0</flink.version> + <flink.version>${flink1.10.version}</flink.version> </properties> </profile> <profile> <id>flink-1.11</id> <properties> - 
<flink.version>1.11-SNAPSHOT</flink.version> + <flink.version>${flink1.11.version}</flink.version> </properties> </profile> diff --git a/flink/interpreter/src/main/scala/org/apache/zeppelin/flink/FlinkScalaInterpreter.scala b/flink/interpreter/src/main/scala/org/apache/zeppelin/flink/FlinkScalaInterpreter.scala index aaedc82..3d12597 100644 --- a/flink/interpreter/src/main/scala/org/apache/zeppelin/flink/FlinkScalaInterpreter.scala +++ b/flink/interpreter/src/main/scala/org/apache/zeppelin/flink/FlinkScalaInterpreter.scala @@ -47,6 +47,7 @@ import org.apache.flink.table.functions.{AggregateFunction, ScalarFunction, Tabl import org.apache.flink.table.module.ModuleManager import org.apache.flink.table.module.hive.HiveModule import org.apache.flink.yarn.cli.FlinkYarnSessionCli +import org.apache.flink.yarn.executors.YarnSessionClusterExecutor import org.apache.zeppelin.flink.util.DependencyUtils import org.apache.zeppelin.interpreter.thrift.InterpreterCompletion import org.apache.zeppelin.interpreter.util.InterpreterOutputStream @@ -223,6 +224,10 @@ class FlinkScalaInterpreter(val properties: Properties) { .copy(port = Some(Integer.parseInt(port))) } + if (config.executionMode == ExecutionMode.YARN) { + // workaround for FLINK-17788, otherwise it won't work with flink 1.10.1 which has been released. + configuration.set(DeploymentOptions.TARGET, YarnSessionClusterExecutor.NAME) + } config } diff --git a/flink/pom.xml b/flink/pom.xml index e197e22..d15e748 100644 --- a/flink/pom.xml +++ b/flink/pom.xml @@ -41,6 +41,11 @@ <module>flink1.11-shims</module> </modules> + <properties> + <flink1.10.version>1.10.1</flink1.10.version> + <flink1.11.version>1.11-SNAPSHOT</flink1.11.version> + </properties> + <dependencies> <dependency>