This is an automated email from the ASF dual-hosted git repository.

zjffdu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/zeppelin.git
The following commit(s) were added to refs/heads/master by this push:
     new 80a3856  [ZEPPELIN-4740]. Display streaming data in flink table api
80a3856 is described below

commit 80a3856bc722f57e4603e276e88920633a15248f
Author: Jeff Zhang <zjf...@apache.org>
AuthorDate: Thu Apr 9 17:20:05 2020 +0800

    [ZEPPELIN-4740]. Display streaming data in flink table api

    ### What is this PR for?
    This PR allows users to display streaming data via the Flink Table API, just like displaying streaming data in stream SQL (%flink.ssql). I implemented it for both Scala and PyFlink. Here is an example using the Flink Scala Table API:
    ```
    val table = stenv.from("cdn_access_log")
      .select("uuid, ip_to_province(client_ip) as province, response_size, request_time")
      .groupBy("province")
      .select("province, count(uuid) as access_count, sum(response_size) as total_download, sum(response_size) * 1.0 / sum(request_time) as download_speed")

    z.show(table, streamType="update")
    ```

    ### What type of PR is it?
    [Feature]

    ### Todos
    * [ ] - Task

    ### What is the Jira issue?
    * https://issues.apache.org/jira/browse/ZEPPELIN-4740

    ### How should this be tested?
    Unit tests are added, and I also verified it manually.

    ### Screenshots (if appropriate)

    ### Questions:
    * Do the license files need to be updated? No
    * Are there breaking changes for older versions? No
    * Does this need documentation? No

    Author: Jeff Zhang <zjf...@apache.org>

    Closes #3729 from zjffdu/ZEPPELIN-4740 and squashes the following commits:

    bcd25978e [Jeff Zhang] [ZEPPELIN-4740]. Display streaming data in flink table api
---
 flink/pom.xml                                      |   2 +-
 .../apache/zeppelin/flink/FlinkInterpreter.java    |   4 +-
 .../java/org/apache/zeppelin/flink/JobManager.java |   6 ++
 .../zeppelin/flink/sql/AbstractStreamSqlJob.java   |  11 ++-
 .../src/main/resources/python/zeppelin_ipyflink.py |   5 +-
 .../src/main/resources/python/zeppelin_pyflink.py  |   5 +-
 .../zeppelin/flink/FlinkScalaInterpreter.scala     |  20 +++-
 .../zeppelin/flink/FlinkZeppelinContext.scala      |  53 ++++++++--
 .../zeppelin/flink/FlinkInterpreterTest.java       |   2 +-
 .../flink/FlinkStreamSqlInterpreterTest.java       |  59 ++++++++++-
 .../zeppelin/flink/IPyFlinkInterpreterTest.java    | 110 +++++++++++++++++++--
 .../zeppelin/flink/PyFlinkInterpreterTest.java     |  25 ++++-
 .../apache/zeppelin/flink/SqlInterpreterTest.java  |   6 +-
 13 files changed, 265 insertions(+), 43 deletions(-)

diff --git a/flink/pom.xml b/flink/pom.xml
index 58665bb..d74c0d9 100644
--- a/flink/pom.xml
+++ b/flink/pom.xml
@@ -634,7 +634,7 @@
           <!-- set sun.zip.disableMemoryMapping=true because of
           https://blogs.oracle.com/poonam/crashes-in-zipgetentry
           https://bugs.java.com/bugdatabase/view_bug.do?bug_id=8191484 -->
-          <argLine>-Xmx3072m -XX:MaxMetaspaceSize=512m -Dsun.zip.disableMemoryMapping=true</argLine>
+          <argLine>-Xmx4096m -XX:MaxMetaspaceSize=512m -Dsun.zip.disableMemoryMapping=true</argLine>
           <environmentVariables>
             <FLINK_HOME>${project.build.directory}/flink-${flink.version}</FLINK_HOME>
diff --git a/flink/src/main/java/org/apache/zeppelin/flink/FlinkInterpreter.java b/flink/src/main/java/org/apache/zeppelin/flink/FlinkInterpreter.java
index af4de3d..a9afc1d 100644
--- a/flink/src/main/java/org/apache/zeppelin/flink/FlinkInterpreter.java
+++ b/flink/src/main/java/org/apache/zeppelin/flink/FlinkInterpreter.java
@@ -110,7 +110,7 @@ public class FlinkInterpreter extends Interpreter {
   }

   StreamTableEnvironment getStreamTableEnvironment() {
-    return this.innerIntp.getStreamTableEnvironment();
+    return this.innerIntp.getStreamTableEnvironment("blink");
   }

   org.apache.flink.table.api.TableEnvironment
getJavaBatchTableEnvironment(String planner) { @@ -122,7 +122,7 @@ public class FlinkInterpreter extends Interpreter { } TableEnvironment getBatchTableEnvironment() { - return this.innerIntp.getBatchTableEnvironment(); + return this.innerIntp.getBatchTableEnvironment("blink"); } JobManager getJobManager() { diff --git a/flink/src/main/java/org/apache/zeppelin/flink/JobManager.java b/flink/src/main/java/org/apache/zeppelin/flink/JobManager.java index 527b0f7..8e4e870 100644 --- a/flink/src/main/java/org/apache/zeppelin/flink/JobManager.java +++ b/flink/src/main/java/org/apache/zeppelin/flink/JobManager.java @@ -140,6 +140,12 @@ public class JobManager { jobProgressPoller.interrupt(); } + public void shutdown() { + for (FlinkJobProgressPoller jobProgressPoller : jobProgressPollerMap.values()) { + jobProgressPoller.cancel(); + } + } + class FlinkJobProgressPoller extends Thread { private String flinkWebUI; diff --git a/flink/src/main/java/org/apache/zeppelin/flink/sql/AbstractStreamSqlJob.java b/flink/src/main/java/org/apache/zeppelin/flink/sql/AbstractStreamSqlJob.java index 30c4f45..9469b89 100644 --- a/flink/src/main/java/org/apache/zeppelin/flink/sql/AbstractStreamSqlJob.java +++ b/flink/src/main/java/org/apache/zeppelin/flink/sql/AbstractStreamSqlJob.java @@ -96,11 +96,15 @@ public abstract class AbstractStreamSqlJob { protected abstract String getType(); public InterpreterResult run(String st) throws IOException { + Table table = stenv.sqlQuery(st); + String tableName = st + "_" + SQL_INDEX.getAndIncrement(); + return run(table, tableName); + } + + public InterpreterResult run(Table table, String tableName) throws IOException { try { int parallelism = Integer.parseInt(context.getLocalProperties() .getOrDefault("parallelism", defaultParallelism + "")); - - Table table = stenv.sqlQuery(st); this.schema = removeTimeAttributes(table.getSchema()); checkTableSchema(schema); @@ -132,7 +136,6 @@ public abstract class AbstractStreamSqlJob { try { stenv.useCatalog("default_catalog"); stenv.useDatabase("default_database"); - String tableName = st + "_" + SQL_INDEX.getAndIncrement(); stenv.registerTableSink(tableName, collectTableSink); table.insertInto(new StreamQueryConfig(), tableName); } finally { @@ -149,7 +152,7 @@ public abstract class AbstractStreamSqlJob { retrievalThread.start(); LOGGER.info("Run job without savePointPath, " + ", parallelism: " + parallelism); - stenv.execute(st); + stenv.execute(tableName); LOGGER.info("Flink Job is finished"); // wait for retrieve thread consume all data LOGGER.info("Waiting for retrieve thread to be done"); diff --git a/flink/src/main/resources/python/zeppelin_ipyflink.py b/flink/src/main/resources/python/zeppelin_ipyflink.py index 0c1f2e7..fe94c9f 100644 --- a/flink/src/main/resources/python/zeppelin_ipyflink.py +++ b/flink/src/main/resources/python/zeppelin_ipyflink.py @@ -60,7 +60,10 @@ class IPyFlinkZeppelinContext(PyZeppelinContext): def show(self, obj, **kwargs): from pyflink.table import Table if isinstance(obj, Table): - print(self.z.showData(obj._j_table)) + if 'stream_type' in kwargs: + self.z.show(obj._j_table, kwargs['stream_type'], kwargs) + else: + print(self.z.showData(obj._j_table)) else: super(IPyFlinkZeppelinContext, self).show(obj, **kwargs) diff --git a/flink/src/main/resources/python/zeppelin_pyflink.py b/flink/src/main/resources/python/zeppelin_pyflink.py index a2bdaa0..8a401b2 100644 --- a/flink/src/main/resources/python/zeppelin_pyflink.py +++ b/flink/src/main/resources/python/zeppelin_pyflink.py @@ -52,7 +52,10 @@ class 
PyFlinkZeppelinContext(PyZeppelinContext): def show(self, obj, **kwargs): from pyflink.table import Table if isinstance(obj, Table): - print(self.z.showData(obj._j_table)) + if 'stream_type' in kwargs: + self.z.show(obj._j_table, kwargs['stream_type'], kwargs) + else: + print(self.z.showData(obj._j_table)) else: super(PyFlinkZeppelinContext, self).show(obj, **kwargs) diff --git a/flink/src/main/scala/org/apache/zeppelin/flink/FlinkScalaInterpreter.scala b/flink/src/main/scala/org/apache/zeppelin/flink/FlinkScalaInterpreter.scala index 32962da..b45637c 100644 --- a/flink/src/main/scala/org/apache/zeppelin/flink/FlinkScalaInterpreter.scala +++ b/flink/src/main/scala/org/apache/zeppelin/flink/FlinkScalaInterpreter.scala @@ -360,7 +360,7 @@ class FlinkScalaInterpreter(val properties: Properties) { flinkILoop.interpret("import org.apache.flink.table.functions.AggregateFunction") flinkILoop.interpret("import org.apache.flink.table.functions.TableFunction") - this.z = new FlinkZeppelinContext(this.btenv, this.btenv_2, new InterpreterHookRegistry(), + this.z = new FlinkZeppelinContext(this, new InterpreterHookRegistry(), Integer.parseInt(properties.getProperty("zeppelin.flink.maxResult", "1000"))) val modifiers = new java.util.ArrayList[String]() modifiers.add("@transient") @@ -638,7 +638,6 @@ class FlinkScalaInterpreter(val properties: Properties) { LOGGER.info("Don't close the Remote FlinkCluster") } } - } else { LOGGER.info("Keep cluster alive when closing interpreter") } @@ -647,6 +646,9 @@ class FlinkScalaInterpreter(val properties: Properties) { flinkILoop.closeInterpreter() flinkILoop = null } + if (jobManager != null) { + jobManager.shutdown() + } } private def cleanupStagingDirInternal(appId: ApplicationId): Unit = { @@ -666,9 +668,19 @@ class FlinkScalaInterpreter(val properties: Properties) { def getStreamExecutionEnvironment(): StreamExecutionEnvironment = this.senv - def getBatchTableEnvironment(): TableEnvironment = this.btenv + def getBatchTableEnvironment(planner: String = "blink"): TableEnvironment = { + if (planner == "blink") + this.btenv + else + this.btenv_2 + } - def getStreamTableEnvironment(): StreamTableEnvironment = this.stenv + def getStreamTableEnvironment(planner: String = "blink"): StreamTableEnvironment = { + if (planner == "blink") + this.stenv + else + this.stenv_2 + } def getJavaBatchTableEnvironment(planner: String): TableEnvironment = { if (planner == "blink") { diff --git a/flink/src/main/scala/org/apache/zeppelin/flink/FlinkZeppelinContext.scala b/flink/src/main/scala/org/apache/zeppelin/flink/FlinkZeppelinContext.scala index 4f71622..03be11b 100644 --- a/flink/src/main/scala/org/apache/zeppelin/flink/FlinkZeppelinContext.scala +++ b/flink/src/main/scala/org/apache/zeppelin/flink/FlinkZeppelinContext.scala @@ -18,16 +18,21 @@ package org.apache.zeppelin.flink +import java.io.IOException +import java.util.concurrent.atomic.AtomicInteger + +import com.google.common.collect.Maps import org.apache.flink.api.scala.DataSet import org.apache.flink.streaming.api.scala._ import org.apache.flink.table.api.internal.TableImpl import org.apache.flink.table.api.{Table, TableEnvironment, TableUtils} -import org.apache.flink.table.api.scala.BatchTableEnvironment +import org.apache.flink.table.api.scala.{BatchTableEnvironment, StreamTableEnvironment} import org.apache.flink.types.Row import org.apache.flink.util.StringUtils import org.apache.zeppelin.annotation.ZeppelinApi import org.apache.zeppelin.display.AngularObjectWatcher import 
org.apache.zeppelin.display.ui.OptionInput.ParamOption +import org.apache.zeppelin.flink.sql.{AppendStreamSqlJob, SingleRowStreamSqlJob, UpdateStreamSqlJob} import org.apache.zeppelin.interpreter.{InterpreterContext, InterpreterHookRegistry, ResultMessages, ZeppelinContext} import org.apache.zeppelin.tabledata.TableDataUtils @@ -37,11 +42,11 @@ import scala.collection.{JavaConversions, Seq} /** * ZeppelinContext for Flink */ -class FlinkZeppelinContext(val btenv: TableEnvironment, - val btenv_2: TableEnvironment, +class FlinkZeppelinContext(val flinkInterpreter: FlinkScalaInterpreter, val hooks2: InterpreterHookRegistry, val maxResult2: Int) extends ZeppelinContext(hooks2, maxResult2) { + private val SQL_INDEX = new AtomicInteger(0) private var currentSql: String = _ private val interpreterClassMap = Map( @@ -98,10 +103,10 @@ class FlinkZeppelinContext(val btenv: TableEnvironment, override def showData(obj: Any, maxResult: Int): String = { if (obj.isInstanceOf[DataSet[_]]) { val ds = obj.asInstanceOf[DataSet[_]] - val env = btenv_2.asInstanceOf[BatchTableEnvironment] - val table = env.fromDataSet(ds) + val btenv = flinkInterpreter.getBatchTableEnvironment("flink").asInstanceOf[BatchTableEnvironment] + val table = btenv.fromDataSet(ds) val columnNames: Array[String] = table.getSchema.getFieldNames - val dsRows: DataSet[Row] = env.toDataSet[Row](table) + val dsRows: DataSet[Row] = btenv.toDataSet[Row](table) showTable(columnNames, dsRows.first(maxResult + 1).collect()) } else if (obj.isInstanceOf[Table]) { val rows = JavaConversions.asScalaBuffer(TableUtils.collectToList(obj.asInstanceOf[TableImpl])).toSeq @@ -114,7 +119,8 @@ class FlinkZeppelinContext(val btenv: TableEnvironment, def showFlinkTable(table: Table): String = { val columnNames: Array[String] = table.getSchema.getFieldNames - val dsRows: DataSet[Row] = btenv.asInstanceOf[BatchTableEnvironment].toDataSet[Row](table) + val dsRows: DataSet[Row] = flinkInterpreter.getJavaBatchTableEnvironment("flink") + .asInstanceOf[BatchTableEnvironment].toDataSet[Row](table) showTable(columnNames, dsRows.first(maxResult + 1).collect()) } @@ -124,6 +130,39 @@ class FlinkZeppelinContext(val btenv: TableEnvironment, showTable(columnNames, rows) } + def show(table: Table, streamType: String, configs: Map[String, String] = Map.empty): Unit = { + val stenv = flinkInterpreter.getStreamTableEnvironment() + val context = InterpreterContext.get() + configs.foreach(e => context.getLocalProperties.put(e._1, e._2)) + val tableName = context.getParagraphId.replace("-", "_") + "_" + SQL_INDEX.getAndIncrement() + if (streamType.equalsIgnoreCase("single")) { + val streamJob = new SingleRowStreamSqlJob(flinkInterpreter.getStreamExecutionEnvironment, + stenv, flinkInterpreter.getJobManager, context, flinkInterpreter.getDefaultParallelism) + streamJob.run(table, tableName) + } + else if (streamType.equalsIgnoreCase("append")) { + val streamJob = new AppendStreamSqlJob(flinkInterpreter.getStreamExecutionEnvironment, + stenv, flinkInterpreter.getJobManager, context, flinkInterpreter.getDefaultParallelism) + streamJob.run(table, tableName) + } + else if (streamType.equalsIgnoreCase("update")) { + val streamJob = new UpdateStreamSqlJob(flinkInterpreter.getStreamExecutionEnvironment, + stenv, flinkInterpreter.getJobManager, context, flinkInterpreter.getDefaultParallelism) + streamJob.run(table, tableName) + } + else throw new IOException("Unrecognized stream type: " + streamType) + } + + /** + * Called by python + * @param table + * @param streamType + * @param configs + 
*/ + def show(table: Table, streamType: String, configs: java.util.Map[String, String]): Unit = { + show(table, streamType, JavaConversions.mapAsScalaMap(configs).toMap) + } + @ZeppelinApi def select(name: String, options: Seq[(Any, String)]): Any = select(name, null, options) diff --git a/flink/src/test/java/org/apache/zeppelin/flink/FlinkInterpreterTest.java b/flink/src/test/java/org/apache/zeppelin/flink/FlinkInterpreterTest.java index 9af387d..6a20aca 100644 --- a/flink/src/test/java/org/apache/zeppelin/flink/FlinkInterpreterTest.java +++ b/flink/src/test/java/org/apache/zeppelin/flink/FlinkInterpreterTest.java @@ -216,7 +216,7 @@ public class FlinkInterpreterTest { assertEquals(InterpreterResult.Code.SUCCESS, result.code()); context = getInterpreterContext(); result = interpreter.interpret("z.show(data)", context); - assertEquals(InterpreterResult.Code.SUCCESS, result.code()); + assertEquals(new String(context.out.toByteArray()), InterpreterResult.Code.SUCCESS, result.code()); List<InterpreterResultMessage> resultMessages = context.out.toInterpreterResultMessage(); assertEquals(InterpreterResult.Type.TABLE, resultMessages.get(0).getType()); assertEquals("_1\t_2\n1\tjeff\n2\tandy\n3\tjames\n", resultMessages.get(0).getData()); diff --git a/flink/src/test/java/org/apache/zeppelin/flink/FlinkStreamSqlInterpreterTest.java b/flink/src/test/java/org/apache/zeppelin/flink/FlinkStreamSqlInterpreterTest.java index c01dbff..d920e7e 100644 --- a/flink/src/test/java/org/apache/zeppelin/flink/FlinkStreamSqlInterpreterTest.java +++ b/flink/src/test/java/org/apache/zeppelin/flink/FlinkStreamSqlInterpreterTest.java @@ -19,10 +19,6 @@ package org.apache.zeppelin.flink; import org.apache.commons.io.FileUtils; import org.apache.commons.io.IOUtils; -import org.apache.flink.streaming.api.TimeCharacteristic; -import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; -import org.apache.flink.table.api.EnvironmentSettings; -import org.apache.flink.table.api.java.StreamTableEnvironment; import org.apache.zeppelin.interpreter.InterpreterContext; import org.apache.zeppelin.interpreter.InterpreterException; import org.apache.zeppelin.interpreter.InterpreterResult; @@ -65,6 +61,23 @@ public class FlinkStreamSqlInterpreterTest extends SqlInterpreterTest { } @Test + public void testSingleStreamTableApi() throws IOException, InterpreterException { + String initStreamScalaScript = IOUtils.toString(getClass().getResource("/init_stream.scala")); + InterpreterContext context = getInterpreterContext(); + InterpreterResult result = flinkInterpreter.interpret(initStreamScalaScript, context); + assertEquals(InterpreterResult.Code.SUCCESS, result.code()); + + context = getInterpreterContext(); + String code = "val table = stenv.sqlQuery(\"select max(rowtime), count(1) from log\")\nz.show(table,streamType=\"single\", configs = Map(\"template\" -> \"Total Count: {1} <br/> {0}\"))"; + result = flinkInterpreter.interpret(code, context); + assertEquals(new String(context.out.toByteArray()), InterpreterResult.Code.SUCCESS, result.code()); + List<InterpreterResultMessage> resultMessages = context.out.toInterpreterResultMessage(); + assertEquals(InterpreterResult.Type.HTML, resultMessages.get(0).getType()); + assertTrue(resultMessages.toString(), + resultMessages.get(0).getData().contains("Total Count")); + } + + @Test public void testUpdateStreamSql() throws IOException, InterpreterException { String initStreamScalaScript = IOUtils.toString(getClass().getResource("/init_stream.scala")); InterpreterResult result 
= flinkInterpreter.interpret(initStreamScalaScript, @@ -83,6 +96,23 @@ public class FlinkStreamSqlInterpreterTest extends SqlInterpreterTest { } @Test + public void testUpdateStreamTableApi() throws IOException, InterpreterException { + String initStreamScalaScript = IOUtils.toString(getClass().getResource("/init_stream.scala")); + InterpreterResult result = flinkInterpreter.interpret(initStreamScalaScript, + getInterpreterContext()); + assertEquals(InterpreterResult.Code.SUCCESS, result.code()); + + InterpreterContext context = getInterpreterContext(); + String code = "val table = stenv.sqlQuery(\"select url, count(1) as pv from log group by url\")\nz.show(table, streamType=\"update\")"; + result = flinkInterpreter.interpret(code, context); + assertEquals(new String(context.out.toByteArray()), InterpreterResult.Code.SUCCESS, result.code()); + List<InterpreterResultMessage> resultMessages = context.out.toInterpreterResultMessage(); + assertEquals(InterpreterResult.Type.TABLE, resultMessages.get(0).getType()); + assertTrue(resultMessages.toString(), + resultMessages.get(0).getData().contains("url\tpv\n")); + } + + @Test public void testAppendStreamSql() throws IOException, InterpreterException { String initStreamScalaScript = IOUtils.toString(getClass().getResource("/init_stream.scala")); InterpreterResult result = flinkInterpreter.interpret(initStreamScalaScript, @@ -102,6 +132,25 @@ public class FlinkStreamSqlInterpreterTest extends SqlInterpreterTest { } @Test + public void testAppendStreamTableApi() throws IOException, InterpreterException { + String initStreamScalaScript = IOUtils.toString(getClass().getResource("/init_stream.scala")); + InterpreterResult result = flinkInterpreter.interpret(initStreamScalaScript, + getInterpreterContext()); + assertEquals(InterpreterResult.Code.SUCCESS, result.code()); + + InterpreterContext context = getInterpreterContext(); + String code = "val table = stenv.sqlQuery(\"select TUMBLE_START(rowtime, INTERVAL '5' SECOND) as " + + "start_time, url, count(1) as pv from log group by " + + "TUMBLE(rowtime, INTERVAL '5' SECOND), url\")\nz.show(table, streamType=\"append\")"; + result = flinkInterpreter.interpret(code, context); + assertEquals(new String(context.out.toByteArray()), InterpreterResult.Code.SUCCESS, result.code()); + List<InterpreterResultMessage> resultMessages = context.out.toInterpreterResultMessage(); + assertEquals(InterpreterResult.Type.TABLE, resultMessages.get(0).getType()); + assertTrue(resultMessages.toString(), + resultMessages.get(0).getData().contains("url\tpv\n")); + } + + @Test public void testStreamUDF() throws IOException, InterpreterException { String initStreamScalaScript = IOUtils.toString(getClass().getResource("/init_stream.scala")); InterpreterResult result = flinkInterpreter.interpret(initStreamScalaScript, @@ -118,7 +167,7 @@ public class FlinkStreamSqlInterpreterTest extends SqlInterpreterTest { context.getLocalProperties().put("type", "update"); result = sqlInterpreter.interpret("select myupper(url), count(1) as pv from " + "log group by url", context); - assertEquals(InterpreterResult.Code.SUCCESS, result.code()); + assertEquals(new String(context.out.toByteArray()), InterpreterResult.Code.SUCCESS, result.code()); // assertEquals(InterpreterResult.Type.TABLE, // updatedOutput.toInterpreterResultMessage().getType()); // assertTrue(updatedOutput.toInterpreterResultMessage().getData(), diff --git a/flink/src/test/java/org/apache/zeppelin/flink/IPyFlinkInterpreterTest.java 
b/flink/src/test/java/org/apache/zeppelin/flink/IPyFlinkInterpreterTest.java index eb678a2..1a604d3 100644 --- a/flink/src/test/java/org/apache/zeppelin/flink/IPyFlinkInterpreterTest.java +++ b/flink/src/test/java/org/apache/zeppelin/flink/IPyFlinkInterpreterTest.java @@ -19,6 +19,7 @@ package org.apache.zeppelin.flink; import com.google.common.io.Files; +import org.apache.commons.io.IOUtils; import org.apache.commons.lang3.exception.ExceptionUtils; import org.apache.zeppelin.interpreter.Interpreter; import org.apache.zeppelin.interpreter.InterpreterContext; @@ -26,6 +27,7 @@ import org.apache.zeppelin.interpreter.InterpreterException; import org.apache.zeppelin.interpreter.InterpreterGroup; import org.apache.zeppelin.interpreter.InterpreterOutput; import org.apache.zeppelin.interpreter.InterpreterResult; +import org.apache.zeppelin.interpreter.InterpreterResultMessage; import org.apache.zeppelin.interpreter.LazyOpenInterpreter; import org.apache.zeppelin.interpreter.remote.RemoteInterpreterEventClient; import org.apache.zeppelin.python.IPythonInterpreterTest; @@ -35,6 +37,7 @@ import org.junit.Test; import java.io.IOException; import java.util.ArrayList; +import java.util.List; import java.util.Properties; import static junit.framework.TestCase.assertTrue; @@ -95,7 +98,7 @@ public class IPyFlinkInterpreterTest extends IPythonInterpreterTest { } @Test - public void testBatchIPyFlink() throws InterpreterException { + public void testBatchIPyFlink() throws InterpreterException, IOException { testBatchPyFlink(interpreter, flinkScalaInterpreter); } @@ -104,8 +107,23 @@ public class IPyFlinkInterpreterTest extends IPythonInterpreterTest { testStreamPyFlink(interpreter, flinkScalaInterpreter); } - public static void testBatchPyFlink(Interpreter pyflinkInterpreter, Interpreter flinkScalaInterpreter) throws InterpreterException { - InterpreterContext context = createInterpreterContext(mock(RemoteInterpreterEventClient.class)); + @Test + public void testSingleStreamTableApi() throws InterpreterException, IOException { + testSingleStreamTableApi(interpreter, flinkScalaInterpreter); + } + + @Test + public void testUpdateStreamTableApi() throws InterpreterException, IOException { + testUpdateStreamTableApi(interpreter, flinkScalaInterpreter); + } + + @Test + public void testAppendStreamTableApi() throws InterpreterException, IOException { + testAppendStreamTableApi(interpreter, flinkScalaInterpreter); + } + + public static void testBatchPyFlink(Interpreter pyflinkInterpreter, Interpreter flinkScalaInterpreter) throws InterpreterException, IOException { + InterpreterContext context = createInterpreterContext(); InterpreterResult result = pyflinkInterpreter.interpret( "import tempfile\n" + "import os\n" + @@ -135,7 +153,7 @@ public class IPyFlinkInterpreterTest extends IPythonInterpreterTest { assertEquals(InterpreterResult.Code.SUCCESS, result.code()); // use group by - context = createInterpreterContext(mock(RemoteInterpreterEventClient.class)); + context = createInterpreterContext(); result = pyflinkInterpreter.interpret( "import tempfile\n" + "import os\n" + @@ -176,7 +194,7 @@ public class IPyFlinkInterpreterTest extends IPythonInterpreterTest { context); assertEquals(InterpreterResult.Code.SUCCESS, result.code()); - context = createInterpreterContext(mock(RemoteInterpreterEventClient.class)); + context = createInterpreterContext(); result = pyflinkInterpreter.interpret( "import tempfile\n" + "import os\n" + @@ -204,6 +222,28 @@ public class IPyFlinkInterpreterTest extends IPythonInterpreterTest { 
"bt_env.execute(\"batch_job3\")" , context); assertEquals(result.toString(),InterpreterResult.Code.SUCCESS, result.code()); + + // z.show + context = createInterpreterContext(); + result = pyflinkInterpreter.interpret( + "import tempfile\n" + + "import os\n" + + "import shutil\n" + + "sink_path = tempfile.gettempdir() + '/streaming.csv'\n" + + "if os.path.exists(sink_path):\n" + + " if os.path.isfile(sink_path):\n" + + " os.remove(sink_path)\n" + + " else:\n" + + " shutil.rmtree(sink_path)\n" + + "b_env.set_parallelism(1)\n" + + "t = bt_env.from_elements([(1, 'hi', 'hello'), (2, 'hi', 'hello')], ['a', 'b', 'c'])\n" + + "z.show(t)" + , context); + assertEquals(result.toString(),InterpreterResult.Code.SUCCESS, result.code()); + List<InterpreterResultMessage> resultMessages = context.out.toInterpreterResultMessage(); + assertEquals(new String(context.out.toByteArray()), 1, resultMessages.size()); + assertEquals(new String(context.out.toByteArray()), InterpreterResult.Type.TABLE, resultMessages.get(0).getType()); + assertEquals(new String(context.out.toByteArray()), "a\tb\tc\n1\thi\thello\n2\thi\thello\n", resultMessages.get(0).getData()); } @Override @@ -223,7 +263,7 @@ public class IPyFlinkInterpreterTest extends IPythonInterpreterTest { } public static void testStreamPyFlink(Interpreter interpreter, Interpreter flinkScalaInterpreter) throws InterpreterException, IOException { - InterpreterContext context = createInterpreterContext(mock(RemoteInterpreterEventClient.class)); + InterpreterContext context = createInterpreterContext(); InterpreterResult result = interpreter.interpret( "import tempfile\n" + "import os\n" + @@ -253,13 +293,65 @@ public class IPyFlinkInterpreterTest extends IPythonInterpreterTest { assertEquals(InterpreterResult.Code.SUCCESS, result.code()); } - private static InterpreterContext createInterpreterContext( - RemoteInterpreterEventClient mockRemoteEventClient) { + public static void testSingleStreamTableApi(Interpreter interpreter, + Interpreter flinkScalaInterpreter) throws IOException, InterpreterException { + String initStreamScalaScript = IOUtils.toString(IPyFlinkInterpreterTest.class.getResource("/init_stream.scala")); + InterpreterContext context = createInterpreterContext(); + InterpreterResult result = flinkScalaInterpreter.interpret(initStreamScalaScript, context); + assertEquals(InterpreterResult.Code.SUCCESS, result.code()); + + context = createInterpreterContext(); + String code = "table = st_env.sql_query('select max(rowtime), count(1) from log')\nz.show(table,stream_type='single',template = 'Total Count: {1} <br/> {0}')"; + result = interpreter.interpret(code, context); + assertEquals(new String(context.out.toByteArray()), InterpreterResult.Code.SUCCESS, result.code()); + List<InterpreterResultMessage> resultMessages = context.out.toInterpreterResultMessage(); + assertEquals(InterpreterResult.Type.HTML, resultMessages.get(0).getType()); + assertTrue(resultMessages.toString(), + resultMessages.get(0).getData().contains("Total Count")); + } + + public static void testUpdateStreamTableApi(Interpreter interpreter, + Interpreter flinkScalaInterpreter) throws IOException, InterpreterException { + String initStreamScalaScript = IOUtils.toString(IPyFlinkInterpreterTest.class.getResource("/init_stream.scala")); + InterpreterContext context = createInterpreterContext(); + InterpreterResult result = flinkScalaInterpreter.interpret(initStreamScalaScript, context); + assertEquals(InterpreterResult.Code.SUCCESS, result.code()); + + context = 
createInterpreterContext(); + String code = "table = st_env.sql_query('select url, count(1) as pv from log group by url')\nz.show(table,stream_type='update')"; + result = interpreter.interpret(code, context); + assertEquals(new String(context.out.toByteArray()), InterpreterResult.Code.SUCCESS, result.code()); + List<InterpreterResultMessage> resultMessages = context.out.toInterpreterResultMessage(); + assertEquals(InterpreterResult.Type.TABLE, resultMessages.get(0).getType()); + assertTrue(resultMessages.toString(), + resultMessages.get(0).getData().contains("url\tpv\n")); + } + + public static void testAppendStreamTableApi(Interpreter interpreter, + Interpreter flinkScalaInterpreter) throws IOException, InterpreterException { + String initStreamScalaScript = IOUtils.toString(IPyFlinkInterpreterTest.class.getResource("/init_stream.scala")); + InterpreterContext context = createInterpreterContext(); + InterpreterResult result = flinkScalaInterpreter.interpret(initStreamScalaScript, context); + assertEquals(InterpreterResult.Code.SUCCESS, result.code()); + + context = createInterpreterContext(); + String code = "table = st_env.sql_query(\"select TUMBLE_START(rowtime, INTERVAL '5' SECOND) as " + + "start_time, url, count(1) as pv from log group by " + + "TUMBLE(rowtime, INTERVAL '5' SECOND), url\")\nz.show(table,stream_type='append')"; + result = interpreter.interpret(code, context); + assertEquals(new String(context.out.toByteArray()), InterpreterResult.Code.SUCCESS, result.code()); + List<InterpreterResultMessage> resultMessages = context.out.toInterpreterResultMessage(); + assertEquals(InterpreterResult.Type.TABLE, resultMessages.get(0).getType()); + assertTrue(resultMessages.toString(), + resultMessages.get(0).getData().contains("url\tpv\n")); + } + + private static InterpreterContext createInterpreterContext() { return InterpreterContext.builder() .setNoteId("noteId") .setParagraphId("paragraphId") - .setIntpEventClient(mockRemoteEventClient) .setInterpreterOut(new InterpreterOutput(null)) + .setIntpEventClient(mock(RemoteInterpreterEventClient.class)) .build(); } diff --git a/flink/src/test/java/org/apache/zeppelin/flink/PyFlinkInterpreterTest.java b/flink/src/test/java/org/apache/zeppelin/flink/PyFlinkInterpreterTest.java index 7bbc1dd..d6a24a2 100644 --- a/flink/src/test/java/org/apache/zeppelin/flink/PyFlinkInterpreterTest.java +++ b/flink/src/test/java/org/apache/zeppelin/flink/PyFlinkInterpreterTest.java @@ -31,16 +31,12 @@ import org.apache.zeppelin.interpreter.InterpreterResultMessageOutput; import org.apache.zeppelin.interpreter.LazyOpenInterpreter; import org.apache.zeppelin.interpreter.remote.RemoteInterpreterEventClient; import org.apache.zeppelin.python.PythonInterpreterTest; -import org.junit.After; -import org.junit.Before; import org.junit.Test; import java.io.IOException; import java.util.LinkedList; import java.util.Properties; -import static junit.framework.TestCase.assertTrue; -import static org.junit.Assert.assertEquals; import static org.mockito.Mockito.mock; @@ -109,11 +105,30 @@ public class PyFlinkInterpreterTest extends PythonInterpreterTest { } @Test - public void testPyFlink() throws InterpreterException, IOException { + public void testBatchPyFlink() throws InterpreterException, IOException { IPyFlinkInterpreterTest.testBatchPyFlink(interpreter, flinkScalaInterpreter); + } + + @Test + public void testStreamIPyFlink() throws InterpreterException, IOException { IPyFlinkInterpreterTest.testStreamPyFlink(interpreter, flinkScalaInterpreter); } + @Test + public 
void testSingleStreamTableApi() throws InterpreterException, IOException { + IPyFlinkInterpreterTest.testSingleStreamTableApi(interpreter, flinkScalaInterpreter); + } + + @Test + public void testUpdateStreamTableApi() throws InterpreterException, IOException { + IPyFlinkInterpreterTest.testUpdateStreamTableApi(interpreter, flinkScalaInterpreter); + } + + @Test + public void testAppendStreamTableApi() throws InterpreterException, IOException { + IPyFlinkInterpreterTest.testAppendStreamTableApi(interpreter, flinkScalaInterpreter); + } + protected InterpreterContext getInterpreterContext() { appendOutput = ""; InterpreterContext context = InterpreterContext.builder() diff --git a/flink/src/test/java/org/apache/zeppelin/flink/SqlInterpreterTest.java b/flink/src/test/java/org/apache/zeppelin/flink/SqlInterpreterTest.java index 0dbcdc3..34dc05c 100644 --- a/flink/src/test/java/org/apache/zeppelin/flink/SqlInterpreterTest.java +++ b/flink/src/test/java/org/apache/zeppelin/flink/SqlInterpreterTest.java @@ -42,10 +42,8 @@ import org.apache.zeppelin.interpreter.InterpreterContext; import org.apache.zeppelin.interpreter.InterpreterException; import org.apache.zeppelin.interpreter.InterpreterGroup; import org.apache.zeppelin.interpreter.InterpreterOutput; -import org.apache.zeppelin.interpreter.InterpreterOutputListener; import org.apache.zeppelin.interpreter.InterpreterResult; import org.apache.zeppelin.interpreter.InterpreterResultMessage; -import org.apache.zeppelin.interpreter.InterpreterResultMessageOutput; import org.apache.zeppelin.interpreter.remote.RemoteInterpreterEventClient; import org.junit.After; import org.junit.Before; @@ -62,7 +60,8 @@ import java.io.PrintWriter; import java.util.List; import java.util.Properties; -import static org.apache.zeppelin.interpreter.InterpreterResult.*; +import static org.apache.zeppelin.interpreter.InterpreterResult.Code; +import static org.apache.zeppelin.interpreter.InterpreterResult.Type; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertTrue; import static org.mockito.Mockito.mock; @@ -358,6 +357,7 @@ public abstract class SqlInterpreterTest { protected InterpreterContext getInterpreterContext() { return InterpreterContext.builder() + .setParagraphId("paragraphId") .setInterpreterOut(new InterpreterOutput(null)) .setAngularObjectRegistry(new AngularObjectRegistry("flink", null)) .setIntpEventClient(mock(RemoteInterpreterEventClient.class))
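
For reference, the PyFlink counterpart of the Scala example in the commit message above. This is a minimal sketch, not part of the commit: it assumes a Zeppelin %flink.pyflink (or %flink.ipyflink) paragraph where `st_env` and `z` are injected by the interpreter, and a streaming table named `log` has already been registered (as done by the init_stream.scala script used in the tests). The `stream_type` values and the `template` config mirror the ones exercised in IPyFlinkInterpreterTest; extra keyword arguments are forwarded to the streaming job as configs by the patched `show()` in zeppelin_pyflink.py / zeppelin_ipyflink.py.

```
# Sketch assuming a %flink.pyflink paragraph: `st_env` (StreamTableEnvironment)
# and `z` (ZeppelinContext) are pre-defined, and a streaming table `log` exists.

# Continuously updated aggregation, rendered as a Zeppelin TABLE result.
# (Each z.show(...) call on a streaming table would normally live in its own
# paragraph, since the streaming job keeps running until it is cancelled.)
table = st_env.sql_query("select url, count(1) as pv from log group by url")
z.show(table, stream_type="update")

# Single-row result rendered through an HTML template; the `template` keyword
# is passed through to the job config, as in the test code.
summary = st_env.sql_query("select max(rowtime), count(1) from log")
z.show(summary, stream_type="single", template="Total Count: {1} <br/> {0}")
```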