This is an automated email from the ASF dual-hosted git repository. morrysnow pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push: new 4aaa65744b2 [fix](Nereids) Use the schema saved during planning as the schema of the original target table (#47337) 4aaa65744b2 is described below commit 4aaa65744b2dc5dba22da2d0e511d3e87f342147 Author: morrySnow <zhangwen...@selectdb.com> AuthorDate: Fri Jan 24 14:56:28 2025 +0800 [fix](Nereids) Use the schema saved during planning as the schema of the original target table (#47337) ### What problem does this PR solve? Related PR: #47033 #45045 Problem Summary: because schema change does not involve recreating the table object, but rather rebuilding the full schema. So, we should use the schema saved during planning as the schema of the original target table. --- .../java/org/apache/doris/nereids/StatementContext.java | 7 +++++++ .../pattern/generator/PlanPatternGeneratorAnalyzer.java | 4 +--- .../doris/nereids/rules/analysis/CollectRelation.java | 14 +++++++++++--- .../plans/commands/insert/InsertIntoTableCommand.java | 15 ++++++--------- 4 files changed, 25 insertions(+), 15 deletions(-) diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/StatementContext.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/StatementContext.java index 597cef2d47e..75353f446a4 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/StatementContext.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/StatementContext.java @@ -18,6 +18,7 @@ package org.apache.doris.nereids; import org.apache.doris.analysis.StatementBase; +import org.apache.doris.catalog.Column; import org.apache.doris.catalog.TableIf; import org.apache.doris.catalog.View; import org.apache.doris.catalog.constraint.TableIdentifier; @@ -178,6 +179,8 @@ public class StatementContext implements Closeable { private final Map<List<String>, TableIf> insertTargetTables = Maps.newHashMap(); // save view's def and sql mode to avoid them change before lock private final Map<List<String>, Pair<String, Long>> viewInfos = Maps.newHashMap(); + // save insert into schema to avoid schema changed between two read locks + private final List<Column> insertTargetSchema = new ArrayList<>(); // for create view support in nereids // key is the start and end position of the sql substring that needs to be replaced, @@ -281,6 +284,10 @@ public class StatementContext implements Closeable { return tables; } + public List<Column> getInsertTargetSchema() { + return insertTargetSchema; + } + public void setTables(Map<List<String>, TableIf> tables) { this.tables.clear(); this.tables.putAll(tables); diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/pattern/generator/PlanPatternGeneratorAnalyzer.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/pattern/generator/PlanPatternGeneratorAnalyzer.java index 99d7c308dac..23e7b5eca76 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/pattern/generator/PlanPatternGeneratorAnalyzer.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/pattern/generator/PlanPatternGeneratorAnalyzer.java @@ -19,7 +19,6 @@ package org.apache.doris.nereids.pattern.generator; import org.apache.doris.nereids.pattern.generator.javaast.ClassDeclaration; -import java.lang.reflect.Modifier; import java.util.List; import java.util.Map; import java.util.Optional; @@ -45,8 +44,7 @@ public class PlanPatternGeneratorAnalyzer { Map<ClassDeclaration, Set<String>> planClassMap = analyzer.getParentClassMap().entrySet().stream() .filter(kv -> kv.getValue().contains("org.apache.doris.nereids.trees.plans.Plan")) .filter(kv -> !kv.getKey().name.equals("GroupPlan")) - .filter(kv -> !Modifier.isAbstract(kv.getKey().modifiers.mod) - && kv.getKey() instanceof ClassDeclaration) + .filter(kv -> kv.getKey() instanceof ClassDeclaration) .collect(Collectors.toMap(kv -> (ClassDeclaration) kv.getKey(), kv -> kv.getValue())); List<PlanPatternGenerator> generators = planClassMap.entrySet() diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/analysis/CollectRelation.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/analysis/CollectRelation.java index 9426ab4d382..92a4fb76d49 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/analysis/CollectRelation.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/analysis/CollectRelation.java @@ -41,6 +41,7 @@ import org.apache.doris.nereids.trees.plans.Plan; import org.apache.doris.nereids.trees.plans.logical.LogicalCTE; import org.apache.doris.nereids.trees.plans.logical.LogicalPlan; import org.apache.doris.nereids.trees.plans.logical.LogicalSubQueryAlias; +import org.apache.doris.nereids.trees.plans.logical.UnboundLogicalSink; import org.apache.doris.nereids.util.RelationUtil; import com.google.common.collect.ImmutableList; @@ -75,8 +76,8 @@ public class CollectRelation implements AnalysisRuleFactory { unboundRelation() .thenApply(this::collectFromUnboundRelation) .toRule(RuleType.COLLECT_TABLE_FROM_RELATION), - unboundTableSink() - .thenApply(this::collectFromUnboundTableSink) + unboundLogicalSink() + .thenApply(this::collectFromUnboundSink) .toRule(RuleType.COLLECT_TABLE_FROM_SINK), any().whenNot(UnboundRelation.class::isInstance) .whenNot(UnboundTableSink.class::isInstance) @@ -124,7 +125,7 @@ public class CollectRelation implements AnalysisRuleFactory { return null; } - private Plan collectFromUnboundTableSink(MatchingContext<UnboundTableSink<Plan>> ctx) { + private Plan collectFromUnboundSink(MatchingContext<UnboundLogicalSink<Plan>> ctx) { List<String> nameParts = ctx.root.getNameParts(); switch (nameParts.size()) { case 1: @@ -182,6 +183,13 @@ public class CollectRelation implements AnalysisRuleFactory { if (tableFrom == TableFrom.QUERY) { collectMTMVCandidates(table, cascadesContext); } + if (tableFrom == TableFrom.INSERT_TARGET) { + if (!cascadesContext.getStatementContext().getInsertTargetSchema().isEmpty()) { + LOG.warn("collect insert target table '{}' more than once.", tableQualifier); + } + cascadesContext.getStatementContext().getInsertTargetSchema().clear(); + cascadesContext.getStatementContext().getInsertTargetSchema().addAll(table.getFullSchema()); + } if (table instanceof View) { parseAndCollectFromView(tableQualifier, (View) table, cascadesContext); } diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/InsertIntoTableCommand.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/InsertIntoTableCommand.java index 76c72f82f90..39c5909d4f5 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/InsertIntoTableCommand.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/InsertIntoTableCommand.java @@ -28,7 +28,6 @@ import org.apache.doris.common.ErrorReport; import org.apache.doris.common.UserException; import org.apache.doris.common.profile.ProfileManager.ProfileType; import org.apache.doris.common.util.DebugUtil; -import org.apache.doris.common.util.MetaLockUtils; import org.apache.doris.datasource.hive.HMSExternalTable; import org.apache.doris.datasource.iceberg.IcebergExternalTable; import org.apache.doris.datasource.jdbc.JdbcExternalTable; @@ -73,7 +72,6 @@ import org.apache.commons.collections.CollectionUtils; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; -import java.util.Comparator; import java.util.List; import java.util.Objects; import java.util.Optional; @@ -186,9 +184,7 @@ public class InsertIntoTableCommand extends Command implements ForwardWithSync, // lock after plan and check does table's schema changed to ensure we lock table order by id. TableIf newestTargetTableIf = RelationUtil.getTable(qualifiedTargetTableName, ctx.getEnv()); - List<TableIf> targetTables = Lists.newArrayList(targetTableIf, newestTargetTableIf); - targetTables.sort(Comparator.comparing(TableIf::getId)); - MetaLockUtils.readLockTables(targetTables); + newestTargetTableIf.readLock(); try { if (targetTableIf.getId() != newestTargetTableIf.getId()) { LOG.warn("insert plan failed {} times. query id is {}. table id changed from {} to {}", @@ -196,10 +192,11 @@ public class InsertIntoTableCommand extends Command implements ForwardWithSync, targetTableIf.getId(), newestTargetTableIf.getId()); continue; } - if (!targetTableIf.getFullSchema().equals(newestTargetTableIf.getFullSchema())) { + // Use the schema saved during planning as the schema of the original target table. + if (!ctx.getStatementContext().getInsertTargetSchema().equals(newestTargetTableIf.getFullSchema())) { LOG.warn("insert plan failed {} times. query id is {}. table schema changed from {} to {}", retryTimes, DebugUtil.printId(ctx.queryId()), - targetTableIf.getFullSchema(), newestTargetTableIf.getFullSchema()); + ctx.getStatementContext().getInsertTargetSchema(), newestTargetTableIf.getFullSchema()); continue; } if (!insertExecutor.isEmptyInsert()) { @@ -209,9 +206,9 @@ public class InsertIntoTableCommand extends Command implements ForwardWithSync, buildResult.physicalSink ); } - MetaLockUtils.readUnlockTables(targetTables); + newestTargetTableIf.readUnlock(); } catch (Throwable e) { - MetaLockUtils.readUnlockTables(targetTables); + newestTargetTableIf.readUnlock(); // the abortTxn in onFail need to acquire table write lock if (insertExecutor != null) { insertExecutor.onFail(e); --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org