This is an automated email from the ASF dual-hosted git repository. dataroaring pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push: new 63f7704140a [fix](group commit) Fix the timeout to wait wal finished when schema change (#34021) 63f7704140a is described below commit 63f7704140a9133b716bc17bb47dcb6cb1bc229c Author: meiyi <myime...@gmail.com> AuthorDate: Tue Apr 23 21:38:45 2024 +0800 [fix](group commit) Fix the timeout to wait wal finished when schema change (#34021) --- be/src/runtime/group_commit_mgr.cpp | 27 +++++---- .../doris/alter/MaterializedViewHandler.java | 16 +++--- .../data/insert_p0/insert_group_commit_into.out | 32 +++++------ .../insert_p0/insert_group_commit_into.groovy | 66 ++++++++++++++-------- 4 files changed, 83 insertions(+), 58 deletions(-) diff --git a/be/src/runtime/group_commit_mgr.cpp b/be/src/runtime/group_commit_mgr.cpp index 5ec20c10aef..62fbbd37979 100644 --- a/be/src/runtime/group_commit_mgr.cpp +++ b/be/src/runtime/group_commit_mgr.cpp @@ -74,11 +74,20 @@ Status LoadBlockQueue::add_block(RuntimeState* runtime_state, } } } - if (_data_bytes >= _group_commit_data_bytes) { - VLOG_DEBUG << "group commit meets commit condition for data size, label=" << label - << ", instance_id=" << load_instance_id << ", data_bytes=" << _data_bytes; - _need_commit = true; - data_size_condition = true; + if (!_need_commit) { + if (_data_bytes >= _group_commit_data_bytes) { + VLOG_DEBUG << "group commit meets commit condition for data size, label=" << label + << ", instance_id=" << load_instance_id << ", data_bytes=" << _data_bytes; + _need_commit = true; + data_size_condition = true; + } + if (std::chrono::duration_cast<std::chrono::milliseconds>(std::chrono::steady_clock::now() - + _start_time) + .count() >= _group_commit_interval_ms) { + VLOG_DEBUG << "group commit meets commit condition for time interval, label=" << label + << ", instance_id=" << load_instance_id << ", data_bytes=" << _data_bytes; + _need_commit = true; + } } _get_cond.notify_all(); return Status::OK(); @@ -90,11 +99,9 @@ Status LoadBlockQueue::get_block(RuntimeState* runtime_state, vectorized::Block* *eos = false; std::unique_lock l(mutex); if (!_need_commit) { - auto left_milliseconds = - _group_commit_interval_ms - std::chrono::duration_cast<std::chrono::milliseconds>( - std::chrono::steady_clock::now() - _start_time) - .count(); - if (left_milliseconds <= 0) { + if (std::chrono::duration_cast<std::chrono::milliseconds>(std::chrono::steady_clock::now() - + _start_time) + .count() >= _group_commit_interval_ms) { _need_commit = true; } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/alter/MaterializedViewHandler.java b/fe/fe-core/src/main/java/org/apache/doris/alter/MaterializedViewHandler.java index b28fb4d9f7b..98b6f8e4e95 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/alter/MaterializedViewHandler.java +++ b/fe/fe-core/src/main/java/org/apache/doris/alter/MaterializedViewHandler.java @@ -187,6 +187,9 @@ public class MaterializedViewHandler extends AlterHandler { */ public void processCreateMaterializedView(CreateMaterializedViewStmt addMVClause, Database db, OlapTable olapTable) throws DdlException, AnalysisException { + // wait wal delete + Env.getCurrentEnv().getGroupCommitManager().blockTable(olapTable.getId()); + Env.getCurrentEnv().getGroupCommitManager().waitWalFinished(olapTable.getId()); olapTable.writeLockOrDdlException(); try { olapTable.checkNormalStateForAlter(); @@ -222,10 +225,6 @@ public class MaterializedViewHandler extends AlterHandler { olapTable.setState(OlapTableState.ROLLUP); - // wait wal delete - Env.getCurrentEnv().getGroupCommitManager().blockTable(olapTable.getId()); - Env.getCurrentEnv().getGroupCommitManager().waitWalFinished(olapTable.getId()); - Env.getCurrentEnv().getEditLog().logAlterJob(rollupJobV2); LOG.info("finished to create materialized view job: {}", rollupJobV2.getJobId()); } finally { @@ -248,6 +247,11 @@ public class MaterializedViewHandler extends AlterHandler { public void processBatchAddRollup(String rawSql, List<AlterClause> alterClauses, Database db, OlapTable olapTable) throws DdlException, AnalysisException { checkReplicaCount(olapTable); + + // wait wal delete + Env.getCurrentEnv().getGroupCommitManager().blockTable(olapTable.getId()); + Env.getCurrentEnv().getGroupCommitManager().waitWalFinished(olapTable.getId()); + Map<String, RollupJobV2> rollupNameJobMap = new LinkedHashMap<>(); // save job id for log Set<Long> logJobIdSet = new HashSet<>(); @@ -309,10 +313,6 @@ public class MaterializedViewHandler extends AlterHandler { // but this order is more reasonable olapTable.setState(OlapTableState.ROLLUP); - // wait wal delete - Env.getCurrentEnv().getGroupCommitManager().blockTable(olapTable.getId()); - Env.getCurrentEnv().getGroupCommitManager().waitWalFinished(olapTable.getId()); - // 2 batch submit rollup job List<AlterJobV2> rollupJobV2List = new ArrayList<>(rollupNameJobMap.values()); batchAddAlterJobV2(rollupJobV2List); diff --git a/regression-test/data/insert_p0/insert_group_commit_into.out b/regression-test/data/insert_p0/insert_group_commit_into.out index 7c39453d473..8c0cc138c47 100644 --- a/regression-test/data/insert_p0/insert_group_commit_into.out +++ b/regression-test/data/insert_p0/insert_group_commit_into.out @@ -1,5 +1,5 @@ -- This file is automatically generated. You should know what you did if you want to edit this --- !sql -- +-- !select1 -- 1 a 10 2 b -1 3 c -1 @@ -7,7 +7,7 @@ 5 q 50 6 \N -1 --- !sql -- +-- !select2 -- 1 a 10 1 a 10 2 b -1 @@ -20,7 +20,7 @@ 6 \N -1 6 \N -1 --- !sql -- +-- !select3 -- 1 a \N 10 1 a \N 10 1 a \N 10 @@ -39,11 +39,11 @@ 6 \N \N -1 6 \N \N -1 --- !sql -- +-- !select4 -- 2 b \N -1 6 \N \N -1 --- !sql -- +-- !select5 -- 1 a 10 5 2 b -1 \N 2 b -1 \N @@ -53,7 +53,7 @@ 6 \N -1 \N 6 \N -1 \N --- !sql -- +-- !select6 -- 1 a 10 1 a 10 2 b -1 @@ -69,7 +69,7 @@ 6 \N -1 6 \N -1 --- !sql -- +-- !select7 -- \N -1 \N -1 \N -1 @@ -103,11 +103,11 @@ q 50 3 3 3 4 4 4 --- !sql -- +-- !select8 -- 1 test 2 or --- !sql -- +-- !select1 -- 1 a 10 2 b -1 3 c -1 @@ -115,7 +115,7 @@ q 50 5 q 50 6 \N -1 --- !sql -- +-- !select2 -- 1 a 10 1 a 10 2 b -1 @@ -128,7 +128,7 @@ q 50 6 \N -1 6 \N -1 --- !sql -- +-- !select3 -- 1 a \N 10 1 a \N 10 1 a \N 10 @@ -147,11 +147,11 @@ q 50 6 \N \N -1 6 \N \N -1 --- !sql -- +-- !select4 -- 2 b \N -1 6 \N \N -1 --- !sql -- +-- !select5 -- 1 a 10 5 2 b -1 \N 2 b -1 \N @@ -161,7 +161,7 @@ q 50 6 \N -1 \N 6 \N -1 \N --- !sql -- +-- !select6 -- 1 a 10 1 a 10 2 b -1 @@ -177,7 +177,7 @@ q 50 6 \N -1 6 \N -1 --- !sql -- +-- !select7 -- \N -1 \N -1 \N -1 @@ -211,7 +211,7 @@ q 50 3 3 3 4 4 4 --- !sql -- +-- !select8 -- 1 test 2 or diff --git a/regression-test/suites/insert_p0/insert_group_commit_into.groovy b/regression-test/suites/insert_p0/insert_group_commit_into.groovy index dc681feb3f2..d5f484c565d 100644 --- a/regression-test/suites/insert_p0/insert_group_commit_into.groovy +++ b/regression-test/suites/insert_p0/insert_group_commit_into.groovy @@ -41,7 +41,7 @@ suite("insert_group_commit_into") { sql "use ${dbName};" while (true) { sleep(2000) - def state = sql " show alter table column where tablename = '${tableName}' order by CreateTime desc " + def state = sql " show alter table column where tablename = '${tableName}' order by CreateTime desc limit 1" logger.info("alter table state: ${state}") if (state.size() > 0 && state[0][9] == "FINISHED") { return true @@ -69,6 +69,24 @@ suite("insert_group_commit_into") { return serverInfo } + def group_commit_insert_with_retry = { sql, expected_row_count -> + def retry = 0 + while (true){ + try { + return group_commit_insert(sql, expected_row_count) + } catch (Exception e) { + logger.warn("group_commit_insert failed, retry: " + retry + ", error: " + e.getMessage()) + retry++ + if (e.getMessage().contains("is blocked on schema change") && retry < 20) { + sleep(1500) + continue + } else { + throw e + } + } + } + } + def none_group_commit_insert = { sql, expected_row_count -> def stmt = prepareStatement """ ${sql} """ def result = stmt.executeUpdate() @@ -120,7 +138,7 @@ suite("insert_group_commit_into") { group_commit_insert """ insert into ${table}(id) select 6; """, 1 getRowCount(6) - qt_sql """ select * from ${table} order by id, name, score asc; """ + order_qt_select1 """ select * from ${table} order by id, name, score asc; """ // 2. insert into and delete sql """ delete from ${table} where id = 4; """ @@ -134,19 +152,19 @@ suite("insert_group_commit_into") { group_commit_insert """ insert into ${table}(id) select 6; """, 1 getRowCount(11) - qt_sql """ select * from ${table} order by id, name, score asc; """ + order_qt_select2 """ select * from ${table} order by id, name, score asc; """ // 3. insert into and light schema change: add column group_commit_insert """ insert into ${table}(name, id) values('c', 3); """, 1 group_commit_insert """ insert into ${table}(id) values(4); """, 1 group_commit_insert """ insert into ${table} values (1, 'a', 10),(5, 'q', 50); """, 2 sql """ alter table ${table} ADD column age int after name; """ - group_commit_insert """ insert into ${table}(id, name) values(2, 'b'); """, 1 - group_commit_insert """ insert into ${table}(id) select 6; """, 1 + group_commit_insert_with_retry """ insert into ${table}(id, name) values(2, 'b'); """, 1 + group_commit_insert_with_retry """ insert into ${table}(id) select 6; """, 1 assertTrue(getAlterTableState(), "add column should success") getRowCount(17) - qt_sql """ select * from ${table} order by id, name,score asc; """ + order_qt_select3 """ select * from ${table} order by id, name,score asc; """ // 4. insert into and truncate table /*sql """ insert into ${table}(name, id) values('c', 3); """ @@ -157,43 +175,43 @@ suite("insert_group_commit_into") { group_commit_insert """ insert into ${table}(id) select 6; """, 1 getRowCount(2) - qt_sql """ select * from ${table} order by id, name, score asc; """ + order_qt_select4 """ select * from ${table} order by id, name, score asc; """ // 5. insert into and schema change: modify column order group_commit_insert """ insert into ${table}(name, id) values('c', 3); """, 1 group_commit_insert """ insert into ${table}(id) values(4); """, 1 - group_commit_insert """ insert into ${table} values (1, 'a', 5, 10),(5, 'q', 6, 50); """, 2 - // sql """ alter table ${table} order by (id, name, score, age); """ - group_commit_insert """ insert into ${table}(id, name) values(2, 'b'); """, 1 - group_commit_insert """ insert into ${table}(id) select 6; """, 1 + group_commit_insert """ insert into ${table}(id, name, age, score) values (1, 'a', 5, 10),(5, 'q', 6, 50); """, 2 + sql """ alter table ${table} order by (id, name, score, age); """ + group_commit_insert_with_retry """ insert into ${table}(id, name) values(2, 'b'); """, 1 + group_commit_insert_with_retry """ insert into ${table}(id) select 6; """, 1 - // assertTrue(getAlterTableState(), "modify column order should success") + assertTrue(getAlterTableState(), "modify column order should success") getRowCount(8) - qt_sql """ select id, name, score, age from ${table} order by id, name, score asc; """ + order_qt_select5 """ select id, name, score, age from ${table} order by id, name, score asc; """ // 6. insert into and light schema change: drop column group_commit_insert """ insert into ${table}(name, id) values('c', 3); """, 1 group_commit_insert """ insert into ${table}(id) values(4); """, 1 - group_commit_insert """ insert into ${table} values (1, 'a', 5, 10),(5, 'q', 6, 50); """, 2 + group_commit_insert """ insert into ${table}(id, name, age, score) values (1, 'a', 5, 10),(5, 'q', 6, 50); """, 2 sql """ alter table ${table} DROP column age; """ - group_commit_insert """ insert into ${table}(id, name) values(2, 'b'); """, 1 - group_commit_insert """ insert into ${table}(id) select 6; """, 1 + group_commit_insert_with_retry """ insert into ${table}(id, name) values(2, 'b'); """, 1 + group_commit_insert_with_retry """ insert into ${table}(id) select 6; """, 1 assertTrue(getAlterTableState(), "drop column should success") getRowCount(14) - qt_sql """ select * from ${table} order by id, name, score asc; """ + order_qt_select6 """ select * from ${table} order by id, name, score asc; """ // 7. insert into and add rollup group_commit_insert """ insert into ${table}(name, id) values('c', 3); """, 1 group_commit_insert """ insert into ${table}(id) values(4); """, 1 group_commit_insert """ insert into ${table} values (1, 'a', 10),(5, 'q', 50),(101, 'a', 100); """, 2 - // sql """ alter table ${table} ADD ROLLUP r1(name, score); """ - group_commit_insert """ insert into ${table}(id, name) values(2, 'b'); """, 1 - group_commit_insert """ insert into ${table}(id) select 6; """, 1 + sql """ alter table ${table} ADD ROLLUP r1(name, score); """ + group_commit_insert_with_retry """ insert into ${table}(id, name) values(2, 'b'); """, 1 + group_commit_insert_with_retry """ insert into ${table}(id) select 6; """, 1 getRowCount(20) - qt_sql """ select name, score from ${table} order by name asc; """ - + order_qt_select7 """ select name, score from ${table} order by name asc; """ + assertTrue(getAlterTableState(), "add rollup should success") /*if (item == "nereids") { group_commit_insert """ insert into ${table}(id, name, score) values(10 + 1, 'h', 100); """, 1 @@ -208,7 +226,7 @@ suite("insert_group_commit_into") { def rowCount = sql "select count(*) from ${table}" logger.info("row count: " + rowCount) - assertEquals(rowCount[0][0], 23) + assertEquals(23, rowCount[0][0]) } } finally { // try_sql("DROP TABLE ${table}") @@ -476,7 +494,7 @@ suite("insert_group_commit_into") { group_commit_insert """ insert into ${table} values(1, 'test'); """, 1 group_commit_insert """ insert into ${table}(k1,`or`) values (2,"or"); """, 1 getRowCount(2) - qt_sql """ select * from ${table}; """ + order_qt_select8 """ select * from ${table}; """ } } finally { } --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org