This is an automated email from the ASF dual-hosted git repository. dataroaring pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push: new d4cea17a4ad [Enhancement] (Nereids) let insert statement support CTE (#36150) d4cea17a4ad is described below commit d4cea17a4ad6e69b06e7f8be3204bf835e4fe10c Author: Liu Zhenlong <49094455+dragonliu2...@users.noreply.github.com> AuthorDate: Thu Jun 13 15:44:32 2024 +0800 [Enhancement] (Nereids) let insert statement support CTE (#36150) Issue Number: close #35784 --- .../antlr4/org/apache/doris/nereids/DorisParser.g4 | 2 +- .../doris/nereids/parser/LogicalPlanBuilder.java | 13 ++-- .../trees/plans/commands/CreateTableCommand.java | 3 +- .../plans/commands/DeleteFromUsingCommand.java | 4 +- .../nereids/trees/plans/commands/LoadCommand.java | 2 +- .../trees/plans/commands/UpdateCommand.java | 4 +- .../plans/commands/UpdateMvByPartitionCommand.java | 2 +- .../commands/insert/InsertIntoTableCommand.java | 7 ++- .../insert/InsertOverwriteTableCommand.java | 11 +++- .../nereids_p0/insert_into_table/insert_cte.out | 32 ++++++++++ .../nereids_p0/insert_into_table/insert_cte.groovy | 70 ++++++++++++++++++++++ 11 files changed, 133 insertions(+), 17 deletions(-) diff --git a/fe/fe-core/src/main/antlr4/org/apache/doris/nereids/DorisParser.g4 b/fe/fe-core/src/main/antlr4/org/apache/doris/nereids/DorisParser.g4 index 07c3870c9e8..31102bedf3c 100644 --- a/fe/fe-core/src/main/antlr4/org/apache/doris/nereids/DorisParser.g4 +++ b/fe/fe-core/src/main/antlr4/org/apache/doris/nereids/DorisParser.g4 @@ -68,7 +68,7 @@ statementBase | CREATE (EXTERNAL)? TABLE (IF NOT EXISTS)? name=multipartIdentifier LIKE existedTable=multipartIdentifier (WITH ROLLUP (rollupNames=identifierList)?)? #createTableLike - | explain? INSERT (INTO | OVERWRITE TABLE) + | explain? cte? INSERT (INTO | OVERWRITE TABLE) (tableName=multipartIdentifier | DORIS_INTERNAL_TABLE_ID LEFT_PAREN tableId=INTEGER_VALUE RIGHT_PAREN) partitionSpec? // partition define (WITH LABEL labelName=identifier)? cols=identifierList? // label and columns define diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/parser/LogicalPlanBuilder.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/parser/LogicalPlanBuilder.java index 14995a3bb39..077a1a4d881 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/parser/LogicalPlanBuilder.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/parser/LogicalPlanBuilder.java @@ -585,9 +585,13 @@ public class LogicalPlanBuilder extends DorisParserBaseVisitor<Object> { ConnectContext.get().getSessionVariable().isEnableUniqueKeyPartialUpdate(), DMLCommandType.INSERT, plan); + Optional<LogicalPlan> cte = Optional.empty(); + if (ctx.cte() != null) { + cte = Optional.ofNullable(withCte(plan, ctx.cte())); + } LogicalPlan command; if (isOverwrite) { - command = new InsertOverwriteTableCommand(sink, labelName); + command = new InsertOverwriteTableCommand(sink, labelName, cte); } else { if (ConnectContext.get() != null && ConnectContext.get().isTxnModel() && sink.child() instanceof LogicalInlineTable) { @@ -596,13 +600,10 @@ public class LogicalPlanBuilder extends DorisParserBaseVisitor<Object> { // Now handle it as `insert into select`(a separate load job), should fix it as the legacy. command = new BatchInsertIntoTableCommand(sink); } else { - command = new InsertIntoTableCommand(sink, labelName, Optional.empty()); + command = new InsertIntoTableCommand(sink, labelName, Optional.empty(), cte); } } - if (ctx.explain() != null) { - return withExplain(command, ctx.explain()); - } - return command; + return withExplain(command, ctx.explain()); } /** diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/CreateTableCommand.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/CreateTableCommand.java index e70ad87aa57..0594cfb6833 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/CreateTableCommand.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/CreateTableCommand.java @@ -162,7 +162,8 @@ public class CreateTableCommand extends Command implements ForwardWithSync { ImmutableList.of(), ImmutableList.of(), ImmutableList.of(), query); try { if (!FeConstants.runningUnitTest) { - new InsertIntoTableCommand(query, Optional.empty(), Optional.empty()).run(ctx, executor); + new InsertIntoTableCommand(query, Optional.empty(), Optional.empty(), Optional.empty()).run( + ctx, executor); } if (ctx.getState().getStateType() == MysqlStateType.ERR) { handleFallbackFailedCtas(ctx); diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/DeleteFromUsingCommand.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/DeleteFromUsingCommand.java index 8512104073a..aa76ecd6baa 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/DeleteFromUsingCommand.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/DeleteFromUsingCommand.java @@ -77,8 +77,8 @@ public class DeleteFromUsingCommand extends Command implements ForwardWithSync, + ctx.getSessionVariable().printDebugModeVariables()); } // NOTE: delete from using command is executed as insert command, so txn insert can support it - new InsertIntoTableCommand(completeQueryPlan(ctx, logicalQuery), Optional.empty(), Optional.empty()).run(ctx, - executor); + new InsertIntoTableCommand(completeQueryPlan(ctx, logicalQuery), Optional.empty(), Optional.empty(), + Optional.empty()).run(ctx, executor); } /** diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/LoadCommand.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/LoadCommand.java index 1963b2efc23..29c47e9bdc0 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/LoadCommand.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/LoadCommand.java @@ -134,7 +134,7 @@ public class LoadCommand extends Command implements ForwardWithSync { profile.getSummaryProfile().setQueryBeginTime(); if (sourceInfos.size() == 1) { plans = ImmutableList.of(new InsertIntoTableCommand(completeQueryPlan(ctx, sourceInfos.get(0)), - Optional.of(labelName), Optional.empty())); + Optional.of(labelName), Optional.empty(), Optional.empty())); } else { throw new AnalysisException("Multi insert into statements are unsupported."); } diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/UpdateCommand.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/UpdateCommand.java index 3e656f82541..7dea2393ab1 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/UpdateCommand.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/UpdateCommand.java @@ -94,8 +94,8 @@ public class UpdateCommand extends Command implements ForwardWithSync, Explainab @Override public void run(ConnectContext ctx, StmtExecutor executor) throws Exception { // NOTE: update command is executed as insert command, so txn insert can support it - new InsertIntoTableCommand(completeQueryPlan(ctx, logicalQuery), Optional.empty(), Optional.empty()).run(ctx, - executor); + new InsertIntoTableCommand(completeQueryPlan(ctx, logicalQuery), Optional.empty(), Optional.empty(), + Optional.empty()).run(ctx, executor); } /** diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/UpdateMvByPartitionCommand.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/UpdateMvByPartitionCommand.java index 22cca77062f..a2b03e04f42 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/UpdateMvByPartitionCommand.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/UpdateMvByPartitionCommand.java @@ -80,7 +80,7 @@ import java.util.stream.Collectors; */ public class UpdateMvByPartitionCommand extends InsertOverwriteTableCommand { private UpdateMvByPartitionCommand(LogicalPlan logicalQuery) { - super(logicalQuery, Optional.empty()); + super(logicalQuery, Optional.empty(), Optional.empty()); } /** diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/InsertIntoTableCommand.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/InsertIntoTableCommand.java index 58bebc7c26c..a0268f38a2b 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/InsertIntoTableCommand.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/InsertIntoTableCommand.java @@ -74,16 +74,18 @@ public class InsertIntoTableCommand extends Command implements ForwardWithSync, */ private long jobId; private Optional<InsertCommandContext> insertCtx; + private final Optional<LogicalPlan> cte; /** * constructor */ public InsertIntoTableCommand(LogicalPlan logicalQuery, Optional<String> labelName, - Optional<InsertCommandContext> insertCtx) { + Optional<InsertCommandContext> insertCtx, Optional<LogicalPlan> cte) { super(PlanType.INSERT_INTO_TABLE_COMMAND); this.logicalQuery = Objects.requireNonNull(logicalQuery, "logicalQuery should not be null"); this.labelName = Objects.requireNonNull(labelName, "labelName should not be null"); this.insertCtx = insertCtx; + this.cte = cte; } public Optional<String> getLabelName() { @@ -141,6 +143,9 @@ public class InsertIntoTableCommand extends Command implements ForwardWithSync, try { // 1. process inline table (default values, empty values) this.logicalQuery = (LogicalPlan) InsertUtils.normalizePlan(logicalQuery, targetTableIf); + if (cte.isPresent()) { + this.logicalQuery = ((LogicalPlan) cte.get().withChildren(logicalQuery)); + } LogicalPlanAdapter logicalPlanAdapter = new LogicalPlanAdapter(logicalQuery, ctx.getStatementContext()); NereidsPlanner planner = new NereidsPlanner(ctx.getStatementContext()); diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/InsertOverwriteTableCommand.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/InsertOverwriteTableCommand.java index 34d9c093718..49b8becd29f 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/InsertOverwriteTableCommand.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/InsertOverwriteTableCommand.java @@ -75,14 +75,17 @@ public class InsertOverwriteTableCommand extends Command implements ForwardWithS private LogicalPlan logicalQuery; private Optional<String> labelName; + private final Optional<LogicalPlan> cte; /** * constructor */ - public InsertOverwriteTableCommand(LogicalPlan logicalQuery, Optional<String> labelName) { + public InsertOverwriteTableCommand(LogicalPlan logicalQuery, Optional<String> labelName, + Optional<LogicalPlan> cte) { super(PlanType.INSERT_INTO_TABLE_COMMAND); this.logicalQuery = Objects.requireNonNull(logicalQuery, "logicalQuery should not be null"); this.labelName = Objects.requireNonNull(labelName, "labelName should not be null"); + this.cte = cte; } public void setLabelName(Optional<String> labelName) { @@ -111,6 +114,10 @@ public class InsertOverwriteTableCommand extends Command implements ForwardWithS + " But current table type is " + targetTableIf.getType()); } this.logicalQuery = (LogicalPlan) InsertUtils.normalizePlan(logicalQuery, targetTableIf); + if (cte.isPresent()) { + this.logicalQuery = (LogicalPlan) logicalQuery.withChildren(cte.get().withChildren( + this.logicalQuery.child(0))); + } LogicalPlanAdapter logicalPlanAdapter = new LogicalPlanAdapter(logicalQuery, ctx.getStatementContext()); NereidsPlanner planner = new NereidsPlanner(ctx.getStatementContext()); @@ -181,7 +188,7 @@ public class InsertOverwriteTableCommand extends Command implements ForwardWithS private void runInsertCommand(LogicalPlan logicalQuery, InsertCommandContext insertCtx, ConnectContext ctx, StmtExecutor executor) throws Exception { InsertIntoTableCommand insertCommand = new InsertIntoTableCommand(logicalQuery, labelName, - Optional.of(insertCtx)); + Optional.of(insertCtx), Optional.empty()); insertCommand.run(ctx, executor); if (ctx.getState().getStateType() == MysqlStateType.ERR) { String errMsg = Strings.emptyToNull(ctx.getState().getErrorMessage()); diff --git a/regression-test/data/nereids_p0/insert_into_table/insert_cte.out b/regression-test/data/nereids_p0/insert_into_table/insert_cte.out new file mode 100644 index 00000000000..a9b50f5b2d4 --- /dev/null +++ b/regression-test/data/nereids_p0/insert_into_table/insert_cte.out @@ -0,0 +1,32 @@ +-- This file is automatically generated. You should know what you did if you want to edit this +-- !select1 -- +4 + +-- !select2 -- +1 +2 +3 +4 + +-- !select3 -- +1 +2 +3 +4 +5 +6 + +-- !select4 -- +1 +2 +3 + +-- !select5 -- +1 +2 +3 +4 + +-- !select6 -- +1 +4 diff --git a/regression-test/suites/nereids_p0/insert_into_table/insert_cte.groovy b/regression-test/suites/nereids_p0/insert_into_table/insert_cte.groovy new file mode 100644 index 00000000000..76e2742396b --- /dev/null +++ b/regression-test/suites/nereids_p0/insert_into_table/insert_cte.groovy @@ -0,0 +1,70 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +suite("insert_cte") { + def t1 = "insert_cte_tbl_1" + def t2 = "insert_cte_tbl_2" + def t3 = "insert_cte_tbl_3" + + sql " SET enable_nereids_planner = true; " + + sql """ DROP TABLE IF EXISTS $t1 """ + sql """ + create table $t1 ( + k int + ) engine = OLAP + unique key(k) + partition by list (k) + ( + partition p1 values in ("1", "2", "3"), + partition p2 values in ("4", "5", "6") + ) + distributed by hash(k) buckets 3 + properties("replication_num" = "1"); + """ + sql """ DROP TABLE IF EXISTS $t2 """ + sql """ + create table $t2 ( + k int + ) properties("replication_num" = "1"); + """ + + sql """ insert into $t2 values (1), (2), (3), (4), (5), (6); """ + + // test for InsertIntoTableCommand + // 1. insert into values + sql """ with cte1 as (select * from $t2 where k < 4) insert into $t1 values (4); """ + order_qt_select1 """ select * from $t1; """ + // 2. insert into select + sql """ with cte1 as (select * from $t2 where k < 4) insert into $t1 select * from cte1; """ + order_qt_select2 """ select * from $t1; """ + // 3. insert into partition select + sql """ with cte1 as (select * from $t2 where k >= 4) insert into $t1 partition(p2) select * from cte1; """ + order_qt_select3 """ select * from $t1; """ + + // test for InsertOverwriteTableCommand + // 1. insert overwrite table select + sql """ with cte1 as (select * from $t2 where k < 4) insert overwrite table $t1 select * from cte1; """ + order_qt_select4 """ select * from $t1; """ + // 2. insert overwrite table partition select + sql """ with cte1 as (select 4) insert overwrite table $t1 partition(p2) select * from cte1; """ + order_qt_select5 """ select * from $t1; """ + // 3. overwrite auto detect partition + sql """ with cte1 as (select 1) insert overwrite table $t1 partition(*) select * from cte1; """ + order_qt_select6 """ select * from $t1; """ + +} --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org