This is an automated email from the ASF dual-hosted git repository. jongyoul pushed a commit to branch branch-0.10 in repository https://gitbox.apache.org/repos/asf/zeppelin.git
The following commit(s) were added to refs/heads/branch-0.10 by this push: new 8e2c92f [ZEPPELIN-5469] Support Flink 1.14 8e2c92f is described below commit 8e2c92f077c2f38e1fbedbefe048111a238f62f8 Author: Jeff Zhang <zjf...@apache.org> AuthorDate: Mon Jul 26 18:29:32 2021 +0800 [ZEPPELIN-5469] Support Flink 1.14 ### What is this PR for? This PR adds support for Flink 1.14. Main changes: * Add new module flink1.14-shims * Add new profile flink-114. (Flink 1.14 changed its module layout, so we move some Flink dependencies into the profiles, e.g. there's no module `flink-runtime_${flink.scala.binary.version}` in Flink 1.14; it is now `flink-runtime`.) * The flink (old) planner is not supported in Flink 1.14, so there's no `bt_env_2` and `st_env_2` in Flink 1.14 ### What type of PR is it? [ Feature ] ### Todos * [ ] - Task ### What is the Jira issue? * https://issues.apache.org/jira/browse/ZEPPELIN-5469 ### How should this be tested? * CI pass ### Screenshots (if appropriate) ### Questions: * Do the license files need an update? No * Are there breaking changes for older versions? No * Does this need documentation? 
No Author: Jeff Zhang <zjf...@apache.org> Closes #4195 from zjffdu/ZEPPELIN-5469 and squashes the following commits: 465b8778b [Jeff Zhang] [ZEPPELIN-5469] Support Flink 1.14 (cherry picked from commit 1e63b4ff8d3476391a6db5e28ba59b0fe460c8e2) Signed-off-by: Jongyoul Lee <jongy...@gmail.com> --- .github/workflows/core.yml | 2 +- flink/flink-scala-parent/pom.xml | 190 +++++++++++++++++--- .../apache/zeppelin/flink/IPyFlinkInterpreter.java | 4 + .../apache/zeppelin/flink/PyFlinkInterpreter.java | 4 + .../org/apache/zeppelin/flink/TableEnvFactory.java | 112 ++++-------- .../flink/YarnApplicationStreamEnvironment.java | 19 +- .../internal/ScalaShellStreamEnvironment.java | 31 +++- .../zeppelin/flink/sql/AbstractStreamSqlJob.java | 7 +- .../zeppelin/flink/sql/AppendStreamSqlJob.java | 33 ++-- .../src/main/resources/python/zeppelin_ipyflink.py | 12 +- .../src/main/resources/python/zeppelin_pyflink.py | 13 +- .../zeppelin/flink/FlinkScalaInterpreter.scala | 30 ++-- .../zeppelin/flink/FlinkZeppelinContext.scala | 14 +- .../zeppelin/flink/internal/FlinkILoop.scala | 1 + .../flink/FlinkBatchSqlInterpreterTest.java | 24 +-- .../zeppelin/flink/FlinkInterpreterTest.java | 11 +- .../zeppelin/flink/IPyFlinkInterpreterTest.java | 29 ++- .../zeppelin/flink/PyFlinkInterpreterTest.java | 18 +- .../java/org/apache/zeppelin/flink/FlinkShims.java | 22 ++- .../org/apache/zeppelin/flink/FlinkVersion.java | 4 + .../org/apache/zeppelin/flink/Flink110Shims.java | 55 +++++- .../org/apache/zeppelin/flink/Flink111Shims.java | 56 +++++- .../org/apache/zeppelin/flink/Flink112Shims.java | 55 +++++- .../org/apache/zeppelin/flink/Flink113Shims.java | 54 +++++- flink/flink1.14-shims/pom.xml | 199 +++++++++++++++++++++ .../org/apache/zeppelin/flink/Flink114Shims.java} | 72 ++++++-- .../flink/shims114/CollectStreamTableSink.java | 97 ++++++++++ flink/pom.xml | 2 + testing/env_python_3_with_flink_114.yml | 27 +++ .../integration/FlinkIntegrationTest114.java | 40 +++++ 30 files changed, 1023 
insertions(+), 214 deletions(-) diff --git a/.github/workflows/core.yml b/.github/workflows/core.yml index 3f866fd..e33fe18 100644 --- a/.github/workflows/core.yml +++ b/.github/workflows/core.yml @@ -148,7 +148,7 @@ jobs: strategy: fail-fast: false matrix: - flink: [110, 111, 112, 113] + flink: [110, 111, 112, 113, 114] steps: - name: Checkout uses: actions/checkout@v2 diff --git a/flink/flink-scala-parent/pom.xml b/flink/flink-scala-parent/pom.xml index a539e10..ff3d09e 100644 --- a/flink/flink-scala-parent/pom.xml +++ b/flink/flink-scala-parent/pom.xml @@ -80,6 +80,12 @@ <dependency> <groupId>org.apache.zeppelin</groupId> + <artifactId>flink1.14-shims</artifactId> + <version>${project.version}</version> + </dependency> + + <dependency> + <groupId>org.apache.zeppelin</groupId> <artifactId>zeppelin-python</artifactId> <version>${project.version}</version> <exclusions> @@ -160,13 +166,6 @@ <dependency> <groupId>org.apache.flink</groupId> - <artifactId>flink-runtime_${flink.scala.binary.version}</artifactId> - <version>${flink.version}</version> - <scope>provided</scope> - </dependency> - - <dependency> - <groupId>org.apache.flink</groupId> <artifactId>flink-yarn_${flink.scala.binary.version}</artifactId> <version>${flink.version}</version> <scope>provided</scope> @@ -206,6 +205,13 @@ <dependency> <groupId>org.apache.flink</groupId> + <artifactId>flink-table-api-java-bridge_${flink.scala.binary.version}</artifactId> + <version>${flink.version}</version> + <scope>provided</scope> + </dependency> + + <dependency> + <groupId>org.apache.flink</groupId> <artifactId>flink-scala_${flink.scala.binary.version}</artifactId> <version>${flink.version}</version> <scope>provided</scope> @@ -321,26 +327,6 @@ </dependency> <dependency> - <groupId>org.apache.flink</groupId> - <artifactId>flink-table-planner-blink_${flink.scala.binary.version}</artifactId> - <version>${flink.version}</version> - <scope>provided</scope> - <exclusions> - <exclusion> - <groupId>org.reflections</groupId> 
- <artifactId>reflections</artifactId> - </exclusion> - </exclusions> - </dependency> - - <dependency> - <groupId>org.apache.flink</groupId> - <artifactId>flink-table-planner_${flink.scala.binary.version}</artifactId> - <version>${flink.version}</version> - <scope>provided</scope> - </dependency> - - <dependency> <groupId>org.apache.hadoop</groupId> <artifactId>hadoop-common</artifactId> <version>${flink.hadoop.version}</version> @@ -624,7 +610,6 @@ <artifactId>junit</artifactId> <scope>test</scope> </dependency> - </dependencies> <build> @@ -950,6 +935,38 @@ <properties> <flink.version>${flink1.10.version}</flink.version> </properties> + <dependencies> + <dependency> + <groupId>org.apache.flink</groupId> + <artifactId>flink-runtime_${flink.scala.binary.version}</artifactId> + <version>${flink.version}</version> + <scope>provided</scope> + </dependency> + <dependency> + <groupId>org.apache.flink</groupId> + <artifactId>flink-table-runtime-blink_${flink.scala.binary.version}</artifactId> + <version>${flink.version}</version> + <scope>provided</scope> + </dependency> + <dependency> + <groupId>org.apache.flink</groupId> + <artifactId>flink-table-planner_${flink.scala.binary.version}</artifactId> + <version>${flink.version}</version> + <scope>provided</scope> + </dependency> + <dependency> + <groupId>org.apache.flink</groupId> + <artifactId>flink-table-planner-blink_${flink.scala.binary.version}</artifactId> + <version>${flink.version}</version> + <scope>provided</scope> + <exclusions> + <exclusion> + <groupId>org.reflections</groupId> + <artifactId>reflections</artifactId> + </exclusion> + </exclusions> + </dependency> + </dependencies> </profile> <profile> @@ -957,6 +974,38 @@ <properties> <flink.version>${flink1.11.version}</flink.version> </properties> + <dependencies> + <dependency> + <groupId>org.apache.flink</groupId> + <artifactId>flink-runtime_${flink.scala.binary.version}</artifactId> + <version>${flink.version}</version> + <scope>provided</scope> + 
</dependency> + <dependency> + <groupId>org.apache.flink</groupId> + <artifactId>flink-table-runtime-blink_${flink.scala.binary.version}</artifactId> + <version>${flink.version}</version> + <scope>provided</scope> + </dependency> + <dependency> + <groupId>org.apache.flink</groupId> + <artifactId>flink-table-planner_${flink.scala.binary.version}</artifactId> + <version>${flink.version}</version> + <scope>provided</scope> + </dependency> + <dependency> + <groupId>org.apache.flink</groupId> + <artifactId>flink-table-planner-blink_${flink.scala.binary.version}</artifactId> + <version>${flink.version}</version> + <scope>provided</scope> + <exclusions> + <exclusion> + <groupId>org.reflections</groupId> + <artifactId>reflections</artifactId> + </exclusion> + </exclusions> + </dependency> + </dependencies> </profile> <profile> @@ -964,6 +1013,38 @@ <properties> <flink.version>${flink1.12.version}</flink.version> </properties> + <dependencies> + <dependency> + <groupId>org.apache.flink</groupId> + <artifactId>flink-runtime_${flink.scala.binary.version}</artifactId> + <version>${flink.version}</version> + <scope>provided</scope> + </dependency> + <dependency> + <groupId>org.apache.flink</groupId> + <artifactId>flink-table-runtime-blink_${flink.scala.binary.version}</artifactId> + <version>${flink.version}</version> + <scope>provided</scope> + </dependency> + <dependency> + <groupId>org.apache.flink</groupId> + <artifactId>flink-table-planner_${flink.scala.binary.version}</artifactId> + <version>${flink.version}</version> + <scope>provided</scope> + </dependency> + <dependency> + <groupId>org.apache.flink</groupId> + <artifactId>flink-table-planner-blink_${flink.scala.binary.version}</artifactId> + <version>${flink.version}</version> + <scope>provided</scope> + <exclusions> + <exclusion> + <groupId>org.reflections</groupId> + <artifactId>reflections</artifactId> + </exclusion> + </exclusions> + </dependency> + </dependencies> </profile> <profile> @@ -971,6 +1052,59 @@ 
<properties> <flink.version>${flink1.13.version}</flink.version> </properties> + <dependencies> + <dependency> + <groupId>org.apache.flink</groupId> + <artifactId>flink-runtime_${flink.scala.binary.version}</artifactId> + <version>${flink.version}</version> + <scope>provided</scope> + </dependency> + <dependency> + <groupId>org.apache.flink</groupId> + <artifactId>flink-table-runtime-blink_${flink.scala.binary.version}</artifactId> + <version>${flink.version}</version> + <scope>provided</scope> + </dependency> + <dependency> + <groupId>org.apache.flink</groupId> + <artifactId>flink-table-planner_${flink.scala.binary.version}</artifactId> + <version>${flink.version}</version> + <scope>provided</scope> + </dependency> + <dependency> + <groupId>org.apache.flink</groupId> + <artifactId>flink-table-planner-blink_${flink.scala.binary.version}</artifactId> + <version>${flink.version}</version> + <scope>provided</scope> + <exclusions> + <exclusion> + <groupId>org.reflections</groupId> + <artifactId>reflections</artifactId> + </exclusion> + </exclusions> + </dependency> + </dependencies> + </profile> + + <profile> + <id>flink-114</id> + <properties> + <flink.version>${flink1.14.version}</flink.version> + </properties> + <dependencies> + <dependency> + <groupId>org.apache.flink</groupId> + <artifactId>flink-runtime</artifactId> + <version>${flink.version}</version> + <scope>provided</scope> + </dependency> + <dependency> + <groupId>org.apache.flink</groupId> + <artifactId>flink-table-planner_${flink.scala.binary.version}</artifactId> + <version>${flink.version}</version> + <scope>provided</scope> + </dependency> + </dependencies> </profile> <profile> diff --git a/flink/flink-scala-parent/src/main/java/org/apache/zeppelin/flink/IPyFlinkInterpreter.java b/flink/flink-scala-parent/src/main/java/org/apache/zeppelin/flink/IPyFlinkInterpreter.java index 763795f..25ee255 100644 --- a/flink/flink-scala-parent/src/main/java/org/apache/zeppelin/flink/IPyFlinkInterpreter.java +++ 
b/flink/flink-scala-parent/src/main/java/org/apache/zeppelin/flink/IPyFlinkInterpreter.java @@ -61,6 +61,10 @@ public class IPyFlinkInterpreter extends IPythonInterpreter { opened = true; } + public boolean isAfterFlink114() { + return flinkInterpreter.getFlinkVersion().isAfterFlink114(); + } + @Override public ZeppelinContext buildZeppelinContext() { return flinkInterpreter.getZeppelinContext(); diff --git a/flink/flink-scala-parent/src/main/java/org/apache/zeppelin/flink/PyFlinkInterpreter.java b/flink/flink-scala-parent/src/main/java/org/apache/zeppelin/flink/PyFlinkInterpreter.java index d27f0fa..4162cc3 100644 --- a/flink/flink-scala-parent/src/main/java/org/apache/zeppelin/flink/PyFlinkInterpreter.java +++ b/flink/flink-scala-parent/src/main/java/org/apache/zeppelin/flink/PyFlinkInterpreter.java @@ -186,6 +186,10 @@ public class PyFlinkInterpreter extends PythonInterpreter { return flinkInterpreter.getFlinkVersion().isFlink110(); } + public boolean isAfterFlink114() { + return flinkInterpreter.getFlinkVersion().isAfterFlink114(); + } + public org.apache.flink.api.java.ExecutionEnvironment getJavaExecutionEnvironment() { return flinkInterpreter.getExecutionEnvironment().getJavaEnv(); } diff --git a/flink/flink-scala-parent/src/main/java/org/apache/zeppelin/flink/TableEnvFactory.java b/flink/flink-scala-parent/src/main/java/org/apache/zeppelin/flink/TableEnvFactory.java index 46ad481..6c080fd 100644 --- a/flink/flink-scala-parent/src/main/java/org/apache/zeppelin/flink/TableEnvFactory.java +++ b/flink/flink-scala-parent/src/main/java/org/apache/zeppelin/flink/TableEnvFactory.java @@ -18,6 +18,7 @@ package org.apache.zeppelin.flink; import org.apache.commons.lang3.exception.ExceptionUtils; +import org.apache.commons.lang3.tuple.ImmutablePair; import org.apache.flink.api.java.ExecutionEnvironment; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.table.api.EnvironmentSettings; @@ -27,17 +28,12 @@ import 
org.apache.flink.table.api.TableException; import org.apache.flink.table.catalog.CatalogManager; import org.apache.flink.table.catalog.FunctionCatalog; import org.apache.flink.table.delegation.Executor; -import org.apache.flink.table.delegation.ExecutorFactory; import org.apache.flink.table.delegation.Planner; -import org.apache.flink.table.delegation.PlannerFactory; -import org.apache.flink.table.factories.ComponentFactoryService; import org.apache.flink.table.module.ModuleManager; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.lang.reflect.Constructor; -import java.lang.reflect.Method; -import java.util.Map; /** * Factory class for creating flink table env for different purpose: @@ -52,7 +48,6 @@ public class TableEnvFactory { private FlinkVersion flinkVersion; private FlinkShims flinkShims; - private Executor executor; private org.apache.flink.api.scala.ExecutionEnvironment benv; private org.apache.flink.streaming.api.scala.StreamExecutionEnvironment senv; @@ -133,17 +128,11 @@ public class TableEnvFactory { public TableEnvironment createScalaFlinkStreamTableEnvironment(EnvironmentSettings settings, ClassLoader classLoader) { try { - Map<String, String> executorProperties = settings.toExecutorProperties(); - Executor executor = lookupExecutor(executorProperties, senv.getJavaEnv()); - - Map<String, String> plannerProperties = settings.toPlannerProperties(); - Planner planner = ComponentFactoryService.find(PlannerFactory.class, plannerProperties) - .create( - plannerProperties, - executor, - streamTableConfig, - oldPlannerFunctionCatalog, - catalogManager); + ImmutablePair<Object, Object> pair = flinkShims.createPlannerAndExecutor( + classLoader, settings, senv.getJavaEnv(), + oldPlannerStreamTableConfig, functionCatalog, catalogManager); + Planner planner = (Planner) pair.left; + Executor executor = (Executor) pair.right; Class clazz = null; if (flinkVersion.isFlink110()) { @@ -231,15 +220,14 @@ public class TableEnvFactory { } } - public 
TableEnvironment createJavaFlinkStreamTableEnvironment(EnvironmentSettings settings, ClassLoader classLoader) { - + public TableEnvironment createJavaFlinkStreamTableEnvironment(EnvironmentSettings settings, + ClassLoader classLoader) { try { - Map<String, String> executorProperties = settings.toExecutorProperties(); - Executor executor = lookupExecutor(executorProperties, senv.getJavaEnv()); - - Map<String, String> plannerProperties = settings.toPlannerProperties(); - Planner planner = ComponentFactoryService.find(PlannerFactory.class, plannerProperties) - .create(plannerProperties, executor, streamTableConfig, oldPlannerFunctionCatalog, catalogManager); + ImmutablePair<Object, Object> pair = flinkShims.createPlannerAndExecutor( + classLoader, settings, senv.getJavaEnv(), + oldPlannerBatchTableConfig, functionCatalog, catalogManager); + Planner planner = (Planner) pair.left; + Executor executor = (Executor) pair.right; Class clazz = null; if (flinkVersion.isFlink110()) { @@ -303,18 +291,11 @@ public class TableEnvFactory { public TableEnvironment createScalaBlinkStreamTableEnvironment(EnvironmentSettings settings, ClassLoader classLoader) { try { - Map<String, String> executorProperties = settings.toExecutorProperties(); - Executor executor = lookupExecutor(executorProperties, senv.getJavaEnv()); - - Map<String, String> plannerProperties = settings.toPlannerProperties(); - Planner planner = ComponentFactoryService.find(PlannerFactory.class, plannerProperties) - .create( - plannerProperties, - executor, - streamTableConfig, - functionCatalog, - catalogManager); - + ImmutablePair<Object, Object> pair = flinkShims.createPlannerAndExecutor( + classLoader, settings, senv.getJavaEnv(), + streamTableConfig, functionCatalog, catalogManager); + Planner planner = (Planner) pair.left; + Executor executor = (Executor) pair.right; Class clazz = null; if (flinkVersion.isFlink110()) { @@ -372,14 +353,12 @@ public class TableEnvFactory { } public TableEnvironment 
createJavaBlinkStreamTableEnvironment(EnvironmentSettings settings, ClassLoader classLoader) { - try { - Map<String, String> executorProperties = settings.toExecutorProperties(); - Executor executor = lookupExecutor(executorProperties, senv.getJavaEnv()); - - Map<String, String> plannerProperties = settings.toPlannerProperties(); - Planner planner = ComponentFactoryService.find(PlannerFactory.class, plannerProperties) - .create(plannerProperties, executor, streamTableConfig, functionCatalog, catalogManager); + ImmutablePair<Object, Object> pair = flinkShims.createPlannerAndExecutor( + classLoader, settings, senv.getJavaEnv(), + streamTableConfig, functionCatalog, catalogManager); + Planner planner = (Planner) pair.left; + Executor executor = (Executor) pair.right; Class clazz = null; if (flinkVersion.isFlink110()) { @@ -439,11 +418,11 @@ public class TableEnvFactory { public TableEnvironment createJavaBlinkBatchTableEnvironment( EnvironmentSettings settings, ClassLoader classLoader) { try { - final Map<String, String> executorProperties = settings.toExecutorProperties(); - executor = lookupExecutor(executorProperties, senv.getJavaEnv()); - final Map<String, String> plannerProperties = settings.toPlannerProperties(); - final Planner planner = ComponentFactoryService.find(PlannerFactory.class, plannerProperties) - .create(plannerProperties, executor, batchTableConfig, functionCatalog, catalogManager); + ImmutablePair<Object, Object> pair = flinkShims.createPlannerAndExecutor( + classLoader, settings, senv.getJavaEnv(), + batchTableConfig, functionCatalog, catalogManager); + Planner planner = (Planner) pair.left; + Executor executor = (Executor) pair.right; Class clazz = null; if (flinkVersion.isFlink110()) { @@ -501,38 +480,11 @@ public class TableEnvFactory { } } - public void createStreamPlanner(EnvironmentSettings settings) { - Map<String, String> executorProperties = settings.toExecutorProperties(); - Executor executor = lookupExecutor(executorProperties, 
senv.getJavaEnv()); - - Map<String, String> plannerProperties = settings.toPlannerProperties(); - Planner planner = ComponentFactoryService.find(PlannerFactory.class, plannerProperties) - .create( - plannerProperties, - executor, - streamTableConfig, - functionCatalog, - catalogManager); + ImmutablePair<Object, Object> pair = flinkShims.createPlannerAndExecutor( + Thread.currentThread().getContextClassLoader(), settings, senv.getJavaEnv(), + streamTableConfig, functionCatalog, catalogManager); + Planner planner = (Planner) pair.left; this.flinkShims.setCatalogManagerSchemaResolver(catalogManager, planner.getParser(), settings); } - - private static Executor lookupExecutor( - Map<String, String> executorProperties, - StreamExecutionEnvironment executionEnvironment) { - try { - ExecutorFactory executorFactory = ComponentFactoryService.find(ExecutorFactory.class, executorProperties); - Method createMethod = executorFactory.getClass() - .getMethod("create", Map.class, StreamExecutionEnvironment.class); - - return (Executor) createMethod.invoke( - executorFactory, - executorProperties, - executionEnvironment); - } catch (Exception e) { - throw new TableException( - "Could not instantiate the executor. 
Make sure a planner module is on the classpath", - e); - } - } } diff --git a/flink/flink-scala-parent/src/main/java/org/apache/zeppelin/flink/YarnApplicationStreamEnvironment.java b/flink/flink-scala-parent/src/main/java/org/apache/zeppelin/flink/YarnApplicationStreamEnvironment.java index ca9089f..7f2fc92 100644 --- a/flink/flink-scala-parent/src/main/java/org/apache/zeppelin/flink/YarnApplicationStreamEnvironment.java +++ b/flink/flink-scala-parent/src/main/java/org/apache/zeppelin/flink/YarnApplicationStreamEnvironment.java @@ -30,6 +30,7 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.io.File; +import java.lang.reflect.Field; import java.net.MalformedURLException; import java.net.URL; import java.util.ArrayList; @@ -72,7 +73,7 @@ public class YarnApplicationStreamEnvironment extends StreamExecutionEnvironment } private void updateDependencies() throws Exception { - final Configuration configuration = getConfiguration(); + final Configuration configuration = (Configuration) getFlinkConfiguration(); checkState( configuration.getBoolean(DeploymentOptions.ATTACHED), "Only ATTACHED mode is supported by the scala shell."); @@ -82,6 +83,22 @@ public class YarnApplicationStreamEnvironment extends StreamExecutionEnvironment configuration, PipelineOptions.JARS, updatedJarFiles, URL::toString); } + public Object getFlinkConfiguration() { + if (flinkScalaInterpreter.getFlinkVersion().isAfterFlink114()) { + // starting from Flink 1.14, getConfiguration() return the readonly copy of internal + // configuration, so we need to get the internal configuration object via reflection. 
+ try { + Field configurationField = StreamExecutionEnvironment.class.getDeclaredField("configuration"); + configurationField.setAccessible(true); + return configurationField.get(this); + } catch (Exception e) { + throw new RuntimeException("Fail to get configuration from StreamExecutionEnvironment", e); + } + } else { + return super.getConfiguration(); + } + } + private List<URL> getUpdatedJarFiles() throws MalformedURLException { final URL jarUrl = flinkILoop.writeFilesToDisk().getAbsoluteFile().toURI().toURL(); final List<URL> allJarFiles = new ArrayList<>(); diff --git a/flink/flink-scala-parent/src/main/java/org/apache/zeppelin/flink/internal/ScalaShellStreamEnvironment.java b/flink/flink-scala-parent/src/main/java/org/apache/zeppelin/flink/internal/ScalaShellStreamEnvironment.java index 448c34b..6bba854 100644 --- a/flink/flink-scala-parent/src/main/java/org/apache/zeppelin/flink/internal/ScalaShellStreamEnvironment.java +++ b/flink/flink-scala-parent/src/main/java/org/apache/zeppelin/flink/internal/ScalaShellStreamEnvironment.java @@ -19,24 +19,22 @@ package org.apache.zeppelin.flink.internal; -import org.apache.flink.api.common.InvalidProgramException; -import org.apache.flink.api.java.ExecutionEnvironment; import org.apache.flink.configuration.ConfigUtils; import org.apache.flink.configuration.Configuration; -import org.apache.flink.configuration.DeploymentOptions; import org.apache.flink.configuration.PipelineOptions; import org.apache.flink.core.execution.JobClient; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.api.graph.StreamGraph; import org.apache.flink.util.JarUtils; +import org.apache.zeppelin.flink.FlinkVersion; +import java.lang.reflect.Field; import java.net.MalformedURLException; import java.net.URL; import java.util.ArrayList; import java.util.List; import static org.apache.flink.util.Preconditions.checkNotNull; -import static org.apache.flink.util.Preconditions.checkState; /** 
* This class is copied from flink project, the reason is that flink scala shell only supports @@ -54,12 +52,17 @@ public class ScalaShellStreamEnvironment extends StreamExecutionEnvironment { */ private final FlinkILoop flinkILoop; + private final FlinkVersion flinkVersion; + + public ScalaShellStreamEnvironment( final Configuration configuration, final FlinkILoop flinkILoop, + final FlinkVersion flinkVersion, final String... jarFiles) { super(configuration); this.flinkILoop = checkNotNull(flinkILoop); + this.flinkVersion = checkNotNull(flinkVersion); this.jarFiles = checkNotNull(JarUtils.getJarFiles(jarFiles)); } @@ -70,16 +73,28 @@ public class ScalaShellStreamEnvironment extends StreamExecutionEnvironment { } private void updateDependencies() throws Exception { - final Configuration configuration = getConfiguration(); final List<URL> updatedJarFiles = getUpdatedJarFiles(); ConfigUtils.encodeCollectionToConfig( - configuration, PipelineOptions.JARS, updatedJarFiles, URL::toString); + (Configuration) getFlinkConfiguration(), PipelineOptions.JARS, updatedJarFiles, URL::toString); } - public Configuration getClientConfiguration() { - return getConfiguration(); + public Object getFlinkConfiguration() { + if (flinkVersion.isAfterFlink114()) { + // starting from Flink 1.14, getConfiguration() return the readonly copy of internal + // configuration, so we need to get the internal configuration object via reflection. 
+ try { + Field configurationField = StreamExecutionEnvironment.class.getDeclaredField("configuration"); + configurationField.setAccessible(true); + return configurationField.get(this); + } catch (Exception e) { + throw new RuntimeException("Fail to get configuration from StreamExecutionEnvironment", e); + } + } else { + return super.getConfiguration(); + } } + private List<URL> getUpdatedJarFiles() throws MalformedURLException { final URL jarUrl = flinkILoop.writeFilesToDisk().getAbsoluteFile().toURI().toURL(); final List<URL> allJarFiles = new ArrayList<>(jarFiles); diff --git a/flink/flink-scala-parent/src/main/java/org/apache/zeppelin/flink/sql/AbstractStreamSqlJob.java b/flink/flink-scala-parent/src/main/java/org/apache/zeppelin/flink/sql/AbstractStreamSqlJob.java index 1fb7832..8288615 100644 --- a/flink/flink-scala-parent/src/main/java/org/apache/zeppelin/flink/sql/AbstractStreamSqlJob.java +++ b/flink/flink-scala-parent/src/main/java/org/apache/zeppelin/flink/sql/AbstractStreamSqlJob.java @@ -29,7 +29,6 @@ import org.apache.flink.streaming.experimental.SocketStreamIterator; import org.apache.flink.table.api.Table; import org.apache.flink.table.api.TableEnvironment; import org.apache.flink.table.api.TableSchema; -import org.apache.flink.table.calcite.FlinkTypeFactory; import org.apache.flink.table.sinks.RetractStreamTableSink; import org.apache.flink.types.Row; import org.apache.zeppelin.flink.FlinkShims; @@ -86,12 +85,12 @@ public abstract class AbstractStreamSqlJob { this.flinkShims = flinkShims; } - private static TableSchema removeTimeAttributes(TableSchema schema) { + private static TableSchema removeTimeAttributes(FlinkShims flinkShims, TableSchema schema) { final TableSchema.Builder builder = TableSchema.builder(); for (int i = 0; i < schema.getFieldCount(); i++) { final TypeInformation<?> type = schema.getFieldTypes()[i]; final TypeInformation<?> convertedType; - if (FlinkTypeFactory.isTimeIndicatorType(type)) { + if 
(flinkShims.isTimeIndicatorType(type)) { convertedType = Types.SQL_TIMESTAMP; } else { convertedType = type; @@ -115,7 +114,7 @@ public abstract class AbstractStreamSqlJob { this.table = table; int parallelism = Integer.parseInt(context.getLocalProperties() .getOrDefault("parallelism", defaultParallelism + "")); - this.schema = removeTimeAttributes(table.getSchema()); + this.schema = removeTimeAttributes(flinkShims, table.getSchema()); checkTableSchema(schema); LOGGER.info("ResultTable Schema: " + this.schema); diff --git a/flink/flink-scala-parent/src/main/java/org/apache/zeppelin/flink/sql/AppendStreamSqlJob.java b/flink/flink-scala-parent/src/main/java/org/apache/zeppelin/flink/sql/AppendStreamSqlJob.java index 7f636f3..c6791ea 100644 --- a/flink/flink-scala-parent/src/main/java/org/apache/zeppelin/flink/sql/AppendStreamSqlJob.java +++ b/flink/flink-scala-parent/src/main/java/org/apache/zeppelin/flink/sql/AppendStreamSqlJob.java @@ -31,6 +31,8 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.io.IOException; +import java.sql.Timestamp; +import java.time.temporal.TemporalField; import java.util.ArrayList; import java.util.List; import java.util.stream.Collectors; @@ -98,16 +100,27 @@ public class AppendStreamSqlJob extends AbstractStreamSqlJob { }); if (materializedTable.size() != 0) { - long maxTimestamp = - ((java.sql.Timestamp) materializedTable.get(materializedTable.size() - 1) - .getField(0)).getTime(); - - materializedTable = materializedTable.stream() - .filter(row -> ((java.sql.Timestamp) row.getField(0)).getTime() > - maxTimestamp - tsWindowThreshold) - .collect(Collectors.toList()); - - builder.append(tableToString(materializedTable)); + // Timestamp type before/after Flink 1.14 has changed. 
+ if (flinkShims.getFlinkVersion().isAfterFlink114()) { + java.time.LocalDateTime ldt = ((java.time.LocalDateTime) materializedTable + .get(materializedTable.size() - 1) + .getField(0)); + final long maxTimestamp = Timestamp.valueOf(ldt).getTime(); + materializedTable = materializedTable.stream() + .filter(row -> Timestamp.valueOf(((java.time.LocalDateTime) row.getField(0))).getTime() > + maxTimestamp - tsWindowThreshold) + .collect(Collectors.toList()); + builder.append(tableToString(materializedTable)); + } else { + final long maxTimestamp = + ((java.sql.Timestamp) materializedTable.get(materializedTable.size() - 1) + .getField(0)).getTime(); + materializedTable = materializedTable.stream() + .filter(row -> ((java.sql.Timestamp) row.getField(0)).getTime() > + maxTimestamp - tsWindowThreshold) + .collect(Collectors.toList()); + builder.append(tableToString(materializedTable)); + } } builder.append("\n%text "); return builder.toString(); diff --git a/flink/flink-scala-parent/src/main/resources/python/zeppelin_ipyflink.py b/flink/flink-scala-parent/src/main/resources/python/zeppelin_ipyflink.py index 50efbea..9249453 100644 --- a/flink/flink-scala-parent/src/main/resources/python/zeppelin_ipyflink.py +++ b/flink/flink-scala-parent/src/main/resources/python/zeppelin_ipyflink.py @@ -19,7 +19,6 @@ from py4j.java_gateway import java_import, JavaGateway, GatewayClient from pyflink.common import * -from pyflink.dataset import * from pyflink.datastream import * from pyflink.table import * from pyflink.table.catalog import * @@ -45,19 +44,24 @@ pyflink.java_gateway._gateway = gateway pyflink.java_gateway.import_flink_view(gateway) pyflink.java_gateway.install_exception_handler() -b_env = pyflink.dataset.ExecutionEnvironment(intp.getJavaExecutionEnvironment()) s_env = StreamExecutionEnvironment(intp.getJavaStreamExecutionEnvironment()) if intp.isFlink110(): + from pyflink.dataset import * + b_env = pyflink.dataset.ExecutionEnvironment(intp.getJavaExecutionEnvironment()) 
bt_env = BatchTableEnvironment(intp.getJavaBatchTableEnvironment("blink"), True) bt_env_2 = BatchTableEnvironment(intp.getJavaBatchTableEnvironment("flink"), False) st_env = StreamTableEnvironment(intp.getJavaStreamTableEnvironment("blink"), True) st_env_2 = StreamTableEnvironment(intp.getJavaStreamTableEnvironment("flink"), False) -else: +elif not intp.isAfterFlink114(): + from pyflink.dataset import * + b_env = pyflink.dataset.ExecutionEnvironment(intp.getJavaExecutionEnvironment()) bt_env = BatchTableEnvironment(intp.getJavaBatchTableEnvironment("blink")) - bt_env_2 = BatchTableEnvironment(intp.getJavaBatchTableEnvironment("flink")) st_env = StreamTableEnvironment(intp.getJavaStreamTableEnvironment("blink")) + bt_env_2 = BatchTableEnvironment(intp.getJavaBatchTableEnvironment("flink")) st_env_2 = StreamTableEnvironment(intp.getJavaStreamTableEnvironment("flink")) +else: + st_env = StreamTableEnvironment(intp.getJavaStreamTableEnvironment("blink")) class IPyFlinkZeppelinContext(PyZeppelinContext): diff --git a/flink/flink-scala-parent/src/main/resources/python/zeppelin_pyflink.py b/flink/flink-scala-parent/src/main/resources/python/zeppelin_pyflink.py index 542ab8f..06b99c9 100644 --- a/flink/flink-scala-parent/src/main/resources/python/zeppelin_pyflink.py +++ b/flink/flink-scala-parent/src/main/resources/python/zeppelin_pyflink.py @@ -16,7 +16,6 @@ # from pyflink.common import * -from pyflink.dataset import * from pyflink.datastream import * from pyflink.table import * from pyflink.table.catalog import * @@ -34,19 +33,25 @@ pyflink.java_gateway._gateway = gateway pyflink.java_gateway.import_flink_view(gateway) pyflink.java_gateway.install_exception_handler() -b_env = pyflink.dataset.ExecutionEnvironment(intp.getJavaExecutionEnvironment()) s_env = StreamExecutionEnvironment(intp.getJavaStreamExecutionEnvironment()) if intp.isFlink110(): + from pyflink.dataset import * + b_env = pyflink.dataset.ExecutionEnvironment(intp.getJavaExecutionEnvironment()) bt_env = 
BatchTableEnvironment(intp.getJavaBatchTableEnvironment("blink"), True) bt_env_2 = BatchTableEnvironment(intp.getJavaBatchTableEnvironment("flink"), False) st_env = StreamTableEnvironment(intp.getJavaStreamTableEnvironment("blink"), True) st_env_2 = StreamTableEnvironment(intp.getJavaStreamTableEnvironment("flink"), False) -else: +elif not intp.isAfterFlink114(): + from pyflink.dataset import * + b_env = pyflink.dataset.ExecutionEnvironment(intp.getJavaExecutionEnvironment()) bt_env = BatchTableEnvironment(intp.getJavaBatchTableEnvironment("blink")) - bt_env_2 = BatchTableEnvironment(intp.getJavaBatchTableEnvironment("flink")) st_env = StreamTableEnvironment(intp.getJavaStreamTableEnvironment("blink")) + bt_env_2 = BatchTableEnvironment(intp.getJavaBatchTableEnvironment("flink")) st_env_2 = StreamTableEnvironment(intp.getJavaStreamTableEnvironment("flink")) +else: + st_env = StreamTableEnvironment(intp.getJavaStreamTableEnvironment("blink")) + from zeppelin_context import PyZeppelinContext diff --git a/flink/flink-scala-parent/src/main/scala/org/apache/zeppelin/flink/FlinkScalaInterpreter.scala b/flink/flink-scala-parent/src/main/scala/org/apache/zeppelin/flink/FlinkScalaInterpreter.scala index 1545bcd..0918084 100644 --- a/flink/flink-scala-parent/src/main/scala/org/apache/zeppelin/flink/FlinkScalaInterpreter.scala +++ b/flink/flink-scala-parent/src/main/scala/org/apache/zeppelin/flink/FlinkScalaInterpreter.scala @@ -447,17 +447,19 @@ abstract class FlinkScalaInterpreter(val properties: Properties, flinkILoop.bind("stenv", stenv.getClass().getCanonicalName(), stenv, List("@transient")) this.java_stenv = tblEnvFactory.createJavaBlinkStreamTableEnvironment(stEnvSetting, getFlinkClassLoader) - // flink planner - this.btenv_2 = tblEnvFactory.createScalaFlinkBatchTableEnvironment() - flinkILoop.bind("btenv_2", btenv_2.getClass().getCanonicalName(), btenv_2, List("@transient")) - stEnvSetting = - 
EnvironmentSettings.newInstance().inStreamingMode().useOldPlanner().build() - this.stenv_2 = tblEnvFactory.createScalaFlinkStreamTableEnvironment(stEnvSetting, getFlinkClassLoader) - flinkILoop.bind("stenv_2", stenv_2.getClass().getCanonicalName(), stenv_2, List("@transient")) - - this.java_btenv_2 = tblEnvFactory.createJavaFlinkBatchTableEnvironment() - btEnvSetting = EnvironmentSettings.newInstance.useOldPlanner.inStreamingMode.build - this.java_stenv_2 = tblEnvFactory.createJavaFlinkStreamTableEnvironment(btEnvSetting, getFlinkClassLoader) + if (!flinkVersion.isAfterFlink114()) { + // flink planner is not supported after flink 1.14 + this.btenv_2 = tblEnvFactory.createScalaFlinkBatchTableEnvironment() + flinkILoop.bind("btenv_2", btenv_2.getClass().getCanonicalName(), btenv_2, List("@transient")) + stEnvSetting = + EnvironmentSettings.newInstance().inStreamingMode().useOldPlanner().build() + this.stenv_2 = tblEnvFactory.createScalaFlinkStreamTableEnvironment(stEnvSetting, getFlinkClassLoader) + flinkILoop.bind("stenv_2", stenv_2.getClass().getCanonicalName(), stenv_2, List("@transient")) + + this.java_btenv_2 = tblEnvFactory.createJavaFlinkBatchTableEnvironment() + btEnvSetting = EnvironmentSettings.newInstance.useOldPlanner.inStreamingMode.build + this.java_stenv_2 = tblEnvFactory.createJavaFlinkStreamTableEnvironment(btEnvSetting, getFlinkClassLoader) + } } finally { Thread.currentThread().setContextClassLoader(originalClassLoader) } @@ -919,8 +921,8 @@ abstract class FlinkScalaInterpreter(val properties: Properties, def completion(buf: String, cursor: Int, context: InterpreterContext): java.util.List[InterpreterCompletion] private def getConfigurationOfStreamExecutionEnv(): Configuration = { - val getConfigurationMethod = classOf[JStreamExecutionEnvironment].getDeclaredMethod("getConfiguration") - getConfigurationMethod.setAccessible(true) - getConfigurationMethod.invoke(this.senv.getJavaEnv).asInstanceOf[Configuration] + val configurationField = 
classOf[JStreamExecutionEnvironment].getDeclaredField("configuration") + configurationField.setAccessible(true) + configurationField.get(this.senv.getJavaEnv).asInstanceOf[Configuration] } } diff --git a/flink/flink-scala-parent/src/main/scala/org/apache/zeppelin/flink/FlinkZeppelinContext.scala b/flink/flink-scala-parent/src/main/scala/org/apache/zeppelin/flink/FlinkZeppelinContext.scala index 517c67a..b5dba22 100644 --- a/flink/flink-scala-parent/src/main/scala/org/apache/zeppelin/flink/FlinkZeppelinContext.scala +++ b/flink/flink-scala-parent/src/main/scala/org/apache/zeppelin/flink/FlinkZeppelinContext.scala @@ -96,11 +96,15 @@ class FlinkZeppelinContext(val flinkInterpreter: FlinkScalaInterpreter, override def showData(obj: Any, maxResult: Int): String = { if (obj.isInstanceOf[DataSet[_]]) { val ds = obj.asInstanceOf[DataSet[_]] - val btenv = flinkInterpreter.getBatchTableEnvironment("flink") - val table = flinkInterpreter.getFlinkShims.fromDataSet(btenv, ds).asInstanceOf[Table] - val columnNames: Array[String] = table.getSchema.getFieldNames - val dsRows: DataSet[Row] = flinkInterpreter.getFlinkShims.toDataSet(btenv, table).asInstanceOf[DataSet[Row]] - showTable(columnNames, dsRows.first(maxResult + 1).collect()) + if (flinkInterpreter.getFlinkVersion.isAfterFlink114) { + "z.show(DataSet) is not supported after Flink 1.14" + } else { + val btenv = flinkInterpreter.getBatchTableEnvironment("flink") + val table = flinkInterpreter.getFlinkShims.fromDataSet(btenv, ds).asInstanceOf[Table] + val columnNames: Array[String] = table.getSchema.getFieldNames + val dsRows: DataSet[Row] = flinkInterpreter.getFlinkShims.toDataSet(btenv, table).asInstanceOf[DataSet[Row]] + showTable(columnNames, dsRows.first(maxResult + 1).collect()) + } } else if (obj.isInstanceOf[Table]) { val rows = JavaConversions.asScalaBuffer( flinkInterpreter.getFlinkShims.collectToList(obj.asInstanceOf[TableImpl]).asInstanceOf[java.util.List[Row]]).toSeq diff --git 
a/flink/flink-scala-parent/src/main/scala/org/apache/zeppelin/flink/internal/FlinkILoop.scala b/flink/flink-scala-parent/src/main/scala/org/apache/zeppelin/flink/internal/FlinkILoop.scala index 27ab381..86abeb5 100644 --- a/flink/flink-scala-parent/src/main/scala/org/apache/zeppelin/flink/internal/FlinkILoop.scala +++ b/flink/flink-scala-parent/src/main/scala/org/apache/zeppelin/flink/internal/FlinkILoop.scala @@ -60,6 +60,7 @@ class FlinkILoop( val remoteSenv = new ScalaShellStreamEnvironment( flinkConfig, this, + flinkScalaInterpreter.getFlinkVersion, getExternalJars(): _*) (remoteBenv,remoteSenv) diff --git a/flink/flink-scala-parent/src/test/java/org/apache/zeppelin/flink/FlinkBatchSqlInterpreterTest.java b/flink/flink-scala-parent/src/test/java/org/apache/zeppelin/flink/FlinkBatchSqlInterpreterTest.java index e09bd1b..875756f 100644 --- a/flink/flink-scala-parent/src/test/java/org/apache/zeppelin/flink/FlinkBatchSqlInterpreterTest.java +++ b/flink/flink-scala-parent/src/test/java/org/apache/zeppelin/flink/FlinkBatchSqlInterpreterTest.java @@ -71,14 +71,16 @@ public class FlinkBatchSqlInterpreterTest extends SqlInterpreterTest { assertEquals("id\tname\n1\ta\n2\tb\n", resultMessages.get(0).getData()); // z.show - context = getInterpreterContext(); - result = - flinkInterpreter.interpret("z.show(btenv.sqlQuery(\"select * from source_table\"))", context); - resultMessages = context.out.toInterpreterResultMessage(); - assertEquals(context.out.toString(), InterpreterResult.Code.SUCCESS, result.code()); - assertEquals(1, resultMessages.size()); - assertEquals(InterpreterResult.Type.TABLE, resultMessages.get(0).getType()); - assertEquals("id\tname\n1\ta\n2\tb\n", resultMessages.get(0).getData()); + if (!flinkInterpreter.getFlinkVersion().isAfterFlink114()) { + context = getInterpreterContext(); + result = + flinkInterpreter.interpret("z.show(btenv.sqlQuery(\"select * from source_table\"))", context); + resultMessages = context.out.toInterpreterResultMessage(); + 
assertEquals(context.out.toString(), InterpreterResult.Code.SUCCESS, result.code()); + assertEquals(1, resultMessages.size()); + assertEquals(InterpreterResult.Type.TABLE, resultMessages.get(0).getType()); + assertEquals("id\tname\n1\ta\n2\tb\n", resultMessages.get(0).getData()); + } // define scala udf result = flinkInterpreter.interpret( @@ -87,7 +89,7 @@ public class FlinkBatchSqlInterpreterTest extends SqlInterpreterTest { "}", getInterpreterContext()); assertEquals(InterpreterResult.Code.SUCCESS, result.code()); - result = flinkInterpreter.interpret("btenv.registerFunction(\"addOne\", new AddOne())", + result = flinkInterpreter.interpret("stenv.registerFunction(\"addOne\", new AddOne())", getInterpreterContext()); assertEquals(InterpreterResult.Code.SUCCESS, result.code()); @@ -108,7 +110,7 @@ public class FlinkBatchSqlInterpreterTest extends SqlInterpreterTest { assertEquals(InterpreterResult.Code.SUCCESS, result.code()); context = getInterpreterContext(); - result = pyFlinkInterpreter.interpret("bt_env.register_function(\"python_upper\", " + + result = pyFlinkInterpreter.interpret("st_env.register_function(\"python_upper\", " + "udf(PythonUpper(), DataTypes.STRING(), DataTypes.STRING()))", context); assertEquals(result.toString(), InterpreterResult.Code.SUCCESS, result.code()); @@ -133,7 +135,7 @@ public class FlinkBatchSqlInterpreterTest extends SqlInterpreterTest { " return s.upper()", getInterpreterContext()); assertEquals(InterpreterResult.Code.SUCCESS, result.code()); - result = iPyFlinkInterpreter.interpret("bt_env.register_function(\"ipython_upper\", " + + result = iPyFlinkInterpreter.interpret("st_env.register_function(\"ipython_upper\", " + "udf(IPythonUpper(), DataTypes.STRING(), DataTypes.STRING()))", getInterpreterContext()); assertEquals(InterpreterResult.Code.SUCCESS, result.code()); diff --git a/flink/flink-scala-parent/src/test/java/org/apache/zeppelin/flink/FlinkInterpreterTest.java 
b/flink/flink-scala-parent/src/test/java/org/apache/zeppelin/flink/FlinkInterpreterTest.java index 8649138..c3939db 100644 --- a/flink/flink-scala-parent/src/test/java/org/apache/zeppelin/flink/FlinkInterpreterTest.java +++ b/flink/flink-scala-parent/src/test/java/org/apache/zeppelin/flink/FlinkInterpreterTest.java @@ -231,8 +231,13 @@ public class FlinkInterpreterTest { result = interpreter.interpret("z.show(data)", context); assertEquals(context.out.toString(), InterpreterResult.Code.SUCCESS, result.code()); List<InterpreterResultMessage> resultMessages = context.out.toInterpreterResultMessage(); - assertEquals(InterpreterResult.Type.TABLE, resultMessages.get(0).getType()); - assertEquals("_1\t_2\n1\tjeff\n2\tandy\n3\tjames\n", resultMessages.get(0).getData()); + if (interpreter.getFlinkVersion().isAfterFlink114()) { + assertEquals(InterpreterResult.Type.TEXT, resultMessages.get(0).getType()); + assertEquals("z.show(DataSet) is not supported after Flink 1.14", resultMessages.get(0).getData()); + } else { + assertEquals(InterpreterResult.Type.TABLE, resultMessages.get(0).getType()); + assertEquals("_1\t_2\n1\tjeff\n2\tandy\n3\tjames\n", resultMessages.get(0).getData()); + } } @Test @@ -263,7 +268,7 @@ public class FlinkInterpreterTest { " .groupBy(0)\n" + " .sum(1)\n" + " .print()", context); - assertEquals(InterpreterResult.Code.SUCCESS, result.code()); + assertEquals(context.out.toString(), InterpreterResult.Code.SUCCESS, result.code()); String[] expectedCounts = {"(hello,3)", "(world,1)", "(flink,1)", "(hadoop,1)"}; Arrays.sort(expectedCounts); diff --git a/flink/flink-scala-parent/src/test/java/org/apache/zeppelin/flink/IPyFlinkInterpreterTest.java b/flink/flink-scala-parent/src/test/java/org/apache/zeppelin/flink/IPyFlinkInterpreterTest.java index ac8d65b..3f06453 100644 --- a/flink/flink-scala-parent/src/test/java/org/apache/zeppelin/flink/IPyFlinkInterpreterTest.java +++ 
b/flink/flink-scala-parent/src/test/java/org/apache/zeppelin/flink/IPyFlinkInterpreterTest.java @@ -60,7 +60,7 @@ public class IPyFlinkInterpreterTest extends IPythonInterpreterTest { private RemoteInterpreterEventClient mockIntpEventClient = mock(RemoteInterpreterEventClient.class); private LazyOpenInterpreter flinkScalaInterpreter; - + private FlinkInterpreter flinkInnerInterpreter; public IPyFlinkInterpreterTest() { super(); @@ -85,8 +85,8 @@ public class IPyFlinkInterpreterTest extends IPythonInterpreterTest { context.setIntpEventClient(mockIntpEventClient); InterpreterContext.set(context); - this.flinkScalaInterpreter = new LazyOpenInterpreter( - new FlinkInterpreter(properties)); + this.flinkInnerInterpreter = new FlinkInterpreter(properties); + this.flinkScalaInterpreter = new LazyOpenInterpreter(flinkInnerInterpreter); intpGroup = new InterpreterGroup(); intpGroup.put("session_1", new ArrayList<Interpreter>()); intpGroup.get("session_1").add(flinkScalaInterpreter); @@ -119,12 +119,16 @@ public class IPyFlinkInterpreterTest extends IPythonInterpreterTest { @Test public void testBatchIPyFlink() throws InterpreterException, IOException { - testBatchPyFlink(interpreter, flinkScalaInterpreter); + if (!flinkInnerInterpreter.getFlinkVersion().isAfterFlink114()) { + testBatchPyFlink(interpreter, flinkScalaInterpreter); + } } @Test public void testStreamIPyFlink() throws InterpreterException, IOException { - testStreamPyFlink(interpreter, flinkScalaInterpreter); + if (!flinkInnerInterpreter.getFlinkVersion().isAfterFlink114()) { + testStreamPyFlink(interpreter, flinkScalaInterpreter); + } } @Test @@ -154,7 +158,8 @@ public class IPyFlinkInterpreterTest extends IPythonInterpreterTest { testResumeStreamSqlFromSavePoint(interpreter, flinkScalaInterpreter); } - public static void testBatchPyFlink(Interpreter pyflinkInterpreter, Interpreter flinkScalaInterpreter) throws InterpreterException, IOException { + public static void testBatchPyFlink(Interpreter 
pyflinkInterpreter, + LazyOpenInterpreter flinkScalaInterpreter) throws InterpreterException, IOException { InterpreterContext context = createInterpreterContext(); InterpreterResult result = pyflinkInterpreter.interpret( "import tempfile\n" + @@ -273,9 +278,15 @@ public class IPyFlinkInterpreterTest extends IPythonInterpreterTest { , context); assertEquals(result.toString(),InterpreterResult.Code.SUCCESS, result.code()); List<InterpreterResultMessage> resultMessages = context.out.toInterpreterResultMessage(); - assertEquals(context.out.toString(), 1, resultMessages.size()); - assertEquals(context.out.toString(), InterpreterResult.Type.TABLE, resultMessages.get(0).getType()); - assertEquals(context.out.toString(), "a\tb\tc\n1\thi\thello\n2\thi\thello\n", resultMessages.get(0).getData()); + FlinkVersion flinkVersion = ((FlinkInterpreter) flinkScalaInterpreter.getInnerInterpreter()).getFlinkVersion(); + if (flinkVersion.isAfterFlink114()) { + assertEquals(InterpreterResult.Type.TEXT, resultMessages.get(0).getType()); + assertEquals("z.show(DataSet) is not supported after Flink 1.14", resultMessages.get(0).getData()); + } else { + assertEquals(context.out.toString(), 1, resultMessages.size()); + assertEquals(context.out.toString(), InterpreterResult.Type.TABLE, resultMessages.get(0).getType()); + assertEquals(context.out.toString(), "a\tb\tc\n1\thi\thello\n2\thi\thello\n", resultMessages.get(0).getData()); + } } @Override diff --git a/flink/flink-scala-parent/src/test/java/org/apache/zeppelin/flink/PyFlinkInterpreterTest.java b/flink/flink-scala-parent/src/test/java/org/apache/zeppelin/flink/PyFlinkInterpreterTest.java index c891ec1..0ecccfd 100644 --- a/flink/flink-scala-parent/src/test/java/org/apache/zeppelin/flink/PyFlinkInterpreterTest.java +++ b/flink/flink-scala-parent/src/test/java/org/apache/zeppelin/flink/PyFlinkInterpreterTest.java @@ -40,9 +40,10 @@ import static org.mockito.Mockito.mock; public class PyFlinkInterpreterTest extends PythonInterpreterTest { 
- private Interpreter flinkScalaInterpreter; - private Interpreter streamSqlInterpreter; - private Interpreter batchSqlInterpreter; + private FlinkInterpreter flinkInnerInterpreter; + private LazyOpenInterpreter flinkScalaInterpreter; + private LazyOpenInterpreter streamSqlInterpreter; + private LazyOpenInterpreter batchSqlInterpreter; @Override @@ -63,7 +64,8 @@ public class PyFlinkInterpreterTest extends PythonInterpreterTest { IPyFlinkInterpreterTest.angularObjectRegistry = new AngularObjectRegistry("flink", null); InterpreterContext context = getInterpreterContext(); InterpreterContext.set(context); - flinkScalaInterpreter = new LazyOpenInterpreter(new FlinkInterpreter(properties)); + this.flinkInnerInterpreter = new FlinkInterpreter(properties); + flinkScalaInterpreter = new LazyOpenInterpreter(flinkInnerInterpreter); intpGroup.get("session_1").add(flinkScalaInterpreter); flinkScalaInterpreter.setInterpreterGroup(intpGroup); @@ -95,12 +97,16 @@ public class PyFlinkInterpreterTest extends PythonInterpreterTest { @Test public void testBatchPyFlink() throws InterpreterException, IOException { - IPyFlinkInterpreterTest.testBatchPyFlink(interpreter, flinkScalaInterpreter); + if (!flinkInnerInterpreter.getFlinkVersion().isAfterFlink114()){ + IPyFlinkInterpreterTest.testBatchPyFlink(interpreter, flinkScalaInterpreter); + } } @Test public void testStreamIPyFlink() throws InterpreterException, IOException { - IPyFlinkInterpreterTest.testStreamPyFlink(interpreter, flinkScalaInterpreter); + if (!flinkInnerInterpreter.getFlinkVersion().isAfterFlink114()) { + IPyFlinkInterpreterTest.testStreamPyFlink(interpreter, flinkScalaInterpreter); + } } @Test diff --git a/flink/flink-shims/src/main/java/org/apache/zeppelin/flink/FlinkShims.java b/flink/flink-shims/src/main/java/org/apache/zeppelin/flink/FlinkShims.java index c41488f..ba25ec9 100644 --- a/flink/flink-shims/src/main/java/org/apache/zeppelin/flink/FlinkShims.java +++ 
b/flink/flink-shims/src/main/java/org/apache/zeppelin/flink/FlinkShims.java @@ -18,6 +18,7 @@ package org.apache.zeppelin.flink; +import org.apache.commons.lang3.tuple.ImmutablePair; import org.apache.zeppelin.flink.sql.SqlCommandParser; import org.apache.zeppelin.interpreter.InterpreterContext; import org.jline.utils.AttributedString; @@ -45,8 +46,10 @@ public abstract class FlinkShims { private static FlinkShims flinkShims; protected Properties properties; + protected FlinkVersion flinkVersion; - public FlinkShims(Properties properties) { + public FlinkShims(FlinkVersion flinkVersion, Properties properties) { + this.flinkVersion = flinkVersion; this.properties = properties; } @@ -65,12 +68,15 @@ public abstract class FlinkShims { } else if (flinkVersion.getMajorVersion() == 1 && flinkVersion.getMinorVersion() == 13) { LOGGER.info("Initializing shims for Flink 1.13"); flinkShimsClass = Class.forName("org.apache.zeppelin.flink.Flink113Shims"); + } else if (flinkVersion.getMajorVersion() == 1 && flinkVersion.getMinorVersion() == 14) { + LOGGER.info("Initializing shims for Flink 1.14"); + flinkShimsClass = Class.forName("org.apache.zeppelin.flink.Flink114Shims"); } else { throw new Exception("Flink version: '" + flinkVersion + "' is not supported yet"); } - Constructor c = flinkShimsClass.getConstructor(Properties.class); - return (FlinkShims) c.newInstance(properties); + Constructor c = flinkShimsClass.getConstructor(FlinkVersion.class, Properties.class); + return (FlinkShims) c.newInstance(flinkVersion, properties); } /** @@ -98,6 +104,10 @@ public abstract class FlinkShims { .toAttributedString(); } + public FlinkVersion getFlinkVersion() { + return flinkVersion; + } + public abstract void disableSysoutLogging(Object batchConfig, Object streamConfig); public abstract Object createStreamExecutionEnvironmentFactory(Object streamExecutionEnvironment); @@ -159,4 +169,10 @@ public abstract class FlinkShims { } public abstract String[] rowToString(Object row, Object 
table, Object tableConfig); + + public abstract boolean isTimeIndicatorType(Object type); + + public abstract ImmutablePair<Object, Object> createPlannerAndExecutor( + ClassLoader classLoader, Object environmentSettings, Object sEnv, + Object tableConfig, Object functionCatalog, Object catalogManager); } diff --git a/flink/flink-shims/src/main/java/org/apache/zeppelin/flink/FlinkVersion.java b/flink/flink-shims/src/main/java/org/apache/zeppelin/flink/FlinkVersion.java index a042162..2e1f47e 100644 --- a/flink/flink-shims/src/main/java/org/apache/zeppelin/flink/FlinkVersion.java +++ b/flink/flink-shims/src/main/java/org/apache/zeppelin/flink/FlinkVersion.java @@ -82,4 +82,8 @@ public class FlinkVersion { public boolean isFlink110() { return this.majorVersion == 1 && minorVersion == 10; } + + public boolean isAfterFlink114() { + return newerThanOrEqual(FlinkVersion.fromVersionString("1.14.0")); + } } diff --git a/flink/flink1.10-shims/src/main/java/org/apache/zeppelin/flink/Flink110Shims.java b/flink/flink1.10-shims/src/main/java/org/apache/zeppelin/flink/Flink110Shims.java index 3faa52c..5711884 100644 --- a/flink/flink1.10-shims/src/main/java/org/apache/zeppelin/flink/Flink110Shims.java +++ b/flink/flink1.10-shims/src/main/java/org/apache/zeppelin/flink/Flink110Shims.java @@ -19,27 +19,38 @@ package org.apache.zeppelin.flink; import org.apache.commons.cli.CommandLine; +import org.apache.commons.lang3.tuple.ImmutablePair; import org.apache.flink.api.common.ExecutionConfig; +import org.apache.flink.api.common.typeinfo.TypeInformation; import org.apache.flink.api.common.typeutils.TypeSerializer; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.api.scala.DataSet; import org.apache.flink.client.cli.CliFrontend; import org.apache.flink.client.cli.CustomCommandLine; import org.apache.flink.configuration.ConfigOption; -import org.apache.flink.configuration.Configuration; import org.apache.flink.python.PythonOptions; import 
org.apache.flink.python.util.ResourceUtil; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironmentFactory; +import org.apache.flink.table.api.EnvironmentSettings; import org.apache.flink.table.api.Table; +import org.apache.flink.table.api.TableConfig; import org.apache.flink.table.api.TableEnvironment; +import org.apache.flink.table.api.TableException; import org.apache.flink.table.api.TableUtils; import org.apache.flink.table.api.config.ExecutionConfigOptions; import org.apache.flink.table.api.config.OptimizerConfigOptions; import org.apache.flink.table.api.java.internal.StreamTableEnvironmentImpl; import org.apache.flink.table.api.scala.BatchTableEnvironment; +import org.apache.flink.table.calcite.FlinkTypeFactory; import org.apache.flink.table.catalog.CatalogManager; +import org.apache.flink.table.catalog.FunctionCatalog; import org.apache.flink.table.catalog.GenericInMemoryCatalog; +import org.apache.flink.table.delegation.Executor; +import org.apache.flink.table.delegation.ExecutorFactory; +import org.apache.flink.table.delegation.Planner; +import org.apache.flink.table.delegation.PlannerFactory; +import org.apache.flink.table.factories.ComponentFactoryService; import org.apache.flink.table.functions.AggregateFunction; import org.apache.flink.table.functions.ScalarFunction; import org.apache.flink.table.functions.TableAggregateFunction; @@ -103,8 +114,8 @@ public class Flink110Shims extends FlinkShims { .append(": Make sure that a statement ends with ';' for finalizing (multi-line) statements.") .toAttributedString(); - public Flink110Shims(Properties properties) { - super(properties); + public Flink110Shims(FlinkVersion flinkVersion, Properties properties) { + super(flinkVersion, properties); } @Override @@ -337,4 +348,42 @@ public class Flink110Shims extends FlinkShims { } return fields; } + + public boolean isTimeIndicatorType(Object type) { + return 
FlinkTypeFactory.isTimeIndicatorType((TypeInformation<?>) type); + } + + private Object lookupExecutor(ClassLoader classLoader, + Object settings, + Object sEnv) { + try { + Map<String, String> executorProperties = ((EnvironmentSettings) settings).toExecutorProperties(); + ExecutorFactory executorFactory = ComponentFactoryService.find(ExecutorFactory.class, executorProperties); + Method createMethod = executorFactory.getClass() + .getMethod("create", Map.class, StreamExecutionEnvironment.class); + + return (Executor) createMethod.invoke( + executorFactory, + executorProperties, + (StreamExecutionEnvironment) sEnv); + } catch (Exception e) { + throw new TableException( + "Could not instantiate the executor. Make sure a planner module is on the classpath", + e); + } + } + + @Override + public ImmutablePair<Object, Object> createPlannerAndExecutor( + ClassLoader classLoader, Object environmentSettings, Object sEnv, + Object tableConfig, Object functionCatalog, Object catalogManager) { + EnvironmentSettings settings = (EnvironmentSettings) environmentSettings; + Executor executor = (Executor) lookupExecutor(classLoader, settings, sEnv); + Map<String, String> plannerProperties = settings.toPlannerProperties(); + Planner planner = ComponentFactoryService.find(PlannerFactory.class, plannerProperties) + .create(plannerProperties, executor, (TableConfig) tableConfig, + (FunctionCatalog) functionCatalog, + (CatalogManager) catalogManager); + return ImmutablePair.of(planner, executor); + } } diff --git a/flink/flink1.11-shims/src/main/java/org/apache/zeppelin/flink/Flink111Shims.java b/flink/flink1.11-shims/src/main/java/org/apache/zeppelin/flink/Flink111Shims.java index c8db525..64979fd 100644 --- a/flink/flink1.11-shims/src/main/java/org/apache/zeppelin/flink/Flink111Shims.java +++ b/flink/flink1.11-shims/src/main/java/org/apache/zeppelin/flink/Flink111Shims.java @@ -20,8 +20,10 @@ package org.apache.zeppelin.flink; import org.apache.commons.cli.CommandLine; import 
org.apache.commons.compress.utils.Lists; +import org.apache.commons.lang3.tuple.ImmutablePair; import org.apache.flink.api.common.ExecutionConfig; import org.apache.flink.api.common.JobStatus; +import org.apache.flink.api.common.typeinfo.TypeInformation; import org.apache.flink.api.common.typeutils.TypeSerializer; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.api.scala.DataSet; @@ -36,7 +38,9 @@ import org.apache.flink.streaming.api.environment.StreamExecutionEnvironmentFact import org.apache.flink.table.api.EnvironmentSettings; import org.apache.flink.table.api.StatementSet; import org.apache.flink.table.api.Table; +import org.apache.flink.table.api.TableConfig; import org.apache.flink.table.api.TableEnvironment; +import org.apache.flink.table.api.TableException; import org.apache.flink.table.api.TableResult; import org.apache.flink.table.api.bridge.java.internal.StreamTableEnvironmentImpl; import org.apache.flink.table.api.bridge.scala.BatchTableEnvironment; @@ -46,8 +50,14 @@ import org.apache.flink.table.api.config.TableConfigOptions; import org.apache.flink.table.api.internal.TableEnvironmentInternal; import org.apache.flink.table.api.internal.CatalogTableSchemaResolver; import org.apache.flink.table.catalog.CatalogManager; +import org.apache.flink.table.catalog.FunctionCatalog; import org.apache.flink.table.catalog.GenericInMemoryCatalog; +import org.apache.flink.table.delegation.Executor; +import org.apache.flink.table.delegation.ExecutorFactory; import org.apache.flink.table.delegation.Parser; +import org.apache.flink.table.delegation.Planner; +import org.apache.flink.table.delegation.PlannerFactory; +import org.apache.flink.table.factories.ComponentFactoryService; import org.apache.flink.table.functions.AggregateFunction; import org.apache.flink.table.functions.ScalarFunction; import org.apache.flink.table.functions.TableAggregateFunction; @@ -78,6 +88,7 @@ import org.apache.flink.table.operations.ddl.DropDatabaseOperation; import 
org.apache.flink.table.operations.ddl.DropTableOperation; import org.apache.flink.table.operations.ddl.DropTempSystemFunctionOperation; import org.apache.flink.table.operations.ddl.DropViewOperation; +import org.apache.flink.table.planner.calcite.FlinkTypeFactory; import org.apache.flink.table.sinks.TableSink; import org.apache.flink.table.utils.PrintUtils; import org.apache.flink.types.Row; @@ -98,6 +109,7 @@ import org.slf4j.LoggerFactory; import java.io.File; import java.io.IOException; import java.lang.reflect.Field; +import java.lang.reflect.Method; import java.net.InetAddress; import java.util.Arrays; import java.util.HashMap; @@ -141,10 +153,9 @@ public class Flink111Shims extends FlinkShims { private Map<String, StatementSet> statementSetMap = new ConcurrentHashMap<>(); - public Flink111Shims(Properties properties) { - super(properties); + public Flink111Shims(FlinkVersion flinkVersion, Properties properties) { + super(flinkVersion, properties); } - @Override public void disableSysoutLogging(Object batchConfig, Object streamConfig) { ((ExecutionConfig) batchConfig).disableSysoutLogging(); @@ -473,4 +484,43 @@ public class Flink111Shims extends FlinkShims { public String[] rowToString(Object row, Object table, Object tableConfig) { return PrintUtils.rowToString((Row) row); } + + public boolean isTimeIndicatorType(Object type) { + return FlinkTypeFactory.isTimeIndicatorType((TypeInformation<?>) type); + } + + private Object lookupExecutor(ClassLoader classLoader, + Object settings, + Object sEnv) { + try { + Map<String, String> executorProperties = ((EnvironmentSettings) settings).toExecutorProperties(); + ExecutorFactory executorFactory = ComponentFactoryService.find(ExecutorFactory.class, executorProperties); + Method createMethod = executorFactory.getClass() + .getMethod("create", Map.class, StreamExecutionEnvironment.class); + + return (Executor) createMethod.invoke( + executorFactory, + executorProperties, + (StreamExecutionEnvironment) sEnv); + } catch 
(Exception e) { + throw new TableException( + "Could not instantiate the executor. Make sure a planner module is on the classpath", + e); + } + } + + @Override + public ImmutablePair<Object, Object> createPlannerAndExecutor( + ClassLoader classLoader, Object environmentSettings, Object sEnv, + Object tableConfig, Object functionCatalog, Object catalogManager) { + EnvironmentSettings settings = (EnvironmentSettings) environmentSettings; + Executor executor = (Executor) lookupExecutor(classLoader, settings, sEnv); + Map<String, String> plannerProperties = settings.toPlannerProperties(); + Planner planner = ComponentFactoryService.find(PlannerFactory.class, plannerProperties) + .create(plannerProperties, executor, (TableConfig) tableConfig, + (FunctionCatalog) functionCatalog, + (CatalogManager) catalogManager); + return ImmutablePair.of(planner, executor); + + } } diff --git a/flink/flink1.12-shims/src/main/java/org/apache/zeppelin/flink/Flink112Shims.java b/flink/flink1.12-shims/src/main/java/org/apache/zeppelin/flink/Flink112Shims.java index 648826a..a713d1c 100644 --- a/flink/flink1.12-shims/src/main/java/org/apache/zeppelin/flink/Flink112Shims.java +++ b/flink/flink1.12-shims/src/main/java/org/apache/zeppelin/flink/Flink112Shims.java @@ -21,7 +21,9 @@ package org.apache.zeppelin.flink; import org.apache.commons.cli.CommandLine; import org.apache.commons.compress.utils.Lists; import org.apache.commons.lang3.StringUtils; +import org.apache.commons.lang3.tuple.ImmutablePair; import org.apache.flink.api.common.JobStatus; +import org.apache.flink.api.common.typeinfo.TypeInformation; import org.apache.flink.api.common.typeutils.TypeSerializer; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.api.scala.DataSet; @@ -37,7 +39,9 @@ import org.apache.flink.streaming.api.environment.StreamExecutionEnvironmentFact import org.apache.flink.table.api.EnvironmentSettings; import org.apache.flink.table.api.StatementSet; import 
org.apache.flink.table.api.Table; +import org.apache.flink.table.api.TableConfig; import org.apache.flink.table.api.TableEnvironment; +import org.apache.flink.table.api.TableException; import org.apache.flink.table.api.TableResult; import org.apache.flink.table.api.bridge.java.internal.StreamTableEnvironmentImpl; import org.apache.flink.table.api.bridge.scala.BatchTableEnvironment; @@ -47,8 +51,14 @@ import org.apache.flink.table.api.config.TableConfigOptions; import org.apache.flink.table.api.internal.TableEnvironmentInternal; import org.apache.flink.table.api.internal.CatalogTableSchemaResolver; import org.apache.flink.table.catalog.CatalogManager; +import org.apache.flink.table.catalog.FunctionCatalog; import org.apache.flink.table.catalog.GenericInMemoryCatalog; +import org.apache.flink.table.delegation.Executor; +import org.apache.flink.table.delegation.ExecutorFactory; import org.apache.flink.table.delegation.Parser; +import org.apache.flink.table.delegation.Planner; +import org.apache.flink.table.delegation.PlannerFactory; +import org.apache.flink.table.factories.ComponentFactoryService; import org.apache.flink.table.functions.AggregateFunction; import org.apache.flink.table.functions.ScalarFunction; import org.apache.flink.table.functions.TableAggregateFunction; @@ -79,6 +89,7 @@ import org.apache.flink.table.operations.ddl.DropDatabaseOperation; import org.apache.flink.table.operations.ddl.DropTableOperation; import org.apache.flink.table.operations.ddl.DropTempSystemFunctionOperation; import org.apache.flink.table.operations.ddl.DropViewOperation; +import org.apache.flink.table.planner.calcite.FlinkTypeFactory; import org.apache.flink.table.sinks.TableSink; import org.apache.flink.table.utils.PrintUtils; import org.apache.flink.types.Row; @@ -99,6 +110,7 @@ import org.slf4j.LoggerFactory; import java.io.File; import java.io.IOException; import java.lang.reflect.Field; +import java.lang.reflect.Method; import java.net.InetAddress; import java.util.Arrays; 
import java.util.HashMap; @@ -142,10 +154,9 @@ public class Flink112Shims extends FlinkShims { private Map<String, StatementSet> statementSetMap = new ConcurrentHashMap<>(); - public Flink112Shims(Properties properties) { - super(properties); + public Flink112Shims(FlinkVersion flinkVersion, Properties properties) { + super(flinkVersion, properties); } - @Override public void disableSysoutLogging(Object batchConfig, Object streamConfig) { // do nothing @@ -486,4 +497,42 @@ public class Flink112Shims extends FlinkShims { public String[] rowToString(Object row, Object table, Object tableConfig) { return PrintUtils.rowToString((Row) row); } + + public boolean isTimeIndicatorType(Object type) { + return FlinkTypeFactory.isTimeIndicatorType((TypeInformation<?>) type); + } + + private Object lookupExecutor(ClassLoader classLoader, + Object settings, + Object sEnv) { + try { + Map<String, String> executorProperties = ((EnvironmentSettings) settings).toExecutorProperties(); + ExecutorFactory executorFactory = ComponentFactoryService.find(ExecutorFactory.class, executorProperties); + Method createMethod = executorFactory.getClass() + .getMethod("create", Map.class, StreamExecutionEnvironment.class); + + return (Executor) createMethod.invoke( + executorFactory, + executorProperties, + (StreamExecutionEnvironment) sEnv); + } catch (Exception e) { + throw new TableException( + "Could not instantiate the executor. 
Make sure a planner module is on the classpath", + e); + } + } + + @Override + public ImmutablePair<Object, Object> createPlannerAndExecutor( + ClassLoader classLoader, Object environmentSettings, Object sEnv, + Object tableConfig, Object functionCatalog, Object catalogManager) { + EnvironmentSettings settings = (EnvironmentSettings) environmentSettings; + Executor executor = (Executor) lookupExecutor(classLoader, settings, sEnv); + Map<String, String> plannerProperties = settings.toPlannerProperties(); + Planner planner = ComponentFactoryService.find(PlannerFactory.class, plannerProperties) + .create(plannerProperties, executor, (TableConfig) tableConfig, + (FunctionCatalog) functionCatalog, + (CatalogManager) catalogManager); + return ImmutablePair.of(planner, executor); + } } diff --git a/flink/flink1.13-shims/src/main/java/org/apache/zeppelin/flink/Flink113Shims.java b/flink/flink1.13-shims/src/main/java/org/apache/zeppelin/flink/Flink113Shims.java index 054f07f..481bfee 100644 --- a/flink/flink1.13-shims/src/main/java/org/apache/zeppelin/flink/Flink113Shims.java +++ b/flink/flink1.13-shims/src/main/java/org/apache/zeppelin/flink/Flink113Shims.java @@ -21,8 +21,10 @@ package org.apache.zeppelin.flink; import org.apache.commons.cli.CommandLine; import org.apache.commons.compress.utils.Lists; import org.apache.commons.lang3.StringUtils; +import org.apache.commons.lang3.tuple.ImmutablePair; import org.apache.flink.api.common.JobStatus; import org.apache.flink.api.common.RuntimeExecutionMode; +import org.apache.flink.api.common.typeinfo.TypeInformation; import org.apache.flink.api.common.typeutils.TypeSerializer; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.api.scala.DataSet; @@ -36,11 +38,13 @@ import org.apache.flink.core.execution.JobClient; import org.apache.flink.python.PythonOptions; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import 
org.apache.flink.streaming.api.environment.StreamExecutionEnvironmentFactory; +import org.apache.flink.table.api.EnvironmentSettings; import org.apache.flink.table.api.PlannerType; import org.apache.flink.table.api.StatementSet; import org.apache.flink.table.api.Table; import org.apache.flink.table.api.TableConfig; import org.apache.flink.table.api.TableEnvironment; +import org.apache.flink.table.api.TableException; import org.apache.flink.table.api.TableResult; import org.apache.flink.table.api.TableSchema; import org.apache.flink.table.api.bridge.java.internal.StreamTableEnvironmentImpl; @@ -50,9 +54,15 @@ import org.apache.flink.table.api.config.OptimizerConfigOptions; import org.apache.flink.table.api.config.TableConfigOptions; import org.apache.flink.table.api.internal.TableEnvironmentInternal; import org.apache.flink.table.catalog.CatalogManager; +import org.apache.flink.table.catalog.FunctionCatalog; import org.apache.flink.table.catalog.GenericInMemoryCatalog; import org.apache.flink.table.catalog.ResolvedSchema; +import org.apache.flink.table.delegation.Executor; +import org.apache.flink.table.delegation.ExecutorFactory; import org.apache.flink.table.delegation.Parser; +import org.apache.flink.table.delegation.Planner; +import org.apache.flink.table.delegation.PlannerFactory; +import org.apache.flink.table.factories.ComponentFactoryService; import org.apache.flink.table.functions.AggregateFunction; import org.apache.flink.table.functions.ScalarFunction; import org.apache.flink.table.functions.TableAggregateFunction; @@ -83,6 +93,7 @@ import org.apache.flink.table.operations.ddl.DropDatabaseOperation; import org.apache.flink.table.operations.ddl.DropTableOperation; import org.apache.flink.table.operations.ddl.DropTempSystemFunctionOperation; import org.apache.flink.table.operations.ddl.DropViewOperation; +import org.apache.flink.table.planner.calcite.FlinkTypeFactory; import org.apache.flink.table.sinks.TableSink; import 
org.apache.flink.table.utils.PrintUtils; import org.apache.flink.types.Row; @@ -103,6 +114,7 @@ import org.slf4j.LoggerFactory; import java.io.File; import java.io.IOException; import java.lang.reflect.Field; +import java.lang.reflect.Method; import java.net.InetAddress; import java.time.ZoneId; import java.util.Arrays; @@ -147,8 +159,8 @@ public class Flink113Shims extends FlinkShims { private Map<String, StatementSet> statementSetMap = new ConcurrentHashMap<>(); - public Flink113Shims(Properties properties) { - super(properties); + public Flink113Shims(FlinkVersion flinkVersion, Properties properties) { + super(flinkVersion, properties); } @Override @@ -508,4 +520,42 @@ public class Flink113Shims extends FlinkShims { ResolvedSchema resolvedSchema = ((Table) table).getResolvedSchema(); return PrintUtils.rowToString((Row) row, resolvedSchema, zoneId); } + + public boolean isTimeIndicatorType(Object type) { + return FlinkTypeFactory.isTimeIndicatorType((TypeInformation<?>) type); + } + + private Object lookupExecutor(ClassLoader classLoader, + Object settings, + Object sEnv) { + try { + Map<String, String> executorProperties = ((EnvironmentSettings) settings).toExecutorProperties(); + ExecutorFactory executorFactory = ComponentFactoryService.find(ExecutorFactory.class, executorProperties); + Method createMethod = executorFactory.getClass() + .getMethod("create", Map.class, StreamExecutionEnvironment.class); + + return createMethod.invoke( + executorFactory, + executorProperties, + sEnv); + } catch (Exception e) { + throw new TableException( + "Could not instantiate the executor. 
Make sure a planner module is on the classpath", + e); + } + } + + @Override + public ImmutablePair<Object, Object> createPlannerAndExecutor( + ClassLoader classLoader, Object environmentSettings, Object sEnv, + Object tableConfig, Object functionCatalog, Object catalogManager) { + EnvironmentSettings settings = (EnvironmentSettings) environmentSettings; + Executor executor = (Executor) lookupExecutor(classLoader, settings, sEnv); + Map<String, String> plannerProperties = settings.toPlannerProperties(); + Planner planner = ComponentFactoryService.find(PlannerFactory.class, plannerProperties) + .create(plannerProperties, executor, (TableConfig) tableConfig, + (FunctionCatalog) functionCatalog, + (CatalogManager) catalogManager); + return ImmutablePair.of(planner, executor); + } } diff --git a/flink/flink1.14-shims/pom.xml b/flink/flink1.14-shims/pom.xml new file mode 100644 index 0000000..c160548 --- /dev/null +++ b/flink/flink1.14-shims/pom.xml @@ -0,0 +1,199 @@ +<?xml version="1.0" encoding="UTF-8"?> +<!-- + ~ Licensed to the Apache Software Foundation (ASF) under one or more + ~ contributor license agreements. See the NOTICE file distributed with + ~ this work for additional information regarding copyright ownership. + ~ The ASF licenses this file to You under the Apache License, Version 2.0 + ~ (the "License"); you may not use this file except in compliance with + ~ the License. You may obtain a copy of the License at + ~ + ~ http://www.apache.org/licenses/LICENSE-2.0 + ~ + ~ Unless required by applicable law or agreed to in writing, software + ~ distributed under the License is distributed on an "AS IS" BASIS, + ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + ~ See the License for the specific language governing permissions and + ~ limitations under the License. 
+ --> + +<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" + xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd"> + <parent> + <artifactId>flink-parent</artifactId> + <groupId>org.apache.zeppelin</groupId> + <version>0.11.0-SNAPSHOT</version> + <relativePath>../pom.xml</relativePath> + </parent> + + <modelVersion>4.0.0</modelVersion> + <groupId>org.apache.zeppelin</groupId> + <artifactId>flink1.14-shims</artifactId> + <version>0.11.0-SNAPSHOT</version> + <packaging>jar</packaging> + <name>Zeppelin: Flink1.14 Shims</name> + + <properties> + <flink.version>${flink1.14.version}</flink.version> + </properties> + + <dependencies> + + <dependency> + <groupId>org.apache.zeppelin</groupId> + <artifactId>flink-shims</artifactId> + <version>${project.version}</version> + </dependency> + + <dependency> + <groupId>org.apache.flink</groupId> + <artifactId>flink-core</artifactId> + <version>${flink.version}</version> + <scope>provided</scope> + </dependency> + + <dependency> + <groupId>org.apache.flink</groupId> + <artifactId>flink-clients_${flink.scala.binary.version}</artifactId> + <version>${flink.version}</version> + <scope>provided</scope> + </dependency> + + <dependency> + <groupId>org.apache.flink</groupId> + <artifactId>flink-runtime</artifactId> + <version>${flink.version}</version> + <scope>provided</scope> + </dependency> + + <dependency> + <groupId>org.apache.flink</groupId> + <artifactId>flink-table-api-scala_${flink.scala.binary.version}</artifactId> + <version>${flink.version}</version> + <scope>provided</scope> + </dependency> + + <dependency> + <groupId>org.apache.flink</groupId> + <artifactId>flink-table-api-scala-bridge_${flink.scala.binary.version}</artifactId> + <version>${flink.version}</version> + <scope>provided</scope> + </dependency> + + <dependency> + <groupId>org.apache.flink</groupId> + 
<artifactId>flink-table-api-java-bridge_${flink.scala.binary.version}</artifactId> + <version>${flink.version}</version> + <scope>provided</scope> + </dependency> + + <dependency> + <groupId>org.apache.flink</groupId> + <artifactId>flink-scala_${flink.scala.binary.version}</artifactId> + <version>${flink.version}</version> + <scope>provided</scope> + </dependency> + + <dependency> + <groupId>org.apache.flink</groupId> + <artifactId>flink-streaming-java_${flink.scala.binary.version}</artifactId> + <version>${flink.version}</version> + <scope>provided</scope> + </dependency> + + <dependency> + <groupId>org.apache.flink</groupId> + <artifactId>flink-streaming-scala_${flink.scala.binary.version}</artifactId> + <version>${flink.version}</version> + <scope>provided</scope> + </dependency> + + <dependency> + <groupId>org.apache.flink</groupId> + <artifactId>flink-java</artifactId> + <version>${flink.version}</version> + <scope>provided</scope> + </dependency> + + <dependency> + <groupId>org.apache.flink</groupId> + <artifactId>flink-table-planner_${flink.scala.binary.version}</artifactId> + <version>${flink.version}</version> + <scope>provided</scope> + </dependency> + + <dependency> + <groupId>org.apache.flink</groupId> + <artifactId>flink-python_${flink.scala.binary.version}</artifactId> + <version>${flink.version}</version> + <scope>provided</scope> + </dependency> + + </dependencies> + + <build> + <plugins> + + <plugin> + <groupId>net.alchim31.maven</groupId> + <artifactId>scala-maven-plugin</artifactId> + <executions> + <execution> + <id>eclipse-add-source</id> + <goals> + <goal>add-source</goal> + </goals> + </execution> + <execution> + <id>scala-compile-first</id> + <phase>process-resources</phase> + <goals> + <goal>compile</goal> + </goals> + </execution> + <execution> + <id>scala-test-compile-first</id> + <phase>process-test-resources</phase> + <goals> + <goal>testCompile</goal> + </goals> + </execution> + </executions> + <configuration> + 
<scalaVersion>${flink.scala.version}</scalaVersion> + <args> + <arg>-unchecked</arg> + <arg>-deprecation</arg> + <arg>-feature</arg> + <arg>-target:jvm-1.8</arg> + </args> + <jvmArgs> + <jvmArg>-Xms1024m</jvmArg> + <jvmArg>-Xmx1024m</jvmArg> + <jvmArg>-XX:MaxMetaspaceSize=${MaxMetaspace}</jvmArg> + </jvmArgs> + <javacArgs> + <javacArg>-source</javacArg> + <javacArg>${java.version}</javacArg> + <javacArg>-target</javacArg> + <javacArg>${java.version}</javacArg> + <javacArg>-Xlint:all,-serial,-path,-options</javacArg> + </javacArgs> + </configuration> + </plugin> + + <plugin> + <artifactId>maven-resources-plugin</artifactId> + <executions> + <execution> + <id>copy-interpreter-setting</id> + <phase>none</phase> + <configuration> + <skip>true</skip> + </configuration> + </execution> + </executions> + </plugin> + </plugins> + </build> + +</project> \ No newline at end of file diff --git a/flink/flink1.13-shims/src/main/java/org/apache/zeppelin/flink/Flink113Shims.java b/flink/flink1.14-shims/src/main/java/org/apache/zeppelin/flink/Flink114Shims.java similarity index 88% copy from flink/flink1.13-shims/src/main/java/org/apache/zeppelin/flink/Flink113Shims.java copy to flink/flink1.14-shims/src/main/java/org/apache/zeppelin/flink/Flink114Shims.java index 054f07f..56fdc7e 100644 --- a/flink/flink1.13-shims/src/main/java/org/apache/zeppelin/flink/Flink113Shims.java +++ b/flink/flink1.14-shims/src/main/java/org/apache/zeppelin/flink/Flink114Shims.java @@ -21,11 +21,12 @@ package org.apache.zeppelin.flink; import org.apache.commons.cli.CommandLine; import org.apache.commons.compress.utils.Lists; import org.apache.commons.lang3.StringUtils; +import org.apache.commons.lang3.tuple.ImmutablePair; import org.apache.flink.api.common.JobStatus; import org.apache.flink.api.common.RuntimeExecutionMode; +import org.apache.flink.api.common.typeinfo.TypeInformation; import org.apache.flink.api.common.typeutils.TypeSerializer; import org.apache.flink.api.java.tuple.Tuple2; -import 
org.apache.flink.api.scala.DataSet; import org.apache.flink.client.cli.CliFrontend; import org.apache.flink.client.cli.CustomCommandLine; import org.apache.flink.configuration.ConfigOption; @@ -36,23 +37,29 @@ import org.apache.flink.core.execution.JobClient; import org.apache.flink.python.PythonOptions; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironmentFactory; +import org.apache.flink.table.api.EnvironmentSettings; import org.apache.flink.table.api.PlannerType; import org.apache.flink.table.api.StatementSet; import org.apache.flink.table.api.Table; import org.apache.flink.table.api.TableConfig; import org.apache.flink.table.api.TableEnvironment; +import org.apache.flink.table.api.TableException; import org.apache.flink.table.api.TableResult; -import org.apache.flink.table.api.TableSchema; import org.apache.flink.table.api.bridge.java.internal.StreamTableEnvironmentImpl; -import org.apache.flink.table.api.bridge.scala.BatchTableEnvironment; import org.apache.flink.table.api.config.ExecutionConfigOptions; import org.apache.flink.table.api.config.OptimizerConfigOptions; import org.apache.flink.table.api.config.TableConfigOptions; import org.apache.flink.table.api.internal.TableEnvironmentInternal; import org.apache.flink.table.catalog.CatalogManager; +import org.apache.flink.table.catalog.FunctionCatalog; import org.apache.flink.table.catalog.GenericInMemoryCatalog; import org.apache.flink.table.catalog.ResolvedSchema; +import org.apache.flink.table.delegation.Executor; +import org.apache.flink.table.delegation.ExecutorFactory; import org.apache.flink.table.delegation.Parser; +import org.apache.flink.table.delegation.Planner; +import org.apache.flink.table.factories.FactoryUtil; +import org.apache.flink.table.factories.PlannerFactoryUtil; import org.apache.flink.table.functions.AggregateFunction; import org.apache.flink.table.functions.ScalarFunction; import 
org.apache.flink.table.functions.TableAggregateFunction; @@ -83,13 +90,13 @@ import org.apache.flink.table.operations.ddl.DropDatabaseOperation; import org.apache.flink.table.operations.ddl.DropTableOperation; import org.apache.flink.table.operations.ddl.DropTempSystemFunctionOperation; import org.apache.flink.table.operations.ddl.DropViewOperation; +import org.apache.flink.table.planner.calcite.FlinkTypeFactory; import org.apache.flink.table.sinks.TableSink; import org.apache.flink.table.utils.PrintUtils; import org.apache.flink.types.Row; import org.apache.flink.types.RowKind; import org.apache.flink.util.FlinkException; -import org.apache.zeppelin.flink.shims113.CollectStreamTableSink; -import org.apache.zeppelin.flink.shims113.Flink113ScalaShims; +import org.apache.zeppelin.flink.shims114.CollectStreamTableSink; import org.apache.zeppelin.flink.sql.SqlCommandParser; import org.apache.zeppelin.flink.sql.SqlCommandParser.SqlCommand; import org.apache.zeppelin.flink.sql.SqlCommandParser.SqlCommandCall; @@ -103,6 +110,7 @@ import org.slf4j.LoggerFactory; import java.io.File; import java.io.IOException; import java.lang.reflect.Field; +import java.lang.reflect.Method; import java.net.InetAddress; import java.time.ZoneId; import java.util.Arrays; @@ -116,11 +124,11 @@ import java.util.regex.Matcher; /** - * Shims for flink 1.13 + * Shims for flink 1.14 */ -public class Flink113Shims extends FlinkShims { +public class Flink114Shims extends FlinkShims { - private static final Logger LOGGER = LoggerFactory.getLogger(Flink113Shims.class); + private static final Logger LOGGER = LoggerFactory.getLogger(Flink114Shims.class); public static final AttributedString MESSAGE_HELP = new AttributedStringBuilder() .append("The following commands are available:\n\n") .append(formatCommand(SqlCommand.CREATE_TABLE, "Create table under current catalog and database.")) @@ -147,8 +155,8 @@ public class Flink113Shims extends FlinkShims { private Map<String, StatementSet> statementSetMap = 
new ConcurrentHashMap<>(); - public Flink113Shims(Properties properties) { - super(properties); + public Flink114Shims(FlinkVersion flinkVersion, Properties properties) { + super(flinkVersion, properties); } @Override @@ -251,12 +259,14 @@ public class Flink113Shims extends FlinkShims { @Override public Object fromDataSet(Object btenv, Object ds) { - return Flink113ScalaShims.fromDataSet((BatchTableEnvironment) btenv, (DataSet) ds); + return null; + //return Flink114ScalaShims.fromDataSet((BatchTableEnvironment) btenv, (DataSet) ds); } @Override public Object toDataSet(Object btenv, Object table) { - return Flink113ScalaShims.toDataSet((BatchTableEnvironment) btenv, (Table) table); + return null; + //return Flink114ScalaShims.toDataSet((BatchTableEnvironment) btenv, (Table) table); } @Override @@ -508,4 +518,42 @@ public class Flink113Shims extends FlinkShims { ResolvedSchema resolvedSchema = ((Table) table).getResolvedSchema(); return PrintUtils.rowToString((Row) row, resolvedSchema, zoneId); } + + @Override + public boolean isTimeIndicatorType(Object type) { + return FlinkTypeFactory.isTimeIndicatorType((TypeInformation<?>) type); + } + + private Object lookupExecutor(ClassLoader classLoader, + Object settings, + Object sEnv) { + try { + final ExecutorFactory executorFactory = + FactoryUtil.discoverFactory( + classLoader, ExecutorFactory.class, ((EnvironmentSettings) settings).getExecutor()); + final Method createMethod = + executorFactory + .getClass() + .getMethod("create", StreamExecutionEnvironment.class); + + return createMethod.invoke(executorFactory, sEnv); + } catch (Exception e) { + throw new TableException( + "Could not instantiate the executor. 
Make sure a planner module is on the classpath", + e); + } + } + + @Override + public ImmutablePair<Object, Object> createPlannerAndExecutor( + ClassLoader classLoader, Object environmentSettings, Object sEnv, + Object tableConfig, Object functionCatalog, Object catalogManager) { + EnvironmentSettings settings = (EnvironmentSettings) environmentSettings; + Executor executor = (Executor) lookupExecutor(classLoader, environmentSettings, sEnv); + Planner planner = PlannerFactoryUtil.createPlanner(settings.getPlanner(), executor, + (TableConfig) tableConfig, + (CatalogManager) catalogManager, + (FunctionCatalog) functionCatalog); + return ImmutablePair.of(planner, executor); + } } diff --git a/flink/flink1.14-shims/src/main/java/org/apache/zeppelin/flink/shims114/CollectStreamTableSink.java b/flink/flink1.14-shims/src/main/java/org/apache/zeppelin/flink/shims114/CollectStreamTableSink.java new file mode 100644 index 0000000..7a224e1 --- /dev/null +++ b/flink/flink1.14-shims/src/main/java/org/apache/zeppelin/flink/shims114/CollectStreamTableSink.java @@ -0,0 +1,97 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.zeppelin.flink.shims114; + +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.api.common.typeinfo.Types; +import org.apache.flink.api.common.typeutils.TypeSerializer; +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.api.java.typeutils.TupleTypeInfo; +import org.apache.flink.streaming.api.datastream.DataStream; +import org.apache.flink.streaming.api.datastream.DataStreamSink; +import org.apache.flink.streaming.experimental.CollectSink; +import org.apache.flink.table.sinks.RetractStreamTableSink; +import org.apache.flink.types.Row; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.net.InetAddress; +import java.util.UUID; + +/** + * Table sink for collecting the results locally using sockets. + */ +public class CollectStreamTableSink implements RetractStreamTableSink<Row> { + + private static final Logger LOGGER = LoggerFactory.getLogger(CollectStreamTableSink.class); + + private final InetAddress targetAddress; + private final int targetPort; + private final TypeSerializer<Tuple2<Boolean, Row>> serializer; + + private String[] fieldNames; + private TypeInformation<?>[] fieldTypes; + + public CollectStreamTableSink(InetAddress targetAddress, + int targetPort, + TypeSerializer<Tuple2<Boolean, Row>> serializer) { + LOGGER.info("Use address: " + targetAddress.getHostAddress() + ":" + targetPort); + this.targetAddress = targetAddress; + this.targetPort = targetPort; + this.serializer = serializer; + } + + @Override + public String[] getFieldNames() { + return fieldNames; + } + + @Override + public TypeInformation<?>[] getFieldTypes() { + return fieldTypes; + } + + @Override + public CollectStreamTableSink configure(String[] fieldNames, TypeInformation<?>[] fieldTypes) { + final CollectStreamTableSink copy = + new CollectStreamTableSink(targetAddress, targetPort, serializer); + copy.fieldNames = fieldNames; + copy.fieldTypes = fieldTypes; + return copy; + } + + 
@Override + public TypeInformation<Row> getRecordType() { + return Types.ROW_NAMED(fieldNames, fieldTypes); + } + + @Override + public DataStreamSink<?> consumeDataStream(DataStream<Tuple2<Boolean, Row>> stream) { + // add sink + return stream + .addSink(new CollectSink<>(targetAddress, targetPort, serializer)) + .name("Zeppelin Flink Sql Stream Collect Sink " + UUID.randomUUID()) + .setParallelism(1); + } + + @Override + public TupleTypeInfo<Tuple2<Boolean, Row>> getOutputType() { + return new TupleTypeInfo<>(Types.BOOLEAN, getRecordType()); + } +} diff --git a/flink/pom.xml b/flink/pom.xml index 21868aa..b4df154 100644 --- a/flink/pom.xml +++ b/flink/pom.xml @@ -41,6 +41,7 @@ <module>flink1.11-shims</module> <module>flink1.12-shims</module> <module>flink1.13-shims</module> + <module>flink1.14-shims</module> </modules> <properties> @@ -48,6 +49,7 @@ <flink1.11.version>1.11.3</flink1.11.version> <flink1.12.version>1.12.4</flink1.12.version> <flink1.13.version>1.13.2</flink1.13.version> + <flink1.14.version>1.14.0</flink1.14.version> <flink.scala.version>2.11.12</flink.scala.version> <flink.scala.binary.version>2.11</flink.scala.binary.version> diff --git a/testing/env_python_3_with_flink_114.yml b/testing/env_python_3_with_flink_114.yml new file mode 100644 index 0000000..37c6fa8 --- /dev/null +++ b/testing/env_python_3_with_flink_114.yml @@ -0,0 +1,27 @@ +name: python_3_with_flink +channels: + - conda-forge + - defaults +dependencies: + - pycodestyle + - scipy + - numpy=1.19.5 + - grpcio + - protobuf + - pandasql + - ipython + - ipykernel + - jupyter_client=5 + - hvplot + - plotnine + - seaborn + - intake + - intake-parquet + - intake-xarray + - altair + - vega_datasets + - plotly + - pip + - pip: + - apache-flink==1.14.0 + diff --git a/zeppelin-interpreter-integration/src/test/java/org/apache/zeppelin/integration/FlinkIntegrationTest114.java b/zeppelin-interpreter-integration/src/test/java/org/apache/zeppelin/integration/FlinkIntegrationTest114.java new file mode 
100644 index 0000000..68f6810 --- /dev/null +++ b/zeppelin-interpreter-integration/src/test/java/org/apache/zeppelin/integration/FlinkIntegrationTest114.java @@ -0,0 +1,40 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.zeppelin.integration; + +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; + +import java.util.Arrays; +import java.util.List; + +@RunWith(value = Parameterized.class) +public class FlinkIntegrationTest114 extends FlinkIntegrationTest { + + @Parameterized.Parameters + public static List<Object[]> data() { + return Arrays.asList(new Object[][]{ + {"1.14.0", "2.11"}, + {"1.14.0", "2.12"} + }); + } + + public FlinkIntegrationTest114(String flinkVersion, String scalaVersion) { + super(flinkVersion, scalaVersion); + } +}