This is an automated email from the ASF dual-hosted git repository. adonisling pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push: new 8df4a94826 [fix](MTMV) Tasks leak when dropping job (#17984) 8df4a94826 is described below commit 8df4a948269398faac679893634e06afbccbb52b Author: huangzhaowei <huangzhaowei....@bytedance.com> AuthorDate: Tue Mar 21 23:22:17 2023 +0800 [fix](MTMV) Tasks leak when dropping job (#17984) 1. Divide MTMV regression tests into 4 suites 2. Try to remove tasks which were killed by dropping job actions in running map. --- .../java/org/apache/doris/mtmv/MTMVJobManager.java | 2 +- .../org/apache/doris/mtmv/MTMVTaskManager.java | 28 ++++++--- .../org/apache/doris/mtmv/MTMVJobManagerTest.java | 8 +-- regression-test/data/mtmv_p0/test_refresh_mtmv.out | 6 ++ ...t_create_mtmv.groovy => test_alter_mtmv.groovy} | 61 ++---------------- ...te_mtmv.groovy => test_create_both_mtmv.groovy} | 73 +++------------------- .../suites/mtmv_p0/test_create_mtmv.groovy | 65 +------------------ ...create_mtmv.groovy => test_refresh_mtmv.groovy} | 53 ++-------------- 8 files changed, 53 insertions(+), 243 deletions(-) diff --git a/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVJobManager.java b/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVJobManager.java index a6f10128a5..c6ff81c082 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVJobManager.java +++ b/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVJobManager.java @@ -157,7 +157,7 @@ public class MTMVJobManager { Metric.MetricUnit.NOUNIT, "Total task number of mtmv.") { @Override public Integer getValue() { - return getTaskManager().getAllHistory().size(); + return getTaskManager().getHistoryTasks().size(); } }; totalTask.addLabel(new MetricLabel("type", "TOTAL-TASK")); diff --git a/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVTaskManager.java b/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVTaskManager.java index 2de94e5201..cbac71b9bb 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVTaskManager.java +++ b/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVTaskManager.java @@ -69,7 +69,7 @@ public class MTMVTaskManager { private final ReentrantLock reentrantLock = new ReentrantLock(true); // keep track of all the completed tasks - private final Deque<MTMVTask> historyQueue = Queues.newLinkedBlockingDeque(); + private final Deque<MTMVTask> historyTasks = Queues.newLinkedBlockingDeque(); private ScheduledExecutorService taskScheduler = Executors.newScheduledThreadPool(1); @@ -276,11 +276,11 @@ public class MTMVTaskManager { } private void addHistory(MTMVTask task) { - historyQueue.addFirst(task); + historyTasks.addFirst(task); } - public Deque<MTMVTask> getAllHistory() { - return historyQueue; + public Deque<MTMVTask> getHistoryTasks() { + return historyTasks; } public List<MTMVTask> showAllTasks() { @@ -295,7 +295,7 @@ public class MTMVTaskManager { } taskList.addAll( getRunningTaskMap().values().stream().map(MTMVTaskExecutor::getTask).collect(Collectors.toList())); - taskList.addAll(getAllHistory()); + taskList.addAll(getHistoryTasks()); } else { for (Queue<MTMVTaskExecutor> pTaskQueue : getPendingTaskMap().values()) { taskList.addAll( @@ -305,7 +305,7 @@ public class MTMVTaskManager { taskList.addAll(getRunningTaskMap().values().stream().map(MTMVTaskExecutor::getTask) .filter(u -> u.getDBName().equals(dbName)).collect(Collectors.toList())); taskList.addAll( - getAllHistory().stream().filter(u -> u.getDBName().equals(dbName)).collect(Collectors.toList())); + getHistoryTasks().stream().filter(u -> u.getDBName().equals(dbName)).collect(Collectors.toList())); } return taskList.stream().sorted().collect(Collectors.toList()); @@ -417,7 +417,7 @@ public class MTMVTaskManager { return; } try { - Deque<MTMVTask> taskHistory = getAllHistory(); + List<MTMVTask> taskHistory = showAllTasks(); for (MTMVTask task : taskHistory) { if (task.getJobName().equals(jobName)) { clearTasks.add(task.getTaskId()); @@ -438,7 +438,7 @@ public class MTMVTaskManager { return; } try { - Deque<MTMVTask> taskHistory = getAllHistory(); + Deque<MTMVTask> taskHistory = getHistoryTasks(); for (MTMVTask task : taskHistory) { long expireTime = task.getExpireTime(); if (currentTime > expireTime) { @@ -460,7 +460,17 @@ public class MTMVTaskManager { } try { Set<String> taskSet = new HashSet<>(taskIds); - getAllHistory().removeIf(mtmvTask -> taskSet.contains(mtmvTask.getTaskId())); + // Pending tasks will be clear directly. So we don't drop it again here. + // Check the running task since the task was killed but was not move to the history queue. + for (long key : runningTaskMap.keySet()) { + MTMVTaskExecutor executor = runningTaskMap.get(key); + // runningTaskMap may be removed in the runningIterator + if (executor != null && taskSet.contains(executor.getTask().getTaskId())) { + runningTaskMap.remove(key); + } + } + // Try to remove history tasks. + getHistoryTasks().removeIf(mtmvTask -> taskSet.contains(mtmvTask.getTaskId())); if (!isReplay) { Env.getCurrentEnv().getEditLog().logDropMTMVTasks(taskIds); } diff --git a/fe/fe-core/src/test/java/org/apache/doris/mtmv/MTMVJobManagerTest.java b/fe/fe-core/src/test/java/org/apache/doris/mtmv/MTMVJobManagerTest.java index 55cf3dceb0..9c5b98df11 100644 --- a/fe/fe-core/src/test/java/org/apache/doris/mtmv/MTMVJobManagerTest.java +++ b/fe/fe-core/src/test/java/org/apache/doris/mtmv/MTMVJobManagerTest.java @@ -58,15 +58,15 @@ public class MTMVJobManagerTest extends TestWithFeService { public void testSchedulerJob() throws DdlException, InterruptedException { MTMVJobManager jobManager = new MTMVJobManager(); jobManager.start(); - Assertions.assertTrue(jobManager.getTaskManager().getAllHistory().isEmpty()); + Assertions.assertTrue(jobManager.getTaskManager().getHistoryTasks().isEmpty()); MTMVJob job = MTMVUtilsTest.createSchedulerJob(); jobManager.createJob(job, false); Assertions.assertEquals(1, jobManager.showJobs(MTMVUtilsTest.dbName).size()); - while (jobManager.getTaskManager().getAllHistory().isEmpty()) { + while (jobManager.getTaskManager().getHistoryTasks().isEmpty()) { Thread.sleep(1000L); System.out.println("Loop once"); } - Assertions.assertTrue(jobManager.getTaskManager().getAllHistory().size() > 0); + Assertions.assertTrue(jobManager.getTaskManager().getHistoryTasks().size() > 0); } @Test @@ -83,7 +83,7 @@ public class MTMVJobManagerTest extends TestWithFeService { System.out.println("Loop once"); } - Assertions.assertEquals(1, jobManager.getTaskManager().getAllHistory().size()); + Assertions.assertEquals(1, jobManager.getTaskManager().getHistoryTasks().size()); Assertions.assertEquals(1, jobManager.getTaskManager().showAllTasks().size()); Assertions.assertEquals(1, jobManager.getTaskManager().showTasks(MTMVUtilsTest.dbName).size()); Assertions.assertEquals(1, diff --git a/regression-test/data/mtmv_p0/test_refresh_mtmv.out b/regression-test/data/mtmv_p0/test_refresh_mtmv.out new file mode 100644 index 0000000000..75d5531799 --- /dev/null +++ b/regression-test/data/mtmv_p0/test_refresh_mtmv.out @@ -0,0 +1,6 @@ +-- This file is automatically generated. You should know what you did if you want to edit this +-- !select -- +clz 200 +lisi 300 +zhangsang 200 + diff --git a/regression-test/suites/mtmv_p0/test_create_mtmv.groovy b/regression-test/suites/mtmv_p0/test_alter_mtmv.groovy similarity index 63% copy from regression-test/suites/mtmv_p0/test_create_mtmv.groovy copy to regression-test/suites/mtmv_p0/test_alter_mtmv.groovy index 476ab32a52..631e4997f8 100644 --- a/regression-test/suites/mtmv_p0/test_create_mtmv.groovy +++ b/regression-test/suites/mtmv_p0/test_alter_mtmv.groovy @@ -15,10 +15,10 @@ // specific language governing permissions and limitations // under the License. -suite("test_create_mtmv") { - def tableName = "t_user" - def tableNamePv = "t_user_pv" - def mvName = "multi_mv" +suite("test_alter_mtmv") { + def tableName = "t_test_alter_mtmv_user" + def tableNamePv = "t_test_alter_mtmv_pv" + def mvName = "multi_mv_test_alter_mtmv" sql """ ADMIN SET FRONTEND CONFIG("enable_mtmv_scheduler_framework"="true"); """ @@ -68,6 +68,7 @@ suite("test_create_mtmv") { SELECT ${tableName}.username, ${tableNamePv}.pv FROM ${tableName}, ${tableNamePv} WHERE ${tableName}.id=${tableNamePv}.id; """ + // waiting the task to be finished. def show_task_meta = sql_meta "SHOW MTMV TASK ON ${mvName}" def index = show_task_meta.indexOf(['State', 'CHAR']) def query = "SHOW MTMV TASK ON ${mvName}" @@ -82,55 +83,6 @@ suite("test_create_mtmv") { Thread.sleep(1000); } while (state.equals('PENDING') || state.equals('RUNNING')) - assertEquals 'SUCCESS', state, show_task_result.last().toString() - order_qt_select "SELECT * FROM ${mvName}" - - sql """ - DROP MATERIALIZED VIEW ${mvName} - """ - - // test only one job created when build IMMEDIATE and start time is before now. - sql """ - CREATE MATERIALIZED VIEW ${mvName} - BUILD IMMEDIATE REFRESH COMPLETE - start with "2022-11-03 00:00:00" next 1 DAY - KEY(username) - DISTRIBUTED BY HASH (username) buckets 1 - PROPERTIES ('replication_num' = '1') - AS - SELECT ${tableName}.username, ${tableNamePv}.pv FROM ${tableName}, ${tableNamePv} WHERE ${tableName}.id=${tableNamePv}.id; - """ - // wait task to be finished to avoid task leak in suite. - state = "PENDING" - do { - show_task_result = sql "${query}" - if (!show_task_result.isEmpty()) { - state = show_task_result.last().get(index) - } - println "The state of ${query} is ${state}" - Thread.sleep(1000); - } while (state.equals('PENDING') || state.equals('RUNNING')) - - def show_job_result = sql "SHOW MTMV JOB ON ${mvName}" - assertEquals 1, show_job_result.size() - - // test REFRESH make sure only define one mv and already run a task. - sql """ - REFRESH MATERIALIZED VIEW ${mvName} COMPLETE - """ - state = "PENDING" - do { - show_task_result = sql "${query}" - if (!show_task_result.isEmpty()) { - state = show_task_result.last().get(index) - } - println "The state of ${query} is ${state}" - Thread.sleep(1000); - } while (state.equals('PENDING') || state.equals('RUNNING')) - - assertEquals 'SUCCESS', state, show_task_result.last().toString() - assertEquals 2, show_task_result.size() - // test alter mtmv sql """ alter MATERIALIZED VIEW ${mvName} REFRESH COMPLETE start with "2022-11-03 00:00:00" next 2 DAY @@ -139,7 +91,7 @@ suite("test_create_mtmv") { def scheduleIndex = show_job_meta.indexOf(['Schedule', 'CHAR']) show_job_result = sql "SHOW MTMV JOB ON ${mvName}" - assertEquals 1, show_job_result.size() + assertEquals 1, show_job_result.size(), show_job_result.toString() assertEquals 'START 2022-11-03T00:00 EVERY(2 DAYS)', show_job_result.last().get(scheduleIndex).toString(), show_job_result.last().toString() @@ -147,4 +99,3 @@ suite("test_create_mtmv") { DROP MATERIALIZED VIEW ${mvName} """ } - diff --git a/regression-test/suites/mtmv_p0/test_create_mtmv.groovy b/regression-test/suites/mtmv_p0/test_create_both_mtmv.groovy similarity index 59% copy from regression-test/suites/mtmv_p0/test_create_mtmv.groovy copy to regression-test/suites/mtmv_p0/test_create_both_mtmv.groovy index 476ab32a52..bf692de248 100644 --- a/regression-test/suites/mtmv_p0/test_create_mtmv.groovy +++ b/regression-test/suites/mtmv_p0/test_create_both_mtmv.groovy @@ -15,10 +15,10 @@ // specific language governing permissions and limitations // under the License. -suite("test_create_mtmv") { - def tableName = "t_user" - def tableNamePv = "t_user_pv" - def mvName = "multi_mv" +suite("test_create_both_mtmv") { + def tableName = "t_test_create_both_mtmv_user" + def tableNamePv = "t_test_create_both_mtmv_user_pv" + def mvName = "multi_mv_test_create_both_mtmv" sql """ ADMIN SET FRONTEND CONFIG("enable_mtmv_scheduler_framework"="true"); """ @@ -58,16 +58,18 @@ suite("test_create_mtmv") { sql """drop materialized view if exists ${mvName}""" + // test only one job created when build IMMEDIATE and start time is before now. sql """ CREATE MATERIALIZED VIEW ${mvName} BUILD IMMEDIATE REFRESH COMPLETE + start with "2022-11-03 00:00:00" next 1 DAY KEY(username) DISTRIBUTED BY HASH (username) buckets 1 PROPERTIES ('replication_num' = '1') AS SELECT ${tableName}.username, ${tableNamePv}.pv FROM ${tableName}, ${tableNamePv} WHERE ${tableName}.id=${tableNamePv}.id; """ - + // wait task to be finished to avoid task leak in suite. def show_task_meta = sql_meta "SHOW MTMV TASK ON ${mvName}" def index = show_task_meta.indexOf(['State', 'CHAR']) def query = "SHOW MTMV TASK ON ${mvName}" @@ -82,69 +84,10 @@ suite("test_create_mtmv") { Thread.sleep(1000); } while (state.equals('PENDING') || state.equals('RUNNING')) - assertEquals 'SUCCESS', state, show_task_result.last().toString() - order_qt_select "SELECT * FROM ${mvName}" - - sql """ - DROP MATERIALIZED VIEW ${mvName} - """ - - // test only one job created when build IMMEDIATE and start time is before now. - sql """ - CREATE MATERIALIZED VIEW ${mvName} - BUILD IMMEDIATE REFRESH COMPLETE - start with "2022-11-03 00:00:00" next 1 DAY - KEY(username) - DISTRIBUTED BY HASH (username) buckets 1 - PROPERTIES ('replication_num' = '1') - AS - SELECT ${tableName}.username, ${tableNamePv}.pv FROM ${tableName}, ${tableNamePv} WHERE ${tableName}.id=${tableNamePv}.id; - """ - // wait task to be finished to avoid task leak in suite. - state = "PENDING" - do { - show_task_result = sql "${query}" - if (!show_task_result.isEmpty()) { - state = show_task_result.last().get(index) - } - println "The state of ${query} is ${state}" - Thread.sleep(1000); - } while (state.equals('PENDING') || state.equals('RUNNING')) - def show_job_result = sql "SHOW MTMV JOB ON ${mvName}" - assertEquals 1, show_job_result.size() - - // test REFRESH make sure only define one mv and already run a task. - sql """ - REFRESH MATERIALIZED VIEW ${mvName} COMPLETE - """ - state = "PENDING" - do { - show_task_result = sql "${query}" - if (!show_task_result.isEmpty()) { - state = show_task_result.last().get(index) - } - println "The state of ${query} is ${state}" - Thread.sleep(1000); - } while (state.equals('PENDING') || state.equals('RUNNING')) - - assertEquals 'SUCCESS', state, show_task_result.last().toString() - assertEquals 2, show_task_result.size() - - // test alter mtmv - sql """ - alter MATERIALIZED VIEW ${mvName} REFRESH COMPLETE start with "2022-11-03 00:00:00" next 2 DAY - """ - show_job_meta = sql_meta "SHOW MTMV JOB ON ${mvName}" - def scheduleIndex = show_job_meta.indexOf(['Schedule', 'CHAR']) - - show_job_result = sql "SHOW MTMV JOB ON ${mvName}" - assertEquals 1, show_job_result.size() - - assertEquals 'START 2022-11-03T00:00 EVERY(2 DAYS)', show_job_result.last().get(scheduleIndex).toString(), show_job_result.last().toString() + assertEquals 1, show_job_result.size(), show_job_result.toString() sql """ DROP MATERIALIZED VIEW ${mvName} """ } - diff --git a/regression-test/suites/mtmv_p0/test_create_mtmv.groovy b/regression-test/suites/mtmv_p0/test_create_mtmv.groovy index 476ab32a52..fa22595a1d 100644 --- a/regression-test/suites/mtmv_p0/test_create_mtmv.groovy +++ b/regression-test/suites/mtmv_p0/test_create_mtmv.groovy @@ -16,9 +16,9 @@ // under the License. suite("test_create_mtmv") { - def tableName = "t_user" - def tableNamePv = "t_user_pv" - def mvName = "multi_mv" + def tableName = "t_test_create_mtmv_user" + def tableNamePv = "t_test_create_mtmv_user_pv" + def mvName = "multi_mv_test_create_mtmv" sql """ ADMIN SET FRONTEND CONFIG("enable_mtmv_scheduler_framework"="true"); """ @@ -88,63 +88,4 @@ suite("test_create_mtmv") { sql """ DROP MATERIALIZED VIEW ${mvName} """ - - // test only one job created when build IMMEDIATE and start time is before now. - sql """ - CREATE MATERIALIZED VIEW ${mvName} - BUILD IMMEDIATE REFRESH COMPLETE - start with "2022-11-03 00:00:00" next 1 DAY - KEY(username) - DISTRIBUTED BY HASH (username) buckets 1 - PROPERTIES ('replication_num' = '1') - AS - SELECT ${tableName}.username, ${tableNamePv}.pv FROM ${tableName}, ${tableNamePv} WHERE ${tableName}.id=${tableNamePv}.id; - """ - // wait task to be finished to avoid task leak in suite. - state = "PENDING" - do { - show_task_result = sql "${query}" - if (!show_task_result.isEmpty()) { - state = show_task_result.last().get(index) - } - println "The state of ${query} is ${state}" - Thread.sleep(1000); - } while (state.equals('PENDING') || state.equals('RUNNING')) - - def show_job_result = sql "SHOW MTMV JOB ON ${mvName}" - assertEquals 1, show_job_result.size() - - // test REFRESH make sure only define one mv and already run a task. - sql """ - REFRESH MATERIALIZED VIEW ${mvName} COMPLETE - """ - state = "PENDING" - do { - show_task_result = sql "${query}" - if (!show_task_result.isEmpty()) { - state = show_task_result.last().get(index) - } - println "The state of ${query} is ${state}" - Thread.sleep(1000); - } while (state.equals('PENDING') || state.equals('RUNNING')) - - assertEquals 'SUCCESS', state, show_task_result.last().toString() - assertEquals 2, show_task_result.size() - - // test alter mtmv - sql """ - alter MATERIALIZED VIEW ${mvName} REFRESH COMPLETE start with "2022-11-03 00:00:00" next 2 DAY - """ - show_job_meta = sql_meta "SHOW MTMV JOB ON ${mvName}" - def scheduleIndex = show_job_meta.indexOf(['Schedule', 'CHAR']) - - show_job_result = sql "SHOW MTMV JOB ON ${mvName}" - assertEquals 1, show_job_result.size() - - assertEquals 'START 2022-11-03T00:00 EVERY(2 DAYS)', show_job_result.last().get(scheduleIndex).toString(), show_job_result.last().toString() - - sql """ - DROP MATERIALIZED VIEW ${mvName} - """ } - diff --git a/regression-test/suites/mtmv_p0/test_create_mtmv.groovy b/regression-test/suites/mtmv_p0/test_refresh_mtmv.groovy similarity index 66% copy from regression-test/suites/mtmv_p0/test_create_mtmv.groovy copy to regression-test/suites/mtmv_p0/test_refresh_mtmv.groovy index 476ab32a52..8a7cfbac97 100644 --- a/regression-test/suites/mtmv_p0/test_create_mtmv.groovy +++ b/regression-test/suites/mtmv_p0/test_refresh_mtmv.groovy @@ -15,10 +15,10 @@ // specific language governing permissions and limitations // under the License. -suite("test_create_mtmv") { - def tableName = "t_user" - def tableNamePv = "t_user_pv" - def mvName = "multi_mv" +suite("test_refresh_mtmv") { + def tableName = "t_test_refresh_mtmv_user" + def tableNamePv = "t_test_refresh_mtmv_user_pv" + def mvName = "multi_mv_test_refresh_mtmv" sql """ ADMIN SET FRONTEND CONFIG("enable_mtmv_scheduler_framework"="true"); """ @@ -68,6 +68,7 @@ suite("test_create_mtmv") { SELECT ${tableName}.username, ${tableNamePv}.pv FROM ${tableName}, ${tableNamePv} WHERE ${tableName}.id=${tableNamePv}.id; """ + // waiting the task to be finished. def show_task_meta = sql_meta "SHOW MTMV TASK ON ${mvName}" def index = show_task_meta.indexOf(['State', 'CHAR']) def query = "SHOW MTMV TASK ON ${mvName}" @@ -85,35 +86,6 @@ suite("test_create_mtmv") { assertEquals 'SUCCESS', state, show_task_result.last().toString() order_qt_select "SELECT * FROM ${mvName}" - sql """ - DROP MATERIALIZED VIEW ${mvName} - """ - - // test only one job created when build IMMEDIATE and start time is before now. - sql """ - CREATE MATERIALIZED VIEW ${mvName} - BUILD IMMEDIATE REFRESH COMPLETE - start with "2022-11-03 00:00:00" next 1 DAY - KEY(username) - DISTRIBUTED BY HASH (username) buckets 1 - PROPERTIES ('replication_num' = '1') - AS - SELECT ${tableName}.username, ${tableNamePv}.pv FROM ${tableName}, ${tableNamePv} WHERE ${tableName}.id=${tableNamePv}.id; - """ - // wait task to be finished to avoid task leak in suite. - state = "PENDING" - do { - show_task_result = sql "${query}" - if (!show_task_result.isEmpty()) { - state = show_task_result.last().get(index) - } - println "The state of ${query} is ${state}" - Thread.sleep(1000); - } while (state.equals('PENDING') || state.equals('RUNNING')) - - def show_job_result = sql "SHOW MTMV JOB ON ${mvName}" - assertEquals 1, show_job_result.size() - // test REFRESH make sure only define one mv and already run a task. sql """ REFRESH MATERIALIZED VIEW ${mvName} COMPLETE @@ -129,22 +101,9 @@ suite("test_create_mtmv") { } while (state.equals('PENDING') || state.equals('RUNNING')) assertEquals 'SUCCESS', state, show_task_result.last().toString() - assertEquals 2, show_task_result.size() - - // test alter mtmv - sql """ - alter MATERIALIZED VIEW ${mvName} REFRESH COMPLETE start with "2022-11-03 00:00:00" next 2 DAY - """ - show_job_meta = sql_meta "SHOW MTMV JOB ON ${mvName}" - def scheduleIndex = show_job_meta.indexOf(['Schedule', 'CHAR']) - - show_job_result = sql "SHOW MTMV JOB ON ${mvName}" - assertEquals 1, show_job_result.size() - - assertEquals 'START 2022-11-03T00:00 EVERY(2 DAYS)', show_job_result.last().get(scheduleIndex).toString(), show_job_result.last().toString() + assertEquals 2, show_task_result.size(), show_task_result.toString() sql """ DROP MATERIALIZED VIEW ${mvName} """ } - --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org