This is an automated email from the ASF dual-hosted git repository. morrysnow pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push: new 7aec6ffb6ae [fix](mtmv) Fix get mv read lock too late when rewritten by materialized view (#44164) 7aec6ffb6ae is described below commit 7aec6ffb6aef6b41f345cfeaf5fbdd78d19bc5b9 Author: seawinde <w...@selectdb.com> AuthorDate: Tue Nov 19 16:13:40 2024 +0800 [fix](mtmv) Fix get mv read lock too late when rewritten by materialized view (#44164) Problem Summary: When materialized view is rewritten, it would use the mv metadata. Should try to get read lock before use these metadata. or it would cause error. Such as mv def is as following CREATE MATERIALIZED VIEW mv1 BUILD IMMEDIATE REFRESH COMPLETE ON MANUAL DISTRIBUTED BY RANDOM BUCKETS 2 PROPERTIES ('replication_num' = '1') AS select o_orderdate, o_shippriority, o_comment, o.o_code as o_o_code, l_orderkey, l_partkey, l.o_code as l_o_code from orders_same_col o left join lineitem_same_col l on l_orderkey = o_orderkey left join partsupp on ps_partkey = l_partkey and l_suppkey = ps_suppkey; When handling transparent rewriting, a MV scan plan is used for the transparent rewrite. During the initialization of the scan plan, the partitions of the table are retrieved, so it is necessary to attempt to acquire a read lock on the table during initialization. If the read lock is not acquired, subsequent operations may add or delete partitions, and in the later processing of table partitions, calling get Partition may not retrieve the corresponding partition, leading to data errors. --- .../mv/AbstractMaterializedViewRule.java | 12 ++---- .../mv/AsyncMaterializationContext.java | 14 ++++--- .../exploration/mv/MaterializationContext.java | 48 +++++++++++---------- .../exploration/mv/SyncMaterializationContext.java | 25 +++++++---- .../doris/nereids/mv/IdStatisticsMapTest.java | 2 +- .../mv/join/left_outer/outer_join.out | 46 ++++++++++++++++++++ .../mv/join/left_outer/outer_join.groovy | 49 ++++++++++++++++++++++ 7 files changed, 150 insertions(+), 46 deletions(-) diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/exploration/mv/AbstractMaterializedViewRule.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/exploration/mv/AbstractMaterializedViewRule.java index 60b5c58d4c5..8e9ef1eaa97 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/exploration/mv/AbstractMaterializedViewRule.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/exploration/mv/AbstractMaterializedViewRule.java @@ -234,7 +234,7 @@ public abstract class AbstractMaterializedViewRule implements ExplorationRuleFac continue; } Plan rewrittenPlan; - Plan mvScan = materializationContext.getScanPlan(queryStructInfo); + Plan mvScan = materializationContext.getScanPlan(queryStructInfo, cascadesContext); Plan queryPlan = queryStructInfo.getTopPlan(); if (compensatePredicates.isAlwaysTrue()) { rewrittenPlan = mvScan; @@ -262,12 +262,6 @@ public abstract class AbstractMaterializedViewRule implements ExplorationRuleFac // Rewrite query by view rewrittenPlan = rewriteQueryByView(matchMode, queryStructInfo, viewStructInfo, viewToQuerySlotMapping, rewrittenPlan, materializationContext, cascadesContext); - // If rewrite successfully, try to get mv read lock to avoid data inconsistent, - // try to get lock which should added before RBO - if (materializationContext instanceof AsyncMaterializationContext && !materializationContext.isSuccess()) { - cascadesContext.getStatementContext() - .addTableReadLock(((AsyncMaterializationContext) materializationContext).getMtmv()); - } rewrittenPlan = MaterializedViewUtils.rewriteByRules(cascadesContext, childContext -> { Rewriter.getWholeTreeRewriter(childContext).execute(); @@ -379,9 +373,9 @@ public abstract class AbstractMaterializedViewRule implements ExplorationRuleFac } trySetStatistics(materializationContext, cascadesContext); rewriteResults.add(rewrittenPlan); - // if rewrite successfully, try to regenerate mv scan because it maybe used again - materializationContext.tryReGenerateScanPlan(cascadesContext); recordIfRewritten(queryStructInfo.getOriginalPlan(), materializationContext, cascadesContext); + // If rewrite successfully, try to clear mv scan currently because it maybe used again + materializationContext.clearScanPlan(cascadesContext); } return rewriteResults; } diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/exploration/mv/AsyncMaterializationContext.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/exploration/mv/AsyncMaterializationContext.java index 0d88672fed6..96d37ad546a 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/exploration/mv/AsyncMaterializationContext.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/exploration/mv/AsyncMaterializationContext.java @@ -57,9 +57,7 @@ public class AsyncMaterializationContext extends MaterializationContext { */ public AsyncMaterializationContext(MTMV mtmv, Plan mvPlan, Plan mvOriginalPlan, List<Table> baseTables, List<Table> baseViews, CascadesContext cascadesContext, StructInfo structInfo) { - super(mvPlan, mvOriginalPlan, MaterializedViewUtils.generateMvScanPlan(mtmv, mtmv.getBaseIndexId(), - mtmv.getPartitionIds(), PreAggStatus.on(), cascadesContext), - cascadesContext, structInfo); + super(mvPlan, mvOriginalPlan, cascadesContext, structInfo); this.mtmv = mtmv; } @@ -110,7 +108,7 @@ public class AsyncMaterializationContext extends MaterializationContext { return Optional.empty(); } RelationId relationId = null; - Optional<LogicalOlapScan> logicalOlapScan = this.getScanPlan(null) + Optional<LogicalOlapScan> logicalOlapScan = this.getScanPlan(null, cascadesContext) .collectFirst(LogicalOlapScan.class::isInstance); if (logicalOlapScan.isPresent()) { relationId = logicalOlapScan.get().getRelationId(); @@ -132,7 +130,13 @@ public class AsyncMaterializationContext extends MaterializationContext { } @Override - public Plan getScanPlan(StructInfo queryInfo) { + public Plan getScanPlan(StructInfo queryInfo, CascadesContext cascadesContext) { + // If try to get scan plan or rewrite successfully, try to get mv read lock to avoid meta data inconsistent, + // try to get lock which should added before RBO + if (!this.isSuccess()) { + cascadesContext.getStatementContext().addTableReadLock(this.getMtmv()); + } + super.getScanPlan(queryInfo, cascadesContext); return scanPlan; } diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/exploration/mv/MaterializationContext.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/exploration/mv/MaterializationContext.java index df535d59d87..38eba2ac340 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/exploration/mv/MaterializationContext.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/exploration/mv/MaterializationContext.java @@ -105,22 +105,13 @@ public abstract class MaterializationContext { /** * MaterializationContext, this contains necessary info for query rewriting by materialization */ - public MaterializationContext(Plan plan, Plan originalPlan, Plan scanPlan, + public MaterializationContext(Plan plan, Plan originalPlan, CascadesContext cascadesContext, StructInfo structInfo) { this.plan = plan; this.originalPlan = originalPlan; - this.scanPlan = scanPlan; - StatementBase parsedStatement = cascadesContext.getStatementContext().getParsedStatement(); this.enableRecordFailureDetail = parsedStatement != null && parsedStatement.isExplain() && ExplainLevel.MEMO_PLAN == parsedStatement.getExplainOptions().getExplainLevel(); - List<Slot> originalPlanOutput = originalPlan.getOutput(); - List<Slot> scanPlanOutput = this.scanPlan.getOutput(); - if (originalPlanOutput.size() == scanPlanOutput.size()) { - for (int slotIndex = 0; slotIndex < originalPlanOutput.size(); slotIndex++) { - this.exprToScanExprMapping.put(originalPlanOutput.get(slotIndex), scanPlanOutput.get(slotIndex)); - } - } // Construct materialization struct info, catch exception which may cause planner roll back this.structInfo = structInfo == null ? constructStructInfo(plan, originalPlan, cascadesContext, new BitSet()).orElseGet(() -> null) @@ -128,10 +119,6 @@ public abstract class MaterializationContext { this.available = this.structInfo != null; if (available) { this.planOutputShuttledExpressions = this.structInfo.getPlanOutputShuttledExpressions(); - // materialization output expression shuttle, this will be used to expression rewrite - this.shuttledExprToScanExprMapping = ExpressionMapping.generate( - this.planOutputShuttledExpressions, - scanPlanOutput); } } @@ -176,17 +163,19 @@ public abstract class MaterializationContext { * if MaterializationContext is already rewritten successfully, then should generate new scan plan in later * query rewrite, because one plan may hit the materialized view repeatedly and the materialization scan output * should be different. - * This method should be called when query rewrite successfully */ - public void tryReGenerateScanPlan(CascadesContext cascadesContext) { + public void tryGenerateScanPlan(CascadesContext cascadesContext) { + if (!this.isAvailable()) { + return; + } this.scanPlan = doGenerateScanPlan(cascadesContext); - // materialization output expression shuttle, this will be used to expression rewrite - this.shuttledExprToScanExprMapping = ExpressionMapping.generate( - this.planOutputShuttledExpressions, - this.scanPlan.getOutput()); + // Materialization output expression shuttle, this will be used to expression rewrite + List<Slot> scanPlanOutput = this.scanPlan.getOutput(); + this.shuttledExprToScanExprMapping = ExpressionMapping.generate(this.planOutputShuttledExpressions, + scanPlanOutput); + // This is used by normalize statistics column expression Map<Expression, Expression> regeneratedMapping = new HashMap<>(); List<Slot> originalPlanOutput = originalPlan.getOutput(); - List<Slot> scanPlanOutput = this.scanPlan.getOutput(); if (originalPlanOutput.size() == scanPlanOutput.size()) { for (int slotIndex = 0; slotIndex < originalPlanOutput.size(); slotIndex++) { regeneratedMapping.put(originalPlanOutput.get(slotIndex), scanPlanOutput.get(slotIndex)); @@ -195,6 +184,17 @@ public abstract class MaterializationContext { this.exprToScanExprMapping = regeneratedMapping; } + /** + * Should clear scan plan after materializationContext is already rewritten successfully, + * Because one plan may hit the materialized view repeatedly and the materialization scan output + * should be different. + */ + public void clearScanPlan(CascadesContext cascadesContext) { + this.scanPlan = null; + this.shuttledExprToScanExprMapping = null; + this.exprToScanExprMapping = null; + } + public void addSlotMappingToCache(RelationMapping relationMapping, SlotMapping slotMapping) { queryToMaterializationSlotMappingCache.put(relationMapping, slotMapping); } @@ -275,7 +275,11 @@ public abstract class MaterializationContext { return originalPlan; } - public Plan getScanPlan(StructInfo queryStructInfo) { + public Plan getScanPlan(StructInfo queryStructInfo, CascadesContext cascadesContext) { + if (this.scanPlan == null || this.shuttledExprToScanExprMapping == null + || this.exprToScanExprMapping == null) { + tryGenerateScanPlan(cascadesContext); + } return scanPlan; } diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/exploration/mv/SyncMaterializationContext.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/exploration/mv/SyncMaterializationContext.java index 47b01385ac1..e27b3d51743 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/exploration/mv/SyncMaterializationContext.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/exploration/mv/SyncMaterializationContext.java @@ -25,6 +25,7 @@ import org.apache.doris.nereids.trees.plans.ObjectId; import org.apache.doris.nereids.trees.plans.Plan; import org.apache.doris.nereids.trees.plans.PreAggStatus; import org.apache.doris.nereids.trees.plans.RelationId; +import org.apache.doris.nereids.trees.plans.algebra.CatalogRelation; import org.apache.doris.nereids.trees.plans.algebra.Relation; import org.apache.doris.nereids.trees.plans.logical.LogicalOlapScan; import org.apache.doris.nereids.trees.plans.physical.PhysicalOlapScan; @@ -55,9 +56,7 @@ public class SyncMaterializationContext extends MaterializationContext { */ public SyncMaterializationContext(Plan mvPlan, Plan mvOriginalPlan, OlapTable olapTable, long indexId, String indexName, CascadesContext cascadesContext, Statistics statistics) { - super(mvPlan, mvOriginalPlan, - MaterializedViewUtils.generateMvScanPlan(olapTable, indexId, olapTable.getPartitionIds(), - PreAggStatus.unset(), cascadesContext), cascadesContext, null); + super(mvPlan, mvOriginalPlan, cascadesContext, null); this.olapTable = olapTable; this.indexId = indexId; this.indexName = indexName; @@ -100,7 +99,7 @@ public class SyncMaterializationContext extends MaterializationContext { @Override Optional<Pair<Id, Statistics>> getPlanStatistics(CascadesContext cascadesContext) { RelationId relationId = null; - Optional<LogicalOlapScan> scanObj = this.getScanPlan(null) + Optional<LogicalOlapScan> scanObj = this.getScanPlan(null, cascadesContext) .collectFirst(LogicalOlapScan.class::isInstance); if (scanObj.isPresent()) { relationId = scanObj.get().getRelationId(); @@ -109,19 +108,27 @@ public class SyncMaterializationContext extends MaterializationContext { } @Override - public Plan getScanPlan(StructInfo queryStructInfo) { + public Plan getScanPlan(StructInfo queryStructInfo, CascadesContext cascadesContext) { + // Already get lock if sync mv, doesn't need to get lock + super.getScanPlan(queryStructInfo, cascadesContext); if (queryStructInfo == null) { return scanPlan; } - if (queryStructInfo.getRelations().size() == 1 - && queryStructInfo.getRelations().get(0) instanceof LogicalOlapScan - && !((LogicalOlapScan) queryStructInfo.getRelations().get(0)).getSelectedPartitionIds().isEmpty()) { + List<CatalogRelation> queryStructInfoRelations = queryStructInfo.getRelations(); + if (queryStructInfoRelations.size() == 1 + && queryStructInfoRelations.get(0) instanceof LogicalOlapScan + && !((LogicalOlapScan) queryStructInfoRelations.get(0)).getSelectedPartitionIds().isEmpty()) { // Partition prune if sync materialized view return scanPlan.accept(new DefaultPlanRewriter<Void>() { @Override public Plan visitLogicalOlapScan(LogicalOlapScan olapScan, Void context) { + if (!queryStructInfoRelations.get(0).getTable().getFullQualifiers().equals( + olapScan.getTable().getFullQualifiers())) { + // Only the same table, we can do partition prue + return olapScan; + } return olapScan.withSelectedPartitionIds( - ((LogicalOlapScan) queryStructInfo.getRelations().get(0)).getSelectedPartitionIds()); + ((LogicalOlapScan) queryStructInfoRelations.get(0)).getSelectedPartitionIds()); } }, null); } diff --git a/fe/fe-core/src/test/java/org/apache/doris/nereids/mv/IdStatisticsMapTest.java b/fe/fe-core/src/test/java/org/apache/doris/nereids/mv/IdStatisticsMapTest.java index a4c05fa81e6..0090982db00 100644 --- a/fe/fe-core/src/test/java/org/apache/doris/nereids/mv/IdStatisticsMapTest.java +++ b/fe/fe-core/src/test/java/org/apache/doris/nereids/mv/IdStatisticsMapTest.java @@ -76,7 +76,7 @@ public class IdStatisticsMapTest extends SqlTestBase { .rewrite(); // scan plan output will be refreshed after mv rewrite successfully, so need tmp store Set<Slot> materializationScanOutput = c1.getMaterializationContexts().get(0) - .getScanPlan(null).getOutputSet(); + .getScanPlan(null, c1).getOutputSet(); tmpPlanChecker .optimize() .printlnBestPlanTree(); diff --git a/regression-test/data/nereids_rules_p0/mv/join/left_outer/outer_join.out b/regression-test/data/nereids_rules_p0/mv/join/left_outer/outer_join.out index 1a1b846054b..b8e78048d8e 100644 --- a/regression-test/data/nereids_rules_p0/mv/join/left_outer/outer_join.out +++ b/regression-test/data/nereids_rules_p0/mv/join/left_outer/outer_join.out @@ -373,3 +373,49 @@ 2023-12-12 2 mi 108 2 2023-12-12 2 mi 108 2 +-- !query12_0_before -- +2023-12-09 1 yy 95 4 +2023-12-09 1 yy 95 4 +2023-12-09 1 yy 96 4 +2023-12-09 1 yy 96 4 +2023-12-09 1 yy 97 4 +2023-12-09 1 yy 97 4 +2023-12-10 1 yy 100 2 +2023-12-10 1 yy 101 2 +2023-12-10 1 yy 98 2 +2023-12-10 1 yy 99 2 +2023-12-11 2 mm 102 3 +2023-12-11 2 mm 103 3 +2023-12-11 2 mm 104 3 +2023-12-12 2 mi 105 2 +2023-12-12 2 mi 105 2 +2023-12-12 2 mi 106 2 +2023-12-12 2 mi 106 2 +2023-12-12 2 mi 107 2 +2023-12-12 2 mi 107 2 +2023-12-12 2 mi 108 2 +2023-12-12 2 mi 108 2 + +-- !query12_0_after -- +2023-12-09 1 yy 95 4 +2023-12-09 1 yy 95 4 +2023-12-09 1 yy 96 4 +2023-12-09 1 yy 96 4 +2023-12-09 1 yy 97 4 +2023-12-09 1 yy 97 4 +2023-12-10 1 yy 100 2 +2023-12-10 1 yy 101 2 +2023-12-10 1 yy 98 2 +2023-12-10 1 yy 99 2 +2023-12-11 2 mm 102 3 +2023-12-11 2 mm 103 3 +2023-12-11 2 mm 104 3 +2023-12-12 2 mi 105 2 +2023-12-12 2 mi 105 2 +2023-12-12 2 mi 106 2 +2023-12-12 2 mi 106 2 +2023-12-12 2 mi 107 2 +2023-12-12 2 mi 107 2 +2023-12-12 2 mi 108 2 +2023-12-12 2 mi 108 2 + diff --git a/regression-test/suites/nereids_rules_p0/mv/join/left_outer/outer_join.groovy b/regression-test/suites/nereids_rules_p0/mv/join/left_outer/outer_join.groovy index f31a1a77978..faa2c747a83 100644 --- a/regression-test/suites/nereids_rules_p0/mv/join/left_outer/outer_join.groovy +++ b/regression-test/suites/nereids_rules_p0/mv/join/left_outer/outer_join.groovy @@ -759,4 +759,53 @@ suite("outer_join") { async_mv_rewrite_success(db, mv11_0, query11_0, "mv11_0") order_qt_query11_0_after "${query11_0}" sql """ DROP MATERIALIZED VIEW IF EXISTS mv11_0""" + + + def mv12_0 = """ + select + o_orderdate, + o_shippriority, + o_comment, + o.o_code as o_o_code, + l_orderkey, + l_partkey, + l.o_code as l_o_code + from + orders_same_col o left + join lineitem_same_col l on l_orderkey = o_orderkey + left join partsupp on ps_partkey = l_partkey and l_suppkey = ps_suppkey; + """ + + def query12_0 = """ + select + o_orderdate, + o_shippriority, + o_comment, + o.o_code + l_orderkey, + l_partkey + from + orders_same_col o left + join lineitem_same_col l on l_orderkey = o_orderkey + left join partsupp on ps_partkey = l_partkey and l_suppkey = ps_suppkey + where l.o_code <> '91' + union all + select + o_orderdate, + o_shippriority, + o_comment, + o.o_code + l_orderkey, + l_partkey + from + orders_same_col o left + join lineitem_same_col l on l_orderkey = o_orderkey + left join partsupp on ps_partkey = l_partkey and l_suppkey = ps_suppkey + where l.o_code = '92'; + """ + + order_qt_query12_0_before "${query12_0}" + async_mv_rewrite_success(db, mv12_0, query12_0, "mv12_0") + order_qt_query12_0_after "${query12_0}" + sql """ DROP MATERIALIZED VIEW IF EXISTS mv12_0""" } --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org