This is an automated email from the ASF dual-hosted git repository.

zjffdu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/zeppelin.git
The following commit(s) were added to refs/heads/master by this push: new 521acca985 [ZEPPELIN-5844] Support flink 1.16 (#4506) 521acca985 is described below commit 521acca98570b1e64b4c3ae569aad5da8423b0f7 Author: Jeff Zhang <zjf...@apache.org> AuthorDate: Tue Jan 31 20:17:30 2023 +0800 [ZEPPELIN-5844] Support flink 1.16 (#4506) * [ZEPPELIN-5844] Support flink 1.16 * Update github action * Fix CI * save * Fix flink test * Remove -B in core.yaml --- .github/workflows/core.yml | 14 +- flink/flink-scala-parent/pom.xml | 73 ++- .../org/apache/zeppelin/flink/TableEnvFactory.java | 186 +------ .../internal/ScalaShellStreamEnvironment.java | 10 + .../zeppelin/flink/FlinkScalaInterpreter.scala | 27 +- .../zeppelin/flink/internal/FlinkILoop.scala | 43 +- .../flink/FlinkStreamSqlInterpreterTest.java | 10 +- .../{init_stream.scala => init_stream.scala2} | 0 .../java/org/apache/zeppelin/flink/FlinkShims.java | 31 +- .../org/apache/zeppelin/flink/Flink112Shims.java | 59 ++- .../org/apache/zeppelin/flink/Flink113Shims.java | 59 ++- .../org/apache/zeppelin/flink/Flink114Shims.java | 59 ++- .../org/apache/zeppelin/flink/Flink115Shims.java | 60 ++- flink/flink1.16-shims/pom.xml | 207 ++++++++ .../org/apache/zeppelin/flink/Flink116Shims.java} | 105 +++- .../zeppelin/flink/Flink116SqlInterpreter.java | 590 +++++++++++++++++++++ .../java/org/apache/zeppelin/flink/PrintUtils.java | 318 +++++++++++ .../zeppelin/flink/TimestampStringUtils.java | 143 +++++ .../flink/shims116/CollectStreamTableSink.java | 97 ++++ flink/pom.xml | 10 + testing/env_python_3_with_flink_116.yml | 29 + 21 files changed, 1886 insertions(+), 244 deletions(-) diff --git a/.github/workflows/core.yml b/.github/workflows/core.yml index 4d806acf87..76380103da 100644 --- a/.github/workflows/core.yml +++ b/.github/workflows/core.yml @@ -233,7 +233,7 @@ jobs: strategy: fail-fast: false matrix: - flink: [112, 113, 114, 115] + flink: [112, 113, 114, 115, 116] steps: - name: Checkout uses: actions/checkout@v3 @@ -256,12 +256,12 @@ jobs: restore-keys: | ${{ runner.os }}-zeppelin- - name: install environment for flink before 1.15 (exclusive) - if: matrix.flink != '115' + if: matrix.flink < '115' run: | ./mvnw install -DskipTests -am -pl flink/flink-scala-2.11,flink/flink-scala-2.12,flink-cmd,zeppelin-interpreter-integration -Pflink-${{ matrix.flink }} -Phadoop2 -Pintegration ${MAVEN_ARGS} ./mvnw clean package -pl zeppelin-plugins -amd -DskipTests ${MAVEN_ARGS} - name: install environment for flink after 1.15 (inclusive) - if: matrix.flink == '115' + if: matrix.flink >= '115' run: | ./mvnw install -DskipTests -am -pl flink/flink-scala-2.12,flink-cmd,zeppelin-interpreter-integration -Pflink-${{ matrix.flink }} -Phadoop2 -Pintegration ${MAVEN_ARGS} ./mvnw clean package -pl zeppelin-plugins -amd -DskipTests ${MAVEN_ARGS} @@ -277,10 +277,10 @@ jobs: auto-activate-base: false use-mamba: true - name: run tests for flink before 1.15 (exclusive) - if: matrix.flink != '115' - run: ./mvnw verify -pl flink/flink-scala-2.11,flink/flink-scala-2.12,flink-cmd,zeppelin-interpreter-integration -am -Pflink-${{ matrix.flink }} -Phadoop2 -Pintegration -DfailIfNoTests=false -Dtest=org.apache.zeppelin.flink.*Test,FlinkIntegrationTest${{ matrix.flink }} ${MAVEN_ARGS} - - name: run tests for flink before 1.15 (inclusive) - if: matrix.flink == '115' + if: matrix.flink < '115' + run: ./mvnw verify -pl flink/flink-scala-2.11,flink/flink-scala-2.12,flink-cmd,zeppelin-interpreter-integration -Pflink-${{ matrix.flink }} -Phadoop2 -Pintegration -DfailIfNoTests=false 
-Dtest=org.apache.zeppelin.flink.*Test,FlinkIntegrationTest${{ matrix.flink }} ${MAVEN_ARGS} + - name: run tests for flink after 1.15 (inclusive) + if: matrix.flink >= '115' run: ./mvnw verify -pl flink/flink-scala-2.12,flink-cmd,zeppelin-interpreter-integration -Pflink-${{ matrix.flink }} -am -Phadoop2 -Pintegration -DfailIfNoTests=false -Dtest=org.apache.zeppelin.flink.*Test,FlinkIntegrationTest${{ matrix.flink }} ${MAVEN_ARGS} - name: Print zeppelin logs if: always() diff --git a/flink/flink-scala-parent/pom.xml b/flink/flink-scala-parent/pom.xml index e9f364a162..8bbeebd26f 100644 --- a/flink/flink-scala-parent/pom.xml +++ b/flink/flink-scala-parent/pom.xml @@ -79,6 +79,12 @@ <version>${project.version}</version> </dependency> + <dependency> + <groupId>org.apache.zeppelin</groupId> + <artifactId>flink1.16-shims</artifactId> + <version>${project.version}</version> + </dependency> + <dependency> <groupId>org.apache.zeppelin</groupId> <artifactId>zeppelin-python</artifactId> @@ -138,13 +144,6 @@ </exclusions> </dependency> - <dependency> - <groupId>org.apache.flink</groupId> - <artifactId>flink-python_${flink.scala.binary.version}</artifactId> - <version>${flink.version}</version> - <scope>provided</scope> - </dependency> - <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-core</artifactId> @@ -931,6 +930,12 @@ </exclusion> </exclusions> </dependency> + <dependency> + <groupId>org.apache.flink</groupId> + <artifactId>flink-python_${flink.scala.binary.version}</artifactId> + <version>${flink.version}</version> + <scope>provided</scope> + </dependency> </dependencies> </profile> @@ -970,6 +975,12 @@ </exclusion> </exclusions> </dependency> + <dependency> + <groupId>org.apache.flink</groupId> + <artifactId>flink-python_${flink.scala.binary.version}</artifactId> + <version>${flink.version}</version> + <scope>provided</scope> + </dependency> </dependencies> </profile> @@ -991,6 +1002,12 @@ <version>${flink.version}</version> <scope>provided</scope> </dependency> + <dependency> + <groupId>org.apache.flink</groupId> + <artifactId>flink-python_${flink.scala.binary.version}</artifactId> + <version>${flink.version}</version> + <scope>provided</scope> + </dependency> </dependencies> </profile> @@ -1015,6 +1032,48 @@ <version>${flink.version}</version> <scope>provided</scope> </dependency> + <dependency> + <groupId>org.apache.flink</groupId> + <artifactId>flink-python_${flink.scala.binary.version}</artifactId> + <version>${flink.version}</version> + <scope>provided</scope> + </dependency> + </dependencies> + </profile> + + <profile> + <id>flink-116</id> + <properties> + <flink.version>${flink1.16.version}</flink.version> + <flink.scala.version>2.12.7</flink.scala.version> + <flink.scala.binary.version>2.12</flink.scala.binary.version> + <flink.library.scala.suffix></flink.library.scala.suffix> + </properties> + <dependencies> + <dependency> + <groupId>org.apache.flink</groupId> + <artifactId>flink-runtime</artifactId> + <version>${flink.version}</version> + <scope>provided</scope> + </dependency> + <dependency> + <groupId>org.apache.flink</groupId> + <artifactId>flink-table-planner_${flink.scala.binary.version}</artifactId> + <version>${flink.version}</version> + <scope>provided</scope> + </dependency> + <dependency> + <groupId>org.apache.flink</groupId> + <artifactId>flink-sql-client</artifactId> + <version>${flink.version}</version> + <scope>provided</scope> + </dependency> + <dependency> + <groupId>org.apache.flink</groupId> + <artifactId>flink-python</artifactId> + 
<version>${flink.version}</version> + <scope>provided</scope> + </dependency> </dependencies> </profile> diff --git a/flink/flink-scala-parent/src/main/java/org/apache/zeppelin/flink/TableEnvFactory.java b/flink/flink-scala-parent/src/main/java/org/apache/zeppelin/flink/TableEnvFactory.java index 5ec2de96eb..0328ca3936 100644 --- a/flink/flink-scala-parent/src/main/java/org/apache/zeppelin/flink/TableEnvFactory.java +++ b/flink/flink-scala-parent/src/main/java/org/apache/zeppelin/flink/TableEnvFactory.java @@ -20,6 +20,7 @@ package org.apache.zeppelin.flink; import org.apache.commons.lang3.exception.ExceptionUtils; import org.apache.commons.lang3.tuple.ImmutablePair; import org.apache.flink.api.java.ExecutionEnvironment; +import org.apache.flink.configuration.Configuration; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.table.api.EnvironmentSettings; import org.apache.flink.table.api.TableConfig; @@ -34,6 +35,9 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.lang.reflect.Constructor; +import java.net.URL; +import java.util.List; +import java.util.stream.Collectors; /** * Factory class for creating flink table env for different purpose: @@ -51,6 +55,8 @@ public class TableEnvFactory { private org.apache.flink.api.scala.ExecutionEnvironment benv; private org.apache.flink.streaming.api.scala.StreamExecutionEnvironment senv; + private List<URL> userJars; + /*********************************************************************** Should use different TableConfig for different kinds of table_env otherwise it will cause conflicts after flink 1.13 @@ -73,7 +79,8 @@ public class TableEnvFactory { FlinkShims flinkShims, org.apache.flink.api.scala.ExecutionEnvironment env, org.apache.flink.streaming.api.scala.StreamExecutionEnvironment senv, - TableConfig streamTableConfig) { + TableConfig streamTableConfig, + List<URL> userJars) { this.flinkVersion = flinkVersion; this.flinkShims = flinkShims; @@ -94,7 +101,11 @@ public class TableEnvFactory { this.oldPlannerCatalogManager = (CatalogManager) flinkShims.createCatalogManager( this.oldPlannerStreamTableConfig.getConfiguration()); this.moduleManager = new ModuleManager(); - this.functionCatalog = (FunctionCatalog) flinkShims.createFunctionCatalog(streamTableConfig, catalogManager, moduleManager); + this.functionCatalog = (FunctionCatalog) flinkShims.createFunctionCatalog(streamTableConfig, + catalogManager, + moduleManager, + userJars); + this.userJars = userJars; } public TableEnvironment createScalaFlinkBatchTableEnvironment() { @@ -138,178 +149,19 @@ public class TableEnvFactory { } public TableEnvironment createScalaBlinkStreamTableEnvironment(EnvironmentSettings settings, ClassLoader classLoader) { - - try { - ImmutablePair<Object, Object> pair = flinkShims.createPlannerAndExecutor( - classLoader, settings, senv.getJavaEnv(), - streamTableConfig, moduleManager, functionCatalog, catalogManager); - Planner planner = (Planner) pair.left; - Executor executor = (Executor) pair.right; - - Class<?> clazz = Class - .forName("org.apache.flink.table.api.bridge.scala.internal.StreamTableEnvironmentImpl"); - try { - Constructor<?> constructor = clazz - .getConstructor( - CatalogManager.class, - ModuleManager.class, - FunctionCatalog.class, - TableConfig.class, - org.apache.flink.streaming.api.scala.StreamExecutionEnvironment.class, - Planner.class, - Executor.class, - boolean.class); - return (TableEnvironment) constructor.newInstance(catalogManager, - moduleManager, - 
functionCatalog, - streamTableConfig, - senv, - planner, - executor, - settings.isStreamingMode()); - } catch (NoSuchMethodException e) { - // Flink 1.11.1 change the constructor signature, FLINK-18419 - Constructor<?> constructor = clazz - .getConstructor( - CatalogManager.class, - ModuleManager.class, - FunctionCatalog.class, - TableConfig.class, - org.apache.flink.streaming.api.scala.StreamExecutionEnvironment.class, - Planner.class, - Executor.class, - boolean.class, - ClassLoader.class); - return (TableEnvironment) constructor.newInstance(catalogManager, - moduleManager, - functionCatalog, - streamTableConfig, - senv, - planner, - executor, - settings.isStreamingMode(), - classLoader); - } - } catch (Exception e) { - throw new TableException("Fail to createScalaBlinkStreamTableEnvironment", e); - } + return (TableEnvironment) flinkShims.createScalaBlinkStreamTableEnvironment(settings, + senv.getJavaEnv(), streamTableConfig, moduleManager, functionCatalog, catalogManager, userJars, classLoader); } public TableEnvironment createJavaBlinkStreamTableEnvironment(EnvironmentSettings settings, ClassLoader classLoader) { - try { - ImmutablePair<Object, Object> pair = flinkShims.createPlannerAndExecutor( - classLoader, settings, senv.getJavaEnv(), - streamTableConfig, moduleManager, functionCatalog, catalogManager); - Planner planner = (Planner) pair.left; - Executor executor = (Executor) pair.right; - - Class<?> clazz = Class - .forName("org.apache.flink.table.api.bridge.java.internal.StreamTableEnvironmentImpl"); - - try { - Constructor<?> constructor = clazz - .getConstructor( - CatalogManager.class, - ModuleManager.class, - FunctionCatalog.class, - TableConfig.class, - org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.class, - Planner.class, - Executor.class, - boolean.class); - return (TableEnvironment) constructor.newInstance(catalogManager, - moduleManager, - functionCatalog, - streamTableConfig, - senv.getJavaEnv(), - planner, - executor, - settings.isStreamingMode()); - } catch (NoSuchMethodException e) { - // Flink 1.11.1 change the constructor signature, FLINK-18419 - Constructor constructor = clazz - .getConstructor( - CatalogManager.class, - ModuleManager.class, - FunctionCatalog.class, - TableConfig.class, - org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.class, - Planner.class, - Executor.class, - boolean.class, - ClassLoader.class); - return (TableEnvironment) constructor.newInstance(catalogManager, - moduleManager, - functionCatalog, - streamTableConfig, - senv.getJavaEnv(), - planner, - executor, - settings.isStreamingMode(), - classLoader); - } - } catch (Exception e) { - throw new TableException("Fail to createJavaBlinkStreamTableEnvironment", e); - } + return (TableEnvironment) flinkShims.createJavaBlinkStreamTableEnvironment(settings, + senv.getJavaEnv(), streamTableConfig, moduleManager, functionCatalog, catalogManager, userJars, classLoader); } public TableEnvironment createJavaBlinkBatchTableEnvironment( EnvironmentSettings settings, ClassLoader classLoader) { - try { - ImmutablePair<Object, Object> pair = flinkShims.createPlannerAndExecutor( - classLoader, settings, senv.getJavaEnv(), - batchTableConfig, moduleManager, functionCatalog, catalogManager); - Planner planner = (Planner) pair.left; - Executor executor = (Executor) pair.right; - - Class<?> clazz = Class - .forName("org.apache.flink.table.api.bridge.java.internal.StreamTableEnvironmentImpl"); - try { - Constructor<?> constructor = clazz.getConstructor( - 
CatalogManager.class, - ModuleManager.class, - FunctionCatalog.class, - TableConfig.class, - StreamExecutionEnvironment.class, - Planner.class, - Executor.class, - boolean.class); - return (TableEnvironment) constructor.newInstance( - catalogManager, - moduleManager, - functionCatalog, - batchTableConfig, - senv.getJavaEnv(), - planner, - executor, - settings.isStreamingMode()); - } catch (NoSuchMethodException e) { - // Flink 1.11.1 change the constructor signature, FLINK-18419 - Constructor<?> constructor = clazz.getConstructor( - CatalogManager.class, - ModuleManager.class, - FunctionCatalog.class, - TableConfig.class, - StreamExecutionEnvironment.class, - Planner.class, - Executor.class, - boolean.class, - ClassLoader.class); - return (TableEnvironment) constructor.newInstance( - catalogManager, - moduleManager, - functionCatalog, - batchTableConfig, - senv.getJavaEnv(), - planner, - executor, - settings.isStreamingMode(), - classLoader); - } - } catch (Exception e) { - LOGGER.info(ExceptionUtils.getStackTrace(e)); - throw new TableException("Fail to createJavaBlinkBatchTableEnvironment", e); - } + return (TableEnvironment) flinkShims.createJavaBlinkStreamTableEnvironment(settings, + senv.getJavaEnv(), batchTableConfig, moduleManager, functionCatalog, catalogManager, userJars, classLoader); } public void createStreamPlanner(EnvironmentSettings settings) { diff --git a/flink/flink-scala-parent/src/main/java/org/apache/zeppelin/flink/internal/ScalaShellStreamEnvironment.java b/flink/flink-scala-parent/src/main/java/org/apache/zeppelin/flink/internal/ScalaShellStreamEnvironment.java index bee40a0f69..ae9fd3d0f0 100644 --- a/flink/flink-scala-parent/src/main/java/org/apache/zeppelin/flink/internal/ScalaShellStreamEnvironment.java +++ b/flink/flink-scala-parent/src/main/java/org/apache/zeppelin/flink/internal/ScalaShellStreamEnvironment.java @@ -59,11 +59,21 @@ public class ScalaShellStreamEnvironment extends StreamExecutionEnvironment { final Configuration configuration, final FlinkILoop flinkILoop, final FlinkVersion flinkVersion, + final ClassLoader classLoader, final String... 
jarFiles) { super(configuration); this.flinkILoop = checkNotNull(flinkILoop); this.flinkVersion = checkNotNull(flinkVersion); this.jarFiles = checkNotNull(JarUtils.getJarFiles(jarFiles)); + if (flinkVersion.newerThanOrEqual(FlinkVersion.fromVersionString("1.16"))) { + try { + Field field = StreamExecutionEnvironment.class.getDeclaredField("userClassloader"); + field.setAccessible(true); + field.set(this, classLoader); + } catch (NoSuchFieldException | IllegalAccessException e) { + throw new RuntimeException("Unable to set userClassLoader", e); + } + } } @Override diff --git a/flink/flink-scala-parent/src/main/scala/org/apache/zeppelin/flink/FlinkScalaInterpreter.scala b/flink/flink-scala-parent/src/main/scala/org/apache/zeppelin/flink/FlinkScalaInterpreter.scala index 40adcffeae..d5bf2fc14e 100644 --- a/flink/flink-scala-parent/src/main/scala/org/apache/zeppelin/flink/FlinkScalaInterpreter.scala +++ b/flink/flink-scala-parent/src/main/scala/org/apache/zeppelin/flink/FlinkScalaInterpreter.scala @@ -365,6 +365,8 @@ abstract class FlinkScalaInterpreter(val properties: Properties, flinkILoop.settings = settings flinkILoop.intp = createIMain(settings, replOut) + flinkILoop.initEnvironments() + flinkILoop.intp.beQuietDuring { // set execution environment flinkILoop.intp.bind("benv", flinkILoop.scalaBenv) @@ -424,29 +426,29 @@ abstract class FlinkScalaInterpreter(val properties: Properties, private def createTableEnvs(): Unit = { val originalClassLoader = Thread.currentThread().getContextClassLoader try { - Thread.currentThread().setContextClassLoader(getFlinkClassLoader) + Thread.currentThread().setContextClassLoader(getFlinkScalaShellLoader) val tableConfig = new TableConfig tableConfig.getConfiguration.addAll(configuration) this.tblEnvFactory = new TableEnvFactory(this.flinkVersion, this.flinkShims, - this.benv, this.senv, tableConfig) + this.benv, this.senv, tableConfig, this.userJars.map(new URL(_)).asJava) // blink planner - var btEnvSetting = this.flinkShims.createBlinkPlannerEnvSettingBuilder() + val btEnvSetting = this.flinkShims.createBlinkPlannerEnvSettingBuilder() .asInstanceOf[EnvironmentSettings.Builder] .inBatchMode() .build() - this.btenv = tblEnvFactory.createJavaBlinkBatchTableEnvironment(btEnvSetting, getFlinkClassLoader); + this.btenv = tblEnvFactory.createJavaBlinkBatchTableEnvironment(btEnvSetting, getFlinkScalaShellLoader); flinkILoop.bind("btenv", btenv.getClass().getCanonicalName(), btenv, List("@transient")) this.java_btenv = this.btenv - var stEnvSetting = this.flinkShims.createBlinkPlannerEnvSettingBuilder() + val stEnvSetting = this.flinkShims.createBlinkPlannerEnvSettingBuilder() .asInstanceOf[EnvironmentSettings.Builder] .inStreamingMode() .build() - this.stenv = tblEnvFactory.createScalaBlinkStreamTableEnvironment(stEnvSetting, getFlinkClassLoader) + this.stenv = tblEnvFactory.createScalaBlinkStreamTableEnvironment(stEnvSetting, getFlinkScalaShellLoader) flinkILoop.bind("stenv", stenv.getClass().getCanonicalName(), stenv, List("@transient")) - this.java_stenv = tblEnvFactory.createJavaBlinkStreamTableEnvironment(stEnvSetting, getFlinkClassLoader) + this.java_stenv = tblEnvFactory.createJavaBlinkStreamTableEnvironment(stEnvSetting, getFlinkScalaShellLoader) if (!flinkVersion.isAfterFlink114()) { // flink planner is not supported after flink 1.14 @@ -506,7 +508,7 @@ abstract class FlinkScalaInterpreter(val properties: Properties, val udfPackages = properties.getProperty("flink.udf.jars.packages", "").split(",").toSet val urls = Array(new URL("jar:file:" + jar + 
"!/")) - val cl = new URLClassLoader(urls, getFlinkScalaShellLoader) + val cl = new URLClassLoader(urls, getFlinkClassLoader) while (entries.hasMoreElements) { val je = entries.nextElement @@ -590,7 +592,7 @@ abstract class FlinkScalaInterpreter(val properties: Properties, def createPlannerAgain(): Unit = { val originalClassLoader = Thread.currentThread().getContextClassLoader try { - Thread.currentThread().setContextClassLoader(getFlinkClassLoader) + Thread.currentThread().setContextClassLoader(getFlinkScalaShellLoader) val stEnvSetting = this.flinkShims.createBlinkPlannerEnvSettingBuilder() .asInstanceOf[EnvironmentSettings.Builder] .inStreamingMode() @@ -844,8 +846,11 @@ abstract class FlinkScalaInterpreter(val properties: Properties, def getJobManager = this.jobManager def getFlinkScalaShellLoader: ClassLoader = { - val userCodeJarFile = this.flinkILoop.writeFilesToDisk() - new URLClassLoader(Array(userCodeJarFile.toURL) ++ userJars.map(e => new File(e).toURL)) + if (this.flinkILoop == null) { + getFlinkClassLoader + } else { + flinkILoop.classLoader; + } } private def getFlinkClassLoader: ClassLoader = { diff --git a/flink/flink-scala-parent/src/main/scala/org/apache/zeppelin/flink/internal/FlinkILoop.scala b/flink/flink-scala-parent/src/main/scala/org/apache/zeppelin/flink/internal/FlinkILoop.scala index b133487749..b50135b91e 100644 --- a/flink/flink-scala-parent/src/main/scala/org/apache/zeppelin/flink/internal/FlinkILoop.scala +++ b/flink/flink-scala-parent/src/main/scala/org/apache/zeppelin/flink/internal/FlinkILoop.scala @@ -49,52 +49,46 @@ class FlinkILoop( private lazy val LOGGER: Logger = LoggerFactory.getLogger(getClass) - // remote environment - private val (remoteBenv: ScalaShellEnvironment, - remoteSenv: ScalaShellStreamEnvironment) = { + var remoteBenv: ScalaShellEnvironment = null + var remoteSenv: ScalaShellStreamEnvironment = null + var scalaBenv: ExecutionEnvironment = null + var scalaSenv: StreamExecutionEnvironment = null + + def initEnvironments(): Unit = { ScalaShellEnvironment.resetContextEnvironments() ScalaShellStreamEnvironment.resetContextEnvironments() // create our environment that submits against the cluster (local or remote) - val remoteBenv = new ScalaShellEnvironment( + remoteBenv = new ScalaShellEnvironment( flinkConfig, this, this.getExternalJars(): _*) - val remoteSenv = new ScalaShellStreamEnvironment( + remoteSenv = new ScalaShellStreamEnvironment( flinkConfig, this, flinkScalaInterpreter.getFlinkVersion, + this.classLoader, getExternalJars(): _*) - (remoteBenv,remoteSenv) - } - - // local environment - val ( - scalaBenv: ExecutionEnvironment, - scalaSenv: StreamExecutionEnvironment - ) = { if (ExecutionMode.isApplicationMode(mode)) { // For yarn application mode, ExecutionEnvironment & StreamExecutionEnvironment has already been created // by flink itself, we here just try get them via reflection and reconstruct them. 
- val scalaBenv = new ExecutionEnvironment(new ApplicationModeExecutionEnvironment( + scalaBenv = new ExecutionEnvironment(new ApplicationModeExecutionEnvironment( getExecutionEnvironmentField(jenv, "executorServiceLoader").asInstanceOf[PipelineExecutorServiceLoader], getExecutionEnvironmentField(jenv, "configuration").asInstanceOf[Configuration], getExecutionEnvironmentField(jenv, "userClassloader").asInstanceOf[ClassLoader], this, flinkScalaInterpreter )) - val scalaSenv = new StreamExecutionEnvironment(new ApplicationModeStreamEnvironment( + scalaSenv = new StreamExecutionEnvironment(new ApplicationModeStreamEnvironment( getStreamExecutionEnvironmentField(jsenv, "executorServiceLoader").asInstanceOf[PipelineExecutorServiceLoader], getStreamExecutionEnvironmentField(jsenv, "configuration").asInstanceOf[Configuration], getStreamExecutionEnvironmentField(jsenv, "userClassloader").asInstanceOf[ClassLoader], this, flinkScalaInterpreter )) - (scalaBenv, scalaSenv) } else { - val scalaBenv = new ExecutionEnvironment(remoteBenv) - val scalaSenv = new StreamExecutionEnvironment(remoteSenv) - (scalaBenv, scalaSenv) + scalaBenv = new ExecutionEnvironment(remoteBenv) + scalaSenv = new StreamExecutionEnvironment(remoteSenv) } } @@ -183,28 +177,20 @@ class FlinkILoop( } } val vd = intp.virtualDirectory - val vdIt = vd.iterator - for (fi <- vdIt) { if (fi.isDirectory) { - val fiIt = fi.iterator - for (f <- fiIt) { - // directory for compiled line val lineDir = new File(tmpDirShell.getAbsolutePath, fi.name) lineDir.mkdirs() - // compiled classes for commands from shell val writeFile = new File(lineDir.getAbsolutePath, f.name) val outputStream = new FileOutputStream(writeFile) val inputStream = f.input - // copy file contents org.apache.commons.io.IOUtils.copy(inputStream, outputStream) - inputStream.close() outputStream.close() } @@ -212,12 +198,9 @@ class FlinkILoop( } val compiledClasses = new File(tmpDirShell.getAbsolutePath) - val jarFilePath = new File(tmpJarShell.getAbsolutePath) - val jh: JarHelper = new JarHelper jh.jarDir(compiledClasses, jarFilePath) - jarFilePath } diff --git a/flink/flink-scala-parent/src/test/java/org/apache/zeppelin/flink/FlinkStreamSqlInterpreterTest.java b/flink/flink-scala-parent/src/test/java/org/apache/zeppelin/flink/FlinkStreamSqlInterpreterTest.java index 49b609d745..ca49a35a15 100644 --- a/flink/flink-scala-parent/src/test/java/org/apache/zeppelin/flink/FlinkStreamSqlInterpreterTest.java +++ b/flink/flink-scala-parent/src/test/java/org/apache/zeppelin/flink/FlinkStreamSqlInterpreterTest.java @@ -408,9 +408,9 @@ public class FlinkStreamSqlInterpreterTest extends FlinkSqlInterpreterTest { @Test public void testStreamUDF() throws IOException, InterpreterException { String initStreamScalaScript = getInitStreamScript(100); - InterpreterResult result = flinkInterpreter.interpret(initStreamScalaScript, - getInterpreterContext()); - assertEquals(InterpreterResult.Code.SUCCESS, result.code()); + InterpreterContext context = getInterpreterContext(); + InterpreterResult result = flinkInterpreter.interpret(initStreamScalaScript, context); + assertEquals(context.out.toString(), InterpreterResult.Code.SUCCESS, result.code()); result = flinkInterpreter.interpret( "class MyUpper extends ScalarFunction {\n" + @@ -418,7 +418,7 @@ public class FlinkStreamSqlInterpreterTest extends FlinkSqlInterpreterTest { "}\n" + "stenv.registerFunction(\"myupper\", new MyUpper())", getInterpreterContext()); assertEquals(InterpreterResult.Code.SUCCESS, result.code()); - InterpreterContext context = 
getInterpreterContext(); + context = getInterpreterContext(); result = sqlInterpreter.interpret("select myupper(url), count(1) as pv from " + "log group by url", context); assertEquals(context.out.toString(), InterpreterResult.Code.SUCCESS, result.code()); @@ -692,7 +692,7 @@ public class FlinkStreamSqlInterpreterTest extends FlinkSqlInterpreterTest { } public static String getInitStreamScript(int sleep_interval) throws IOException { - return IOUtils.toString(FlinkStreamSqlInterpreterTest.class.getResource("/init_stream.scala")) + return IOUtils.toString(FlinkStreamSqlInterpreterTest.class.getResource("/init_stream.scala2")) .replace("{{sleep_interval}}", sleep_interval + ""); } } diff --git a/flink/flink-scala-parent/src/test/resources/init_stream.scala b/flink/flink-scala-parent/src/test/resources/init_stream.scala2 similarity index 100% rename from flink/flink-scala-parent/src/test/resources/init_stream.scala rename to flink/flink-scala-parent/src/test/resources/init_stream.scala2 diff --git a/flink/flink-shims/src/main/java/org/apache/zeppelin/flink/FlinkShims.java b/flink/flink-shims/src/main/java/org/apache/zeppelin/flink/FlinkShims.java index a473a0f888..a7b6e9871a 100644 --- a/flink/flink-shims/src/main/java/org/apache/zeppelin/flink/FlinkShims.java +++ b/flink/flink-shims/src/main/java/org/apache/zeppelin/flink/FlinkShims.java @@ -27,6 +27,7 @@ import org.slf4j.LoggerFactory; import java.io.IOException; import java.lang.reflect.Constructor; import java.net.InetAddress; +import java.net.URL; import java.util.List; import java.util.Properties; @@ -65,6 +66,9 @@ public abstract class FlinkShims { } else if (flinkVersion.getMajorVersion() == 1 && flinkVersion.getMinorVersion() == 15) { LOGGER.info("Initializing shims for Flink 1.15"); flinkShimsClass = Class.forName("org.apache.zeppelin.flink.Flink115Shims"); + } else if (flinkVersion.getMajorVersion() == 1 && flinkVersion.getMinorVersion() == 16) { + LOGGER.info("Initializing shims for Flink 1.16"); + flinkShimsClass = Class.forName("org.apache.zeppelin.flink.Flink116Shims"); } else { throw new Exception("Flink version: '" + flinkVersion + "' is not supported yet"); } @@ -91,7 +95,10 @@ public abstract class FlinkShims { return flinkVersion; } - public abstract Object createFunctionCatalog(Object tableConfig, Object catalogManager, Object moduleManager); + public abstract Object createFunctionCatalog(Object tableConfig, + Object catalogManager, + Object moduleManager, + List<URL> jars); public abstract void initInnerBatchSqlInterpreter(FlinkSqlContext flinkSqlContext); @@ -149,6 +156,28 @@ public abstract class FlinkShims { ClassLoader classLoader, Object environmentSettings, Object sEnv, Object tableConfig, Object moduleManager, Object functionCatalog, Object catalogManager); + public abstract Object createResourceManager(List<URL> jars, Object tableConfig); + + public abstract Object createScalaBlinkStreamTableEnvironment( + Object environmentSettingsObj, + Object senvObj, + Object tableConfigObj, + Object moduleManagerObj, + Object functionCatalogObj, + Object catalogManagerObj, + List<URL> jars, + ClassLoader classLoader); + + public abstract Object createJavaBlinkStreamTableEnvironment( + Object environmentSettingsObj, + Object senvObj, + Object tableConfigObj, + Object moduleManagerObj, + Object functionCatalogObj, + Object catalogManagerObj, + List<URL> jars, + ClassLoader classLoader); + public abstract Object createBlinkPlannerEnvSettingBuilder(); public abstract Object createOldPlannerEnvSettingBuilder(); diff --git 
a/flink/flink1.12-shims/src/main/java/org/apache/zeppelin/flink/Flink112Shims.java b/flink/flink1.12-shims/src/main/java/org/apache/zeppelin/flink/Flink112Shims.java index 5088a7f623..187eea0b4d 100644 --- a/flink/flink1.12-shims/src/main/java/org/apache/zeppelin/flink/Flink112Shims.java +++ b/flink/flink1.12-shims/src/main/java/org/apache/zeppelin/flink/Flink112Shims.java @@ -66,6 +66,7 @@ import java.io.File; import java.io.IOException; import java.lang.reflect.Method; import java.net.InetAddress; +import java.net.URL; import java.util.Arrays; import java.util.List; import java.util.Map; @@ -96,7 +97,12 @@ public class Flink112Shims extends FlinkShims { } @Override - public Object createFunctionCatalog(Object tableConfig, Object catalogManager, Object moduleManager) { + public Object createResourceManager(List<URL> jars, Object tableConfig) { + return null; + } + + @Override + public Object createFunctionCatalog(Object tableConfig, Object catalogManager, Object moduleManager, List<URL> jars) { return new FunctionCatalog((TableConfig) tableConfig, (CatalogManager) catalogManager, (ModuleManager) moduleManager); } @@ -105,6 +111,57 @@ public class Flink112Shims extends FlinkShims { // do nothing } + @Override + public Object createScalaBlinkStreamTableEnvironment(Object environmentSettingsObj, + Object senvObj, + Object tableConfigObj, + Object moduleManagerObj, + Object functionCatalogObj, + Object catalogManagerObj, + List<URL> jars, + ClassLoader classLoader) { + EnvironmentSettings environmentSettings = (EnvironmentSettings) environmentSettingsObj; + StreamExecutionEnvironment senv = (StreamExecutionEnvironment) senvObj; + TableConfig tableConfig = (TableConfig) tableConfigObj; + ModuleManager moduleManager = (ModuleManager) moduleManagerObj; + FunctionCatalog functionCatalog = (FunctionCatalog) functionCatalogObj; + CatalogManager catalogManager = (CatalogManager) catalogManagerObj; + ImmutablePair<Object, Object> pair = createPlannerAndExecutor( + classLoader, environmentSettings, senv, + tableConfig, moduleManager, functionCatalog, catalogManager); + Planner planner = (Planner) pair.left; + Executor executor = (Executor) pair.right; + + return new org.apache.flink.table.api.bridge.scala.internal.StreamTableEnvironmentImpl(catalogManager, + moduleManager, + functionCatalog, tableConfig, new org.apache.flink.streaming.api.scala.StreamExecutionEnvironment(senv), + planner, executor, environmentSettings.isStreamingMode(), classLoader); + } + + @Override + public Object createJavaBlinkStreamTableEnvironment(Object environmentSettingsObj, + Object senvObj, + Object tableConfigObj, + Object moduleManagerObj, + Object functionCatalogObj, + Object catalogManagerObj, + List<URL> jars, + ClassLoader classLoader) { + EnvironmentSettings environmentSettings = (EnvironmentSettings) environmentSettingsObj; + StreamExecutionEnvironment senv = (StreamExecutionEnvironment) senvObj; + TableConfig tableConfig = (TableConfig) tableConfigObj; + ModuleManager moduleManager = (ModuleManager) moduleManagerObj; + FunctionCatalog functionCatalog = (FunctionCatalog) functionCatalogObj; + CatalogManager catalogManager = (CatalogManager) catalogManagerObj; + ImmutablePair<Object, Object> pair = createPlannerAndExecutor( + classLoader, environmentSettings, senv, + tableConfig, moduleManager, functionCatalog, catalogManager); + Planner planner = (Planner) pair.left; + Executor executor = (Executor) pair.right; + + return new StreamTableEnvironmentImpl(catalogManager, moduleManager, + functionCatalog, tableConfig, 
senv, planner, executor, environmentSettings.isStreamingMode(), classLoader); + } @Override public Object createStreamExecutionEnvironmentFactory(Object streamExecutionEnvironment) { return new StreamExecutionEnvironmentFactory() { diff --git a/flink/flink1.13-shims/src/main/java/org/apache/zeppelin/flink/Flink113Shims.java b/flink/flink1.13-shims/src/main/java/org/apache/zeppelin/flink/Flink113Shims.java index 1df3f92951..440b245796 100644 --- a/flink/flink1.13-shims/src/main/java/org/apache/zeppelin/flink/Flink113Shims.java +++ b/flink/flink1.13-shims/src/main/java/org/apache/zeppelin/flink/Flink113Shims.java @@ -69,6 +69,7 @@ import java.io.File; import java.io.IOException; import java.lang.reflect.Method; import java.net.InetAddress; +import java.net.URL; import java.time.ZoneId; import java.util.Arrays; import java.util.List; @@ -99,7 +100,12 @@ public class Flink113Shims extends FlinkShims { } @Override - public Object createFunctionCatalog(Object tableConfig, Object catalogManager, Object moduleManager) { + public Object createResourceManager(List<URL> jars, Object tableConfig) { + return null; + } + + @Override + public Object createFunctionCatalog(Object tableConfig, Object catalogManager, Object moduleManager, List<URL> jars) { return new FunctionCatalog((TableConfig) tableConfig, (CatalogManager) catalogManager, (ModuleManager) moduleManager); } @@ -108,6 +114,57 @@ public class Flink113Shims extends FlinkShims { // do nothing } + @Override + public Object createScalaBlinkStreamTableEnvironment(Object environmentSettingsObj, + Object senvObj, + Object tableConfigObj, + Object moduleManagerObj, + Object functionCatalogObj, + Object catalogManagerObj, + List<URL> jars, + ClassLoader classLoader) { + EnvironmentSettings environmentSettings = (EnvironmentSettings) environmentSettingsObj; + StreamExecutionEnvironment senv = (StreamExecutionEnvironment) senvObj; + TableConfig tableConfig = (TableConfig) tableConfigObj; + ModuleManager moduleManager = (ModuleManager) moduleManagerObj; + FunctionCatalog functionCatalog = (FunctionCatalog) functionCatalogObj; + CatalogManager catalogManager = (CatalogManager) catalogManagerObj; + ImmutablePair<Object, Object> pair = createPlannerAndExecutor( + classLoader, environmentSettings, senv, + tableConfig, moduleManager, functionCatalog, catalogManager); + Planner planner = (Planner) pair.left; + Executor executor = (Executor) pair.right; + + return new org.apache.flink.table.api.bridge.scala.internal.StreamTableEnvironmentImpl(catalogManager, + moduleManager, + functionCatalog, tableConfig, new org.apache.flink.streaming.api.scala.StreamExecutionEnvironment(senv), + planner, executor, environmentSettings.isStreamingMode(), classLoader); + } + + @Override + public Object createJavaBlinkStreamTableEnvironment(Object environmentSettingsObj, + Object senvObj, + Object tableConfigObj, + Object moduleManagerObj, + Object functionCatalogObj, + Object catalogManagerObj, + List<URL> jars, + ClassLoader classLoader) { + EnvironmentSettings environmentSettings = (EnvironmentSettings) environmentSettingsObj; + StreamExecutionEnvironment senv = (StreamExecutionEnvironment) senvObj; + TableConfig tableConfig = (TableConfig) tableConfigObj; + ModuleManager moduleManager = (ModuleManager) moduleManagerObj; + FunctionCatalog functionCatalog = (FunctionCatalog) functionCatalogObj; + CatalogManager catalogManager = (CatalogManager) catalogManagerObj; + ImmutablePair<Object, Object> pair = createPlannerAndExecutor( + classLoader, environmentSettings, senv, + 
tableConfig, moduleManager, functionCatalog, catalogManager); + Planner planner = (Planner) pair.left; + Executor executor = (Executor) pair.right; + + return new StreamTableEnvironmentImpl(catalogManager, moduleManager, + functionCatalog, tableConfig, senv, planner, executor, environmentSettings.isStreamingMode(), classLoader); + } @Override public Object createStreamExecutionEnvironmentFactory(Object streamExecutionEnvironment) { return new StreamExecutionEnvironmentFactory() { diff --git a/flink/flink1.14-shims/src/main/java/org/apache/zeppelin/flink/Flink114Shims.java b/flink/flink1.14-shims/src/main/java/org/apache/zeppelin/flink/Flink114Shims.java index 9660c9f469..475be0da7e 100644 --- a/flink/flink1.14-shims/src/main/java/org/apache/zeppelin/flink/Flink114Shims.java +++ b/flink/flink1.14-shims/src/main/java/org/apache/zeppelin/flink/Flink114Shims.java @@ -66,6 +66,7 @@ import java.io.File; import java.io.IOException; import java.lang.reflect.Method; import java.net.InetAddress; +import java.net.URL; import java.time.ZoneId; import java.util.Arrays; import java.util.List; @@ -95,7 +96,12 @@ public class Flink114Shims extends FlinkShims { } @Override - public Object createFunctionCatalog(Object tableConfig, Object catalogManager, Object moduleManager) { + public Object createResourceManager(List<URL> jars, Object tableConfig) { + return null; + } + + @Override + public Object createFunctionCatalog(Object tableConfig, Object catalogManager, Object moduleManager, List<URL> jars) { return new FunctionCatalog((TableConfig) tableConfig, (CatalogManager) catalogManager, (ModuleManager) moduleManager); } @@ -104,6 +110,57 @@ public class Flink114Shims extends FlinkShims { // do nothing } + @Override + public Object createScalaBlinkStreamTableEnvironment(Object environmentSettingsObj, + Object senvObj, + Object tableConfigObj, + Object moduleManagerObj, + Object functionCatalogObj, + Object catalogManagerObj, + List<URL> jars, + ClassLoader classLoader) { + EnvironmentSettings environmentSettings = (EnvironmentSettings) environmentSettingsObj; + StreamExecutionEnvironment senv = (StreamExecutionEnvironment) senvObj; + TableConfig tableConfig = (TableConfig) tableConfigObj; + ModuleManager moduleManager = (ModuleManager) moduleManagerObj; + FunctionCatalog functionCatalog = (FunctionCatalog) functionCatalogObj; + CatalogManager catalogManager = (CatalogManager) catalogManagerObj; + ImmutablePair<Object, Object> pair = createPlannerAndExecutor( + classLoader, environmentSettings, senv, + tableConfig, moduleManager, functionCatalog, catalogManager); + Planner planner = (Planner) pair.left; + Executor executor = (Executor) pair.right; + + return new org.apache.flink.table.api.bridge.scala.internal.StreamTableEnvironmentImpl(catalogManager, + moduleManager, + functionCatalog, tableConfig, new org.apache.flink.streaming.api.scala.StreamExecutionEnvironment(senv), + planner, executor, environmentSettings.isStreamingMode(), classLoader); + } + + @Override + public Object createJavaBlinkStreamTableEnvironment(Object environmentSettingsObj, + Object senvObj, + Object tableConfigObj, + Object moduleManagerObj, + Object functionCatalogObj, + Object catalogManagerObj, + List<URL> jars, + ClassLoader classLoader) { + EnvironmentSettings environmentSettings = (EnvironmentSettings) environmentSettingsObj; + StreamExecutionEnvironment senv = (StreamExecutionEnvironment) senvObj; + TableConfig tableConfig = (TableConfig) tableConfigObj; + ModuleManager moduleManager = (ModuleManager) moduleManagerObj; + 
FunctionCatalog functionCatalog = (FunctionCatalog) functionCatalogObj; + CatalogManager catalogManager = (CatalogManager) catalogManagerObj; + ImmutablePair<Object, Object> pair = createPlannerAndExecutor( + classLoader, environmentSettings, senv, + tableConfig, moduleManager, functionCatalog, catalogManager); + Planner planner = (Planner) pair.left; + Executor executor = (Executor) pair.right; + + return new StreamTableEnvironmentImpl(catalogManager, moduleManager, + functionCatalog, tableConfig, senv, planner, executor, environmentSettings.isStreamingMode(), classLoader); + } @Override public Object createStreamExecutionEnvironmentFactory(Object streamExecutionEnvironment) { diff --git a/flink/flink1.15-shims/src/main/java/org/apache/zeppelin/flink/Flink115Shims.java b/flink/flink1.15-shims/src/main/java/org/apache/zeppelin/flink/Flink115Shims.java index 98c250773d..4ed8abf3af 100644 --- a/flink/flink1.15-shims/src/main/java/org/apache/zeppelin/flink/Flink115Shims.java +++ b/flink/flink1.15-shims/src/main/java/org/apache/zeppelin/flink/Flink115Shims.java @@ -65,6 +65,7 @@ import java.io.File; import java.io.IOException; import java.lang.reflect.Method; import java.net.InetAddress; +import java.net.URL; import java.time.ZoneId; import java.util.Arrays; import java.util.List; @@ -94,7 +95,12 @@ public class Flink115Shims extends FlinkShims { } @Override - public Object createFunctionCatalog(Object tableConfig, Object catalogManager, Object moduleManager) { + public Object createResourceManager(List<URL> jars, Object tableConfig) { + return null; + } + + @Override + public Object createFunctionCatalog(Object tableConfig, Object catalogManager, Object moduleManager, List<URL> jars) { return new FunctionCatalog((TableConfig) tableConfig, (CatalogManager) catalogManager, (ModuleManager) moduleManager); } @@ -103,6 +109,58 @@ public class Flink115Shims extends FlinkShims { // do nothing } + @Override + public Object createScalaBlinkStreamTableEnvironment(Object environmentSettingsObj, + Object senvObj, + Object tableConfigObj, + Object moduleManagerObj, + Object functionCatalogObj, + Object catalogManagerObj, + List<URL> jars, + ClassLoader classLoader) { + EnvironmentSettings environmentSettings = (EnvironmentSettings) environmentSettingsObj; + StreamExecutionEnvironment senv = (StreamExecutionEnvironment) senvObj; + TableConfig tableConfig = (TableConfig) tableConfigObj; + ModuleManager moduleManager = (ModuleManager) moduleManagerObj; + FunctionCatalog functionCatalog = (FunctionCatalog) functionCatalogObj; + CatalogManager catalogManager = (CatalogManager) catalogManagerObj; + ImmutablePair<Object, Object> pair = createPlannerAndExecutor( + classLoader, environmentSettings, senv, + tableConfig, moduleManager, functionCatalog, catalogManager); + Planner planner = (Planner) pair.left; + Executor executor = (Executor) pair.right; + + return new org.apache.flink.table.api.bridge.scala.internal.StreamTableEnvironmentImpl(catalogManager, + moduleManager, + functionCatalog, tableConfig, new org.apache.flink.streaming.api.scala.StreamExecutionEnvironment(senv), + planner, executor, environmentSettings.isStreamingMode(), classLoader); + } + + @Override + public Object createJavaBlinkStreamTableEnvironment(Object environmentSettingsObj, + Object senvObj, + Object tableConfigObj, + Object moduleManagerObj, + Object functionCatalogObj, + Object catalogManagerObj, + List<URL> jars, + ClassLoader classLoader) { + EnvironmentSettings environmentSettings = (EnvironmentSettings) environmentSettingsObj; + 
StreamExecutionEnvironment senv = (StreamExecutionEnvironment) senvObj; + TableConfig tableConfig = (TableConfig) tableConfigObj; + ModuleManager moduleManager = (ModuleManager) moduleManagerObj; + FunctionCatalog functionCatalog = (FunctionCatalog) functionCatalogObj; + CatalogManager catalogManager = (CatalogManager) catalogManagerObj; + ImmutablePair<Object, Object> pair = createPlannerAndExecutor( + classLoader, environmentSettings, senv, + tableConfig, moduleManager, functionCatalog, catalogManager); + Planner planner = (Planner) pair.left; + Executor executor = (Executor) pair.right; + + return new StreamTableEnvironmentImpl(catalogManager, moduleManager, + functionCatalog, tableConfig, senv, planner, executor, environmentSettings.isStreamingMode(), classLoader); + } + @Override public Object createStreamExecutionEnvironmentFactory(Object streamExecutionEnvironment) { return new StreamExecutionEnvironmentFactory() { diff --git a/flink/flink1.16-shims/pom.xml b/flink/flink1.16-shims/pom.xml new file mode 100644 index 0000000000..756f64f571 --- /dev/null +++ b/flink/flink1.16-shims/pom.xml @@ -0,0 +1,207 @@ +<?xml version="1.0" encoding="UTF-8"?> +<!-- + ~ Licensed to the Apache Software Foundation (ASF) under one or more + ~ contributor license agreements. See the NOTICE file distributed with + ~ this work for additional information regarding copyright ownership. + ~ The ASF licenses this file to You under the Apache License, Version 2.0 + ~ (the "License"); you may not use this file except in compliance with + ~ the License. You may obtain a copy of the License at + ~ + ~ http://www.apache.org/licenses/LICENSE-2.0 + ~ + ~ Unless required by applicable law or agreed to in writing, software + ~ distributed under the License is distributed on an "AS IS" BASIS, + ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + ~ See the License for the specific language governing permissions and + ~ limitations under the License. 
+ --> + +<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" + xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd"> + <parent> + <artifactId>flink-parent</artifactId> + <groupId>org.apache.zeppelin</groupId> + <version>0.11.0-SNAPSHOT</version> + <relativePath>../pom.xml</relativePath> + </parent> + + <modelVersion>4.0.0</modelVersion> + <groupId>org.apache.zeppelin</groupId> + <artifactId>flink1.16-shims</artifactId> + <version>0.11.0-SNAPSHOT</version> + <packaging>jar</packaging> + <name>Zeppelin: Flink1.16 Shims</name> + + <properties> + <flink.version>${flink1.16.version}</flink.version> + <flink.scala.binary.version>2.12</flink.scala.binary.version> + </properties> + + <dependencies> + + <dependency> + <groupId>org.apache.zeppelin</groupId> + <artifactId>flink-shims</artifactId> + <version>${project.version}</version> + </dependency> + + <dependency> + <groupId>org.apache.flink</groupId> + <artifactId>flink-core</artifactId> + <version>${flink.version}</version> + <scope>provided</scope> + </dependency> + + <dependency> + <groupId>org.apache.flink</groupId> + <artifactId>flink-clients</artifactId> + <version>${flink.version}</version> + <scope>provided</scope> + </dependency> + + <dependency> + <groupId>org.apache.flink</groupId> + <artifactId>flink-runtime</artifactId> + <version>${flink.version}</version> + <scope>provided</scope> + </dependency> + + <dependency> + <groupId>org.apache.flink</groupId> + <artifactId>flink-table-api-scala_${flink.scala.binary.version}</artifactId> + <version>${flink.version}</version> + <scope>provided</scope> + </dependency> + + <dependency> + <groupId>org.apache.flink</groupId> + <artifactId>flink-table-api-scala-bridge_${flink.scala.binary.version}</artifactId> + <version>${flink.version}</version> + <scope>provided</scope> + </dependency> + + <dependency> + <groupId>org.apache.flink</groupId> + <artifactId>flink-table-api-java-bridge</artifactId> + <version>${flink.version}</version> + <scope>provided</scope> + </dependency> + + <dependency> + <groupId>org.apache.flink</groupId> + <artifactId>flink-scala_${flink.scala.binary.version}</artifactId> + <version>${flink.version}</version> + <scope>provided</scope> + </dependency> + + <dependency> + <groupId>org.apache.flink</groupId> + <artifactId>flink-streaming-java</artifactId> + <version>${flink.version}</version> + <scope>provided</scope> + </dependency> + + <dependency> + <groupId>org.apache.flink</groupId> + <artifactId>flink-streaming-scala_${flink.scala.binary.version}</artifactId> + <version>${flink.version}</version> + <scope>provided</scope> + </dependency> + + <dependency> + <groupId>org.apache.flink</groupId> + <artifactId>flink-java</artifactId> + <version>${flink.version}</version> + <scope>provided</scope> + </dependency> + + <dependency> + <groupId>org.apache.flink</groupId> + <artifactId>flink-table-planner_${flink.scala.binary.version}</artifactId> + <version>${flink.version}</version> + <scope>provided</scope> + </dependency> + + <dependency> + <groupId>org.apache.flink</groupId> + <artifactId>flink-python</artifactId> + <version>${flink.version}</version> + <scope>provided</scope> + </dependency> + + <dependency> + <groupId>org.apache.flink</groupId> + <artifactId>flink-sql-client</artifactId> + <version>${flink.version}</version> + <scope>provided</scope> + </dependency> + + </dependencies> + + <build> + <plugins> + + <plugin> + <groupId>net.alchim31.maven</groupId> + 
<artifactId>scala-maven-plugin</artifactId> + <executions> + <execution> + <id>eclipse-add-source</id> + <goals> + <goal>add-source</goal> + </goals> + </execution> + <execution> + <id>scala-compile-first</id> + <phase>process-resources</phase> + <goals> + <goal>compile</goal> + </goals> + </execution> + <execution> + <id>scala-test-compile-first</id> + <phase>process-test-resources</phase> + <goals> + <goal>testCompile</goal> + </goals> + </execution> + </executions> + <configuration> + <scalaVersion>${flink.scala.version}</scalaVersion> + <args> + <arg>-unchecked</arg> + <arg>-deprecation</arg> + <arg>-feature</arg> + <arg>-target:jvm-1.8</arg> + </args> + <jvmArgs> + <jvmArg>-Xms1024m</jvmArg> + <jvmArg>-Xmx1024m</jvmArg> + <jvmArg>-XX:MaxMetaspaceSize=${MaxMetaspace}</jvmArg> + </jvmArgs> + <javacArgs> + <javacArg>-source</javacArg> + <javacArg>${java.version}</javacArg> + <javacArg>-target</javacArg> + <javacArg>${java.version}</javacArg> + <javacArg>-Xlint:all,-serial,-path,-options</javacArg> + </javacArgs> + </configuration> + </plugin> + + <plugin> + <artifactId>maven-resources-plugin</artifactId> + <executions> + <execution> + <id>copy-interpreter-setting</id> + <phase>none</phase> + <configuration> + <skip>true</skip> + </configuration> + </execution> + </executions> + </plugin> + </plugins> + </build> + +</project> diff --git a/flink/flink1.15-shims/src/main/java/org/apache/zeppelin/flink/Flink115Shims.java b/flink/flink1.16-shims/src/main/java/org/apache/zeppelin/flink/Flink116Shims.java similarity index 68% copy from flink/flink1.15-shims/src/main/java/org/apache/zeppelin/flink/Flink115Shims.java copy to flink/flink1.16-shims/src/main/java/org/apache/zeppelin/flink/Flink116Shims.java index 98c250773d..b96e5f5e42 100644 --- a/flink/flink1.15-shims/src/main/java/org/apache/zeppelin/flink/Flink115Shims.java +++ b/flink/flink1.16-shims/src/main/java/org/apache/zeppelin/flink/Flink116Shims.java @@ -40,6 +40,9 @@ import org.apache.flink.table.catalog.CatalogManager; import org.apache.flink.table.catalog.FunctionCatalog; import org.apache.flink.table.catalog.GenericInMemoryCatalog; import org.apache.flink.table.catalog.ResolvedSchema; +import org.apache.flink.table.client.resource.ClientResourceManager; +import org.apache.flink.table.client.util.ClientClassloaderUtil; +import org.apache.flink.table.client.util.ClientWrapperClassLoader; import org.apache.flink.table.delegation.Executor; import org.apache.flink.table.delegation.ExecutorFactory; import org.apache.flink.table.delegation.Planner; @@ -51,11 +54,13 @@ import org.apache.flink.table.functions.TableAggregateFunction; import org.apache.flink.table.functions.TableFunction; import org.apache.flink.table.module.ModuleManager; import org.apache.flink.table.planner.calcite.FlinkTypeFactory; +import org.apache.flink.table.resource.ResourceManager; import org.apache.flink.table.sinks.TableSink; +import org.apache.flink.table.typeutils.TimeIndicatorTypeInfo; import org.apache.flink.types.Row; import org.apache.flink.types.RowKind; import org.apache.flink.util.FlinkException; -import org.apache.zeppelin.flink.shims115.CollectStreamTableSink; +import org.apache.zeppelin.flink.shims116.CollectStreamTableSink; import org.apache.zeppelin.interpreter.InterpreterContext; import org.apache.zeppelin.interpreter.InterpreterResult; import org.slf4j.Logger; @@ -65,6 +70,7 @@ import java.io.File; import java.io.IOException; import java.lang.reflect.Method; import java.net.InetAddress; +import java.net.URL; import java.time.ZoneId; import 
java.util.Arrays; import java.util.List; @@ -72,30 +78,44 @@ import java.util.Properties; /** - * Shims for flink 1.15 + * Shims for flink 1.16 */ -public class Flink115Shims extends FlinkShims { +public class Flink116Shims extends FlinkShims { - private static final Logger LOGGER = LoggerFactory.getLogger(Flink115Shims.class); + private static final Logger LOGGER = LoggerFactory.getLogger(Flink116Shims.class); - private Flink115SqlInterpreter batchSqlInterpreter; - private Flink115SqlInterpreter streamSqlInterpreter; + private Flink116SqlInterpreter batchSqlInterpreter; + private Flink116SqlInterpreter streamSqlInterpreter; - public Flink115Shims(FlinkVersion flinkVersion, Properties properties) { + public Flink116Shims(FlinkVersion flinkVersion, Properties properties) { super(flinkVersion, properties); } public void initInnerBatchSqlInterpreter(FlinkSqlContext flinkSqlContext) { - this.batchSqlInterpreter = new Flink115SqlInterpreter(flinkSqlContext, true); + this.batchSqlInterpreter = new Flink116SqlInterpreter(flinkSqlContext, true); } public void initInnerStreamSqlInterpreter(FlinkSqlContext flinkSqlContext) { - this.streamSqlInterpreter = new Flink115SqlInterpreter(flinkSqlContext, false); + this.streamSqlInterpreter = new Flink116SqlInterpreter(flinkSqlContext, false); } @Override - public Object createFunctionCatalog(Object tableConfig, Object catalogManager, Object moduleManager) { - return new FunctionCatalog((TableConfig) tableConfig, (CatalogManager) catalogManager, (ModuleManager) moduleManager); + public Object createResourceManager(List<URL> jars, Object tableConfig) { + Configuration configuration = ((TableConfig) tableConfig).getConfiguration().clone(); + ClientWrapperClassLoader userClassLoader = + new ClientWrapperClassLoader( + ClientClassloaderUtil.buildUserClassLoader( + jars, + Thread.currentThread().getContextClassLoader(), + new Configuration(configuration)), + configuration); + return new ClientResourceManager(configuration, userClassLoader); + } + + @Override + public Object createFunctionCatalog(Object tableConfig, Object catalogManager, Object moduleManager, List<URL> jars) { + ResourceManager resourceManager = (ResourceManager) createResourceManager(jars, (TableConfig) tableConfig); + return new FunctionCatalog((TableConfig) tableConfig, resourceManager, (CatalogManager) catalogManager, (ModuleManager) moduleManager); } @Override @@ -103,6 +123,62 @@ public class Flink115Shims extends FlinkShims { // do nothing } + @Override + public Object createScalaBlinkStreamTableEnvironment(Object environmentSettingsObj, + Object senvObj, + Object tableConfigObj, + Object moduleManagerObj, + Object functionCatalogObj, + Object catalogManagerObj, + List<URL> jars, + ClassLoader classLoader) { + EnvironmentSettings environmentSettings = (EnvironmentSettings) environmentSettingsObj; + StreamExecutionEnvironment senv = (StreamExecutionEnvironment) senvObj; + TableConfig tableConfig = (TableConfig) tableConfigObj; + ModuleManager moduleManager = (ModuleManager) moduleManagerObj; + FunctionCatalog functionCatalog = (FunctionCatalog) functionCatalogObj; + CatalogManager catalogManager = (CatalogManager) catalogManagerObj; + ImmutablePair<Object, Object> pair = createPlannerAndExecutor( + classLoader, environmentSettings, senv, + tableConfig, moduleManager, functionCatalog, catalogManager); + Planner planner = (Planner) pair.left; + Executor executor = (Executor) pair.right; + + ResourceManager resourceManager = (ResourceManager) createResourceManager(jars, tableConfig); + + 
return new org.apache.flink.table.api.bridge.scala.internal.StreamTableEnvironmentImpl(catalogManager, + moduleManager, resourceManager, + functionCatalog, tableConfig, new org.apache.flink.streaming.api.scala.StreamExecutionEnvironment(senv), + planner, executor, environmentSettings.isStreamingMode()); + } + + @Override + public Object createJavaBlinkStreamTableEnvironment(Object environmentSettingsObj, + Object senvObj, + Object tableConfigObj, + Object moduleManagerObj, + Object functionCatalogObj, + Object catalogManagerObj, + List<URL> jars, + ClassLoader classLoader) { + EnvironmentSettings environmentSettings = (EnvironmentSettings) environmentSettingsObj; + StreamExecutionEnvironment senv = (StreamExecutionEnvironment) senvObj; + TableConfig tableConfig = (TableConfig) tableConfigObj; + ModuleManager moduleManager = (ModuleManager) moduleManagerObj; + FunctionCatalog functionCatalog = (FunctionCatalog) functionCatalogObj; + CatalogManager catalogManager = (CatalogManager) catalogManagerObj; + ImmutablePair<Object, Object> pair = createPlannerAndExecutor( + classLoader, environmentSettings, senv, + tableConfig, moduleManager, functionCatalog, catalogManager); + Planner planner = (Planner) pair.left; + Executor executor = (Executor) pair.right; + + ResourceManager resourceManager = (ResourceManager) createResourceManager(jars, tableConfig); + + return new StreamTableEnvironmentImpl(catalogManager, moduleManager, resourceManager, + functionCatalog, tableConfig, senv, planner, executor, environmentSettings.isStreamingMode()); + } + @Override public Object createStreamExecutionEnvironmentFactory(Object streamExecutionEnvironment) { return new StreamExecutionEnvironmentFactory() { @@ -261,7 +337,11 @@ public class Flink115Shims extends FlinkShims { @Override public boolean isTimeIndicatorType(Object type) { - return FlinkTypeFactory.isTimeIndicatorType((TypeInformation<?>) type); + if (type instanceof TimeIndicatorTypeInfo) { + return true; + } else { + return false; + } } private Object lookupExecutor(ClassLoader classLoader, @@ -292,6 +372,7 @@ public class Flink115Shims extends FlinkShims { Executor executor = (Executor) lookupExecutor(classLoader, environmentSettings, sEnv); Planner planner = PlannerFactoryUtil.createPlanner(executor, (TableConfig) tableConfig, + Thread.currentThread().getContextClassLoader(), (ModuleManager) moduleManager, (CatalogManager) catalogManager, (FunctionCatalog) functionCatalog); diff --git a/flink/flink1.16-shims/src/main/java/org/apache/zeppelin/flink/Flink116SqlInterpreter.java b/flink/flink1.16-shims/src/main/java/org/apache/zeppelin/flink/Flink116SqlInterpreter.java new file mode 100644 index 0000000000..e4f098cea7 --- /dev/null +++ b/flink/flink1.16-shims/src/main/java/org/apache/zeppelin/flink/Flink116SqlInterpreter.java @@ -0,0 +1,590 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.zeppelin.flink; + +import org.apache.commons.lang3.StringUtils; +import org.apache.commons.lang3.exception.ExceptionUtils; +import org.apache.flink.api.common.JobExecutionResult; +import org.apache.flink.api.common.JobStatus; +import org.apache.flink.api.java.ExecutionEnvironment; +import org.apache.flink.configuration.PipelineOptions; +import org.apache.flink.core.execution.JobClient; +import org.apache.flink.core.execution.JobListener; +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; +import org.apache.flink.table.api.SqlParserException; +import org.apache.flink.table.api.Table; +import org.apache.flink.table.api.TableEnvironment; +import org.apache.flink.table.api.TableResult; +import org.apache.flink.table.api.internal.TableEnvironmentInternal; +import org.apache.flink.table.delegation.Parser; +import org.apache.flink.table.operations.*; +import org.apache.flink.table.operations.command.HelpOperation; +import org.apache.flink.table.operations.command.SetOperation; +import org.apache.flink.table.operations.ddl.*; +import org.apache.flink.table.utils.EncodingUtils; +import org.apache.flink.types.Row; +import org.apache.flink.util.CloseableIterator; +import org.apache.flink.util.CollectionUtil; +import org.apache.flink.util.Preconditions; +import org.apache.zeppelin.interpreter.InterpreterContext; +import org.apache.zeppelin.interpreter.InterpreterResult; +import org.apache.zeppelin.interpreter.ZeppelinContext; +import org.apache.zeppelin.interpreter.util.SqlSplitter; +import org.jline.utils.AttributedString; +import org.jline.utils.AttributedStringBuilder; +import org.jline.utils.AttributedStyle; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import javax.annotation.Nullable; +import java.io.IOException; +import java.util.*; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.locks.ReentrantReadWriteLock; +import java.util.stream.Collectors; + +import static org.apache.flink.util.Preconditions.checkNotNull; +import static org.apache.flink.util.Preconditions.checkState; + + +public class Flink116SqlInterpreter { + + private static final Logger LOGGER = LoggerFactory.getLogger(Flink116SqlInterpreter.class); + private static final String CMD_DESC_DELIMITER = "\t\t"; + + /** + * SQL Client HELP command helper class. 
+ */ + private static final class SQLCliCommandsDescriptions { + private int commandMaxLength; + private final Map<String, String> commandsDescriptions; + + public SQLCliCommandsDescriptions() { + this.commandsDescriptions = new LinkedHashMap<>(); + this.commandMaxLength = -1; + } + + public SQLCliCommandsDescriptions commandDescription(String command, String description) { + Preconditions.checkState( + StringUtils.isNotBlank(command), "content of command must not be empty."); + Preconditions.checkState( + StringUtils.isNotBlank(description), + "content of command's description must not be empty."); + this.updateMaxCommandLength(command.length()); + this.commandsDescriptions.put(command, description); + return this; + } + + private void updateMaxCommandLength(int newLength) { + Preconditions.checkState(newLength > 0); + if (this.commandMaxLength < newLength) { + this.commandMaxLength = newLength; + } + } + + public AttributedString build() { + AttributedStringBuilder attributedStringBuilder = new AttributedStringBuilder(); + if (!this.commandsDescriptions.isEmpty()) { + this.commandsDescriptions.forEach( + (cmd, cmdDesc) -> { + attributedStringBuilder + .style(AttributedStyle.DEFAULT.bold()) + .append( + String.format( + String.format("%%-%ds", commandMaxLength), cmd)) + .append(CMD_DESC_DELIMITER) + .style(AttributedStyle.DEFAULT) + .append(cmdDesc) + .append('\n'); + }); + } + return attributedStringBuilder.toAttributedString(); + } + } + + private static final AttributedString SQL_CLI_COMMANDS_DESCRIPTIONS = + new SQLCliCommandsDescriptions() + .commandDescription("HELP", "Prints the available commands.") + .commandDescription( + "SET", + "Sets a session configuration property. Syntax: \"SET '<key>'='<value>';\". Use \"SET;\" for listing all properties.") + .commandDescription( + "RESET", + "Resets a session configuration property. Syntax: \"RESET '<key>';\". Use \"RESET;\" for reset all session properties.") + .commandDescription( + "INSERT INTO", + "Inserts the results of a SQL SELECT query into a declared table sink.") + .commandDescription( + "INSERT OVERWRITE", + "Inserts the results of a SQL SELECT query into a declared table sink and overwrite existing data.") + .commandDescription( + "SELECT", "Executes a SQL SELECT query on the Flink cluster.") + .commandDescription( + "EXPLAIN", + "Describes the execution plan of a query or table with the given name.") + .commandDescription( + "BEGIN STATEMENT SET", + "Begins a statement set. Syntax: \"BEGIN STATEMENT SET;\"") + .commandDescription("END", "Ends a statement set. Syntax: \"END;\"") + // (TODO) zjffdu, ADD/REMOVE/SHOW JAR + .build(); + + // -------------------------------------------------------------------------------------------- + + public static final AttributedString MESSAGE_HELP = + new AttributedStringBuilder() + .append("The following commands are available:\n\n") + .append(SQL_CLI_COMMANDS_DESCRIPTIONS) + .style(AttributedStyle.DEFAULT.underline()) + .append("\nHint") + .style(AttributedStyle.DEFAULT) + .append( + ": Make sure that a statement ends with \";\" for finalizing (multi-line) statements.") + // About Documentation Link. 
+ .style(AttributedStyle.DEFAULT) + .append( + "\nYou can also type any Flink SQL statement, please visit https://nightlies.apache.org/flink/flink-docs-stable/docs/dev/table/sql/overview/ for more details.") + .toAttributedString(); + + private static final String MESSAGE_NO_STATEMENT_IN_STATEMENT_SET = "No statement in the statement set, skip submit."; + + private FlinkSqlContext flinkSqlContext; + private TableEnvironment tbenv; + private ZeppelinContext z; + private Parser sqlParser; + private SqlSplitter sqlSplitter; + // paragraphId -> list of ModifyOperation, used for statement set in 2 syntax: + // 1. runAsOne= true + // 2. begin statement set; + // ... + // end; + private Map<String, List<ModifyOperation>> statementOperationsMap = new HashMap<>(); + private boolean isBatch; + private ReentrantReadWriteLock.WriteLock lock = new ReentrantReadWriteLock().writeLock(); + + + public Flink116SqlInterpreter(FlinkSqlContext flinkSqlContext, boolean isBatch) { + this.flinkSqlContext = flinkSqlContext; + this.isBatch = isBatch; + if (isBatch) { + this.tbenv = (TableEnvironment) flinkSqlContext.getBtenv(); + } else { + this.tbenv = (TableEnvironment) flinkSqlContext.getStenv(); + } + this.z = (ZeppelinContext) flinkSqlContext.getZeppelinContext(); + this.sqlParser = ((TableEnvironmentInternal) tbenv).getParser(); + this.sqlSplitter = new SqlSplitter(); + JobListener jobListener = new JobListener() { + @Override + public void onJobSubmitted(@Nullable JobClient jobClient, @Nullable Throwable throwable) { + if (lock.isHeldByCurrentThread()) { + lock.unlock(); + LOGGER.info("UnLock JobSubmitLock"); + } + } + + @Override + public void onJobExecuted(@Nullable JobExecutionResult jobExecutionResult, @Nullable Throwable throwable) { + + } + }; + + ((ExecutionEnvironment) flinkSqlContext.getBenv()).registerJobListener(jobListener); + ((StreamExecutionEnvironment) flinkSqlContext.getSenv()).registerJobListener(jobListener); + } + + public InterpreterResult runSqlList(String st, InterpreterContext context) { + try { + boolean runAsOne = Boolean.parseBoolean(context.getStringLocalProperty("runAsOne", "false")); + if (runAsOne) { + statementOperationsMap.put(context.getParagraphId(), new ArrayList<>()); + } + + String jobName = context.getLocalProperties().get("jobName"); + if (StringUtils.isNotBlank(jobName)) { + tbenv.getConfig().getConfiguration().set(PipelineOptions.NAME, jobName); + } + + List<String> sqls = sqlSplitter.splitSql(st).stream().map(String::trim).collect(Collectors.toList()); + for (String sql : sqls) { + List<Operation> operations = null; + try { + operations = sqlParser.parse(sql); + } catch (SqlParserException e) { + context.out.write("%text Invalid Sql statement: " + sql + "\n"); + context.out.write(MESSAGE_HELP.toString()); + return new InterpreterResult(InterpreterResult.Code.ERROR, e.toString()); + } + + try { + callOperation(sql, operations.get(0), context); + context.out.flush(); + } catch (Throwable e) { + LOGGER.error("Fail to run sql:" + sql, e); + try { + context.out.write("%text Fail to run sql command: " + + sql + "\n" + ExceptionUtils.getStackTrace(e) + "\n"); + } catch (IOException ex) { + LOGGER.warn("Unexpected exception:", ex); + return new InterpreterResult(InterpreterResult.Code.ERROR, + ExceptionUtils.getStackTrace(e)); + } + return new InterpreterResult(InterpreterResult.Code.ERROR); + } + } + + if (runAsOne && !statementOperationsMap.getOrDefault(context.getParagraphId(), new ArrayList<>()).isEmpty()) { + try { + lock.lock(); + List<ModifyOperation> 
modifyOperations = statementOperationsMap.getOrDefault(context.getParagraphId(), new ArrayList<>()); + if (!modifyOperations.isEmpty()) { + callInserts(modifyOperations, context); + } + } catch (Exception e) { + LOGGER.error("Fail to execute sql as one job", e); + return new InterpreterResult(InterpreterResult.Code.ERROR, ExceptionUtils.getStackTrace(e)); + } finally { + if (lock.isHeldByCurrentThread()) { + lock.unlock(); + } + } + } + } catch (Exception e) { + LOGGER.error("Fail to execute sql", e); + return new InterpreterResult(InterpreterResult.Code.ERROR, ExceptionUtils.getStackTrace(e)); + } finally { + statementOperationsMap.remove(context.getParagraphId()); + } + + return new InterpreterResult(InterpreterResult.Code.SUCCESS); + } + + private void callOperation(String sql, Operation operation, InterpreterContext context) throws IOException { + if (operation instanceof HelpOperation) { + // HELP + callHelp(context); + } else if (operation instanceof SetOperation) { + // SET + callSet((SetOperation) operation, context); + } else if (operation instanceof ModifyOperation) { + // INSERT INTO/OVERWRITE + callInsert((ModifyOperation) operation, context); + } else if (operation instanceof QueryOperation) { + // SELECT + callSelect(sql, (QueryOperation) operation, context); + } else if (operation instanceof ExplainOperation) { + // EXPLAIN + callExplain((ExplainOperation) operation, context); + } else if (operation instanceof BeginStatementSetOperation) { + // BEGIN STATEMENT SET + callBeginStatementSet(context); + } else if (operation instanceof EndStatementSetOperation) { + // END + callEndStatementSet(context); + } else if (operation instanceof ShowCreateTableOperation) { + // SHOW CREATE TABLE + callShowCreateTable((ShowCreateTableOperation) operation, context); + } else if (operation instanceof ShowCatalogsOperation) { + callShowCatalogs(context); + } else if (operation instanceof ShowCurrentCatalogOperation) { + callShowCurrentCatalog(context); + } else if (operation instanceof UseCatalogOperation) { + callUseCatalog(((UseCatalogOperation) operation).getCatalogName(), context); + } else if (operation instanceof CreateCatalogOperation) { + callDDL(sql, context, "Catalog has been created."); + } else if (operation instanceof DropCatalogOperation) { + callDDL(sql, context, "Catalog has been dropped."); + } else if (operation instanceof UseDatabaseOperation) { + UseDatabaseOperation useDBOperation = (UseDatabaseOperation) operation; + callUseDatabase(useDBOperation.getDatabaseName(), context); + } else if (operation instanceof CreateDatabaseOperation) { + callDDL(sql, context, "Database has been created."); + } else if (operation instanceof DropDatabaseOperation) { + callDDL(sql, context, "Database has been removed."); + } else if (operation instanceof AlterDatabaseOperation) { + callDDL(sql, context, "Alter database succeeded!"); + } else if (operation instanceof ShowDatabasesOperation) { + callShowDatabases(context); + } else if (operation instanceof ShowCurrentDatabaseOperation) { + callShowCurrentDatabase(context); + } else if (operation instanceof CreateTableOperation || operation instanceof CreateTableASOperation) { + callDDL(sql, context, "Table has been created."); + } else if (operation instanceof AlterTableOperation) { + callDDL(sql, context, "Alter table succeeded!"); + } else if (operation instanceof DropTableOperation) { + callDDL(sql, context, "Table has been dropped."); + } else if (operation instanceof DescribeTableOperation) { + DescribeTableOperation 
describeTableOperation = (DescribeTableOperation) operation; + callDescribe(describeTableOperation.getSqlIdentifier().getObjectName(), context); + } else if (operation instanceof ShowTablesOperation) { + callShowTables(context); + } else if (operation instanceof CreateViewOperation) { + callDDL(sql, context, "View has been created."); + } else if (operation instanceof DropViewOperation) { + callDDL(sql, context, "View has been dropped."); + } else if (operation instanceof AlterViewOperation) { + callDDL(sql, context, "Alter view succeeded!"); + } else if (operation instanceof CreateCatalogFunctionOperation || operation instanceof CreateTempSystemFunctionOperation) { + callDDL(sql, context, "Function has been created."); + } else if (operation instanceof DropCatalogFunctionOperation || operation instanceof DropTempSystemFunctionOperation) { + callDDL(sql, context, "Function has been removed."); + } else if (operation instanceof AlterCatalogFunctionOperation) { + callDDL(sql, context, "Alter function succeeded!"); + } else if (operation instanceof ShowFunctionsOperation) { + callShowFunctions(context); + } else if (operation instanceof ShowModulesOperation) { + callShowModules(context); + } else if (operation instanceof ShowPartitionsOperation) { + ShowPartitionsOperation showPartitionsOperation = (ShowPartitionsOperation) operation; + callShowPartitions(showPartitionsOperation.asSummaryString(), context); + } else { + throw new IOException(operation.getClass().getName() + " is not supported"); + } + } + + + private void callHelp(InterpreterContext context) throws IOException { + context.out.write(MESSAGE_HELP.toString() + "\n"); + } + + private void callInsert(ModifyOperation operation, InterpreterContext context) throws IOException { + if (statementOperationsMap.containsKey(context.getParagraphId())) { + List<ModifyOperation> modifyOperations = statementOperationsMap.get(context.getParagraphId()); + modifyOperations.add(operation); + } else { + callInserts(Collections.singletonList(operation), context); + } + } + + private void callInserts(List<ModifyOperation> operations, InterpreterContext context) throws IOException { + if (!isBatch) { + context.getLocalProperties().put("flink.streaming.insert_into", "true"); + } + TableResult tableResult = ((TableEnvironmentInternal) tbenv).executeInternal(operations); + checkState(tableResult.getJobClient().isPresent()); + try { + tableResult.await(); + JobClient jobClient = tableResult.getJobClient().get(); + if (jobClient.getJobStatus().get() == JobStatus.FINISHED) { + context.out.write("Insertion successfully.\n"); + } else { + throw new IOException("Job is failed, " + jobClient.getJobExecutionResult().get().toString()); + } + } catch (InterruptedException e) { + throw new IOException("Flink job is interrupted", e); + } catch (ExecutionException e) { + throw new IOException("Flink job is failed", e); + } + } + + private void callShowCreateTable(ShowCreateTableOperation showCreateTableOperation, InterpreterContext context) throws IOException { + try { + lock.lock(); + TableResult tableResult = ((TableEnvironmentInternal) tbenv).executeInternal(showCreateTableOperation); + String explanation = + Objects.requireNonNull(tableResult.collect().next().getField(0)).toString(); + context.out.write(explanation + "\n"); + } finally { + if (lock.isHeldByCurrentThread()) { + lock.unlock(); + } + } + } + + private void callExplain(ExplainOperation explainOperation, InterpreterContext context) throws IOException { + try { + lock.lock(); + TableResult tableResult 
= ((TableEnvironmentInternal) tbenv).executeInternal(explainOperation); + String explanation = + Objects.requireNonNull(tableResult.collect().next().getField(0)).toString(); + context.out.write(explanation + "\n"); + } finally { + if (lock.isHeldByCurrentThread()) { + lock.unlock(); + } + } + } + + public void callSelect(String sql, QueryOperation queryOperation, InterpreterContext context) throws IOException { + try { + lock.lock(); + if (isBatch) { + callBatchInnerSelect(sql, context); + } else { + callStreamInnerSelect(sql, context); + } + } finally { + if (lock.isHeldByCurrentThread()) { + lock.unlock(); + } + } + } + + public void callBatchInnerSelect(String sql, InterpreterContext context) throws IOException { + Table table = this.tbenv.sqlQuery(sql); + String result = z.showData(table); + context.out.write(result); + } + + public void callStreamInnerSelect(String sql, InterpreterContext context) throws IOException { + flinkSqlContext.getStreamSqlSelectConsumer().accept(sql); + } + + public void callSet(SetOperation setOperation, InterpreterContext context) throws IOException { + if (setOperation.getKey().isPresent() && setOperation.getValue().isPresent()) { + // set a property + String key = setOperation.getKey().get().trim(); + String value = setOperation.getValue().get().trim(); + this.tbenv.getConfig().getConfiguration().setString(key, value); + LOGGER.info("Set table config: {}={}", key, value); + } else { + // show all properties + final Map<String, String> properties = this.tbenv.getConfig().getConfiguration().toMap(); + List<String> prettyEntries = new ArrayList<>(); + for (String key : properties.keySet()) { + prettyEntries.add( + String.format( + "'%s' = '%s'", + EncodingUtils.escapeSingleQuotes(key), + EncodingUtils.escapeSingleQuotes(properties.get(key)))); + } + prettyEntries.sort(String::compareTo); + prettyEntries.forEach(entry -> { + try { + context.out.write(entry + "\n"); + } catch (IOException e) { + LOGGER.warn("Fail to write output", e); + } + }); + } + } + + private void callBeginStatementSet(InterpreterContext context) throws IOException { + statementOperationsMap.put(context.getParagraphId(), new ArrayList<>()); + } + + private void callEndStatementSet(InterpreterContext context) throws IOException { + List<ModifyOperation> modifyOperations = statementOperationsMap.get(context.getParagraphId()); + if (modifyOperations != null && !modifyOperations.isEmpty()) { + callInserts(modifyOperations, context); + } else { + context.out.write(MESSAGE_NO_STATEMENT_IN_STATEMENT_SET); + } + } + + private void callUseCatalog(String catalog, InterpreterContext context) throws IOException { + tbenv.executeSql("USE CATALOG `" + catalog + "`"); + } + + private void callUseDatabase(String databaseName, + InterpreterContext context) throws IOException { + this.tbenv.executeSql("USE `" + databaseName + "`"); + } + + private void callShowCatalogs(InterpreterContext context) throws IOException { + TableResult tableResult = this.tbenv.executeSql("SHOW Catalogs"); + List<String> catalogs = CollectionUtil.iteratorToList(tableResult.collect()).stream() + .map(r -> checkNotNull(r.getField(0)).toString()) + .collect(Collectors.toList()); + context.out.write("%table catalog\n" + StringUtils.join(catalogs, "\n") + "\n"); + } + + private void callShowCurrentCatalog(InterpreterContext context) throws IOException { + TableResult tableResult = this.tbenv.executeSql("SHOW Current Catalog"); + String catalog = tableResult.collect().next().getField(0).toString(); + context.out.write("%text current 
catalog: " + catalog + "\n"); + } + + private void callShowDatabases(InterpreterContext context) throws IOException { + TableResult tableResult = this.tbenv.executeSql("SHOW Databases"); + List<String> databases = CollectionUtil.iteratorToList(tableResult.collect()).stream() + .map(r -> checkNotNull(r.getField(0)).toString()) + .collect(Collectors.toList()); + context.out.write( + "%table database\n" + StringUtils.join(databases, "\n") + "\n"); + } + + private void callShowCurrentDatabase(InterpreterContext context) throws IOException { + TableResult tableResult = this.tbenv.executeSql("SHOW Current Database"); + String database = tableResult.collect().next().getField(0).toString(); + context.out.write("%text current database: " + database + "\n"); + } + + private void callShowTables(InterpreterContext context) throws IOException { + TableResult tableResult = this.tbenv.executeSql("SHOW Tables"); + List<String> tables = CollectionUtil.iteratorToList(tableResult.collect()).stream() + .map(r -> checkNotNull(r.getField(0)).toString()) + .filter(tbl -> !tbl.startsWith("UnnamedTable")) + .collect(Collectors.toList()); + context.out.write( + "%table table\n" + StringUtils.join(tables, "\n") + "\n"); + } + + private void callShowFunctions(InterpreterContext context) throws IOException { + TableResult tableResult = this.tbenv.executeSql("SHOW Functions"); + List<String> functions = CollectionUtil.iteratorToList(tableResult.collect()).stream() + .map(r -> checkNotNull(r.getField(0)).toString()) + .collect(Collectors.toList()); + context.out.write( + "%table function\n" + StringUtils.join(functions, "\n") + "\n"); + } + + private void callShowModules(InterpreterContext context) throws IOException { + String[] modules = this.tbenv.listModules(); + context.out.write("%table modules\n" + StringUtils.join(modules, "\n") + "\n"); + } + + private void callShowPartitions(String sql, InterpreterContext context) throws IOException { + TableResult tableResult = this.tbenv.executeSql(sql); + List<String> partions = CollectionUtil.iteratorToList(tableResult.collect()).stream() + .map(r -> checkNotNull(r.getField(0)).toString()) + .collect(Collectors.toList()); + context.out.write( + "%table partitions\n" + StringUtils.join(partions, "\n") + "\n"); + } + + private void callDDL(String sql, InterpreterContext context, String message) throws IOException { + try { + lock.lock(); + this.tbenv.executeSql(sql); + } finally { + if (lock.isHeldByCurrentThread()) { + lock.unlock(); + } + } + context.out.write(message + "\n"); + } + + private void callDescribe(String name, InterpreterContext context) throws IOException { + TableResult tableResult = null; + try { + tableResult = tbenv.executeSql("DESCRIBE " + name); + } catch (Exception e) { + throw new IOException("Fail to describe table: " + name, e); + } + CloseableIterator<Row> result = tableResult.collect(); + StringBuilder builder = new StringBuilder(); + builder.append("Column\tType\n"); + while (result.hasNext()) { + Row row = result.next(); + builder.append(row.getField(0) + "\t" + row.getField(1) + "\n"); + } + context.out.write("%table\n" + builder.toString()); + } +} diff --git a/flink/flink1.16-shims/src/main/java/org/apache/zeppelin/flink/PrintUtils.java b/flink/flink1.16-shims/src/main/java/org/apache/zeppelin/flink/PrintUtils.java new file mode 100644 index 0000000000..a35ad3a6cd --- /dev/null +++ b/flink/flink1.16-shims/src/main/java/org/apache/zeppelin/flink/PrintUtils.java @@ -0,0 +1,318 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under 
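One detail worth calling out in Flink116SqlInterpreter above: SELECT/INSERT/DDL submission happens under a write lock, and the registered JobListener releases that lock once the job is submitted, with a defensive isHeldByCurrentThread() check in every finally block. A stripped-down, Flink-free analogue of that pattern (all names hypothetical, not Zeppelin API):

import java.util.concurrent.locks.ReentrantReadWriteLock;

public class SubmitLockSketch {

  private final ReentrantReadWriteLock.WriteLock lock = new ReentrantReadWriteLock().writeLock();

  void submit(Runnable job) {
    lock.lock();                               // held across submission, like callSelect/callInserts
    try {
      job.run();                               // stands in for executeInternal(...)
      onJobSubmitted();                        // in Flink this is the JobListener callback
    } finally {
      if (lock.isHeldByCurrentThread()) {      // mirrors the defensive unlock in the interpreter
        lock.unlock();
      }
    }
  }

  void onJobSubmitted() {
    if (lock.isHeldByCurrentThread()) {        // listener releases the lock if it is still held
      lock.unlock();
    }
  }
}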
one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.zeppelin.flink; + + +import org.apache.flink.table.catalog.ResolvedSchema; +import org.apache.flink.table.data.MapData; +import org.apache.flink.table.data.RowData; +import org.apache.flink.table.data.TimestampData; +import org.apache.flink.table.types.logical.*; +import org.apache.flink.types.Row; +import org.apache.flink.util.StringUtils; + +import java.sql.Time; +import java.sql.Timestamp; +import java.time.Instant; +import java.time.LocalDateTime; +import java.time.LocalTime; +import java.time.ZoneId; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +import static org.apache.flink.table.types.logical.utils.LogicalTypeChecks.getPrecision; +import static org.apache.zeppelin.flink.TimestampStringUtils.*; + +/** + * Copied from flink-project with minor modification. + * */ +public class PrintUtils { + + public static final String NULL_COLUMN = "(NULL)"; + private static final String COLUMN_TRUNCATED_FLAG = "..."; + + private PrintUtils() {} + + + public static String[] rowToString( + Row row, ResolvedSchema resolvedSchema, ZoneId sessionTimeZone) { + return rowToString(row, NULL_COLUMN, false, resolvedSchema, sessionTimeZone); + } + + public static String[] rowToString( + Row row, + String nullColumn, + boolean printRowKind, + ResolvedSchema resolvedSchema, + ZoneId sessionTimeZone) { + final int len = printRowKind ? row.getArity() + 1 : row.getArity(); + final List<String> fields = new ArrayList<>(len); + if (printRowKind) { + fields.add(row.getKind().shortString()); + } + for (int i = 0; i < row.getArity(); i++) { + final Object field = row.getField(i); + final LogicalType fieldType = + resolvedSchema.getColumnDataTypes().get(i).getLogicalType(); + if (field == null) { + fields.add(nullColumn); + } else { + fields.add( + StringUtils.arrayAwareToString( + formattedTimestamp(field, fieldType, sessionTimeZone))); + } + } + return fields.toArray(new String[0]); + } + + /** + * Normalizes field that contains TIMESTAMP, TIMESTAMP_LTZ and TIME type data. + * + * <p>This method also supports nested type ARRAY, ROW, MAP. 
+ */ + private static Object formattedTimestamp( + Object field, LogicalType fieldType, ZoneId sessionTimeZone) { + final LogicalTypeRoot typeRoot = fieldType.getTypeRoot(); + if (field == null) { + return "null"; + } + switch (typeRoot) { + case TIMESTAMP_WITHOUT_TIME_ZONE: + case TIMESTAMP_WITH_LOCAL_TIME_ZONE: + return formatTimestampField(field, fieldType, sessionTimeZone); + case TIME_WITHOUT_TIME_ZONE: + return formatTimeField(field); + case ARRAY: + LogicalType elementType = ((ArrayType) fieldType).getElementType(); + if (field instanceof List) { + List<?> array = (List<?>) field; + Object[] formattedArray = new Object[array.size()]; + for (int i = 0; i < array.size(); i++) { + formattedArray[i] = + formattedTimestamp(array.get(i), elementType, sessionTimeZone); + } + return formattedArray; + } else if (field.getClass().isArray()) { + // primitive type + if (field.getClass() == byte[].class) { + byte[] array = (byte[]) field; + Object[] formattedArray = new Object[array.length]; + for (int i = 0; i < array.length; i++) { + formattedArray[i] = + formattedTimestamp(array[i], elementType, sessionTimeZone); + } + return formattedArray; + } else if (field.getClass() == short[].class) { + short[] array = (short[]) field; + Object[] formattedArray = new Object[array.length]; + for (int i = 0; i < array.length; i++) { + formattedArray[i] = + formattedTimestamp(array[i], elementType, sessionTimeZone); + } + return formattedArray; + } else if (field.getClass() == int[].class) { + int[] array = (int[]) field; + Object[] formattedArray = new Object[array.length]; + for (int i = 0; i < array.length; i++) { + formattedArray[i] = + formattedTimestamp(array[i], elementType, sessionTimeZone); + } + return formattedArray; + } else if (field.getClass() == long[].class) { + long[] array = (long[]) field; + Object[] formattedArray = new Object[array.length]; + for (int i = 0; i < array.length; i++) { + formattedArray[i] = + formattedTimestamp(array[i], elementType, sessionTimeZone); + } + return formattedArray; + } else if (field.getClass() == float[].class) { + float[] array = (float[]) field; + Object[] formattedArray = new Object[array.length]; + for (int i = 0; i < array.length; i++) { + formattedArray[i] = + formattedTimestamp(array[i], elementType, sessionTimeZone); + } + return formattedArray; + } else if (field.getClass() == double[].class) { + double[] array = (double[]) field; + Object[] formattedArray = new Object[array.length]; + for (int i = 0; i < array.length; i++) { + formattedArray[i] = + formattedTimestamp(array[i], elementType, sessionTimeZone); + } + return formattedArray; + } else if (field.getClass() == boolean[].class) { + boolean[] array = (boolean[]) field; + Object[] formattedArray = new Object[array.length]; + for (int i = 0; i < array.length; i++) { + formattedArray[i] = + formattedTimestamp(array[i], elementType, sessionTimeZone); + } + return formattedArray; + } else if (field.getClass() == char[].class) { + char[] array = (char[]) field; + Object[] formattedArray = new Object[array.length]; + for (int i = 0; i < array.length; i++) { + formattedArray[i] = + formattedTimestamp(array[i], elementType, sessionTimeZone); + } + return formattedArray; + } else { + // non-primitive type + Object[] array = (Object[]) field; + Object[] formattedArray = new Object[array.length]; + for (int i = 0; i < array.length; i++) { + formattedArray[i] = + formattedTimestamp(array[i], elementType, sessionTimeZone); + } + return formattedArray; + } + } else { + return field; + } + case ROW: + if 
(fieldType instanceof RowType && field instanceof Row) { + Row row = (Row) field; + Row formattedRow = new Row(row.getKind(), row.getArity()); + for (int i = 0; i < ((RowType) fieldType).getFields().size(); i++) { + LogicalType type = ((RowType) fieldType).getFields().get(i).getType(); + formattedRow.setField( + i, formattedTimestamp(row.getField(i), type, sessionTimeZone)); + } + return formattedRow; + + } else if (fieldType instanceof RowType && field instanceof RowData) { + RowData rowData = (RowData) field; + Row formattedRow = new Row(rowData.getRowKind(), rowData.getArity()); + for (int i = 0; i < ((RowType) fieldType).getFields().size(); i++) { + LogicalType type = ((RowType) fieldType).getFields().get(i).getType(); + RowData.FieldGetter fieldGetter = RowData.createFieldGetter(type, i); + formattedRow.setField( + i, + formattedTimestamp( + fieldGetter.getFieldOrNull(rowData), + type, + sessionTimeZone)); + } + return formattedRow; + } else { + return field; + } + case MAP: + LogicalType keyType = ((MapType) fieldType).getKeyType(); + LogicalType valueType = ((MapType) fieldType).getValueType(); + if (fieldType instanceof MapType && field instanceof Map) { + Map<Object, Object> map = ((Map) field); + Map<Object, Object> formattedMap = new HashMap<>(map.size()); + for (Object key : map.keySet()) { + formattedMap.put( + formattedTimestamp(key, keyType, sessionTimeZone), + formattedTimestamp(map.get(key), valueType, sessionTimeZone)); + } + return formattedMap; + } else if (fieldType instanceof MapType && field instanceof MapData) { + MapData map = ((MapData) field); + Map<Object, Object> formattedMap = new HashMap<>(map.size()); + Object[] keyArray = + (Object[]) formattedTimestamp(map.keyArray(), keyType, sessionTimeZone); + Object[] valueArray = + (Object[]) + formattedTimestamp( + map.valueArray(), valueType, sessionTimeZone); + for (int i = 0; i < keyArray.length; i++) { + formattedMap.put(keyArray[i], valueArray[i]); + } + return formattedMap; + } else { + return field; + } + default: + return field; + } + } + + /** + * Formats the print content of TIMESTAMP and TIMESTAMP_LTZ type data, consider the user + * configured time zone. 
+ */ + private static Object formatTimestampField( + Object timestampField, LogicalType fieldType, ZoneId sessionTimeZone) { + switch (fieldType.getTypeRoot()) { + case TIMESTAMP_WITHOUT_TIME_ZONE: + final int precision = getPrecision(fieldType); + if (timestampField instanceof java.sql.Timestamp) { + // conversion between java.sql.Timestamp and TIMESTAMP_WITHOUT_TIME_ZONE + return timestampToString( + ((Timestamp) timestampField).toLocalDateTime(), precision); + } else if (timestampField instanceof java.time.LocalDateTime) { + return timestampToString(((LocalDateTime) timestampField), precision); + } else if (timestampField instanceof TimestampData) { + return timestampToString( + ((TimestampData) timestampField).toLocalDateTime(), precision); + } else { + return timestampField; + } + case TIMESTAMP_WITH_LOCAL_TIME_ZONE: + Instant instant = null; + if (timestampField instanceof java.time.Instant) { + instant = ((Instant) timestampField); + } else if (timestampField instanceof java.sql.Timestamp) { + Timestamp timestamp = ((Timestamp) timestampField); + // conversion between java.sql.Timestamp and TIMESTAMP_WITH_LOCAL_TIME_ZONE + instant = + TimestampData.fromEpochMillis( + timestamp.getTime(), timestamp.getNanos() % 1000_000) + .toInstant(); + } else if (timestampField instanceof TimestampData) { + instant = ((TimestampData) timestampField).toInstant(); + } else if (timestampField instanceof Integer) { + instant = Instant.ofEpochSecond((Integer) timestampField); + } else if (timestampField instanceof Long) { + instant = Instant.ofEpochMilli((Long) timestampField); + } + if (instant != null) { + return timestampToString( + instant.atZone(sessionTimeZone).toLocalDateTime(), + getPrecision(fieldType)); + } else { + return timestampField; + } + default: + return timestampField; + } + } + + /** Formats the print content of TIME type data. */ + private static Object formatTimeField(Object timeField) { + if (timeField.getClass().isAssignableFrom(int.class) || timeField instanceof Integer) { + return unixTimeToString((int) timeField); + } else if (timeField.getClass().isAssignableFrom(long.class) || timeField instanceof Long) { + return unixTimeToString(((Long) timeField).intValue()); + } else if (timeField instanceof Time) { + return unixTimeToString(timeToInternal((Time) timeField)); + } else if (timeField instanceof LocalTime) { + return unixTimeToString(localTimeToUnixDate((LocalTime) timeField)); + } else { + return timeField; + } + } +} diff --git a/flink/flink1.16-shims/src/main/java/org/apache/zeppelin/flink/TimestampStringUtils.java b/flink/flink1.16-shims/src/main/java/org/apache/zeppelin/flink/TimestampStringUtils.java new file mode 100644 index 0000000000..c52104e45a --- /dev/null +++ b/flink/flink1.16-shims/src/main/java/org/apache/zeppelin/flink/TimestampStringUtils.java @@ -0,0 +1,143 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
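The formattedTimestamp/formatTimestampField logic above ultimately renders a TIMESTAMP_LTZ value in the configured session time zone (normally table.local-time-zone) rather than the JVM default. A JDK-only sketch of that conversion, with an example instant and zone:

import java.time.Instant;
import java.time.LocalDateTime;
import java.time.ZoneId;

public class SessionZoneRenderingSketch {
  public static void main(String[] args) {
    Instant stored = Instant.parse("2023-01-31T12:00:00Z");  // value as Flink keeps it internally
    ZoneId sessionZone = ZoneId.of("Asia/Shanghai");         // example session time zone
    LocalDateTime rendered = stored.atZone(sessionZone).toLocalDateTime();
    System.out.println(rendered);                            // prints 2023-01-31T20:00
  }
}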
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.zeppelin.flink; + +import java.sql.Time; +import java.time.LocalDateTime; +import java.time.LocalTime; +import java.util.TimeZone; + +/** + * Copied from flink-project with minor modification. + * */ +public class TimestampStringUtils { + + private static final TimeZone LOCAL_TZ = TimeZone.getDefault(); + + public TimestampStringUtils() { + } + + public static String timestampToString(LocalDateTime ldt, int precision) { + String fraction; + for(fraction = pad(9, (long)ldt.getNano()); fraction.length() > precision && fraction.endsWith("0"); fraction = fraction.substring(0, fraction.length() - 1)) { + } + + StringBuilder ymdhms = ymdhms(new StringBuilder(), ldt.getYear(), ldt.getMonthValue(), ldt.getDayOfMonth(), ldt.getHour(), ldt.getMinute(), ldt.getSecond()); + if (fraction.length() > 0) { + ymdhms.append(".").append(fraction); + } + + return ymdhms.toString(); + } + + private static String pad(int length, long v) { + StringBuilder s = new StringBuilder(Long.toString(v)); + + while(s.length() < length) { + s.insert(0, "0"); + } + + return s.toString(); + } + + private static StringBuilder hms(StringBuilder b, int h, int m, int s) { + int2(b, h); + b.append(':'); + int2(b, m); + b.append(':'); + int2(b, s); + return b; + } + + private static StringBuilder ymdhms(StringBuilder b, int year, int month, int day, int h, int m, int s) { + ymd(b, year, month, day); + b.append(' '); + hms(b, h, m, s); + return b; + } + + private static StringBuilder ymd(StringBuilder b, int year, int month, int day) { + int4(b, year); + b.append('-'); + int2(b, month); + b.append('-'); + int2(b, day); + return b; + } + + private static void int4(StringBuilder buf, int i) { + buf.append((char)(48 + i / 1000 % 10)); + buf.append((char)(48 + i / 100 % 10)); + buf.append((char)(48 + i / 10 % 10)); + buf.append((char)(48 + i % 10)); + } + + private static void int2(StringBuilder buf, int i) { + buf.append((char)(48 + i / 10 % 10)); + buf.append((char)(48 + i % 10)); + } + + public static String unixTimeToString(int time) { + StringBuilder buf = new StringBuilder(8); + unixTimeToString(buf, time, 0); + return buf.toString(); + } + + private static void unixTimeToString(StringBuilder buf, int time, int precision) { + while(time < 0) { + time = (int)((long)time + 86400000L); + } + + int h = time / 3600000; + int time2 = time % 3600000; + int m = time2 / '\uea60'; + int time3 = time2 % '\uea60'; + int s = time3 / 1000; + int ms = time3 % 1000; + int2(buf, h); + buf.append(':'); + int2(buf, m); + buf.append(':'); + int2(buf, s); + if (precision > 0) { + buf.append('.'); + + while(precision > 0) { + buf.append((char)(48 + ms / 100)); + ms %= 100; + ms *= 10; + if (ms == 0) { + break; + } + + --precision; + } + } + + } + + public static int timeToInternal(Time time) { + long ts = time.getTime() + (long)LOCAL_TZ.getOffset(time.getTime()); + return (int)(ts % 86400000L); + } + + public static int localTimeToUnixDate(LocalTime time) { + return time.getHour() * 3600000 + time.getMinute() * '\uea60' + time.getSecond() * 1000 + time.getNano() / 1000000; + } +} diff --git a/flink/flink1.16-shims/src/main/java/org/apache/zeppelin/flink/shims116/CollectStreamTableSink.java b/flink/flink1.16-shims/src/main/java/org/apache/zeppelin/flink/shims116/CollectStreamTableSink.java new file mode 100644 index 0000000000..cf7968e7e6 --- /dev/null +++ 
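TimestampStringUtils above keeps the decompiled look of the Flink original; note that the '\uea60' char literal is simply 60000, i.e. milliseconds per minute. A tiny sanity check of the millis-of-day arithmetic behind unixTimeToString:

public class UnixTimeSketch {
  public static void main(String[] args) {
    int time = 45_296_789;                      // 12:34:56.789 expressed as millis since midnight
    int h = time / 3_600_000;                   // 12
    int m = time % 3_600_000 / 60_000;          // 34
    int s = time % 60_000 / 1_000;              // 56
    System.out.printf("%02d:%02d:%02d%n", h, m, s);  // prints 12:34:56
  }
}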
b/flink/flink1.16-shims/src/main/java/org/apache/zeppelin/flink/shims116/CollectStreamTableSink.java @@ -0,0 +1,97 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.zeppelin.flink.shims116; + +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.api.common.typeinfo.Types; +import org.apache.flink.api.common.typeutils.TypeSerializer; +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.api.java.typeutils.TupleTypeInfo; +import org.apache.flink.streaming.api.datastream.DataStream; +import org.apache.flink.streaming.api.datastream.DataStreamSink; +import org.apache.flink.streaming.experimental.CollectSink; +import org.apache.flink.table.sinks.RetractStreamTableSink; +import org.apache.flink.types.Row; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.net.InetAddress; +import java.util.UUID; + +/** + * Table sink for collecting the results locally using sockets. + */ +public class CollectStreamTableSink implements RetractStreamTableSink<Row> { + + private static final Logger LOGGER = LoggerFactory.getLogger(CollectStreamTableSink.class); + + private final InetAddress targetAddress; + private final int targetPort; + private final TypeSerializer<Tuple2<Boolean, Row>> serializer; + + private String[] fieldNames; + private TypeInformation<?>[] fieldTypes; + + public CollectStreamTableSink(InetAddress targetAddress, + int targetPort, + TypeSerializer<Tuple2<Boolean, Row>> serializer) { + LOGGER.info("Use address: " + targetAddress.getHostAddress() + ":" + targetPort); + this.targetAddress = targetAddress; + this.targetPort = targetPort; + this.serializer = serializer; + } + + @Override + public String[] getFieldNames() { + return fieldNames; + } + + @Override + public TypeInformation<?>[] getFieldTypes() { + return fieldTypes; + } + + @Override + public CollectStreamTableSink configure(String[] fieldNames, TypeInformation<?>[] fieldTypes) { + final CollectStreamTableSink copy = + new CollectStreamTableSink(targetAddress, targetPort, serializer); + copy.fieldNames = fieldNames; + copy.fieldTypes = fieldTypes; + return copy; + } + + @Override + public TypeInformation<Row> getRecordType() { + return Types.ROW_NAMED(fieldNames, fieldTypes); + } + + @Override + public DataStreamSink<?> consumeDataStream(DataStream<Tuple2<Boolean, Row>> stream) { + // add sink + return stream + .addSink(new CollectSink<>(targetAddress, targetPort, serializer)) + .name("Zeppelin Flink Sql Stream Collect Sink " + UUID.randomUUID()) + .setParallelism(1); + } + + @Override + public TupleTypeInfo<Tuple2<Boolean, Row>> getOutputType() { + return new TupleTypeInfo<>(Types.BOOLEAN, getRecordType()); + } +} diff --git a/flink/pom.xml b/flink/pom.xml index d432293e2f..5ac374ce37 
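CollectStreamTableSink above consumes a retract stream, i.e. Tuple2<Boolean, Row> records where the flag distinguishes insertions from retractions. A small sketch of that encoding (table contents are made up):

import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.types.Row;

public class RetractRecordSketch {
  public static void main(String[] args) {
    Tuple2<Boolean, Row> insert  = Tuple2.of(true,  Row.of("pageA", 3L));  // add count 3
    Tuple2<Boolean, Row> retract = Tuple2.of(false, Row.of("pageA", 3L));  // retract old count
    Tuple2<Boolean, Row> update  = Tuple2.of(true,  Row.of("pageA", 4L));  // add new count 4
    System.out.println(insert + " " + retract + " " + update);
  }
}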
100644 --- a/flink/pom.xml +++ b/flink/pom.xml @@ -40,6 +40,7 @@ <module>flink1.13-shims</module> <module>flink1.14-shims</module> <module>flink1.15-shims</module> + <module>flink1.16-shims</module> </modules> <properties> @@ -47,12 +48,21 @@ <flink1.13.version>1.13.2</flink1.13.version> <flink1.14.version>1.14.0</flink1.14.version> <flink1.15.version>1.15.1</flink1.15.version> + <flink1.16.version>1.16.0</flink1.16.version> <flink.scala.version>2.11.12</flink.scala.version> <flink.scala.binary.version>2.11</flink.scala.binary.version> </properties> <profiles> + <profile> + <id>flink-116</id> + <!-- Flink 1.16 only support scala 2.12--> + <modules> + <module>flink-scala-2.12</module> + </modules> + </profile> + <profile> <id>flink-115</id> <!-- Flink 1.15 only support scala 2.12--> diff --git a/testing/env_python_3_with_flink_116.yml b/testing/env_python_3_with_flink_116.yml new file mode 100644 index 0000000000..9b35aba642 --- /dev/null +++ b/testing/env_python_3_with_flink_116.yml @@ -0,0 +1,29 @@ +name: python_3_with_flink +channels: + - conda-forge + - defaults +dependencies: + - pycodestyle + - scipy + - numpy=1.19.5 + - grpcio + - protobuf + - pandasql + - ipython + - ipython_genutils + - ipykernel + - jupyter_client=5 + - hvplot + - plotnine + - seaborn + - intake + - intake-parquet + - intake-xarray + - altair + - vega_datasets + - plotly + - jinja2=3.0.3 + - pip + - pip: + - apache-flink==1.16.0 +