This is an automated email from the ASF dual-hosted git repository.

dataroaring pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new 63f7704140a [fix](group commit) Fix the timeout to wait wal finished 
when schema change (#34021)
63f7704140a is described below

commit 63f7704140a9133b716bc17bb47dcb6cb1bc229c
Author: meiyi <myime...@gmail.com>
AuthorDate: Tue Apr 23 21:38:45 2024 +0800

    [fix](group commit) Fix the timeout to wait wal finished when schema change 
(#34021)
---
 be/src/runtime/group_commit_mgr.cpp                | 27 +++++----
 .../doris/alter/MaterializedViewHandler.java       | 16 +++---
 .../data/insert_p0/insert_group_commit_into.out    | 32 +++++------
 .../insert_p0/insert_group_commit_into.groovy      | 66 ++++++++++++++--------
 4 files changed, 83 insertions(+), 58 deletions(-)

diff --git a/be/src/runtime/group_commit_mgr.cpp 
b/be/src/runtime/group_commit_mgr.cpp
index 5ec20c10aef..62fbbd37979 100644
--- a/be/src/runtime/group_commit_mgr.cpp
+++ b/be/src/runtime/group_commit_mgr.cpp
@@ -74,11 +74,20 @@ Status LoadBlockQueue::add_block(RuntimeState* 
runtime_state,
             }
         }
     }
