This is an automated email from the ASF dual-hosted git repository.

zjffdu pushed a commit to branch branch-0.9
in repository https://gitbox.apache.org/repos/asf/zeppelin.git
commit 2a0141d0604627ff9f55879ff911ac2d74742602
Author: Jeff Zhang <zjf...@apache.org>
AuthorDate: Wed Mar 18 16:11:40 2020 +0800

[ZEPPELIN-4687]. Allow to run multiple sql as one flink job

### What is this PR for?
This PR allows users to run multiple SQL statements as one Flink job in a single paragraph. Users can set the paragraph local property `runAsOne` to `true` to enable this feature.
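For example, assuming the Flink batch SQL interpreter is bound as `%flink.bsql` (the binding name is deployment-specific), a paragraph like the following runs both INSERT statements as a single job:

```
%flink.bsql(runAsOne=true)

insert into sink_table select * from source_table;
insert into sink_table2 select * from source_table;
```

Without `runAsOne=true`, each INSERT statement is submitted as its own Flink job.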
### What type of PR is it?
[Improvement]

### Todos
* [ ] - Task

### What is the Jira issue?
* https://issues.apache.org/jira/browse/ZEPPELIN-4687

### How should this be tested?
* CI pass

### Screenshots (if appropriate)

### Questions:
* Do the license files need updating? No
* Are there breaking changes for older versions? No
* Does this need documentation? No

Author: Jeff Zhang <zjf...@apache.org>

Closes #3692 from zjffdu/ZEPPELIN-4687 and squashes the following commits:

f66b8793e [Jeff Zhang] [ZEPPELIN-4687]. Allow to run multiple sql as one job

(cherry picked from commit 3c9322cd48df805b70ef80f4212f57efaf95689b)
---
 .../apache/zeppelin/flink/FlinkSqlInterrpeter.java | 31 +++++++-
 .../zeppelin/flink/sql/AppendStreamSqlJob.java     |  4 +-
 .../zeppelin/flink/sql/SingleRowStreamSqlJob.java  |  5 +-
 .../zeppelin/flink/sql/UpdateStreamSqlJob.java     |  6 +-
 .../flink/FlinkBatchSqlInterpreterTest.java        | 86 +++++++++++++++++++++-
 .../flink/FlinkStreamSqlInterpreterTest.java       | 44 +++++++++++
 6 files changed, 166 insertions(+), 10 deletions(-)

diff --git a/flink/src/main/java/org/apache/zeppelin/flink/FlinkSqlInterrpeter.java b/flink/src/main/java/org/apache/zeppelin/flink/FlinkSqlInterrpeter.java
index 82ac50e..c85dbf4 100644
--- a/flink/src/main/java/org/apache/zeppelin/flink/FlinkSqlInterrpeter.java
+++ b/flink/src/main/java/org/apache/zeppelin/flink/FlinkSqlInterrpeter.java
@@ -202,6 +202,30 @@ public abstract class FlinkSqlInterrpeter extends Interpreter {
       }
     }
 
+    boolean runAsOne = Boolean.parseBoolean(context.getStringLocalProperty("runAsOne", "false"));
+    if (runAsOne) {
+      try {
+        lock.lock();
+        if (context.getLocalProperties().containsKey("parallelism")) {
+          this.tbenv.getConfig().getConfiguration()
+                  .set(ExecutionConfigOptions.TABLE_EXEC_RESOURCE_DEFAULT_PARALLELISM,
+                          Integer.parseInt(context.getLocalProperties().get("parallelism")));
+        }
+        this.tbenv.execute(st);
+        context.out.write("Insertion successfully.\n");
+      } catch (Exception e) {
+        LOGGER.error("Fail to execute sql as one job", e);
+        return new InterpreterResult(InterpreterResult.Code.ERROR, ExceptionUtils.getStackTrace(e));
+      } finally {
+        if (lock.isHeldByCurrentThread()) {
+          lock.unlock();
+        }
+        this.tbenv.getConfig().getConfiguration()
+                .set(ExecutionConfigOptions.TABLE_EXEC_RESOURCE_DEFAULT_PARALLELISM,
+                        defaultSqlParallelism);
+      }
+    }
+
     return new InterpreterResult(InterpreterResult.Code.SUCCESS);
   }
 
@@ -498,7 +522,11 @@ public abstract class FlinkSqlInterrpeter extends Interpreter {
       }
 
       this.tbenv.sqlUpdate(sql);
-      this.tbenv.execute(sql);
+      boolean runAsOne = Boolean.parseBoolean(context.getStringLocalProperty("runAsOne", "false"));
+      if (!runAsOne) {
+        this.tbenv.execute(sql);
+        context.out.write("Insertion successfully.\n");
+      }
     } catch (Exception e) {
       throw new IOException(e);
     } finally {
@@ -519,7 +547,6 @@ public abstract class FlinkSqlInterrpeter extends Interpreter {
       }
       this.tbenv.getConfig().getConfiguration().addAll(flinkInterpreter.getFlinkConfiguration());
     }
-    context.out.write("Insertion successfully.\n");
   }
 
   private static AttributedString formatCommand(SqlCommand cmd, String description) {
diff --git a/flink/src/main/java/org/apache/zeppelin/flink/sql/AppendStreamSqlJob.java b/flink/src/main/java/org/apache/zeppelin/flink/sql/AppendStreamSqlJob.java
index 2b6add4..25b2064 100644
--- a/flink/src/main/java/org/apache/zeppelin/flink/sql/AppendStreamSqlJob.java
+++ b/flink/src/main/java/org/apache/zeppelin/flink/sql/AppendStreamSqlJob.java
@@ -90,8 +90,8 @@ public class AppendStreamSqlJob extends AbstractStreamSqlJob {
 
     // sort it by the first column
     materializedTable.sort((r1, r2) -> {
-      String f1 = r1.getField(0).toString();
-      String f2 = r2.getField(0).toString();
+      String f1 = TableDataUtils.normalizeColumn(StringUtils.arrayAwareToString(r1.getField(0)));
+      String f2 = TableDataUtils.normalizeColumn(StringUtils.arrayAwareToString(r2.getField(0)));
       return f1.compareTo(f2);
     });
 
diff --git a/flink/src/main/java/org/apache/zeppelin/flink/sql/SingleRowStreamSqlJob.java b/flink/src/main/java/org/apache/zeppelin/flink/sql/SingleRowStreamSqlJob.java
index b5d7824..8dd2bec 100644
--- a/flink/src/main/java/org/apache/zeppelin/flink/sql/SingleRowStreamSqlJob.java
+++ b/flink/src/main/java/org/apache/zeppelin/flink/sql/SingleRowStreamSqlJob.java
@@ -22,8 +22,10 @@ import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment;
 import org.apache.flink.table.api.TableEnvironment;
 import org.apache.flink.table.api.scala.StreamTableEnvironment;
 import org.apache.flink.types.Row;
+import org.apache.flink.util.StringUtils;
 import org.apache.zeppelin.flink.JobManager;
 import org.apache.zeppelin.interpreter.InterpreterContext;
+import org.apache.zeppelin.tabledata.TableDataUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -65,7 +67,8 @@ public class SingleRowStreamSqlJob extends AbstractStreamSqlJob {
     builder.append("%html\n");
     String outputText = template;
     for (int i = 0; i < latestRow.getArity(); ++i) {
-      outputText = outputText.replace("{" + i + "}", latestRow.getField(i).toString());
+      outputText = outputText.replace("{" + i + "}",
+              TableDataUtils.normalizeColumn(StringUtils.arrayAwareToString(latestRow.getField(i))));
     }
     builder.append(outputText);
     return builder.toString();
diff --git a/flink/src/main/java/org/apache/zeppelin/flink/sql/UpdateStreamSqlJob.java b/flink/src/main/java/org/apache/zeppelin/flink/sql/UpdateStreamSqlJob.java
index 65f3d86..bfcbce7 100644
--- a/flink/src/main/java/org/apache/zeppelin/flink/sql/UpdateStreamSqlJob.java
+++ b/flink/src/main/java/org/apache/zeppelin/flink/sql/UpdateStreamSqlJob.java
@@ -75,7 +75,7 @@ public class UpdateStreamSqlJob extends AbstractStreamSqlJob {
     StringBuilder builder = new StringBuilder();
     builder.append("%table\n");
     for (int i = 0; i < schema.getFieldCount(); ++i) {
-      String field = schema.getFieldName(i).get();
+      String field = schema.getFieldNames()[i];
       builder.append(field);
       if (i != (schema.getFieldCount() - 1)) {
         builder.append("\t");
@@ -84,8 +84,8 @@ public class UpdateStreamSqlJob extends AbstractStreamSqlJob {
     builder.append("\n");
     // sort it by the first column
     materializedTable.sort((r1, r2) -> {
-      String f1 = r1.getField(0).toString();
-      String f2 = r2.getField(0).toString();
+      String f1 = TableDataUtils.normalizeColumn(StringUtils.arrayAwareToString(r1.getField(0)));
+      String f2 = TableDataUtils.normalizeColumn(StringUtils.arrayAwareToString(r2.getField(0)));
       return f1.compareTo(f2);
     });
     for (Row row : materializedTable) {
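Side note on the first hunk above: when `runAsOne` is set, a paragraph-level `parallelism` local property is pushed into `table.exec.resource.default-parallelism` for the combined job and restored to the default in the `finally` block. So (again assuming a `%flink.bsql` binding, which is deployment-specific) the two properties can be combined:

```
%flink.bsql(runAsOne=true, parallelism=4)

insert into sink_table select * from source_table;
insert into sink_table2 select * from source_table;
```
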
diff --git a/flink/src/test/java/org/apache/zeppelin/flink/FlinkBatchSqlInterpreterTest.java b/flink/src/test/java/org/apache/zeppelin/flink/FlinkBatchSqlInterpreterTest.java
index c75d7fe..651645b 100644
--- a/flink/src/test/java/org/apache/zeppelin/flink/FlinkBatchSqlInterpreterTest.java
+++ b/flink/src/test/java/org/apache/zeppelin/flink/FlinkBatchSqlInterpreterTest.java
@@ -259,12 +259,12 @@ public class FlinkBatchSqlInterpreterTest extends SqlInterpreterTest {
         "'format.type'='csv'\n" +
         ");", getInterpreterContext());
     assertEquals(InterpreterResult.Code.SUCCESS, result.code());
-
+
     // set parallelism then insert into
     InterpreterContext context = getInterpreterContext();
     result = sqlInterpreter.interpret(
         "set table.exec.resource.default-parallelism=10;" +
-        "insert into sink_table select * from source_table", context);
+            "insert into sink_table select * from source_table", context);
     assertEquals(InterpreterResult.Code.SUCCESS, result.code());
     List<InterpreterResultMessage> resultMessages = context.out.toInterpreterResultMessage();
     assertEquals("Insertion successfully.\n", resultMessages.get(0).getData());
@@ -296,4 +296,86 @@ public class FlinkBatchSqlInterpreterTest extends SqlInterpreterTest {
     assertTrue(resultMessages.get(0).getData(),
         resultMessages.get(0).getData().contains("table.invalid_config is not a valid table/sql config"));
   }
+
+  @Test
+  public void testMultipleInsertInto() throws InterpreterException, IOException {
+    hiveShell.execute("create table source_table (id int, name string)");
+    hiveShell.execute("insert into source_table values(1, 'a'), (2, 'b')");
+
+    File destDir = Files.createTempDirectory("flink_test").toFile();
+    FileUtils.deleteDirectory(destDir);
+    InterpreterResult result = sqlInterpreter.interpret(
+        "CREATE TABLE sink_table (\n" +
+            "id int,\n" +
+            "name string" +
+            ") WITH (\n" +
+            "'format.field-delimiter'=',',\n" +
+            "'connector.type'='filesystem',\n" +
+            "'format.derive-schema'='true',\n" +
+            "'connector.path'='" + destDir.getAbsolutePath() + "',\n" +
+            "'format.type'='csv'\n" +
+            ");", getInterpreterContext());
+    assertEquals(InterpreterResult.Code.SUCCESS, result.code());
+
+    File destDir2 = Files.createTempDirectory("flink_test").toFile();
+    FileUtils.deleteDirectory(destDir2);
+    result = sqlInterpreter.interpret(
+        "CREATE TABLE sink_table2 (\n" +
+            "id int,\n" +
+            "name string" +
+            ") WITH (\n" +
+            "'format.field-delimiter'=',',\n" +
+            "'connector.type'='filesystem',\n" +
+            "'format.derive-schema'='true',\n" +
+            "'connector.path'='" + destDir2.getAbsolutePath() + "',\n" +
+            "'format.type'='csv'\n" +
+            ");", getInterpreterContext());
+    assertEquals(InterpreterResult.Code.SUCCESS, result.code());
+
+    // insert into
+    InterpreterContext context = getInterpreterContext();
+    result = sqlInterpreter.interpret(
+        "insert into sink_table select * from source_table;insert into sink_table2 select * from source_table", context);
+    assertEquals(InterpreterResult.Code.SUCCESS, result.code());
+    List<InterpreterResultMessage> resultMessages = context.out.toInterpreterResultMessage();
+    assertEquals("Insertion successfully.\nInsertion successfully.\n", resultMessages.get(0).getData());
+
+    // verify insert into via select from sink_table
+    context = getInterpreterContext();
+    result = sqlInterpreter.interpret("select * from sink_table", context);
+    assertEquals(InterpreterResult.Code.SUCCESS, result.code());
+    resultMessages = context.out.toInterpreterResultMessage();
+    assertEquals("id\tname\n1\ta\n2\tb\n", resultMessages.get(0).getData());
+
+    context = getInterpreterContext();
+    result = sqlInterpreter.interpret("select * from sink_table2", context);
+    assertEquals(InterpreterResult.Code.SUCCESS, result.code());
+    resultMessages = context.out.toInterpreterResultMessage();
+    assertEquals("id\tname\n1\ta\n2\tb\n", resultMessages.get(0).getData());
+
+    // insert into (runAsOne)
+    destDir.delete();
+    destDir2.delete();
+
+    context = getInterpreterContext();
+    context.getLocalProperties().put("runAsOne", "true");
+    result = sqlInterpreter.interpret(
+        "insert into sink_table select * from source_table;insert into sink_table2 select * from source_table", context);
+    assertEquals(InterpreterResult.Code.SUCCESS, result.code());
+    resultMessages = context.out.toInterpreterResultMessage();
+    assertEquals("Insertion successfully.\n", resultMessages.get(0).getData());
+
+    // verify insert into via select from sink_table
+    context = getInterpreterContext();
+    result = sqlInterpreter.interpret("select * from sink_table", context);
+    assertEquals(InterpreterResult.Code.SUCCESS, result.code());
+    resultMessages = context.out.toInterpreterResultMessage();
+    assertEquals("id\tname\n1\ta\n2\tb\n", resultMessages.get(0).getData());
+
+    context = getInterpreterContext();
+    result = sqlInterpreter.interpret("select * from sink_table2", context);
+    assertEquals(InterpreterResult.Code.SUCCESS, result.code());
+    resultMessages = context.out.toInterpreterResultMessage();
+    assertEquals("id\tname\n1\ta\n2\tb\n", resultMessages.get(0).getData());
+  }
 }
diff --git a/flink/src/test/java/org/apache/zeppelin/flink/FlinkStreamSqlInterpreterTest.java b/flink/src/test/java/org/apache/zeppelin/flink/FlinkStreamSqlInterpreterTest.java
index baa0f5e..c01dbff 100644
--- a/flink/src/test/java/org/apache/zeppelin/flink/FlinkStreamSqlInterpreterTest.java
+++ b/flink/src/test/java/org/apache/zeppelin/flink/FlinkStreamSqlInterpreterTest.java
@@ -152,6 +152,50 @@ public class FlinkStreamSqlInterpreterTest extends SqlInterpreterTest {
   }
 
   @Test
+  public void testMultipleInsertInto() throws InterpreterException, IOException {
+    hiveShell.execute("create table source_table (id int, name string)");
+
+    File destDir = Files.createTempDirectory("flink_test").toFile();
+    FileUtils.deleteDirectory(destDir);
+    InterpreterResult result = sqlInterpreter.interpret(
+        "CREATE TABLE dest_table (\n" +
+            "id int,\n" +
+            "name string" +
+            ") WITH (\n" +
+            "'format.field-delimiter'=',',\n" +
+            "'connector.type'='filesystem',\n" +
+            "'format.derive-schema'='true',\n" +
+            "'connector.path'='" + destDir.getAbsolutePath() + "',\n" +
+            "'format.type'='csv'\n" +
+            ");", getInterpreterContext());
+
+    assertEquals(InterpreterResult.Code.SUCCESS, result.code());
+
+    File destDir2 = Files.createTempDirectory("flink_test").toFile();
+    FileUtils.deleteDirectory(destDir2);
+    result = sqlInterpreter.interpret(
+        "CREATE TABLE dest_table2 (\n" +
+            "id int,\n" +
+            "name string" +
+            ") WITH (\n" +
+            "'format.field-delimiter'=',',\n" +
+            "'connector.type'='filesystem',\n" +
+            "'format.derive-schema'='true',\n" +
+            "'connector.path'='" + destDir2.getAbsolutePath() + "',\n" +
+            "'format.type'='csv'\n" +
+            ");", getInterpreterContext());
+    assertEquals(InterpreterResult.Code.SUCCESS, result.code());
+
+    InterpreterContext context = getInterpreterContext();
+    context.getLocalProperties().put("runAsOne", "true");
+    result = sqlInterpreter.interpret(
+        "insert into dest_table select * from source_table;insert into dest_table2 select * from source_table",
+        context);
+
+    assertEquals(InterpreterResult.Code.SUCCESS, result.code());
+  }
+
+  @Test
   public void testCreateTableWithWaterMark() throws InterpreterException, IOException {
     // create table
     InterpreterContext context = getInterpreterContext();
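Note on the mechanics: this change relies on the two-step Flink 1.10 `TableEnvironment` API, where each `sqlUpdate` only buffers an INSERT and nothing is submitted until `execute` is called; with `runAsOne`, the interpreter buffers every statement in the paragraph and calls `execute` once. A minimal standalone sketch of that pattern (the job name is illustrative, and the three tables are assumed to have been registered beforehand; neither is taken from this patch):

```java
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public class RunAsOneSketch {
  public static void main(String[] args) throws Exception {
    // Blink batch planner, matching the Flink 1.10-era API used by this interpreter.
    TableEnvironment tenv = TableEnvironment.create(
        EnvironmentSettings.newInstance().useBlinkPlanner().inBatchMode().build());

    // Assumed: source_table, sink_table and sink_table2 were registered beforehand
    // via CREATE TABLE DDL (see the filesystem/csv DDL used in the tests above).

    // Each sqlUpdate only buffers the INSERT; no job is submitted yet.
    tenv.sqlUpdate("insert into sink_table select * from source_table");
    tenv.sqlUpdate("insert into sink_table2 select * from source_table");

    // A single execute() submits all buffered INSERTs as one Flink job;
    // this corresponds to the tbenv.execute(st) call in the runAsOne branch.
    tenv.execute("multiple_insert_as_one_job");
  }
}
```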