This is an automated email from the ASF dual-hosted git repository.

morrysnow pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git

The following commit(s) were added to refs/heads/master by this push:
     new 4aaa65744b2 [fix](Nereids) Use the schema saved during planning as the 
schema of the original target table (#47337)
4aaa65744b2 is described below

commit 4aaa65744b2dc5dba22da2d0e511d3e87f342147
Author: morrySnow <zhangwen...@selectdb.com>
AuthorDate: Fri Jan 24 14:56:28 2025 +0800

    [fix](Nereids) Use the schema saved during planning as the schema of the 
original target table (#47337)
    
    ### What problem does this PR solve?
    
    Related PR: #47033 #45045
    
    Problem Summary:
    
    because schema change does not involve recreating the table object, but
    rather rebuilding the full schema. So, we should use the schema saved
    during planning as the schema of the original target table.
---
 .../java/org/apache/doris/nereids/StatementContext.java   |  7 +++++++
 .../pattern/generator/PlanPatternGeneratorAnalyzer.java   |  4 +---
 .../doris/nereids/rules/analysis/CollectRelation.java     | 14 +++++++++++---
 .../plans/commands/insert/InsertIntoTableCommand.java     | 15 ++++++---------
 4 files changed, 25 insertions(+), 15 deletions(-)

diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/StatementContext.java 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/StatementContext.java
index 597cef2d47e..75353f446a4 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/nereids/StatementContext.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/StatementContext.java
@@ -18,6 +18,7 @@
 package org.apache.doris.nereids;
 
 import org.apache.doris.analysis.StatementBase;
+import org.apache.doris.catalog.Column;
 import org.apache.doris.catalog.TableIf;
 import org.apache.doris.catalog.View;
 import org.apache.doris.catalog.constraint.TableIdentifier;
@@ -178,6 +179,8 @@ public class StatementContext implements Closeable {
     private final Map<List<String>, TableIf> insertTargetTables = 
Maps.newHashMap();
     // save view's def and sql mode to avoid them change before lock
     private final Map<List<String>, Pair<String, Long>> viewInfos = 
Maps.newHashMap();
+    // save insert into schema to avoid schema changed between two read locks
+    private final List<Column> insertTargetSchema = new ArrayList<>();
 
     // for create view support in nereids
     // key is the start and end position of the sql substring that needs to be 
replaced,
@@ -281,6 +284,10 @@ public class StatementContext implements Closeable {
         return tables;
     }
 
+    public List<Column> getInsertTargetSchema() {
+        return insertTargetSchema;
+    }
+
     public void setTables(Map<List<String>, TableIf> tables) {
         this.tables.clear();
         this.tables.putAll(tables);
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/pattern/generator/PlanPatternGeneratorAnalyzer.java
 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/pattern/generator/PlanPatternGeneratorAnalyzer.java
index 99d7c308dac..23e7b5eca76 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/pattern/generator/PlanPatternGeneratorAnalyzer.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/pattern/generator/PlanPatternGeneratorAnalyzer.java
@@ -19,7 +19,6 @@ package org.apache.doris.nereids.pattern.generator;
 
 import org.apache.doris.nereids.pattern.generator.javaast.ClassDeclaration;
 
-import java.lang.reflect.Modifier;
 import java.util.List;
 import java.util.Map;
 import java.util.Optional;
@@ -45,8 +44,7 @@ public class PlanPatternGeneratorAnalyzer {
         Map<ClassDeclaration, Set<String>> planClassMap = 
analyzer.getParentClassMap().entrySet().stream()
                 .filter(kv -> 
kv.getValue().contains("org.apache.doris.nereids.trees.plans.Plan"))
                 .filter(kv -> !kv.getKey().name.equals("GroupPlan"))
-                .filter(kv -> !Modifier.isAbstract(kv.getKey().modifiers.mod)
-                        && kv.getKey() instanceof ClassDeclaration)
+                .filter(kv -> kv.getKey() instanceof ClassDeclaration)
                 .collect(Collectors.toMap(kv -> (ClassDeclaration) 
kv.getKey(), kv -> kv.getValue()));
 
         List<PlanPatternGenerator> generators = planClassMap.entrySet()
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/analysis/CollectRelation.java
 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/analysis/CollectRelation.java
index 9426ab4d382..92a4fb76d49 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/analysis/CollectRelation.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/analysis/CollectRelation.java
@@ -41,6 +41,7 @@ import org.apache.doris.nereids.trees.plans.Plan;
 import org.apache.doris.nereids.trees.plans.logical.LogicalCTE;
 import org.apache.doris.nereids.trees.plans.logical.LogicalPlan;
 import org.apache.doris.nereids.trees.plans.logical.LogicalSubQueryAlias;
+import org.apache.doris.nereids.trees.plans.logical.UnboundLogicalSink;
 import org.apache.doris.nereids.util.RelationUtil;
 
 import com.google.common.collect.ImmutableList;
@@ -75,8 +76,8 @@ public class CollectRelation implements AnalysisRuleFactory {
                 unboundRelation()
                         .thenApply(this::collectFromUnboundRelation)
                         .toRule(RuleType.COLLECT_TABLE_FROM_RELATION),
-                unboundTableSink()
-                        .thenApply(this::collectFromUnboundTableSink)
+                unboundLogicalSink()
+                        .thenApply(this::collectFromUnboundSink)
                         .toRule(RuleType.COLLECT_TABLE_FROM_SINK),
                 any().whenNot(UnboundRelation.class::isInstance)
                         .whenNot(UnboundTableSink.class::isInstance)
@@ -124,7 +125,7 @@ public class CollectRelation implements AnalysisRuleFactory 
{
         return null;
     }
 
-    private Plan 
collectFromUnboundTableSink(MatchingContext<UnboundTableSink<Plan>> ctx) {
+    private Plan 
collectFromUnboundSink(MatchingContext<UnboundLogicalSink<Plan>> ctx) {
         List<String> nameParts = ctx.root.getNameParts();
         switch (nameParts.size()) {
             case 1:
@@ -182,6 +183,13 @@ public class CollectRelation implements 
AnalysisRuleFactory {
         if (tableFrom == TableFrom.QUERY) {
             collectMTMVCandidates(table, cascadesContext);
         }
+        if (tableFrom == TableFrom.INSERT_TARGET) {
+            if 
(!cascadesContext.getStatementContext().getInsertTargetSchema().isEmpty()) {
+                LOG.warn("collect insert target table '{}' more than once.", 
tableQualifier);
+            }
+            
cascadesContext.getStatementContext().getInsertTargetSchema().clear();
+            
cascadesContext.getStatementContext().getInsertTargetSchema().addAll(table.getFullSchema());
+        }
         if (table instanceof View) {
             parseAndCollectFromView(tableQualifier, (View) table, 
cascadesContext);
         }
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/InsertIntoTableCommand.java
 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/InsertIntoTableCommand.java
index 76c72f82f90..39c5909d4f5 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/InsertIntoTableCommand.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/InsertIntoTableCommand.java
@@ -28,7 +28,6 @@ import org.apache.doris.common.ErrorReport;
 import org.apache.doris.common.UserException;
 import org.apache.doris.common.profile.ProfileManager.ProfileType;
 import org.apache.doris.common.util.DebugUtil;
-import org.apache.doris.common.util.MetaLockUtils;
 import org.apache.doris.datasource.hive.HMSExternalTable;
 import org.apache.doris.datasource.iceberg.IcebergExternalTable;
 import org.apache.doris.datasource.jdbc.JdbcExternalTable;
@@ -73,7 +72,6 @@ import org.apache.commons.collections.CollectionUtils;
 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
 
-import java.util.Comparator;
 import java.util.List;
 import java.util.Objects;
 import java.util.Optional;
@@ -186,9 +184,7 @@ public class InsertIntoTableCommand extends Command 
implements ForwardWithSync,
 
             // lock after plan and check does table's schema changed to ensure 
we lock table order by id.
             TableIf newestTargetTableIf = 
RelationUtil.getTable(qualifiedTargetTableName, ctx.getEnv());
-            List<TableIf> targetTables = Lists.newArrayList(targetTableIf, 
newestTargetTableIf);
-            targetTables.sort(Comparator.comparing(TableIf::getId));
-            MetaLockUtils.readLockTables(targetTables);
+            newestTargetTableIf.readLock();
             try {
                 if (targetTableIf.getId() != newestTargetTableIf.getId()) {
                     LOG.warn("insert plan failed {} times. query id is {}. 
table id changed from {} to {}",
@@ -196,10 +192,11 @@ public class InsertIntoTableCommand extends Command 
implements ForwardWithSync,
                             targetTableIf.getId(), 
newestTargetTableIf.getId());
                     continue;
                 }
-                if 
(!targetTableIf.getFullSchema().equals(newestTargetTableIf.getFullSchema())) {
+                // Use the schema saved during planning as the schema of the 
original target table.
+                if 
(!ctx.getStatementContext().getInsertTargetSchema().equals(newestTargetTableIf.getFullSchema()))
 {
                     LOG.warn("insert plan failed {} times. query id is {}. 
table schema changed from {} to {}",
                             retryTimes, DebugUtil.printId(ctx.queryId()),
-                            targetTableIf.getFullSchema(), 
newestTargetTableIf.getFullSchema());
+                            ctx.getStatementContext().getInsertTargetSchema(), 
newestTargetTableIf.getFullSchema());
                     continue;
                 }
                 if (!insertExecutor.isEmptyInsert()) {
@@ -209,9 +206,9 @@ public class InsertIntoTableCommand extends Command 
implements ForwardWithSync,
                             buildResult.physicalSink
                     );
                 }
-                MetaLockUtils.readUnlockTables(targetTables);
+                newestTargetTableIf.readUnlock();
             } catch (Throwable e) {
-                MetaLockUtils.readUnlockTables(targetTables);
+                newestTargetTableIf.readUnlock();
                 // the abortTxn in onFail need to acquire table write lock
                 if (insertExecutor != null) {
                     insertExecutor.onFail(e);


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to