This is an automated email from the ASF dual-hosted git repository. panxiaolei pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push: new 87b9425772 [Bug](materialized-view) fix where clause not analyzed after fe restart (#22268) 87b9425772 is described below commit 87b94257727056b890fc98689e0c2b6e14169f13 Author: Pxl <pxl...@qq.com> AuthorDate: Thu Jul 27 18:34:44 2023 +0800 [Bug](materialized-view) fix where clause not analyzed after fe restart (#22268) fix where clause not analyzed after fe restart --- .../java/org/apache/doris/analysis/Analyzer.java | 15 +++--- .../apache/doris/analysis/NativeInsertStmt.java | 2 +- .../doris/load/loadv2/LoadingTaskPlanner.java | 2 +- .../plans/commands/InsertIntoTableCommand.java | 3 +- .../org/apache/doris/planner/OlapTableSink.java | 12 +++-- .../apache/doris/planner/StreamLoadPlanner.java | 4 +- .../apache/doris/planner/OlapTableSinkTest.java | 8 +-- .../test_partial_update_schema_change.groovy | 61 +++++++++++++++++----- 8 files changed, 76 insertions(+), 31 deletions(-) diff --git a/fe/fe-core/src/main/java/org/apache/doris/analysis/Analyzer.java b/fe/fe-core/src/main/java/org/apache/doris/analysis/Analyzer.java index 1e44278eef..761aae1442 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/analysis/Analyzer.java +++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/Analyzer.java @@ -146,9 +146,6 @@ public class Analyzer { private String schemaWild; private String schemaTable; // table used in DESCRIBE Table - // True if the corresponding select block has a limit and/or offset clause. - private boolean hasLimitOffsetClause = false; - // Current depth of nested analyze() calls. Used for enforcing a // maximum expr-tree depth. Needs to be manually maintained by the user // of this Analyzer with incrementCallDepth() and decrementCallDepth(). @@ -657,6 +654,14 @@ public class Analyzer { } } + public void registerTupleDescriptor(TupleDescriptor desc) { + tupleByAlias.put(desc.getAlias(), desc); + for (SlotDescriptor slot : desc.getSlots()) { + String key = desc.getAlias() + "." + slot.getColumn().getName(); + slotRefMap.put(key, slot); + } + } + /** * Creates an returns an empty TupleDescriptor for the given table ref and registers * it against all its legal aliases. For tables refs with an explicit alias, only the @@ -1818,10 +1823,6 @@ public class Analyzer { return hasEmptySpjResultSet; } - public void setHasLimitOffsetClause(boolean hasLimitOffset) { - this.hasLimitOffsetClause = hasLimitOffset; - } - /** * Register all conjuncts in 'conjuncts' that make up the On-clause of the given * right-hand side of a join. Assigns each conjunct a unique id. If rhsRef is diff --git a/fe/fe-core/src/main/java/org/apache/doris/analysis/NativeInsertStmt.java b/fe/fe-core/src/main/java/org/apache/doris/analysis/NativeInsertStmt.java index 02b6e40f14..adf12e0ddf 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/analysis/NativeInsertStmt.java +++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/NativeInsertStmt.java @@ -849,7 +849,7 @@ public class NativeInsertStmt extends InsertStmt { public void complete() throws UserException { if (!isExplain() && targetTable instanceof OlapTable) { - ((OlapTableSink) dataSink).complete(); + ((OlapTableSink) dataSink).complete(analyzer); // add table indexes to transaction state TransactionState txnState = Env.getCurrentGlobalTransactionMgr() .getTransactionState(db.getId(), transactionId); diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/LoadingTaskPlanner.java b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/LoadingTaskPlanner.java index 982eb813e6..fa86ab06b1 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/LoadingTaskPlanner.java +++ b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/LoadingTaskPlanner.java @@ -171,7 +171,7 @@ public class LoadingTaskPlanner { OlapTableSink olapTableSink = new OlapTableSink(table, destTupleDesc, partitionIds, Config.enable_single_replica_load); olapTableSink.init(loadId, txnId, dbId, timeoutS, sendBatchParallelism, false, strictMode); - olapTableSink.complete(); + olapTableSink.complete(analyzer); // 3. Plan fragment PlanFragment sinkFragment = new PlanFragment(new PlanFragmentId(0), scanNode, DataPartition.RANDOM); diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/InsertIntoTableCommand.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/InsertIntoTableCommand.java index ab373c7886..3155eab2e7 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/InsertIntoTableCommand.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/InsertIntoTableCommand.java @@ -17,6 +17,7 @@ package org.apache.doris.nereids.trees.plans.commands; +import org.apache.doris.analysis.Analyzer; import org.apache.doris.catalog.Env; import org.apache.doris.common.DdlException; import org.apache.doris.common.util.ProfileManager.ProfileType; @@ -123,7 +124,7 @@ public class InsertIntoTableCommand extends Command implements ForwardWithSync, ctx.getExecTimeout(), ctx.getSessionVariable().getSendBatchParallelism(), false, false); - sink.complete(); + sink.complete(new Analyzer(Env.getCurrentEnv(), ctx)); TransactionState state = Env.getCurrentGlobalTransactionMgr().getTransactionState( physicalOlapTableSink.getDatabase().getId(), txn.getTxnId()); diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/OlapTableSink.java b/fe/fe-core/src/main/java/org/apache/doris/planner/OlapTableSink.java index 0bc30f978a..b09ebcff7a 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/planner/OlapTableSink.java +++ b/fe/fe-core/src/main/java/org/apache/doris/planner/OlapTableSink.java @@ -17,6 +17,7 @@ package org.apache.doris.planner; +import org.apache.doris.analysis.Analyzer; import org.apache.doris.analysis.Expr; import org.apache.doris.analysis.SlotDescriptor; import org.apache.doris.analysis.TupleDescriptor; @@ -164,7 +165,7 @@ public class OlapTableSink extends DataSink { } // must called after tupleDescriptor is computed - public void complete() throws UserException { + public void complete(Analyzer analyzer) throws UserException { TOlapTableSink tSink = tDataSink.getOlapTableSink(); tSink.setTableId(dstTable.getId()); @@ -176,7 +177,7 @@ public class OlapTableSink extends DataSink { } tSink.setNumReplicas(numReplicas); tSink.setNeedGenRollup(dstTable.shouldLoadToNewRollup()); - tSink.setSchema(createSchema(tSink.getDbId(), dstTable)); + tSink.setSchema(createSchema(tSink.getDbId(), dstTable, analyzer)); tSink.setPartition(createPartition(tSink.getDbId(), dstTable)); List<TOlapTableLocationParam> locationParams = createLocation(dstTable); tSink.setLocation(locationParams.get(0)); @@ -214,7 +215,7 @@ public class OlapTableSink extends DataSink { return tDataSink; } - private TOlapTableSchemaParam createSchema(long dbId, OlapTable table) { + private TOlapTableSchemaParam createSchema(long dbId, OlapTable table, Analyzer analyzer) throws AnalysisException { TOlapTableSchemaParam schemaParam = new TOlapTableSchemaParam(); schemaParam.setDbId(dbId); schemaParam.setTableId(table.getId()); @@ -253,6 +254,11 @@ public class OlapTableSink extends DataSink { if (indexMeta.getWhereClause() != null) { Expr expr = indexMeta.getWhereClause().clone(); expr.replaceSlot(tupleDescriptor); + if (analyzer != null) { + tupleDescriptor.setTable(table); + analyzer.registerTupleDescriptor(tupleDescriptor); + expr.analyze(analyzer); + } indexSchema.setWhereClause(expr.treeToThrift()); } indexSchema.setColumnsDesc(columnsDesc); diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/StreamLoadPlanner.java b/fe/fe-core/src/main/java/org/apache/doris/planner/StreamLoadPlanner.java index 40d5b99f34..ac5d6c595c 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/planner/StreamLoadPlanner.java +++ b/fe/fe-core/src/main/java/org/apache/doris/planner/StreamLoadPlanner.java @@ -266,7 +266,7 @@ public class StreamLoadPlanner { olapTableSink.init(loadId, taskInfo.getTxnId(), db.getId(), timeout, taskInfo.getSendBatchParallelism(), taskInfo.isLoadToSingleTablet(), taskInfo.isStrictMode()); olapTableSink.setPartialUpdateInputColumns(isPartialUpdate, partialUpdateInputColumns); - olapTableSink.complete(); + olapTableSink.complete(analyzer); // for stream load, we only need one fragment, ScanNode -> DataSink. // OlapTableSink can dispatch data to corresponding node. @@ -476,7 +476,7 @@ public class StreamLoadPlanner { olapTableSink.init(loadId, taskInfo.getTxnId(), db.getId(), timeout, taskInfo.getSendBatchParallelism(), taskInfo.isLoadToSingleTablet(), taskInfo.isStrictMode()); olapTableSink.setPartialUpdateInputColumns(isPartialUpdate, partialUpdateInputColumns); - olapTableSink.complete(); + olapTableSink.complete(analyzer); // for stream load, we only need one fragment, ScanNode -> DataSink. // OlapTableSink can dispatch data to corresponding node. diff --git a/fe/fe-core/src/test/java/org/apache/doris/planner/OlapTableSinkTest.java b/fe/fe-core/src/test/java/org/apache/doris/planner/OlapTableSinkTest.java index c6da7304d5..dc98026a00 100644 --- a/fe/fe-core/src/test/java/org/apache/doris/planner/OlapTableSinkTest.java +++ b/fe/fe-core/src/test/java/org/apache/doris/planner/OlapTableSinkTest.java @@ -108,7 +108,7 @@ public class OlapTableSinkTest { dstTable.getPartitionInfo().setIsMutable(partition.getId(), true); OlapTableSink sink = new OlapTableSink(dstTable, tuple, Lists.newArrayList(2L), false); sink.init(new TUniqueId(1, 2), 3, 4, 1000, 1, false, false); - sink.complete(); + sink.complete(null); LOG.info("sink is {}", sink.toThrift()); LOG.info("{}", sink.getExplainString("", TExplainLevel.NORMAL)); } @@ -146,7 +146,7 @@ public class OlapTableSinkTest { OlapTableSink sink = new OlapTableSink(dstTable, tuple, Lists.newArrayList(p1.getId()), false); sink.init(new TUniqueId(1, 2), 3, 4, 1000, 1, false, false); try { - sink.complete(); + sink.complete(null); } catch (UserException e) { // CHECKSTYLE IGNORE THIS LINE } @@ -170,7 +170,7 @@ public class OlapTableSinkTest { OlapTableSink sink = new OlapTableSink(dstTable, tuple, Lists.newArrayList(unknownPartId), false); sink.init(new TUniqueId(1, 2), 3, 4, 1000, 1, false, false); - sink.complete(); + sink.complete(null); LOG.info("sink is {}", sink.toThrift()); LOG.info("{}", sink.getExplainString("", TExplainLevel.NORMAL)); } @@ -208,7 +208,7 @@ public class OlapTableSinkTest { OlapTableSink sink = new OlapTableSink(dstTable, tuple, Lists.newArrayList(p1.getId()), false); sink.init(new TUniqueId(1, 2), 3, 4, 1000, 1, false, false); try { - sink.complete(); + sink.complete(null); } catch (UserException e) { // CHECKSTYLE IGNORE THIS LINE } diff --git a/regression-test/suites/unique_with_mow_p0/partial_update/test_partial_update_schema_change.groovy b/regression-test/suites/unique_with_mow_p0/partial_update/test_partial_update_schema_change.groovy index 905c622159..9883f19dcd 100644 --- a/regression-test/suites/unique_with_mow_p0/partial_update/test_partial_update_schema_change.groovy +++ b/regression-test/suites/unique_with_mow_p0/partial_update/test_partial_update_schema_change.groovy @@ -64,12 +64,15 @@ suite("test_partial_update_schema_change", "p0") { // schema change sql " ALTER table ${tableName} add column c10 INT DEFAULT '0' " + def try_times=100 while(true){ def res = sql " SHOW ALTER TABLE COLUMN WHERE TableName = '${tableName}' ORDER BY CreateTime DESC LIMIT 1 " + Thread.sleep(1000) if(res[0][9].toString() == "FINISHED"){ break; } - Thread.sleep(500) + assert(try_times>0) + try_times-- } // test load data without new column @@ -174,12 +177,15 @@ suite("test_partial_update_schema_change", "p0") { // schema change sql " ALTER table ${tableName} DROP COLUMN c8 " + try_times=100 while(true){ def res = sql " SHOW ALTER TABLE COLUMN WHERE TableName = '${tableName}' ORDER BY CreateTime DESC LIMIT 1 " + Thread.sleep(1000) if(res[0][9].toString() == "FINISHED"){ break; } - Thread.sleep(500) + assert(try_times>0) + try_times-- } // test load data without delete column @@ -283,12 +289,15 @@ suite("test_partial_update_schema_change", "p0") { // schema change sql " ALTER table ${tableName} MODIFY COLUMN c2 double " + try_times=100 while(true){ def res = sql " SHOW ALTER TABLE COLUMN WHERE TableName = '${tableName}' ORDER BY CreateTime DESC LIMIT 1 " + Thread.sleep(1000) if(res[0][9].toString() == "FINISHED"){ break; } - Thread.sleep(500) + assert(try_times>0) + try_times-- } // test load data with update column @@ -358,20 +367,27 @@ suite("test_partial_update_schema_change", "p0") { // schema change sql " ALTER table ${tableName} ADD COLUMN c1 int key null " + try_times=100 while(true){ def res = sql " SHOW ALTER TABLE COLUMN WHERE TableName = '${tableName}' ORDER BY CreateTime DESC LIMIT 1 " + Thread.sleep(1000) if(res[0][9].toString() == "FINISHED"){ break; } - Thread.sleep(500) + assert(try_times>0) + try_times-- } + sql " ALTER table ${tableName} ADD COLUMN c2 int null " + try_times=100 while(true){ def res = sql " SHOW ALTER TABLE COLUMN WHERE TableName = '${tableName}' ORDER BY CreateTime DESC LIMIT 1 " + Thread.sleep(1000) if(res[0][9].toString() == "FINISHED"){ break; } - Thread.sleep(500) + assert(try_times>0) + try_times-- } // test load data with all key column @@ -451,12 +467,15 @@ suite("test_partial_update_schema_change", "p0") { qt_sql10 " select * from ${tableName} order by c0 " sql " CREATE INDEX test ON ${tableName} (c1) USING BITMAP " + try_times=100 while(true){ def res = sql " SHOW ALTER TABLE COLUMN WHERE TableName = '${tableName}' ORDER BY CreateTime DESC LIMIT 1 " + Thread.sleep(1000) if(res[0][9].toString() == "FINISHED"){ break; } - Thread.sleep(500) + assert(try_times>0) + try_times-- } //test load data with create index @@ -605,12 +624,15 @@ suite("test_partial_update_schema_change", "p0") { // schema change sql " ALTER table ${tableName} add column c10 INT DEFAULT '0' " + try_times=100 while(true){ def res = sql " SHOW ALTER TABLE COLUMN WHERE TableName = '${tableName}' ORDER BY CreateTime DESC LIMIT 1 " + Thread.sleep(1000) if(res[0][9].toString() == "FINISHED"){ break; } - Thread.sleep(500) + assert(try_times>0) + try_times-- } // test load data without new column @@ -714,12 +736,15 @@ suite("test_partial_update_schema_change", "p0") { // schema change sql " ALTER table ${tableName} DROP COLUMN c8 " + try_times=100 while(true){ def res = sql " SHOW ALTER TABLE COLUMN WHERE TableName = '${tableName}' ORDER BY CreateTime DESC LIMIT 1 " + Thread.sleep(1000) if(res[0][9].toString() == "FINISHED"){ break; } - Thread.sleep(500) + assert(try_times>0) + try_times-- } // test load data without delete column @@ -822,12 +847,15 @@ suite("test_partial_update_schema_change", "p0") { // schema change sql " ALTER table ${tableName} MODIFY COLUMN c2 double " + try_times=100 while(true){ def res = sql " SHOW ALTER TABLE COLUMN WHERE TableName = '${tableName}' ORDER BY CreateTime DESC LIMIT 1 " + Thread.sleep(1000) if(res[0][9].toString() == "FINISHED"){ break; } - Thread.sleep(500) + assert(try_times>0) + try_times-- } // test load data with update column @@ -896,20 +924,26 @@ suite("test_partial_update_schema_change", "p0") { // schema change sql " ALTER table ${tableName} ADD COLUMN c1 int key null " + try_times=100 while(true){ def res = sql " SHOW ALTER TABLE COLUMN WHERE TableName = '${tableName}' ORDER BY CreateTime DESC LIMIT 1 " + Thread.sleep(1000) if(res[0][9].toString() == "FINISHED"){ break; } - Thread.sleep(500) + assert(try_times>0) + try_times-- } sql " ALTER table ${tableName} ADD COLUMN c2 int null " + try_times=100 while(true){ def res = sql " SHOW ALTER TABLE COLUMN WHERE TableName = '${tableName}' ORDER BY CreateTime DESC LIMIT 1 " + Thread.sleep(1000) if(res[0][9].toString() == "FINISHED"){ break; } - Thread.sleep(500) + assert(try_times>0) + try_times-- } // test load data with all key column @@ -987,12 +1021,15 @@ suite("test_partial_update_schema_change", "p0") { qt_sql23 " select * from ${tableName} order by c0 " sql " CREATE INDEX test ON ${tableName} (c1) USING BITMAP " + try_times=100 while(true){ def res = sql " SHOW ALTER TABLE COLUMN WHERE TableName = '${tableName}' ORDER BY CreateTime DESC LIMIT 1 " + Thread.sleep(1000) if(res[0][9].toString() == "FINISHED"){ break; } - Thread.sleep(500) + assert(try_times>0) + try_times-- } //test load data with create index --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org