This is an automated email from the ASF dual-hosted git repository. dataroaring pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push: new 1cfe5237410 [fix](group_commit) Need to wait wal to be deleted when creating MaterializedView (#30956) 1cfe5237410 is described below commit 1cfe5237410e18684ddb4cea601be5ac0590ab79 Author: huanghaibin <284824...@qq.com> AuthorDate: Wed Feb 21 08:58:31 2024 +0800 [fix](group_commit) Need to wait wal to be deleted when creating MaterializedView (#30956) --- be/src/olap/wal/wal_manager.cpp | 2 +- .../doris/alter/MaterializedViewHandler.java | 10 ++ .../org/apache/doris/alter/SchemaChangeJobV2.java | 33 +----- .../org/apache/doris/load/GroupCommitManager.java | 28 +++++ .../data/insert_p0/insert_group_commit_into.out | 18 +++ .../insert_p0/insert_group_commit_into.groovy | 126 +++++++++++++++++++++ 6 files changed, 186 insertions(+), 31 deletions(-) diff --git a/be/src/olap/wal/wal_manager.cpp b/be/src/olap/wal/wal_manager.cpp index 06e1bd37d72..10a979f89dd 100644 --- a/be/src/olap/wal/wal_manager.cpp +++ b/be/src/olap/wal/wal_manager.cpp @@ -289,6 +289,7 @@ Status WalManager::_load_wals() { continue; } } + _exec_env->wal_mgr()->add_wal_queue(wal.tb_id, wal.wal_id); WARN_IF_ERROR(add_recover_wal(wal.db_id, wal.tb_id, wal.wal_id, wal.wal_path), fmt::format("Failed to add recover wal={}", wal.wal_path)); } @@ -406,7 +407,6 @@ Status WalManager::_replay_background() { Status WalManager::add_recover_wal(int64_t db_id, int64_t table_id, int64_t wal_id, std::string wal) { - add_wal_queue(table_id, wal_id); std::lock_guard<std::shared_mutex> wrlock(_table_lock); std::shared_ptr<WalTable> table_ptr; auto it = _table_map.find(table_id); diff --git a/fe/fe-core/src/main/java/org/apache/doris/alter/MaterializedViewHandler.java b/fe/fe-core/src/main/java/org/apache/doris/alter/MaterializedViewHandler.java index aa00989cfa4..1dfe7759ec8 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/alter/MaterializedViewHandler.java +++ b/fe/fe-core/src/main/java/org/apache/doris/alter/MaterializedViewHandler.java @@ -219,6 +219,10 @@ public class MaterializedViewHandler extends AlterHandler { olapTable.setState(OlapTableState.ROLLUP); + // wait wal delete + Env.getCurrentEnv().getGroupCommitManager().blockTable(olapTable.getId()); + Env.getCurrentEnv().getGroupCommitManager().waitWalFinished(olapTable.getId()); + Env.getCurrentEnv().getEditLog().logAlterJob(rollupJobV2); LOG.info("finished to create materialized view job: {}", rollupJobV2.getJobId()); } finally { @@ -301,6 +305,11 @@ public class MaterializedViewHandler extends AlterHandler { // ATTN: This order is not mandatory, because database lock will protect us, // but this order is more reasonable olapTable.setState(OlapTableState.ROLLUP); + + // wait wal delete + Env.getCurrentEnv().getGroupCommitManager().blockTable(olapTable.getId()); + Env.getCurrentEnv().getGroupCommitManager().waitWalFinished(olapTable.getId()); + // 2 batch submit rollup job List<AlterJobV2> rollupJobV2List = new ArrayList<>(rollupNameJobMap.values()); batchAddAlterJobV2(rollupJobV2List); @@ -1170,6 +1179,7 @@ public class MaterializedViewHandler extends AlterHandler { private void onJobDone(AlterJobV2 alterJob) { removeJobFromRunningQueue(alterJob); if (removeAlterJobV2FromTableNotFinalStateJobMap(alterJob)) { + Env.getCurrentEnv().getGroupCommitManager().unblockTable(alterJob.getTableId()); changeTableStatus(alterJob.getDbId(), alterJob.getTableId(), OlapTableState.NORMAL); LOG.info("set table's state to NORMAL, table id: {}, job id: {}", alterJob.getTableId(), alterJob.getJobId()); diff --git a/fe/fe-core/src/main/java/org/apache/doris/alter/SchemaChangeJobV2.java b/fe/fe-core/src/main/java/org/apache/doris/alter/SchemaChangeJobV2.java index 2efbee47393..70787936105 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/alter/SchemaChangeJobV2.java +++ b/fe/fe-core/src/main/java/org/apache/doris/alter/SchemaChangeJobV2.java @@ -40,7 +40,6 @@ import org.apache.doris.catalog.Tablet; import org.apache.doris.catalog.TabletInvertedIndex; import org.apache.doris.catalog.TabletMeta; import org.apache.doris.common.AnalysisException; -import org.apache.doris.common.Config; import org.apache.doris.common.FeConstants; import org.apache.doris.common.MarkedCountDownLatch; import org.apache.doris.common.MetaNotFoundException; @@ -536,7 +535,9 @@ public class SchemaChangeJobV2 extends AlterJobV2 { } return; } - waitWalFinished(); + Env.getCurrentEnv().getGroupCommitManager().blockTable(tableId); + Env.getCurrentEnv().getGroupCommitManager().waitWalFinished(tableId); + Env.getCurrentEnv().getGroupCommitManager().unblockTable(tableId); /* * all tasks are finished. check the integrity. * we just check whether all new replicas are healthy. @@ -599,34 +600,6 @@ public class SchemaChangeJobV2 extends AlterJobV2 { LOG.info("set table's state to NORMAL, table id: {}, job id: {}", tableId, jobId); } - private void waitWalFinished() { - // wait wal done here - Env.getCurrentEnv().getGroupCommitManager().blockTable(tableId); - LOG.info("block group commit for table={} when schema change", tableId); - List<Long> aliveBeIds = Env.getCurrentSystemInfo().getAllBackendIds(true); - long expireTime = System.currentTimeMillis() + Config.check_wal_queue_timeout_threshold; - while (true) { - LOG.info("wait for wal queue size to be empty"); - boolean walFinished = Env.getCurrentEnv().getGroupCommitManager() - .isPreviousWalFinished(tableId, aliveBeIds); - if (walFinished) { - LOG.info("all wal is finished for table={}", tableId); - break; - } else if (System.currentTimeMillis() > expireTime) { - LOG.warn("waitWalFinished time out for table={}", tableId); - break; - } else { - try { - Thread.sleep(100); - } catch (InterruptedException ie) { - LOG.warn("failed to wait for wal for table={} when schema change", tableId, ie); - } - } - } - Env.getCurrentEnv().getGroupCommitManager().unblockTable(tableId); - LOG.info("unblock group commit for table={} when schema change", tableId); - } - private void onFinished(OlapTable tbl) { // replace the origin index with shadow index, set index state as NORMAL for (Partition partition : tbl.getPartitions()) { diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/GroupCommitManager.java b/fe/fe-core/src/main/java/org/apache/doris/load/GroupCommitManager.java index 12410945e9f..c4bf1e03c9c 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/load/GroupCommitManager.java +++ b/fe/fe-core/src/main/java/org/apache/doris/load/GroupCommitManager.java @@ -45,11 +45,39 @@ public class GroupCommitManager { } public void blockTable(long tableId) { + LOG.info("block group commit for table={} when schema change", tableId); blockedTableIds.add(tableId); } public void unblockTable(long tableId) { blockedTableIds.remove(tableId); + LOG.info("unblock group commit for table={} when schema change", tableId); + } + + /** + * Waiting All WAL files to be deleted. + */ + public void waitWalFinished(long tableId) { + List<Long> aliveBeIds = Env.getCurrentSystemInfo().getAllBackendIds(true); + long expireTime = System.currentTimeMillis() + Config.check_wal_queue_timeout_threshold; + while (true) { + LOG.info("wait for wal queue size to be empty"); + boolean walFinished = Env.getCurrentEnv().getGroupCommitManager() + .isPreviousWalFinished(tableId, aliveBeIds); + if (walFinished) { + LOG.info("all wal is finished for table={}", tableId); + break; + } else if (System.currentTimeMillis() > expireTime) { + LOG.warn("waitWalFinished time out for table={}", tableId); + break; + } else { + try { + Thread.sleep(100); + } catch (InterruptedException ie) { + LOG.warn("failed to wait for wal for table={} when schema change", tableId, ie); + } + } + } } /** diff --git a/regression-test/data/insert_p0/insert_group_commit_into.out b/regression-test/data/insert_p0/insert_group_commit_into.out index dfb3a3b41c0..71a1473e1e6 100644 --- a/regression-test/data/insert_p0/insert_group_commit_into.out +++ b/regression-test/data/insert_p0/insert_group_commit_into.out @@ -94,6 +94,15 @@ q 50 -- !sql -- 0 service_46da0dab-e27d-4820-aea2-9bfc15741615 1697032066304 0 3229b7cd-f3a2-4359-aa24-946388c9cc54 0 CgEwEiQzMjI5YjdjZC1mM2EyLTQzNTktYWEyNC05NDYzODhjOWNjNTQaggQY/6n597ExIP+p+fexMWIWCgh0YWdLZXlfMBIKdGFnVmFsdWVfMGIWCgh0YWdLZXlfMRIKdGFnVmFsdWVfMWIWCgh0YWdLZXlfMhIKdGFnVmFsdWVfMmIWCgh0YWdLZXlfMxIKdGFnVmFsdWVfM2IWCgh0YWdLZXlfNBIKdGFnVmFsdWVfNGIWCgh0YWdLZXlfNRIKdGFnVmFsdWVfNWIWCgh0YWdLZXlfNhIKdGFnVmFsdWVfNmIWCgh0YWdLZXlfNxIKdGFnVmFsdWVfN2IWCgh0YWdLZXlfOBIKdGFnVmFsdWVfOGIWCgh0YWdLZXlfORIKdGFnVm [...] +-- !order -- +2023-06-10 cib2205045_1_1s 0.0000 168939.0 0.0000 0.0 0.0000 0.0 0.0000 day + +-- !order2 -- +1 1 1 +2 2 2 +3 3 3 +4 4 4 + -- !sql -- 1 a 10 2 b -1 @@ -189,3 +198,12 @@ q 50 -- !sql -- 0 service_46da0dab-e27d-4820-aea2-9bfc15741615 1697032066304 0 3229b7cd-f3a2-4359-aa24-946388c9cc54 0 CgEwEiQzMjI5YjdjZC1mM2EyLTQzNTktYWEyNC05NDYzODhjOWNjNTQaggQY/6n597ExIP+p+fexMWIWCgh0YWdLZXlfMBIKdGFnVmFsdWVfMGIWCgh0YWdLZXlfMRIKdGFnVmFsdWVfMWIWCgh0YWdLZXlfMhIKdGFnVmFsdWVfMmIWCgh0YWdLZXlfMxIKdGFnVmFsdWVfM2IWCgh0YWdLZXlfNBIKdGFnVmFsdWVfNGIWCgh0YWdLZXlfNRIKdGFnVmFsdWVfNWIWCgh0YWdLZXlfNhIKdGFnVmFsdWVfNmIWCgh0YWdLZXlfNxIKdGFnVmFsdWVfN2IWCgh0YWdLZXlfOBIKdGFnVmFsdWVfOGIWCgh0YWdLZXlfORIKdGFnVm [...] +-- !order -- +2023-06-10 cib2205045_1_1s 0.0000 168939.0 0.0000 0.0 0.0000 0.0 0.0000 day + +-- !order2 -- +1 1 1 +2 2 2 +3 3 3 +4 4 4 + diff --git a/regression-test/suites/insert_p0/insert_group_commit_into.groovy b/regression-test/suites/insert_p0/insert_group_commit_into.groovy index 391e106aa2f..b7c6cc4e3c4 100644 --- a/regression-test/suites/insert_p0/insert_group_commit_into.groovy +++ b/regression-test/suites/insert_p0/insert_group_commit_into.groovy @@ -318,5 +318,131 @@ suite("insert_group_commit_into") { } finally { // try_sql("DROP TABLE ${table}") } + + // table with MaterializedView + tableName = "insert_group_commit_into_mv" + table = dbName + "." + tableName + def table_tmp = dbName + ".test_table_tmp" + try { + // create table + sql """ drop table if exists ${table}; """ + sql """CREATE table ${table} ( + `ordernum` varchar(65533) NOT NULL , + `dnt` datetime NOT NULL , + `data` json NULL + ) ENGINE=OLAP + DUPLICATE KEY(`ordernum`, `dnt`) + COMMENT 'OLAP' + DISTRIBUTED BY HASH(`ordernum`) BUCKETS 3 + PROPERTIES ( + "replication_allocation" = "tag.location.default: 1" + );""" + sql """drop table if exists ${table_tmp};""" + sql """CREATE TABLE ${table_tmp} ( + `dnt` varchar(200) NULL, + `ordernum` varchar(200) NULL, + `type` varchar(20) NULL, + `powers` double SUM NULL, + `p0` double REPLACE NULL, + `heatj` double SUM NULL, + `j0` double REPLACE NULL, + `heatg` double SUM NULL, + `g0` double REPLACE NULL, + `solar` double SUM NULL + ) ENGINE=OLAP + AGGREGATE KEY(`dnt`, `ordernum`, `type`) + COMMENT 'OLAP' + DISTRIBUTED BY HASH(`ordernum`) BUCKETS 1 + PROPERTIES ( + "replication_allocation" = "tag.location.default: 1" + ); """ + sql """DROP MATERIALIZED VIEW IF EXISTS ods_zn_dnt_max1 ON ${table};""" + sql """create materialized view ods_zn_dnt_max1 as + select ordernum,max(dnt) as dnt from ${table} + group by ordernum + ORDER BY ordernum;""" + connect(user = context.config.jdbcUser, password = context.config.jdbcPassword, url = context.config.jdbcUrl) { + sql """ set group_commit = async_mode; """ + if (item == "nereids") { + sql """ set enable_nereids_dml = true; """ + sql """ set enable_nereids_planner=true; """ + //sql """ set enable_fallback_to_original_planner=false; """ + } else { + sql """ set enable_nereids_dml = false; """ + } + + // 1. insert into + int count = 0; + while (count < 30) { + try { + group_commit_insert """ + insert into ${table} values('cib2205045_1_1s','2023/6/10 3:55:33','{"DB1":168939,"DNT":"2023-06-10 03:55:33"}');""", 1 + break + } catch (Exception e) { + logger.info("got exception:" + e) + if (e.getMessage().contains("is blocked on schema change")) { + Thread.sleep(1000) + } + count++ + } + } + group_commit_insert """insert into ${table} values('cib2205045_1_1s','2023/6/10 3:56:33','{"DB1":168939,"DNT":"2023-06-10 03:56:33"}');""", 1 + group_commit_insert """insert into ${table} values('cib2205045_1_1s','2023/6/10 3:57:33','{"DB1":168939,"DNT":"2023-06-10 03:57:33"}');""", 1 + group_commit_insert """insert into ${table} values('cib2205045_1_1s','2023/6/10 3:58:33','{"DB1":168939,"DNT":"2023-06-10 03:58:33"}');""", 1 + + getRowCount(4) + + qt_order """select + '2023-06-10', + tmp.ordernum, + cast(nvl(if(tmp.p0-tmp1.p0>0,tmp.p0-tmp1.p0,tmp.p0-tmp.p1),0) as decimal(10,4)), + nvl(tmp.p0,0), + cast(nvl(if(tmp.j0-tmp1.j0>0,tmp.j0-tmp1.j0,tmp.j0-tmp.j1)*277.78,0) as decimal(10,4)), + nvl(tmp.j0,0), + cast(nvl(if(tmp.g0-tmp1.g0>0,tmp.g0-tmp1.g0,tmp.g0-tmp.g1)*277.78,0) as decimal(10,4)), + nvl(tmp.g0,0), + cast(nvl(tmp.solar,0) as decimal(20,4)), + 'day' + from + ( + select + ordernum, + max(ljrl1) g0,min(ljrl1) g1, + max(ljrl2) j0,min(ljrl2) j1, + max(db1) p0,min(db1) p1, + max(fzl)*1600*0.278 solar + from( + select ordernum,dnt, + cast(if(json_extract(data,'\$.LJRL1')=0 or json_extract(data,'\$.LJRL1') like '%E%',null,json_extract(data,'\$.LJRL1')) as double) ljrl1, + cast(if(json_extract(data,'\$.LJRL2')=0 or json_extract(data,'\$.LJRL2') like '%E%',null,json_extract(data,'\$.LJRL2')) as double) ljrl2, + first_value(cast(if(json_extract(data,'\$.FZL')=0 or json_extract(data,'\$.FZL') like '%E%',null, + json_extract(data,'\$.FZL')) as double)) over (partition by ordernum order by dnt desc) fzl, + cast(if(json_extract(data,'\$.DB1')=0 or json_extract(data,'\$.DB1') like '%E%',null,json_extract(data,'\$.DB1')) as double) db1 + from ${table} + )a1 + group by ordernum + )tmp left join ( + select + ordernum,MAX(p0) p0,MAX(j0) j0,MAX(g0) g0 + from ${table_tmp} + group by ordernum + )tmp1 + on tmp.ordernum=tmp1.ordernum;""" + qt_order2 """ + SELECT + row_number() over(partition by add_date order by pc_num desc) + ,row_number() over(partition by add_date order by vc_num desc) + ,row_number() over(partition by add_date order by vt_num desc) + FROM ( + SELECT + cast(dnt as datev2) add_date + ,row_number() over(order by dnt) pc_num + ,row_number() over(order by dnt) vc_num + ,row_number() over(order by dnt) vt_num + FROM ${table} + ) t;""" + } + } finally { + } } } \ No newline at end of file --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org