This is an automated email from the ASF dual-hosted git repository. zjffdu pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/zeppelin.git
The following commit(s) were added to refs/heads/master by this push: new c98081c [ZEPPELIN-5392] Timezone is not applied in the timestamp field in flink interpreter c98081c is described below commit c98081c7cf83e5d606ed09be2cdcbf7e845b6ad8 Author: Jeff Zhang <zjf...@apache.org> AuthorDate: Fri Jun 18 17:27:32 2021 +0800 [ZEPPELIN-5392] Timezone is not applied in the timestamp field in flink interpreter ### What is this PR for? Add timezone support for the Flink interpreter. It is only available for Flink 1.13; previous versions are not supported. ### What type of PR is it? [ Improvement ] ### Todos * [ ] - Task ### What is the Jira issue? * https://issues.apache.org/jira/browse/ZEPPELIN-5392 ### How should this be tested? * Manually tested. ### Screenshots (if appropriate)  ### Questions: * Do the license files need an update? No * Are there breaking changes for older versions? No * Does this need documentation? No Author: Jeff Zhang <zjf...@apache.org> Closes #4137 from zjffdu/ZEPPELIN-5392 and squashes the following commits: 77695c7acb [Jeff Zhang] set table in run method 402c01c17d [Jeff Zhang] address comments b0b495463f [Jeff Zhang] [ZEPPELIN-5392] Timezone is not applied in the timestamp field in flink interpreter --- .../zeppelin/flink/sql/AbstractStreamSqlJob.java | 21 ++++++++++++++++++++- .../zeppelin/flink/sql/AppendStreamSqlJob.java | 11 +---------- .../zeppelin/flink/sql/UpdateStreamSqlJob.java | 11 +---------- .../java/org/apache/zeppelin/flink/FlinkShims.java | 2 ++ .../org/apache/zeppelin/flink/Flink110Shims.java | 20 +++++++++++++++++++- .../org/apache/zeppelin/flink/Flink111Shims.java | 6 ++++++ .../org/apache/zeppelin/flink/Flink112Shims.java | 6 ++++++ .../org/apache/zeppelin/flink/Flink113Shims.java | 16 ++++++++++++++++ 8 files changed, 71 insertions(+), 22 deletions(-) diff --git a/flink/flink-scala-parent/src/main/java/org/apache/zeppelin/flink/sql/AbstractStreamSqlJob.java 
b/flink/flink-scala-parent/src/main/java/org/apache/zeppelin/flink/sql/AbstractStreamSqlJob.java index 91cdb90..1fb7832 100644 --- a/flink/flink-scala-parent/src/main/java/org/apache/zeppelin/flink/sql/AbstractStreamSqlJob.java +++ b/flink/flink-scala-parent/src/main/java/org/apache/zeppelin/flink/sql/AbstractStreamSqlJob.java @@ -36,14 +36,18 @@ import org.apache.zeppelin.flink.FlinkShims; import org.apache.zeppelin.flink.JobManager; import org.apache.zeppelin.interpreter.InterpreterContext; import org.apache.zeppelin.interpreter.remote.RemoteInterpreterUtils; +import org.apache.zeppelin.tabledata.TableDataUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.io.IOException; import java.net.InetAddress; +import java.util.Arrays; +import java.util.List; import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.atomic.AtomicInteger; +import java.util.stream.Collectors; import static java.util.concurrent.TimeUnit.MILLISECONDS; @@ -57,6 +61,7 @@ public abstract class AbstractStreamSqlJob { private static AtomicInteger SQL_INDEX = new AtomicInteger(0); protected StreamExecutionEnvironment senv; protected TableEnvironment stenv; + private Table table; protected JobManager jobManager; protected InterpreterContext context; protected TableSchema schema; @@ -99,7 +104,7 @@ public abstract class AbstractStreamSqlJob { protected abstract String getType(); public String run(String st) throws IOException { - Table table = stenv.sqlQuery(st); + this.table = stenv.sqlQuery(st); String tableName = "UnnamedTable_" + "_" + SQL_INDEX.getAndIncrement(); return run(table, tableName); @@ -107,6 +112,7 @@ public abstract class AbstractStreamSqlJob { public String run(Table table, String tableName) throws IOException { try { + this.table = table; int parallelism = Integer.parseInt(context.getLocalProperties() .getOrDefault("parallelism", defaultParallelism + "")); this.schema = 
removeTimeAttributes(table.getSchema()); @@ -197,6 +203,19 @@ public abstract class AbstractStreamSqlJob { protected abstract String buildResult(); + protected String tableToString(List<Row> rows) { + StringBuilder builder = new StringBuilder(); + for (Row row : rows) { + String[] fields = flinkShims.rowToString(row, table, stenv.getConfig()); + String rowString = Arrays.stream(fields) + .map(TableDataUtils::normalizeColumn) + .collect(Collectors.joining("\t")); + builder.append(rowString); + builder.append("\n"); + } + return builder.toString(); + } + private class ResultRetrievalThread extends Thread { private ScheduledExecutorService refreshExecutorService; diff --git a/flink/flink-scala-parent/src/main/java/org/apache/zeppelin/flink/sql/AppendStreamSqlJob.java b/flink/flink-scala-parent/src/main/java/org/apache/zeppelin/flink/sql/AppendStreamSqlJob.java index f1eb997..7f636f3 100644 --- a/flink/flink-scala-parent/src/main/java/org/apache/zeppelin/flink/sql/AppendStreamSqlJob.java +++ b/flink/flink-scala-parent/src/main/java/org/apache/zeppelin/flink/sql/AppendStreamSqlJob.java @@ -107,16 +107,7 @@ public class AppendStreamSqlJob extends AbstractStreamSqlJob { maxTimestamp - tsWindowThreshold) .collect(Collectors.toList()); - for (Row row : materializedTable) { - for (int i = 0; i < row.getArity(); ++i) { - Object field = row.getField(i); - builder.append(TableDataUtils.normalizeColumn(StringUtils.arrayAwareToString(field))); - if (i != (row.getArity() - 1)) { - builder.append("\t"); - } - } - builder.append("\n"); - } + builder.append(tableToString(materializedTable)); } builder.append("\n%text "); return builder.toString(); diff --git a/flink/flink-scala-parent/src/main/java/org/apache/zeppelin/flink/sql/UpdateStreamSqlJob.java b/flink/flink-scala-parent/src/main/java/org/apache/zeppelin/flink/sql/UpdateStreamSqlJob.java index dc1ecc7..44105b9 100644 --- a/flink/flink-scala-parent/src/main/java/org/apache/zeppelin/flink/sql/UpdateStreamSqlJob.java +++ 
b/flink/flink-scala-parent/src/main/java/org/apache/zeppelin/flink/sql/UpdateStreamSqlJob.java @@ -91,16 +91,7 @@ public class UpdateStreamSqlJob extends AbstractStreamSqlJob { String f2 = TableDataUtils.normalizeColumn(StringUtils.arrayAwareToString(r2.getField(0))); return f1.compareTo(f2); }); - for (Row row : materializedTable) { - for (int i = 0; i < row.getArity(); ++i) { - Object field = row.getField(i); - builder.append(TableDataUtils.normalizeColumn(StringUtils.arrayAwareToString(field))); - if (i != (row.getArity() - 1)) { - builder.append("\t"); - } - } - builder.append("\n"); - } + builder.append(tableToString(materializedTable)); builder.append("\n%text\n"); return builder.toString(); } diff --git a/flink/flink-shims/src/main/java/org/apache/zeppelin/flink/FlinkShims.java b/flink/flink-shims/src/main/java/org/apache/zeppelin/flink/FlinkShims.java index acdb4a9..c41488f 100644 --- a/flink/flink-shims/src/main/java/org/apache/zeppelin/flink/FlinkShims.java +++ b/flink/flink-shims/src/main/java/org/apache/zeppelin/flink/FlinkShims.java @@ -157,4 +157,6 @@ public abstract class FlinkShims { public void setOldPlanner(Object tableConfig) { // only needed after flink 1.13 } + + public abstract String[] rowToString(Object row, Object table, Object tableConfig); } diff --git a/flink/flink1.10-shims/src/main/java/org/apache/zeppelin/flink/Flink110Shims.java b/flink/flink1.10-shims/src/main/java/org/apache/zeppelin/flink/Flink110Shims.java index 9a03b95..3faa52c 100644 --- a/flink/flink1.10-shims/src/main/java/org/apache/zeppelin/flink/Flink110Shims.java +++ b/flink/flink1.10-shims/src/main/java/org/apache/zeppelin/flink/Flink110Shims.java @@ -44,8 +44,8 @@ import org.apache.flink.table.functions.AggregateFunction; import org.apache.flink.table.functions.ScalarFunction; import org.apache.flink.table.functions.TableAggregateFunction; import org.apache.flink.table.functions.TableFunction; -import org.apache.flink.table.functions.UserDefinedFunction; import 
org.apache.flink.table.sinks.TableSink; +import org.apache.flink.table.utils.EncodingUtils; import org.apache.flink.types.Row; import org.apache.flink.util.FlinkException; import org.apache.zeppelin.flink.shims110.CollectStreamTableSink; @@ -319,4 +319,22 @@ public class Flink110Shims extends FlinkShims { } return configOptions; } + + @Override + public String[] rowToString(Object row, Object table, Object tableConfig) { + return rowToString((Row) row); + } + + private String[] rowToString(Row row) { + final String[] fields = new String[row.getArity()]; + for (int i = 0; i < row.getArity(); i++) { + final Object field = row.getField(i); + if (field == null) { + fields[i] = "(NULL)"; + } else { + fields[i] = EncodingUtils.objectToString(field); + } + } + return fields; + } } diff --git a/flink/flink1.11-shims/src/main/java/org/apache/zeppelin/flink/Flink111Shims.java b/flink/flink1.11-shims/src/main/java/org/apache/zeppelin/flink/Flink111Shims.java index f290fad..c8db525 100644 --- a/flink/flink1.11-shims/src/main/java/org/apache/zeppelin/flink/Flink111Shims.java +++ b/flink/flink1.11-shims/src/main/java/org/apache/zeppelin/flink/Flink111Shims.java @@ -79,6 +79,7 @@ import org.apache.flink.table.operations.ddl.DropTableOperation; import org.apache.flink.table.operations.ddl.DropTempSystemFunctionOperation; import org.apache.flink.table.operations.ddl.DropViewOperation; import org.apache.flink.table.sinks.TableSink; +import org.apache.flink.table.utils.PrintUtils; import org.apache.flink.types.Row; import org.apache.flink.types.RowKind; import org.apache.flink.util.FlinkException; @@ -467,4 +468,9 @@ public class Flink111Shims extends FlinkShims { } return configOptions; } + + @Override + public String[] rowToString(Object row, Object table, Object tableConfig) { + return PrintUtils.rowToString((Row) row); + } } diff --git a/flink/flink1.12-shims/src/main/java/org/apache/zeppelin/flink/Flink112Shims.java 
b/flink/flink1.12-shims/src/main/java/org/apache/zeppelin/flink/Flink112Shims.java index b3f6ccc..648826a 100644 --- a/flink/flink1.12-shims/src/main/java/org/apache/zeppelin/flink/Flink112Shims.java +++ b/flink/flink1.12-shims/src/main/java/org/apache/zeppelin/flink/Flink112Shims.java @@ -80,6 +80,7 @@ import org.apache.flink.table.operations.ddl.DropTableOperation; import org.apache.flink.table.operations.ddl.DropTempSystemFunctionOperation; import org.apache.flink.table.operations.ddl.DropViewOperation; import org.apache.flink.table.sinks.TableSink; +import org.apache.flink.table.utils.PrintUtils; import org.apache.flink.types.Row; import org.apache.flink.types.RowKind; import org.apache.flink.util.FlinkException; @@ -480,4 +481,9 @@ public class Flink112Shims extends FlinkShims { } return configOptions; } + + @Override + public String[] rowToString(Object row, Object table, Object tableConfig) { + return PrintUtils.rowToString((Row) row); + } } diff --git a/flink/flink1.13-shims/src/main/java/org/apache/zeppelin/flink/Flink113Shims.java b/flink/flink1.13-shims/src/main/java/org/apache/zeppelin/flink/Flink113Shims.java index 8f6f813..054f07f 100644 --- a/flink/flink1.13-shims/src/main/java/org/apache/zeppelin/flink/Flink113Shims.java +++ b/flink/flink1.13-shims/src/main/java/org/apache/zeppelin/flink/Flink113Shims.java @@ -42,6 +42,7 @@ import org.apache.flink.table.api.Table; import org.apache.flink.table.api.TableConfig; import org.apache.flink.table.api.TableEnvironment; import org.apache.flink.table.api.TableResult; +import org.apache.flink.table.api.TableSchema; import org.apache.flink.table.api.bridge.java.internal.StreamTableEnvironmentImpl; import org.apache.flink.table.api.bridge.scala.BatchTableEnvironment; import org.apache.flink.table.api.config.ExecutionConfigOptions; @@ -50,6 +51,7 @@ import org.apache.flink.table.api.config.TableConfigOptions; import org.apache.flink.table.api.internal.TableEnvironmentInternal; import 
org.apache.flink.table.catalog.CatalogManager; import org.apache.flink.table.catalog.GenericInMemoryCatalog; +import org.apache.flink.table.catalog.ResolvedSchema; import org.apache.flink.table.delegation.Parser; import org.apache.flink.table.functions.AggregateFunction; import org.apache.flink.table.functions.ScalarFunction; @@ -82,6 +84,7 @@ import org.apache.flink.table.operations.ddl.DropTableOperation; import org.apache.flink.table.operations.ddl.DropTempSystemFunctionOperation; import org.apache.flink.table.operations.ddl.DropViewOperation; import org.apache.flink.table.sinks.TableSink; +import org.apache.flink.table.utils.PrintUtils; import org.apache.flink.types.Row; import org.apache.flink.types.RowKind; import org.apache.flink.util.FlinkException; @@ -101,6 +104,7 @@ import java.io.File; import java.io.IOException; import java.lang.reflect.Field; import java.net.InetAddress; +import java.time.ZoneId; import java.util.Arrays; import java.util.HashMap; import java.util.List; @@ -492,4 +496,16 @@ public class Flink113Shims extends FlinkShims { ((TableConfig) tableConfig).getConfiguration() .set(TableConfigOptions.TABLE_PLANNER, PlannerType.OLD); } + + @Override + public String[] rowToString(Object row, Object table, Object tableConfig) { + final String zone = ((TableConfig) tableConfig).getConfiguration() + .get(TableConfigOptions.LOCAL_TIME_ZONE); + ZoneId zoneId = TableConfigOptions.LOCAL_TIME_ZONE.defaultValue().equals(zone) + ? ZoneId.systemDefault() + : ZoneId.of(zone); + + ResolvedSchema resolvedSchema = ((Table) table).getResolvedSchema(); + return PrintUtils.rowToString((Row) row, resolvedSchema, zoneId); + } }