This is an automated email from the ASF dual-hosted git repository. morrysnow pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push: new 22549565a54 [Improve](mtmv) skip the generation of invalid task for refresh mtmv (#46280) 22549565a54 is described below commit 22549565a54a705fd8bf06a63977a3fb2dd8b8fa Author: shee <13843187+qz...@users.noreply.github.com> AuthorDate: Fri Jan 3 19:48:52 2025 +0800 [Improve](mtmv) skip the generation of invalid task for refresh mtmv (#46280) ### What problem does this PR solve? We specified the `excluded_trigger_tables = 'a'` attribute when creating the materialized view. If table `a` is updated frequently, many invalid tasks are generated even though these tasks do not actually refresh the mv. This is unreasonable: too many invalid tasks will crowd out useful task information. Co-authored-by: garenshi <garen...@tencent.com> --- .../java/org/apache/doris/mtmv/MTMVService.java | 11 +++- regression-test/data/mtmv_p0/test_commit_mtmv.out | 20 ++++++ .../suites/mtmv_p0/test_commit_mtmv.groovy | 72 ++++++++++++++++++++++ 3 files changed, 102 insertions(+), 1 deletion(-) diff --git a/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVService.java b/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVService.java index 278811d3a99..26c6bfb10e9 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVService.java +++ b/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVService.java @@ -198,7 +198,7 @@ public class MTMVService implements EventListener { try { // check if mtmv should trigger by event MTMV mtmv = (MTMV) MTMVUtil.getTable(baseTableInfo); - if (mtmv.getRefreshInfo().getRefreshTriggerInfo().getRefreshTrigger().equals(RefreshTrigger.COMMIT)) { + if (canRefresh(mtmv, table)) { jobManager.onCommit(mtmv); } } catch (Exception e) { @@ -206,4 +206,13 @@ public class MTMVService implements EventListener { } } } + + private boolean canRefresh(MTMV mtmv, TableIf table) { + if (mtmv.getExcludedTriggerTables().contains(table.getName())) { + LOG.info("skip refresh mtmv: {}, because exclude trigger 
table: {}", + mtmv.getName(), table.getName()); + return false; + } + return mtmv.getRefreshInfo().getRefreshTriggerInfo().getRefreshTrigger().equals(RefreshTrigger.COMMIT); + } } diff --git a/regression-test/data/mtmv_p0/test_commit_mtmv.out b/regression-test/data/mtmv_p0/test_commit_mtmv.out index 208638b4c10..433d55ef4b8 100644 --- a/regression-test/data/mtmv_p0/test_commit_mtmv.out +++ b/regression-test/data/mtmv_p0/test_commit_mtmv.out @@ -61,3 +61,23 @@ -- !mv1_replace -- 3 2017-03-15 3 +-- !mv_sag -- +1 1 60 + +-- !task_sag -- +{"triggerMode":"COMMIT","partitions":[],"isComplete":false} + +-- !mv_sag1 -- +1 1 60 + +-- !task_sag1 -- +{"triggerMode":"COMMIT","partitions":[],"isComplete":false} + +-- !mv_sag2 -- +1 1 60 +1 2 70 +2 1 70 + +-- !task_sag2 -- +{"triggerMode":"COMMIT","partitions":[],"isComplete":false} + diff --git a/regression-test/suites/mtmv_p0/test_commit_mtmv.groovy b/regression-test/suites/mtmv_p0/test_commit_mtmv.groovy index d8161a3fc92..bb4c3f8f7ce 100644 --- a/regression-test/suites/mtmv_p0/test_commit_mtmv.groovy +++ b/regression-test/suites/mtmv_p0/test_commit_mtmv.groovy @@ -149,4 +149,76 @@ suite("test_commit_mtmv") { sql """drop materialized view if exists ${mvName2};""" sql """drop table if exists `${tableName}`""" + //===========test excluded_trigger_tables=========== + def tblStu = "test_commit_mtmv_tbl_stu" + def tblGrade = "test_commit_mtmv_tbl_grade" + def mvSag = "test_commit_mv_sag" + sql """drop materialized view if exists ${mvSag};""" + sql """drop table if exists `${tblStu}`""" + sql """drop table if exists `${tblGrade}`""" + sql """ + CREATE TABLE `${tblStu}` ( + `sid` int(32) NULL, + `sname` varchar(32) NULL, + ) ENGINE=OLAP + DUPLICATE KEY(`sid`) + DISTRIBUTED BY HASH(`sid`) BUCKETS 1 + PROPERTIES ( + "replication_allocation" = "tag.location.default: 1" + ); + """ + + sql """ + CREATE TABLE `${tblGrade}` ( + `sid` int(32) NULL, + `cid` int(32) NULL, + `score` int NULL + ) ENGINE=OLAP + DUPLICATE KEY(`sid`) + DISTRIBUTED 
BY HASH(`sid`) BUCKETS 1 + PROPERTIES ( + "replication_allocation" = "tag.location.default: 1" + ); + """ + + sql """ + CREATE MATERIALIZED VIEW ${mvSag} + BUILD DEFERRED + REFRESH COMPLETE ON commit + DISTRIBUTED BY HASH(`sid`) BUCKETS 1 + PROPERTIES ( + "replication_allocation" = "tag.location.default: 1", + "excluded_trigger_tables" = "${tblGrade}" + ) + AS select a.sid,b.cid,b.score from ${tblStu} a join ${tblGrade} b on a.sid = b.sid; + """ + + sql """ + insert into ${tblGrade} values(1, 1, 60); + insert into ${tblStu} values(1, 'sam'); + """ + def sagJobName = getJobName(dbName, mvSag); + waitingMTMVTaskFinished(sagJobName) + order_qt_mv_sag "SELECT * FROM ${mvSag} order by sid,cid" + order_qt_task_sag "SELECT TaskContext from tasks('type'='mv') where MvName='${mvSag}' order by CreateTime desc limit 1" + + sql """ + insert into ${tblGrade} values(1, 2, 70); + """ + waitingMTMVTaskFinished(sagJobName) + order_qt_mv_sag1 "SELECT * FROM ${mvSag} order by sid,cid" + order_qt_task_sag1 "SELECT TaskContext from tasks('type'='mv') where MvName='${mvSag}' order by CreateTime desc limit 1" + + sql """ + insert into ${tblGrade} values(2, 1, 70); + insert into ${tblStu} values(2, 'jack'); + """ + + waitingMTMVTaskFinished(sagJobName) + order_qt_mv_sag2 "SELECT * FROM ${mvSag} order by sid,cid" + order_qt_task_sag2 "SELECT TaskContext from tasks('type'='mv') where MvName='${mvSag}' order by CreateTime desc limit 1" + + sql """drop materialized view if exists ${mvSag};""" + sql """drop table if exists `${tblStu}`""" + sql """drop table if exists `${tblGrade}`""" } --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org