-    if (_data_bytes >= _group_commit_data_bytes) {
-        VLOG_DEBUG << "group commit meets commit condition for data size, 
label=" << label
-                   << ", instance_id=" << load_instance_id << ", data_bytes=" 
<< _data_bytes;
-        _need_commit = true;
-        data_size_condition = true;
+    if (!_need_commit) {
+        if (_data_bytes >= _group_commit_data_bytes) {
+            VLOG_DEBUG << "group commit meets commit condition for data size, 
label=" << label
+                       << ", instance_id=" << load_instance_id << ", 
data_bytes=" << _data_bytes;
+            _need_commit = true;
+            data_size_condition = true;
+        }
+        if 
(std::chrono::duration_cast<std::chrono::milliseconds>(std::chrono::steady_clock::now()
 -
+                                                                  _start_time)
+                    .count() >= _group_commit_interval_ms) {
+            VLOG_DEBUG << "group commit meets commit condition for time 
interval, label=" << label
+                       << ", instance_id=" << load_instance_id << ", 
data_bytes=" << _data_bytes;
+            _need_commit = true;
+        }
     }
     _get_cond.notify_all();
     return Status::OK();
@@ -90,11 +99,9 @@ Status LoadBlockQueue::get_block(RuntimeState* 
runtime_state, vectorized::Block*
     *eos = false;
     std::unique_lock l(mutex);
     if (!_need_commit) {
-        auto left_milliseconds =
-                _group_commit_interval_ms - 
std::chrono::duration_cast<std::chrono::milliseconds>(
-                                                    
std::chrono::steady_clock::now() - _start_time)
-                                                    .count();
-        if (left_milliseconds <= 0) {
+        if 
(std::chrono::duration_cast<std::chrono::milliseconds>(std::chrono::steady_clock::now()
 -
+                                                                  _start_time)
+                    .count() >= _group_commit_interval_ms) {
             _need_commit = true;
         }
     }
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/alter/MaterializedViewHandler.java 
b/fe/fe-core/src/main/java/org/apache/doris/alter/MaterializedViewHandler.java
index b28fb4d9f7b..98b6f8e4e95 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/alter/MaterializedViewHandler.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/alter/MaterializedViewHandler.java
@@ -187,6 +187,9 @@ public class MaterializedViewHandler extends AlterHandler {
      */
     public void processCreateMaterializedView(CreateMaterializedViewStmt 
addMVClause, Database db, OlapTable olapTable)
             throws DdlException, AnalysisException {
+        // wait wal delete
+        
Env.getCurrentEnv().getGroupCommitManager().blockTable(olapTable.getId());
+        
Env.getCurrentEnv().getGroupCommitManager().waitWalFinished(olapTable.getId());
         olapTable.writeLockOrDdlException();
         try {
             olapTable.checkNormalStateForAlter();
@@ -222,10 +225,6 @@ public class MaterializedViewHandler extends AlterHandler {
 
             olapTable.setState(OlapTableState.ROLLUP);
 
-            // wait wal delete
-            
Env.getCurrentEnv().getGroupCommitManager().blockTable(olapTable.getId());
-            
Env.getCurrentEnv().getGroupCommitManager().waitWalFinished(olapTable.getId());
-
             Env.getCurrentEnv().getEditLog().logAlterJob(rollupJobV2);
             LOG.info("finished to create materialized view job: {}", 
rollupJobV2.getJobId());
         } finally {
@@ -248,6 +247,11 @@ public class MaterializedViewHandler extends AlterHandler {
     public void processBatchAddRollup(String rawSql, List<AlterClause> 
alterClauses, Database db, OlapTable olapTable)
             throws DdlException, AnalysisException {
         checkReplicaCount(olapTable);
+
+        // wait wal delete
+        
Env.getCurrentEnv().getGroupCommitManager().blockTable(olapTable.getId());
+        
Env.getCurrentEnv().getGroupCommitManager().waitWalFinished(olapTable.getId());
+
         Map<String, RollupJobV2> rollupNameJobMap = new LinkedHashMap<>();
         // save job id for log
         Set<Long> logJobIdSet = new HashSet<>();
@@ -309,10 +313,6 @@ public class MaterializedViewHandler extends AlterHandler {
             // but this order is more reasonable
             olapTable.setState(OlapTableState.ROLLUP);
 
-            // wait wal delete
-            
Env.getCurrentEnv().getGroupCommitManager().blockTable(olapTable.getId());
-            
Env.getCurrentEnv().getGroupCommitManager().waitWalFinished(olapTable.getId());
-
             // 2 batch submit rollup job
             List<AlterJobV2> rollupJobV2List = new 
ArrayList<>(rollupNameJobMap.values());
             batchAddAlterJobV2(rollupJobV2List);
diff --git a/regression-test/data/insert_p0/insert_group_commit_into.out 
b/regression-test/data/insert_p0/insert_group_commit_into.out
index 7c39453d473..8c0cc138c47 100644
--- a/regression-test/data/insert_p0/insert_group_commit_into.out
+++ b/regression-test/data/insert_p0/insert_group_commit_into.out
@@ -1,5 +1,5 @@
 -- This file is automatically generated. You should know what you did if you 
want to edit this
--- !sql --
+-- !select1 --
 1      a       10
 2      b       -1
 3      c       -1
@@ -7,7 +7,7 @@
 5      q       50
 6      \N      -1
 
--- !sql --
+-- !select2 --
 1      a       10
 1      a       10
 2      b       -1
@@ -20,7 +20,7 @@
 6      \N      -1
 6      \N      -1
 
--- !sql --
+-- !select3 --
 1      a       \N      10
 1      a       \N      10
 1      a       \N      10
@@ -39,11 +39,11 @@
 6      \N      \N      -1
 6      \N      \N      -1
 
--- !sql --
+-- !select4 --
 2      b       \N      -1
 6      \N      \N      -1
 
--- !sql --
+-- !select5 --
 1      a       10      5
 2      b       -1      \N
 2      b       -1      \N
@@ -53,7 +53,7 @@
 6      \N      -1      \N
 6      \N      -1      \N
 
--- !sql --
+-- !select6 --
 1      a       10
 1      a       10
 2      b       -1
@@ -69,7 +69,7 @@
 6      \N      -1
 6      \N      -1
 
--- !sql --
+-- !select7 --
 \N     -1
 \N     -1
 \N     -1
@@ -103,11 +103,11 @@ q 50
 3      3       3
 4      4       4
 
--- !sql --
+-- !select8 --
 1      test
 2      or
 
--- !sql --
+-- !select1 --
 1      a       10
 2      b       -1
 3      c       -1
@@ -115,7 +115,7 @@ q   50
 5      q       50
 6      \N      -1
 
--- !sql --
+-- !select2 --
 1      a       10
 1      a       10
 2      b       -1
@@ -128,7 +128,7 @@ q   50
 6      \N      -1
 6      \N      -1
 
--- !sql --
+-- !select3 --
 1      a       \N      10
 1      a       \N      10
 1      a       \N      10
@@ -147,11 +147,11 @@ q 50
 6      \N      \N      -1
 6      \N      \N      -1
 
--- !sql --
+-- !select4 --
 2      b       \N      -1
 6      \N      \N      -1
 
--- !sql --
+-- !select5 --
 1      a       10      5
 2      b       -1      \N
 2      b       -1      \N
@@ -161,7 +161,7 @@ q   50
 6      \N      -1      \N
 6      \N      -1      \N
 
--- !sql --
+-- !select6 --
 1      a       10
 1      a       10
 2      b       -1
@@ -177,7 +177,7 @@ q   50
 6      \N      -1
 6      \N      -1
 
--- !sql --
+-- !select7 --
 \N     -1
 \N     -1
 \N     -1
@@ -211,7 +211,7 @@ q   50
 3      3       3
 4      4       4
 
--- !sql --
+-- !select8 --
 1      test
 2      or
 
diff --git a/regression-test/suites/insert_p0/insert_group_commit_into.groovy 
b/regression-test/suites/insert_p0/insert_group_commit_into.groovy
index dc681feb3f2..d5f484c565d 100644
--- a/regression-test/suites/insert_p0/insert_group_commit_into.groovy
+++ b/regression-test/suites/insert_p0/insert_group_commit_into.groovy
@@ -41,7 +41,7 @@ suite("insert_group_commit_into") {
         sql "use ${dbName};"
         while (true) {
             sleep(2000)
-            def state = sql " show alter table column where tablename = 
'${tableName}' order by CreateTime desc "
+            def state = sql " show alter table column where tablename = 
'${tableName}' order by CreateTime desc limit 1"
             logger.info("alter table state: ${state}")
             if (state.size() > 0 && state[0][9] == "FINISHED") {
                 return true
@@ -69,6 +69,24 @@ suite("insert_group_commit_into") {
         return serverInfo
     }
 
+    def group_commit_insert_with_retry = { sql, expected_row_count ->
+        def retry = 0
+        while (true){
+            try {
+                return group_commit_insert(sql, expected_row_count)
+            } catch (Exception e) {
+                logger.warn("group_commit_insert failed, retry: " + retry + ", 
error: " + e.getMessage())
+                retry++
+                if (e.getMessage().contains("is blocked on schema change") && 
retry < 20) {
+                    sleep(1500)
+                    continue
+                } else {
+                    throw e
+                }
+            }
+        }
+    }
+
     def none_group_commit_insert = { sql, expected_row_count ->
         def stmt = prepareStatement """ ${sql}  """
         def result = stmt.executeUpdate()
@@ -120,7 +138,7 @@ suite("insert_group_commit_into") {
                 group_commit_insert """ insert into ${table}(id) select 6; 
""", 1
 
                 getRowCount(6)
-                qt_sql """ select * from ${table} order by id, name, score 
asc; """
+                order_qt_select1 """ select * from ${table} order by id, name, 
score asc; """
 
                 // 2. insert into and delete
                 sql """ delete from ${table} where id = 4; """
@@ -134,19 +152,19 @@ suite("insert_group_commit_into") {
                 group_commit_insert """ insert into ${table}(id) select 6; 
""", 1
 
                 getRowCount(11)
-                qt_sql """ select * from ${table} order by id, name, score 
asc; """
+                order_qt_select2 """ select * from ${table} order by id, name, 
score asc; """
 
                 // 3. insert into and light schema change: add column
                 group_commit_insert """ insert into ${table}(name, id) 
values('c', 3);  """, 1
                 group_commit_insert """ insert into ${table}(id) values(4);  
""", 1
                 group_commit_insert """ insert into ${table} values (1, 'a', 
10),(5, 'q', 50);  """, 2
                 sql """ alter table ${table} ADD column age int after name; """
-                group_commit_insert """ insert into ${table}(id, name) 
values(2, 'b');  """, 1
-                group_commit_insert """ insert into ${table}(id) select 6; 
""", 1
+                group_commit_insert_with_retry """ insert into ${table}(id, 
name) values(2, 'b');  """, 1
+                group_commit_insert_with_retry """ insert into ${table}(id) 
select 6; """, 1
 
                 assertTrue(getAlterTableState(), "add column should success")
                 getRowCount(17)
-                qt_sql """ select * from ${table} order by id, name,score asc; 
"""
+                order_qt_select3 """ select * from ${table} order by id, 
name,score asc; """
 
                 // 4. insert into and truncate table
                 /*sql """ insert into ${table}(name, id) values('c', 3);  """
@@ -157,43 +175,43 @@ suite("insert_group_commit_into") {
                 group_commit_insert """ insert into ${table}(id) select 6; 
""", 1
 
                 getRowCount(2)
-                qt_sql """ select * from ${table} order by id, name, score 
asc; """
+                order_qt_select4 """ select * from ${table} order by id, name, 
score asc; """
 
                 // 5. insert into and schema change: modify column order
                 group_commit_insert """ insert into ${table}(name, id) 
values('c', 3);  """, 1
                 group_commit_insert """ insert into ${table}(id) values(4);  
""", 1
-                group_commit_insert """ insert into ${table} values (1, 'a', 
5, 10),(5, 'q', 6, 50);  """, 2
-                // sql """ alter table ${table} order by (id, name, score, 
age); """
-                group_commit_insert """ insert into ${table}(id, name) 
values(2, 'b');  """, 1
-                group_commit_insert """ insert into ${table}(id) select 6; 
""", 1
+                group_commit_insert """ insert into ${table}(id, name, age, 
score) values (1, 'a', 5, 10),(5, 'q', 6, 50);  """, 2
+                sql """ alter table ${table} order by (id, name, score, age); 
"""
+                group_commit_insert_with_retry """ insert into ${table}(id, 
name) values(2, 'b');  """, 1
+                group_commit_insert_with_retry """ insert into ${table}(id) 
select 6; """, 1
 
-                // assertTrue(getAlterTableState(), "modify column order 
should success")
+                assertTrue(getAlterTableState(), "modify column order should 
success")
                 getRowCount(8)
-                qt_sql """ select id, name, score, age from ${table} order by 
id, name, score asc; """
+                order_qt_select5 """ select id, name, score, age from ${table} 
order by id, name, score asc; """
 
                 // 6. insert into and light schema change: drop column
                 group_commit_insert """ insert into ${table}(name, id) 
values('c', 3);  """, 1
                 group_commit_insert """ insert into ${table}(id) values(4);  
""", 1
-                group_commit_insert """ insert into ${table} values (1, 'a', 
5, 10),(5, 'q', 6, 50);  """, 2
+                group_commit_insert """ insert into ${table}(id, name, age, 
score) values (1, 'a', 5, 10),(5, 'q', 6, 50);  """, 2
                 sql """ alter table ${table} DROP column age; """
-                group_commit_insert """ insert into ${table}(id, name) 
values(2, 'b');  """, 1
-                group_commit_insert """ insert into ${table}(id) select 6; 
""", 1
+                group_commit_insert_with_retry """ insert into ${table}(id, 
name) values(2, 'b');  """, 1
+                group_commit_insert_with_retry """ insert into ${table}(id) 
select 6; """, 1
 
                 assertTrue(getAlterTableState(), "drop column should success")
                 getRowCount(14)
-                qt_sql """ select * from ${table} order by id, name, score 
asc; """
+                order_qt_select6 """ select * from ${table} order by id, name, 
score asc; """
 
                 // 7. insert into and add rollup
                 group_commit_insert """ insert into ${table}(name, id) 
values('c', 3);  """, 1
                 group_commit_insert """ insert into ${table}(id) values(4);  
""", 1
                 group_commit_insert """ insert into ${table} values (1, 'a', 
10),(5, 'q', 50),(101, 'a', 100);  """, 2
-                // sql """ alter table ${table} ADD ROLLUP r1(name, score); """
-                group_commit_insert """ insert into ${table}(id, name) 
values(2, 'b');  """, 1
-                group_commit_insert """ insert into ${table}(id) select 6; 
""", 1
+                sql """ alter table ${table} ADD ROLLUP r1(name, score); """
+                group_commit_insert_with_retry """ insert into ${table}(id, 
name) values(2, 'b');  """, 1
+                group_commit_insert_with_retry """ insert into ${table}(id) 
select 6; """, 1
 
                 getRowCount(20)
-                qt_sql """ select name, score from ${table} order by name asc; 
"""
-
+                order_qt_select7 """ select name, score from ${table} order by 
name asc; """
+                assertTrue(getAlterTableState(), "add rollup should success")
 
                 /*if (item == "nereids") {
                     group_commit_insert """ insert into ${table}(id, name, 
score) values(10 + 1, 'h', 100);  """, 1
@@ -208,7 +226,7 @@ suite("insert_group_commit_into") {
 
                 def rowCount = sql "select count(*) from ${table}"
                 logger.info("row count: " + rowCount)
-                assertEquals(rowCount[0][0], 23)
+                assertEquals(23, rowCount[0][0])
             }
         } finally {
             // try_sql("DROP TABLE ${table}")
@@ -476,7 +494,7 @@ suite("insert_group_commit_into") {
                 group_commit_insert """ insert into ${table} values(1, 
'test'); """, 1
                 group_commit_insert """ insert into ${table}(k1,`or`) values 
(2,"or"); """, 1
                 getRowCount(2)
-                qt_sql """ select * from ${table}; """
+                order_qt_select8 """ select * from ${table}; """
             }
         } finally {
         }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to