This is an automated email from the ASF dual-hosted git repository.

dataroaring pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new 1cfe5237410 [fix](group_commit) Need to wait wal to be deleted when 
creating MaterializedView (#30956)
1cfe5237410 is described below

commit 1cfe5237410e18684ddb4cea601be5ac0590ab79
Author: huanghaibin <284824...@qq.com>
AuthorDate: Wed Feb 21 08:58:31 2024 +0800

    [fix](group_commit) Need to wait wal to be deleted when creating 
MaterializedView (#30956)
---
 be/src/olap/wal/wal_manager.cpp                    |   2 +-
 .../doris/alter/MaterializedViewHandler.java       |  10 ++
 .../org/apache/doris/alter/SchemaChangeJobV2.java  |  33 +-----
 .../org/apache/doris/load/GroupCommitManager.java  |  28 +++++
 .../data/insert_p0/insert_group_commit_into.out    |  18 +++
 .../insert_p0/insert_group_commit_into.groovy      | 126 +++++++++++++++++++++
 6 files changed, 186 insertions(+), 31 deletions(-)

diff --git a/be/src/olap/wal/wal_manager.cpp b/be/src/olap/wal/wal_manager.cpp
index 06e1bd37d72..10a979f89dd 100644
--- a/be/src/olap/wal/wal_manager.cpp
+++ b/be/src/olap/wal/wal_manager.cpp
@@ -289,6 +289,7 @@ Status WalManager::_load_wals() {
                 continue;
             }
         }
+        _exec_env->wal_mgr()->add_wal_queue(wal.tb_id, wal.wal_id);
         WARN_IF_ERROR(add_recover_wal(wal.db_id, wal.tb_id, wal.wal_id, 
wal.wal_path),
                       fmt::format("Failed to add recover wal={}", 
wal.wal_path));
     }
