This is an automated email from the ASF dual-hosted git repository.

guanhuali pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/zeppelin.git
The following commit(s) were added to refs/heads/master by this push: new efadcd6aea [ZEPPELIN-5972] Support Flink 1.17 (#4677) efadcd6aea is described below commit efadcd6aea07ea5e0c7751770dc2e7a41558fcf2 Author: Cheng Pan <cheng...@apache.org> AuthorDate: Sun Oct 22 21:50:04 2023 +0800 [ZEPPELIN-5972] Support Flink 1.17 (#4677) * Support Flink 1.17 * Bump Flink 1.17.1 * Fix package name * fix java 11 compile * empty --------- Co-authored-by: Jeff Zhang <zjf...@apache.org> --- .github/workflows/core.yml | 2 +- flink/flink-scala-parent/pom.xml | 42 ++ .../zeppelin/flink/FlinkSqlInterpreterTest.java | 7 +- .../java/org/apache/zeppelin/flink/FlinkShims.java | 3 + .../org/apache/zeppelin/flink/Flink116Shims.java | 2 - flink/flink1.17-shims/pom.xml | 207 ++++++++ .../org/apache/zeppelin/flink/Flink117Shims.java} | 24 +- .../zeppelin/flink/Flink117SqlInterpreter.java | 590 +++++++++++++++++++++ .../java/org/apache/zeppelin/flink/PrintUtils.java | 318 +++++++++++ .../zeppelin/flink/TimestampStringUtils.java | 143 +++++ .../flink/shims117/CollectStreamTableSink.java | 97 ++++ flink/pom.xml | 10 + testing/env_python_3_with_flink_117.yml | 29 + 13 files changed, 1456 insertions(+), 18 deletions(-) diff --git a/.github/workflows/core.yml b/.github/workflows/core.yml index 5c8a8d7720..cc639869a9 100644 --- a/.github/workflows/core.yml +++ b/.github/workflows/core.yml @@ -243,7 +243,7 @@ jobs: strategy: fail-fast: false matrix: - flink: [113, 114, 115, 116] + flink: [113, 114, 115, 116, 117] steps: - name: Checkout uses: actions/checkout@v3 diff --git a/flink/flink-scala-parent/pom.xml b/flink/flink-scala-parent/pom.xml index 135fdd8617..79b839fefb 100644 --- a/flink/flink-scala-parent/pom.xml +++ b/flink/flink-scala-parent/pom.xml @@ -79,6 +79,12 @@ <version>${project.version}</version> </dependency> + <dependency> + <groupId>org.apache.zeppelin</groupId> + <artifactId>flink1.17-shims</artifactId> + <version>${project.version}</version> + </dependency> + <dependency> <groupId>org.apache.zeppelin</groupId> <artifactId>zeppelin-python</artifactId> @@ -1007,6 +1013,42 @@ </dependencies> </profile> + <profile> + <id>flink-117</id> + <properties> + <flink.version>${flink1.17.version}</flink.version> + <flink.scala.version>2.12.7</flink.scala.version> + <flink.scala.binary.version>2.12</flink.scala.binary.version> + <flink.library.scala.suffix></flink.library.scala.suffix> + </properties> + <dependencies> + <dependency> + <groupId>org.apache.flink</groupId> + <artifactId>flink-runtime</artifactId> + <version>${flink.version}</version> + <scope>provided</scope> + </dependency> + <dependency> + <groupId>org.apache.flink</groupId> + <artifactId>flink-table-planner_${flink.scala.binary.version}</artifactId> + <version>${flink.version}</version> + <scope>provided</scope> + </dependency> + <dependency> + <groupId>org.apache.flink</groupId> + <artifactId>flink-sql-client</artifactId> + <version>${flink.version}</version> + <scope>provided</scope> + </dependency> + <dependency> + <groupId>org.apache.flink</groupId> + <artifactId>flink-python</artifactId> + <version>${flink.version}</version> + <scope>provided</scope> + </dependency> + </dependencies> + </profile> + <profile> <id>hive2</id> <activation> diff --git a/flink/flink-scala-parent/src/test/java/org/apache/zeppelin/flink/FlinkSqlInterpreterTest.java b/flink/flink-scala-parent/src/test/java/org/apache/zeppelin/flink/FlinkSqlInterpreterTest.java index 2673ff0f90..e51042dc14 100644 --- 
a/flink/flink-scala-parent/src/test/java/org/apache/zeppelin/flink/FlinkSqlInterpreterTest.java +++ b/flink/flink-scala-parent/src/test/java/org/apache/zeppelin/flink/FlinkSqlInterpreterTest.java @@ -511,6 +511,11 @@ public abstract class FlinkSqlInterpreterTest { resultMessages = context.out.toInterpreterResultMessage(); assertEquals("current catalog: test_catalog\n", resultMessages.get(0).getData()); + // USE DEFAULT_CATALOG + context = getInterpreterContext(); + result = sqlInterpreter.interpret("USE CATALOG default_catalog", context); + assertEquals(context.out.toString(), InterpreterResult.Code.SUCCESS, result.code()); + // DROP CATALOG context = getInterpreterContext(); result = sqlInterpreter.interpret( @@ -526,8 +531,6 @@ public abstract class FlinkSqlInterpreterTest { assertEquals(context.out.toString(), InterpreterResult.Code.SUCCESS, result.code()); resultMessages = context.out.toInterpreterResultMessage(); assertTrue(context.out.toString(), resultMessages.get(0).getData().contains("default_catalog")); - assertFalse(context.out.toString(), resultMessages.get(0).getData().contains("test_catalog")); - } @Test diff --git a/flink/flink-shims/src/main/java/org/apache/zeppelin/flink/FlinkShims.java b/flink/flink-shims/src/main/java/org/apache/zeppelin/flink/FlinkShims.java index 916c3313dc..11de5bd3b7 100644 --- a/flink/flink-shims/src/main/java/org/apache/zeppelin/flink/FlinkShims.java +++ b/flink/flink-shims/src/main/java/org/apache/zeppelin/flink/FlinkShims.java @@ -66,6 +66,9 @@ public abstract class FlinkShims { } else if (flinkVersion.getMajorVersion() == 1 && flinkVersion.getMinorVersion() == 16) { LOGGER.info("Initializing shims for Flink 1.16"); flinkShimsClass = Class.forName("org.apache.zeppelin.flink.Flink116Shims"); + } else if (flinkVersion.getMajorVersion() == 1 && flinkVersion.getMinorVersion() == 17) { + LOGGER.info("Initializing shims for Flink 1.17"); + flinkShimsClass = Class.forName("org.apache.zeppelin.flink.Flink117Shims"); } else { throw new Exception("Flink version: '" + flinkVersion + "' is not supported yet"); } diff --git a/flink/flink1.16-shims/src/main/java/org/apache/zeppelin/flink/Flink116Shims.java b/flink/flink1.16-shims/src/main/java/org/apache/zeppelin/flink/Flink116Shims.java index b96e5f5e42..3578ffc8bb 100644 --- a/flink/flink1.16-shims/src/main/java/org/apache/zeppelin/flink/Flink116Shims.java +++ b/flink/flink1.16-shims/src/main/java/org/apache/zeppelin/flink/Flink116Shims.java @@ -23,7 +23,6 @@ import org.apache.commons.compress.utils.Lists; import org.apache.commons.lang3.StringUtils; import org.apache.commons.lang3.tuple.ImmutablePair; import org.apache.flink.api.common.RuntimeExecutionMode; -import org.apache.flink.api.common.typeinfo.TypeInformation; import org.apache.flink.api.common.typeutils.TypeSerializer; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.client.cli.CliFrontend; @@ -53,7 +52,6 @@ import org.apache.flink.table.functions.ScalarFunction; import org.apache.flink.table.functions.TableAggregateFunction; import org.apache.flink.table.functions.TableFunction; import org.apache.flink.table.module.ModuleManager; -import org.apache.flink.table.planner.calcite.FlinkTypeFactory; import org.apache.flink.table.resource.ResourceManager; import org.apache.flink.table.sinks.TableSink; import org.apache.flink.table.typeutils.TimeIndicatorTypeInfo; diff --git a/flink/flink1.17-shims/pom.xml b/flink/flink1.17-shims/pom.xml new file mode 100644 index 0000000000..997020abfc --- /dev/null +++ 
b/flink/flink1.17-shims/pom.xml @@ -0,0 +1,207 @@ +<?xml version="1.0" encoding="UTF-8"?> +<!-- + ~ Licensed to the Apache Software Foundation (ASF) under one or more + ~ contributor license agreements. See the NOTICE file distributed with + ~ this work for additional information regarding copyright ownership. + ~ The ASF licenses this file to You under the Apache License, Version 2.0 + ~ (the "License"); you may not use this file except in compliance with + ~ the License. You may obtain a copy of the License at + ~ + ~ http://www.apache.org/licenses/LICENSE-2.0 + ~ + ~ Unless required by applicable law or agreed to in writing, software + ~ distributed under the License is distributed on an "AS IS" BASIS, + ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + ~ See the License for the specific language governing permissions and + ~ limitations under the License. + --> + +<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" + xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd"> + <parent> + <artifactId>flink-parent</artifactId> + <groupId>org.apache.zeppelin</groupId> + <version>0.11.0-SNAPSHOT</version> + <relativePath>../pom.xml</relativePath> + </parent> + + <modelVersion>4.0.0</modelVersion> + <groupId>org.apache.zeppelin</groupId> + <artifactId>flink1.17-shims</artifactId> + <version>0.11.0-SNAPSHOT</version> + <packaging>jar</packaging> + <name>Zeppelin: Flink1.17 Shims</name> + + <properties> + <flink.version>${flink1.17.version}</flink.version> + <flink.scala.binary.version>2.12</flink.scala.binary.version> + </properties> + + <dependencies> + + <dependency> + <groupId>org.apache.zeppelin</groupId> + <artifactId>flink-shims</artifactId> + <version>${project.version}</version> + </dependency> + + <dependency> + <groupId>org.apache.flink</groupId> + <artifactId>flink-core</artifactId> + <version>${flink.version}</version> + <scope>provided</scope> + </dependency> + + <dependency> + <groupId>org.apache.flink</groupId> + <artifactId>flink-clients</artifactId> + <version>${flink.version}</version> + <scope>provided</scope> + </dependency> + + <dependency> + <groupId>org.apache.flink</groupId> + <artifactId>flink-runtime</artifactId> + <version>${flink.version}</version> + <scope>provided</scope> + </dependency> + + <dependency> + <groupId>org.apache.flink</groupId> + <artifactId>flink-table-api-scala_${flink.scala.binary.version}</artifactId> + <version>${flink.version}</version> + <scope>provided</scope> + </dependency> + + <dependency> + <groupId>org.apache.flink</groupId> + <artifactId>flink-table-api-scala-bridge_${flink.scala.binary.version}</artifactId> + <version>${flink.version}</version> + <scope>provided</scope> + </dependency> + + <dependency> + <groupId>org.apache.flink</groupId> + <artifactId>flink-table-api-java-bridge</artifactId> + <version>${flink.version}</version> + <scope>provided</scope> + </dependency> + + <dependency> + <groupId>org.apache.flink</groupId> + <artifactId>flink-scala_${flink.scala.binary.version}</artifactId> + <version>${flink.version}</version> + <scope>provided</scope> + </dependency> + + <dependency> + <groupId>org.apache.flink</groupId> + <artifactId>flink-streaming-java</artifactId> + <version>${flink.version}</version> + <scope>provided</scope> + </dependency> + + <dependency> + <groupId>org.apache.flink</groupId> + <artifactId>flink-streaming-scala_${flink.scala.binary.version}</artifactId> + 
<version>${flink.version}</version> + <scope>provided</scope> + </dependency> + + <dependency> + <groupId>org.apache.flink</groupId> + <artifactId>flink-java</artifactId> + <version>${flink.version}</version> + <scope>provided</scope> + </dependency> + + <dependency> + <groupId>org.apache.flink</groupId> + <artifactId>flink-table-planner_${flink.scala.binary.version}</artifactId> + <version>${flink.version}</version> + <scope>provided</scope> + </dependency> + + <dependency> + <groupId>org.apache.flink</groupId> + <artifactId>flink-python</artifactId> + <version>${flink.version}</version> + <scope>provided</scope> + </dependency> + + <dependency> + <groupId>org.apache.flink</groupId> + <artifactId>flink-sql-client</artifactId> + <version>${flink.version}</version> + <scope>provided</scope> + </dependency> + + </dependencies> + + <build> + <plugins> + + <plugin> + <groupId>net.alchim31.maven</groupId> + <artifactId>scala-maven-plugin</artifactId> + <executions> + <execution> + <id>eclipse-add-source</id> + <goals> + <goal>add-source</goal> + </goals> + </execution> + <execution> + <id>scala-compile-first</id> + <phase>process-resources</phase> + <goals> + <goal>compile</goal> + </goals> + </execution> + <execution> + <id>scala-test-compile-first</id> + <phase>process-test-resources</phase> + <goals> + <goal>testCompile</goal> + </goals> + </execution> + </executions> + <configuration> + <scalaVersion>${flink.scala.version}</scalaVersion> + <args> + <arg>-unchecked</arg> + <arg>-deprecation</arg> + <arg>-feature</arg> + <arg>-nobootcp</arg> + </args> + <jvmArgs> + <jvmArg>-Xms1024m</jvmArg> + <jvmArg>-Xmx1024m</jvmArg> + <jvmArg>-XX:MaxMetaspaceSize=${MaxMetaspace}</jvmArg> + </jvmArgs> + <javacArgs> + <javacArg>-source</javacArg> + <javacArg>${java.version}</javacArg> + <javacArg>-target</javacArg> + <javacArg>${java.version}</javacArg> + <javacArg>-Xlint:all,-serial,-path,-options</javacArg> + </javacArgs> + </configuration> + </plugin> + + <plugin> + <artifactId>maven-resources-plugin</artifactId> + <executions> + <execution> + <id>copy-interpreter-setting</id> + <phase>none</phase> + <configuration> + <skip>true</skip> + </configuration> + </execution> + </executions> + </plugin> + </plugins> + </build> + +</project> diff --git a/flink/flink1.16-shims/src/main/java/org/apache/zeppelin/flink/Flink116Shims.java b/flink/flink1.17-shims/src/main/java/org/apache/zeppelin/flink/Flink117Shims.java similarity index 96% copy from flink/flink1.16-shims/src/main/java/org/apache/zeppelin/flink/Flink116Shims.java copy to flink/flink1.17-shims/src/main/java/org/apache/zeppelin/flink/Flink117Shims.java index b96e5f5e42..9bc22cc57d 100644 --- a/flink/flink1.16-shims/src/main/java/org/apache/zeppelin/flink/Flink116Shims.java +++ b/flink/flink1.17-shims/src/main/java/org/apache/zeppelin/flink/Flink117Shims.java @@ -23,7 +23,6 @@ import org.apache.commons.compress.utils.Lists; import org.apache.commons.lang3.StringUtils; import org.apache.commons.lang3.tuple.ImmutablePair; import org.apache.flink.api.common.RuntimeExecutionMode; -import org.apache.flink.api.common.typeinfo.TypeInformation; import org.apache.flink.api.common.typeutils.TypeSerializer; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.client.cli.CliFrontend; @@ -53,14 +52,13 @@ import org.apache.flink.table.functions.ScalarFunction; import org.apache.flink.table.functions.TableAggregateFunction; import org.apache.flink.table.functions.TableFunction; import org.apache.flink.table.module.ModuleManager; -import 
org.apache.flink.table.planner.calcite.FlinkTypeFactory; import org.apache.flink.table.resource.ResourceManager; import org.apache.flink.table.sinks.TableSink; import org.apache.flink.table.typeutils.TimeIndicatorTypeInfo; import org.apache.flink.types.Row; import org.apache.flink.types.RowKind; import org.apache.flink.util.FlinkException; -import org.apache.zeppelin.flink.shims116.CollectStreamTableSink; +import org.apache.zeppelin.flink.shims117.CollectStreamTableSink; import org.apache.zeppelin.interpreter.InterpreterContext; import org.apache.zeppelin.interpreter.InterpreterResult; import org.slf4j.Logger; @@ -78,25 +76,25 @@ import java.util.Properties; /** - * Shims for flink 1.16 + * Shims for flink 1.17 */ -public class Flink116Shims extends FlinkShims { +public class Flink117Shims extends FlinkShims { - private static final Logger LOGGER = LoggerFactory.getLogger(Flink116Shims.class); + private static final Logger LOGGER = LoggerFactory.getLogger(Flink117Shims.class); - private Flink116SqlInterpreter batchSqlInterpreter; - private Flink116SqlInterpreter streamSqlInterpreter; + private Flink117SqlInterpreter batchSqlInterpreter; + private Flink117SqlInterpreter streamSqlInterpreter; - public Flink116Shims(FlinkVersion flinkVersion, Properties properties) { + public Flink117Shims(FlinkVersion flinkVersion, Properties properties) { super(flinkVersion, properties); } public void initInnerBatchSqlInterpreter(FlinkSqlContext flinkSqlContext) { - this.batchSqlInterpreter = new Flink116SqlInterpreter(flinkSqlContext, true); + this.batchSqlInterpreter = new Flink117SqlInterpreter(flinkSqlContext, true); } public void initInnerStreamSqlInterpreter(FlinkSqlContext flinkSqlContext) { - this.streamSqlInterpreter = new Flink116SqlInterpreter(flinkSqlContext, false); + this.streamSqlInterpreter = new Flink117SqlInterpreter(flinkSqlContext, false); } @Override @@ -252,12 +250,12 @@ public class Flink116Shims extends FlinkShims { @Override public Object fromDataSet(Object btenv, Object ds) { - throw new RuntimeException("Conversion from DataSet is not supported in Flink 1.15"); + throw new RuntimeException("Conversion from DataSet is not supported in Flink 1.17"); } @Override public Object toDataSet(Object btenv, Object table) { - throw new RuntimeException("Conversion to DataSet is not supported in Flink 1.15"); + throw new RuntimeException("Conversion to DataSet is not supported in Flink 1.17"); } @Override diff --git a/flink/flink1.17-shims/src/main/java/org/apache/zeppelin/flink/Flink117SqlInterpreter.java b/flink/flink1.17-shims/src/main/java/org/apache/zeppelin/flink/Flink117SqlInterpreter.java new file mode 100644 index 0000000000..b53d02c8e6 --- /dev/null +++ b/flink/flink1.17-shims/src/main/java/org/apache/zeppelin/flink/Flink117SqlInterpreter.java @@ -0,0 +1,590 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.zeppelin.flink; + +import org.apache.commons.lang3.StringUtils; +import org.apache.commons.lang3.exception.ExceptionUtils; +import org.apache.flink.api.common.JobExecutionResult; +import org.apache.flink.api.common.JobStatus; +import org.apache.flink.api.java.ExecutionEnvironment; +import org.apache.flink.configuration.PipelineOptions; +import org.apache.flink.core.execution.JobClient; +import org.apache.flink.core.execution.JobListener; +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; +import org.apache.flink.table.api.SqlParserException; +import org.apache.flink.table.api.Table; +import org.apache.flink.table.api.TableEnvironment; +import org.apache.flink.table.api.TableResult; +import org.apache.flink.table.api.internal.TableEnvironmentInternal; +import org.apache.flink.table.delegation.Parser; +import org.apache.flink.table.operations.*; +import org.apache.flink.table.operations.command.HelpOperation; +import org.apache.flink.table.operations.command.SetOperation; +import org.apache.flink.table.operations.ddl.*; +import org.apache.flink.table.utils.EncodingUtils; +import org.apache.flink.types.Row; +import org.apache.flink.util.CloseableIterator; +import org.apache.flink.util.CollectionUtil; +import org.apache.flink.util.Preconditions; +import org.apache.zeppelin.interpreter.InterpreterContext; +import org.apache.zeppelin.interpreter.InterpreterResult; +import org.apache.zeppelin.interpreter.ZeppelinContext; +import org.apache.zeppelin.interpreter.util.SqlSplitter; +import org.jline.utils.AttributedString; +import org.jline.utils.AttributedStringBuilder; +import org.jline.utils.AttributedStyle; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import javax.annotation.Nullable; +import java.io.IOException; +import java.util.*; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.locks.ReentrantReadWriteLock; +import java.util.stream.Collectors; + +import static org.apache.flink.util.Preconditions.checkNotNull; +import static org.apache.flink.util.Preconditions.checkState; + + +public class Flink117SqlInterpreter { + + private static final Logger LOGGER = LoggerFactory.getLogger(Flink117SqlInterpreter.class); + private static final String CMD_DESC_DELIMITER = "\t\t"; + + /** + * SQL Client HELP command helper class. 
+ */ + private static final class SQLCliCommandsDescriptions { + private int commandMaxLength; + private final Map<String, String> commandsDescriptions; + + public SQLCliCommandsDescriptions() { + this.commandsDescriptions = new LinkedHashMap<>(); + this.commandMaxLength = -1; + } + + public SQLCliCommandsDescriptions commandDescription(String command, String description) { + Preconditions.checkState( + StringUtils.isNotBlank(command), "content of command must not be empty."); + Preconditions.checkState( + StringUtils.isNotBlank(description), + "content of command's description must not be empty."); + this.updateMaxCommandLength(command.length()); + this.commandsDescriptions.put(command, description); + return this; + } + + private void updateMaxCommandLength(int newLength) { + Preconditions.checkState(newLength > 0); + if (this.commandMaxLength < newLength) { + this.commandMaxLength = newLength; + } + } + + public AttributedString build() { + AttributedStringBuilder attributedStringBuilder = new AttributedStringBuilder(); + if (!this.commandsDescriptions.isEmpty()) { + this.commandsDescriptions.forEach( + (cmd, cmdDesc) -> { + attributedStringBuilder + .style(AttributedStyle.DEFAULT.bold()) + .append( + String.format( + String.format("%%-%ds", commandMaxLength), cmd)) + .append(CMD_DESC_DELIMITER) + .style(AttributedStyle.DEFAULT) + .append(cmdDesc) + .append('\n'); + }); + } + return attributedStringBuilder.toAttributedString(); + } + } + + private static final AttributedString SQL_CLI_COMMANDS_DESCRIPTIONS = + new SQLCliCommandsDescriptions() + .commandDescription("HELP", "Prints the available commands.") + .commandDescription( + "SET", + "Sets a session configuration property. Syntax: \"SET '<key>'='<value>';\". Use \"SET;\" for listing all properties.") + .commandDescription( + "RESET", + "Resets a session configuration property. Syntax: \"RESET '<key>';\". Use \"RESET;\" for reset all session properties.") + .commandDescription( + "INSERT INTO", + "Inserts the results of a SQL SELECT query into a declared table sink.") + .commandDescription( + "INSERT OVERWRITE", + "Inserts the results of a SQL SELECT query into a declared table sink and overwrite existing data.") + .commandDescription( + "SELECT", "Executes a SQL SELECT query on the Flink cluster.") + .commandDescription( + "EXPLAIN", + "Describes the execution plan of a query or table with the given name.") + .commandDescription( + "BEGIN STATEMENT SET", + "Begins a statement set. Syntax: \"BEGIN STATEMENT SET;\"") + .commandDescription("END", "Ends a statement set. Syntax: \"END;\"") + // (TODO) zjffdu, ADD/REMOVE/SHOW JAR + .build(); + + // -------------------------------------------------------------------------------------------- + + public static final AttributedString MESSAGE_HELP = + new AttributedStringBuilder() + .append("The following commands are available:\n\n") + .append(SQL_CLI_COMMANDS_DESCRIPTIONS) + .style(AttributedStyle.DEFAULT.underline()) + .append("\nHint") + .style(AttributedStyle.DEFAULT) + .append( + ": Make sure that a statement ends with \";\" for finalizing (multi-line) statements.") + // About Documentation Link. 
+ .style(AttributedStyle.DEFAULT) + .append( + "\nYou can also type any Flink SQL statement, please visit https://nightlies.apache.org/flink/flink-docs-stable/docs/dev/table/sql/overview/ for more details.") + .toAttributedString(); + + private static final String MESSAGE_NO_STATEMENT_IN_STATEMENT_SET = "No statement in the statement set, skip submit."; + + private FlinkSqlContext flinkSqlContext; + private TableEnvironment tbenv; + private ZeppelinContext z; + private Parser sqlParser; + private SqlSplitter sqlSplitter; + // paragraphId -> list of ModifyOperation, used for statement set in 2 syntax: + // 1. runAsOne= true + // 2. begin statement set; + // ... + // end; + private Map<String, List<ModifyOperation>> statementOperationsMap = new HashMap<>(); + private boolean isBatch; + private ReentrantReadWriteLock.WriteLock lock = new ReentrantReadWriteLock().writeLock(); + + + public Flink117SqlInterpreter(FlinkSqlContext flinkSqlContext, boolean isBatch) { + this.flinkSqlContext = flinkSqlContext; + this.isBatch = isBatch; + if (isBatch) { + this.tbenv = (TableEnvironment) flinkSqlContext.getBtenv(); + } else { + this.tbenv = (TableEnvironment) flinkSqlContext.getStenv(); + } + this.z = (ZeppelinContext) flinkSqlContext.getZeppelinContext(); + this.sqlParser = ((TableEnvironmentInternal) tbenv).getParser(); + this.sqlSplitter = new SqlSplitter(); + JobListener jobListener = new JobListener() { + @Override + public void onJobSubmitted(@Nullable JobClient jobClient, @Nullable Throwable throwable) { + if (lock.isHeldByCurrentThread()) { + lock.unlock(); + LOGGER.info("UnLock JobSubmitLock"); + } + } + + @Override + public void onJobExecuted(@Nullable JobExecutionResult jobExecutionResult, @Nullable Throwable throwable) { + + } + }; + + ((ExecutionEnvironment) flinkSqlContext.getBenv()).registerJobListener(jobListener); + ((StreamExecutionEnvironment) flinkSqlContext.getSenv()).registerJobListener(jobListener); + } + + public InterpreterResult runSqlList(String st, InterpreterContext context) { + try { + boolean runAsOne = Boolean.parseBoolean(context.getStringLocalProperty("runAsOne", "false")); + if (runAsOne) { + statementOperationsMap.put(context.getParagraphId(), new ArrayList<>()); + } + + String jobName = context.getLocalProperties().get("jobName"); + if (StringUtils.isNotBlank(jobName)) { + tbenv.getConfig().getConfiguration().set(PipelineOptions.NAME, jobName); + } + + List<String> sqls = sqlSplitter.splitSql(st).stream().map(String::trim).collect(Collectors.toList()); + for (String sql : sqls) { + List<Operation> operations = null; + try { + operations = sqlParser.parse(sql); + } catch (SqlParserException e) { + context.out.write("%text Invalid Sql statement: " + sql + "\n"); + context.out.write(MESSAGE_HELP.toString()); + return new InterpreterResult(InterpreterResult.Code.ERROR, e.toString()); + } + + try { + callOperation(sql, operations.get(0), context); + context.out.flush(); + } catch (Throwable e) { + LOGGER.error("Fail to run sql:" + sql, e); + try { + context.out.write("%text Fail to run sql command: " + + sql + "\n" + ExceptionUtils.getStackTrace(e) + "\n"); + } catch (IOException ex) { + LOGGER.warn("Unexpected exception:", ex); + return new InterpreterResult(InterpreterResult.Code.ERROR, + ExceptionUtils.getStackTrace(e)); + } + return new InterpreterResult(InterpreterResult.Code.ERROR); + } + } + + if (runAsOne && !statementOperationsMap.getOrDefault(context.getParagraphId(), new ArrayList<>()).isEmpty()) { + try { + lock.lock(); + List<ModifyOperation> 
modifyOperations = statementOperationsMap.getOrDefault(context.getParagraphId(), new ArrayList<>()); + if (!modifyOperations.isEmpty()) { + callInserts(modifyOperations, context); + } + } catch (Exception e) { + LOGGER.error("Fail to execute sql as one job", e); + return new InterpreterResult(InterpreterResult.Code.ERROR, ExceptionUtils.getStackTrace(e)); + } finally { + if (lock.isHeldByCurrentThread()) { + lock.unlock(); + } + } + } + } catch (Exception e) { + LOGGER.error("Fail to execute sql", e); + return new InterpreterResult(InterpreterResult.Code.ERROR, ExceptionUtils.getStackTrace(e)); + } finally { + statementOperationsMap.remove(context.getParagraphId()); + } + + return new InterpreterResult(InterpreterResult.Code.SUCCESS); + } + + private void callOperation(String sql, Operation operation, InterpreterContext context) throws IOException { + if (operation instanceof HelpOperation) { + // HELP + callHelp(context); + } else if (operation instanceof SetOperation) { + // SET + callSet((SetOperation) operation, context); + } else if (operation instanceof ModifyOperation) { + // INSERT INTO/OVERWRITE + callInsert((ModifyOperation) operation, context); + } else if (operation instanceof QueryOperation) { + // SELECT + callSelect(sql, (QueryOperation) operation, context); + } else if (operation instanceof ExplainOperation) { + // EXPLAIN + callExplain((ExplainOperation) operation, context); + } else if (operation instanceof BeginStatementSetOperation) { + // BEGIN STATEMENT SET + callBeginStatementSet(context); + } else if (operation instanceof EndStatementSetOperation) { + // END + callEndStatementSet(context); + } else if (operation instanceof ShowCreateTableOperation) { + // SHOW CREATE TABLE + callShowCreateTable((ShowCreateTableOperation) operation, context); + } else if (operation instanceof ShowCatalogsOperation) { + callShowCatalogs(context); + } else if (operation instanceof ShowCurrentCatalogOperation) { + callShowCurrentCatalog(context); + } else if (operation instanceof UseCatalogOperation) { + callUseCatalog(((UseCatalogOperation) operation).getCatalogName(), context); + } else if (operation instanceof CreateCatalogOperation) { + callDDL(sql, context, "Catalog has been created."); + } else if (operation instanceof DropCatalogOperation) { + callDDL(sql, context, "Catalog has been dropped."); + } else if (operation instanceof UseDatabaseOperation) { + UseDatabaseOperation useDBOperation = (UseDatabaseOperation) operation; + callUseDatabase(useDBOperation.getDatabaseName(), context); + } else if (operation instanceof CreateDatabaseOperation) { + callDDL(sql, context, "Database has been created."); + } else if (operation instanceof DropDatabaseOperation) { + callDDL(sql, context, "Database has been removed."); + } else if (operation instanceof AlterDatabaseOperation) { + callDDL(sql, context, "Alter database succeeded!"); + } else if (operation instanceof ShowDatabasesOperation) { + callShowDatabases(context); + } else if (operation instanceof ShowCurrentDatabaseOperation) { + callShowCurrentDatabase(context); + } else if (operation instanceof CreateTableOperation || operation instanceof CreateTableASOperation) { + callDDL(sql, context, "Table has been created."); + } else if (operation instanceof AlterTableOperation) { + callDDL(sql, context, "Alter table succeeded!"); + } else if (operation instanceof DropTableOperation) { + callDDL(sql, context, "Table has been dropped."); + } else if (operation instanceof DescribeTableOperation) { + DescribeTableOperation 
describeTableOperation = (DescribeTableOperation) operation; + callDescribe(describeTableOperation.getSqlIdentifier().getObjectName(), context); + } else if (operation instanceof ShowTablesOperation) { + callShowTables(context); + } else if (operation instanceof CreateViewOperation) { + callDDL(sql, context, "View has been created."); + } else if (operation instanceof DropViewOperation) { + callDDL(sql, context, "View has been dropped."); + } else if (operation instanceof AlterViewOperation) { + callDDL(sql, context, "Alter view succeeded!"); + } else if (operation instanceof CreateCatalogFunctionOperation || operation instanceof CreateTempSystemFunctionOperation) { + callDDL(sql, context, "Function has been created."); + } else if (operation instanceof DropCatalogFunctionOperation || operation instanceof DropTempSystemFunctionOperation) { + callDDL(sql, context, "Function has been removed."); + } else if (operation instanceof AlterCatalogFunctionOperation) { + callDDL(sql, context, "Alter function succeeded!"); + } else if (operation instanceof ShowFunctionsOperation) { + callShowFunctions(context); + } else if (operation instanceof ShowModulesOperation) { + callShowModules(context); + } else if (operation instanceof ShowPartitionsOperation) { + ShowPartitionsOperation showPartitionsOperation = (ShowPartitionsOperation) operation; + callShowPartitions(showPartitionsOperation.asSummaryString(), context); + } else { + throw new IOException(operation.getClass().getName() + " is not supported"); + } + } + + + private void callHelp(InterpreterContext context) throws IOException { + context.out.write(MESSAGE_HELP.toString() + "\n"); + } + + private void callInsert(ModifyOperation operation, InterpreterContext context) throws IOException { + if (statementOperationsMap.containsKey(context.getParagraphId())) { + List<ModifyOperation> modifyOperations = statementOperationsMap.get(context.getParagraphId()); + modifyOperations.add(operation); + } else { + callInserts(Collections.singletonList(operation), context); + } + } + + private void callInserts(List<ModifyOperation> operations, InterpreterContext context) throws IOException { + if (!isBatch) { + context.getLocalProperties().put("flink.streaming.insert_into", "true"); + } + TableResult tableResult = ((TableEnvironmentInternal) tbenv).executeInternal(operations); + checkState(tableResult.getJobClient().isPresent()); + try { + tableResult.await(); + JobClient jobClient = tableResult.getJobClient().get(); + if (jobClient.getJobStatus().get() == JobStatus.FINISHED) { + context.out.write("Insertion successfully.\n"); + } else { + throw new IOException("Job is failed, " + jobClient.getJobExecutionResult().get().toString()); + } + } catch (InterruptedException e) { + throw new IOException("Flink job is interrupted", e); + } catch (ExecutionException e) { + throw new IOException("Flink job is failed", e); + } + } + + private void callShowCreateTable(ShowCreateTableOperation showCreateTableOperation, InterpreterContext context) throws IOException { + try { + lock.lock(); + TableResult tableResult = ((TableEnvironmentInternal) tbenv).executeInternal(showCreateTableOperation); + String explanation = + Objects.requireNonNull(tableResult.collect().next().getField(0)).toString(); + context.out.write(explanation + "\n"); + } finally { + if (lock.isHeldByCurrentThread()) { + lock.unlock(); + } + } + } + + private void callExplain(ExplainOperation explainOperation, InterpreterContext context) throws IOException { + try { + lock.lock(); + TableResult tableResult 
= ((TableEnvironmentInternal) tbenv).executeInternal(explainOperation); + String explanation = + Objects.requireNonNull(tableResult.collect().next().getField(0)).toString(); + context.out.write(explanation + "\n"); + } finally { + if (lock.isHeldByCurrentThread()) { + lock.unlock(); + } + } + } + + public void callSelect(String sql, QueryOperation queryOperation, InterpreterContext context) throws IOException { + try { + lock.lock(); + if (isBatch) { + callBatchInnerSelect(sql, context); + } else { + callStreamInnerSelect(sql, context); + } + } finally { + if (lock.isHeldByCurrentThread()) { + lock.unlock(); + } + } + } + + public void callBatchInnerSelect(String sql, InterpreterContext context) throws IOException { + Table table = this.tbenv.sqlQuery(sql); + String result = z.showData(table); + context.out.write(result); + } + + public void callStreamInnerSelect(String sql, InterpreterContext context) throws IOException { + flinkSqlContext.getStreamSqlSelectConsumer().accept(sql); + } + + public void callSet(SetOperation setOperation, InterpreterContext context) throws IOException { + if (setOperation.getKey().isPresent() && setOperation.getValue().isPresent()) { + // set a property + String key = setOperation.getKey().get().trim(); + String value = setOperation.getValue().get().trim(); + this.tbenv.getConfig().getConfiguration().setString(key, value); + LOGGER.info("Set table config: {}={}", key, value); + } else { + // show all properties + final Map<String, String> properties = this.tbenv.getConfig().getConfiguration().toMap(); + List<String> prettyEntries = new ArrayList<>(); + for (String key : properties.keySet()) { + prettyEntries.add( + String.format( + "'%s' = '%s'", + EncodingUtils.escapeSingleQuotes(key), + EncodingUtils.escapeSingleQuotes(properties.get(key)))); + } + prettyEntries.sort(String::compareTo); + prettyEntries.forEach(entry -> { + try { + context.out.write(entry + "\n"); + } catch (IOException e) { + LOGGER.warn("Fail to write output", e); + } + }); + } + } + + private void callBeginStatementSet(InterpreterContext context) throws IOException { + statementOperationsMap.put(context.getParagraphId(), new ArrayList<>()); + } + + private void callEndStatementSet(InterpreterContext context) throws IOException { + List<ModifyOperation> modifyOperations = statementOperationsMap.get(context.getParagraphId()); + if (modifyOperations != null && !modifyOperations.isEmpty()) { + callInserts(modifyOperations, context); + } else { + context.out.write(MESSAGE_NO_STATEMENT_IN_STATEMENT_SET); + } + } + + private void callUseCatalog(String catalog, InterpreterContext context) throws IOException { + tbenv.executeSql("USE CATALOG `" + catalog + "`"); + } + + private void callUseDatabase(String databaseName, + InterpreterContext context) throws IOException { + this.tbenv.executeSql("USE `" + databaseName + "`"); + } + + private void callShowCatalogs(InterpreterContext context) throws IOException { + TableResult tableResult = this.tbenv.executeSql("SHOW Catalogs"); + List<String> catalogs = CollectionUtil.iteratorToList(tableResult.collect()).stream() + .map(r -> checkNotNull(r.getField(0)).toString()) + .collect(Collectors.toList()); + context.out.write("%table catalog\n" + StringUtils.join(catalogs, "\n") + "\n"); + } + + private void callShowCurrentCatalog(InterpreterContext context) throws IOException { + TableResult tableResult = this.tbenv.executeSql("SHOW Current Catalog"); + String catalog = tableResult.collect().next().getField(0).toString(); + context.out.write("%text current 
catalog: " + catalog + "\n"); + } + + private void callShowDatabases(InterpreterContext context) throws IOException { + TableResult tableResult = this.tbenv.executeSql("SHOW Databases"); + List<String> databases = CollectionUtil.iteratorToList(tableResult.collect()).stream() + .map(r -> checkNotNull(r.getField(0)).toString()) + .collect(Collectors.toList()); + context.out.write( + "%table database\n" + StringUtils.join(databases, "\n") + "\n"); + } + + private void callShowCurrentDatabase(InterpreterContext context) throws IOException { + TableResult tableResult = this.tbenv.executeSql("SHOW Current Database"); + String database = tableResult.collect().next().getField(0).toString(); + context.out.write("%text current database: " + database + "\n"); + } + + private void callShowTables(InterpreterContext context) throws IOException { + TableResult tableResult = this.tbenv.executeSql("SHOW Tables"); + List<String> tables = CollectionUtil.iteratorToList(tableResult.collect()).stream() + .map(r -> checkNotNull(r.getField(0)).toString()) + .filter(tbl -> !tbl.startsWith("UnnamedTable")) + .collect(Collectors.toList()); + context.out.write( + "%table table\n" + StringUtils.join(tables, "\n") + "\n"); + } + + private void callShowFunctions(InterpreterContext context) throws IOException { + TableResult tableResult = this.tbenv.executeSql("SHOW Functions"); + List<String> functions = CollectionUtil.iteratorToList(tableResult.collect()).stream() + .map(r -> checkNotNull(r.getField(0)).toString()) + .collect(Collectors.toList()); + context.out.write( + "%table function\n" + StringUtils.join(functions, "\n") + "\n"); + } + + private void callShowModules(InterpreterContext context) throws IOException { + String[] modules = this.tbenv.listModules(); + context.out.write("%table modules\n" + StringUtils.join(modules, "\n") + "\n"); + } + + private void callShowPartitions(String sql, InterpreterContext context) throws IOException { + TableResult tableResult = this.tbenv.executeSql(sql); + List<String> partions = CollectionUtil.iteratorToList(tableResult.collect()).stream() + .map(r -> checkNotNull(r.getField(0)).toString()) + .collect(Collectors.toList()); + context.out.write( + "%table partitions\n" + StringUtils.join(partions, "\n") + "\n"); + } + + private void callDDL(String sql, InterpreterContext context, String message) throws IOException { + try { + lock.lock(); + this.tbenv.executeSql(sql); + } finally { + if (lock.isHeldByCurrentThread()) { + lock.unlock(); + } + } + context.out.write(message + "\n"); + } + + private void callDescribe(String name, InterpreterContext context) throws IOException { + TableResult tableResult = null; + try { + tableResult = tbenv.executeSql("DESCRIBE " + name); + } catch (Exception e) { + throw new IOException("Fail to describe table: " + name, e); + } + CloseableIterator<Row> result = tableResult.collect(); + StringBuilder builder = new StringBuilder(); + builder.append("Column\tType\n"); + while (result.hasNext()) { + Row row = result.next(); + builder.append(row.getField(0) + "\t" + row.getField(1) + "\n"); + } + context.out.write("%table\n" + builder.toString()); + } +} diff --git a/flink/flink1.17-shims/src/main/java/org/apache/zeppelin/flink/PrintUtils.java b/flink/flink1.17-shims/src/main/java/org/apache/zeppelin/flink/PrintUtils.java new file mode 100644 index 0000000000..a35ad3a6cd --- /dev/null +++ b/flink/flink1.17-shims/src/main/java/org/apache/zeppelin/flink/PrintUtils.java @@ -0,0 +1,318 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under 
one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.zeppelin.flink; + + +import org.apache.flink.table.catalog.ResolvedSchema; +import org.apache.flink.table.data.MapData; +import org.apache.flink.table.data.RowData; +import org.apache.flink.table.data.TimestampData; +import org.apache.flink.table.types.logical.*; +import org.apache.flink.types.Row; +import org.apache.flink.util.StringUtils; + +import java.sql.Time; +import java.sql.Timestamp; +import java.time.Instant; +import java.time.LocalDateTime; +import java.time.LocalTime; +import java.time.ZoneId; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +import static org.apache.flink.table.types.logical.utils.LogicalTypeChecks.getPrecision; +import static org.apache.zeppelin.flink.TimestampStringUtils.*; + +/** + * Copied from flink-project with minor modification. + * */ +public class PrintUtils { + + public static final String NULL_COLUMN = "(NULL)"; + private static final String COLUMN_TRUNCATED_FLAG = "..."; + + private PrintUtils() {} + + + public static String[] rowToString( + Row row, ResolvedSchema resolvedSchema, ZoneId sessionTimeZone) { + return rowToString(row, NULL_COLUMN, false, resolvedSchema, sessionTimeZone); + } + + public static String[] rowToString( + Row row, + String nullColumn, + boolean printRowKind, + ResolvedSchema resolvedSchema, + ZoneId sessionTimeZone) { + final int len = printRowKind ? row.getArity() + 1 : row.getArity(); + final List<String> fields = new ArrayList<>(len); + if (printRowKind) { + fields.add(row.getKind().shortString()); + } + for (int i = 0; i < row.getArity(); i++) { + final Object field = row.getField(i); + final LogicalType fieldType = + resolvedSchema.getColumnDataTypes().get(i).getLogicalType(); + if (field == null) { + fields.add(nullColumn); + } else { + fields.add( + StringUtils.arrayAwareToString( + formattedTimestamp(field, fieldType, sessionTimeZone))); + } + } + return fields.toArray(new String[0]); + } + + /** + * Normalizes field that contains TIMESTAMP, TIMESTAMP_LTZ and TIME type data. + * + * <p>This method also supports nested type ARRAY, ROW, MAP. 
+ */ + private static Object formattedTimestamp( + Object field, LogicalType fieldType, ZoneId sessionTimeZone) { + final LogicalTypeRoot typeRoot = fieldType.getTypeRoot(); + if (field == null) { + return "null"; + } + switch (typeRoot) { + case TIMESTAMP_WITHOUT_TIME_ZONE: + case TIMESTAMP_WITH_LOCAL_TIME_ZONE: + return formatTimestampField(field, fieldType, sessionTimeZone); + case TIME_WITHOUT_TIME_ZONE: + return formatTimeField(field); + case ARRAY: + LogicalType elementType = ((ArrayType) fieldType).getElementType(); + if (field instanceof List) { + List<?> array = (List<?>) field; + Object[] formattedArray = new Object[array.size()]; + for (int i = 0; i < array.size(); i++) { + formattedArray[i] = + formattedTimestamp(array.get(i), elementType, sessionTimeZone); + } + return formattedArray; + } else if (field.getClass().isArray()) { + // primitive type + if (field.getClass() == byte[].class) { + byte[] array = (byte[]) field; + Object[] formattedArray = new Object[array.length]; + for (int i = 0; i < array.length; i++) { + formattedArray[i] = + formattedTimestamp(array[i], elementType, sessionTimeZone); + } + return formattedArray; + } else if (field.getClass() == short[].class) { + short[] array = (short[]) field; + Object[] formattedArray = new Object[array.length]; + for (int i = 0; i < array.length; i++) { + formattedArray[i] = + formattedTimestamp(array[i], elementType, sessionTimeZone); + } + return formattedArray; + } else if (field.getClass() == int[].class) { + int[] array = (int[]) field; + Object[] formattedArray = new Object[array.length]; + for (int i = 0; i < array.length; i++) { + formattedArray[i] = + formattedTimestamp(array[i], elementType, sessionTimeZone); + } + return formattedArray; + } else if (field.getClass() == long[].class) { + long[] array = (long[]) field; + Object[] formattedArray = new Object[array.length]; + for (int i = 0; i < array.length; i++) { + formattedArray[i] = + formattedTimestamp(array[i], elementType, sessionTimeZone); + } + return formattedArray; + } else if (field.getClass() == float[].class) { + float[] array = (float[]) field; + Object[] formattedArray = new Object[array.length]; + for (int i = 0; i < array.length; i++) { + formattedArray[i] = + formattedTimestamp(array[i], elementType, sessionTimeZone); + } + return formattedArray; + } else if (field.getClass() == double[].class) { + double[] array = (double[]) field; + Object[] formattedArray = new Object[array.length]; + for (int i = 0; i < array.length; i++) { + formattedArray[i] = + formattedTimestamp(array[i], elementType, sessionTimeZone); + } + return formattedArray; + } else if (field.getClass() == boolean[].class) { + boolean[] array = (boolean[]) field; + Object[] formattedArray = new Object[array.length]; + for (int i = 0; i < array.length; i++) { + formattedArray[i] = + formattedTimestamp(array[i], elementType, sessionTimeZone); + } + return formattedArray; + } else if (field.getClass() == char[].class) { + char[] array = (char[]) field; + Object[] formattedArray = new Object[array.length]; + for (int i = 0; i < array.length; i++) { + formattedArray[i] = + formattedTimestamp(array[i], elementType, sessionTimeZone); + } + return formattedArray; + } else { + // non-primitive type + Object[] array = (Object[]) field; + Object[] formattedArray = new Object[array.length]; + for (int i = 0; i < array.length; i++) { + formattedArray[i] = + formattedTimestamp(array[i], elementType, sessionTimeZone); + } + return formattedArray; + } + } else { + return field; + } + case ROW: + if 
(fieldType instanceof RowType && field instanceof Row) { + Row row = (Row) field; + Row formattedRow = new Row(row.getKind(), row.getArity()); + for (int i = 0; i < ((RowType) fieldType).getFields().size(); i++) { + LogicalType type = ((RowType) fieldType).getFields().get(i).getType(); + formattedRow.setField( + i, formattedTimestamp(row.getField(i), type, sessionTimeZone)); + } + return formattedRow; + + } else if (fieldType instanceof RowType && field instanceof RowData) { + RowData rowData = (RowData) field; + Row formattedRow = new Row(rowData.getRowKind(), rowData.getArity()); + for (int i = 0; i < ((RowType) fieldType).getFields().size(); i++) { + LogicalType type = ((RowType) fieldType).getFields().get(i).getType(); + RowData.FieldGetter fieldGetter = RowData.createFieldGetter(type, i); + formattedRow.setField( + i, + formattedTimestamp( + fieldGetter.getFieldOrNull(rowData), + type, + sessionTimeZone)); + } + return formattedRow; + } else { + return field; + } + case MAP: + LogicalType keyType = ((MapType) fieldType).getKeyType(); + LogicalType valueType = ((MapType) fieldType).getValueType(); + if (fieldType instanceof MapType && field instanceof Map) { + Map<Object, Object> map = ((Map) field); + Map<Object, Object> formattedMap = new HashMap<>(map.size()); + for (Object key : map.keySet()) { + formattedMap.put( + formattedTimestamp(key, keyType, sessionTimeZone), + formattedTimestamp(map.get(key), valueType, sessionTimeZone)); + } + return formattedMap; + } else if (fieldType instanceof MapType && field instanceof MapData) { + MapData map = ((MapData) field); + Map<Object, Object> formattedMap = new HashMap<>(map.size()); + Object[] keyArray = + (Object[]) formattedTimestamp(map.keyArray(), keyType, sessionTimeZone); + Object[] valueArray = + (Object[]) + formattedTimestamp( + map.valueArray(), valueType, sessionTimeZone); + for (int i = 0; i < keyArray.length; i++) { + formattedMap.put(keyArray[i], valueArray[i]); + } + return formattedMap; + } else { + return field; + } + default: + return field; + } + } + + /** + * Formats the print content of TIMESTAMP and TIMESTAMP_LTZ type data, consider the user + * configured time zone. 
+ */ + private static Object formatTimestampField( + Object timestampField, LogicalType fieldType, ZoneId sessionTimeZone) { + switch (fieldType.getTypeRoot()) { + case TIMESTAMP_WITHOUT_TIME_ZONE: + final int precision = getPrecision(fieldType); + if (timestampField instanceof java.sql.Timestamp) { + // conversion between java.sql.Timestamp and TIMESTAMP_WITHOUT_TIME_ZONE + return timestampToString( + ((Timestamp) timestampField).toLocalDateTime(), precision); + } else if (timestampField instanceof java.time.LocalDateTime) { + return timestampToString(((LocalDateTime) timestampField), precision); + } else if (timestampField instanceof TimestampData) { + return timestampToString( + ((TimestampData) timestampField).toLocalDateTime(), precision); + } else { + return timestampField; + } + case TIMESTAMP_WITH_LOCAL_TIME_ZONE: + Instant instant = null; + if (timestampField instanceof java.time.Instant) { + instant = ((Instant) timestampField); + } else if (timestampField instanceof java.sql.Timestamp) { + Timestamp timestamp = ((Timestamp) timestampField); + // conversion between java.sql.Timestamp and TIMESTAMP_WITH_LOCAL_TIME_ZONE + instant = + TimestampData.fromEpochMillis( + timestamp.getTime(), timestamp.getNanos() % 1000_000) + .toInstant(); + } else if (timestampField instanceof TimestampData) { + instant = ((TimestampData) timestampField).toInstant(); + } else if (timestampField instanceof Integer) { + instant = Instant.ofEpochSecond((Integer) timestampField); + } else if (timestampField instanceof Long) { + instant = Instant.ofEpochMilli((Long) timestampField); + } + if (instant != null) { + return timestampToString( + instant.atZone(sessionTimeZone).toLocalDateTime(), + getPrecision(fieldType)); + } else { + return timestampField; + } + default: + return timestampField; + } + } + + /** Formats the print content of TIME type data. */ + private static Object formatTimeField(Object timeField) { + if (timeField.getClass().isAssignableFrom(int.class) || timeField instanceof Integer) { + return unixTimeToString((int) timeField); + } else if (timeField.getClass().isAssignableFrom(long.class) || timeField instanceof Long) { + return unixTimeToString(((Long) timeField).intValue()); + } else if (timeField instanceof Time) { + return unixTimeToString(timeToInternal((Time) timeField)); + } else if (timeField instanceof LocalTime) { + return unixTimeToString(localTimeToUnixDate((LocalTime) timeField)); + } else { + return timeField; + } + } +} diff --git a/flink/flink1.17-shims/src/main/java/org/apache/zeppelin/flink/TimestampStringUtils.java b/flink/flink1.17-shims/src/main/java/org/apache/zeppelin/flink/TimestampStringUtils.java new file mode 100644 index 0000000000..c52104e45a --- /dev/null +++ b/flink/flink1.17-shims/src/main/java/org/apache/zeppelin/flink/TimestampStringUtils.java @@ -0,0 +1,143 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.zeppelin.flink; + +import java.sql.Time; +import java.time.LocalDateTime; +import java.time.LocalTime; +import java.util.TimeZone; + +/** + * Copied from flink-project with minor modification. + * */ +public class TimestampStringUtils { + + private static final TimeZone LOCAL_TZ = TimeZone.getDefault(); + + public TimestampStringUtils() { + } + + public static String timestampToString(LocalDateTime ldt, int precision) { + String fraction; + for(fraction = pad(9, (long)ldt.getNano()); fraction.length() > precision && fraction.endsWith("0"); fraction = fraction.substring(0, fraction.length() - 1)) { + } + + StringBuilder ymdhms = ymdhms(new StringBuilder(), ldt.getYear(), ldt.getMonthValue(), ldt.getDayOfMonth(), ldt.getHour(), ldt.getMinute(), ldt.getSecond()); + if (fraction.length() > 0) { + ymdhms.append(".").append(fraction); + } + + return ymdhms.toString(); + } + + private static String pad(int length, long v) { + StringBuilder s = new StringBuilder(Long.toString(v)); + + while(s.length() < length) { + s.insert(0, "0"); + } + + return s.toString(); + } + + private static StringBuilder hms(StringBuilder b, int h, int m, int s) { + int2(b, h); + b.append(':'); + int2(b, m); + b.append(':'); + int2(b, s); + return b; + } + + private static StringBuilder ymdhms(StringBuilder b, int year, int month, int day, int h, int m, int s) { + ymd(b, year, month, day); + b.append(' '); + hms(b, h, m, s); + return b; + } + + private static StringBuilder ymd(StringBuilder b, int year, int month, int day) { + int4(b, year); + b.append('-'); + int2(b, month); + b.append('-'); + int2(b, day); + return b; + } + + private static void int4(StringBuilder buf, int i) { + buf.append((char)(48 + i / 1000 % 10)); + buf.append((char)(48 + i / 100 % 10)); + buf.append((char)(48 + i / 10 % 10)); + buf.append((char)(48 + i % 10)); + } + + private static void int2(StringBuilder buf, int i) { + buf.append((char)(48 + i / 10 % 10)); + buf.append((char)(48 + i % 10)); + } + + public static String unixTimeToString(int time) { + StringBuilder buf = new StringBuilder(8); + unixTimeToString(buf, time, 0); + return buf.toString(); + } + + private static void unixTimeToString(StringBuilder buf, int time, int precision) { + while(time < 0) { + time = (int)((long)time + 86400000L); + } + + int h = time / 3600000; + int time2 = time % 3600000; + int m = time2 / '\uea60'; + int time3 = time2 % '\uea60'; + int s = time3 / 1000; + int ms = time3 % 1000; + int2(buf, h); + buf.append(':'); + int2(buf, m); + buf.append(':'); + int2(buf, s); + if (precision > 0) { + buf.append('.'); + + while(precision > 0) { + buf.append((char)(48 + ms / 100)); + ms %= 100; + ms *= 10; + if (ms == 0) { + break; + } + + --precision; + } + } + + } + + public static int timeToInternal(Time time) { + long ts = time.getTime() + (long)LOCAL_TZ.getOffset(time.getTime()); + return (int)(ts % 86400000L); + } + + public static int localTimeToUnixDate(LocalTime time) { + return time.getHour() * 3600000 + time.getMinute() * '\uea60' + time.getSecond() * 1000 + time.getNano() / 1000000; + } +} diff --git a/flink/flink1.17-shims/src/main/java/org/apache/zeppelin/flink/shims117/CollectStreamTableSink.java b/flink/flink1.17-shims/src/main/java/org/apache/zeppelin/flink/shims117/CollectStreamTableSink.java new file mode 100644 index 0000000000..ee58e770d4 --- /dev/null +++ 
b/flink/flink1.17-shims/src/main/java/org/apache/zeppelin/flink/shims117/CollectStreamTableSink.java @@ -0,0 +1,97 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.zeppelin.flink.shims117; + +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.api.common.typeinfo.Types; +import org.apache.flink.api.common.typeutils.TypeSerializer; +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.api.java.typeutils.TupleTypeInfo; +import org.apache.flink.streaming.api.datastream.DataStream; +import org.apache.flink.streaming.api.datastream.DataStreamSink; +import org.apache.flink.streaming.experimental.CollectSink; +import org.apache.flink.table.sinks.RetractStreamTableSink; +import org.apache.flink.types.Row; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.net.InetAddress; +import java.util.UUID; + +/** + * Table sink for collecting the results locally using sockets. + */ +public class CollectStreamTableSink implements RetractStreamTableSink<Row> { + + private static final Logger LOGGER = LoggerFactory.getLogger(CollectStreamTableSink.class); + + private final InetAddress targetAddress; + private final int targetPort; + private final TypeSerializer<Tuple2<Boolean, Row>> serializer; + + private String[] fieldNames; + private TypeInformation<?>[] fieldTypes; + + public CollectStreamTableSink(InetAddress targetAddress, + int targetPort, + TypeSerializer<Tuple2<Boolean, Row>> serializer) { + LOGGER.info("Use address: " + targetAddress.getHostAddress() + ":" + targetPort); + this.targetAddress = targetAddress; + this.targetPort = targetPort; + this.serializer = serializer; + } + + @Override + public String[] getFieldNames() { + return fieldNames; + } + + @Override + public TypeInformation<?>[] getFieldTypes() { + return fieldTypes; + } + + @Override + public CollectStreamTableSink configure(String[] fieldNames, TypeInformation<?>[] fieldTypes) { + final CollectStreamTableSink copy = + new CollectStreamTableSink(targetAddress, targetPort, serializer); + copy.fieldNames = fieldNames; + copy.fieldTypes = fieldTypes; + return copy; + } + + @Override + public TypeInformation<Row> getRecordType() { + return Types.ROW_NAMED(fieldNames, fieldTypes); + } + + @Override + public DataStreamSink<?> consumeDataStream(DataStream<Tuple2<Boolean, Row>> stream) { + // add sink + return stream + .addSink(new CollectSink<>(targetAddress, targetPort, serializer)) + .name("Zeppelin Flink Sql Stream Collect Sink " + UUID.randomUUID()) + .setParallelism(1); + } + + @Override + public TupleTypeInfo<Tuple2<Boolean, Row>> getOutputType() { + return new TupleTypeInfo<>(Types.BOOLEAN, getRecordType()); + } +} diff --git a/flink/pom.xml b/flink/pom.xml index 1939c1ccb4..428ab57c89 
100644 --- a/flink/pom.xml +++ b/flink/pom.xml @@ -40,6 +40,7 @@ <module>flink1.14-shims</module> <module>flink1.15-shims</module> <module>flink1.16-shims</module> + <module>flink1.17-shims</module> </modules> <properties> @@ -47,12 +48,21 @@ <flink1.14.version>1.14.0</flink1.14.version> <flink1.15.version>1.15.1</flink1.15.version> <flink1.16.version>1.16.0</flink1.16.version> + <flink1.17.version>1.17.1</flink1.17.version> <flink.scala.version>2.11.12</flink.scala.version> <flink.scala.binary.version>2.11</flink.scala.binary.version> </properties> <profiles> + <profile> + <id>flink-117</id> + <!-- Flink 1.17 only support scala 2.12--> + <modules> + <module>flink-scala-2.12</module> + </modules> + </profile> + <profile> <id>flink-116</id> <!-- Flink 1.16 only support scala 2.12--> diff --git a/testing/env_python_3_with_flink_117.yml b/testing/env_python_3_with_flink_117.yml new file mode 100644 index 0000000000..91dfb2f644 --- /dev/null +++ b/testing/env_python_3_with_flink_117.yml @@ -0,0 +1,29 @@ +name: python_3_with_flink +channels: + - conda-forge + - defaults +dependencies: + - pycodestyle + - scipy + - numpy=1.19.5 + - grpcio + - protobuf + - pandasql + - ipython + - ipython_genutils + - ipykernel + - jupyter_client=5 + - hvplot + - plotnine + - seaborn + - intake + - intake-parquet + - intake-xarray + - altair + - vega_datasets + - plotly + - jinja2=3.0.3 + - pip + - pip: + - apache-flink==1.17.1 +
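
For reference, the core of this change is the version dispatch added to FlinkShims.java above: Zeppelin resolves the shims implementation class from the running Flink major/minor version and instantiates it by reflection, and this commit registers Flink117Shims for 1.17. Below is a minimal, self-contained sketch of that dispatch logic. It is a simplified illustration only, not the actual Zeppelin FlinkShims API; the class names are the ones added in this commit, and the ShimsDispatchSketch class itself is hypothetical.

    public class ShimsDispatchSketch {
        // Maps a Flink major/minor version to the shims class name, mirroring
        // the branch added to FlinkShims.java for Flink 1.17 in this commit.
        static String shimsClassFor(int major, int minor) throws Exception {
            if (major == 1 && minor == 16) {
                return "org.apache.zeppelin.flink.Flink116Shims";
            } else if (major == 1 && minor == 17) {
                return "org.apache.zeppelin.flink.Flink117Shims";
            }
            throw new Exception("Flink version: '" + major + "." + minor + "' is not supported yet");
        }

        public static void main(String[] args) throws Exception {
            // Prints: org.apache.zeppelin.flink.Flink117Shims
            System.out.println(shimsClassFor(1, 17));
        }
    }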