This is an automated email from the ASF dual-hosted git repository. zjffdu pushed a commit to branch branch-0.9 in repository https://gitbox.apache.org/repos/asf/zeppelin.git
The following commit(s) were added to refs/heads/branch-0.9 by this push: new 35e9d2b [ZEPPELIN-5156]. Support flink 1.12 35e9d2b is described below commit 35e9d2b461036de8b5f19a77edd56d2df168e260 Author: Jeff Zhang <zjf...@apache.org> AuthorDate: Wed Dec 9 14:00:14 2020 +0800 [ZEPPELIN-5156]. Support flink 1.12 ### What is this PR for? Flink 1.12 is almost the same as flink 1.11 from an API perspective; the only change is that ExecutionConfig#disableSysoutLogging is removed. So in this PR I fix this issue by not calling this method if the flink version is newer than or equal to 1.12. ### What type of PR is it? [ Improvement] ### Todos * [ ] - Task ### What is the Jira issue? * https://issues.apache.org/jira/browse/ZEPPELIN-5156 ### How should this be tested? * Manually tested on flink 1.12 ### Screenshots (if appropriate) ### Questions: * Does the licenses files need update? No * Is there breaking changes for older versions? No * Does this needs documentation? No Author: Jeff Zhang <zjf...@apache.org> Closes #3990 from zjffdu/ZEPPELIN-5156 and squashes the following commits: f5f64cd4d [Jeff Zhang] [ZEPPELIN-5156]. 
Support flink 1.12 (cherry picked from commit 89f4ee77464030bf0aae6f07af6aa4ba0250041e) Signed-off-by: Jeff Zhang <zjf...@apache.org> --- .github/workflows/core.yml | 9 +- .../java/org/apache/zeppelin/flink/FlinkShims.java | 11 +- .../org/apache/zeppelin/flink/FlinkVersion.java | 7 + .../org/apache/zeppelin/flink/Flink110Shims.java | 34 +++- .../org/apache/zeppelin/flink/Flink111Shims.java | 30 ++- flink/flink1.12-shims/pom.xml | 221 +++++++++++++++++++++ .../org/apache/zeppelin/flink/Flink112Shims.java} | 46 ++++- .../flink/shims112/CollectStreamTableSink.java | 97 +++++++++ .../flink/shims112/Flink112ScalaShims.scala | 36 ++++ flink/interpreter/pom.xml | 21 +- .../zeppelin/flink/FlinkScalaInterpreter.scala | 7 +- .../org/apache/zeppelin/flink/FlinkShell.scala | 26 +-- .../flink/FlinkStreamSqlInterpreterTest.java | 9 +- flink/pom.xml | 2 + ...nk_1_10.yml => env_python_3_with_flink_110.yml} | 0 ...nk_1_11.yml => env_python_3_with_flink_111.yml} | 0 ...nk_1_11.yml => env_python_3_with_flink_112.yml} | 9 +- .../integration/FlinkIntegrationTest112.java | 39 ++++ .../integration/ZeppelinFlinkClusterTest112.java | 38 ++++ 19 files changed, 595 insertions(+), 47 deletions(-) diff --git a/.github/workflows/core.yml b/.github/workflows/core.yml index 4faf67b..9e680de 100644 --- a/.github/workflows/core.yml +++ b/.github/workflows/core.yml @@ -126,8 +126,9 @@ jobs: test-flink-and-flink-integration-test: runs-on: ubuntu-18.04 strategy: + fail-fast: false matrix: - flink: [ flink_1_10, flink_1_11] + flink: [ 110, 111, 112] steps: - name: Checkout uses: actions/checkout@v2 @@ -148,15 +149,15 @@ jobs: uses: conda-incubator/setup-miniconda@v2 with: activate-environment: python_3_with_flink - environment-file: testing/env_python_3 with_${{ matrix.flink }}.yml + environment-file: testing/env_python_3_with_flink_${{ matrix.flink }}.yml python-version: 3.7 auto-activate-base: false - name: install environment run: | - mvn install -DskipTests -DskipRat -am -pl 
flink/interpreter,zeppelin-interpreter-integration -Pflink-1.10 -Pintegration -B + mvn install -DskipTests -DskipRat -am -pl flink/interpreter,zeppelin-interpreter-integration -Pflink-${{ matrix.flink }} -Pintegration -B mvn clean package -T 2C -pl zeppelin-plugins -amd -DskipTests -B - name: run tests - run: mvn test -DskipRat -pl flink/interpreter,zeppelin-interpreter-integration -Pflink-1.10 -Pintegration -B -Dtest=org.apache.zeppelin.flink.*,FlinkIntegrationTest110,ZeppelinFlinkClusterTest110 + run: mvn test -DskipRat -pl flink/interpreter,zeppelin-interpreter-integration -Pflink-${{ matrix.flink }} -Pintegration -B -Dtest=org.apache.zeppelin.flink.*,FlinkIntegrationTest${{ matrix.flink }},ZeppelinFlinkClusterTest${{ matrix.flink }} run-spark-intergration-test: runs-on: ubuntu-18.04 steps: diff --git a/flink/flink-shims/src/main/java/org/apache/zeppelin/flink/FlinkShims.java b/flink/flink-shims/src/main/java/org/apache/zeppelin/flink/FlinkShims.java index f7a7514..717ef3d 100644 --- a/flink/flink-shims/src/main/java/org/apache/zeppelin/flink/FlinkShims.java +++ b/flink/flink-shims/src/main/java/org/apache/zeppelin/flink/FlinkShims.java @@ -56,9 +56,12 @@ public abstract class FlinkShims { if (flinkVersion.getMajorVersion() == 1 && flinkVersion.getMinorVersion() == 10) { LOGGER.info("Initializing shims for Flink 1.10"); flinkShimsClass = Class.forName("org.apache.zeppelin.flink.Flink110Shims"); - } else if (flinkVersion.getMajorVersion() == 1 && flinkVersion.getMinorVersion() >= 11) { + } else if (flinkVersion.getMajorVersion() == 1 && flinkVersion.getMinorVersion() == 11) { LOGGER.info("Initializing shims for Flink 1.11"); flinkShimsClass = Class.forName("org.apache.zeppelin.flink.Flink111Shims"); + } else if (flinkVersion.getMajorVersion() == 1 && flinkVersion.getMinorVersion() == 12) { + LOGGER.info("Initializing shims for Flink 1.12"); + flinkShimsClass = Class.forName("org.apache.zeppelin.flink.Flink112Shims"); } else { throw new Exception("Flink version: 
'" + flinkVersion + "' is not supported yet"); } @@ -92,6 +95,10 @@ public abstract class FlinkShims { .toAttributedString(); } + public abstract void disableSysoutLogging(Object batchConfig, Object streamConfig); + + public abstract Object createStreamExecutionEnvironmentFactory(Object streamExecutionEnvironment); + public abstract Object createCatalogManager(Object config); public abstract String getPyFlinkPythonPath(Properties properties) throws IOException; @@ -134,7 +141,7 @@ public abstract class FlinkShims { Object parser, Object environmentSetting); - public abstract Object getCustomCli(Object cliFrontend, Object commandLine); + public abstract Object updateEffectiveConfig(Object cliFrontend, Object commandLine, Object executorConfig); public abstract Map extractTableConfigOptions(); } diff --git a/flink/flink-shims/src/main/java/org/apache/zeppelin/flink/FlinkVersion.java b/flink/flink-shims/src/main/java/org/apache/zeppelin/flink/FlinkVersion.java index d593746..dcf32d0 100644 --- a/flink/flink-shims/src/main/java/org/apache/zeppelin/flink/FlinkVersion.java +++ b/flink/flink-shims/src/main/java/org/apache/zeppelin/flink/FlinkVersion.java @@ -26,6 +26,7 @@ public class FlinkVersion { private int majorVersion; private int minorVersion; private int patchVersion; + private int version; private String versionString; FlinkVersion(String versionString) { @@ -46,6 +47,8 @@ public class FlinkVersion { this.patchVersion = Integer.parseInt(versions[2]); } + this.version = Integer.parseInt(String.format("%d%02d%02d", + majorVersion, minorVersion, patchVersion)); } catch (Exception e) { logger.error("Can not recognize Flink version " + versionString + ". 
Assume it's a future release", e); @@ -56,6 +59,10 @@ public class FlinkVersion { return majorVersion; } + public boolean olderThan(FlinkVersion versionToCompare) { + return this.version < versionToCompare.version; + } + public int getMinorVersion() { return minorVersion; } diff --git a/flink/flink1.10-shims/src/main/java/org/apache/zeppelin/flink/Flink110Shims.java b/flink/flink1.10-shims/src/main/java/org/apache/zeppelin/flink/Flink110Shims.java index e917b3c..ee551ea 100644 --- a/flink/flink1.10-shims/src/main/java/org/apache/zeppelin/flink/Flink110Shims.java +++ b/flink/flink1.10-shims/src/main/java/org/apache/zeppelin/flink/Flink110Shims.java @@ -19,13 +19,18 @@ package org.apache.zeppelin.flink; import org.apache.commons.cli.CommandLine; +import org.apache.flink.api.common.ExecutionConfig; import org.apache.flink.api.common.typeutils.TypeSerializer; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.api.scala.DataSet; import org.apache.flink.client.cli.CliFrontend; +import org.apache.flink.client.cli.CustomCommandLine; import org.apache.flink.configuration.ConfigOption; +import org.apache.flink.configuration.Configuration; import org.apache.flink.python.PythonOptions; import org.apache.flink.python.util.ResourceUtil; +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironmentFactory; import org.apache.flink.table.api.Table; import org.apache.flink.table.api.TableEnvironment; import org.apache.flink.table.api.TableUtils; @@ -40,6 +45,7 @@ import org.apache.flink.table.functions.TableAggregateFunction; import org.apache.flink.table.functions.TableFunction; import org.apache.flink.table.sinks.TableSink; import org.apache.flink.types.Row; +import org.apache.flink.util.FlinkException; import org.apache.zeppelin.flink.shims110.CollectStreamTableSink; import org.apache.zeppelin.flink.shims110.Flink110ScalaShims; import 
org.apache.zeppelin.flink.sql.SqlCommandParser; @@ -100,6 +106,22 @@ public class Flink110Shims extends FlinkShims { } @Override + public void disableSysoutLogging(Object batchConfig, Object streamConfig) { + ((ExecutionConfig) batchConfig).disableSysoutLogging(); + ((ExecutionConfig) streamConfig).disableSysoutLogging(); + } + + @Override + public Object createStreamExecutionEnvironmentFactory(Object streamExecutionEnvironment) { + return new StreamExecutionEnvironmentFactory() { + @Override + public StreamExecutionEnvironment createExecutionEnvironment() { + return (StreamExecutionEnvironment) streamExecutionEnvironment; + } + }; + } + + @Override public Object createCatalogManager(Object config) { return new CatalogManager("default_catalog", new GenericInMemoryCatalog("default_catalog", "default_database")); @@ -242,18 +264,24 @@ public class Flink110Shims extends FlinkShims { } @Override - public Object getCustomCli(Object cliFrontend, Object commandLine) { + public Object updateEffectiveConfig(Object cliFrontend, Object commandLine, Object effectiveConfig) { + CustomCommandLine customCommandLine = null; try { - return ((CliFrontend) cliFrontend).validateAndGetActiveCommandLine((CommandLine) commandLine); + customCommandLine = ((CliFrontend) cliFrontend).validateAndGetActiveCommandLine((CommandLine) commandLine); } catch (NoSuchMethodError e) { try { Method method = CliFrontend.class.getMethod("getActiveCustomCommandLine", CommandLine.class); - return method.invoke((CliFrontend) cliFrontend, commandLine); + customCommandLine = (CustomCommandLine) method.invoke((CliFrontend) cliFrontend, commandLine); } catch (NoSuchMethodException | IllegalAccessException | InvocationTargetException ex) { LOGGER.error("Fail to call getCustomCli", ex); throw new RuntimeException("Fail to call getCustomCli", ex); } } + try { + return customCommandLine.applyCommandLineOptionsToConfiguration((CommandLine) commandLine); + } catch (FlinkException e) { + throw new 
RuntimeException("Fail to call applyCommandLineOptionsToConfiguration", e); + } } @Override diff --git a/flink/flink1.11-shims/src/main/java/org/apache/zeppelin/flink/Flink111Shims.java b/flink/flink1.11-shims/src/main/java/org/apache/zeppelin/flink/Flink111Shims.java index 8fad5d4..9c26346 100644 --- a/flink/flink1.11-shims/src/main/java/org/apache/zeppelin/flink/Flink111Shims.java +++ b/flink/flink1.11-shims/src/main/java/org/apache/zeppelin/flink/Flink111Shims.java @@ -20,15 +20,19 @@ package org.apache.zeppelin.flink; import org.apache.commons.cli.CommandLine; import org.apache.commons.compress.utils.Lists; +import org.apache.flink.api.common.ExecutionConfig; import org.apache.flink.api.common.JobStatus; import org.apache.flink.api.common.typeutils.TypeSerializer; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.api.scala.DataSet; import org.apache.flink.client.cli.CliFrontend; +import org.apache.flink.client.cli.CustomCommandLine; import org.apache.flink.configuration.ConfigOption; import org.apache.flink.configuration.ReadableConfig; import org.apache.flink.core.execution.JobClient; import org.apache.flink.python.PythonOptions; +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironmentFactory; import org.apache.flink.table.api.EnvironmentSettings; import org.apache.flink.table.api.StatementSet; import org.apache.flink.table.api.Table; @@ -76,6 +80,7 @@ import org.apache.flink.table.operations.ddl.DropViewOperation; import org.apache.flink.table.sinks.TableSink; import org.apache.flink.types.Row; import org.apache.flink.types.RowKind; +import org.apache.flink.util.FlinkException; import org.apache.zeppelin.flink.shims111.CollectStreamTableSink; import org.apache.zeppelin.flink.shims111.Flink111ScalaShims; import org.apache.zeppelin.flink.sql.SqlCommandParser; @@ -139,6 +144,22 @@ public class Flink111Shims extends FlinkShims { } @Override + 
public void disableSysoutLogging(Object batchConfig, Object streamConfig) { + ((ExecutionConfig) batchConfig).disableSysoutLogging(); + ((ExecutionConfig) streamConfig).disableSysoutLogging(); + } + + @Override + public Object createStreamExecutionEnvironmentFactory(Object streamExecutionEnvironment) { + return new StreamExecutionEnvironmentFactory() { + @Override + public StreamExecutionEnvironment createExecutionEnvironment() { + return (StreamExecutionEnvironment) streamExecutionEnvironment; + } + }; + } + + @Override public Object createCatalogManager(Object config) { return CatalogManager.newBuilder() .classLoader(Thread.currentThread().getContextClassLoader()) @@ -402,8 +423,13 @@ public class Flink111Shims extends FlinkShims { } @Override - public Object getCustomCli(Object cliFrontend, Object commandLine) { - return ((CliFrontend)cliFrontend).validateAndGetActiveCommandLine((CommandLine) commandLine); + public Object updateEffectiveConfig(Object cliFrontend, Object commandLine, Object effectiveConfig) { + CustomCommandLine customCommandLine = ((CliFrontend)cliFrontend).validateAndGetActiveCommandLine((CommandLine) commandLine); + try { + return customCommandLine.applyCommandLineOptionsToConfiguration((CommandLine) commandLine); + } catch (FlinkException e) { + throw new RuntimeException("Fail to call applyCommandLineOptionsToConfiguration", e); + } } @Override diff --git a/flink/flink1.12-shims/pom.xml b/flink/flink1.12-shims/pom.xml new file mode 100644 index 0000000..e234b5c --- /dev/null +++ b/flink/flink1.12-shims/pom.xml @@ -0,0 +1,221 @@ +<?xml version="1.0" encoding="UTF-8"?> +<!-- + ~ Licensed to the Apache Software Foundation (ASF) under one or more + ~ contributor license agreements. See the NOTICE file distributed with + ~ this work for additional information regarding copyright ownership. 
+ ~ The ASF licenses this file to You under the Apache License, Version 2.0 + ~ (the "License"); you may not use this file except in compliance with + ~ the License. You may obtain a copy of the License at + ~ + ~ http://www.apache.org/licenses/LICENSE-2.0 + ~ + ~ Unless required by applicable law or agreed to in writing, software + ~ distributed under the License is distributed on an "AS IS" BASIS, + ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + ~ See the License for the specific language governing permissions and + ~ limitations under the License. + --> + +<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" + xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd"> + <parent> + <artifactId>flink-parent</artifactId> + <groupId>org.apache.zeppelin</groupId> + <version>0.9.0-SNAPSHOT</version> + <relativePath>../pom.xml</relativePath> + </parent> + + <modelVersion>4.0.0</modelVersion> + <groupId>org.apache.zeppelin</groupId> + <artifactId>flink1.12-shims</artifactId> + <version>0.9.0-SNAPSHOT</version> + <packaging>jar</packaging> + <name>Zeppelin: Flink1.12 Shims</name> + + <properties> + <flink.version>${flink1.12.version}</flink.version> + <scala.binary.version>2.11</scala.binary.version> + <scala.version>2.11.12</scala.version> + </properties> + + <dependencies> + + <dependency> + <groupId>org.apache.zeppelin</groupId> + <artifactId>flink-shims</artifactId> + <version>${project.version}</version> + </dependency> + + <dependency> + <groupId>org.apache.flink</groupId> + <artifactId>flink-core</artifactId> + <version>${flink.version}</version> + <scope>provided</scope> + </dependency> + + <dependency> + <groupId>org.apache.flink</groupId> + <artifactId>flink-clients_2.11</artifactId> + <version>${flink.version}</version> + <scope>provided</scope> + </dependency> + + <dependency> + <groupId>org.apache.flink</groupId> + 
<artifactId>flink-runtime_2.11</artifactId> + <version>${flink.version}</version> + <scope>provided</scope> + </dependency> + + + <dependency> + <groupId>org.apache.flink</groupId> + <artifactId>flink-table-api-scala_2.11</artifactId> + <version>${flink.version}</version> + <scope>provided</scope> + </dependency> + + <dependency> + <groupId>org.apache.flink</groupId> + <artifactId>flink-table-api-scala-bridge_2.11</artifactId> + <version>${flink.version}</version> + <scope>provided</scope> + </dependency> + + <dependency> + <groupId>org.apache.flink</groupId> + <artifactId>flink-table-api-java-bridge_2.11</artifactId> + <version>${flink.version}</version> + <scope>provided</scope> + </dependency> + + <dependency> + <groupId>org.apache.flink</groupId> + <artifactId>flink-scala_2.11</artifactId> + <version>${flink.version}</version> + <scope>provided</scope> + </dependency> + + <dependency> + <groupId>org.apache.flink</groupId> + <artifactId>flink-streaming-java_2.11</artifactId> + <version>${flink.version}</version> + <scope>provided</scope> + </dependency> + + <dependency> + <groupId>org.apache.flink</groupId> + <artifactId>flink-streaming-scala_2.11</artifactId> + <version>${flink.version}</version> + <scope>provided</scope> + </dependency> + + <dependency> + <groupId>org.apache.flink</groupId> + <artifactId>flink-java</artifactId> + <version>${flink.version}</version> + <scope>provided</scope> + </dependency> + + <dependency> + <groupId>org.apache.flink</groupId> + <artifactId>flink-table-planner-blink_2.11</artifactId> + <version>${flink.version}</version> + <scope>provided</scope> + <exclusions> + <exclusion> + <groupId>org.reflections</groupId> + <artifactId>reflections</artifactId> + </exclusion> + </exclusions> + </dependency> + + <dependency> + <groupId>org.apache.flink</groupId> + <artifactId>flink-table-planner_2.11</artifactId> + <version>${flink.version}</version> + <scope>provided</scope> + </dependency> + + <dependency> + 
<groupId>org.apache.flink</groupId> + <artifactId>flink-python_2.11</artifactId> + <version>${flink.version}</version> + <scope>provided</scope> + </dependency> + + <dependency> + <groupId>org.apache.flink</groupId> + <artifactId>flink-scala-shell_2.11</artifactId> + <version>${flink.version}</version> + <scope>provided</scope> + </dependency> + </dependencies> + + <build> + <plugins> + + <plugin> + <groupId>net.alchim31.maven</groupId> + <artifactId>scala-maven-plugin</artifactId> + <executions> + <execution> + <id>eclipse-add-source</id> + <goals> + <goal>add-source</goal> + </goals> + </execution> + <execution> + <id>scala-compile-first</id> + <phase>process-resources</phase> + <goals> + <goal>compile</goal> + </goals> + </execution> + <execution> + <id>scala-test-compile-first</id> + <phase>process-test-resources</phase> + <goals> + <goal>testCompile</goal> + </goals> + </execution> + </executions> + <configuration> + <scalaVersion>${scala.version}</scalaVersion> + <args> + <arg>-unchecked</arg> + <arg>-deprecation</arg> + <arg>-feature</arg> + <arg>-target:jvm-1.8</arg> + </args> + <jvmArgs> + <jvmArg>-Xms1024m</jvmArg> + <jvmArg>-Xmx1024m</jvmArg> + <jvmArg>-XX:MaxMetaspaceSize=${MaxMetaspace}</jvmArg> + </jvmArgs> + <javacArgs> + <javacArg>-source</javacArg> + <javacArg>${java.version}</javacArg> + <javacArg>-target</javacArg> + <javacArg>${java.version}</javacArg> + <javacArg>-Xlint:all,-serial,-path,-options</javacArg> + </javacArgs> + </configuration> + </plugin> + + <plugin> + <artifactId>maven-resources-plugin</artifactId> + <executions> + <execution> + <id>copy-interpreter-setting</id> + <phase>none</phase> + <configuration> + <skip>true</skip> + </configuration> + </execution> + </executions> + </plugin> + </plugins> + </build> + +</project> \ No newline at end of file diff --git a/flink/flink1.11-shims/src/main/java/org/apache/zeppelin/flink/Flink111Shims.java b/flink/flink1.12-shims/src/main/java/org/apache/zeppelin/flink/Flink112Shims.java 
similarity index 92% copy from flink/flink1.11-shims/src/main/java/org/apache/zeppelin/flink/Flink111Shims.java copy to flink/flink1.12-shims/src/main/java/org/apache/zeppelin/flink/Flink112Shims.java index 8fad5d4..dc5a4db 100644 --- a/flink/flink1.11-shims/src/main/java/org/apache/zeppelin/flink/Flink111Shims.java +++ b/flink/flink1.12-shims/src/main/java/org/apache/zeppelin/flink/Flink112Shims.java @@ -20,15 +20,20 @@ package org.apache.zeppelin.flink; import org.apache.commons.cli.CommandLine; import org.apache.commons.compress.utils.Lists; +import org.apache.flink.api.common.ExecutionConfig; import org.apache.flink.api.common.JobStatus; import org.apache.flink.api.common.typeutils.TypeSerializer; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.api.scala.DataSet; import org.apache.flink.client.cli.CliFrontend; +import org.apache.flink.client.cli.CustomCommandLine; import org.apache.flink.configuration.ConfigOption; +import org.apache.flink.configuration.Configuration; import org.apache.flink.configuration.ReadableConfig; import org.apache.flink.core.execution.JobClient; import org.apache.flink.python.PythonOptions; +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironmentFactory; import org.apache.flink.table.api.EnvironmentSettings; import org.apache.flink.table.api.StatementSet; import org.apache.flink.table.api.Table; @@ -76,8 +81,9 @@ import org.apache.flink.table.operations.ddl.DropViewOperation; import org.apache.flink.table.sinks.TableSink; import org.apache.flink.types.Row; import org.apache.flink.types.RowKind; +import org.apache.flink.util.FlinkException; import org.apache.zeppelin.flink.shims111.CollectStreamTableSink; -import org.apache.zeppelin.flink.shims111.Flink111ScalaShims; +import org.apache.zeppelin.flink.shims112.Flink112ScalaShims; import org.apache.zeppelin.flink.sql.SqlCommandParser; import 
org.apache.zeppelin.flink.sql.SqlCommandParser.SqlCommand; import org.apache.zeppelin.flink.sql.SqlCommandParser.SqlCommandCall; @@ -103,11 +109,11 @@ import java.util.regex.Matcher; /** - * Shims for flink 1.11 + * Shims for flink 1.12 */ -public class Flink111Shims extends FlinkShims { +public class Flink112Shims extends FlinkShims { - private static final Logger LOGGER = LoggerFactory.getLogger(Flink111Shims.class); + private static final Logger LOGGER = LoggerFactory.getLogger(Flink112Shims.class); public static final AttributedString MESSAGE_HELP = new AttributedStringBuilder() .append("The following commands are available:\n\n") .append(formatCommand(SqlCommand.CREATE_TABLE, "Create table under current catalog and database.")) @@ -134,11 +140,27 @@ public class Flink111Shims extends FlinkShims { private Map<String, StatementSet> statementSetMap = new ConcurrentHashMap<>(); - public Flink111Shims(Properties properties) { + public Flink112Shims(Properties properties) { super(properties); } @Override + public void disableSysoutLogging(Object batchConfig, Object streamConfig) { + // do nothing + } + + + @Override + public Object createStreamExecutionEnvironmentFactory(Object streamExecutionEnvironment) { + return new StreamExecutionEnvironmentFactory() { + @Override + public StreamExecutionEnvironment createExecutionEnvironment(Configuration configuration) { + return (StreamExecutionEnvironment) streamExecutionEnvironment; + } + }; + } + + @Override public Object createCatalogManager(Object config) { return CatalogManager.newBuilder() .classLoader(Thread.currentThread().getContextClassLoader()) @@ -211,12 +233,12 @@ public class Flink111Shims extends FlinkShims { @Override public Object fromDataSet(Object btenv, Object ds) { - return Flink111ScalaShims.fromDataSet((BatchTableEnvironment) btenv, (DataSet) ds); + return Flink112ScalaShims.fromDataSet((BatchTableEnvironment) btenv, (DataSet) ds); } @Override public Object toDataSet(Object btenv, Object table) { - 
return Flink111ScalaShims.toDataSet((BatchTableEnvironment) btenv, (Table) table); + return Flink112ScalaShims.toDataSet((BatchTableEnvironment) btenv, (Table) table); } @Override @@ -402,8 +424,14 @@ public class Flink111Shims extends FlinkShims { } @Override - public Object getCustomCli(Object cliFrontend, Object commandLine) { - return ((CliFrontend)cliFrontend).validateAndGetActiveCommandLine((CommandLine) commandLine); + public Object updateEffectiveConfig(Object cliFrontend, Object commandLine, Object effectiveConfig) { + CustomCommandLine customCommandLine = ((CliFrontend)cliFrontend).validateAndGetActiveCommandLine((CommandLine) commandLine); + try { + ((Configuration) effectiveConfig).addAll(customCommandLine.toConfiguration((CommandLine) commandLine)); + return effectiveConfig; + } catch (FlinkException e) { + throw new RuntimeException("Fail to call addAll", e); + } } @Override diff --git a/flink/flink1.12-shims/src/main/java/org/apache/zeppelin/flink/shims112/CollectStreamTableSink.java b/flink/flink1.12-shims/src/main/java/org/apache/zeppelin/flink/shims112/CollectStreamTableSink.java new file mode 100644 index 0000000..b98f406 --- /dev/null +++ b/flink/flink1.12-shims/src/main/java/org/apache/zeppelin/flink/shims112/CollectStreamTableSink.java @@ -0,0 +1,97 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.zeppelin.flink.shims111; + +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.api.common.typeinfo.Types; +import org.apache.flink.api.common.typeutils.TypeSerializer; +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.api.java.typeutils.TupleTypeInfo; +import org.apache.flink.streaming.api.datastream.DataStream; +import org.apache.flink.streaming.api.datastream.DataStreamSink; +import org.apache.flink.streaming.experimental.CollectSink; +import org.apache.flink.table.sinks.RetractStreamTableSink; +import org.apache.flink.types.Row; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.net.InetAddress; +import java.util.UUID; + +/** + * Table sink for collecting the results locally using sockets. + */ +public class CollectStreamTableSink implements RetractStreamTableSink<Row> { + + private static final Logger LOGGER = LoggerFactory.getLogger(CollectStreamTableSink.class); + + private final InetAddress targetAddress; + private final int targetPort; + private final TypeSerializer<Tuple2<Boolean, Row>> serializer; + + private String[] fieldNames; + private TypeInformation<?>[] fieldTypes; + + public CollectStreamTableSink(InetAddress targetAddress, + int targetPort, + TypeSerializer<Tuple2<Boolean, Row>> serializer) { + LOGGER.info("Use address: " + targetAddress.getHostAddress() + ":" + targetPort); + this.targetAddress = targetAddress; + this.targetPort = targetPort; + this.serializer = serializer; + } + + @Override + public String[] getFieldNames() { + return fieldNames; + } + + @Override + public TypeInformation<?>[] getFieldTypes() { + return fieldTypes; + } + + @Override + public CollectStreamTableSink configure(String[] fieldNames, TypeInformation<?>[] fieldTypes) { + final CollectStreamTableSink copy = + new CollectStreamTableSink(targetAddress, 
targetPort, serializer); + copy.fieldNames = fieldNames; + copy.fieldTypes = fieldTypes; + return copy; + } + + @Override + public TypeInformation<Row> getRecordType() { + return Types.ROW_NAMED(fieldNames, fieldTypes); + } + + @Override + public DataStreamSink<?> consumeDataStream(DataStream<Tuple2<Boolean, Row>> stream) { + // add sink + return stream + .addSink(new CollectSink<>(targetAddress, targetPort, serializer)) + .name("Zeppelin Flink Sql Stream Collect Sink " + UUID.randomUUID()) + .setParallelism(1); + } + + @Override + public TupleTypeInfo<Tuple2<Boolean, Row>> getOutputType() { + return new TupleTypeInfo<>(Types.BOOLEAN, getRecordType()); + } +} diff --git a/flink/flink1.12-shims/src/main/scala/org/apache/zeppelin/flink/shims112/Flink112ScalaShims.scala b/flink/flink1.12-shims/src/main/scala/org/apache/zeppelin/flink/shims112/Flink112ScalaShims.scala new file mode 100644 index 0000000..988ad4e --- /dev/null +++ b/flink/flink1.12-shims/src/main/scala/org/apache/zeppelin/flink/shims112/Flink112ScalaShims.scala @@ -0,0 +1,36 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.zeppelin.flink.shims112 + +import org.apache.flink.api.scala.DataSet +import org.apache.flink.streaming.api.scala._ +import org.apache.flink.table.api.Table +import org.apache.flink.table.api.bridge.scala.BatchTableEnvironment +import org.apache.flink.types.Row + +object Flink112ScalaShims { + + def fromDataSet(btenv: BatchTableEnvironment, ds: DataSet[_]): Table = { + btenv.fromDataSet(ds) + } + + def toDataSet(btenv: BatchTableEnvironment, table: Table): DataSet[Row] = { + btenv.toDataSet[Row](table) + } +} diff --git a/flink/interpreter/pom.xml b/flink/interpreter/pom.xml index 8b34a19..2807552 100644 --- a/flink/interpreter/pom.xml +++ b/flink/interpreter/pom.xml @@ -77,6 +77,12 @@ <dependency> <groupId>org.apache.zeppelin</groupId> + <artifactId>flink1.12-shims</artifactId> + <version>${project.version}</version> + </dependency> + + <dependency> + <groupId>org.apache.zeppelin</groupId> <artifactId>zeppelin-python</artifactId> <version>${project.version}</version> <exclusions> @@ -689,9 +695,9 @@ <!-- set sun.zip.disableMemoryMapping=true because of https://blogs.oracle.com/poonam/crashes-in-zipgetentry https://bugs.java.com/bugdatabase/view_bug.do?bug_id=8191484 --> - <argLine>-Xmx4096m -XX:MaxMetaspaceSize=512m -Dsun.zip.disableMemoryMapping=true</argLine> + <argLine>-Xmx5120m -XX:MaxMetaspaceSize=512m -Dsun.zip.disableMemoryMapping=true</argLine> <!-- <argLine>-Xmx4096m -XX:MaxMetaspaceSize=512m -Dsun.zip.disableMemoryMapping=true -agentlib:jdwp=transport=dt_socket,server=y,suspend=y,address=6006</argLine>--> - +o <environmentVariables> <!-- <FLINK_HOME>/Users/jzhang/github/flink/build-target</FLINK_HOME>--> <FLINK_HOME>${project.build.directory}/flink-${flink.version}</FLINK_HOME> @@ -853,20 +859,27 @@ <profiles> <profile> - <id>flink-1.10</id> + <id>flink-110</id> <properties> <flink.version>${flink1.10.version}</flink.version> </properties> </profile> <profile> - <id>flink-1.11</id> + <id>flink-111</id> <properties> 
<flink.version>${flink1.11.version}</flink.version> </properties> </profile> <profile> + <id>flink-112</id> + <properties> + <flink.version>${flink1.12.version}</flink.version> + </properties> + </profile> + + <profile> <id>hive2</id> <activation> <activeByDefault>true</activeByDefault> diff --git a/flink/interpreter/src/main/scala/org/apache/zeppelin/flink/FlinkScalaInterpreter.scala b/flink/interpreter/src/main/scala/org/apache/zeppelin/flink/FlinkScalaInterpreter.scala index 9865145..572426b 100644 --- a/flink/interpreter/src/main/scala/org/apache/zeppelin/flink/FlinkScalaInterpreter.scala +++ b/flink/interpreter/src/main/scala/org/apache/zeppelin/flink/FlinkScalaInterpreter.scala @@ -445,8 +445,7 @@ class FlinkScalaInterpreter(val properties: Properties) { if (java.lang.Boolean.parseBoolean( properties.getProperty("zeppelin.flink.disableSysoutLogging", "true"))) { - this.benv.getConfig.disableSysoutLogging() - this.senv.getConfig.disableSysoutLogging() + flinkShims.disableSysoutLogging(this.benv.getConfig, this.senv.getConfig); } } @@ -511,9 +510,7 @@ class FlinkScalaInterpreter(val properties: Properties) { } private def setAsContext(): Unit = { - val streamFactory = new StreamExecutionEnvironmentFactory() { - override def createExecutionEnvironment = senv.getJavaEnv - } + val streamFactory = flinkShims.createStreamExecutionEnvironmentFactory(this.senv.getJavaEnv) //StreamExecutionEnvironment var method = classOf[JStreamExecutionEnvironment].getDeclaredMethod("initializeContextEnvironment", classOf[StreamExecutionEnvironmentFactory]) diff --git a/flink/interpreter/src/main/scala/org/apache/zeppelin/flink/FlinkShell.scala b/flink/interpreter/src/main/scala/org/apache/zeppelin/flink/FlinkShell.scala index 3f814b9..e4bcf1b 100644 --- a/flink/interpreter/src/main/scala/org/apache/zeppelin/flink/FlinkShell.scala +++ b/flink/interpreter/src/main/scala/org/apache/zeppelin/flink/FlinkShell.scala @@ -109,7 +109,7 @@ object FlinkShell { } private def 
deployNewYarnCluster(config: Config, flinkConfig: Configuration, flinkShims: FlinkShims) = { - val effectiveConfig = new Configuration(flinkConfig) + var effectiveConfig = new Configuration(flinkConfig) val args = parseArgList(config, "yarn-cluster") val configurationDirectory = getConfigDir(config) @@ -123,24 +123,25 @@ object FlinkShell { frontend.getCustomCommandLineOptions) val commandLine = CliFrontendParser.parse(commandLineOptions, args, true) - val customCLI = flinkShims.getCustomCli(frontend, commandLine).asInstanceOf[CustomCommandLine] - val executorConfig = customCLI.applyCommandLineOptionsToConfiguration(commandLine) + effectiveConfig = flinkShims + .updateEffectiveConfig(frontend, commandLine, effectiveConfig) + .asInstanceOf[Configuration] val serviceLoader = new DefaultClusterClientServiceLoader - val clientFactory = serviceLoader.getClusterClientFactory(executorConfig) - val clusterDescriptor = clientFactory.createClusterDescriptor(executorConfig) - val clusterSpecification = clientFactory.getClusterSpecification(executorConfig) + val clientFactory = serviceLoader.getClusterClientFactory(effectiveConfig) + val clusterDescriptor = clientFactory.createClusterDescriptor(effectiveConfig) + val clusterSpecification = clientFactory.getClusterSpecification(effectiveConfig) val clusterClient = try { clusterDescriptor .deploySessionCluster(clusterSpecification) .getClusterClient } finally { - executorConfig.set(DeploymentOptions.TARGET, "yarn-session") + effectiveConfig.set(DeploymentOptions.TARGET, "yarn-session") clusterDescriptor.close() } - (executorConfig, Some(clusterClient)) + (effectiveConfig, Some(clusterClient)) } private def fetchDeployedYarnClusterInfo( @@ -149,7 +150,7 @@ object FlinkShell { mode: String, flinkShims: FlinkShims) = { - val effectiveConfig = new Configuration(flinkConfig) + var effectiveConfig = new Configuration(flinkConfig) val args = parseArgList(config, mode) val configurationDirectory = getConfigDir(config) @@ -163,10 +164,11 
@@ object FlinkShell { frontend.getCustomCommandLineOptions) val commandLine = CliFrontendParser.parse(commandLineOptions, args, true) - val customCLI = flinkShims.getCustomCli(frontend, commandLine).asInstanceOf[CustomCommandLine] - val executorConfig = customCLI.applyCommandLineOptionsToConfiguration(commandLine); + effectiveConfig = flinkShims + .updateEffectiveConfig(frontend, commandLine, effectiveConfig) + .asInstanceOf[Configuration] - (executorConfig, None) + (effectiveConfig, None) } def parseArgList(config: Config, mode: String): Array[String] = { diff --git a/flink/interpreter/src/test/java/org/apache/zeppelin/flink/FlinkStreamSqlInterpreterTest.java b/flink/interpreter/src/test/java/org/apache/zeppelin/flink/FlinkStreamSqlInterpreterTest.java index 16e7ee1..d27f422 100644 --- a/flink/interpreter/src/test/java/org/apache/zeppelin/flink/FlinkStreamSqlInterpreterTest.java +++ b/flink/interpreter/src/test/java/org/apache/zeppelin/flink/FlinkStreamSqlInterpreterTest.java @@ -331,8 +331,13 @@ public class FlinkStreamSqlInterpreterTest extends SqlInterpreterTest { result = sqlInterpreter.interpret("select url, count(1) as pv from " + "log group by url", context); - assertEquals(InterpreterResult.Code.ERROR, result.code()); - assertTrue(context.out.toString(), context.out.toString().contains("Cannot find checkpoint or savepoint")); + if (flinkInterpreter.getFlinkVersion().olderThan(FlinkVersion.fromVersionString("1.12.0"))) { + assertEquals(InterpreterResult.Code.ERROR, result.code()); + assertTrue(context.out.toString(), context.out.toString().contains("Cannot find checkpoint or savepoint")); + } else { + // flink 1.12 would start from scratch if the savepoint is not found.
+ assertEquals(InterpreterResult.Code.SUCCESS, result.code()); + } } @Test diff --git a/flink/pom.xml b/flink/pom.xml index 2ddcef3..c9e9b32 100644 --- a/flink/pom.xml +++ b/flink/pom.xml @@ -39,11 +39,13 @@ <module>flink-shims</module> <module>flink1.10-shims</module> <module>flink1.11-shims</module> + <module>flink1.12-shims</module> </modules> <properties> <flink1.10.version>1.10.2</flink1.10.version> <flink1.11.version>1.11.2</flink1.11.version> + <flink1.12.version>1.12.0</flink1.12.version> </properties> <dependencies> diff --git a/testing/env_python_3 with_flink_1_10.yml b/testing/env_python_3_with_flink_110.yml similarity index 100% rename from testing/env_python_3 with_flink_1_10.yml rename to testing/env_python_3_with_flink_110.yml diff --git a/testing/env_python_3 with_flink_1_11.yml b/testing/env_python_3_with_flink_111.yml similarity index 100% copy from testing/env_python_3 with_flink_1_11.yml copy to testing/env_python_3_with_flink_111.yml diff --git a/testing/env_python_3 with_flink_1_11.yml b/testing/env_python_3_with_flink_112.yml similarity index 86% rename from testing/env_python_3 with_flink_1_11.yml rename to testing/env_python_3_with_flink_112.yml index e23bedb..8af5119 100644 --- a/testing/env_python_3 with_flink_1_11.yml +++ b/testing/env_python_3_with_flink_112.yml @@ -3,6 +3,10 @@ channels: - conda-forge - defaults dependencies: + - pip + - pip: + - bkzep==0.6.1 + - apache-flink==1.12.0 - pycodestyle - numpy=1 - pandas=0.25 @@ -19,7 +23,4 @@ dependencies: - panel - holoviews - pyyaml=3 - - pip - - pip: - - bkzep==0.6.1 - - apache-flink==1.11.1 + diff --git a/zeppelin-interpreter-integration/src/test/java/org/apache/zeppelin/integration/FlinkIntegrationTest112.java b/zeppelin-interpreter-integration/src/test/java/org/apache/zeppelin/integration/FlinkIntegrationTest112.java new file mode 100644 index 0000000..56b318b --- /dev/null +++ 
b/zeppelin-interpreter-integration/src/test/java/org/apache/zeppelin/integration/FlinkIntegrationTest112.java @@ -0,0 +1,39 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.zeppelin.integration; + +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; + +import java.util.Arrays; +import java.util.List; + +@RunWith(value = Parameterized.class) +public class FlinkIntegrationTest112 extends FlinkIntegrationTest { + + @Parameterized.Parameters + public static List<Object[]> data() { + return Arrays.asList(new Object[][]{ + {"1.12.0"} + }); + } + + public FlinkIntegrationTest112(String flinkVersion) { + super(flinkVersion); + } +} diff --git a/zeppelin-interpreter-integration/src/test/java/org/apache/zeppelin/integration/ZeppelinFlinkClusterTest112.java b/zeppelin-interpreter-integration/src/test/java/org/apache/zeppelin/integration/ZeppelinFlinkClusterTest112.java new file mode 100644 index 0000000..443b254 --- /dev/null +++ b/zeppelin-interpreter-integration/src/test/java/org/apache/zeppelin/integration/ZeppelinFlinkClusterTest112.java @@ -0,0 +1,38 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. 
See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.zeppelin.integration; + +import org.junit.runners.Parameterized; + +import java.util.Arrays; +import java.util.List; + +//@RunWith(value = Parameterized.class) +public class ZeppelinFlinkClusterTest112 extends ZeppelinFlinkClusterTest { + + @Parameterized.Parameters + public static List<Object[]> data() { + return Arrays.asList(new Object[][]{ + {"1.12.0"} + }); + } + + public ZeppelinFlinkClusterTest112(String flinkVersion) throws Exception { + super(flinkVersion); + } +}