This is an automated email from the ASF dual-hosted git repository. jongyoul pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/zeppelin.git
The following commit(s) were added to refs/heads/master by this push: new 11052f6fb4 [ZEPPELIN-5846] Remove support flink 1.12 (#4558) 11052f6fb4 is described below commit 11052f6fb435c812788a5f30211b9d71842f9182 Author: Jeff Zhang <zjf...@apache.org> AuthorDate: Mon Feb 6 14:36:14 2023 +0800 [ZEPPELIN-5846] Remove support flink 1.12 (#4558) * save * Fix flink test * [ZEPPELIN-5846] Remove support of flink 1.12 --- .github/workflows/core.yml | 2 +- docs/interpreter/flink.md | 1 - flink/README.md | 3 +- flink/flink-scala-parent/pom.xml | 53 +- .../java/org/apache/zeppelin/flink/FlinkShims.java | 5 +- flink/flink1.12-shims/pom.xml | 211 -------- .../org/apache/zeppelin/flink/Flink112Shims.java | 360 ------------- .../zeppelin/flink/Flink112SqlInterpreter.java | 584 --------------------- .../flink/shims112/CollectStreamTableSink.java | 97 ---- .../zeppelin/flink/shims112/SqlCommandParser.java | 355 ------------- .../flink/shims112/Flink112ScalaShims.scala | 36 -- flink/pom.xml | 12 - .../integration/FlinkIntegrationTest112.java | 40 -- .../integration/ZSessionIntegrationTest.java | 2 +- .../integration/ZeppelinFlinkClusterTest112.java | 40 -- 15 files changed, 6 insertions(+), 1795 deletions(-) diff --git a/.github/workflows/core.yml b/.github/workflows/core.yml index 76380103da..d4cb767c90 100644 --- a/.github/workflows/core.yml +++ b/.github/workflows/core.yml @@ -233,7 +233,7 @@ jobs: strategy: fail-fast: false matrix: - flink: [112, 113, 114, 115, 116] + flink: [113, 114, 115, 116] steps: - name: Checkout uses: actions/checkout@v3 diff --git a/docs/interpreter/flink.md b/docs/interpreter/flink.md index 0241ff9baf..0d70ef6491 100644 --- a/docs/interpreter/flink.md +++ b/docs/interpreter/flink.md @@ -151,7 +151,6 @@ Flink 1.15 is scala free and has changed its binary distribution. If you would l * Move FLINK_HOME/lib/flink-table-planner-loader-1.15.0.jar to FLINK_HOME/opt * Download flink-table-api-scala-bridge_2.12-1.15.0.jar and flink-table-api-scala_2.12-1.15.0.jar to FLINK_HOME/lib - ## Flink on Zeppelin Architecture <img src="{{BASE_PATH}}/assets/themes/zeppelin/img/docs-img/flink_architecture.png"> diff --git a/flink/README.md b/flink/README.md index 53618ad1b3..bb8aa74a06 100644 --- a/flink/README.md +++ b/flink/README.md @@ -8,9 +8,10 @@ This is the doc for Zeppelin developers who want to work on flink interpreter. Flink interpreter is more complex than other interpreter (such as jdbc, shell). Currently it has following 8 modules * flink-shims -* flink1.12-shims * flink1.13-shims * flink1.14-shims +* flink1.15-shims +* flink1.16-shims * flink-scala-parent * flink-scala-2.11 * flink-scala-2.12 diff --git a/flink/flink-scala-parent/pom.xml b/flink/flink-scala-parent/pom.xml index 8bbeebd26f..84223db389 100644 --- a/flink/flink-scala-parent/pom.xml +++ b/flink/flink-scala-parent/pom.xml @@ -35,7 +35,7 @@ <properties> <!--library versions--> <interpreter.name>flink</interpreter.name> - <flink.version>${flink1.12.version}</flink.version> + <flink.version>${flink1.13.version}</flink.version> <flink.hadoop.version>${hadoop2.7.version}</flink.hadoop.version> <hive.version>2.3.4</hive.version> <hiverunner.version>4.0.0</hiverunner.version> @@ -55,12 +55,6 @@ <version>${project.version}</version> </dependency> - <dependency> - <groupId>org.apache.zeppelin</groupId> - <artifactId>flink1.12-shims</artifactId> - <version>${project.version}</version> - </dependency> - <dependency> <groupId>org.apache.zeppelin</groupId> <artifactId>flink1.13-shims</artifactId> @@ -894,51 +888,6 @@ <profiles> - <profile> - <id>flink-112</id> - <properties> - <flink.version>${flink1.12.version}</flink.version> - </properties> - <dependencies> - <dependency> - <groupId>org.apache.flink</groupId> - <artifactId>flink-runtime_${flink.scala.binary.version}</artifactId> - <version>${flink.version}</version> - <scope>provided</scope> - </dependency> - <dependency> - <groupId>org.apache.flink</groupId> - <artifactId>flink-table-runtime-blink_${flink.scala.binary.version}</artifactId> - <version>${flink.version}</version> - <scope>provided</scope> - </dependency> - <dependency> - <groupId>org.apache.flink</groupId> - <artifactId>flink-table-planner_${flink.scala.binary.version}</artifactId> - <version>${flink.version}</version> - <scope>provided</scope> - </dependency> - <dependency> - <groupId>org.apache.flink</groupId> - <artifactId>flink-table-planner-blink_${flink.scala.binary.version}</artifactId> - <version>${flink.version}</version> - <scope>provided</scope> - <exclusions> - <exclusion> - <groupId>org.reflections</groupId> - <artifactId>reflections</artifactId> - </exclusion> - </exclusions> - </dependency> - <dependency> - <groupId>org.apache.flink</groupId> - <artifactId>flink-python_${flink.scala.binary.version}</artifactId> - <version>${flink.version}</version> - <scope>provided</scope> - </dependency> - </dependencies> - </profile> - <profile> <id>flink-113</id> <properties> diff --git a/flink/flink-shims/src/main/java/org/apache/zeppelin/flink/FlinkShims.java b/flink/flink-shims/src/main/java/org/apache/zeppelin/flink/FlinkShims.java index a7b6e9871a..916c3313dc 100644 --- a/flink/flink-shims/src/main/java/org/apache/zeppelin/flink/FlinkShims.java +++ b/flink/flink-shims/src/main/java/org/apache/zeppelin/flink/FlinkShims.java @@ -54,10 +54,7 @@ public abstract class FlinkShims { Properties properties) throws Exception { Class<?> flinkShimsClass; - if (flinkVersion.getMajorVersion() == 1 && flinkVersion.getMinorVersion() == 12) { - LOGGER.info("Initializing shims for Flink 1.12"); - flinkShimsClass = Class.forName("org.apache.zeppelin.flink.Flink112Shims"); - } else if (flinkVersion.getMajorVersion() == 1 && flinkVersion.getMinorVersion() == 13) { + if (flinkVersion.getMajorVersion() == 1 && flinkVersion.getMinorVersion() == 13) { LOGGER.info("Initializing shims for Flink 1.13"); flinkShimsClass = Class.forName("org.apache.zeppelin.flink.Flink113Shims"); } else if (flinkVersion.getMajorVersion() == 1 && flinkVersion.getMinorVersion() == 14) { diff --git a/flink/flink1.12-shims/pom.xml b/flink/flink1.12-shims/pom.xml deleted file mode 100644 index e10f005c7d..0000000000 --- a/flink/flink1.12-shims/pom.xml +++ /dev/null @@ -1,211 +0,0 @@ -<?xml version="1.0" encoding="UTF-8"?> -<!-- - ~ Licensed to the Apache Software Foundation (ASF) under one or more - ~ contributor license agreements. See the NOTICE file distributed with - ~ this work for additional information regarding copyright ownership. - ~ The ASF licenses this file to You under the Apache License, Version 2.0 - ~ (the "License"); you may not use this file except in compliance with - ~ the License. You may obtain a copy of the License at - ~ - ~ http://www.apache.org/licenses/LICENSE-2.0 - ~ - ~ Unless required by applicable law or agreed to in writing, software - ~ distributed under the License is distributed on an "AS IS" BASIS, - ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - ~ See the License for the specific language governing permissions and - ~ limitations under the License. - --> - -<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" - xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd"> - <parent> - <artifactId>flink-parent</artifactId> - <groupId>org.apache.zeppelin</groupId> - <version>0.11.0-SNAPSHOT</version> - <relativePath>../pom.xml</relativePath> - </parent> - - <modelVersion>4.0.0</modelVersion> - <artifactId>flink1.12-shims</artifactId> - <packaging>jar</packaging> - <name>Zeppelin: Flink1.12 Shims</name> - - <properties> - <flink.version>${flink1.12.version}</flink.version> - </properties> - - <dependencies> - - <dependency> - <groupId>org.apache.zeppelin</groupId> - <artifactId>flink-shims</artifactId> - <version>${project.version}</version> - </dependency> - - <dependency> - <groupId>org.apache.flink</groupId> - <artifactId>flink-core</artifactId> - <version>${flink.version}</version> - <scope>provided</scope> - </dependency> - - <dependency> - <groupId>org.apache.flink</groupId> - <artifactId>flink-clients_${flink.scala.binary.version}</artifactId> - <version>${flink.version}</version> - <scope>provided</scope> - </dependency> - - <dependency> - <groupId>org.apache.flink</groupId> - <artifactId>flink-runtime_${flink.scala.binary.version}</artifactId> - <version>${flink.version}</version> - <scope>provided</scope> - </dependency> - - - <dependency> - <groupId>org.apache.flink</groupId> - <artifactId>flink-table-api-scala_${flink.scala.binary.version}</artifactId> - <version>${flink.version}</version> - <scope>provided</scope> - </dependency> - - <dependency> - <groupId>org.apache.flink</groupId> - <artifactId>flink-table-api-scala-bridge_${flink.scala.binary.version}</artifactId> - <version>${flink.version}</version> - <scope>provided</scope> - </dependency> - - <dependency> - <groupId>org.apache.flink</groupId> - <artifactId>flink-table-api-java-bridge_${flink.scala.binary.version}</artifactId> - <version>${flink.version}</version> - <scope>provided</scope> - </dependency> - - <dependency> - <groupId>org.apache.flink</groupId> - <artifactId>flink-scala_${flink.scala.binary.version}</artifactId> - <version>${flink.version}</version> - <scope>provided</scope> - </dependency> - - <dependency> - <groupId>org.apache.flink</groupId> - <artifactId>flink-streaming-java_${flink.scala.binary.version}</artifactId> - <version>${flink.version}</version> - <scope>provided</scope> - </dependency> - - <dependency> - <groupId>org.apache.flink</groupId> - <artifactId>flink-streaming-scala_${flink.scala.binary.version}</artifactId> - <version>${flink.version}</version> - <scope>provided</scope> - </dependency> - - <dependency> - <groupId>org.apache.flink</groupId> - <artifactId>flink-java</artifactId> - <version>${flink.version}</version> - <scope>provided</scope> - </dependency> - - <dependency> - <groupId>org.apache.flink</groupId> - <artifactId>flink-table-planner-blink_${flink.scala.binary.version}</artifactId> - <version>${flink.version}</version> - <scope>provided</scope> - <exclusions> - <exclusion> - <groupId>org.reflections</groupId> - <artifactId>reflections</artifactId> - </exclusion> - </exclusions> - </dependency> - - <dependency> - <groupId>org.apache.flink</groupId> - <artifactId>flink-table-planner_${flink.scala.binary.version}</artifactId> - <version>${flink.version}</version> - <scope>provided</scope> - </dependency> - - <dependency> - <groupId>org.apache.flink</groupId> - <artifactId>flink-python_${flink.scala.binary.version}</artifactId> - <version>${flink.version}</version> - <scope>provided</scope> - </dependency> - - </dependencies> - - <build> - <plugins> - - <plugin> - <groupId>net.alchim31.maven</groupId> - <artifactId>scala-maven-plugin</artifactId> - <executions> - <execution> - <id>eclipse-add-source</id> - <goals> - <goal>add-source</goal> - </goals> - </execution> - <execution> - <id>scala-compile-first</id> - <phase>process-resources</phase> - <goals> - <goal>compile</goal> - </goals> - </execution> - <execution> - <id>scala-test-compile-first</id> - <phase>process-test-resources</phase> - <goals> - <goal>testCompile</goal> - </goals> - </execution> - </executions> - <configuration> - <scalaVersion>${flink.scala.version}</scalaVersion> - <args> - <arg>-unchecked</arg> - <arg>-deprecation</arg> - <arg>-feature</arg> - <arg>-target:jvm-1.8</arg> - </args> - <jvmArgs> - <jvmArg>-Xms1024m</jvmArg> - <jvmArg>-Xmx1024m</jvmArg> - <jvmArg>-XX:MaxMetaspaceSize=${MaxMetaspace}</jvmArg> - </jvmArgs> - <javacArgs> - <javacArg>-source</javacArg> - <javacArg>${java.version}</javacArg> - <javacArg>-target</javacArg> - <javacArg>${java.version}</javacArg> - <javacArg>-Xlint:all,-serial,-path,-options</javacArg> - </javacArgs> - </configuration> - </plugin> - - <plugin> - <artifactId>maven-resources-plugin</artifactId> - <executions> - <execution> - <id>copy-interpreter-setting</id> - <phase>none</phase> - <configuration> - <skip>true</skip> - </configuration> - </execution> - </executions> - </plugin> - </plugins> - </build> - -</project> \ No newline at end of file diff --git a/flink/flink1.12-shims/src/main/java/org/apache/zeppelin/flink/Flink112Shims.java b/flink/flink1.12-shims/src/main/java/org/apache/zeppelin/flink/Flink112Shims.java deleted file mode 100644 index 187eea0b4d..0000000000 --- a/flink/flink1.12-shims/src/main/java/org/apache/zeppelin/flink/Flink112Shims.java +++ /dev/null @@ -1,360 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.zeppelin.flink; - -import org.apache.commons.cli.CommandLine; -import org.apache.commons.compress.utils.Lists; -import org.apache.commons.lang3.StringUtils; -import org.apache.commons.lang3.tuple.ImmutablePair; -import org.apache.flink.api.common.typeinfo.TypeInformation; -import org.apache.flink.api.common.typeutils.TypeSerializer; -import org.apache.flink.api.java.tuple.Tuple2; -import org.apache.flink.api.scala.DataSet; -import org.apache.flink.client.cli.CliFrontend; -import org.apache.flink.client.cli.CustomCommandLine; -import org.apache.flink.configuration.Configuration; -import org.apache.flink.configuration.ReadableConfig; -import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; -import org.apache.flink.streaming.api.environment.StreamExecutionEnvironmentFactory; -import org.apache.flink.table.api.EnvironmentSettings; -import org.apache.flink.table.api.Table; -import org.apache.flink.table.api.TableConfig; -import org.apache.flink.table.api.TableException; -import org.apache.flink.table.api.bridge.java.internal.StreamTableEnvironmentImpl; -import org.apache.flink.table.api.bridge.scala.BatchTableEnvironment; -import org.apache.flink.table.api.internal.CatalogTableSchemaResolver; -import org.apache.flink.table.catalog.CatalogManager; -import org.apache.flink.table.catalog.FunctionCatalog; -import org.apache.flink.table.catalog.GenericInMemoryCatalog; -import org.apache.flink.table.delegation.*; -import org.apache.flink.table.factories.ComponentFactoryService; -import org.apache.flink.table.functions.AggregateFunction; -import org.apache.flink.table.functions.ScalarFunction; -import org.apache.flink.table.functions.TableAggregateFunction; -import org.apache.flink.table.functions.TableFunction; -import org.apache.flink.table.module.ModuleManager; -import org.apache.flink.table.planner.calcite.FlinkTypeFactory; -import org.apache.flink.table.sinks.TableSink; -import org.apache.flink.table.utils.PrintUtils; -import org.apache.flink.types.Row; -import org.apache.flink.types.RowKind; -import org.apache.flink.util.FlinkException; -import org.apache.zeppelin.flink.shims112.CollectStreamTableSink; -import org.apache.zeppelin.flink.shims112.Flink112ScalaShims; -import org.apache.zeppelin.interpreter.InterpreterContext; -import org.apache.zeppelin.interpreter.InterpreterResult; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.io.File; -import java.io.IOException; -import java.lang.reflect.Method; -import java.net.InetAddress; -import java.net.URL; -import java.util.Arrays; -import java.util.List; -import java.util.Map; -import java.util.Properties; - - -/** - * Shims for flink 1.12 - */ -public class Flink112Shims extends FlinkShims { - - private static final Logger LOGGER = LoggerFactory.getLogger(Flink112Shims.class); - - private Flink112SqlInterpreter batchSqlInterpreter; - private Flink112SqlInterpreter streamSqlInterpreter; - - - public Flink112Shims(FlinkVersion flinkVersion, Properties properties) { - super(flinkVersion, properties); - } - - public void initInnerBatchSqlInterpreter(FlinkSqlContext flinkSqlContext) { - this.batchSqlInterpreter = new Flink112SqlInterpreter(flinkSqlContext, true); - } - - public void initInnerStreamSqlInterpreter(FlinkSqlContext flinkSqlContext) { - this.streamSqlInterpreter = new Flink112SqlInterpreter(flinkSqlContext, false); - } - - @Override - public Object createResourceManager(List<URL> jars, Object tableConfig) { - return null; - } - - @Override - public Object createFunctionCatalog(Object tableConfig, Object catalogManager, Object moduleManager, List<URL> jars) { - return new FunctionCatalog((TableConfig) tableConfig, (CatalogManager) catalogManager, (ModuleManager) moduleManager); - } - - @Override - public void disableSysoutLogging(Object batchConfig, Object streamConfig) { - // do nothing - } - - @Override - public Object createScalaBlinkStreamTableEnvironment(Object environmentSettingsObj, - Object senvObj, - Object tableConfigObj, - Object moduleManagerObj, - Object functionCatalogObj, - Object catalogManagerObj, - List<URL> jars, - ClassLoader classLoader) { - EnvironmentSettings environmentSettings = (EnvironmentSettings) environmentSettingsObj; - StreamExecutionEnvironment senv = (StreamExecutionEnvironment) senvObj; - TableConfig tableConfig = (TableConfig) tableConfigObj; - ModuleManager moduleManager = (ModuleManager) moduleManagerObj; - FunctionCatalog functionCatalog = (FunctionCatalog) functionCatalogObj; - CatalogManager catalogManager = (CatalogManager) catalogManagerObj; - ImmutablePair<Object, Object> pair = createPlannerAndExecutor( - classLoader, environmentSettings, senv, - tableConfig, moduleManager, functionCatalog, catalogManager); - Planner planner = (Planner) pair.left; - Executor executor = (Executor) pair.right; - - return new org.apache.flink.table.api.bridge.scala.internal.StreamTableEnvironmentImpl(catalogManager, - moduleManager, - functionCatalog, tableConfig, new org.apache.flink.streaming.api.scala.StreamExecutionEnvironment(senv), - planner, executor, environmentSettings.isStreamingMode(), classLoader); - } - - @Override - public Object createJavaBlinkStreamTableEnvironment(Object environmentSettingsObj, - Object senvObj, - Object tableConfigObj, - Object moduleManagerObj, - Object functionCatalogObj, - Object catalogManagerObj, - List<URL> jars, - ClassLoader classLoader) { - EnvironmentSettings environmentSettings = (EnvironmentSettings) environmentSettingsObj; - StreamExecutionEnvironment senv = (StreamExecutionEnvironment) senvObj; - TableConfig tableConfig = (TableConfig) tableConfigObj; - ModuleManager moduleManager = (ModuleManager) moduleManagerObj; - FunctionCatalog functionCatalog = (FunctionCatalog) functionCatalogObj; - CatalogManager catalogManager = (CatalogManager) catalogManagerObj; - ImmutablePair<Object, Object> pair = createPlannerAndExecutor( - classLoader, environmentSettings, senv, - tableConfig, moduleManager, functionCatalog, catalogManager); - Planner planner = (Planner) pair.left; - Executor executor = (Executor) pair.right; - - return new StreamTableEnvironmentImpl(catalogManager, moduleManager, - functionCatalog, tableConfig, senv, planner, executor, environmentSettings.isStreamingMode(), classLoader); - } - @Override - public Object createStreamExecutionEnvironmentFactory(Object streamExecutionEnvironment) { - return new StreamExecutionEnvironmentFactory() { - @Override - public StreamExecutionEnvironment createExecutionEnvironment(Configuration configuration) { - return (StreamExecutionEnvironment) streamExecutionEnvironment; - } - }; - } - - @Override - public Object createCatalogManager(Object config) { - return CatalogManager.newBuilder() - .classLoader(Thread.currentThread().getContextClassLoader()) - .config((ReadableConfig) config) - .defaultCatalog("default_catalog", - new GenericInMemoryCatalog("default_catalog", "default_database")) - .build(); - } - - @Override - public String getPyFlinkPythonPath(Properties properties) throws IOException { - String mode = properties.getProperty("flink.execution.mode"); - if ("yarn-application".equalsIgnoreCase(mode)) { - // for yarn application mode, FLINK_HOME is container working directory - String flinkHome = new File(".").getAbsolutePath(); - return getPyFlinkPythonPath(new File(flinkHome + "/lib/python")); - } - - String flinkHome = System.getenv("FLINK_HOME"); - if (StringUtils.isNotBlank(flinkHome)) { - return getPyFlinkPythonPath(new File(flinkHome + "/opt/python")); - } else { - throw new IOException("No FLINK_HOME is specified"); - } - } - - private String getPyFlinkPythonPath(File pyFlinkFolder) throws IOException { - LOGGER.info("Getting pyflink lib from {}", pyFlinkFolder); - if (!pyFlinkFolder.exists() || !pyFlinkFolder.isDirectory()) { - throw new IOException(String.format("PyFlink folder %s does not exist or is not a folder", - pyFlinkFolder.getAbsolutePath())); - } - List<File> depFiles = Arrays.asList(pyFlinkFolder.listFiles()); - StringBuilder builder = new StringBuilder(); - for (File file : depFiles) { - LOGGER.info("Adding extracted file {} to PYTHONPATH", file.getAbsolutePath()); - builder.append(file.getAbsolutePath() + ":"); - } - return builder.toString(); - } - - @Override - public Object getCollectStreamTableSink(InetAddress targetAddress, int targetPort, Object serializer) { - return new CollectStreamTableSink(targetAddress, targetPort, (TypeSerializer<Tuple2<Boolean, Row>>) serializer); - } - - @Override - public List collectToList(Object table) throws Exception { - return Lists.newArrayList(((Table) table).execute().collect()); - } - - @Override - public boolean rowEquals(Object row1, Object row2) { - Row r1 = (Row) row1; - Row r2 = (Row) row2; - r1.setKind(RowKind.INSERT); - r2.setKind(RowKind.INSERT); - return r1.equals(r2); - } - - @Override - public Object fromDataSet(Object btenv, Object ds) { - return Flink112ScalaShims.fromDataSet((BatchTableEnvironment) btenv, (DataSet) ds); - } - - @Override - public Object toDataSet(Object btenv, Object table) { - return Flink112ScalaShims.toDataSet((BatchTableEnvironment) btenv, (Table) table); - } - - @Override - public void registerTableSink(Object stenv, String tableName, Object collectTableSink) { - ((org.apache.flink.table.api.internal.TableEnvironmentInternal) stenv) - .registerTableSinkInternal(tableName, (TableSink) collectTableSink); - } - - @Override - public void registerScalarFunction(Object btenv, String name, Object scalarFunction) { - ((StreamTableEnvironmentImpl)(btenv)).createTemporarySystemFunction(name, (ScalarFunction) scalarFunction); - } - - @Override - public void registerTableFunction(Object btenv, String name, Object tableFunction) { - ((StreamTableEnvironmentImpl) (btenv)).registerFunction(name, (TableFunction) tableFunction); - } - - @Override - public void registerAggregateFunction(Object btenv, String name, Object aggregateFunction) { - ((StreamTableEnvironmentImpl) (btenv)).registerFunction(name, (AggregateFunction) aggregateFunction); - } - - @Override - public void registerTableAggregateFunction(Object btenv, String name, Object tableAggregateFunction) { - ((StreamTableEnvironmentImpl) (btenv)).registerFunction(name, (TableAggregateFunction) tableAggregateFunction); - } - - /** - * Flink 1.11 bind CatalogManager with parser which make blink and flink could not share the same CatalogManager. - * This is a workaround which always reset CatalogTableSchemaResolver before running any flink code. - * @param catalogManager - * @param parserObject - * @param environmentSetting - */ - @Override - public void setCatalogManagerSchemaResolver(Object catalogManager, - Object parserObject, - Object environmentSetting) { - ((CatalogManager) catalogManager).setCatalogTableSchemaResolver( - new CatalogTableSchemaResolver((Parser)parserObject, - ((EnvironmentSettings)environmentSetting).isStreamingMode())); - } - - @Override - public Object updateEffectiveConfig(Object cliFrontend, Object commandLine, Object effectiveConfig) { - CustomCommandLine customCommandLine = ((CliFrontend)cliFrontend).validateAndGetActiveCommandLine((CommandLine) commandLine); - try { - ((Configuration) effectiveConfig).addAll(customCommandLine.toConfiguration((CommandLine) commandLine)); - return effectiveConfig; - } catch (FlinkException e) { - throw new RuntimeException("Fail to call addAll", e); - } - } - - @Override - public String[] rowToString(Object row, Object table, Object tableConfig) { - return PrintUtils.rowToString((Row) row); - } - - public boolean isTimeIndicatorType(Object type) { - return FlinkTypeFactory.isTimeIndicatorType((TypeInformation<?>) type); - } - - private Object lookupExecutor(ClassLoader classLoader, - Object settings, - Object sEnv) { - try { - Map<String, String> executorProperties = ((EnvironmentSettings) settings).toExecutorProperties(); - ExecutorFactory executorFactory = ComponentFactoryService.find(ExecutorFactory.class, executorProperties); - Method createMethod = executorFactory.getClass() - .getMethod("create", Map.class, StreamExecutionEnvironment.class); - - return createMethod.invoke( - executorFactory, - executorProperties, - sEnv); - } catch (Exception e) { - throw new TableException( - "Could not instantiate the executor. Make sure a planner module is on the classpath", - e); - } - } - - @Override - public ImmutablePair<Object, Object> createPlannerAndExecutor( - ClassLoader classLoader, Object environmentSettings, Object sEnv, - Object tableConfig, Object moduleManager, Object functionCatalog, Object catalogManager) { - EnvironmentSettings settings = (EnvironmentSettings) environmentSettings; - Executor executor = (Executor) lookupExecutor(classLoader, settings, sEnv); - Map<String, String> plannerProperties = settings.toPlannerProperties(); - Planner planner = ComponentFactoryService.find(PlannerFactory.class, plannerProperties) - .create(plannerProperties, executor, (TableConfig) tableConfig, - (FunctionCatalog) functionCatalog, - (CatalogManager) catalogManager); - return ImmutablePair.of(planner, executor); - } - - @Override - public Object createBlinkPlannerEnvSettingBuilder() { - return EnvironmentSettings.newInstance().useBlinkPlanner(); - } - - @Override - public Object createOldPlannerEnvSettingBuilder() { - return EnvironmentSettings.newInstance().useOldPlanner(); - } - - @Override - public InterpreterResult runSqlList(String st, InterpreterContext context, boolean isBatch) { - if (isBatch) { - return batchSqlInterpreter.runSqlList(st, context); - } else { - return streamSqlInterpreter.runSqlList(st, context); - } - } -} diff --git a/flink/flink1.12-shims/src/main/java/org/apache/zeppelin/flink/Flink112SqlInterpreter.java b/flink/flink1.12-shims/src/main/java/org/apache/zeppelin/flink/Flink112SqlInterpreter.java deleted file mode 100644 index b72ef266f7..0000000000 --- a/flink/flink1.12-shims/src/main/java/org/apache/zeppelin/flink/Flink112SqlInterpreter.java +++ /dev/null @@ -1,584 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.zeppelin.flink; - -import org.apache.commons.lang3.StringUtils; -import org.apache.commons.lang3.exception.ExceptionUtils; -import org.apache.flink.api.common.JobExecutionResult; -import org.apache.flink.api.common.JobStatus; -import org.apache.flink.api.java.ExecutionEnvironment; -import org.apache.flink.core.execution.JobClient; -import org.apache.flink.core.execution.JobListener; -import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; -import org.apache.flink.table.api.StatementSet; -import org.apache.flink.table.api.Table; -import org.apache.flink.table.api.TableEnvironment; -import org.apache.flink.table.api.TableResult; -import org.apache.flink.table.api.internal.TableEnvironmentInternal; -import org.apache.flink.table.utils.EncodingUtils; -import org.apache.flink.types.Row; -import org.apache.flink.util.CloseableIterator; -import org.apache.flink.util.CollectionUtil; -import org.apache.zeppelin.flink.shims112.SqlCommandParser; -import org.apache.zeppelin.flink.shims112.SqlCommandParser.SqlCommand; -import org.apache.zeppelin.interpreter.InterpreterContext; -import org.apache.zeppelin.interpreter.InterpreterResult; -import org.apache.zeppelin.interpreter.ZeppelinContext; -import org.apache.zeppelin.interpreter.util.SqlSplitter; -import org.jline.utils.AttributedString; -import org.jline.utils.AttributedStringBuilder; -import org.jline.utils.AttributedStyle; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import javax.annotation.Nullable; -import java.io.IOException; -import java.util.ArrayList; -import java.util.List; -import java.util.Map; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.locks.ReentrantReadWriteLock; -import java.util.stream.Collectors; - -import static org.apache.flink.util.Preconditions.checkNotNull; - -public class Flink112SqlInterpreter { - - private static final Logger LOGGER = LoggerFactory.getLogger(Flink112SqlInterpreter.class); - private static final AttributedString MESSAGE_HELP = - new AttributedStringBuilder() - .append("The following commands are available:\n\n") - .append( - formatCommand( - SqlCommand.CREATE_TABLE, - "Create table under current catalog and database.")) - .append( - formatCommand( - SqlCommand.DROP_TABLE, - "Drop table with optional catalog and database. Syntax: 'DROP TABLE [IF EXISTS] <name>;'")) - .append( - formatCommand( - SqlCommand.CREATE_VIEW, - "Creates a virtual table from a SQL query. Syntax: 'CREATE VIEW <name> AS <query>;'")) - .append( - formatCommand( - SqlCommand.DESCRIBE, - "Describes the schema of a table with the given name.")) - .append( - formatCommand( - SqlCommand.DROP_VIEW, - "Deletes a previously created virtual table. Syntax: 'DROP VIEW <name>;'")) - .append( - formatCommand( - SqlCommand.EXPLAIN, - "Describes the execution plan of a query or table with the given name.")) - .append(formatCommand(SqlCommand.HELP, "Prints the available commands.")) - .append( - formatCommand( - SqlCommand.INSERT_INTO, - "Inserts the results of a SQL SELECT query into a declared table sink.")) - .append( - formatCommand( - SqlCommand.INSERT_OVERWRITE, - "Inserts the results of a SQL SELECT query into a declared table sink and overwrite existing data.")) - .append( - formatCommand( - SqlCommand.SELECT, - "Executes a SQL SELECT query on the Flink cluster.")) - .append( - formatCommand( - SqlCommand.SET, - "Sets a session configuration property. Syntax: 'SET <key>=<value>;'. Use 'SET;' for listing all properties.")) - .append( - formatCommand( - SqlCommand.SHOW_FUNCTIONS, - "Shows all user-defined and built-in functions.")) - .append(formatCommand(SqlCommand.SHOW_TABLES, "Shows all registered tables.")) - .append( - formatCommand( - SqlCommand.USE_CATALOG, - "Sets the current catalog. The current database is set to the catalog's default one. Experimental! Syntax: 'USE CATALOG <name>;'")) - .append( - formatCommand( - SqlCommand.USE, - "Sets the current default database. Experimental! Syntax: 'USE <name>;'")) - .style(AttributedStyle.DEFAULT.underline()) - .append("\nHint") - .style(AttributedStyle.DEFAULT) - .append( - ": Make sure that a statement ends with ';' for finalizing (multi-line) statements.") - .toAttributedString(); - - private static AttributedString formatCommand(SqlCommandParser.SqlCommand cmd, String description) { - return new AttributedStringBuilder() - .style(AttributedStyle.DEFAULT.bold()) - .append(cmd.toString()) - .append("\t\t") - .style(AttributedStyle.DEFAULT) - .append(description) - .append('\n') - .toAttributedString(); - } - - private FlinkSqlContext flinkSqlContext; - private TableEnvironment tbenv; - private ZeppelinContext z; - private SqlCommandParser sqlCommandParser; - private SqlSplitter sqlSplitter; - private boolean isBatch; - private ReentrantReadWriteLock.WriteLock lock = new ReentrantReadWriteLock().writeLock(); - // paragraphId -> StatementSet - private Map<String, StatementSet> statementSetMap = new ConcurrentHashMap<>(); - - - public Flink112SqlInterpreter(FlinkSqlContext flinkSqlContext, boolean isBatch) { - this.flinkSqlContext = flinkSqlContext; - this.isBatch = isBatch; - if (isBatch) { - this.tbenv = (TableEnvironment) flinkSqlContext.getBtenv(); - } else { - this.tbenv = (TableEnvironment) flinkSqlContext.getStenv(); - } - this.z = (ZeppelinContext) flinkSqlContext.getZeppelinContext(); - this.sqlCommandParser = new SqlCommandParser((TableEnvironmentInternal) tbenv); - this.sqlSplitter = new SqlSplitter(); - JobListener jobListener = new JobListener() { - @Override - public void onJobSubmitted(@Nullable JobClient jobClient, @Nullable Throwable throwable) { - if (lock.isHeldByCurrentThread()) { - lock.unlock(); - LOGGER.info("UnLock JobSubmitLock"); - } - } - - @Override - public void onJobExecuted(@Nullable JobExecutionResult jobExecutionResult, @Nullable Throwable throwable) { - - } - }; - - ((ExecutionEnvironment) flinkSqlContext.getBenv()).registerJobListener(jobListener); - ((StreamExecutionEnvironment) flinkSqlContext.getSenv()).registerJobListener(jobListener); - } - - public InterpreterResult runSqlList(String st, InterpreterContext context) { - try { - boolean runAsOne = Boolean.parseBoolean(context.getStringLocalProperty("runAsOne", "false")); - List<String> sqls = sqlSplitter.splitSql(st).stream().map(String::trim).collect(Collectors.toList()); - boolean isFirstInsert = true; - boolean hasInsert = false; - for (String sql : sqls) { - SqlCommandParser.SqlCommandCall sqlCommand = null; - try { - sqlCommand = sqlCommandParser.parse(sql); - } catch (Exception e1) { - try { - context.out.write("%text Invalid Sql statement: " + sql + "\n"); - context.out.write(e1.toString()); - context.out.write(MESSAGE_HELP.toString()); - } catch (IOException e2) { - return new InterpreterResult(InterpreterResult.Code.ERROR, e2.toString()); - } - return new InterpreterResult(InterpreterResult.Code.ERROR); - } - - try { - if (sqlCommand.command == SqlCommand.INSERT_INTO || - sqlCommand.command == SqlCommand.INSERT_OVERWRITE) { - hasInsert = true; - if (isFirstInsert && runAsOne) { - startMultipleInsert(context); - isFirstInsert = false; - } - } - callCommand(sqlCommand, sql, context); - context.out.flush(); - } catch (Throwable e) { - LOGGER.error("Fail to run sql:" + sql, e); - try { - context.out.write("%text Fail to run sql command: " + - sql + "\n" + ExceptionUtils.getStackTrace(e) + "\n"); - } catch (IOException ex) { - LOGGER.warn("Unexpected exception:", ex); - return new InterpreterResult(InterpreterResult.Code.ERROR, - ExceptionUtils.getStackTrace(e)); - } - return new InterpreterResult(InterpreterResult.Code.ERROR); - } - } - - if (runAsOne && hasInsert) { - try { - lock.lock(); - String jobName = context.getStringLocalProperty("jobName", st); - if (executeMultipleInsertInto(jobName, context)) { - context.out.write("Insertion successfully.\n"); - } - } catch (Exception e) { - LOGGER.error("Fail to execute sql as one job", e); - return new InterpreterResult(InterpreterResult.Code.ERROR, ExceptionUtils.getStackTrace(e)); - } finally { - if (lock.isHeldByCurrentThread()) { - lock.unlock(); - } - } - } - } catch (Exception e) { - LOGGER.error("Fail to execute sql", e); - return new InterpreterResult(InterpreterResult.Code.ERROR, ExceptionUtils.getStackTrace(e)); - } finally { - statementSetMap.remove(context.getParagraphId()); - } - - return new InterpreterResult(InterpreterResult.Code.SUCCESS); - } - - private void callCommand(SqlCommandParser.SqlCommandCall cmdCall, - String sql, - InterpreterContext context) throws Exception { - switch (cmdCall.command) { - case SET: - callSet(cmdCall, context); - break; - case HELP: - callHelp(context); - break; - case SHOW_CATALOGS: - callShowCatalogs(context); - break; - case SHOW_CURRENT_CATALOG: - callShowCurrentCatalog(context); - break; - case SHOW_DATABASES: - callShowDatabases(context); - break; - case SHOW_CURRENT_DATABASE: - callShowCurrentDatabase(context); - break; - case SHOW_TABLES: - callShowTables(context); - break; - case SHOW_FUNCTIONS: - callShowFunctions(context); - break; - case SHOW_MODULES: - callShowModules(context); - break; - case SHOW_PARTITIONS: - callShowPartitions(sql, context); - break; - case USE_CATALOG: - callUseCatalog(cmdCall.operands[0], context); - break; - case USE: - callUseDatabase(cmdCall.operands[0], context); - break; - case DESC: - case DESCRIBE: - callDescribe(cmdCall.operands[0], context); - break; - case EXPLAIN: - callExplain(cmdCall.operands[0], context); - break; - case SELECT: - callSelect(cmdCall.operands[0], context); - break; - case INSERT_INTO: - case INSERT_OVERWRITE: - callInsertInto(cmdCall.operands[0], context); - break; - case CREATE_TABLE: - callDDL(sql, context, "Table has been created."); - break; - case DROP_TABLE: - callDDL(sql, context, "Table has been dropped."); - break; - case ALTER_TABLE: - callDDL(sql, context, "Alter table succeeded!"); - break; - case CREATE_VIEW: - callDDL(sql, context, "View has been created."); - break; - case DROP_VIEW: - callDDL(sql, context, "View has been dropped."); - break; - case ALTER_VIEW: - callDDL(sql, context, "Alter view succeeded!"); - break; - case CREATE_FUNCTION: - callDDL(sql, context, "Function has been created."); - break; - case DROP_FUNCTION: - callDDL(sql, context, "Function has been removed."); - break; - case ALTER_FUNCTION: - callDDL(sql, context, "Alter function succeeded!"); - break; - case CREATE_DATABASE: - callDDL(sql, context, "Database has been created."); - break; - case DROP_DATABASE: - callDDL(sql, context, "Database has been removed."); - break; - case ALTER_DATABASE: - callDDL(sql, context, "Alter database succeeded!"); - break; - case CREATE_CATALOG: - callDDL(sql, context, "Catalog has been created."); - break; - case DROP_CATALOG: - callDDL(sql, context, "Catalog has been dropped."); - break; - default: - throw new Exception("Unsupported command: " + cmdCall.command); - } - } - - private void callDDL(String sql, InterpreterContext context, String message) throws IOException { - try { - lock.lock(); - this.tbenv.executeSql(sql); - } finally { - if (lock.isHeldByCurrentThread()) { - lock.unlock(); - } - } - context.out.write(message + "\n"); - } - - private void callUseCatalog(String catalog, InterpreterContext context) throws IOException { - tbenv.executeSql("USE CATALOG `" + catalog + "`"); - } - - private void callHelp(InterpreterContext context) throws IOException { - context.out.write(MESSAGE_HELP.toString() + "\n"); - } - - private void callShowCatalogs(InterpreterContext context) throws IOException { - TableResult tableResult = this.tbenv.executeSql("SHOW Catalogs"); - List<String> catalogs = CollectionUtil.iteratorToList(tableResult.collect()).stream() - .map(r -> checkNotNull(r.getField(0)).toString()) - .collect(Collectors.toList()); - context.out.write("%table catalog\n" + StringUtils.join(catalogs, "\n") + "\n"); - } - - private void callShowCurrentCatalog(InterpreterContext context) throws IOException { - TableResult tableResult = this.tbenv.executeSql("SHOW Current Catalog"); - String catalog = tableResult.collect().next().toString(); - context.out.write("%text current catalog: " + catalog + "\n"); - } - - private void callShowDatabases(InterpreterContext context) throws IOException { - TableResult tableResult = this.tbenv.executeSql("SHOW Databases"); - List<String> databases = CollectionUtil.iteratorToList(tableResult.collect()).stream() - .map(r -> checkNotNull(r.getField(0)).toString()) - .collect(Collectors.toList()); - context.out.write( - "%table database\n" + StringUtils.join(databases, "\n") + "\n"); - } - - private void callShowCurrentDatabase(InterpreterContext context) throws IOException { - TableResult tableResult = this.tbenv.executeSql("SHOW Current Database"); - String database = tableResult.collect().next().toString(); - context.out.write("%text current database: " + database + "\n"); - } - - private void callShowTables(InterpreterContext context) throws IOException { - TableResult tableResult = this.tbenv.executeSql("SHOW Tables"); - List<String> tables = CollectionUtil.iteratorToList(tableResult.collect()).stream() - .map(r -> checkNotNull(r.getField(0)).toString()) - .filter(tbl -> !tbl.startsWith("UnnamedTable")) - .collect(Collectors.toList()); - context.out.write( - "%table table\n" + StringUtils.join(tables, "\n") + "\n"); - } - - private void callShowFunctions(InterpreterContext context) throws IOException { - TableResult tableResult = this.tbenv.executeSql("SHOW Functions"); - List<String> functions = CollectionUtil.iteratorToList(tableResult.collect()).stream() - .map(r -> checkNotNull(r.getField(0)).toString()) - .collect(Collectors.toList()); - context.out.write( - "%table function\n" + StringUtils.join(functions, "\n") + "\n"); - } - - private void callShowModules(InterpreterContext context) throws IOException { - String[] modules = this.tbenv.listModules(); - context.out.write("%table modules\n" + StringUtils.join(modules, "\n") + "\n"); - } - - private void callShowPartitions(String sql, InterpreterContext context) throws IOException { - TableResult tableResult = this.tbenv.executeSql(sql); - List<String> functions = CollectionUtil.iteratorToList(tableResult.collect()).stream() - .map(r -> checkNotNull(r.getField(0)).toString()) - .collect(Collectors.toList()); - context.out.write( - "%table partitions\n" + StringUtils.join(functions, "\n") + "\n"); - } - - public void startMultipleInsert(InterpreterContext context) throws Exception { - StatementSet statementSet = tbenv.createStatementSet(); - statementSetMap.put(context.getParagraphId(), statementSet); - } - - public void addInsertStatement(String sql, InterpreterContext context) throws Exception { - statementSetMap.get(context.getParagraphId()).addInsertSql(sql); - } - - public boolean executeMultipleInsertInto(String jobName, InterpreterContext context) throws Exception { - JobClient jobClient = statementSetMap.get(context.getParagraphId()).execute().getJobClient().get(); - while (!jobClient.getJobStatus().get().isTerminalState()) { - LOGGER.debug("Wait for job to finish"); - Thread.sleep(1000 * 5); - } - if (jobClient.getJobStatus().get() == JobStatus.CANCELED) { - context.out.write("Job is cancelled.\n"); - return false; - } - return true; - } - - private void callUseDatabase(String databaseName, - InterpreterContext context) throws IOException { - this.tbenv.executeSql("USE `" + databaseName + "`"); - } - - private void callDescribe(String name, InterpreterContext context) throws IOException { - TableResult tableResult = null; - try { - tableResult = tbenv.executeSql("DESCRIBE " + name); - } catch (Exception e) { - throw new IOException("Fail to describe table: " + name, e); - } - CloseableIterator<Row> result = tableResult.collect(); - StringBuilder builder = new StringBuilder(); - builder.append("Column\tType\n"); - while (result.hasNext()) { - Row row = result.next(); - builder.append(row.getField(0) + "\t" + row.getField(1) + "\n"); - } - context.out.write("%table\n" + builder.toString()); - } - - private void callExplain(String sql, InterpreterContext context) throws IOException { - try { - lock.lock(); - TableResult tableResult = tbenv.executeSql(sql); - String result = tableResult.collect().next().getField(0).toString(); - context.out.write(result + "\n"); - } finally { - if (lock.isHeldByCurrentThread()) { - lock.unlock(); - } - } - } - - public void callSelect(String sql, InterpreterContext context) throws IOException { - try { - lock.lock(); - if (isBatch) { - callBatchInnerSelect(sql, context); - } else { - callStreamInnerSelect(sql, context); - } - } finally { - if (lock.isHeldByCurrentThread()) { - lock.unlock(); - } - } - } - - public void callBatchInnerSelect(String sql, InterpreterContext context) throws IOException { - Table table = this.tbenv.sqlQuery(sql); - String result = z.showData(table); - context.out.write(result); - } - - public void callStreamInnerSelect(String sql, InterpreterContext context) throws IOException { - flinkSqlContext.getStreamSqlSelectConsumer().accept(sql); - } - - private String removeSingleQuote(String value) { - value = value.trim(); - if (value.startsWith("'")) { - value = value.substring(1); - } - if (value.endsWith("'")) { - value = value.substring(0, value.length() - 1); - } - return value; - } - - public void callSet(SqlCommandParser.SqlCommandCall sqlCommand, InterpreterContext context) throws Exception { - if (sqlCommand.operands.length == 0) { - // show all properties - final Map<String, String> properties = this.tbenv.getConfig().getConfiguration().toMap(); - List<String> prettyEntries = new ArrayList<>(); - for (String k : properties.keySet()) { - prettyEntries.add( - String.format( - "'%s' = '%s'", - EncodingUtils.escapeSingleQuotes(k), - EncodingUtils.escapeSingleQuotes(properties.get(k)))); - } - prettyEntries.sort(String::compareTo); - prettyEntries.forEach(entry -> { - try { - context.out.write(entry + "\n"); - } catch (IOException e) { - LOGGER.warn("Fail to write output", e); - } - }); - } else { - String key = removeSingleQuote(sqlCommand.operands[0]); - String value = removeSingleQuote(sqlCommand.operands[1]); - if ("execution.runtime-mode".equals(key)) { - throw new UnsupportedOperationException("execution.runtime-mode is not supported to set, " + - "you can use %flink.ssql & %flink.bsql to switch between streaming mode and batch mode"); - } - LOGGER.info("Set table config: {}={}", key, value); - this.tbenv.getConfig().getConfiguration().setString(key, value); - } - } - - public void callInsertInto(String sql, - InterpreterContext context) throws IOException { - if (!isBatch) { - context.getLocalProperties().put("flink.streaming.insert_into", "true"); - } - try { - lock.lock(); - boolean runAsOne = Boolean.parseBoolean(context.getStringLocalProperty("runAsOne", "false")); - if (!runAsOne) { - this.tbenv.sqlUpdate(sql); - String jobName = context.getStringLocalProperty("jobName", sql); - this.tbenv.execute(jobName); - context.out.write("Insertion successfully.\n"); - } else { - addInsertStatement(sql, context); - } - } catch (Exception e) { - throw new IOException(e); - } finally { - if (lock.isHeldByCurrentThread()) { - lock.unlock(); - } - } - } -} diff --git a/flink/flink1.12-shims/src/main/java/org/apache/zeppelin/flink/shims112/CollectStreamTableSink.java b/flink/flink1.12-shims/src/main/java/org/apache/zeppelin/flink/shims112/CollectStreamTableSink.java deleted file mode 100644 index 3650254e02..0000000000 --- a/flink/flink1.12-shims/src/main/java/org/apache/zeppelin/flink/shims112/CollectStreamTableSink.java +++ /dev/null @@ -1,97 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.zeppelin.flink.shims112; - -import org.apache.flink.api.common.typeinfo.TypeInformation; -import org.apache.flink.api.common.typeinfo.Types; -import org.apache.flink.api.common.typeutils.TypeSerializer; -import org.apache.flink.api.java.tuple.Tuple2; -import org.apache.flink.api.java.typeutils.TupleTypeInfo; -import org.apache.flink.streaming.api.datastream.DataStream; -import org.apache.flink.streaming.api.datastream.DataStreamSink; -import org.apache.flink.streaming.experimental.CollectSink; -import org.apache.flink.table.sinks.RetractStreamTableSink; -import org.apache.flink.types.Row; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.net.InetAddress; -import java.util.UUID; - -/** - * Table sink for collecting the results locally using sockets. - */ -public class CollectStreamTableSink implements RetractStreamTableSink<Row> { - - private static final Logger LOGGER = LoggerFactory.getLogger(CollectStreamTableSink.class); - - private final InetAddress targetAddress; - private final int targetPort; - private final TypeSerializer<Tuple2<Boolean, Row>> serializer; - - private String[] fieldNames; - private TypeInformation<?>[] fieldTypes; - - public CollectStreamTableSink(InetAddress targetAddress, - int targetPort, - TypeSerializer<Tuple2<Boolean, Row>> serializer) { - LOGGER.info("Use address: " + targetAddress.getHostAddress() + ":" + targetPort); - this.targetAddress = targetAddress; - this.targetPort = targetPort; - this.serializer = serializer; - } - - @Override - public String[] getFieldNames() { - return fieldNames; - } - - @Override - public TypeInformation<?>[] getFieldTypes() { - return fieldTypes; - } - - @Override - public CollectStreamTableSink configure(String[] fieldNames, TypeInformation<?>[] fieldTypes) { - final CollectStreamTableSink copy = - new CollectStreamTableSink(targetAddress, targetPort, serializer); - copy.fieldNames = fieldNames; - copy.fieldTypes = fieldTypes; - return copy; - } - - @Override - public TypeInformation<Row> getRecordType() { - return Types.ROW_NAMED(fieldNames, fieldTypes); - } - - @Override - public DataStreamSink<?> consumeDataStream(DataStream<Tuple2<Boolean, Row>> stream) { - // add sink - return stream - .addSink(new CollectSink<>(targetAddress, targetPort, serializer)) - .name("Zeppelin Flink Sql Stream Collect Sink " + UUID.randomUUID()) - .setParallelism(1); - } - - @Override - public TupleTypeInfo<Tuple2<Boolean, Row>> getOutputType() { - return new TupleTypeInfo<>(Types.BOOLEAN, getRecordType()); - } -} diff --git a/flink/flink1.12-shims/src/main/java/org/apache/zeppelin/flink/shims112/SqlCommandParser.java b/flink/flink1.12-shims/src/main/java/org/apache/zeppelin/flink/shims112/SqlCommandParser.java deleted file mode 100644 index 309250f583..0000000000 --- a/flink/flink1.12-shims/src/main/java/org/apache/zeppelin/flink/shims112/SqlCommandParser.java +++ /dev/null @@ -1,355 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.zeppelin.flink.shims112; - -import org.apache.flink.table.api.internal.TableEnvironmentInternal; -import org.apache.flink.table.delegation.Parser; -import org.apache.flink.table.operations.*; -import org.apache.flink.table.operations.ddl.*; - -import javax.annotation.Nullable; -import java.util.Arrays; -import java.util.List; -import java.util.Objects; -import java.util.Optional; -import java.util.function.Function; -import java.util.regex.Matcher; -import java.util.regex.Pattern; - -/** - * This is copied from flink project with minor modification. - * Simple parser for determining the type of command and its parameters. - * */ -public final class SqlCommandParser { - - private Parser parser; - - public SqlCommandParser(TableEnvironmentInternal tbenv) { - this.parser = tbenv.getParser(); - } - - /** - * Parse a sql statement and return corresponding {@link SqlCommandCall}. If the statement is - * invalid, a {@link Exception} will be thrown. - * - * @param stmt The statement to be parsed - * @return the corresponding SqlCommandCall. - */ - public SqlCommandCall parse(String stmt) throws Exception { - // normalize - stmt = stmt.trim(); - // remove ';' at the end - if (stmt.endsWith(";")) { - stmt = stmt.substring(0, stmt.length() - 1).trim(); - } - - // parse statement via regex matching first - Optional<SqlCommandCall> callOpt = parseByRegexMatching(stmt); - if (callOpt.isPresent()) { - return callOpt.get(); - } else { - return parseBySqlParser(stmt); - } - } - - private SqlCommandCall parseBySqlParser(String stmt) throws Exception { - List<Operation> operations; - try { - operations = parser.parse(stmt); - } catch (Throwable e) { - throw new Exception("Invalidate SQL statement.", e); - } - if (operations.size() != 1) { - throw new Exception("Only single statement is supported now."); - } - - final SqlCommand cmd; - String[] operands = new String[] {stmt}; - Operation operation = operations.get(0); - if (operation instanceof CatalogSinkModifyOperation) { - boolean overwrite = ((CatalogSinkModifyOperation) operation).isOverwrite(); - cmd = overwrite ? SqlCommand.INSERT_OVERWRITE : SqlCommand.INSERT_INTO; - } else if (operation instanceof CreateTableOperation) { - cmd = SqlCommand.CREATE_TABLE; - } else if (operation instanceof DropTableOperation) { - cmd = SqlCommand.DROP_TABLE; - } else if (operation instanceof AlterTableOperation) { - cmd = SqlCommand.ALTER_TABLE; - } else if (operation instanceof CreateViewOperation) { - cmd = SqlCommand.CREATE_VIEW; - } else if (operation instanceof DropViewOperation) { - cmd = SqlCommand.DROP_VIEW; - } else if (operation instanceof AlterViewOperation) { - cmd = SqlCommand.ALTER_VIEW; - } else if (operation instanceof CreateDatabaseOperation) { - cmd = SqlCommand.CREATE_DATABASE; - } else if (operation instanceof DropDatabaseOperation) { - cmd = SqlCommand.DROP_DATABASE; - } else if (operation instanceof AlterDatabaseOperation) { - cmd = SqlCommand.ALTER_DATABASE; - } else if (operation instanceof CreateCatalogOperation) { - cmd = SqlCommand.CREATE_CATALOG; - } else if (operation instanceof DropCatalogOperation) { - cmd = SqlCommand.DROP_CATALOG; - } else if (operation instanceof UseCatalogOperation) { - cmd = SqlCommand.USE_CATALOG; - operands = new String[] {((UseCatalogOperation) operation).getCatalogName()}; - } else if (operation instanceof UseDatabaseOperation) { - cmd = SqlCommand.USE; - operands = new String[] {((UseDatabaseOperation) operation).getDatabaseName()}; - } else if (operation instanceof ShowCatalogsOperation) { - cmd = SqlCommand.SHOW_CATALOGS; - operands = new String[0]; - } else if (operation instanceof ShowCurrentCatalogOperation) { - cmd = SqlCommand.SHOW_CURRENT_CATALOG; - operands = new String[0]; - } else if (operation instanceof ShowDatabasesOperation) { - cmd = SqlCommand.SHOW_DATABASES; - operands = new String[0]; - } else if (operation instanceof ShowCurrentDatabaseOperation) { - cmd = SqlCommand.SHOW_CURRENT_DATABASE; - operands = new String[0]; - } else if (operation instanceof ShowTablesOperation) { - cmd = SqlCommand.SHOW_TABLES; - operands = new String[0]; - } else if (operation instanceof ShowFunctionsOperation) { - cmd = SqlCommand.SHOW_FUNCTIONS; - operands = new String[0]; - } else if (operation instanceof ShowPartitionsOperation) { - cmd = SqlCommand.SHOW_PARTITIONS; - } else if (operation instanceof CreateCatalogFunctionOperation - || operation instanceof CreateTempSystemFunctionOperation) { - cmd = SqlCommand.CREATE_FUNCTION; - } else if (operation instanceof DropCatalogFunctionOperation - || operation instanceof DropTempSystemFunctionOperation) { - cmd = SqlCommand.DROP_FUNCTION; - } else if (operation instanceof AlterCatalogFunctionOperation) { - cmd = SqlCommand.ALTER_FUNCTION; - } else if (operation instanceof ExplainOperation) { - cmd = SqlCommand.EXPLAIN; - } else if (operation instanceof DescribeTableOperation) { - cmd = SqlCommand.DESCRIBE; - operands = - new String[] { - ((DescribeTableOperation) operation) - .getSqlIdentifier() - .asSerializableString() - }; - } else if (operation instanceof QueryOperation) { - cmd = SqlCommand.SELECT; - } else { - throw new Exception("Unknown operation: " + operation.asSummaryString()); - } - - return new SqlCommandCall(cmd, operands); - } - - private static Optional<SqlCommandCall> parseByRegexMatching(String stmt) { - // parse statement via regex matching - for (SqlCommand cmd : SqlCommand.values()) { - if (cmd.hasRegexPattern()) { - final Matcher matcher = cmd.pattern.matcher(stmt); - if (matcher.matches()) { - final String[] groups = new String[matcher.groupCount()]; - for (int i = 0; i < groups.length; i++) { - groups[i] = matcher.group(i + 1); - } - return cmd.operandConverter - .apply(groups) - .map( - (operands) -> { - String[] newOperands = operands; - if (cmd == SqlCommand.EXPLAIN) { - // convert `explain xx` to `explain plan for xx` - // which can execute through executeSql method - newOperands = - new String[] { - "EXPLAIN PLAN FOR " - + operands[0] - + " " - + operands[1] - }; - } - return new SqlCommandCall(cmd, newOperands); - }); - } - } - } - return Optional.empty(); - } - - // -------------------------------------------------------------------------------------------- - - private static final Function<String[], Optional<String[]>> NO_OPERANDS = - (operands) -> Optional.of(new String[0]); - - private static final Function<String[], Optional<String[]>> SINGLE_OPERAND = - (operands) -> Optional.of(new String[] {operands[0]}); - - private static final int DEFAULT_PATTERN_FLAGS = Pattern.CASE_INSENSITIVE | Pattern.DOTALL; - - /** Supported SQL commands. */ - public enum SqlCommand { - - HELP("HELP", NO_OPERANDS), - - SHOW_CATALOGS, - - SHOW_CURRENT_CATALOG, - - SHOW_DATABASES, - - SHOW_CURRENT_DATABASE, - - SHOW_TABLES, - - SHOW_FUNCTIONS, - - // FLINK-17396 - SHOW_MODULES("SHOW\\s+MODULES", NO_OPERANDS), - - SHOW_PARTITIONS, - - USE_CATALOG, - - USE, - - CREATE_CATALOG, - - DROP_CATALOG, - - DESC("DESC\\s+(.*)", SINGLE_OPERAND), - - DESCRIBE, - - // supports both `explain xx` and `explain plan for xx` now - // TODO should keep `explain xx` ? - // only match "EXPLAIN SELECT xx" and "EXPLAIN INSERT xx" here - // "EXPLAIN PLAN FOR xx" should be parsed via sql parser - EXPLAIN( - "EXPLAIN\\s+(SELECT|INSERT)\\s+(.*)", - (operands) -> { - return Optional.of(new String[] {operands[0], operands[1]}); - }), - - CREATE_DATABASE, - - DROP_DATABASE, - - ALTER_DATABASE, - - CREATE_TABLE, - - DROP_TABLE, - - ALTER_TABLE, - - CREATE_VIEW, - - DROP_VIEW, - - ALTER_VIEW, - - CREATE_FUNCTION, - - DROP_FUNCTION, - - ALTER_FUNCTION, - - SELECT, - - INSERT_INTO, - - INSERT_OVERWRITE, - - SET( - "SET(\\s+(\\S+)\\s*=(.*))?", // whitespace is only ignored on the left side of '=' - (operands) -> { - if (operands.length < 3) { - return Optional.empty(); - } else if (operands[0] == null) { - return Optional.of(new String[0]); - } - return Optional.of(new String[] {operands[1], operands[2]}); - }), - - SOURCE("SOURCE\\s+(.*)", SINGLE_OPERAND); - - public final @Nullable Pattern pattern; - public final @Nullable Function<String[], Optional<String[]>> operandConverter; - - SqlCommand() { - this.pattern = null; - this.operandConverter = null; - } - - SqlCommand(String matchingRegex, Function<String[], Optional<String[]>> operandConverter) { - this.pattern = Pattern.compile(matchingRegex, DEFAULT_PATTERN_FLAGS); - this.operandConverter = operandConverter; - } - - @Override - public String toString() { - return super.toString().replace('_', ' '); - } - - public boolean hasOperands() { - return operandConverter != NO_OPERANDS; - } - - public boolean hasRegexPattern() { - return pattern != null; - } - } - - /** Call of SQL command with operands and command type. */ - public static class SqlCommandCall { - public final SqlCommand command; - public final String[] operands; - - public SqlCommandCall(SqlCommand command, String[] operands) { - this.command = command; - this.operands = operands; - } - - @Override - public boolean equals(Object o) { - if (this == o) { - return true; - } - if (o == null || getClass() != o.getClass()) { - return false; - } - SqlCommandCall that = (SqlCommandCall) o; - return command == that.command && Arrays.equals(operands, that.operands); - } - - @Override - public int hashCode() { - int result = Objects.hash(command); - result = 31 * result + Arrays.hashCode(operands); - return result; - } - - @Override - public String toString() { - return command + "(" + Arrays.toString(operands) + ")"; - } - } -} \ No newline at end of file diff --git a/flink/flink1.12-shims/src/main/scala/org/apache/zeppelin/flink/shims112/Flink112ScalaShims.scala b/flink/flink1.12-shims/src/main/scala/org/apache/zeppelin/flink/shims112/Flink112ScalaShims.scala deleted file mode 100644 index 988ad4e289..0000000000 --- a/flink/flink1.12-shims/src/main/scala/org/apache/zeppelin/flink/shims112/Flink112ScalaShims.scala +++ /dev/null @@ -1,36 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.zeppelin.flink.shims112 - -import org.apache.flink.api.scala.DataSet -import org.apache.flink.streaming.api.scala._ -import org.apache.flink.table.api.Table -import org.apache.flink.table.api.bridge.scala.BatchTableEnvironment -import org.apache.flink.types.Row - -object Flink112ScalaShims { - - def fromDataSet(btenv: BatchTableEnvironment, ds: DataSet[_]): Table = { - btenv.fromDataSet(ds) - } - - def toDataSet(btenv: BatchTableEnvironment, table: Table): DataSet[Row] = { - btenv.toDataSet[Row](table) - } -} diff --git a/flink/pom.xml b/flink/pom.xml index 5ac374ce37..c0e17389c3 100644 --- a/flink/pom.xml +++ b/flink/pom.xml @@ -36,7 +36,6 @@ <modules> <module>flink-scala-parent</module> <module>flink-shims</module> - <module>flink1.12-shims</module> <module>flink1.13-shims</module> <module>flink1.14-shims</module> <module>flink1.15-shims</module> @@ -44,7 +43,6 @@ </modules> <properties> - <flink1.12.version>1.12.4</flink1.12.version> <flink1.13.version>1.13.2</flink1.13.version> <flink1.14.version>1.14.0</flink1.14.version> <flink1.15.version>1.15.1</flink1.15.version> @@ -87,16 +85,6 @@ </modules> </profile> - <profile> - <id>flink-112</id> - <activation> - <activeByDefault>true</activeByDefault> - </activation> - <modules> - <module>flink-scala-2.11</module> - <module>flink-scala-2.12</module> - </modules> - </profile> </profiles> </project> diff --git a/zeppelin-interpreter-integration/src/test/java/org/apache/zeppelin/integration/FlinkIntegrationTest112.java b/zeppelin-interpreter-integration/src/test/java/org/apache/zeppelin/integration/FlinkIntegrationTest112.java deleted file mode 100644 index 50bb29cf30..0000000000 --- a/zeppelin-interpreter-integration/src/test/java/org/apache/zeppelin/integration/FlinkIntegrationTest112.java +++ /dev/null @@ -1,40 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.zeppelin.integration; - -import org.junit.runner.RunWith; -import org.junit.runners.Parameterized; - -import java.util.Arrays; -import java.util.List; - -@RunWith(value = Parameterized.class) -public class FlinkIntegrationTest112 extends FlinkIntegrationTest { - - @Parameterized.Parameters - public static List<Object[]> data() { - return Arrays.asList(new Object[][]{ - {"1.12.4", "2.11"}, - {"1.12.4", "2.12"} - }); - } - - public FlinkIntegrationTest112(String flinkVersion, String scalaVersion) { - super(flinkVersion, scalaVersion); - } -} diff --git a/zeppelin-interpreter-integration/src/test/java/org/apache/zeppelin/integration/ZSessionIntegrationTest.java b/zeppelin-interpreter-integration/src/test/java/org/apache/zeppelin/integration/ZSessionIntegrationTest.java index 2b7e3a622a..1c3fb82eae 100644 --- a/zeppelin-interpreter-integration/src/test/java/org/apache/zeppelin/integration/ZSessionIntegrationTest.java +++ b/zeppelin-interpreter-integration/src/test/java/org/apache/zeppelin/integration/ZSessionIntegrationTest.java @@ -68,7 +68,7 @@ public class ZSessionIntegrationTest extends AbstractTestRestApi { notebook = TestUtils.getInstance(Notebook.class); sparkHome = DownloadUtils.downloadSpark("2.4.4", "2.7"); - flinkHome = DownloadUtils.downloadFlink("1.12.4", "2.11"); + flinkHome = DownloadUtils.downloadFlink("1.13.2", "2.11"); } @AfterClass diff --git a/zeppelin-interpreter-integration/src/test/java/org/apache/zeppelin/integration/ZeppelinFlinkClusterTest112.java b/zeppelin-interpreter-integration/src/test/java/org/apache/zeppelin/integration/ZeppelinFlinkClusterTest112.java deleted file mode 100644 index d0c6b46cde..0000000000 --- a/zeppelin-interpreter-integration/src/test/java/org/apache/zeppelin/integration/ZeppelinFlinkClusterTest112.java +++ /dev/null @@ -1,40 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.zeppelin.integration; - -import org.junit.runner.RunWith; -import org.junit.runners.Parameterized; - -import java.util.Arrays; -import java.util.List; - -//@RunWith(value = Parameterized.class) -public class ZeppelinFlinkClusterTest112 extends ZeppelinFlinkClusterTest { - - @Parameterized.Parameters - public static List<Object[]> data() { - return Arrays.asList(new Object[][]{ - {"1.12.4", "2.11"}, - {"1.12.4", "2.12"} - }); - } - - public ZeppelinFlinkClusterTest112(String flinkVersion, String scalaVersion) throws Exception { - super(flinkVersion, scalaVersion); - } -}