This is an automated email from the ASF dual-hosted git repository. zjffdu pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/zeppelin.git
The following commit(s) were added to refs/heads/master by this push: new 7e6c123 [ZEPPELIN-5553] Remove support of flink 1.10 & 1.11 7e6c123 is described below commit 7e6c1236ff84acd636325763eb86017e8a412d9c Author: Jeff Zhang <zjf...@apache.org> AuthorDate: Mon Oct 18 11:15:09 2021 +0800 [ZEPPELIN-5553] Remove support of flink 1.10 & 1.11 ### What is this PR for? * Remove module `flink1.10-shims` & `flink1.11-shims` * Related tests are also removed ### What type of PR is it? [ Improvement ] ### Todos * [ ] - Task ### What is the Jira issue? * https://issues.apache.org/jira/browse/ZEPPELIN-5553 ### How should this be tested? * CI pass ### Screenshots (if appropriate) ### Questions: * Does the licenses files need update? No * Is there breaking changes for older versions? No * Does this needs documentation? No Author: Jeff Zhang <zjf...@apache.org> Closes #4251 from zjffdu/ZEPPELIN-5553 and squashes the following commits: 1160dc675f [Jeff Zhang] update 929df448f9 [Jeff Zhang] [ZEPPELIN-5553] Remove support of flink 1.10 & 1.11 --- .github/workflows/core.yml | 6 +- flink/README.md | 3 +- flink/flink-scala-parent/pom.xml | 92 +--- .../apache/zeppelin/flink/FlinkSqlInterpreter.java | 12 +- .../apache/zeppelin/flink/IPyFlinkInterpreter.java | 4 - .../java/org/apache/zeppelin/flink/JobManager.java | 1 + .../apache/zeppelin/flink/PyFlinkInterpreter.java | 4 - .../org/apache/zeppelin/flink/TableEnvFactory.java | 58 +-- .../src/main/resources/python/zeppelin_ipyflink.py | 9 +- .../src/main/resources/python/zeppelin_pyflink.py | 9 +- .../zeppelin/flink/FlinkScalaInterpreter.scala | 16 +- .../zeppelin/flink/internal/FlinkShell.scala | 3 - .../flink/FlinkBatchSqlInterpreterTest.java | 4 - .../apache/zeppelin/flink/SqlInterpreterTest.java | 212 ++++----- .../java/org/apache/zeppelin/flink/FlinkShims.java | 8 +- .../org/apache/zeppelin/flink/FlinkVersion.java | 4 - flink/flink1.10-shims/pom.xml | 217 --------- .../org/apache/zeppelin/flink/Flink110Shims.java | 389 --------------- .../flink/shims110/CollectStreamTableSink.java | 102 ---- .../flink/shims110/Flink110ScalaShims.scala | 37 -- flink/flink1.11-shims/pom.xml | 211 --------- .../org/apache/zeppelin/flink/Flink111Shims.java | 526 --------------------- .../flink/shims111/CollectStreamTableSink.java | 97 ---- .../flink/shims111/Flink111ScalaShims.scala | 36 -- flink/pom.xml | 4 - .../integration/FlinkIntegrationTest110.java | 40 -- .../integration/FlinkIntegrationTest111.java | 40 -- .../integration/ZSessionIntegrationTest.java | 2 +- .../integration/ZeppelinFlinkClusterTest.java | 15 +- .../integration/ZeppelinFlinkClusterTest112.java | 8 +- ...st111.java => ZeppelinFlinkClusterTest113.java} | 9 +- ...st110.java => ZeppelinFlinkClusterTest114.java} | 9 +- .../src/test/resources/init_stream.scala | 2 +- .../interpreter/RemoteInterpreterEventServer.java | 1 + 34 files changed, 140 insertions(+), 2050 deletions(-) diff --git a/.github/workflows/core.yml b/.github/workflows/core.yml index 1609ad1..866ed31 100644 --- a/.github/workflows/core.yml +++ b/.github/workflows/core.yml @@ -143,12 +143,13 @@ jobs: ./mvnw package -DskipRat -pl zeppelin-plugins -amd -DskipTests -B - name: run tests run: ./mvnw test -DskipRat -pl zeppelin-interpreter-integration -Pintegration -DfailIfNoTests=false -Dtest=ZeppelinClientIntegrationTest,ZeppelinClientWithAuthIntegrationTest,ZSessionIntegrationTest + test-flink-and-flink-integration-test: runs-on: ubuntu-20.04 strategy: fail-fast: false matrix: - flink: [110, 111, 112, 113, 114] + flink: [112, 113, 114] steps: - name: Checkout uses: actions/checkout@v2 @@ -180,7 +181,8 @@ jobs: ./mvnw install -DskipTests -DskipRat -am -pl flink/flink-scala-2.11,flink/flink-scala-2.12,flink-cmd,zeppelin-interpreter-integration -Pflink-${{ matrix.flink }} -Pintegration -B ./mvnw clean package -pl zeppelin-plugins -amd -DskipTests -B - name: run tests - run: ./mvnw test -DskipRat -pl flink/flink-scala-2.11,flink/flink-scala-2.12,flink-cmd,zeppelin-interpreter-integration -Pflink-${{ matrix.flink }} -Pintegration -DfailIfNoTests=false -B -Dtest=org.apache.zeppelin.flink.*,FlinkIntegrationTest${{ matrix.flink }},ZeppelinFlinkClusterTest${{ matrix.flink }} + run: ./mvnw test -DskipRat -pl flink/flink-scala-2.11,flink/flink-scala-2.12,flink-cmd,zeppelin-interpreter-integration -Pflink-${{ matrix.flink }} -Pintegration -DfailIfNoTests=false -B -Dtest=ZeppelinFlinkClusterTest${{ matrix.flink }} + run-spark-intergration-test: runs-on: ubuntu-20.04 steps: diff --git a/flink/README.md b/flink/README.md index e8e7dd9..53618ad 100644 --- a/flink/README.md +++ b/flink/README.md @@ -8,10 +8,9 @@ This is the doc for Zeppelin developers who want to work on flink interpreter. Flink interpreter is more complex than other interpreter (such as jdbc, shell). Currently it has following 8 modules * flink-shims -* flink1.10-shims -* flink1.11-shims * flink1.12-shims * flink1.13-shims +* flink1.14-shims * flink-scala-parent * flink-scala-2.11 * flink-scala-2.12 diff --git a/flink/flink-scala-parent/pom.xml b/flink/flink-scala-parent/pom.xml index b51ed70..83a7ec7 100644 --- a/flink/flink-scala-parent/pom.xml +++ b/flink/flink-scala-parent/pom.xml @@ -36,7 +36,7 @@ <properties> <!--library versions--> <interpreter.name>flink</interpreter.name> - <flink.version>${flink1.10.version}</flink.version> + <flink.version>${flink1.12.version}</flink.version> <flink.hadoop.version>2.6.5</flink.hadoop.version> <hive.version>2.3.4</hive.version> <hiverunner.version>4.0.0</hiverunner.version> @@ -56,18 +56,6 @@ <dependency> <groupId>org.apache.zeppelin</groupId> - <artifactId>flink1.10-shims</artifactId> - <version>${project.version}</version> - </dependency> - - <dependency> - <groupId>org.apache.zeppelin</groupId> - <artifactId>flink1.11-shims</artifactId> - <version>${project.version}</version> - </dependency> - - <dependency> - <groupId>org.apache.zeppelin</groupId> <artifactId>flink1.12-shims</artifactId> <version>${project.version}</version> </dependency> @@ -931,84 +919,6 @@ <profiles> <profile> - <id>flink-110</id> - <properties> - <flink.version>${flink1.10.version}</flink.version> - </properties> - <dependencies> - <dependency> - <groupId>org.apache.flink</groupId> - <artifactId>flink-runtime_${flink.scala.binary.version}</artifactId> - <version>${flink.version}</version> - <scope>provided</scope> - </dependency> - <dependency> - <groupId>org.apache.flink</groupId> - <artifactId>flink-table-runtime-blink_${flink.scala.binary.version}</artifactId> - <version>${flink.version}</version> - <scope>provided</scope> - </dependency> - <dependency> - <groupId>org.apache.flink</groupId> - <artifactId>flink-table-planner_${flink.scala.binary.version}</artifactId> - <version>${flink.version}</version> - <scope>provided</scope> - </dependency> - <dependency> - <groupId>org.apache.flink</groupId> - <artifactId>flink-table-planner-blink_${flink.scala.binary.version}</artifactId> - <version>${flink.version}</version> - <scope>provided</scope> - <exclusions> - <exclusion> - <groupId>org.reflections</groupId> - <artifactId>reflections</artifactId> - </exclusion> - </exclusions> - </dependency> - </dependencies> - </profile> - - <profile> - <id>flink-111</id> - <properties> - <flink.version>${flink1.11.version}</flink.version> - </properties> - <dependencies> - <dependency> - <groupId>org.apache.flink</groupId> - <artifactId>flink-runtime_${flink.scala.binary.version}</artifactId> - <version>${flink.version}</version> - <scope>provided</scope> - </dependency> - <dependency> - <groupId>org.apache.flink</groupId> - <artifactId>flink-table-runtime-blink_${flink.scala.binary.version}</artifactId> - <version>${flink.version}</version> - <scope>provided</scope> - </dependency> - <dependency> - <groupId>org.apache.flink</groupId> - <artifactId>flink-table-planner_${flink.scala.binary.version}</artifactId> - <version>${flink.version}</version> - <scope>provided</scope> - </dependency> - <dependency> - <groupId>org.apache.flink</groupId> - <artifactId>flink-table-planner-blink_${flink.scala.binary.version}</artifactId> - <version>${flink.version}</version> - <scope>provided</scope> - <exclusions> - <exclusion> - <groupId>org.reflections</groupId> - <artifactId>reflections</artifactId> - </exclusion> - </exclusions> - </dependency> - </dependencies> - </profile> - - <profile> <id>flink-112</id> <properties> <flink.version>${flink1.12.version}</flink.version> diff --git a/flink/flink-scala-parent/src/main/java/org/apache/zeppelin/flink/FlinkSqlInterpreter.java b/flink/flink-scala-parent/src/main/java/org/apache/zeppelin/flink/FlinkSqlInterpreter.java index 97abfdc..c3ec4cd 100644 --- a/flink/flink-scala-parent/src/main/java/org/apache/zeppelin/flink/FlinkSqlInterpreter.java +++ b/flink/flink-scala-parent/src/main/java/org/apache/zeppelin/flink/FlinkSqlInterpreter.java @@ -341,11 +341,7 @@ public abstract class FlinkSqlInterpreter extends AbstractInterpreter { private void callDropView(SqlCommandParser.SqlCommandCall sqlCommand, InterpreterContext context) throws IOException { try { lock.lock(); - if (flinkInterpreter.getFlinkVersion().isFlink110()) { - this.tbenv.dropTemporaryView(sqlCommand.operands[0]); - } else { - flinkInterpreter.getFlinkShims().executeSql(tbenv, sqlCommand.sql); - } + flinkInterpreter.getFlinkShims().executeSql(tbenv, sqlCommand.sql); } finally { if (lock.isHeldByCurrentThread()) { lock.unlock(); @@ -357,11 +353,7 @@ public abstract class FlinkSqlInterpreter extends AbstractInterpreter { private void callCreateView(SqlCommandParser.SqlCommandCall sqlCommand, InterpreterContext context) throws IOException { try { lock.lock(); - if (flinkInterpreter.getFlinkVersion().isFlink110()) { - this.tbenv.createTemporaryView(sqlCommand.operands[0], tbenv.sqlQuery(sqlCommand.operands[1])); - } else { - flinkInterpreter.getFlinkShims().executeSql(tbenv, sqlCommand.sql); - } + flinkInterpreter.getFlinkShims().executeSql(tbenv, sqlCommand.sql); } finally { if (lock.isHeldByCurrentThread()) { lock.unlock(); diff --git a/flink/flink-scala-parent/src/main/java/org/apache/zeppelin/flink/IPyFlinkInterpreter.java b/flink/flink-scala-parent/src/main/java/org/apache/zeppelin/flink/IPyFlinkInterpreter.java index 25ee255..12bda20 100644 --- a/flink/flink-scala-parent/src/main/java/org/apache/zeppelin/flink/IPyFlinkInterpreter.java +++ b/flink/flink-scala-parent/src/main/java/org/apache/zeppelin/flink/IPyFlinkInterpreter.java @@ -137,10 +137,6 @@ public class IPyFlinkInterpreter extends IPythonInterpreter { return flinkInterpreter.getProgress(context); } - public boolean isFlink110() { - return flinkInterpreter.getFlinkVersion().isFlink110(); - } - public org.apache.flink.api.java.ExecutionEnvironment getJavaExecutionEnvironment() { return flinkInterpreter.getExecutionEnvironment().getJavaEnv(); } diff --git a/flink/flink-scala-parent/src/main/java/org/apache/zeppelin/flink/JobManager.java b/flink/flink-scala-parent/src/main/java/org/apache/zeppelin/flink/JobManager.java index 49c803a..c957fa7 100644 --- a/flink/flink-scala-parent/src/main/java/org/apache/zeppelin/flink/JobManager.java +++ b/flink/flink-scala-parent/src/main/java/org/apache/zeppelin/flink/JobManager.java @@ -268,6 +268,7 @@ public class JobManager { if (!StringUtils.isBlank(checkpointPath) && !checkpointPath.equals(latestCheckpointPath)) { Map<String, String> config = new HashMap<>(); config.put(LATEST_CHECKPOINT_PATH, checkpointPath); + LOGGER.info("Update latest checkpoint path: {}", checkpointPath); context.getIntpEventClient().updateParagraphConfig( context.getNoteId(), context.getParagraphId(), config); latestCheckpointPath = checkpointPath; diff --git a/flink/flink-scala-parent/src/main/java/org/apache/zeppelin/flink/PyFlinkInterpreter.java b/flink/flink-scala-parent/src/main/java/org/apache/zeppelin/flink/PyFlinkInterpreter.java index 4162cc3..8c38d79 100644 --- a/flink/flink-scala-parent/src/main/java/org/apache/zeppelin/flink/PyFlinkInterpreter.java +++ b/flink/flink-scala-parent/src/main/java/org/apache/zeppelin/flink/PyFlinkInterpreter.java @@ -182,10 +182,6 @@ public class PyFlinkInterpreter extends PythonInterpreter { return flinkInterpreter.getProgress(context); } - public boolean isFlink110() { - return flinkInterpreter.getFlinkVersion().isFlink110(); - } - public boolean isAfterFlink114() { return flinkInterpreter.getFlinkVersion().isAfterFlink114(); } diff --git a/flink/flink-scala-parent/src/main/java/org/apache/zeppelin/flink/TableEnvFactory.java b/flink/flink-scala-parent/src/main/java/org/apache/zeppelin/flink/TableEnvFactory.java index 6c080fd..1456a9f 100644 --- a/flink/flink-scala-parent/src/main/java/org/apache/zeppelin/flink/TableEnvFactory.java +++ b/flink/flink-scala-parent/src/main/java/org/apache/zeppelin/flink/TableEnvFactory.java @@ -104,14 +104,9 @@ public class TableEnvFactory { public TableEnvironment createScalaFlinkBatchTableEnvironment() { try { - Class clazz = null; - if (flinkVersion.isFlink110()) { - clazz = Class - .forName("org.apache.flink.table.api.scala.internal.BatchTableEnvironmentImpl"); - } else { - clazz = Class + Class clazz = Class .forName("org.apache.flink.table.api.bridge.scala.internal.BatchTableEnvironmentImpl"); - } + Constructor constructor = clazz .getConstructor( org.apache.flink.api.scala.ExecutionEnvironment.class, @@ -134,14 +129,8 @@ public class TableEnvFactory { Planner planner = (Planner) pair.left; Executor executor = (Executor) pair.right; - Class clazz = null; - if (flinkVersion.isFlink110()) { - clazz = Class - .forName("org.apache.flink.table.api.scala.internal.StreamTableEnvironmentImpl"); - } else { - clazz = Class + Class clazz = Class .forName("org.apache.flink.table.api.bridge.scala.internal.StreamTableEnvironmentImpl"); - } try { Constructor constructor = clazz @@ -195,14 +184,8 @@ public class TableEnvFactory { public TableEnvironment createJavaFlinkBatchTableEnvironment() { try { - Class<?> clazz = null; - if (flinkVersion.isFlink110()) { - clazz = Class - .forName("org.apache.flink.table.api.java.internal.BatchTableEnvironmentImpl"); - } else { - clazz = Class + Class<?> clazz = Class .forName("org.apache.flink.table.api.bridge.java.internal.BatchTableEnvironmentImpl"); - } Constructor con = clazz.getConstructor( ExecutionEnvironment.class, @@ -229,14 +212,8 @@ public class TableEnvFactory { Planner planner = (Planner) pair.left; Executor executor = (Executor) pair.right; - Class clazz = null; - if (flinkVersion.isFlink110()) { - clazz = Class - .forName("org.apache.flink.table.api.java.internal.StreamTableEnvironmentImpl"); - } else { - clazz = Class + Class clazz = Class .forName("org.apache.flink.table.api.bridge.java.internal.StreamTableEnvironmentImpl"); - } try { Constructor constructor = clazz @@ -297,14 +274,8 @@ public class TableEnvFactory { Planner planner = (Planner) pair.left; Executor executor = (Executor) pair.right; - Class clazz = null; - if (flinkVersion.isFlink110()) { - clazz = Class - .forName("org.apache.flink.table.api.scala.internal.StreamTableEnvironmentImpl"); - } else { - clazz = Class + Class clazz = Class .forName("org.apache.flink.table.api.bridge.scala.internal.StreamTableEnvironmentImpl"); - } try { Constructor constructor = clazz .getConstructor( @@ -360,14 +331,9 @@ public class TableEnvFactory { Planner planner = (Planner) pair.left; Executor executor = (Executor) pair.right; - Class clazz = null; - if (flinkVersion.isFlink110()) { - clazz = Class - .forName("org.apache.flink.table.api.java.internal.StreamTableEnvironmentImpl"); - } else { - clazz = Class + Class clazz = Class .forName("org.apache.flink.table.api.bridge.java.internal.StreamTableEnvironmentImpl"); - } + try { Constructor constructor = clazz .getConstructor( @@ -424,14 +390,8 @@ public class TableEnvFactory { Planner planner = (Planner) pair.left; Executor executor = (Executor) pair.right; - Class clazz = null; - if (flinkVersion.isFlink110()) { - clazz = Class - .forName("org.apache.flink.table.api.java.internal.StreamTableEnvironmentImpl"); - } else { - clazz = Class + Class clazz = Class .forName("org.apache.flink.table.api.bridge.java.internal.StreamTableEnvironmentImpl"); - } try { Constructor constructor = clazz.getConstructor( CatalogManager.class, diff --git a/flink/flink-scala-parent/src/main/resources/python/zeppelin_ipyflink.py b/flink/flink-scala-parent/src/main/resources/python/zeppelin_ipyflink.py index 9249453..367b318 100644 --- a/flink/flink-scala-parent/src/main/resources/python/zeppelin_ipyflink.py +++ b/flink/flink-scala-parent/src/main/resources/python/zeppelin_ipyflink.py @@ -46,14 +46,7 @@ pyflink.java_gateway.install_exception_handler() s_env = StreamExecutionEnvironment(intp.getJavaStreamExecutionEnvironment()) -if intp.isFlink110(): - from pyflink.dataset import * - b_env = pyflink.dataset.ExecutionEnvironment(intp.getJavaExecutionEnvironment()) - bt_env = BatchTableEnvironment(intp.getJavaBatchTableEnvironment("blink"), True) - bt_env_2 = BatchTableEnvironment(intp.getJavaBatchTableEnvironment("flink"), False) - st_env = StreamTableEnvironment(intp.getJavaStreamTableEnvironment("blink"), True) - st_env_2 = StreamTableEnvironment(intp.getJavaStreamTableEnvironment("flink"), False) -elif not intp.isAfterFlink114(): +if not intp.isAfterFlink114(): from pyflink.dataset import * b_env = pyflink.dataset.ExecutionEnvironment(intp.getJavaExecutionEnvironment()) bt_env = BatchTableEnvironment(intp.getJavaBatchTableEnvironment("blink")) diff --git a/flink/flink-scala-parent/src/main/resources/python/zeppelin_pyflink.py b/flink/flink-scala-parent/src/main/resources/python/zeppelin_pyflink.py index 06b99c9..173c3b5 100644 --- a/flink/flink-scala-parent/src/main/resources/python/zeppelin_pyflink.py +++ b/flink/flink-scala-parent/src/main/resources/python/zeppelin_pyflink.py @@ -35,14 +35,7 @@ pyflink.java_gateway.install_exception_handler() s_env = StreamExecutionEnvironment(intp.getJavaStreamExecutionEnvironment()) -if intp.isFlink110(): - from pyflink.dataset import * - b_env = pyflink.dataset.ExecutionEnvironment(intp.getJavaExecutionEnvironment()) - bt_env = BatchTableEnvironment(intp.getJavaBatchTableEnvironment("blink"), True) - bt_env_2 = BatchTableEnvironment(intp.getJavaBatchTableEnvironment("flink"), False) - st_env = StreamTableEnvironment(intp.getJavaStreamTableEnvironment("blink"), True) - st_env_2 = StreamTableEnvironment(intp.getJavaStreamTableEnvironment("flink"), False) -elif not intp.isAfterFlink114(): +if not intp.isAfterFlink114(): from pyflink.dataset import * b_env = pyflink.dataset.ExecutionEnvironment(intp.getJavaExecutionEnvironment()) bt_env = BatchTableEnvironment(intp.getJavaBatchTableEnvironment("blink")) diff --git a/flink/flink-scala-parent/src/main/scala/org/apache/zeppelin/flink/FlinkScalaInterpreter.scala b/flink/flink-scala-parent/src/main/scala/org/apache/zeppelin/flink/FlinkScalaInterpreter.scala index 12051dd..bab5327 100644 --- a/flink/flink-scala-parent/src/main/scala/org/apache/zeppelin/flink/FlinkScalaInterpreter.scala +++ b/flink/flink-scala-parent/src/main/scala/org/apache/zeppelin/flink/FlinkScalaInterpreter.scala @@ -187,9 +187,6 @@ abstract class FlinkScalaInterpreter(val properties: Properties, .replace("-", "_") .toUpperCase) if (ExecutionMode.isYarnAppicationMode(mode)) { - if (flinkVersion.isFlink110) { - throw new Exception("yarn-application mode is only supported after Flink 1.11") - } // use current yarn container working directory as FLINK_HOME, FLINK_CONF_DIR and HIVE_CONF_DIR val workingDirectory = new File(".").getAbsolutePath flinkHome = workingDirectory @@ -197,9 +194,6 @@ abstract class FlinkScalaInterpreter(val properties: Properties, hiveConfDir = workingDirectory } if (ExecutionMode.isK8sApplicationMode(mode)) { - if (flinkVersion.isFlink110) { - throw new Exception("application mode is only supported after Flink 1.11") - } // use current pod working directory as FLINK_HOME val workingDirectory = new File(".").getAbsolutePath flinkHome = workingDirectory @@ -417,14 +411,8 @@ abstract class FlinkScalaInterpreter(val properties: Properties, ) flinkILoop.intp.interpret("import " + packageImports.mkString(", ")) - - if (flinkVersion.isFlink110) { - flinkILoop.intp.interpret("import org.apache.flink.table.api.scala._") - } else { - flinkILoop.intp.interpret("import org.apache.flink.table.api._") - flinkILoop.intp.interpret("import org.apache.flink.table.api.bridge.scala._") - } - + flinkILoop.intp.interpret("import org.apache.flink.table.api._") + flinkILoop.intp.interpret("import org.apache.flink.table.api.bridge.scala._") flinkILoop.intp.interpret("import org.apache.flink.table.functions.ScalarFunction") flinkILoop.intp.interpret("import org.apache.flink.table.functions.AggregateFunction") flinkILoop.intp.interpret("import org.apache.flink.table.functions.TableFunction") diff --git a/flink/flink-scala-parent/src/main/scala/org/apache/zeppelin/flink/internal/FlinkShell.scala b/flink/flink-scala-parent/src/main/scala/org/apache/zeppelin/flink/internal/FlinkShell.scala index ab0f299..ae6d411 100644 --- a/flink/flink-scala-parent/src/main/scala/org/apache/zeppelin/flink/internal/FlinkShell.scala +++ b/flink/flink-scala-parent/src/main/scala/org/apache/zeppelin/flink/internal/FlinkShell.scala @@ -112,9 +112,6 @@ object FlinkShell { case None => (flinkConfig, None) } - // workaround for FLINK-17788, otherwise it won't work with flink 1.10.1 which has been released. - flinkConfig.set(DeploymentOptions.TARGET, YarnSessionClusterExecutor.NAME) - val (effectiveConfig, _) = clusterClient match { case Some(_) => fetchDeployedYarnClusterInfo(config, clusterConfig, "yarn-cluster", flinkShims) case None => fetchDeployedYarnClusterInfo(config, clusterConfig, "default", flinkShims) diff --git a/flink/flink-scala-parent/src/test/java/org/apache/zeppelin/flink/FlinkBatchSqlInterpreterTest.java b/flink/flink-scala-parent/src/test/java/org/apache/zeppelin/flink/FlinkBatchSqlInterpreterTest.java index 875756f..e3526eb 100644 --- a/flink/flink-scala-parent/src/test/java/org/apache/zeppelin/flink/FlinkBatchSqlInterpreterTest.java +++ b/flink/flink-scala-parent/src/test/java/org/apache/zeppelin/flink/FlinkBatchSqlInterpreterTest.java @@ -406,10 +406,6 @@ public class FlinkBatchSqlInterpreterTest extends SqlInterpreterTest { @Test public void testFunctionHintRowType() throws InterpreterException, IOException { - if (flinkInterpreter.getFlinkVersion().isFlink110()) { - // Row Type hint is not supported in flink 1.10 - return; - } // define table function with TableHint of Row return type InterpreterContext context = getInterpreterContext(); InterpreterResult result = flinkInterpreter.interpret( diff --git a/flink/flink-scala-parent/src/test/java/org/apache/zeppelin/flink/SqlInterpreterTest.java b/flink/flink-scala-parent/src/test/java/org/apache/zeppelin/flink/SqlInterpreterTest.java index e71ff93..12862d2 100644 --- a/flink/flink-scala-parent/src/test/java/org/apache/zeppelin/flink/SqlInterpreterTest.java +++ b/flink/flink-scala-parent/src/test/java/org/apache/zeppelin/flink/SqlInterpreterTest.java @@ -368,15 +368,13 @@ public abstract class SqlInterpreterTest { assertEquals("table\nsource_table\n", resultMessages.get(0).getData()); // create temporary view - if (!flinkInterpreter.getFlinkVersion().isFlink110()) { - context = getInterpreterContext(); - result = sqlInterpreter.interpret("create temporary view my_temp_view as select int_col from source_table", context); - assertEquals(result.toString(), Code.SUCCESS, result.code()); - resultMessages = context.out.toInterpreterResultMessage(); - assertEquals(1, resultMessages.size()); - assertEquals(Type.TEXT, resultMessages.get(0).getType()); - assertEquals("View has been created.\n", resultMessages.get(0).getData()); - } + context = getInterpreterContext(); + result = sqlInterpreter.interpret("create temporary view my_temp_view as select int_col from source_table", context); + assertEquals(result.toString(), Code.SUCCESS, result.code()); + resultMessages = context.out.toInterpreterResultMessage(); + assertEquals(1, resultMessages.size()); + assertEquals(Type.TEXT, resultMessages.get(0).getType()); + assertEquals("View has been created.\n", resultMessages.get(0).getData()); } @Test @@ -432,133 +430,107 @@ public abstract class SqlInterpreterTest { @Test public void testFunction() throws IOException, InterpreterException { + InterpreterContext context = getInterpreterContext(); - FlinkVersion flinkVersion = flinkInterpreter.getFlinkVersion(); - if(!flinkVersion.isFlink110()){ - InterpreterContext context = getInterpreterContext(); - - // CREATE UDF - InterpreterResult result = sqlInterpreter.interpret( - "CREATE FUNCTION myudf AS 'org.apache.zeppelin.flink.JavaUpper' ;", context); - assertEquals(context.out.toString(), InterpreterResult.Code.SUCCESS, result.code()); - List<InterpreterResultMessage> resultMessages = context.out.toInterpreterResultMessage(); - assertTrue(resultMessages.toString(),resultMessages.get(0).getData().contains("Function has been created.")); - - // SHOW UDF - context = getInterpreterContext(); - result = sqlInterpreter.interpret( - "SHOW FUNCTIONS ;", context); - assertEquals(context.out.toString(), InterpreterResult.Code.SUCCESS, result.code()); - resultMessages = context.out.toInterpreterResultMessage(); - assertTrue(resultMessages.toString(),resultMessages.get(0).getData().contains("myudf")); + // CREATE UDF + InterpreterResult result = sqlInterpreter.interpret( + "CREATE FUNCTION myudf AS 'org.apache.zeppelin.flink.JavaUpper' ;", context); + assertEquals(context.out.toString(), InterpreterResult.Code.SUCCESS, result.code()); + List<InterpreterResultMessage> resultMessages = context.out.toInterpreterResultMessage(); + assertTrue(resultMessages.toString(),resultMessages.get(0).getData().contains("Function has been created.")); + // SHOW UDF + context = getInterpreterContext(); + result = sqlInterpreter.interpret( + "SHOW FUNCTIONS ;", context); + assertEquals(context.out.toString(), InterpreterResult.Code.SUCCESS, result.code()); + resultMessages = context.out.toInterpreterResultMessage(); + assertTrue(resultMessages.toString(),resultMessages.get(0).getData().contains("myudf")); - // ALTER - context = getInterpreterContext(); - result = sqlInterpreter.interpret( - "ALTER FUNCTION myUDF AS 'org.apache.zeppelin.flink.JavaLower' ; ", context); - assertEquals(context.out.toString(), InterpreterResult.Code.SUCCESS, result.code()); - resultMessages = context.out.toInterpreterResultMessage(); - assertTrue(resultMessages.toString(),resultMessages.get(0).getData().contains("Function has been modified.")); + // ALTER + context = getInterpreterContext(); + result = sqlInterpreter.interpret( + "ALTER FUNCTION myUDF AS 'org.apache.zeppelin.flink.JavaLower' ; ", context); + assertEquals(context.out.toString(), InterpreterResult.Code.SUCCESS, result.code()); + resultMessages = context.out.toInterpreterResultMessage(); + assertTrue(resultMessages.toString(),resultMessages.get(0).getData().contains("Function has been modified.")); - // DROP UDF - context = getInterpreterContext(); - result = sqlInterpreter.interpret("DROP FUNCTION myudf ;", context); - assertEquals(context.out.toString(), InterpreterResult.Code.SUCCESS, result.code()); - resultMessages = context.out.toInterpreterResultMessage(); - assertTrue(resultMessages.toString(),resultMessages.get(0).getData().contains("Function has been dropped.")); + // DROP UDF + context = getInterpreterContext(); + result = sqlInterpreter.interpret("DROP FUNCTION myudf ;", context); + assertEquals(context.out.toString(), InterpreterResult.Code.SUCCESS, result.code()); + resultMessages = context.out.toInterpreterResultMessage(); + assertTrue(resultMessages.toString(),resultMessages.get(0).getData().contains("Function has been dropped.")); - // SHOW UDF. Due to drop UDF before, it shouldn't contain 'myudf' - result = sqlInterpreter.interpret( - "SHOW FUNCTIONS ;", context); - assertEquals(context.out.toString(), InterpreterResult.Code.SUCCESS, result.code()); - resultMessages = context.out.toInterpreterResultMessage(); - assertFalse(resultMessages.toString(), resultMessages.get(0).getData().contains("myudf")); - } else { - // Flink1.10 don't support ddl for function - assertTrue(flinkVersion.isFlink110()); - } + // SHOW UDF. Due to drop UDF before, it shouldn't contain 'myudf' + result = sqlInterpreter.interpret( + "SHOW FUNCTIONS ;", context); + assertEquals(context.out.toString(), InterpreterResult.Code.SUCCESS, result.code()); + resultMessages = context.out.toInterpreterResultMessage(); + assertFalse(resultMessages.toString(), resultMessages.get(0).getData().contains("myudf")); } @Test public void testCatalog() throws IOException, InterpreterException{ - FlinkVersion flinkVersion = flinkInterpreter.getFlinkVersion(); - - if (!flinkVersion.isFlink110()){ - InterpreterContext context = getInterpreterContext(); + InterpreterContext context = getInterpreterContext(); - // CREATE CATALOG - InterpreterResult result = sqlInterpreter.interpret( - "CREATE CATALOG test_catalog \n" + - "WITH( \n" + - "'type'='generic_in_memory' \n" + - ");", context); - assertEquals(context.out.toString(), InterpreterResult.Code.SUCCESS, result.code()); - List<InterpreterResultMessage> resultMessages = context.out.toInterpreterResultMessage(); - assertTrue(resultMessages.toString(),resultMessages.get(0).getData().contains("Catalog has been created.")); + // CREATE CATALOG + InterpreterResult result = sqlInterpreter.interpret( + "CREATE CATALOG test_catalog \n" + + "WITH( \n" + + "'type'='generic_in_memory' \n" + + ");", context); + assertEquals(context.out.toString(), InterpreterResult.Code.SUCCESS, result.code()); + List<InterpreterResultMessage> resultMessages = context.out.toInterpreterResultMessage(); + assertTrue(resultMessages.toString(),resultMessages.get(0).getData().contains("Catalog has been created.")); - // USE CATALOG & SHOW DATABASES; - context = getInterpreterContext(); - result = sqlInterpreter.interpret( - "USE CATALOG test_catalog ;\n" + - "SHOW DATABASES;", context); - assertEquals(context.out.toString(), InterpreterResult.Code.SUCCESS, result.code()); - resultMessages = context.out.toInterpreterResultMessage(); - assertTrue(resultMessages.toString(),resultMessages.get(0).getData().contains("default")); + // USE CATALOG & SHOW DATABASES; + context = getInterpreterContext(); + result = sqlInterpreter.interpret( + "USE CATALOG test_catalog ;\n" + + "SHOW DATABASES;", context); + assertEquals(context.out.toString(), InterpreterResult.Code.SUCCESS, result.code()); + resultMessages = context.out.toInterpreterResultMessage(); + assertTrue(resultMessages.toString(),resultMessages.get(0).getData().contains("default")); - // DROP CATALOG - context = getInterpreterContext(); - result = sqlInterpreter.interpret( - "DROP CATALOG test_catalog ;\n", context); - assertEquals(context.out.toString(), InterpreterResult.Code.SUCCESS, result.code()); - resultMessages = context.out.toInterpreterResultMessage(); - assertTrue(resultMessages.toString(),resultMessages.get(0).getData().contains("Catalog has been dropped.")); + // DROP CATALOG + context = getInterpreterContext(); + result = sqlInterpreter.interpret( + "DROP CATALOG test_catalog ;\n", context); + assertEquals(context.out.toString(), InterpreterResult.Code.SUCCESS, result.code()); + resultMessages = context.out.toInterpreterResultMessage(); + assertTrue(resultMessages.toString(),resultMessages.get(0).getData().contains("Catalog has been dropped.")); - // SHOW CATALOG. Due to drop CATALOG before, it shouldn't contain 'test_catalog' - context = getInterpreterContext(); - result = sqlInterpreter.interpret( - "SHOW CATALOGS ;\n", context); - assertEquals(context.out.toString(), InterpreterResult.Code.SUCCESS, result.code()); - resultMessages = context.out.toInterpreterResultMessage(); - assertTrue(resultMessages.toString(),resultMessages.get(0).getData().contains("default_catalog")); - assertFalse(resultMessages.toString(),resultMessages.get(0).getData().contains("test_catalog")); - } else { - // Flink1.10 don't support ddl for catalog - assertTrue(flinkVersion.isFlink110()); - } + // SHOW CATALOG. Due to drop CATALOG before, it shouldn't contain 'test_catalog' + context = getInterpreterContext(); + result = sqlInterpreter.interpret( + "SHOW CATALOGS ;\n", context); + assertEquals(context.out.toString(), InterpreterResult.Code.SUCCESS, result.code()); + resultMessages = context.out.toInterpreterResultMessage(); + assertTrue(resultMessages.toString(),resultMessages.get(0).getData().contains("default_catalog")); + assertFalse(resultMessages.toString(),resultMessages.get(0).getData().contains("test_catalog")); } @Test public void testSetProperty() throws InterpreterException { FlinkVersion flinkVersion = flinkInterpreter.getFlinkVersion(); + InterpreterContext context = getInterpreterContext(); + InterpreterResult result = sqlInterpreter.interpret( + "set table.sql-dialect=hive", context); + assertEquals(context.out.toString(), InterpreterResult.Code.SUCCESS, result.code()); - if (!flinkVersion.isFlink110()){ - InterpreterContext context = getInterpreterContext(); - InterpreterResult result = sqlInterpreter.interpret( - "set table.sql-dialect=hive", context); - assertEquals(context.out.toString(), InterpreterResult.Code.SUCCESS, result.code()); - - sqlInterpreter.interpret("create table test_hive_table(a string, b int)\n" + - "partitioned by (dt string)", context); - assertEquals(context.out.toString(), InterpreterResult.Code.SUCCESS, result.code()); - } else { - // Flink1.10 doesn't support set table.sql-dialet which is introduced in flink 1.11 - InterpreterContext context = getInterpreterContext(); - InterpreterResult result = sqlInterpreter.interpret( - "set table.sql-dialect=hive", context); - assertEquals(context.out.toString(), Code.ERROR, result.code()); - assertTrue(context.out.toString(), - context.out.toString().contains("table.sql-dialect is not a valid table/sql config")); - } + sqlInterpreter.interpret("create table test_hive_table(a string, b int)\n" + + "partitioned by (dt string)", context); + assertEquals(context.out.toString(), InterpreterResult.Code.SUCCESS, result.code()); // table.local-time-zone is only available from 1.12 if (flinkVersion.newerThanOrEqual(FlinkVersion.fromVersionString("1.12.0"))) { - InterpreterContext context = getInterpreterContext(); - InterpreterResult result = sqlInterpreter.interpret("SET 'table.local-time-zone' = 'UTC'", context); + context = getInterpreterContext(); + result = sqlInterpreter.interpret("SET 'table.local-time-zone' = 'UTC'", context); assertEquals(context.out.toString(), InterpreterResult.Code.SUCCESS, result.code()); } } @@ -566,20 +538,14 @@ public abstract class SqlInterpreterTest { @Test public void testShowModules() throws InterpreterException, IOException { FlinkVersion flinkVersion = flinkInterpreter.getFlinkVersion(); + InterpreterContext context = getInterpreterContext(); - if (!flinkVersion.isFlink110()) { - InterpreterContext context = getInterpreterContext(); - - // CREATE CATALOG - InterpreterResult result = sqlInterpreter.interpret( - "show modules", context); - assertEquals(context.out.toString(), InterpreterResult.Code.SUCCESS, result.code()); - List<InterpreterResultMessage> resultMessages = context.out.toInterpreterResultMessage(); - assertTrue(resultMessages.toString(), resultMessages.get(0).getData().contains("core")); - } else { - // Flink1.10 don't support show modules - assertTrue(flinkVersion.isFlink110()); - } + // CREATE CATALOG + InterpreterResult result = sqlInterpreter.interpret( + "show modules", context); + assertEquals(context.out.toString(), InterpreterResult.Code.SUCCESS, result.code()); + List<InterpreterResultMessage> resultMessages = context.out.toInterpreterResultMessage(); + assertTrue(resultMessages.toString(), resultMessages.get(0).getData().contains("core")); } diff --git a/flink/flink-shims/src/main/java/org/apache/zeppelin/flink/FlinkShims.java b/flink/flink-shims/src/main/java/org/apache/zeppelin/flink/FlinkShims.java index ba25ec9..640eba6 100644 --- a/flink/flink-shims/src/main/java/org/apache/zeppelin/flink/FlinkShims.java +++ b/flink/flink-shims/src/main/java/org/apache/zeppelin/flink/FlinkShims.java @@ -56,13 +56,7 @@ public abstract class FlinkShims { private static FlinkShims loadShims(FlinkVersion flinkVersion, Properties properties) throws Exception { Class<?> flinkShimsClass; - if (flinkVersion.getMajorVersion() == 1 && flinkVersion.getMinorVersion() == 10) { - LOGGER.info("Initializing shims for Flink 1.10"); - flinkShimsClass = Class.forName("org.apache.zeppelin.flink.Flink110Shims"); - } else if (flinkVersion.getMajorVersion() == 1 && flinkVersion.getMinorVersion() == 11) { - LOGGER.info("Initializing shims for Flink 1.11"); - flinkShimsClass = Class.forName("org.apache.zeppelin.flink.Flink111Shims"); - } else if (flinkVersion.getMajorVersion() == 1 && flinkVersion.getMinorVersion() == 12) { + if (flinkVersion.getMajorVersion() == 1 && flinkVersion.getMinorVersion() == 12) { LOGGER.info("Initializing shims for Flink 1.12"); flinkShimsClass = Class.forName("org.apache.zeppelin.flink.Flink112Shims"); } else if (flinkVersion.getMajorVersion() == 1 && flinkVersion.getMinorVersion() == 13) { diff --git a/flink/flink-shims/src/main/java/org/apache/zeppelin/flink/FlinkVersion.java b/flink/flink-shims/src/main/java/org/apache/zeppelin/flink/FlinkVersion.java index 2e1f47e..1c83645 100644 --- a/flink/flink-shims/src/main/java/org/apache/zeppelin/flink/FlinkVersion.java +++ b/flink/flink-shims/src/main/java/org/apache/zeppelin/flink/FlinkVersion.java @@ -79,10 +79,6 @@ public class FlinkVersion { return new FlinkVersion(versionString); } - public boolean isFlink110() { - return this.majorVersion == 1 && minorVersion == 10; - } - public boolean isAfterFlink114() { return newerThanOrEqual(FlinkVersion.fromVersionString("1.14.0")); } diff --git a/flink/flink1.10-shims/pom.xml b/flink/flink1.10-shims/pom.xml deleted file mode 100644 index 14d2dd5..0000000 --- a/flink/flink1.10-shims/pom.xml +++ /dev/null @@ -1,217 +0,0 @@ -<?xml version="1.0" encoding="UTF-8"?> -<!-- - ~ Licensed to the Apache Software Foundation (ASF) under one or more - ~ contributor license agreements. See the NOTICE file distributed with - ~ this work for additional information regarding copyright ownership. - ~ The ASF licenses this file to You under the Apache License, Version 2.0 - ~ (the "License"); you may not use this file except in compliance with - ~ the License. You may obtain a copy of the License at - ~ - ~ http://www.apache.org/licenses/LICENSE-2.0 - ~ - ~ Unless required by applicable law or agreed to in writing, software - ~ distributed under the License is distributed on an "AS IS" BASIS, - ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - ~ See the License for the specific language governing permissions and - ~ limitations under the License. - --> - -<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" - xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd"> - <parent> - <artifactId>flink-parent</artifactId> - <groupId>org.apache.zeppelin</groupId> - <version>0.11.0-SNAPSHOT</version> - <relativePath>../pom.xml</relativePath> - </parent> - - <modelVersion>4.0.0</modelVersion> - <artifactId>flink1.10-shims</artifactId> - <packaging>jar</packaging> - <name>Zeppelin: Flink1.10 Shims</name> - - <properties> - <flink.version>${flink1.10.version}</flink.version> - </properties> - - <dependencies> - - <dependency> - <groupId>org.apache.zeppelin</groupId> - <artifactId>flink-shims</artifactId> - <version>${project.version}</version> - </dependency> - - <dependency> - <groupId>org.apache.flink</groupId> - <artifactId>flink-core</artifactId> - <version>${flink.version}</version> - <scope>provided</scope> - </dependency> - - <dependency> - <groupId>org.apache.flink</groupId> - <artifactId>flink-clients_${flink.scala.binary.version}</artifactId> - <version>${flink.version}</version> - <scope>provided</scope> - </dependency> - - <dependency> - <groupId>org.apache.flink</groupId> - <artifactId>flink-runtime_${flink.scala.binary.version}</artifactId> - <version>${flink.version}</version> - <scope>provided</scope> - </dependency> - - <dependency> - <groupId>org.apache.flink</groupId> - <artifactId>flink-table-api-java</artifactId> - <version>${flink.version}</version> - <scope>provided</scope> - </dependency> - - <dependency> - <groupId>org.apache.flink</groupId> - <artifactId>flink-table-api-scala_${flink.scala.binary.version}</artifactId> - <version>${flink.version}</version> - <scope>provided</scope> - </dependency> - - <dependency> - <groupId>org.apache.flink</groupId> - <artifactId>flink-table-api-scala-bridge_${flink.scala.binary.version}</artifactId> - <version>${flink.version}</version> - <scope>provided</scope> - </dependency> - - <dependency> - <groupId>org.apache.flink</groupId> - <artifactId>flink-table-api-java-bridge_${flink.scala.binary.version}</artifactId> - <version>${flink.version}</version> - <scope>provided</scope> - </dependency> - - <dependency> - <groupId>org.apache.flink</groupId> - <artifactId>flink-scala_${flink.scala.binary.version}</artifactId> - <version>${flink.version}</version> - <scope>provided</scope> - </dependency> - - <dependency> - <groupId>org.apache.flink</groupId> - <artifactId>flink-streaming-java_2.11</artifactId> - <version>${flink.version}</version> - <scope>provided</scope> - </dependency> - - <dependency> - <groupId>org.apache.flink</groupId> - <artifactId>flink-streaming-scala_${flink.scala.binary.version}</artifactId> - <version>${flink.version}</version> - <scope>provided</scope> - </dependency> - - <dependency> - <groupId>org.apache.flink</groupId> - <artifactId>flink-java</artifactId> - <version>${flink.version}</version> - <scope>provided</scope> - </dependency> - - <dependency> - <groupId>org.apache.flink</groupId> - <artifactId>flink-table-planner-blink_${flink.scala.binary.version}</artifactId> - <version>${flink.version}</version> - <scope>provided</scope> - <exclusions> - <exclusion> - <groupId>org.reflections</groupId> - <artifactId>reflections</artifactId> - </exclusion> - </exclusions> - </dependency> - - <dependency> - <groupId>org.apache.flink</groupId> - <artifactId>flink-table-planner_${flink.scala.binary.version}</artifactId> - <version>${flink.version}</version> - <scope>provided</scope> - </dependency> - - <dependency> - <groupId>org.apache.flink</groupId> - <artifactId>flink-python_${flink.scala.binary.version}</artifactId> - <version>${flink.version}</version> - <scope>provided</scope> - </dependency> - - </dependencies> - - <build> - <plugins> - - <plugin> - <groupId>net.alchim31.maven</groupId> - <artifactId>scala-maven-plugin</artifactId> - <executions> - <execution> - <id>eclipse-add-source</id> - <goals> - <goal>add-source</goal> - </goals> - </execution> - <execution> - <id>scala-compile-first</id> - <phase>process-resources</phase> - <goals> - <goal>compile</goal> - </goals> - </execution> - <execution> - <id>scala-test-compile-first</id> - <phase>process-test-resources</phase> - <goals> - <goal>testCompile</goal> - </goals> - </execution> - </executions> - <configuration> - <scalaVersion>${flink.scala.version}</scalaVersion> - <args> - <arg>-unchecked</arg> - <arg>-deprecation</arg> - <arg>-feature</arg> - <arg>-target:jvm-1.8</arg> - </args> - <jvmArgs> - <jvmArg>-Xms1024m</jvmArg> - <jvmArg>-Xmx1024m</jvmArg> - <jvmArg>-XX:MaxMetaspaceSize=${MaxMetaspace}</jvmArg> - </jvmArgs> - <javacArgs> - <javacArg>-source</javacArg> - <javacArg>${java.version}</javacArg> - <javacArg>-target</javacArg> - <javacArg>${java.version}</javacArg> - <javacArg>-Xlint:all,-serial,-path,-options</javacArg> - </javacArgs> - </configuration> - </plugin> - - <plugin> - <artifactId>maven-resources-plugin</artifactId> - <executions> - <execution> - <id>copy-interpreter-setting</id> - <phase>none</phase> - <configuration> - <skip>true</skip> - </configuration> - </execution> - </executions> - </plugin> - </plugins> - </build> - -</project> \ No newline at end of file diff --git a/flink/flink1.10-shims/src/main/java/org/apache/zeppelin/flink/Flink110Shims.java b/flink/flink1.10-shims/src/main/java/org/apache/zeppelin/flink/Flink110Shims.java deleted file mode 100644 index 5711884..0000000 --- a/flink/flink1.10-shims/src/main/java/org/apache/zeppelin/flink/Flink110Shims.java +++ /dev/null @@ -1,389 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.zeppelin.flink; - -import org.apache.commons.cli.CommandLine; -import org.apache.commons.lang3.tuple.ImmutablePair; -import org.apache.flink.api.common.ExecutionConfig; -import org.apache.flink.api.common.typeinfo.TypeInformation; -import org.apache.flink.api.common.typeutils.TypeSerializer; -import org.apache.flink.api.java.tuple.Tuple2; -import org.apache.flink.api.scala.DataSet; -import org.apache.flink.client.cli.CliFrontend; -import org.apache.flink.client.cli.CustomCommandLine; -import org.apache.flink.configuration.ConfigOption; -import org.apache.flink.python.PythonOptions; -import org.apache.flink.python.util.ResourceUtil; -import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; -import org.apache.flink.streaming.api.environment.StreamExecutionEnvironmentFactory; -import org.apache.flink.table.api.EnvironmentSettings; -import org.apache.flink.table.api.Table; -import org.apache.flink.table.api.TableConfig; -import org.apache.flink.table.api.TableEnvironment; -import org.apache.flink.table.api.TableException; -import org.apache.flink.table.api.TableUtils; -import org.apache.flink.table.api.config.ExecutionConfigOptions; -import org.apache.flink.table.api.config.OptimizerConfigOptions; -import org.apache.flink.table.api.java.internal.StreamTableEnvironmentImpl; -import org.apache.flink.table.api.scala.BatchTableEnvironment; -import org.apache.flink.table.calcite.FlinkTypeFactory; -import org.apache.flink.table.catalog.CatalogManager; -import org.apache.flink.table.catalog.FunctionCatalog; -import org.apache.flink.table.catalog.GenericInMemoryCatalog; -import org.apache.flink.table.delegation.Executor; -import org.apache.flink.table.delegation.ExecutorFactory; -import org.apache.flink.table.delegation.Planner; -import org.apache.flink.table.delegation.PlannerFactory; -import org.apache.flink.table.factories.ComponentFactoryService; -import org.apache.flink.table.functions.AggregateFunction; -import org.apache.flink.table.functions.ScalarFunction; -import org.apache.flink.table.functions.TableAggregateFunction; -import org.apache.flink.table.functions.TableFunction; -import org.apache.flink.table.sinks.TableSink; -import org.apache.flink.table.utils.EncodingUtils; -import org.apache.flink.types.Row; -import org.apache.flink.util.FlinkException; -import org.apache.zeppelin.flink.shims110.CollectStreamTableSink; -import org.apache.zeppelin.flink.shims110.Flink110ScalaShims; -import org.apache.zeppelin.flink.sql.SqlCommandParser; -import org.apache.zeppelin.interpreter.InterpreterContext; -import org.jline.utils.AttributedString; -import org.jline.utils.AttributedStringBuilder; -import org.jline.utils.AttributedStyle; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.io.File; -import java.io.IOException; -import java.lang.reflect.Field; -import java.lang.reflect.InvocationTargetException; -import java.lang.reflect.Method; -import java.net.InetAddress; -import java.nio.file.Files; -import java.util.HashMap; -import java.util.List; -import java.util.Map; -import java.util.Optional; -import java.util.Properties; -import java.util.regex.Matcher; - - -/** - * Shims for flink 1.10 - */ -public class Flink110Shims extends FlinkShims { - - private static final Logger LOGGER = LoggerFactory.getLogger(Flink110Shims.class); - public static final AttributedString MESSAGE_HELP = new AttributedStringBuilder() - .append("The following commands are available:\n\n") - .append(formatCommand(SqlCommandParser.SqlCommand.CREATE_TABLE, "Create table under current catalog and database.")) - .append(formatCommand(SqlCommandParser.SqlCommand.DROP_TABLE, "Drop table with optional catalog and database. Syntax: 'DROP TABLE [IF EXISTS] <name>;'")) - .append(formatCommand(SqlCommandParser.SqlCommand.CREATE_VIEW, "Creates a virtual table from a SQL query. Syntax: 'CREATE VIEW <name> AS <query>;'")) - .append(formatCommand(SqlCommandParser.SqlCommand.DESCRIBE, "Describes the schema of a table with the given name.")) - .append(formatCommand(SqlCommandParser.SqlCommand.DROP_VIEW, "Deletes a previously created virtual table. Syntax: 'DROP VIEW <name>;'")) - .append(formatCommand(SqlCommandParser.SqlCommand.EXPLAIN, "Describes the execution plan of a query or table with the given name.")) - .append(formatCommand(SqlCommandParser.SqlCommand.HELP, "Prints the available commands.")) - .append(formatCommand(SqlCommandParser.SqlCommand.INSERT_INTO, "Inserts the results of a SQL SELECT query into a declared table sink.")) - .append(formatCommand(SqlCommandParser.SqlCommand.INSERT_OVERWRITE, "Inserts the results of a SQL SELECT query into a declared table sink and overwrite existing data.")) - .append(formatCommand(SqlCommandParser.SqlCommand.SELECT, "Executes a SQL SELECT query on the Flink cluster.")) - .append(formatCommand(SqlCommandParser.SqlCommand.SET, "Sets a session configuration property. Syntax: 'SET <key>=<value>;'. Use 'SET;' for listing all properties.")) - .append(formatCommand(SqlCommandParser.SqlCommand.SHOW_FUNCTIONS, "Shows all user-defined and built-in functions.")) - .append(formatCommand(SqlCommandParser.SqlCommand.SHOW_TABLES, "Shows all registered tables.")) - .append(formatCommand(SqlCommandParser.SqlCommand.SOURCE, "Reads a SQL SELECT query from a file and executes it on the Flink cluster.")) - .append(formatCommand(SqlCommandParser.SqlCommand.USE_CATALOG, "Sets the current catalog. The current database is set to the catalog's default one. Experimental! Syntax: 'USE CATALOG <name>;'")) - .append(formatCommand(SqlCommandParser.SqlCommand.USE, "Sets the current default database. Experimental! Syntax: 'USE <name>;'")) - .style(AttributedStyle.DEFAULT.underline()) - .append("\nHint") - .style(AttributedStyle.DEFAULT) - .append(": Make sure that a statement ends with ';' for finalizing (multi-line) statements.") - .toAttributedString(); - - public Flink110Shims(FlinkVersion flinkVersion, Properties properties) { - super(flinkVersion, properties); - } - - @Override - public void disableSysoutLogging(Object batchConfig, Object streamConfig) { - ((ExecutionConfig) batchConfig).disableSysoutLogging(); - ((ExecutionConfig) streamConfig).disableSysoutLogging(); - } - - @Override - public Object createStreamExecutionEnvironmentFactory(Object streamExecutionEnvironment) { - return new StreamExecutionEnvironmentFactory() { - @Override - public StreamExecutionEnvironment createExecutionEnvironment() { - return (StreamExecutionEnvironment) streamExecutionEnvironment; - } - }; - } - - @Override - public Object createCatalogManager(Object config) { - return new CatalogManager("default_catalog", - new GenericInMemoryCatalog("default_catalog", "default_database")); - } - - @Override - public String getPyFlinkPythonPath(Properties properties) throws IOException { - String flinkHome = System.getenv("FLINK_HOME"); - if (flinkHome != null) { - File tmpDir = Files.createTempDirectory("zeppelin").toFile(); - List<File> depFiles = null; - try { - depFiles = ResourceUtil.extractBuiltInDependencies(tmpDir.getAbsolutePath(), "pyflink", true); - } catch (InterruptedException e) { - throw new IOException(e); - } - StringBuilder builder = new StringBuilder(); - for (File file : depFiles) { - LOGGER.info("Adding extracted file to PYTHONPATH: " + file.getAbsolutePath()); - builder.append(file.getAbsolutePath() + ":"); - } - return builder.toString(); - } else { - throw new IOException("No FLINK_HOME is specified"); - } - } - - @Override - public Object getCollectStreamTableSink(InetAddress targetAddress, int targetPort, Object serializer) { - return new CollectStreamTableSink(targetAddress, targetPort, (TypeSerializer<Tuple2<Boolean, Row >>) serializer); - } - - @Override - public List collectToList(Object table) throws Exception { - return TableUtils.collectToList((Table) table); - } - - @Override - public void startMultipleInsert(Object tblEnv, InterpreterContext context) throws Exception { - - } - - @Override - public void addInsertStatement(String sql, Object tblEnv, InterpreterContext context) throws Exception { - ((TableEnvironment) tblEnv).sqlUpdate(sql); - } - - @Override - public boolean executeMultipleInsertInto(String jobName, Object tblEnv, InterpreterContext context) throws Exception { - ((TableEnvironment) tblEnv).execute(jobName); - return true; - } - - @Override - public boolean rowEquals(Object row1, Object row2) { - return ((Row)row1).equals((Row) row2); - } - - public Object fromDataSet(Object btenv, Object ds) { - return Flink110ScalaShims.fromDataSet((BatchTableEnvironment) btenv, (DataSet) ds); - } - - @Override - public Object toDataSet(Object btenv, Object table) { - return Flink110ScalaShims.toDataSet((BatchTableEnvironment) btenv, (Table) table); - } - - @Override - public void registerTableSink(Object stenv, String tableName, Object collectTableSink) { - ((TableEnvironment) stenv).registerTableSink(tableName, (TableSink) collectTableSink); - } - - @Override - public void registerScalarFunction(Object btenv, String name, Object scalarFunction) { - ((StreamTableEnvironmentImpl)(btenv)).registerFunction(name, (ScalarFunction) scalarFunction); - } - - @Override - public void registerTableFunction(Object btenv, String name, Object tableFunction) { - ((StreamTableEnvironmentImpl)(btenv)).registerFunction(name, (TableFunction) tableFunction); - } - - @Override - public void registerAggregateFunction(Object btenv, String name, Object aggregateFunction) { - ((StreamTableEnvironmentImpl)(btenv)).registerFunction(name, (AggregateFunction) aggregateFunction); - } - - @Override - public void registerTableAggregateFunction(Object btenv, String name, Object tableAggregateFunction) { - ((StreamTableEnvironmentImpl)(btenv)).registerFunction(name, (TableAggregateFunction) tableAggregateFunction); - } - - @Override - public Optional<SqlCommandParser.SqlCommandCall> parseSql(Object tableEnv, String stmt) { - // parse - for (SqlCommandParser.SqlCommand cmd : SqlCommandParser.SqlCommand.values()) { - if (cmd.pattern == null){ - continue; - } - final Matcher matcher = cmd.pattern.matcher(stmt); - if (matcher.matches()) { - final String[] groups = new String[matcher.groupCount()]; - for (int i = 0; i < groups.length; i++) { - groups[i] = matcher.group(i + 1); - } - if (cmd == SqlCommandParser.SqlCommand.EXPLAIN) { - String[] operands = cmd.operandConverter.apply(groups).get(); - if (operands[0].equalsIgnoreCase("select")) { - // flink 1.10 only suppports explain select statement. - String[] newOperands = new String[]{operands[0] + " " + operands[1]}; - return Optional.of(new SqlCommandParser.SqlCommandCall(cmd, newOperands, stmt)); - } else { - return Optional.empty(); - } - } else { - return cmd.operandConverter.apply(groups) - .map((operands) -> new SqlCommandParser.SqlCommandCall(cmd, operands, stmt)); - } - } - } - return Optional.empty(); - } - - @Override - public void executeSql(Object tableEnv, String sql) { - throw new RuntimeException("Should not be called for flink 1.10"); - } - - @Override - public String explain(Object tableEnv, String sql) { - Table table = ((TableEnvironment) tableEnv).sqlQuery(sql); - return ((TableEnvironment) tableEnv).explain(table); - } - - @Override - public String sqlHelp() { - return MESSAGE_HELP.toString(); - } - - @Override - public void setCatalogManagerSchemaResolver(Object catalogManager, - Object parser, - Object environmentSetting) { - // do nothing for flink 1.10 - } - - @Override - public Object updateEffectiveConfig(Object cliFrontend, Object commandLine, Object effectiveConfig) { - CustomCommandLine customCommandLine = null; - try { - customCommandLine = ((CliFrontend) cliFrontend).validateAndGetActiveCommandLine((CommandLine) commandLine); - } catch (NoSuchMethodError e) { - try { - Method method = CliFrontend.class.getMethod("getActiveCustomCommandLine", CommandLine.class); - customCommandLine = (CustomCommandLine) method.invoke((CliFrontend) cliFrontend, commandLine); - } catch (NoSuchMethodException | IllegalAccessException | InvocationTargetException ex) { - LOGGER.error("Fail to call getCustomCli", ex); - throw new RuntimeException("Fail to call getCustomCli", ex); - } - } - try { - return customCommandLine.applyCommandLineOptionsToConfiguration((CommandLine) commandLine); - } catch (FlinkException e) { - throw new RuntimeException("Fail to call applyCommandLineOptionsToConfiguration", e); - } - } - - @Override - public Map extractTableConfigOptions() { - Map<String, ConfigOption> configOptions = new HashMap<>(); - configOptions.putAll(extractConfigOptions(ExecutionConfigOptions.class)); - configOptions.putAll(extractConfigOptions(OptimizerConfigOptions.class)); - try { - configOptions.putAll(extractConfigOptions(PythonOptions.class)); - } catch (NoClassDefFoundError e) { - LOGGER.warn("No pyflink jars found"); - } - return configOptions; - } - - private Map<String, ConfigOption> extractConfigOptions(Class clazz) { - Map<String, ConfigOption> configOptions = new HashMap(); - Field[] fields = clazz.getDeclaredFields(); - for (Field field : fields) { - if (field.getType().isAssignableFrom(ConfigOption.class)) { - try { - ConfigOption configOption = (ConfigOption) field.get(ConfigOption.class); - configOptions.put(configOption.key(), configOption); - } catch (Throwable e) { - LOGGER.warn("Fail to get ConfigOption", e); - } - } - } - return configOptions; - } - - @Override - public String[] rowToString(Object row, Object table, Object tableConfig) { - return rowToString((Row) row); - } - - private String[] rowToString(Row row) { - final String[] fields = new String[row.getArity()]; - for (int i = 0; i < row.getArity(); i++) { - final Object field = row.getField(i); - if (field == null) { - fields[i] = "(NULL)"; - } else { - fields[i] = EncodingUtils.objectToString(field); - } - } - return fields; - } - - public boolean isTimeIndicatorType(Object type) { - return FlinkTypeFactory.isTimeIndicatorType((TypeInformation<?>) type); - } - - private Object lookupExecutor(ClassLoader classLoader, - Object settings, - Object sEnv) { - try { - Map<String, String> executorProperties = ((EnvironmentSettings) settings).toExecutorProperties(); - ExecutorFactory executorFactory = ComponentFactoryService.find(ExecutorFactory.class, executorProperties); - Method createMethod = executorFactory.getClass() - .getMethod("create", Map.class, StreamExecutionEnvironment.class); - - return (Executor) createMethod.invoke( - executorFactory, - executorProperties, - (StreamExecutionEnvironment) sEnv); - } catch (Exception e) { - throw new TableException( - "Could not instantiate the executor. Make sure a planner module is on the classpath", - e); - } - } - - @Override - public ImmutablePair<Object, Object> createPlannerAndExecutor( - ClassLoader classLoader, Object environmentSettings, Object sEnv, - Object tableConfig, Object functionCatalog, Object catalogManager) { - EnvironmentSettings settings = (EnvironmentSettings) environmentSettings; - Executor executor = (Executor) lookupExecutor(classLoader, settings, sEnv); - Map<String, String> plannerProperties = settings.toPlannerProperties(); - Planner planner = ComponentFactoryService.find(PlannerFactory.class, plannerProperties) - .create(plannerProperties, executor, (TableConfig) tableConfig, - (FunctionCatalog) functionCatalog, - (CatalogManager) catalogManager); - return ImmutablePair.of(planner, executor); - } -} diff --git a/flink/flink1.10-shims/src/main/java/org/apache/zeppelin/flink/shims110/CollectStreamTableSink.java b/flink/flink1.10-shims/src/main/java/org/apache/zeppelin/flink/shims110/CollectStreamTableSink.java deleted file mode 100644 index 925e3a7..0000000 --- a/flink/flink1.10-shims/src/main/java/org/apache/zeppelin/flink/shims110/CollectStreamTableSink.java +++ /dev/null @@ -1,102 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.zeppelin.flink.shims110; - -import org.apache.flink.api.common.typeinfo.TypeInformation; -import org.apache.flink.api.common.typeinfo.Types; -import org.apache.flink.api.common.typeutils.TypeSerializer; -import org.apache.flink.api.java.tuple.Tuple2; -import org.apache.flink.api.java.typeutils.TupleTypeInfo; -import org.apache.flink.streaming.api.datastream.DataStream; -import org.apache.flink.streaming.api.datastream.DataStreamSink; -import org.apache.flink.streaming.experimental.CollectSink; -import org.apache.flink.table.sinks.RetractStreamTableSink; -import org.apache.flink.types.Row; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.net.InetAddress; -import java.util.UUID; - -/** - * Table sink for collecting the results locally using sockets. - */ -public class CollectStreamTableSink implements RetractStreamTableSink<Row> { - - private static final Logger LOGGER = LoggerFactory.getLogger(CollectStreamTableSink.class); - - private final InetAddress targetAddress; - private final int targetPort; - private final TypeSerializer<Tuple2<Boolean, Row>> serializer; - - private String[] fieldNames; - private TypeInformation<?>[] fieldTypes; - - public CollectStreamTableSink(InetAddress targetAddress, - int targetPort, - TypeSerializer<Tuple2<Boolean, Row>> serializer) { - LOGGER.info("Use address: " + targetAddress.getHostAddress() + ":" + targetPort); - this.targetAddress = targetAddress; - this.targetPort = targetPort; - this.serializer = serializer; - } - - @Override - public String[] getFieldNames() { - return fieldNames; - } - - @Override - public TypeInformation<?>[] getFieldTypes() { - return fieldTypes; - } - - @Override - public CollectStreamTableSink configure(String[] fieldNames, TypeInformation<?>[] fieldTypes) { - final CollectStreamTableSink copy = - new CollectStreamTableSink(targetAddress, targetPort, serializer); - copy.fieldNames = fieldNames; - copy.fieldTypes = fieldTypes; - return copy; - } - - @Override - public TypeInformation<Row> getRecordType() { - return Types.ROW_NAMED(fieldNames, fieldTypes); - } - - @Override - public void emitDataStream(DataStream<Tuple2<Boolean, Row>> stream) { - consumeDataStream(stream); - } - - @Override - public DataStreamSink<?> consumeDataStream(DataStream<Tuple2<Boolean, Row>> stream) { - // add sink - return stream - .addSink(new CollectSink<>(targetAddress, targetPort, serializer)) - .name("Zeppelin Flink Sql Stream Collect Sink " + UUID.randomUUID()) - .setParallelism(1); - } - - @Override - public TupleTypeInfo<Tuple2<Boolean, Row>> getOutputType() { - return new TupleTypeInfo<>(Types.BOOLEAN, getRecordType()); - } -} diff --git a/flink/flink1.10-shims/src/main/scala/org/apache/zeppelin/flink/shims110/Flink110ScalaShims.scala b/flink/flink1.10-shims/src/main/scala/org/apache/zeppelin/flink/shims110/Flink110ScalaShims.scala deleted file mode 100644 index 9be7b8a..0000000 --- a/flink/flink1.10-shims/src/main/scala/org/apache/zeppelin/flink/shims110/Flink110ScalaShims.scala +++ /dev/null @@ -1,37 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.zeppelin.flink.shims110 - -import org.apache.flink.api.scala.DataSet -import org.apache.flink.table.api.Table -import org.apache.flink.table.api.scala.BatchTableEnvironment -import org.apache.flink.types.Row -import org.apache.flink.streaming.api.scala._ - - -object Flink110ScalaShims { - - def fromDataSet(btenv: BatchTableEnvironment, ds: DataSet[_]): Table = { - btenv.fromDataSet(ds) - } - - def toDataSet(btenv: BatchTableEnvironment, table: Table): DataSet[Row] = { - btenv.toDataSet[Row](table) - } -} diff --git a/flink/flink1.11-shims/pom.xml b/flink/flink1.11-shims/pom.xml deleted file mode 100644 index 04dbf01..0000000 --- a/flink/flink1.11-shims/pom.xml +++ /dev/null @@ -1,211 +0,0 @@ -<?xml version="1.0" encoding="UTF-8"?> -<!-- - ~ Licensed to the Apache Software Foundation (ASF) under one or more - ~ contributor license agreements. See the NOTICE file distributed with - ~ this work for additional information regarding copyright ownership. - ~ The ASF licenses this file to You under the Apache License, Version 2.0 - ~ (the "License"); you may not use this file except in compliance with - ~ the License. You may obtain a copy of the License at - ~ - ~ http://www.apache.org/licenses/LICENSE-2.0 - ~ - ~ Unless required by applicable law or agreed to in writing, software - ~ distributed under the License is distributed on an "AS IS" BASIS, - ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - ~ See the License for the specific language governing permissions and - ~ limitations under the License. - --> - -<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" - xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd"> - <parent> - <artifactId>flink-parent</artifactId> - <groupId>org.apache.zeppelin</groupId> - <version>0.11.0-SNAPSHOT</version> - <relativePath>../pom.xml</relativePath> - </parent> - - <modelVersion>4.0.0</modelVersion> - <artifactId>flink1.11-shims</artifactId> - <packaging>jar</packaging> - <name>Zeppelin: Flink1.11 Shims</name> - - <properties> - <flink.version>${flink1.11.version}</flink.version> - </properties> - - <dependencies> - - <dependency> - <groupId>org.apache.zeppelin</groupId> - <artifactId>flink-shims</artifactId> - <version>${project.version}</version> - </dependency> - - <dependency> - <groupId>org.apache.flink</groupId> - <artifactId>flink-core</artifactId> - <version>${flink.version}</version> - <scope>provided</scope> - </dependency> - - <dependency> - <groupId>org.apache.flink</groupId> - <artifactId>flink-clients_${flink.scala.binary.version}</artifactId> - <version>${flink.version}</version> - <scope>provided</scope> - </dependency> - - <dependency> - <groupId>org.apache.flink</groupId> - <artifactId>flink-runtime_${flink.scala.binary.version}</artifactId> - <version>${flink.version}</version> - <scope>provided</scope> - </dependency> - - - <dependency> - <groupId>org.apache.flink</groupId> - <artifactId>flink-table-api-scala_${flink.scala.binary.version}</artifactId> - <version>${flink.version}</version> - <scope>provided</scope> - </dependency> - - <dependency> - <groupId>org.apache.flink</groupId> - <artifactId>flink-table-api-scala-bridge_${flink.scala.binary.version}</artifactId> - <version>${flink.version}</version> - <scope>provided</scope> - </dependency> - - <dependency> - <groupId>org.apache.flink</groupId> - <artifactId>flink-table-api-java-bridge_${flink.scala.binary.version}</artifactId> - <version>${flink.version}</version> - <scope>provided</scope> - </dependency> - - <dependency> - <groupId>org.apache.flink</groupId> - <artifactId>flink-scala_${flink.scala.binary.version}</artifactId> - <version>${flink.version}</version> - <scope>provided</scope> - </dependency> - - <dependency> - <groupId>org.apache.flink</groupId> - <artifactId>flink-streaming-java_${flink.scala.binary.version}</artifactId> - <version>${flink.version}</version> - <scope>provided</scope> - </dependency> - - <dependency> - <groupId>org.apache.flink</groupId> - <artifactId>flink-streaming-scala_${flink.scala.binary.version}</artifactId> - <version>${flink.version}</version> - <scope>provided</scope> - </dependency> - - <dependency> - <groupId>org.apache.flink</groupId> - <artifactId>flink-java</artifactId> - <version>${flink.version}</version> - <scope>provided</scope> - </dependency> - - <dependency> - <groupId>org.apache.flink</groupId> - <artifactId>flink-table-planner-blink_${flink.scala.binary.version}</artifactId> - <version>${flink.version}</version> - <scope>provided</scope> - <exclusions> - <exclusion> - <groupId>org.reflections</groupId> - <artifactId>reflections</artifactId> - </exclusion> - </exclusions> - </dependency> - - <dependency> - <groupId>org.apache.flink</groupId> - <artifactId>flink-table-planner_${flink.scala.binary.version}</artifactId> - <version>${flink.version}</version> - <scope>provided</scope> - </dependency> - - <dependency> - <groupId>org.apache.flink</groupId> - <artifactId>flink-python_${flink.scala.binary.version}</artifactId> - <version>${flink.version}</version> - <scope>provided</scope> - </dependency> - - </dependencies> - - <build> - <plugins> - - <plugin> - <groupId>net.alchim31.maven</groupId> - <artifactId>scala-maven-plugin</artifactId> - <executions> - <execution> - <id>eclipse-add-source</id> - <goals> - <goal>add-source</goal> - </goals> - </execution> - <execution> - <id>scala-compile-first</id> - <phase>process-resources</phase> - <goals> - <goal>compile</goal> - </goals> - </execution> - <execution> - <id>scala-test-compile-first</id> - <phase>process-test-resources</phase> - <goals> - <goal>testCompile</goal> - </goals> - </execution> - </executions> - <configuration> - <scalaVersion>${flink.scala.version}</scalaVersion> - <args> - <arg>-unchecked</arg> - <arg>-deprecation</arg> - <arg>-feature</arg> - <arg>-target:jvm-1.8</arg> - </args> - <jvmArgs> - <jvmArg>-Xms1024m</jvmArg> - <jvmArg>-Xmx1024m</jvmArg> - <jvmArg>-XX:MaxMetaspaceSize=${MaxMetaspace}</jvmArg> - </jvmArgs> - <javacArgs> - <javacArg>-source</javacArg> - <javacArg>${java.version}</javacArg> - <javacArg>-target</javacArg> - <javacArg>${java.version}</javacArg> - <javacArg>-Xlint:all,-serial,-path,-options</javacArg> - </javacArgs> - </configuration> - </plugin> - - <plugin> - <artifactId>maven-resources-plugin</artifactId> - <executions> - <execution> - <id>copy-interpreter-setting</id> - <phase>none</phase> - <configuration> - <skip>true</skip> - </configuration> - </execution> - </executions> - </plugin> - </plugins> - </build> - -</project> \ No newline at end of file diff --git a/flink/flink1.11-shims/src/main/java/org/apache/zeppelin/flink/Flink111Shims.java b/flink/flink1.11-shims/src/main/java/org/apache/zeppelin/flink/Flink111Shims.java deleted file mode 100644 index 64979fd..0000000 --- a/flink/flink1.11-shims/src/main/java/org/apache/zeppelin/flink/Flink111Shims.java +++ /dev/null @@ -1,526 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.zeppelin.flink; - -import org.apache.commons.cli.CommandLine; -import org.apache.commons.compress.utils.Lists; -import org.apache.commons.lang3.tuple.ImmutablePair; -import org.apache.flink.api.common.ExecutionConfig; -import org.apache.flink.api.common.JobStatus; -import org.apache.flink.api.common.typeinfo.TypeInformation; -import org.apache.flink.api.common.typeutils.TypeSerializer; -import org.apache.flink.api.java.tuple.Tuple2; -import org.apache.flink.api.scala.DataSet; -import org.apache.flink.client.cli.CliFrontend; -import org.apache.flink.client.cli.CustomCommandLine; -import org.apache.flink.configuration.ConfigOption; -import org.apache.flink.configuration.ReadableConfig; -import org.apache.flink.core.execution.JobClient; -import org.apache.flink.python.PythonOptions; -import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; -import org.apache.flink.streaming.api.environment.StreamExecutionEnvironmentFactory; -import org.apache.flink.table.api.EnvironmentSettings; -import org.apache.flink.table.api.StatementSet; -import org.apache.flink.table.api.Table; -import org.apache.flink.table.api.TableConfig; -import org.apache.flink.table.api.TableEnvironment; -import org.apache.flink.table.api.TableException; -import org.apache.flink.table.api.TableResult; -import org.apache.flink.table.api.bridge.java.internal.StreamTableEnvironmentImpl; -import org.apache.flink.table.api.bridge.scala.BatchTableEnvironment; -import org.apache.flink.table.api.config.ExecutionConfigOptions; -import org.apache.flink.table.api.config.OptimizerConfigOptions; -import org.apache.flink.table.api.config.TableConfigOptions; -import org.apache.flink.table.api.internal.TableEnvironmentInternal; -import org.apache.flink.table.api.internal.CatalogTableSchemaResolver; -import org.apache.flink.table.catalog.CatalogManager; -import org.apache.flink.table.catalog.FunctionCatalog; -import org.apache.flink.table.catalog.GenericInMemoryCatalog; -import org.apache.flink.table.delegation.Executor; -import org.apache.flink.table.delegation.ExecutorFactory; -import org.apache.flink.table.delegation.Parser; -import org.apache.flink.table.delegation.Planner; -import org.apache.flink.table.delegation.PlannerFactory; -import org.apache.flink.table.factories.ComponentFactoryService; -import org.apache.flink.table.functions.AggregateFunction; -import org.apache.flink.table.functions.ScalarFunction; -import org.apache.flink.table.functions.TableAggregateFunction; -import org.apache.flink.table.functions.TableFunction; -import org.apache.flink.table.operations.CatalogSinkModifyOperation; -import org.apache.flink.table.operations.DescribeTableOperation; -import org.apache.flink.table.operations.ExplainOperation; -import org.apache.flink.table.operations.Operation; -import org.apache.flink.table.operations.QueryOperation; -import org.apache.flink.table.operations.ShowCatalogsOperation; -import org.apache.flink.table.operations.ShowDatabasesOperation; -import org.apache.flink.table.operations.ShowFunctionsOperation; -import org.apache.flink.table.operations.ShowTablesOperation; -import org.apache.flink.table.operations.UseCatalogOperation; -import org.apache.flink.table.operations.UseDatabaseOperation; -import org.apache.flink.table.operations.ddl.AlterCatalogFunctionOperation; -import org.apache.flink.table.operations.ddl.AlterDatabaseOperation; -import org.apache.flink.table.operations.ddl.AlterTableOperation; -import org.apache.flink.table.operations.ddl.CreateCatalogFunctionOperation; -import org.apache.flink.table.operations.ddl.CreateCatalogOperation; -import org.apache.flink.table.operations.ddl.CreateDatabaseOperation; -import org.apache.flink.table.operations.ddl.CreateTableOperation; -import org.apache.flink.table.operations.ddl.CreateTempSystemFunctionOperation; -import org.apache.flink.table.operations.ddl.CreateViewOperation; -import org.apache.flink.table.operations.ddl.DropCatalogFunctionOperation; -import org.apache.flink.table.operations.ddl.DropCatalogOperation; -import org.apache.flink.table.operations.ddl.DropDatabaseOperation; -import org.apache.flink.table.operations.ddl.DropTableOperation; -import org.apache.flink.table.operations.ddl.DropTempSystemFunctionOperation; -import org.apache.flink.table.operations.ddl.DropViewOperation; -import org.apache.flink.table.planner.calcite.FlinkTypeFactory; -import org.apache.flink.table.sinks.TableSink; -import org.apache.flink.table.utils.PrintUtils; -import org.apache.flink.types.Row; -import org.apache.flink.types.RowKind; -import org.apache.flink.util.FlinkException; -import org.apache.zeppelin.flink.shims111.CollectStreamTableSink; -import org.apache.zeppelin.flink.shims111.Flink111ScalaShims; -import org.apache.zeppelin.flink.sql.SqlCommandParser; -import org.apache.zeppelin.flink.sql.SqlCommandParser.SqlCommand; -import org.apache.zeppelin.flink.sql.SqlCommandParser.SqlCommandCall; -import org.apache.zeppelin.interpreter.InterpreterContext; -import org.jline.utils.AttributedString; -import org.jline.utils.AttributedStringBuilder; -import org.jline.utils.AttributedStyle; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.io.File; -import java.io.IOException; -import java.lang.reflect.Field; -import java.lang.reflect.Method; -import java.net.InetAddress; -import java.util.Arrays; -import java.util.HashMap; -import java.util.List; -import java.util.Map; -import java.util.Optional; -import java.util.Properties; -import java.util.concurrent.ConcurrentHashMap; -import java.util.regex.Matcher; - - -/** - * Shims for flink 1.11 - */ -public class Flink111Shims extends FlinkShims { - - private static final Logger LOGGER = LoggerFactory.getLogger(Flink111Shims.class); - public static final AttributedString MESSAGE_HELP = new AttributedStringBuilder() - .append("The following commands are available:\n\n") - .append(formatCommand(SqlCommand.CREATE_TABLE, "Create table under current catalog and database.")) - .append(formatCommand(SqlCommand.DROP_TABLE, "Drop table with optional catalog and database. Syntax: 'DROP TABLE [IF EXISTS] <name>;'")) - .append(formatCommand(SqlCommand.CREATE_VIEW, "Creates a virtual table from a SQL query. Syntax: 'CREATE VIEW <name> AS <query>;'")) - .append(formatCommand(SqlCommand.DESCRIBE, "Describes the schema of a table with the given name.")) - .append(formatCommand(SqlCommand.DROP_VIEW, "Deletes a previously created virtual table. Syntax: 'DROP VIEW <name>;'")) - .append(formatCommand(SqlCommand.EXPLAIN, "Describes the execution plan of a query or table with the given name.")) - .append(formatCommand(SqlCommand.HELP, "Prints the available commands.")) - .append(formatCommand(SqlCommand.INSERT_INTO, "Inserts the results of a SQL SELECT query into a declared table sink.")) - .append(formatCommand(SqlCommand.INSERT_OVERWRITE, "Inserts the results of a SQL SELECT query into a declared table sink and overwrite existing data.")) - .append(formatCommand(SqlCommand.SELECT, "Executes a SQL SELECT query on the Flink cluster.")) - .append(formatCommand(SqlCommand.SET, "Sets a session configuration property. Syntax: 'SET <key>=<value>;'. Use 'SET;' for listing all properties.")) - .append(formatCommand(SqlCommand.SHOW_FUNCTIONS, "Shows all user-defined and built-in functions.")) - .append(formatCommand(SqlCommand.SHOW_TABLES, "Shows all registered tables.")) - .append(formatCommand(SqlCommand.SOURCE, "Reads a SQL SELECT query from a file and executes it on the Flink cluster.")) - .append(formatCommand(SqlCommand.USE_CATALOG, "Sets the current catalog. The current database is set to the catalog's default one. Experimental! Syntax: 'USE CATALOG <name>;'")) - .append(formatCommand(SqlCommand.USE, "Sets the current default database. Experimental! Syntax: 'USE <name>;'")) - .style(AttributedStyle.DEFAULT.underline()) - .append("\nHint") - .style(AttributedStyle.DEFAULT) - .append(": Make sure that a statement ends with ';' for finalizing (multi-line) statements.") - .toAttributedString(); - - private Map<String, StatementSet> statementSetMap = new ConcurrentHashMap<>(); - - public Flink111Shims(FlinkVersion flinkVersion, Properties properties) { - super(flinkVersion, properties); - } - @Override - public void disableSysoutLogging(Object batchConfig, Object streamConfig) { - ((ExecutionConfig) batchConfig).disableSysoutLogging(); - ((ExecutionConfig) streamConfig).disableSysoutLogging(); - } - - @Override - public Object createStreamExecutionEnvironmentFactory(Object streamExecutionEnvironment) { - return new StreamExecutionEnvironmentFactory() { - @Override - public StreamExecutionEnvironment createExecutionEnvironment() { - return (StreamExecutionEnvironment) streamExecutionEnvironment; - } - }; - } - - @Override - public Object createCatalogManager(Object config) { - return CatalogManager.newBuilder() - .classLoader(Thread.currentThread().getContextClassLoader()) - .config((ReadableConfig) config) - .defaultCatalog("default_catalog", - new GenericInMemoryCatalog("default_catalog", "default_database")) - .build(); - } - - @Override - public String getPyFlinkPythonPath(Properties properties) throws IOException { - String flinkHome = System.getenv("FLINK_HOME"); - if (flinkHome != null) { - List<File> depFiles = null; - depFiles = Arrays.asList(new File(flinkHome + "/opt/python").listFiles()); - StringBuilder builder = new StringBuilder(); - for (File file : depFiles) { - LOGGER.info("Adding extracted file to PYTHONPATH: " + file.getAbsolutePath()); - builder.append(file.getAbsolutePath() + ":"); - } - return builder.toString(); - } else { - throw new IOException("No FLINK_HOME is specified"); - } - } - - @Override - public Object getCollectStreamTableSink(InetAddress targetAddress, int targetPort, Object serializer) { - return new CollectStreamTableSink(targetAddress, targetPort, (TypeSerializer<Tuple2<Boolean, Row>>) serializer); - } - - @Override - public List collectToList(Object table) throws Exception { - return Lists.newArrayList(((Table) table).execute().collect()); - } - - @Override - public void startMultipleInsert(Object tblEnv, InterpreterContext context) throws Exception { - StatementSet statementSet = ((TableEnvironment) tblEnv).createStatementSet(); - statementSetMap.put(context.getParagraphId(), statementSet); - } - - @Override - public void addInsertStatement(String sql, Object tblEnv, InterpreterContext context) throws Exception { - statementSetMap.get(context.getParagraphId()).addInsertSql(sql); - } - - @Override - public boolean executeMultipleInsertInto(String jobName, Object tblEnv, InterpreterContext context) throws Exception { - JobClient jobClient = statementSetMap.get(context.getParagraphId()).execute().getJobClient().get(); - while (!jobClient.getJobStatus().get().isTerminalState()) { - LOGGER.debug("Wait for job to finish"); - Thread.sleep(1000 * 5); - } - if (jobClient.getJobStatus().get() == JobStatus.CANCELED) { - context.out.write("Job is cancelled.\n"); - return false; - } - return true; - } - - @Override - public boolean rowEquals(Object row1, Object row2) { - Row r1 = (Row) row1; - Row r2 = (Row) row2; - r1.setKind(RowKind.INSERT); - r2.setKind(RowKind.INSERT); - return r1.equals(r2); - } - - @Override - public Object fromDataSet(Object btenv, Object ds) { - return Flink111ScalaShims.fromDataSet((BatchTableEnvironment) btenv, (DataSet) ds); - } - - @Override - public Object toDataSet(Object btenv, Object table) { - return Flink111ScalaShims.toDataSet((BatchTableEnvironment) btenv, (Table) table); - } - - @Override - public void registerTableSink(Object stenv, String tableName, Object collectTableSink) { - ((org.apache.flink.table.api.internal.TableEnvironmentInternal) stenv) - .registerTableSinkInternal(tableName, (TableSink) collectTableSink); - } - - @Override - public void registerScalarFunction(Object btenv, String name, Object scalarFunction) { - ((StreamTableEnvironmentImpl)(btenv)).createTemporarySystemFunction(name, (ScalarFunction) scalarFunction); - } - - @Override - public void registerTableFunction(Object btenv, String name, Object tableFunction) { - ((StreamTableEnvironmentImpl) (btenv)).registerFunction(name, (TableFunction) tableFunction); - } - - @Override - public void registerAggregateFunction(Object btenv, String name, Object aggregateFunction) { - ((StreamTableEnvironmentImpl) (btenv)).registerFunction(name, (AggregateFunction) aggregateFunction); - } - - @Override - public void registerTableAggregateFunction(Object btenv, String name, Object tableAggregateFunction) { - ((StreamTableEnvironmentImpl) (btenv)).registerFunction(name, (TableAggregateFunction) tableAggregateFunction); - } - - /** - * Parse it via flink SqlParser first, then fallback to regular expression matching. - * - * @param tableEnv - * @param stmt - * @return - */ - @Override - public Optional<SqlCommandParser.SqlCommandCall> parseSql(Object tableEnv, String stmt) { - Parser sqlParser = ((TableEnvironmentInternal) tableEnv).getParser(); - SqlCommandCall sqlCommandCall = null; - try { - // parse statement via regex matching first - Optional<SqlCommandCall> callOpt = parseByRegexMatching(stmt); - if (callOpt.isPresent()) { - sqlCommandCall = callOpt.get(); - } else { - sqlCommandCall = parseBySqlParser(sqlParser, stmt); - } - } catch (Exception e) { - return Optional.empty(); - } - return Optional.of(sqlCommandCall); - - } - - private SqlCommandCall parseBySqlParser(Parser sqlParser, String stmt) throws Exception { - List<Operation> operations; - try { - operations = sqlParser.parse(stmt); - } catch (Throwable e) { - throw new Exception("Invalidate SQL statement.", e); - } - if (operations.size() != 1) { - throw new Exception("Only single statement is supported now."); - } - - final SqlCommand cmd; - String[] operands = new String[]{stmt}; - Operation operation = operations.get(0); - if (operation instanceof CatalogSinkModifyOperation) { - boolean overwrite = ((CatalogSinkModifyOperation) operation).isOverwrite(); - cmd = overwrite ? SqlCommand.INSERT_OVERWRITE : SqlCommand.INSERT_INTO; - } else if (operation instanceof CreateTableOperation) { - cmd = SqlCommand.CREATE_TABLE; - } else if (operation instanceof DropTableOperation) { - cmd = SqlCommand.DROP_TABLE; - } else if (operation instanceof AlterTableOperation) { - cmd = SqlCommand.ALTER_TABLE; - } else if (operation instanceof CreateViewOperation) { - cmd = SqlCommand.CREATE_VIEW; - } else if (operation instanceof DropViewOperation) { - cmd = SqlCommand.DROP_VIEW; - } else if (operation instanceof CreateDatabaseOperation) { - cmd = SqlCommand.CREATE_DATABASE; - } else if (operation instanceof DropDatabaseOperation) { - cmd = SqlCommand.DROP_DATABASE; - } else if (operation instanceof AlterDatabaseOperation) { - cmd = SqlCommand.ALTER_DATABASE; - } else if (operation instanceof CreateCatalogOperation) { - cmd = SqlCommand.CREATE_CATALOG; - } else if (operation instanceof DropCatalogOperation) { - cmd = SqlCommand.DROP_CATALOG; - } else if (operation instanceof UseCatalogOperation) { - cmd = SqlCommand.USE_CATALOG; - operands = new String[]{((UseCatalogOperation) operation).getCatalogName()}; - } else if (operation instanceof UseDatabaseOperation) { - cmd = SqlCommand.USE; - operands = new String[]{((UseDatabaseOperation) operation).getDatabaseName()}; - } else if (operation instanceof ShowCatalogsOperation) { - cmd = SqlCommand.SHOW_CATALOGS; - operands = new String[0]; - } else if (operation instanceof ShowDatabasesOperation) { - cmd = SqlCommand.SHOW_DATABASES; - operands = new String[0]; - } else if (operation instanceof ShowTablesOperation) { - cmd = SqlCommand.SHOW_TABLES; - operands = new String[0]; - } else if (operation instanceof ShowFunctionsOperation) { - cmd = SqlCommand.SHOW_FUNCTIONS; - operands = new String[0]; - } else if (operation instanceof CreateCatalogFunctionOperation || - operation instanceof CreateTempSystemFunctionOperation) { - cmd = SqlCommand.CREATE_FUNCTION; - } else if (operation instanceof DropCatalogFunctionOperation || - operation instanceof DropTempSystemFunctionOperation) { - cmd = SqlCommand.DROP_FUNCTION; - } else if (operation instanceof AlterCatalogFunctionOperation) { - cmd = SqlCommand.ALTER_FUNCTION; - } else if (operation instanceof ExplainOperation) { - cmd = SqlCommand.EXPLAIN; - } else if (operation instanceof DescribeTableOperation) { - cmd = SqlCommand.DESCRIBE; - operands = new String[]{((DescribeTableOperation) operation).getSqlIdentifier().asSerializableString()}; - } else if (operation instanceof QueryOperation) { - cmd = SqlCommand.SELECT; - } else { - throw new Exception("Unknown operation: " + operation.asSummaryString()); - } - - return new SqlCommandCall(cmd, operands, stmt); - } - - private static Optional<SqlCommandCall> parseByRegexMatching(String stmt) { - // parse statement via regex matching - for (SqlCommand cmd : SqlCommand.values()) { - if (cmd.pattern != null) { - final Matcher matcher = cmd.pattern.matcher(stmt); - if (matcher.matches()) { - final String[] groups = new String[matcher.groupCount()]; - for (int i = 0; i < groups.length; i++) { - groups[i] = matcher.group(i + 1); - } - return cmd.operandConverter.apply(groups) - .map((operands) -> { - String[] newOperands = operands; - if (cmd == SqlCommand.EXPLAIN) { - // convert `explain xx` to `explain plan for xx` - // which can execute through executeSql method - newOperands = new String[]{"EXPLAIN PLAN FOR " + operands[0] + " " + operands[1]}; - } - return new SqlCommandCall(cmd, newOperands, stmt); - }); - } - } - } - return Optional.empty(); - } - - @Override - public void executeSql(Object tableEnv, String sql) { - ((TableEnvironment) tableEnv).executeSql(sql); - } - - @Override - public String explain(Object tableEnv, String sql) { - TableResult tableResult = ((TableEnvironment) tableEnv).executeSql(sql); - return tableResult.collect().next().getField(0).toString(); - } - - @Override - public String sqlHelp() { - return MESSAGE_HELP.toString(); - } - - /** - * Flink 1.11 bind CatalogManager with parser which make blink and flink could not share the same CatalogManager. - * This is a workaround which always reset CatalogTableSchemaResolver before running any flink code. - * @param catalogManager - * @param parserObject - * @param environmentSetting - */ - @Override - public void setCatalogManagerSchemaResolver(Object catalogManager, - Object parserObject, - Object environmentSetting) { - ((CatalogManager) catalogManager).setCatalogTableSchemaResolver( - new CatalogTableSchemaResolver((Parser)parserObject, - ((EnvironmentSettings)environmentSetting).isStreamingMode())); - } - - @Override - public Object updateEffectiveConfig(Object cliFrontend, Object commandLine, Object effectiveConfig) { - CustomCommandLine customCommandLine = ((CliFrontend)cliFrontend).validateAndGetActiveCommandLine((CommandLine) commandLine); - try { - return customCommandLine.applyCommandLineOptionsToConfiguration((CommandLine) commandLine); - } catch (FlinkException e) { - throw new RuntimeException("Fail to call applyCommandLineOptionsToConfiguration", e); - } - } - - @Override - public Map extractTableConfigOptions() { - Map<String, ConfigOption> configOptions = new HashMap<>(); - configOptions.putAll(extractConfigOptions(ExecutionConfigOptions.class)); - configOptions.putAll(extractConfigOptions(OptimizerConfigOptions.class)); - try { - configOptions.putAll(extractConfigOptions(PythonOptions.class)); - } catch (NoClassDefFoundError e) { - LOGGER.warn("No pyflink jars found"); - } - configOptions.putAll(extractConfigOptions(TableConfigOptions.class)); - return configOptions; - } - - private Map<String, ConfigOption> extractConfigOptions(Class clazz) { - Map<String, ConfigOption> configOptions = new HashMap(); - Field[] fields = clazz.getDeclaredFields(); - for (Field field : fields) { - if (field.getType().isAssignableFrom(ConfigOption.class)) { - try { - ConfigOption configOption = (ConfigOption) field.get(ConfigOption.class); - configOptions.put(configOption.key(), configOption); - } catch (Throwable e) { - LOGGER.warn("Fail to get ConfigOption", e); - } - } - } - return configOptions; - } - - @Override - public String[] rowToString(Object row, Object table, Object tableConfig) { - return PrintUtils.rowToString((Row) row); - } - - public boolean isTimeIndicatorType(Object type) { - return FlinkTypeFactory.isTimeIndicatorType((TypeInformation<?>) type); - } - - private Object lookupExecutor(ClassLoader classLoader, - Object settings, - Object sEnv) { - try { - Map<String, String> executorProperties = ((EnvironmentSettings) settings).toExecutorProperties(); - ExecutorFactory executorFactory = ComponentFactoryService.find(ExecutorFactory.class, executorProperties); - Method createMethod = executorFactory.getClass() - .getMethod("create", Map.class, StreamExecutionEnvironment.class); - - return (Executor) createMethod.invoke( - executorFactory, - executorProperties, - (StreamExecutionEnvironment) sEnv); - } catch (Exception e) { - throw new TableException( - "Could not instantiate the executor. Make sure a planner module is on the classpath", - e); - } - } - - @Override - public ImmutablePair<Object, Object> createPlannerAndExecutor( - ClassLoader classLoader, Object environmentSettings, Object sEnv, - Object tableConfig, Object functionCatalog, Object catalogManager) { - EnvironmentSettings settings = (EnvironmentSettings) environmentSettings; - Executor executor = (Executor) lookupExecutor(classLoader, settings, sEnv); - Map<String, String> plannerProperties = settings.toPlannerProperties(); - Planner planner = ComponentFactoryService.find(PlannerFactory.class, plannerProperties) - .create(plannerProperties, executor, (TableConfig) tableConfig, - (FunctionCatalog) functionCatalog, - (CatalogManager) catalogManager); - return ImmutablePair.of(planner, executor); - - } -} diff --git a/flink/flink1.11-shims/src/main/java/org/apache/zeppelin/flink/shims111/CollectStreamTableSink.java b/flink/flink1.11-shims/src/main/java/org/apache/zeppelin/flink/shims111/CollectStreamTableSink.java deleted file mode 100644 index b98f406..0000000 --- a/flink/flink1.11-shims/src/main/java/org/apache/zeppelin/flink/shims111/CollectStreamTableSink.java +++ /dev/null @@ -1,97 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.zeppelin.flink.shims111; - -import org.apache.flink.api.common.typeinfo.TypeInformation; -import org.apache.flink.api.common.typeinfo.Types; -import org.apache.flink.api.common.typeutils.TypeSerializer; -import org.apache.flink.api.java.tuple.Tuple2; -import org.apache.flink.api.java.typeutils.TupleTypeInfo; -import org.apache.flink.streaming.api.datastream.DataStream; -import org.apache.flink.streaming.api.datastream.DataStreamSink; -import org.apache.flink.streaming.experimental.CollectSink; -import org.apache.flink.table.sinks.RetractStreamTableSink; -import org.apache.flink.types.Row; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.net.InetAddress; -import java.util.UUID; - -/** - * Table sink for collecting the results locally using sockets. - */ -public class CollectStreamTableSink implements RetractStreamTableSink<Row> { - - private static final Logger LOGGER = LoggerFactory.getLogger(CollectStreamTableSink.class); - - private final InetAddress targetAddress; - private final int targetPort; - private final TypeSerializer<Tuple2<Boolean, Row>> serializer; - - private String[] fieldNames; - private TypeInformation<?>[] fieldTypes; - - public CollectStreamTableSink(InetAddress targetAddress, - int targetPort, - TypeSerializer<Tuple2<Boolean, Row>> serializer) { - LOGGER.info("Use address: " + targetAddress.getHostAddress() + ":" + targetPort); - this.targetAddress = targetAddress; - this.targetPort = targetPort; - this.serializer = serializer; - } - - @Override - public String[] getFieldNames() { - return fieldNames; - } - - @Override - public TypeInformation<?>[] getFieldTypes() { - return fieldTypes; - } - - @Override - public CollectStreamTableSink configure(String[] fieldNames, TypeInformation<?>[] fieldTypes) { - final CollectStreamTableSink copy = - new CollectStreamTableSink(targetAddress, targetPort, serializer); - copy.fieldNames = fieldNames; - copy.fieldTypes = fieldTypes; - return copy; - } - - @Override - public TypeInformation<Row> getRecordType() { - return Types.ROW_NAMED(fieldNames, fieldTypes); - } - - @Override - public DataStreamSink<?> consumeDataStream(DataStream<Tuple2<Boolean, Row>> stream) { - // add sink - return stream - .addSink(new CollectSink<>(targetAddress, targetPort, serializer)) - .name("Zeppelin Flink Sql Stream Collect Sink " + UUID.randomUUID()) - .setParallelism(1); - } - - @Override - public TupleTypeInfo<Tuple2<Boolean, Row>> getOutputType() { - return new TupleTypeInfo<>(Types.BOOLEAN, getRecordType()); - } -} diff --git a/flink/flink1.11-shims/src/main/scala/org/apache/zeppelin/flink/shims111/Flink111ScalaShims.scala b/flink/flink1.11-shims/src/main/scala/org/apache/zeppelin/flink/shims111/Flink111ScalaShims.scala deleted file mode 100644 index abdaca2..0000000 --- a/flink/flink1.11-shims/src/main/scala/org/apache/zeppelin/flink/shims111/Flink111ScalaShims.scala +++ /dev/null @@ -1,36 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.zeppelin.flink.shims111 - -import org.apache.flink.api.scala.DataSet -import org.apache.flink.streaming.api.scala._ -import org.apache.flink.table.api.Table -import org.apache.flink.table.api.bridge.scala.BatchTableEnvironment -import org.apache.flink.types.Row - -object Flink111ScalaShims { - - def fromDataSet(btenv: BatchTableEnvironment, ds: DataSet[_]): Table = { - btenv.fromDataSet(ds) - } - - def toDataSet(btenv: BatchTableEnvironment, table: Table): DataSet[Row] = { - btenv.toDataSet[Row](table) - } -} diff --git a/flink/pom.xml b/flink/pom.xml index 329f79e..02a9ec5 100644 --- a/flink/pom.xml +++ b/flink/pom.xml @@ -37,16 +37,12 @@ <module>flink-scala-2.11</module> <module>flink-scala-2.12</module> <module>flink-shims</module> - <module>flink1.10-shims</module> - <module>flink1.11-shims</module> <module>flink1.12-shims</module> <module>flink1.13-shims</module> <module>flink1.14-shims</module> </modules> <properties> - <flink1.10.version>1.10.3</flink1.10.version> - <flink1.11.version>1.11.3</flink1.11.version> <flink1.12.version>1.12.4</flink1.12.version> <flink1.13.version>1.13.2</flink1.13.version> <flink1.14.version>1.14.0</flink1.14.version> diff --git a/zeppelin-interpreter-integration/src/test/java/org/apache/zeppelin/integration/FlinkIntegrationTest110.java b/zeppelin-interpreter-integration/src/test/java/org/apache/zeppelin/integration/FlinkIntegrationTest110.java deleted file mode 100644 index 21f5292..0000000 --- a/zeppelin-interpreter-integration/src/test/java/org/apache/zeppelin/integration/FlinkIntegrationTest110.java +++ /dev/null @@ -1,40 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.zeppelin.integration; - -import org.junit.runner.RunWith; -import org.junit.runners.Parameterized; - -import java.util.Arrays; -import java.util.List; - -@RunWith(value = Parameterized.class) -public class FlinkIntegrationTest110 extends FlinkIntegrationTest { - - @Parameterized.Parameters - public static List<Object[]> data() { - return Arrays.asList(new Object[][]{ - {"1.10.3", "2.11"}, - {"1.10.3", "2.12"} - }); - } - - public FlinkIntegrationTest110(String flinkVersion, String scalaVersion) { - super(flinkVersion, scalaVersion); - } -} diff --git a/zeppelin-interpreter-integration/src/test/java/org/apache/zeppelin/integration/FlinkIntegrationTest111.java b/zeppelin-interpreter-integration/src/test/java/org/apache/zeppelin/integration/FlinkIntegrationTest111.java deleted file mode 100644 index 66fe6d8..0000000 --- a/zeppelin-interpreter-integration/src/test/java/org/apache/zeppelin/integration/FlinkIntegrationTest111.java +++ /dev/null @@ -1,40 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.zeppelin.integration; - -import org.junit.runner.RunWith; -import org.junit.runners.Parameterized; - -import java.util.Arrays; -import java.util.List; - -@RunWith(value = Parameterized.class) -public class FlinkIntegrationTest111 extends FlinkIntegrationTest { - - @Parameterized.Parameters - public static List<Object[]> data() { - return Arrays.asList(new Object[][]{ - {"1.11.3", "2.11"}, - {"1.11.3", "2.12"} - }); - } - - public FlinkIntegrationTest111(String flinkVersion, String scalaVersion) { - super(flinkVersion, scalaVersion); - } -} diff --git a/zeppelin-interpreter-integration/src/test/java/org/apache/zeppelin/integration/ZSessionIntegrationTest.java b/zeppelin-interpreter-integration/src/test/java/org/apache/zeppelin/integration/ZSessionIntegrationTest.java index 0fe8673..18d5b73 100644 --- a/zeppelin-interpreter-integration/src/test/java/org/apache/zeppelin/integration/ZSessionIntegrationTest.java +++ b/zeppelin-interpreter-integration/src/test/java/org/apache/zeppelin/integration/ZSessionIntegrationTest.java @@ -69,7 +69,7 @@ public class ZSessionIntegrationTest extends AbstractTestRestApi { notebook = TestUtils.getInstance(Notebook.class); sparkHome = DownloadUtils.downloadSpark("2.4.4", "2.7"); - flinkHome = DownloadUtils.downloadFlink("1.10.1", "2.11"); + flinkHome = DownloadUtils.downloadFlink("1.12.4", "2.11"); } @AfterClass diff --git a/zeppelin-interpreter-integration/src/test/java/org/apache/zeppelin/integration/ZeppelinFlinkClusterTest.java b/zeppelin-interpreter-integration/src/test/java/org/apache/zeppelin/integration/ZeppelinFlinkClusterTest.java index 8566803..f43c34b 100644 --- a/zeppelin-interpreter-integration/src/test/java/org/apache/zeppelin/integration/ZeppelinFlinkClusterTest.java +++ b/zeppelin-interpreter-integration/src/test/java/org/apache/zeppelin/integration/ZeppelinFlinkClusterTest.java @@ -29,6 +29,7 @@ import org.apache.zeppelin.user.AuthenticationInfo; import org.apache.zeppelin.utils.TestUtils; import org.junit.AfterClass; import org.junit.BeforeClass; +import org.junit.Test; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -45,10 +46,11 @@ public abstract class ZeppelinFlinkClusterTest extends AbstractTestRestApi { private String flinkVersion; private String flinkHome; - public ZeppelinFlinkClusterTest(String flinkVersion) throws Exception { + public ZeppelinFlinkClusterTest(String flinkVersion, String scalaVersion) throws Exception { this.flinkVersion = flinkVersion; LOGGER.info("Testing FlinkVersion: " + flinkVersion); - this.flinkHome = DownloadUtils.downloadFlink(flinkVersion, "2.11"); + LOGGER.info("Testing ScalaVersion: " + scalaVersion); + this.flinkHome = DownloadUtils.downloadFlink(flinkVersion, scalaVersion); } @BeforeClass @@ -63,6 +65,7 @@ public abstract class ZeppelinFlinkClusterTest extends AbstractTestRestApi { AbstractTestRestApi.shutDown(); } + // TODO(zjffdu) Disable Temporary //@Test public void testResumeFromCheckpoint() throws Exception { @@ -96,9 +99,11 @@ public abstract class ZeppelinFlinkClusterTest extends AbstractTestRestApi { note.run(p2.getId(), false); p2.waitUntilRunning(); - Thread.sleep(30 * 1000); - TestUtils.getInstance(Notebook.class).getInterpreterSettingManager() - .getInterpreterSettingByName("flink").close(); + Thread.sleep(60 * 1000); + p2.abort(); + + // Sleep 5 seconds to ensure checkpoint info is written to note file + Thread.sleep(5 * 1000); assertTrue(p2.getConfig().toString(), p2.getConfig().get("latest_checkpoint_path").toString().contains(checkpointPath)); // run it again diff --git a/zeppelin-interpreter-integration/src/test/java/org/apache/zeppelin/integration/ZeppelinFlinkClusterTest112.java b/zeppelin-interpreter-integration/src/test/java/org/apache/zeppelin/integration/ZeppelinFlinkClusterTest112.java index 443b254..d0c6b46 100644 --- a/zeppelin-interpreter-integration/src/test/java/org/apache/zeppelin/integration/ZeppelinFlinkClusterTest112.java +++ b/zeppelin-interpreter-integration/src/test/java/org/apache/zeppelin/integration/ZeppelinFlinkClusterTest112.java @@ -17,6 +17,7 @@ package org.apache.zeppelin.integration; +import org.junit.runner.RunWith; import org.junit.runners.Parameterized; import java.util.Arrays; @@ -28,11 +29,12 @@ public class ZeppelinFlinkClusterTest112 extends ZeppelinFlinkClusterTest { @Parameterized.Parameters public static List<Object[]> data() { return Arrays.asList(new Object[][]{ - {"1.12.0"} + {"1.12.4", "2.11"}, + {"1.12.4", "2.12"} }); } - public ZeppelinFlinkClusterTest112(String flinkVersion) throws Exception { - super(flinkVersion); + public ZeppelinFlinkClusterTest112(String flinkVersion, String scalaVersion) throws Exception { + super(flinkVersion, scalaVersion); } } diff --git a/zeppelin-interpreter-integration/src/test/java/org/apache/zeppelin/integration/ZeppelinFlinkClusterTest111.java b/zeppelin-interpreter-integration/src/test/java/org/apache/zeppelin/integration/ZeppelinFlinkClusterTest113.java similarity index 81% rename from zeppelin-interpreter-integration/src/test/java/org/apache/zeppelin/integration/ZeppelinFlinkClusterTest111.java rename to zeppelin-interpreter-integration/src/test/java/org/apache/zeppelin/integration/ZeppelinFlinkClusterTest113.java index 7b1ba0f..cc8dcd9 100644 --- a/zeppelin-interpreter-integration/src/test/java/org/apache/zeppelin/integration/ZeppelinFlinkClusterTest111.java +++ b/zeppelin-interpreter-integration/src/test/java/org/apache/zeppelin/integration/ZeppelinFlinkClusterTest113.java @@ -24,16 +24,17 @@ import java.util.Arrays; import java.util.List; //@RunWith(value = Parameterized.class) -public class ZeppelinFlinkClusterTest111 extends ZeppelinFlinkClusterTest { +public class ZeppelinFlinkClusterTest113 extends ZeppelinFlinkClusterTest { @Parameterized.Parameters public static List<Object[]> data() { return Arrays.asList(new Object[][]{ - {"1.11.3"} + {"1.13.2", "2.11"}, + {"1.13.2", "2.12"} }); } - public ZeppelinFlinkClusterTest111(String flinkVersion) throws Exception { - super(flinkVersion); + public ZeppelinFlinkClusterTest113(String flinkVersion, String scalaVersion) throws Exception { + super(flinkVersion, scalaVersion); } } diff --git a/zeppelin-interpreter-integration/src/test/java/org/apache/zeppelin/integration/ZeppelinFlinkClusterTest110.java b/zeppelin-interpreter-integration/src/test/java/org/apache/zeppelin/integration/ZeppelinFlinkClusterTest114.java similarity index 81% rename from zeppelin-interpreter-integration/src/test/java/org/apache/zeppelin/integration/ZeppelinFlinkClusterTest110.java rename to zeppelin-interpreter-integration/src/test/java/org/apache/zeppelin/integration/ZeppelinFlinkClusterTest114.java index 4400706..1668491 100644 --- a/zeppelin-interpreter-integration/src/test/java/org/apache/zeppelin/integration/ZeppelinFlinkClusterTest110.java +++ b/zeppelin-interpreter-integration/src/test/java/org/apache/zeppelin/integration/ZeppelinFlinkClusterTest114.java @@ -24,16 +24,17 @@ import java.util.Arrays; import java.util.List; //@RunWith(value = Parameterized.class) -public class ZeppelinFlinkClusterTest110 extends ZeppelinFlinkClusterTest { +public class ZeppelinFlinkClusterTest114 extends ZeppelinFlinkClusterTest { @Parameterized.Parameters public static List<Object[]> data() { return Arrays.asList(new Object[][]{ - {"1.10.2"} + {"1.14.0", "2.11"}, + {"1.14.0", "2.12"} }); } - public ZeppelinFlinkClusterTest110(String flinkVersion) throws Exception { - super(flinkVersion); + public ZeppelinFlinkClusterTest114(String flinkVersion, String scalaVersion) throws Exception { + super(flinkVersion, scalaVersion); } } diff --git a/zeppelin-interpreter-integration/src/test/resources/init_stream.scala b/zeppelin-interpreter-integration/src/test/resources/init_stream.scala index e4153be..f8d27ae 100644 --- a/zeppelin-interpreter-integration/src/test/resources/init_stream.scala +++ b/zeppelin-interpreter-integration/src/test/resources/init_stream.scala @@ -6,7 +6,7 @@ import java.util.Collections import scala.collection.JavaConversions._ senv.setStreamTimeCharacteristic(TimeCharacteristic.EventTime) -senv.enableCheckpointing(15000) +senv.enableCheckpointing(5000) val data = senv.addSource(new SourceFunction[(Long, String)] with ListCheckpointed[java.lang.Long] { diff --git a/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/RemoteInterpreterEventServer.java b/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/RemoteInterpreterEventServer.java index 7aa6d6c..b8cd73f 100644 --- a/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/RemoteInterpreterEventServer.java +++ b/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/RemoteInterpreterEventServer.java @@ -556,6 +556,7 @@ public class RemoteInterpreterEventServer implements RemoteInterpreterEventServi Map<String, String> config) throws InterpreterRPCException, TException { try { + LOGGER.info("Update paragraph config"); Note note = interpreterSettingManager.getNotebook().getNote(noteId); note.getParagraph(paragraphId).updateConfig(config); interpreterSettingManager.getNotebook().saveNote(note, AuthenticationInfo.ANONYMOUS);