This is an automated email from the ASF dual-hosted git repository. zjffdu pushed a commit to branch branch-0.9 in repository https://gitbox.apache.org/repos/asf/zeppelin.git
The following commit(s) were added to refs/heads/branch-0.9 by this push: new 609c1dc [ZEPPELIN-4800]. Support z.show(table) in flink interpreter 609c1dc is described below commit 609c1dccb762ba655115d64d4456bda8bc5c75b0 Author: Jeff Zhang <zjf...@apache.org> AuthorDate: Tue May 5 23:34:43 2020 +0800 [ZEPPELIN-4800]. Support z.show(table) in flink interpreter ### What is this PR for? This PR adds support for `z.show(table)` in the Flink interpreter. It can be used not only from the Scala API but also from the PyFlink API. See the screenshot below. ### What type of PR is it? [ Feature ] ### Todos * [ ] - Task ### What is the Jira issue? * https://issues.apache.org/jira/browse/ZEPPELIN-4800 ### How should this be tested? * CI pass ### Screenshots (if appropriate)   ### Questions: * Do the license files need an update? No * Are there breaking changes for older versions? No * Does this need documentation? No Author: Jeff Zhang <zjf...@apache.org> Closes #3762 from zjffdu/ZEPPELIN-4800 and squashes the following commits: 713b9996e [Jeff Zhang] [ZEPPELIN-4800]. Support z.show(table) in flink interpreter (cherry picked from commit 013f7d4629ee9d2f34a3694c616722b6172d23a1) Signed-off-by: Jeff Zhang <zjf...@apache.org> --- .../zeppelin/flink/FlinkZeppelinContext.scala | 2 +- .../flink/FlinkBatchSqlInterpreterTest.java | 10 + .../Flink Tutorial/6. 
Batch ETL_2EW19CSPA.zpln | 222 ++++++++++++++++++--- 3 files changed, 203 insertions(+), 31 deletions(-) diff --git a/flink/src/main/scala/org/apache/zeppelin/flink/FlinkZeppelinContext.scala b/flink/src/main/scala/org/apache/zeppelin/flink/FlinkZeppelinContext.scala index 3694f9a..146ec63 100644 --- a/flink/src/main/scala/org/apache/zeppelin/flink/FlinkZeppelinContext.scala +++ b/flink/src/main/scala/org/apache/zeppelin/flink/FlinkZeppelinContext.scala @@ -57,7 +57,7 @@ class FlinkZeppelinContext(val flinkInterpreter: FlinkScalaInterpreter, "ipyflink" -> "org.apache.zeppelin.flink.IPyFlinkInterpreter" ) - private val supportedClasses = Seq(classOf[DataSet[_]]) + private val supportedClasses = Seq(classOf[DataSet[_]], classOf[Table]) def setCurrentSql(sql: String): Unit = { this.currentSql = sql diff --git a/flink/src/test/java/org/apache/zeppelin/flink/FlinkBatchSqlInterpreterTest.java b/flink/src/test/java/org/apache/zeppelin/flink/FlinkBatchSqlInterpreterTest.java index 19dedfa..03efa9b 100644 --- a/flink/src/test/java/org/apache/zeppelin/flink/FlinkBatchSqlInterpreterTest.java +++ b/flink/src/test/java/org/apache/zeppelin/flink/FlinkBatchSqlInterpreterTest.java @@ -70,6 +70,16 @@ public class FlinkBatchSqlInterpreterTest extends SqlInterpreterTest { assertEquals(InterpreterResult.Type.TABLE, resultMessages.get(0).getType()); assertEquals("id\tname\n1\ta\n2\tb\n", resultMessages.get(0).getData()); + // z.show + context = getInterpreterContext(); + result = + flinkInterpreter.interpret("z.show(btenv.sqlQuery(\"select * from source_table\"))", context); + resultMessages = context.out.toInterpreterResultMessage(); + assertEquals(InterpreterResult.Code.SUCCESS, result.code()); + assertEquals(1, resultMessages.size()); + assertEquals(InterpreterResult.Type.TABLE, resultMessages.get(0).getType()); + assertEquals("id\tname\n1\ta\n2\tb\n", resultMessages.get(0).getData()); + // define scala udf result = flinkInterpreter.interpret( "class AddOne extends ScalarFunction 
{\n" + diff --git a/notebook/Flink Tutorial/6. Batch ETL_2EW19CSPA.zpln b/notebook/Flink Tutorial/6. Batch ETL_2EW19CSPA.zpln index d558a76..40c8cc2 100644 --- a/notebook/Flink Tutorial/6. Batch ETL_2EW19CSPA.zpln +++ b/notebook/Flink Tutorial/6. Batch ETL_2EW19CSPA.zpln @@ -49,7 +49,7 @@ "title": "Download bank data", "text": "%sh\n\ncd /tmp\nrm -rf bank*\nwget https://archive.ics.uci.edu/ml/machine-learning-databases/00222/bank.zip\nunzip bank.zip\n# upload data to hdfs if you want to run it in yarn mode\n# hadoop fs -put /tmp/bank.csv /tmp/bank.csv\n", "user": "anonymous", - "dateUpdated": "2020-04-27 11:37:36.288", + "dateUpdated": "2020-05-05 22:52:51.576", "config": { "runOnSelectionChange": true, "title": true, @@ -75,7 +75,7 @@ "msg": [ { "type": "TEXT", - "data": "--2020-04-27 11:37:36-- https://archive.ics.uci.edu/ml/machine-learning-databases/00222/bank.zip\nResolving archive.ics.uci.edu (archive.ics.uci.edu)... 128.195.10.252\nConnecting to archive.ics.uci.edu (archive.ics.uci.edu)|128.195.10.252|:443... connected.\nHTTP request sent, awaiting response... 200 OK\nLength: 579043 (565K) [application/x-httpd-php]\nSaving to: ‘bank.zip’\n\n 0K .......... .......... .......... .......... .......... 8% 88.2K 6s\n 50K ... [...] + "data": "--2020-05-05 22:52:51-- https://archive.ics.uci.edu/ml/machine-learning-databases/00222/bank.zip\nResolving archive.ics.uci.edu (archive.ics.uci.edu)... 128.195.10.252\nConnecting to archive.ics.uci.edu (archive.ics.uci.edu)|128.195.10.252|:443... connected.\nHTTP request sent, awaiting response... 200 OK\nLength: 579043 (565K) [application/x-httpd-php]\nSaving to: ‘bank.zip’\n\n 0K .......... .......... .......... .......... .......... 8% 105K 5s\n 50K ... [...] 
} ] }, @@ -84,8 +84,8 @@ "jobName": "paragraph_1578045094400_1030344935", "id": "paragraph_1578045094400_1030344935", "dateCreated": "2020-01-03 17:51:34.400", - "dateStarted": "2020-04-27 11:37:36.293", - "dateFinished": "2020-04-27 11:37:40.794", + "dateStarted": "2020-05-05 22:52:51.581", + "dateFinished": "2020-05-05 22:52:59.324", "status": "FINISHED" }, { @@ -135,7 +135,7 @@ "title": "Define source table which represents the raw data", "text": "%flink.bsql\n\nDROP TABLE IF EXISTS bank_raw;\nCREATE TABLE bank_raw (\n content STRING\n) WITH (\n\u0027format.field-delimiter\u0027\u003d\u0027\\n\u0027,\n\u0027connector.type\u0027\u003d\u0027filesystem\u0027,\n\u0027format.derive-schema\u0027\u003d\u0027true\u0027,\n\u0027connector.path\u0027\u003d\u0027/tmp/bank.csv\u0027,\n\u0027format.type\u0027\u003d\u0027csv\u0027\n);", "user": "anonymous", - "dateUpdated": "2020-04-27 11:39:53.363", + "dateUpdated": "2020-05-05 23:01:30.843", "config": { "colWidth": 6.0, "fontSize": 9.0, @@ -170,15 +170,15 @@ "jobName": "paragraph_1578044954921_-1188487356", "id": "paragraph_1578044954921_-1188487356", "dateCreated": "2020-01-03 17:49:14.921", - "dateStarted": "2020-04-27 11:38:36.094", - "dateFinished": "2020-04-27 11:38:36.393", + "dateStarted": "2020-05-05 23:01:30.849", + "dateFinished": "2020-05-05 23:01:41.675", "status": "FINISHED" }, { "title": "Define sink table which represents the cleaned data", "text": "%flink.bsql\n\nDROP TABLE IF EXISTS bank;\nCREATE TABLE bank (\n age int, \n job string,\n marital string,\n education string,\n `default` string,\n balance string,\n housing string,\n loan string,\n contact string, \n `day` string,\n `month` string,\n duration int,\n campaign int,\n pdays int,\n previous int,\n poutcome string,\n y string\n) WITH (\n\u0027format.field-delimiter\u0027\u003d\u0027,\u0027,\n\u0027connector.t [...] 
"user": "anonymous", - "dateUpdated": "2020-04-27 11:39:50.826", + "dateUpdated": "2020-05-05 23:01:32.335", "config": { "runOnSelectionChange": true, "title": true, @@ -213,15 +213,15 @@ "jobName": "paragraph_1578045204379_-1427374232", "id": "paragraph_1578045204379_-1427374232", "dateCreated": "2020-01-03 17:53:24.379", - "dateStarted": "2020-04-27 11:38:37.856", - "dateFinished": "2020-04-27 11:38:38.118", + "dateStarted": "2020-05-05 23:01:41.334", + "dateFinished": "2020-05-05 23:01:41.675", "status": "FINISHED" }, { "title": "Show tables", "text": "%flink.bsql\n\nshow tables", "user": "anonymous", - "dateUpdated": "2020-04-27 11:40:15.544", + "dateUpdated": "2020-05-05 22:52:11.086", "config": { "colWidth": 12.0, "fontSize": 9.0, @@ -282,15 +282,15 @@ "jobName": "paragraph_1587958743728_1444404682", "id": "paragraph_1587958743728_1444404682", "dateCreated": "2020-04-27 11:39:03.728", - "dateStarted": "2020-04-27 11:39:06.222", - "dateFinished": "2020-04-27 11:39:06.500", + "dateStarted": "2020-05-05 22:52:11.091", + "dateFinished": "2020-05-05 22:52:11.283", "status": "FINISHED" }, { "title": "Define UDTF ParseFunction to parse the raw data", "text": "%flink\n\nimport org.apache.flink.api.java.typeutils.RowTypeInfo\nimport org.apache.flink.api.common.typeinfo.Types\nimport org.apache.flink.api.java.typeutils._\nimport org.apache.flink.api.scala.typeutils._\nimport org.apache.flink.api.scala._\n\nclass Person(val age:Int, val job: String, val marital: String, val education: String, val default: String, val balance: String, val housing: String, val loan: String, val contact: String, val day: String, val month: String, val [...] 
"user": "anonymous", - "dateUpdated": "2020-04-27 11:38:03.810", + "dateUpdated": "2020-05-05 23:01:45.688", "config": { "runOnSelectionChange": true, "title": true, @@ -325,15 +325,15 @@ "jobName": "paragraph_1578888628353_1621411444", "id": "paragraph_1578888628353_1621411444", "dateCreated": "2020-01-13 12:10:28.359", - "dateStarted": "2020-04-27 11:38:03.814", - "dateFinished": "2020-04-27 11:38:06.223", + "dateStarted": "2020-05-05 23:01:45.693", + "dateFinished": "2020-05-05 23:01:48.276", "status": "FINISHED" }, { "title": "Clean data", "text": "%sh\n\nrm -rf /tmp/bank_cleaned\n#hadoop fs -rmr /tmp/bank_cleaned", "user": "anonymous", - "dateUpdated": "2020-04-27 11:38:44.643", + "dateUpdated": "2020-05-05 22:52:42.774", "config": { "runOnSelectionChange": true, "title": true, @@ -363,15 +363,15 @@ "jobName": "paragraph_1579061020460_-113987164", "id": "paragraph_1579061020460_-113987164", "dateCreated": "2020-01-15 12:03:40.468", - "dateStarted": "2020-04-27 11:38:44.647", - "dateFinished": "2020-04-27 11:38:44.672", + "dateStarted": "2020-05-05 22:52:42.780", + "dateFinished": "2020-05-05 22:52:42.836", "status": "FINISHED" }, { "title": "Parse the data and write it into sink table", "text": "%flink.bsql\n\ninsert into bank select T.* from bank_raw, LATERAL TABLE(parse(content)) as T(age, job, marital, education, `default`, balance, housing, loan, contact, `day`, `month`, duration, campaign, pdays, previous, poutcome, y) ", "user": "anonymous", - "dateUpdated": "2020-04-27 11:38:47.002", + "dateUpdated": "2020-05-05 22:53:04.445", "config": { "runOnSelectionChange": true, "title": true, @@ -406,15 +406,15 @@ "jobName": "paragraph_1578669828368_-1923137601", "id": "paragraph_1578669828368_-1923137601", "dateCreated": "2020-01-10 23:23:48.368", - "dateStarted": "2020-04-27 11:38:47.006", - "dateFinished": "2020-04-27 11:38:50.488", + "dateStarted": "2020-05-05 22:53:04.450", + "dateFinished": "2020-05-05 22:53:05.856", "status": "FINISHED" }, { "title": 
"Preview output data", "text": "%flink.bsql\n\nselect * from bank limit 10\n", "user": "anonymous", - "dateUpdated": "2020-04-27 11:40:23.766", + "dateUpdated": "2020-05-05 23:01:51.464", "config": { "colWidth": 12.0, "fontSize": 9.0, @@ -493,37 +493,199 @@ "jobName": "paragraph_1578068480238_-1678045273", "id": "paragraph_1578068480238_-1678045273", "dateCreated": "2020-01-04 00:21:20.267", - "dateStarted": "2020-04-27 11:40:23.770", - "dateFinished": "2020-04-27 11:40:24.710", + "dateStarted": "2020-05-05 23:01:51.469", + "dateFinished": "2020-05-05 23:01:55.009", "status": "FINISHED" }, { - "text": "%flink.bsql\n", + "title": "Display table via z.show", + "text": "%flink\n\nval table \u003d btenv.sqlQuery(\"select * from bank limit 10\")\nz.show(table)", "user": "anonymous", - "dateUpdated": "2020-02-02 16:41:35.121", + "dateUpdated": "2020-05-05 23:33:44.788", "config": { "colWidth": 12.0, "fontSize": 9.0, "enabled": true, - "results": {}, + "results": { + "1": { + "graph": { + "mode": "table", + "height": 300.0, + "optionOpen": false, + "setting": { + "table": { + "tableGridState": {}, + "tableColumnTypeState": { + "names": { + "age": "string", + "job": "string", + "marital": "string", + "education": "string", + "default": "string", + "balance": "string", + "housing": "string", + "loan": "string", + "contact": "string", + "day": "string", + "month": "string", + "duration": "string", + "campaign": "string", + "pdays": "string", + "previous": "string", + "poutcome": "string", + "y": "string" + }, + "updated": false + }, + "tableOptionSpecHash": "[{\"name\":\"useFilter\",\"valueType\":\"boolean\",\"defaultValue\":false,\"widget\":\"checkbox\",\"description\":\"Enable filter for columns\"},{\"name\":\"showPagination\",\"valueType\":\"boolean\",\"defaultValue\":false,\"widget\":\"checkbox\",\"description\":\"Enable pagination for better 
navigation\"},{\"name\":\"showAggregationFooter\",\"valueType\":\"boolean\",\"defaultValue\":false,\"widget\":\"checkbox\",\"description\":\"Enable a footer [...] + "tableOptionValue": { + "useFilter": false, + "showPagination": false, + "showAggregationFooter": false + }, + "updated": false, + "initialized": false + } + }, + "commonSetting": {} + } + } + }, "editorSetting": { - "language": "sql", + "language": "scala", "editOnDblClick": false, "completionKey": "TAB", "completionSupport": true }, - "editorMode": "ace/mode/sql" + "editorMode": "ace/mode/scala", + "title": true }, "settings": { "params": {}, "forms": {} }, + "results": { + "code": "SUCCESS", + "msg": [ + { + "type": "TEXT", + "data": "\u001b[1m\u001b[34mtable\u001b[0m: \u001b[1m\u001b[32morg.apache.flink.table.api.Table\u001b[0m \u003d UnnamedTable$3\n" + }, + { + "type": "TABLE", + "data": "age\tjob\tmarital\teducation\tdefault\tbalance\thousing\tloan\tcontact\tday\tmonth\tduration\tcampaign\tpdays\tprevious\tpoutcome\ty\n30\tunemployed\tmarried\tprimary\tno\t1787\tno\tno\tcellular\t19\toct\t79\t1\t-1\t0\tunknown\tno\n33\tservices\tmarried\tsecondary\tno\t4789\tyes\tyes\tcellular\t11\tmay\t220\t1\t339\t4\tfailure\tno\n35\tmanagement\tsingle\ttertiary\tno\t1350\tyes\tno\tcellular\t16\tapr\t185\t1\t330\t1\tfailure\tno\n30\tmanagement\tmarried\ttertiary\tn [...] 
+ } + ] + }, "apps": [], "progressUpdateIntervalMs": 500, "jobName": "paragraph_1579061037737_-1577558456", "id": "paragraph_1579061037737_-1577558456", "dateCreated": "2020-01-15 12:03:57.737", + "dateStarted": "2020-05-05 23:03:09.350", + "dateFinished": "2020-05-05 23:03:11.061", + "status": "FINISHED" + }, + { + "title": "Display table via z.show in PyFlink", + "text": "%flink.pyflink\n\ntable \u003d bt_env.sql_query(\"select * from bank limit 10\")\nz.show(table)", + "user": "anonymous", + "dateUpdated": "2020-05-05 23:37:48.878", + "config": { + "colWidth": 12.0, + "fontSize": 9.0, + "enabled": true, + "results": { + "0": { + "graph": { + "mode": "table", + "height": 300.0, + "optionOpen": false, + "setting": { + "table": { + "tableGridState": {}, + "tableColumnTypeState": { + "names": { + "age": "string", + "job": "string", + "marital": "string", + "education": "string", + "default": "string", + "balance": "string", + "housing": "string", + "loan": "string", + "contact": "string", + "day": "string", + "month": "string", + "duration": "string", + "campaign": "string", + "pdays": "string", + "previous": "string", + "poutcome": "string", + "y": "string" + }, + "updated": false + }, + "tableOptionSpecHash": "[{\"name\":\"useFilter\",\"valueType\":\"boolean\",\"defaultValue\":false,\"widget\":\"checkbox\",\"description\":\"Enable filter for columns\"},{\"name\":\"showPagination\",\"valueType\":\"boolean\",\"defaultValue\":false,\"widget\":\"checkbox\",\"description\":\"Enable pagination for better navigation\"},{\"name\":\"showAggregationFooter\",\"valueType\":\"boolean\",\"defaultValue\":false,\"widget\":\"checkbox\",\"description\":\"Enable a footer [...] 
+ "tableOptionValue": { + "useFilter": false, + "showPagination": false, + "showAggregationFooter": false + }, + "updated": false, + "initialized": false + } + }, + "commonSetting": {} + } + } + }, + "editorSetting": { + "language": "python", + "editOnDblClick": false, + "completionKey": "TAB", + "completionSupport": true + }, + "editorMode": "ace/mode/python", + "title": true + }, + "settings": { + "params": {}, + "forms": {} + }, + "results": { + "code": "SUCCESS", + "msg": [ + { + "type": "TABLE", + "data": "age\tjob\tmarital\teducation\tdefault\tbalance\thousing\tloan\tcontact\tday\tmonth\tduration\tcampaign\tpdays\tprevious\tpoutcome\ty\n30\tunemployed\tmarried\tprimary\tno\t1787\tno\tno\tcellular\t19\toct\t79\t1\t-1\t0\tunknown\tno\n33\tservices\tmarried\tsecondary\tno\t4789\tyes\tyes\tcellular\t11\tmay\t220\t1\t339\t4\tfailure\tno\n35\tmanagement\tsingle\ttertiary\tno\t1350\tyes\tno\tcellular\t16\tapr\t185\t1\t330\t1\tfailure\tno\n30\tmanagement\tmarried\ttertiary\tn [...] + } + ] + }, + "apps": [], + "progressUpdateIntervalMs": 500, + "jobName": "paragraph_1588690392097_1159956807", + "id": "paragraph_1588690392097_1159956807", + "dateCreated": "2020-05-05 22:53:12.097", + "dateStarted": "2020-05-05 23:37:07.989", + "dateFinished": "2020-05-05 23:37:15.984", "status": "FINISHED" + }, + { + "text": "%flink.pyflink\n", + "user": "anonymous", + "dateUpdated": "2020-05-05 23:37:07.989", + "config": {}, + "settings": { + "params": {}, + "forms": {} + }, + "apps": [], + "progressUpdateIntervalMs": 500, + "jobName": "paragraph_1588693027989_1331448600", + "id": "paragraph_1588693027989_1331448600", + "dateCreated": "2020-05-05 23:37:07.989", + "status": "READY" } ], "name": "6. Batch ETL",