This is an automated email from the ASF dual-hosted git repository.

panxiaolei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new 87b9425772 [Bug](materialized-view) fix where clause not analyzed 
after fe restart (#22268)
87b9425772 is described below

commit 87b94257727056b890fc98689e0c2b6e14169f13
Author: Pxl <pxl...@qq.com>
AuthorDate: Thu Jul 27 18:34:44 2023 +0800

    [Bug](materialized-view) fix where clause not analyzed after fe restart 
(#22268)
    
    fix where clause not analyzed after fe restart
---
 .../java/org/apache/doris/analysis/Analyzer.java   | 15 +++---
 .../apache/doris/analysis/NativeInsertStmt.java    |  2 +-
 .../doris/load/loadv2/LoadingTaskPlanner.java      |  2 +-
 .../plans/commands/InsertIntoTableCommand.java     |  3 +-
 .../org/apache/doris/planner/OlapTableSink.java    | 12 +++--
 .../apache/doris/planner/StreamLoadPlanner.java    |  4 +-
 .../apache/doris/planner/OlapTableSinkTest.java    |  8 +--
 .../test_partial_update_schema_change.groovy       | 61 +++++++++++++++++-----
 8 files changed, 76 insertions(+), 31 deletions(-)

diff --git a/fe/fe-core/src/main/java/org/apache/doris/analysis/Analyzer.java 
b/fe/fe-core/src/main/java/org/apache/doris/analysis/Analyzer.java
index 1e44278eef..761aae1442 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/analysis/Analyzer.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/Analyzer.java
@@ -146,9 +146,6 @@ public class Analyzer {
     private String schemaWild;
     private String schemaTable; // table used in DESCRIBE Table
 
-    // True if the corresponding select block has a limit and/or offset clause.
-    private boolean hasLimitOffsetClause = false;
-
     // Current depth of nested analyze() calls. Used for enforcing a
     // maximum expr-tree depth. Needs to be manually maintained by the user
     // of this Analyzer with incrementCallDepth() and decrementCallDepth().
@@ -657,6 +654,14 @@ public class Analyzer {
         }
     }
 
+    public void registerTupleDescriptor(TupleDescriptor desc) {
+        tupleByAlias.put(desc.getAlias(), desc);
+        for (SlotDescriptor slot : desc.getSlots()) {
+            String key = desc.getAlias() + "." + slot.getColumn().getName();
+            slotRefMap.put(key, slot);
+        }
+    }
+
     /**
      * Creates an returns an empty TupleDescriptor for the given table ref and 
registers
      * it against all its legal aliases. For tables refs with an explicit 
alias, only the
@@ -1818,10 +1823,6 @@ public class Analyzer {
         return hasEmptySpjResultSet;
     }
 
-    public void setHasLimitOffsetClause(boolean hasLimitOffset) {
-        this.hasLimitOffsetClause = hasLimitOffset;
-    }
-
     /**
      * Register all conjuncts in 'conjuncts' that make up the On-clause of the 
given
      * right-hand side of a join. Assigns each conjunct a unique id. If rhsRef 
is
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/analysis/NativeInsertStmt.java 
b/fe/fe-core/src/main/java/org/apache/doris/analysis/NativeInsertStmt.java
index 02b6e40f14..adf12e0ddf 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/analysis/NativeInsertStmt.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/NativeInsertStmt.java
@@ -849,7 +849,7 @@ public class NativeInsertStmt extends InsertStmt {
 
     public void complete() throws UserException {
         if (!isExplain() && targetTable instanceof OlapTable) {
-            ((OlapTableSink) dataSink).complete();
+            ((OlapTableSink) dataSink).complete(analyzer);
             // add table indexes to transaction state
             TransactionState txnState = Env.getCurrentGlobalTransactionMgr()
                     .getTransactionState(db.getId(), transactionId);
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/LoadingTaskPlanner.java 
b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/LoadingTaskPlanner.java
index 982eb813e6..fa86ab06b1 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/LoadingTaskPlanner.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/LoadingTaskPlanner.java
@@ -171,7 +171,7 @@ public class LoadingTaskPlanner {
         OlapTableSink olapTableSink = new OlapTableSink(table, destTupleDesc, 
partitionIds,
                 Config.enable_single_replica_load);
         olapTableSink.init(loadId, txnId, dbId, timeoutS, 
sendBatchParallelism, false, strictMode);
-        olapTableSink.complete();
+        olapTableSink.complete(analyzer);
 
         // 3. Plan fragment
         PlanFragment sinkFragment = new PlanFragment(new PlanFragmentId(0), 
scanNode, DataPartition.RANDOM);
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/InsertIntoTableCommand.java
 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/InsertIntoTableCommand.java
index ab373c7886..3155eab2e7 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/InsertIntoTableCommand.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/InsertIntoTableCommand.java
@@ -17,6 +17,7 @@
 
 package org.apache.doris.nereids.trees.plans.commands;
 
+import org.apache.doris.analysis.Analyzer;
 import org.apache.doris.catalog.Env;
 import org.apache.doris.common.DdlException;
 import org.apache.doris.common.util.ProfileManager.ProfileType;
@@ -123,7 +124,7 @@ public class InsertIntoTableCommand extends Command 
implements ForwardWithSync,
                 ctx.getExecTimeout(),
                 ctx.getSessionVariable().getSendBatchParallelism(), false, 
false);
 
-        sink.complete();
+        sink.complete(new Analyzer(Env.getCurrentEnv(), ctx));
         TransactionState state = 
Env.getCurrentGlobalTransactionMgr().getTransactionState(
                 physicalOlapTableSink.getDatabase().getId(),
                 txn.getTxnId());
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/planner/OlapTableSink.java 
b/fe/fe-core/src/main/java/org/apache/doris/planner/OlapTableSink.java
index 0bc30f978a..b09ebcff7a 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/planner/OlapTableSink.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/planner/OlapTableSink.java
@@ -17,6 +17,7 @@
 
 package org.apache.doris.planner;
 
+import org.apache.doris.analysis.Analyzer;
 import org.apache.doris.analysis.Expr;
 import org.apache.doris.analysis.SlotDescriptor;
 import org.apache.doris.analysis.TupleDescriptor;
@@ -164,7 +165,7 @@ public class OlapTableSink extends DataSink {
     }
 
     // must called after tupleDescriptor is computed
-    public void complete() throws UserException {
+    public void complete(Analyzer analyzer) throws UserException {
         TOlapTableSink tSink = tDataSink.getOlapTableSink();
 
         tSink.setTableId(dstTable.getId());
@@ -176,7 +177,7 @@ public class OlapTableSink extends DataSink {
         }
         tSink.setNumReplicas(numReplicas);
         tSink.setNeedGenRollup(dstTable.shouldLoadToNewRollup());
-        tSink.setSchema(createSchema(tSink.getDbId(), dstTable));
+        tSink.setSchema(createSchema(tSink.getDbId(), dstTable, analyzer));
         tSink.setPartition(createPartition(tSink.getDbId(), dstTable));
         List<TOlapTableLocationParam> locationParams = 
createLocation(dstTable);
         tSink.setLocation(locationParams.get(0));
@@ -214,7 +215,7 @@ public class OlapTableSink extends DataSink {
         return tDataSink;
     }
 
-    private TOlapTableSchemaParam createSchema(long dbId, OlapTable table) {
+    private TOlapTableSchemaParam createSchema(long dbId, OlapTable table, 
Analyzer analyzer) throws AnalysisException {
         TOlapTableSchemaParam schemaParam = new TOlapTableSchemaParam();
         schemaParam.setDbId(dbId);
         schemaParam.setTableId(table.getId());
@@ -253,6 +254,11 @@ public class OlapTableSink extends DataSink {
             if (indexMeta.getWhereClause() != null) {
                 Expr expr = indexMeta.getWhereClause().clone();
                 expr.replaceSlot(tupleDescriptor);
+                if (analyzer != null) {
+                    tupleDescriptor.setTable(table);
+                    analyzer.registerTupleDescriptor(tupleDescriptor);
+                    expr.analyze(analyzer);
+                }
                 indexSchema.setWhereClause(expr.treeToThrift());
             }
             indexSchema.setColumnsDesc(columnsDesc);
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/planner/StreamLoadPlanner.java 
b/fe/fe-core/src/main/java/org/apache/doris/planner/StreamLoadPlanner.java
index 40d5b99f34..ac5d6c595c 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/planner/StreamLoadPlanner.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/planner/StreamLoadPlanner.java
@@ -266,7 +266,7 @@ public class StreamLoadPlanner {
         olapTableSink.init(loadId, taskInfo.getTxnId(), db.getId(), timeout,
                 taskInfo.getSendBatchParallelism(), 
taskInfo.isLoadToSingleTablet(), taskInfo.isStrictMode());
         olapTableSink.setPartialUpdateInputColumns(isPartialUpdate, 
partialUpdateInputColumns);
-        olapTableSink.complete();
+        olapTableSink.complete(analyzer);
 
         // for stream load, we only need one fragment, ScanNode -> DataSink.
         // OlapTableSink can dispatch data to corresponding node.
@@ -476,7 +476,7 @@ public class StreamLoadPlanner {
         olapTableSink.init(loadId, taskInfo.getTxnId(), db.getId(), timeout,
                 taskInfo.getSendBatchParallelism(), 
taskInfo.isLoadToSingleTablet(), taskInfo.isStrictMode());
         olapTableSink.setPartialUpdateInputColumns(isPartialUpdate, 
partialUpdateInputColumns);
-        olapTableSink.complete();
+        olapTableSink.complete(analyzer);
 
         // for stream load, we only need one fragment, ScanNode -> DataSink.
         // OlapTableSink can dispatch data to corresponding node.
diff --git 
a/fe/fe-core/src/test/java/org/apache/doris/planner/OlapTableSinkTest.java 
b/fe/fe-core/src/test/java/org/apache/doris/planner/OlapTableSinkTest.java
index c6da7304d5..dc98026a00 100644
--- a/fe/fe-core/src/test/java/org/apache/doris/planner/OlapTableSinkTest.java
+++ b/fe/fe-core/src/test/java/org/apache/doris/planner/OlapTableSinkTest.java
@@ -108,7 +108,7 @@ public class OlapTableSinkTest {
         dstTable.getPartitionInfo().setIsMutable(partition.getId(), true);
         OlapTableSink sink = new OlapTableSink(dstTable, tuple, 
Lists.newArrayList(2L), false);
         sink.init(new TUniqueId(1, 2), 3, 4, 1000, 1, false, false);
-        sink.complete();
+        sink.complete(null);
         LOG.info("sink is {}", sink.toThrift());
         LOG.info("{}", sink.getExplainString("", TExplainLevel.NORMAL));
     }
@@ -146,7 +146,7 @@ public class OlapTableSinkTest {
         OlapTableSink sink = new OlapTableSink(dstTable, tuple, 
Lists.newArrayList(p1.getId()), false);
         sink.init(new TUniqueId(1, 2), 3, 4, 1000, 1, false, false);
         try {
-            sink.complete();
+            sink.complete(null);
         } catch (UserException e) {
             // CHECKSTYLE IGNORE THIS LINE
         }
@@ -170,7 +170,7 @@ public class OlapTableSinkTest {
 
         OlapTableSink sink = new OlapTableSink(dstTable, tuple, 
Lists.newArrayList(unknownPartId), false);
         sink.init(new TUniqueId(1, 2), 3, 4, 1000, 1, false, false);
-        sink.complete();
+        sink.complete(null);
         LOG.info("sink is {}", sink.toThrift());
         LOG.info("{}", sink.getExplainString("", TExplainLevel.NORMAL));
     }
@@ -208,7 +208,7 @@ public class OlapTableSinkTest {
         OlapTableSink sink = new OlapTableSink(dstTable, tuple, 
Lists.newArrayList(p1.getId()), false);
         sink.init(new TUniqueId(1, 2), 3, 4, 1000, 1, false, false);
         try {
-            sink.complete();
+            sink.complete(null);
         } catch (UserException e) {
             // CHECKSTYLE IGNORE THIS LINE
         }
diff --git 
a/regression-test/suites/unique_with_mow_p0/partial_update/test_partial_update_schema_change.groovy
 
b/regression-test/suites/unique_with_mow_p0/partial_update/test_partial_update_schema_change.groovy
index 905c622159..9883f19dcd 100644
--- 
a/regression-test/suites/unique_with_mow_p0/partial_update/test_partial_update_schema_change.groovy
+++ 
b/regression-test/suites/unique_with_mow_p0/partial_update/test_partial_update_schema_change.groovy
@@ -64,12 +64,15 @@ suite("test_partial_update_schema_change", "p0") {
     
     // schema change
     sql " ALTER table ${tableName} add column c10 INT DEFAULT '0' "
+    def try_times=100
     while(true){
         def res = sql " SHOW ALTER TABLE COLUMN WHERE TableName = 
'${tableName}' ORDER BY CreateTime DESC LIMIT 1 "
+        Thread.sleep(1000)
         if(res[0][9].toString() == "FINISHED"){
             break;
         }
-        Thread.sleep(500)
+        assert(try_times>0)
+        try_times--
     }
     
     // test load data without new column
@@ -174,12 +177,15 @@ suite("test_partial_update_schema_change", "p0") {
     
     // schema change
     sql " ALTER table ${tableName} DROP COLUMN c8 "
+    try_times=100
     while(true){
         def res = sql " SHOW ALTER TABLE COLUMN WHERE TableName = 
'${tableName}' ORDER BY CreateTime DESC LIMIT 1 "
+        Thread.sleep(1000)
         if(res[0][9].toString() == "FINISHED"){
             break;
         }
-        Thread.sleep(500)
+        assert(try_times>0)
+        try_times--
     }
 
     // test load data without delete column
@@ -283,12 +289,15 @@ suite("test_partial_update_schema_change", "p0") {
     
     // schema change
     sql " ALTER table ${tableName} MODIFY COLUMN c2 double "
+    try_times=100
     while(true){
         def res = sql " SHOW ALTER TABLE COLUMN WHERE TableName = 
'${tableName}' ORDER BY CreateTime DESC LIMIT 1 "
+        Thread.sleep(1000)
         if(res[0][9].toString() == "FINISHED"){
             break;
         }
-        Thread.sleep(500)
+        assert(try_times>0)
+        try_times--
     }
 
     // test load data with update column
@@ -358,20 +367,27 @@ suite("test_partial_update_schema_change", "p0") {
     
     // schema change
     sql " ALTER table ${tableName} ADD COLUMN c1 int key null "
+    try_times=100
     while(true){
         def res = sql " SHOW ALTER TABLE COLUMN WHERE TableName = 
'${tableName}' ORDER BY CreateTime DESC LIMIT 1 "
+        Thread.sleep(1000)
         if(res[0][9].toString() == "FINISHED"){
             break;
         }
-        Thread.sleep(500)
+        assert(try_times>0)
+        try_times--
     }
+
     sql " ALTER table ${tableName} ADD COLUMN c2 int null "
+    try_times=100
     while(true){
         def res = sql " SHOW ALTER TABLE COLUMN WHERE TableName = 
'${tableName}' ORDER BY CreateTime DESC LIMIT 1 "
+        Thread.sleep(1000)
         if(res[0][9].toString() == "FINISHED"){
             break;
         }
-        Thread.sleep(500)
+        assert(try_times>0)
+        try_times--
     }
 
     // test load data with all key column
@@ -451,12 +467,15 @@ suite("test_partial_update_schema_change", "p0") {
     qt_sql10 " select * from ${tableName} order by c0 "
     
     sql " CREATE INDEX test ON ${tableName} (c1) USING BITMAP "
+    try_times=100
     while(true){
         def res = sql " SHOW ALTER TABLE COLUMN WHERE TableName = 
'${tableName}' ORDER BY CreateTime DESC LIMIT 1 "
+        Thread.sleep(1000)
         if(res[0][9].toString() == "FINISHED"){
             break;
         }
-        Thread.sleep(500)
+        assert(try_times>0)
+        try_times--
     }
 
     //test load data with create index
@@ -605,12 +624,15 @@ suite("test_partial_update_schema_change", "p0") {
     
     // schema change
     sql " ALTER table ${tableName} add column c10 INT DEFAULT '0' "
+    try_times=100
     while(true){
         def res = sql " SHOW ALTER TABLE COLUMN WHERE TableName = 
'${tableName}' ORDER BY CreateTime DESC LIMIT 1 "
+        Thread.sleep(1000)
         if(res[0][9].toString() == "FINISHED"){
             break;
         }
-        Thread.sleep(500)
+        assert(try_times>0)
+        try_times--
     }
     
     // test load data without new column
@@ -714,12 +736,15 @@ suite("test_partial_update_schema_change", "p0") {
     
     // schema change
     sql " ALTER table ${tableName} DROP COLUMN c8 "
+    try_times=100
     while(true){
         def res = sql " SHOW ALTER TABLE COLUMN WHERE TableName = 
'${tableName}' ORDER BY CreateTime DESC LIMIT 1 "
+        Thread.sleep(1000)
         if(res[0][9].toString() == "FINISHED"){
             break;
         }
-        Thread.sleep(500)
+        assert(try_times>0)
+        try_times--
     }
 
     // test load data without delete column
@@ -822,12 +847,15 @@ suite("test_partial_update_schema_change", "p0") {
     
     // schema change
     sql " ALTER table ${tableName} MODIFY COLUMN c2 double "
+    try_times=100
     while(true){
         def res = sql " SHOW ALTER TABLE COLUMN WHERE TableName = 
'${tableName}' ORDER BY CreateTime DESC LIMIT 1 "
+        Thread.sleep(1000)
         if(res[0][9].toString() == "FINISHED"){
             break;
         }
-        Thread.sleep(500)
+        assert(try_times>0)
+        try_times--
     }
 
     // test load data with update column
@@ -896,20 +924,26 @@ suite("test_partial_update_schema_change", "p0") {
     
     // schema change
     sql " ALTER table ${tableName} ADD COLUMN c1 int key null "
+    try_times=100
     while(true){
         def res = sql " SHOW ALTER TABLE COLUMN WHERE TableName = 
'${tableName}' ORDER BY CreateTime DESC LIMIT 1 "
+        Thread.sleep(1000)
         if(res[0][9].toString() == "FINISHED"){
             break;
         }
-        Thread.sleep(500)
+        assert(try_times>0)
+        try_times--
     }
     sql " ALTER table ${tableName} ADD COLUMN c2 int null "
+    try_times=100
     while(true){
         def res = sql " SHOW ALTER TABLE COLUMN WHERE TableName = 
'${tableName}' ORDER BY CreateTime DESC LIMIT 1 "
+        Thread.sleep(1000)
         if(res[0][9].toString() == "FINISHED"){
             break;
         }
-        Thread.sleep(500)
+        assert(try_times>0)
+        try_times--
     }
 
     // test load data with all key column
@@ -987,12 +1021,15 @@ suite("test_partial_update_schema_change", "p0") {
     qt_sql23 " select * from ${tableName} order by c0 "
     
     sql " CREATE INDEX test ON ${tableName} (c1) USING BITMAP "
+    try_times=100
     while(true){
         def res = sql " SHOW ALTER TABLE COLUMN WHERE TableName = 
'${tableName}' ORDER BY CreateTime DESC LIMIT 1 "
+        Thread.sleep(1000)
         if(res[0][9].toString() == "FINISHED"){
             break;
         }
-        Thread.sleep(500)
+        assert(try_times>0)
+        try_times--
     }
 
     //test load data with create index


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to