This is an automated email from the ASF dual-hosted git repository. zjffdu pushed a commit to branch branch-0.9 in repository https://gitbox.apache.org/repos/asf/zeppelin.git
The following commit(s) were added to refs/heads/branch-0.9 by this push: new deddf75 [ZEPPELIN-4965]. Support flink 1.11.1 deddf75 is described below commit deddf75edcaf1c58b96726939bd1d085c0e803cd Author: Jeff Zhang <zjf...@apache.org> AuthorDate: Wed Jul 22 20:28:28 2020 +0800 [ZEPPELIN-4965]. Support flink 1.11.1 ### What is this PR for? This PR is to support flink 1.11.1. * Update flink 1.11.0 to 1.11.1 * Flink 1.11.1 introduce new api for TableEnvironment, so this pr also update `TableEnvFactory`. * Split `FlinkIntegrationTest` to `FlinkIntegrationTest110` and `FlinkIntegrationTest111` ### What type of PR is it? [Improvement ] ### Todos * [ ] - Task ### What is the Jira issue? * https://issues.apache.org/jira/browse/ZEPPELIN-4965 ### How should this be tested? * CI pass ### Screenshots (if appropriate) ### Questions: * Does the licenses files need update? No * Is there breaking changes for older versions? No * Does this needs documentation? No Author: Jeff Zhang <zjf...@apache.org> Closes #3863 from zjffdu/ZEPPELIN-4965 and squashes the following commits: 64a625b06 [Jeff Zhang] [ZEPPELIN-4965]. Support flink 1.11.1 (cherry picked from commit b523fb9e9eb1c45184a2e1b45fb3ba18e2808c52) Signed-off-by: Jeff Zhang <zjf...@apache.org> --- .travis.yml | 6 +- .../org/apache/zeppelin/flink/TableEnvFactory.java | 310 ++++++++++++++------- .../zeppelin/flink/FlinkScalaInterpreter.scala | 10 +- flink/pom.xml | 2 +- .../zeppelin/integration/FlinkIntegrationTest.java | 11 +- .../integration/FlinkIntegrationTest110.java | 40 +++ .../integration/FlinkIntegrationTest111.java | 40 +++ 7 files changed, 305 insertions(+), 114 deletions(-) diff --git a/.travis.yml b/.travis.yml index a458a70..23c1179 100644 --- a/.travis.yml +++ b/.travis.yml @@ -95,15 +95,15 @@ jobs: dist: xenial env: PYTHON="3" R="true" SCALA_VER="2.10" TENSORFLOW="1.13.1" PROFILE="-Pscala-2.10" BUILD_FLAG="install -DskipTests -DskipRat -am" TEST_FLAG="test -DskipRat" MODULES="-pl $(echo .,zeppelin-interpreter,zeppelin-interpreter-shaded,${INTERPRETERS} | sed 's/!//g')" TEST_PROJECTS="" - # Test flink 1.10 + # Test flink 1.10 & flink integration test - jdk: "openjdk8" dist: xenial - env: PYTHON="3" FLINK="1.10.1" PROFILE="-Pflink-1.10" BUILD_FLAG="install -DskipTests -DskipRat -am" TEST_FLAG="test -DskipRat" MODULES="-pl flink/interpreter" TEST_PROJECTS="-Dtest=org.apache.zeppelin.flink.*" + env: BUILD_PLUGINS="true" PYTHON="3" FLINK="1.10.1" PROFILE="-Pflink-1.10 -Pintegration" BUILD_FLAG="install -DskipTests -DskipRat -am" TEST_FLAG="test -DskipRat" MODULES="-pl flink/interpreter,zeppelin-interpreter-integration" TEST_PROJECTS="-Dtest=org.apache.zeppelin.flink.*,FlinkIntegrationTest110" # Test flink 1.11 & flink integration test - jdk: "openjdk8" dist: xenial - env: BUILD_PLUGINS="true" PYTHON="3" FLINK="1.11.0" PROFILE="-Pflink-1.11 -Pintegration" BUILD_FLAG="install -DskipTests -DskipRat -am" TEST_FLAG="test -DskipRat" MODULES="-pl flink/interpreter,zeppelin-interpreter-integration" TEST_PROJECTS="-Dtest=org.apache.zeppelin.flink.*,FlinkIntegrationTest" + env: BUILD_PLUGINS="true" PYTHON="3" FLINK="1.11.1" PROFILE="-Pflink-1.11 -Pintegration" BUILD_FLAG="install -DskipTests -DskipRat -am" TEST_FLAG="test -DskipRat" MODULES="-pl flink/interpreter,zeppelin-interpreter-integration" TEST_PROJECTS="-Dtest=org.apache.zeppelin.flink.*,FlinkIntegrationTest111" # Run Spark integration test and unit test diff --git a/flink/interpreter/src/main/java/org/apache/zeppelin/flink/TableEnvFactory.java b/flink/interpreter/src/main/java/org/apache/zeppelin/flink/TableEnvFactory.java index b514f49..9fb6efd 100644 --- a/flink/interpreter/src/main/java/org/apache/zeppelin/flink/TableEnvFactory.java +++ b/flink/interpreter/src/main/java/org/apache/zeppelin/flink/TableEnvFactory.java @@ -105,7 +105,7 @@ public class TableEnvFactory { } } - public TableEnvironment createScalaFlinkStreamTableEnvironment(EnvironmentSettings settings) { + public TableEnvironment createScalaFlinkStreamTableEnvironment(EnvironmentSettings settings, ClassLoader classLoader) { try { Map<String, String> executorProperties = settings.toExecutorProperties(); Executor executor = lookupExecutor(executorProperties, senv.getJavaEnv()); @@ -127,24 +127,48 @@ public class TableEnvFactory { clazz = Class .forName("org.apache.flink.table.api.bridge.scala.internal.StreamTableEnvironmentImpl"); } - Constructor constructor = clazz - .getConstructor( - CatalogManager.class, - ModuleManager.class, - FunctionCatalog.class, - TableConfig.class, - org.apache.flink.streaming.api.scala.StreamExecutionEnvironment.class, - Planner.class, - Executor.class, - boolean.class); - return (TableEnvironment) constructor.newInstance(catalogManager, - moduleManager, - flinkFunctionCatalog, - tblConfig, - senv, - planner, - executor, - settings.isStreamingMode()); + try { + Constructor constructor = clazz + .getConstructor( + CatalogManager.class, + ModuleManager.class, + FunctionCatalog.class, + TableConfig.class, + org.apache.flink.streaming.api.scala.StreamExecutionEnvironment.class, + Planner.class, + Executor.class, + boolean.class); + return (TableEnvironment) constructor.newInstance(catalogManager, + moduleManager, + flinkFunctionCatalog, + tblConfig, + senv, + planner, + executor, + settings.isStreamingMode()); + } catch (NoSuchMethodException e) { + // Flink 1.11.1 change the constructor signature, FLINK-18419 + Constructor constructor = clazz + .getConstructor( + CatalogManager.class, + ModuleManager.class, + FunctionCatalog.class, + TableConfig.class, + org.apache.flink.streaming.api.scala.StreamExecutionEnvironment.class, + Planner.class, + Executor.class, + boolean.class, + ClassLoader.class); + return (TableEnvironment) constructor.newInstance(catalogManager, + moduleManager, + flinkFunctionCatalog, + tblConfig, + senv, + planner, + executor, + settings.isStreamingMode(), + classLoader); + } } catch (Exception e) { throw new TableException("Fail to createScalaFlinkStreamTableEnvironment", e); @@ -177,7 +201,7 @@ public class TableEnvFactory { } } - public TableEnvironment createJavaFlinkStreamTableEnvironment(EnvironmentSettings settings) { + public TableEnvironment createJavaFlinkStreamTableEnvironment(EnvironmentSettings settings, ClassLoader classLoader) { try { Map<String, String> executorProperties = settings.toExecutorProperties(); @@ -195,31 +219,55 @@ public class TableEnvFactory { clazz = Class .forName("org.apache.flink.table.api.bridge.java.internal.StreamTableEnvironmentImpl"); } - Constructor constructor = clazz - .getConstructor( - CatalogManager.class, - ModuleManager.class, - FunctionCatalog.class, - TableConfig.class, - org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.class, - Planner.class, - Executor.class, - boolean.class); - return (TableEnvironment) constructor.newInstance(catalogManager, - moduleManager, - flinkFunctionCatalog, - tblConfig, - senv.getJavaEnv(), - planner, - executor, - settings.isStreamingMode()); + try { + Constructor constructor = clazz + .getConstructor( + CatalogManager.class, + ModuleManager.class, + FunctionCatalog.class, + TableConfig.class, + org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.class, + Planner.class, + Executor.class, + boolean.class); + return (TableEnvironment) constructor.newInstance(catalogManager, + moduleManager, + flinkFunctionCatalog, + tblConfig, + senv.getJavaEnv(), + planner, + executor, + settings.isStreamingMode()); + } catch (NoSuchMethodException e) { + // Flink 1.11.1 change the constructor signature, FLINK-18419 + Constructor constructor = clazz + .getConstructor( + CatalogManager.class, + ModuleManager.class, + FunctionCatalog.class, + TableConfig.class, + org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.class, + Planner.class, + Executor.class, + boolean.class, + ClassLoader.class); + return (TableEnvironment) constructor.newInstance(catalogManager, + moduleManager, + flinkFunctionCatalog, + tblConfig, + senv.getJavaEnv(), + planner, + executor, + settings.isStreamingMode(), + classLoader); + } } catch (Exception e) { throw new TableException("Fail to createJavaFlinkStreamTableEnvironment", e); } } - public TableEnvironment createScalaBlinkStreamTableEnvironment(EnvironmentSettings settings) { + public TableEnvironment createScalaBlinkStreamTableEnvironment(EnvironmentSettings settings, ClassLoader classLoader) { try { Map<String, String> executorProperties = settings.toExecutorProperties(); @@ -243,30 +291,54 @@ public class TableEnvFactory { clazz = Class .forName("org.apache.flink.table.api.bridge.scala.internal.StreamTableEnvironmentImpl"); } - Constructor constructor = clazz - .getConstructor( - CatalogManager.class, - ModuleManager.class, - FunctionCatalog.class, - TableConfig.class, - org.apache.flink.streaming.api.scala.StreamExecutionEnvironment.class, - Planner.class, - Executor.class, - boolean.class); - return (TableEnvironment) constructor.newInstance(catalogManager, - moduleManager, - blinkFunctionCatalog, - tblConfig, - senv, - planner, - executor, - settings.isStreamingMode()); + try { + Constructor constructor = clazz + .getConstructor( + CatalogManager.class, + ModuleManager.class, + FunctionCatalog.class, + TableConfig.class, + org.apache.flink.streaming.api.scala.StreamExecutionEnvironment.class, + Planner.class, + Executor.class, + boolean.class); + return (TableEnvironment) constructor.newInstance(catalogManager, + moduleManager, + blinkFunctionCatalog, + tblConfig, + senv, + planner, + executor, + settings.isStreamingMode()); + } catch (NoSuchMethodException e) { + // Flink 1.11.1 change the constructor signature, FLINK-18419 + Constructor constructor = clazz + .getConstructor( + CatalogManager.class, + ModuleManager.class, + FunctionCatalog.class, + TableConfig.class, + org.apache.flink.streaming.api.scala.StreamExecutionEnvironment.class, + Planner.class, + Executor.class, + boolean.class, + ClassLoader.class); + return (TableEnvironment) constructor.newInstance(catalogManager, + moduleManager, + blinkFunctionCatalog, + tblConfig, + senv, + planner, + executor, + settings.isStreamingMode(), + classLoader); + } } catch (Exception e) { throw new TableException("Fail to createScalaBlinkStreamTableEnvironment", e); } } - public TableEnvironment createJavaBlinkStreamTableEnvironment(EnvironmentSettings settings) { + public TableEnvironment createJavaBlinkStreamTableEnvironment(EnvironmentSettings settings, ClassLoader classLoader) { try { Map<String, String> executorProperties = settings.toExecutorProperties(); @@ -284,31 +356,55 @@ public class TableEnvFactory { clazz = Class .forName("org.apache.flink.table.api.bridge.java.internal.StreamTableEnvironmentImpl"); } - Constructor constructor = clazz - .getConstructor( - CatalogManager.class, - ModuleManager.class, - FunctionCatalog.class, - TableConfig.class, - org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.class, - Planner.class, - Executor.class, - boolean.class); - return (TableEnvironment) constructor.newInstance(catalogManager, - moduleManager, - blinkFunctionCatalog, - tblConfig, - senv.getJavaEnv(), - planner, - executor, - settings.isStreamingMode()); + try { + Constructor constructor = clazz + .getConstructor( + CatalogManager.class, + ModuleManager.class, + FunctionCatalog.class, + TableConfig.class, + org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.class, + Planner.class, + Executor.class, + boolean.class); + return (TableEnvironment) constructor.newInstance(catalogManager, + moduleManager, + blinkFunctionCatalog, + tblConfig, + senv.getJavaEnv(), + planner, + executor, + settings.isStreamingMode()); + } catch (NoSuchMethodException e) { + // Flink 1.11.1 change the constructor signature, FLINK-18419 + Constructor constructor = clazz + .getConstructor( + CatalogManager.class, + ModuleManager.class, + FunctionCatalog.class, + TableConfig.class, + org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.class, + Planner.class, + Executor.class, + boolean.class, + ClassLoader.class); + return (TableEnvironment) constructor.newInstance(catalogManager, + moduleManager, + blinkFunctionCatalog, + tblConfig, + senv.getJavaEnv(), + planner, + executor, + settings.isStreamingMode(), + classLoader); + } } catch (Exception e) { throw new TableException("Fail to createJavaBlinkStreamTableEnvironment", e); } } public TableEnvironment createJavaBlinkBatchTableEnvironment( - EnvironmentSettings settings) { + EnvironmentSettings settings, ClassLoader classLoader) { try { final Map<String, String> executorProperties = settings.toExecutorProperties(); executor = lookupExecutor(executorProperties, senv.getJavaEnv()); @@ -324,24 +420,48 @@ public class TableEnvFactory { clazz = Class .forName("org.apache.flink.table.api.bridge.java.internal.StreamTableEnvironmentImpl"); } - Constructor constructor = clazz.getConstructor( - CatalogManager.class, - ModuleManager.class, - FunctionCatalog.class, - TableConfig.class, - StreamExecutionEnvironment.class, - Planner.class, - Executor.class, - boolean.class); - return (TableEnvironment) constructor.newInstance( - catalogManager, - moduleManager, - blinkFunctionCatalog, - tblConfig, - senv.getJavaEnv(), - planner, - executor, - settings.isStreamingMode()); + try { + Constructor constructor = clazz.getConstructor( + CatalogManager.class, + ModuleManager.class, + FunctionCatalog.class, + TableConfig.class, + StreamExecutionEnvironment.class, + Planner.class, + Executor.class, + boolean.class); + return (TableEnvironment) constructor.newInstance( + catalogManager, + moduleManager, + blinkFunctionCatalog, + tblConfig, + senv.getJavaEnv(), + planner, + executor, + settings.isStreamingMode()); + } catch (NoSuchMethodException e) { + // Flink 1.11.1 change the constructor signature, FLINK-18419 + Constructor constructor = clazz.getConstructor( + CatalogManager.class, + ModuleManager.class, + FunctionCatalog.class, + TableConfig.class, + StreamExecutionEnvironment.class, + Planner.class, + Executor.class, + boolean.class, + ClassLoader.class); + return (TableEnvironment) constructor.newInstance( + catalogManager, + moduleManager, + blinkFunctionCatalog, + tblConfig, + senv.getJavaEnv(), + planner, + executor, + settings.isStreamingMode(), + classLoader); + } } catch (Exception e) { LOGGER.info(ExceptionUtils.getStackTrace(e)); throw new TableException("Fail to createJavaBlinkBatchTableEnvironment", e); diff --git a/flink/interpreter/src/main/scala/org/apache/zeppelin/flink/FlinkScalaInterpreter.scala b/flink/interpreter/src/main/scala/org/apache/zeppelin/flink/FlinkScalaInterpreter.scala index 30ec177..4fc4a18 100644 --- a/flink/interpreter/src/main/scala/org/apache/zeppelin/flink/FlinkScalaInterpreter.scala +++ b/flink/interpreter/src/main/scala/org/apache/zeppelin/flink/FlinkScalaInterpreter.scala @@ -395,27 +395,27 @@ class FlinkScalaInterpreter(val properties: Properties) { // blink planner var btEnvSetting = EnvironmentSettings.newInstance().inBatchMode().useBlinkPlanner().build() - this.btenv = tblEnvFactory.createJavaBlinkBatchTableEnvironment(btEnvSetting); + this.btenv = tblEnvFactory.createJavaBlinkBatchTableEnvironment(btEnvSetting, getFlinkClassLoader); flinkILoop.bind("btenv", btenv.getClass().getCanonicalName(), btenv, List("@transient")) this.java_btenv = this.btenv var stEnvSetting = EnvironmentSettings.newInstance().inStreamingMode().useBlinkPlanner().build() - this.stenv = tblEnvFactory.createScalaBlinkStreamTableEnvironment(stEnvSetting) + this.stenv = tblEnvFactory.createScalaBlinkStreamTableEnvironment(stEnvSetting, getFlinkClassLoader) flinkILoop.bind("stenv", stenv.getClass().getCanonicalName(), stenv, List("@transient")) - this.java_stenv = tblEnvFactory.createJavaBlinkStreamTableEnvironment(stEnvSetting) + this.java_stenv = tblEnvFactory.createJavaBlinkStreamTableEnvironment(stEnvSetting, getFlinkClassLoader) // flink planner this.btenv_2 = tblEnvFactory.createScalaFlinkBatchTableEnvironment() flinkILoop.bind("btenv_2", btenv_2.getClass().getCanonicalName(), btenv_2, List("@transient")) stEnvSetting = EnvironmentSettings.newInstance().inStreamingMode().useOldPlanner().build() - this.stenv_2 = tblEnvFactory.createScalaFlinkStreamTableEnvironment(stEnvSetting) + this.stenv_2 = tblEnvFactory.createScalaFlinkStreamTableEnvironment(stEnvSetting, getFlinkClassLoader) flinkILoop.bind("stenv_2", stenv_2.getClass().getCanonicalName(), stenv_2, List("@transient")) this.java_btenv_2 = tblEnvFactory.createJavaFlinkBatchTableEnvironment() btEnvSetting = EnvironmentSettings.newInstance.useOldPlanner.inStreamingMode.build - this.java_stenv_2 = tblEnvFactory.createJavaFlinkStreamTableEnvironment(btEnvSetting) + this.java_stenv_2 = tblEnvFactory.createJavaFlinkStreamTableEnvironment(btEnvSetting, getFlinkClassLoader) } finally { Thread.currentThread().setContextClassLoader(originalClassLoader) } diff --git a/flink/pom.xml b/flink/pom.xml index d2bf16a..5e0ec61 100644 --- a/flink/pom.xml +++ b/flink/pom.xml @@ -43,7 +43,7 @@ <properties> <flink1.10.version>1.10.1</flink1.10.version> - <flink1.11.version>1.11.0</flink1.11.version> + <flink1.11.version>1.11.1</flink1.11.version> </properties> <dependencies> diff --git a/zeppelin-interpreter-integration/src/test/java/org/apache/zeppelin/integration/FlinkIntegrationTest.java b/zeppelin-interpreter-integration/src/test/java/org/apache/zeppelin/integration/FlinkIntegrationTest.java index d873571..6040df1 100644 --- a/zeppelin-interpreter-integration/src/test/java/org/apache/zeppelin/integration/FlinkIntegrationTest.java +++ b/zeppelin-interpreter-integration/src/test/java/org/apache/zeppelin/integration/FlinkIntegrationTest.java @@ -48,8 +48,7 @@ import java.util.List; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertTrue; -@RunWith(value = Parameterized.class) -public class FlinkIntegrationTest { +public abstract class FlinkIntegrationTest { private static Logger LOGGER = LoggerFactory.getLogger(FlinkIntegrationTest.class); private static MiniHadoopCluster hadoopCluster; @@ -68,14 +67,6 @@ public class FlinkIntegrationTest { this.hadoopHome = DownloadUtils.downloadHadoop("2.7.7"); } - @Parameterized.Parameters - public static List<Object[]> data() { - return Arrays.asList(new Object[][]{ - {"1.10.1"}, - {"1.11.0"} - }); - } - @BeforeClass public static void setUp() throws IOException { Configuration conf = new Configuration(); diff --git a/zeppelin-interpreter-integration/src/test/java/org/apache/zeppelin/integration/FlinkIntegrationTest110.java b/zeppelin-interpreter-integration/src/test/java/org/apache/zeppelin/integration/FlinkIntegrationTest110.java new file mode 100644 index 0000000..ca7e399 --- /dev/null +++ b/zeppelin-interpreter-integration/src/test/java/org/apache/zeppelin/integration/FlinkIntegrationTest110.java @@ -0,0 +1,40 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.zeppelin.integration; + +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; + +import java.util.Arrays; +import java.util.List; + +@RunWith(value = Parameterized.class) +public class FlinkIntegrationTest110 extends FlinkIntegrationTest { + + @Parameterized.Parameters + public static List<Object[]> data() { + return Arrays.asList(new Object[][]{ + {"1.10.0"}, + {"1.10.1"} + }); + } + + public FlinkIntegrationTest110(String flinkVersion) { + super(flinkVersion); + } +} diff --git a/zeppelin-interpreter-integration/src/test/java/org/apache/zeppelin/integration/FlinkIntegrationTest111.java b/zeppelin-interpreter-integration/src/test/java/org/apache/zeppelin/integration/FlinkIntegrationTest111.java new file mode 100644 index 0000000..b495844 --- /dev/null +++ b/zeppelin-interpreter-integration/src/test/java/org/apache/zeppelin/integration/FlinkIntegrationTest111.java @@ -0,0 +1,40 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.zeppelin.integration; + +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; + +import java.util.Arrays; +import java.util.List; + +@RunWith(value = Parameterized.class) +public class FlinkIntegrationTest111 extends FlinkIntegrationTest { + + @Parameterized.Parameters + public static List<Object[]> data() { + return Arrays.asList(new Object[][]{ + {"1.11.0"}, + {"1.11.1"} + }); + } + + public FlinkIntegrationTest111(String flinkVersion) { + super(flinkVersion); + } +}