@@ -406,7 +407,6 @@ Status WalManager::_replay_background() {
 
 Status WalManager::add_recover_wal(int64_t db_id, int64_t table_id, int64_t 
wal_id,
                                    std::string wal) {
-    add_wal_queue(table_id, wal_id);
     std::lock_guard<std::shared_mutex> wrlock(_table_lock);
     std::shared_ptr<WalTable> table_ptr;
     auto it = _table_map.find(table_id);
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/alter/MaterializedViewHandler.java 
b/fe/fe-core/src/main/java/org/apache/doris/alter/MaterializedViewHandler.java
index aa00989cfa4..1dfe7759ec8 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/alter/MaterializedViewHandler.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/alter/MaterializedViewHandler.java
@@ -219,6 +219,10 @@ public class MaterializedViewHandler extends AlterHandler {
 
             olapTable.setState(OlapTableState.ROLLUP);
 
+            // wait wal delete
+            
Env.getCurrentEnv().getGroupCommitManager().blockTable(olapTable.getId());
+            
Env.getCurrentEnv().getGroupCommitManager().waitWalFinished(olapTable.getId());
+
             Env.getCurrentEnv().getEditLog().logAlterJob(rollupJobV2);
             LOG.info("finished to create materialized view job: {}", 
rollupJobV2.getJobId());
         } finally {
@@ -301,6 +305,11 @@ public class MaterializedViewHandler extends AlterHandler {
             // ATTN: This order is not mandatory, because database lock will 
protect us,
             // but this order is more reasonable
             olapTable.setState(OlapTableState.ROLLUP);
+
+            // wait wal delete
+            
Env.getCurrentEnv().getGroupCommitManager().blockTable(olapTable.getId());
+            
Env.getCurrentEnv().getGroupCommitManager().waitWalFinished(olapTable.getId());
+
             // 2 batch submit rollup job
             List<AlterJobV2> rollupJobV2List = new 
ArrayList<>(rollupNameJobMap.values());
             batchAddAlterJobV2(rollupJobV2List);
@@ -1170,6 +1179,7 @@ public class MaterializedViewHandler extends AlterHandler 
{
     private void onJobDone(AlterJobV2 alterJob) {
         removeJobFromRunningQueue(alterJob);
         if (removeAlterJobV2FromTableNotFinalStateJobMap(alterJob)) {
+            
Env.getCurrentEnv().getGroupCommitManager().unblockTable(alterJob.getTableId());
             changeTableStatus(alterJob.getDbId(), alterJob.getTableId(), 
OlapTableState.NORMAL);
             LOG.info("set table's state to NORMAL, table id: {}, job id: {}", 
alterJob.getTableId(),
                     alterJob.getJobId());
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/alter/SchemaChangeJobV2.java 
b/fe/fe-core/src/main/java/org/apache/doris/alter/SchemaChangeJobV2.java
index 2efbee47393..70787936105 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/alter/SchemaChangeJobV2.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/alter/SchemaChangeJobV2.java
@@ -40,7 +40,6 @@ import org.apache.doris.catalog.Tablet;
 import org.apache.doris.catalog.TabletInvertedIndex;
 import org.apache.doris.catalog.TabletMeta;
 import org.apache.doris.common.AnalysisException;
-import org.apache.doris.common.Config;
 import org.apache.doris.common.FeConstants;
 import org.apache.doris.common.MarkedCountDownLatch;
 import org.apache.doris.common.MetaNotFoundException;
@@ -536,7 +535,9 @@ public class SchemaChangeJobV2 extends AlterJobV2 {
             }
             return;
         }
-        waitWalFinished();
+        Env.getCurrentEnv().getGroupCommitManager().blockTable(tableId);
+        Env.getCurrentEnv().getGroupCommitManager().waitWalFinished(tableId);
+        Env.getCurrentEnv().getGroupCommitManager().unblockTable(tableId);
         /*
          * all tasks are finished. check the integrity.
          * we just check whether all new replicas are healthy.
@@ -599,34 +600,6 @@ public class SchemaChangeJobV2 extends AlterJobV2 {
         LOG.info("set table's state to NORMAL, table id: {}, job id: {}", 
tableId, jobId);
     }
 
-    private void waitWalFinished() {
-        // wait wal done here
-        Env.getCurrentEnv().getGroupCommitManager().blockTable(tableId);
-        LOG.info("block group commit for table={} when schema change", 
tableId);
-        List<Long> aliveBeIds = 
Env.getCurrentSystemInfo().getAllBackendIds(true);
-        long expireTime = System.currentTimeMillis() + 
Config.check_wal_queue_timeout_threshold;
-        while (true) {
-            LOG.info("wait for wal queue size to be empty");
-            boolean walFinished = Env.getCurrentEnv().getGroupCommitManager()
-                    .isPreviousWalFinished(tableId, aliveBeIds);
-            if (walFinished) {
-                LOG.info("all wal is finished for table={}", tableId);
-                break;
-            } else if (System.currentTimeMillis() > expireTime) {
-                LOG.warn("waitWalFinished time out for table={}", tableId);
-                break;
-            } else {
-                try {
-                    Thread.sleep(100);
-                } catch (InterruptedException ie) {
-                    LOG.warn("failed to wait for wal for table={} when schema 
change", tableId, ie);
-                }
-            }
-        }
-        Env.getCurrentEnv().getGroupCommitManager().unblockTable(tableId);
-        LOG.info("unblock group commit for table={} when schema change", 
tableId);
-    }
-
     private void onFinished(OlapTable tbl) {
         // replace the origin index with shadow index, set index state as 
NORMAL
         for (Partition partition : tbl.getPartitions()) {
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/load/GroupCommitManager.java 
b/fe/fe-core/src/main/java/org/apache/doris/load/GroupCommitManager.java
index 12410945e9f..c4bf1e03c9c 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/load/GroupCommitManager.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/load/GroupCommitManager.java
@@ -45,11 +45,39 @@ public class GroupCommitManager {
     }
 
     public void blockTable(long tableId) {
+        LOG.info("block group commit for table={} when schema change", 
tableId);
         blockedTableIds.add(tableId);
     }
 
     public void unblockTable(long tableId) {
         blockedTableIds.remove(tableId);
+        LOG.info("unblock group commit for table={} when schema change", 
tableId);
+    }
+
+    /**
+     * Waiting All WAL files to be deleted.
+     */
+    public void waitWalFinished(long tableId) {
+        List<Long> aliveBeIds = 
Env.getCurrentSystemInfo().getAllBackendIds(true);
+        long expireTime = System.currentTimeMillis() + 
Config.check_wal_queue_timeout_threshold;
+        while (true) {
+            LOG.info("wait for wal queue size to be empty");
+            boolean walFinished = Env.getCurrentEnv().getGroupCommitManager()
+                    .isPreviousWalFinished(tableId, aliveBeIds);
+            if (walFinished) {
+                LOG.info("all wal is finished for table={}", tableId);
+                break;
+            } else if (System.currentTimeMillis() > expireTime) {
+                LOG.warn("waitWalFinished time out for table={}", tableId);
+                break;
+            } else {
+                try {
+                    Thread.sleep(100);
+                } catch (InterruptedException ie) {
+                    LOG.warn("failed to wait for wal for table={} when schema 
change", tableId, ie);
+                }
+            }
+        }
     }
 
     /**
diff --git a/regression-test/data/insert_p0/insert_group_commit_into.out 
b/regression-test/data/insert_p0/insert_group_commit_into.out
index dfb3a3b41c0..71a1473e1e6 100644
--- a/regression-test/data/insert_p0/insert_group_commit_into.out
+++ b/regression-test/data/insert_p0/insert_group_commit_into.out
@@ -94,6 +94,15 @@ q    50
 -- !sql --
 0      service_46da0dab-e27d-4820-aea2-9bfc15741615    1697032066304   0       
3229b7cd-f3a2-4359-aa24-946388c9cc54    0       
CgEwEiQzMjI5YjdjZC1mM2EyLTQzNTktYWEyNC05NDYzODhjOWNjNTQaggQY/6n597ExIP+p+fexMWIWCgh0YWdLZXlfMBIKdGFnVmFsdWVfMGIWCgh0YWdLZXlfMRIKdGFnVmFsdWVfMWIWCgh0YWdLZXlfMhIKdGFnVmFsdWVfMmIWCgh0YWdLZXlfMxIKdGFnVmFsdWVfM2IWCgh0YWdLZXlfNBIKdGFnVmFsdWVfNGIWCgh0YWdLZXlfNRIKdGFnVmFsdWVfNWIWCgh0YWdLZXlfNhIKdGFnVmFsdWVfNmIWCgh0YWdLZXlfNxIKdGFnVmFsdWVfN2IWCgh0YWdLZXlfOBIKdGFnVmFsdWVfOGIWCgh0YWdLZXlfORIKdGFnVm
 [...]
 
+-- !order --
+2023-06-10     cib2205045_1_1s 0.0000  168939.0        0.0000  0.0     0.0000  
0.0     0.0000  day
+
+-- !order2 --
+1      1       1
+2      2       2
+3      3       3
+4      4       4
+
 -- !sql --
 1      a       10
 2      b       -1
@@ -189,3 +198,12 @@ q  50
 -- !sql --
 0      service_46da0dab-e27d-4820-aea2-9bfc15741615    1697032066304   0       
3229b7cd-f3a2-4359-aa24-946388c9cc54    0       
CgEwEiQzMjI5YjdjZC1mM2EyLTQzNTktYWEyNC05NDYzODhjOWNjNTQaggQY/6n597ExIP+p+fexMWIWCgh0YWdLZXlfMBIKdGFnVmFsdWVfMGIWCgh0YWdLZXlfMRIKdGFnVmFsdWVfMWIWCgh0YWdLZXlfMhIKdGFnVmFsdWVfMmIWCgh0YWdLZXlfMxIKdGFnVmFsdWVfM2IWCgh0YWdLZXlfNBIKdGFnVmFsdWVfNGIWCgh0YWdLZXlfNRIKdGFnVmFsdWVfNWIWCgh0YWdLZXlfNhIKdGFnVmFsdWVfNmIWCgh0YWdLZXlfNxIKdGFnVmFsdWVfN2IWCgh0YWdLZXlfOBIKdGFnVmFsdWVfOGIWCgh0YWdLZXlfORIKdGFnVm
 [...]
 
+-- !order --
+2023-06-10     cib2205045_1_1s 0.0000  168939.0        0.0000  0.0     0.0000  
0.0     0.0000  day
+
+-- !order2 --
+1      1       1
+2      2       2
+3      3       3
+4      4       4
+
diff --git a/regression-test/suites/insert_p0/insert_group_commit_into.groovy 
b/regression-test/suites/insert_p0/insert_group_commit_into.groovy
index 391e106aa2f..b7c6cc4e3c4 100644
--- a/regression-test/suites/insert_p0/insert_group_commit_into.groovy
+++ b/regression-test/suites/insert_p0/insert_group_commit_into.groovy
@@ -318,5 +318,131 @@ suite("insert_group_commit_into") {
         } finally {
             // try_sql("DROP TABLE ${table}")
         }
+
+        // table with MaterializedView
+        tableName = "insert_group_commit_into_mv"
+        table = dbName + "." + tableName
+        def table_tmp = dbName + ".test_table_tmp"
+        try {
+            // create table
+            sql """ drop table if exists ${table}; """
+            sql """CREATE table ${table} (
+            `ordernum` varchar(65533) NOT NULL ,
+            `dnt` datetime NOT NULL ,
+            `data` json NULL 
+            ) ENGINE=OLAP
+            DUPLICATE KEY(`ordernum`, `dnt`)
+            COMMENT 'OLAP'
+            DISTRIBUTED BY HASH(`ordernum`) BUCKETS 3
+            PROPERTIES (
+            "replication_allocation" = "tag.location.default: 1"
+            );"""
+            sql """drop table if exists ${table_tmp};"""
+            sql """CREATE TABLE ${table_tmp} (
+            `dnt` varchar(200) NULL,
+            `ordernum` varchar(200) NULL,
+            `type` varchar(20) NULL,
+            `powers` double SUM NULL,
+            `p0` double REPLACE NULL,
+            `heatj` double SUM NULL,
+            `j0` double REPLACE NULL,
+            `heatg` double SUM NULL,
+            `g0` double REPLACE NULL,
+            `solar` double SUM NULL
+            ) ENGINE=OLAP
+            AGGREGATE KEY(`dnt`, `ordernum`, `type`)
+            COMMENT 'OLAP'
+            DISTRIBUTED BY HASH(`ordernum`) BUCKETS 1
+            PROPERTIES (
+            "replication_allocation" = "tag.location.default: 1"
+            ); """
+            sql """DROP MATERIALIZED VIEW IF EXISTS ods_zn_dnt_max1 ON 
${table};"""
+            sql """create materialized view ods_zn_dnt_max1 as
+            select ordernum,max(dnt) as dnt from ${table}
+            group by ordernum
+            ORDER BY ordernum;"""
+            connect(user = context.config.jdbcUser, password = 
context.config.jdbcPassword, url = context.config.jdbcUrl) {
+                sql """ set group_commit = async_mode; """
+                if (item == "nereids") {
+                    sql """ set enable_nereids_dml = true; """
+                    sql """ set enable_nereids_planner=true; """
+                    //sql """ set enable_fallback_to_original_planner=false; 
"""
+                } else {
+                    sql """ set enable_nereids_dml = false; """
+                }
+
+                // 1. insert into
+                int count = 0;
+                while (count < 30) {
+                    try {
+                        group_commit_insert """ 
+                insert into ${table} values('cib2205045_1_1s','2023/6/10 
3:55:33','{"DB1":168939,"DNT":"2023-06-10 03:55:33"}');""", 1
+                        break
+                    } catch (Exception e) {
+                        logger.info("got exception:" + e)
+                        if (e.getMessage().contains("is blocked on schema 
change")) {
+                            Thread.sleep(1000)
+                        }
+                        count++
+                    }
+                }
+                group_commit_insert """insert into ${table} 
values('cib2205045_1_1s','2023/6/10 3:56:33','{"DB1":168939,"DNT":"2023-06-10 
03:56:33"}');""", 1
+                group_commit_insert """insert into ${table} 
values('cib2205045_1_1s','2023/6/10 3:57:33','{"DB1":168939,"DNT":"2023-06-10 
03:57:33"}');""", 1
+                group_commit_insert """insert into ${table} 
values('cib2205045_1_1s','2023/6/10 3:58:33','{"DB1":168939,"DNT":"2023-06-10 
03:58:33"}');""", 1
+
+                getRowCount(4)
+
+                qt_order """select
+                '2023-06-10',
+                tmp.ordernum,
+                cast(nvl(if(tmp.p0-tmp1.p0>0,tmp.p0-tmp1.p0,tmp.p0-tmp.p1),0) 
as decimal(10,4)),
+                nvl(tmp.p0,0),
+                
cast(nvl(if(tmp.j0-tmp1.j0>0,tmp.j0-tmp1.j0,tmp.j0-tmp.j1)*277.78,0) as 
decimal(10,4)),
+                nvl(tmp.j0,0),
+                
cast(nvl(if(tmp.g0-tmp1.g0>0,tmp.g0-tmp1.g0,tmp.g0-tmp.g1)*277.78,0) as 
decimal(10,4)),
+                nvl(tmp.g0,0),
+                cast(nvl(tmp.solar,0) as decimal(20,4)),
+                'day'
+                from 
+                (
+                select
+                    ordernum,
+                    max(ljrl1) g0,min(ljrl1) g1,
+                    max(ljrl2) j0,min(ljrl2) j1,
+                    max(db1) p0,min(db1) p1,
+                    max(fzl)*1600*0.278 solar
+                from(
+                    select ordernum,dnt,
+                            cast(if(json_extract(data,'\$.LJRL1')=0 or 
json_extract(data,'\$.LJRL1') like '%E%',null,json_extract(data,'\$.LJRL1')) as 
double) ljrl1,
+                            cast(if(json_extract(data,'\$.LJRL2')=0 or 
json_extract(data,'\$.LJRL2') like '%E%',null,json_extract(data,'\$.LJRL2')) as 
double) ljrl2,
+                            first_value(cast(if(json_extract(data,'\$.FZL')=0 
or json_extract(data,'\$.FZL') like '%E%',null,
+                            json_extract(data,'\$.FZL')) as double)) over 
(partition by ordernum order by dnt desc) fzl,
+                            cast(if(json_extract(data,'\$.DB1')=0 or 
json_extract(data,'\$.DB1') like '%E%',null,json_extract(data,'\$.DB1')) as 
double) db1
+                    from ${table}
+                        )a1
+                group by ordernum
+                )tmp left join (
+                select
+                    ordernum,MAX(p0) p0,MAX(j0) j0,MAX(g0) g0
+                from ${table_tmp}
+                    group by ordernum
+                )tmp1
+                on tmp.ordernum=tmp1.ordernum;"""
+                qt_order2 """
+                SELECT  
+                row_number() over(partition by add_date order by pc_num desc)
+                ,row_number() over(partition by add_date order by vc_num desc)
+                ,row_number() over(partition by add_date order by vt_num desc)
+                FROM (
+                SELECT  
+                cast(dnt as datev2) add_date
+                ,row_number() over(order by dnt) pc_num
+                ,row_number() over(order by dnt) vc_num
+                ,row_number() over(order by dnt) vt_num
+                FROM ${table}
+                ) t;"""
+            }
+        } finally {
+        }
     }
 }
\ No newline at end of file


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to