SaintBacchus commented on code in PR #20441: URL: https://github.com/apache/doris/pull/20441#discussion_r1220944326
########## fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVTaskManager.java: ########## @@ -328,86 +318,10 @@ public MTMVTask getTask(String taskId) throws AnalysisException { } public void replayCreateJobTask(MTMVTask task) { - if (task.getState() == TaskState.SUCCESS || task.getState() == TaskState.FAILURE) { - if (MTMVUtils.getNowTimeStamp() > task.getExpireTime()) { - return; - } - } - - switch (task.getState()) { - case PENDING: - String jobName = task.getJobName(); - MTMVJob job = mtmvJobManager.getJob(jobName); - if (job == null) { - LOG.warn("fail to obtain task name {} because task is null", jobName); - return; - } - MTMVTaskExecutor taskExecutor = MTMVUtils.buildTask(job); - taskExecutor.setTask(task); - arrangeToPendingTask(taskExecutor); - break; - case RUNNING: - task.setState(TaskState.FAILURE); - addHistory(task); - break; - case FAILURE: - case SUCCESS: - addHistory(task); - break; - default: - break; - } - } - - public void replayUpdateTask(ChangeMTMVTask changeTask) { - TaskState fromStatus = changeTask.getFromStatus(); - TaskState toStatus = changeTask.getToStatus(); - Long jobId = changeTask.getJobId(); - if (fromStatus == TaskState.PENDING) { - Queue<MTMVTaskExecutor> taskQueue = getPendingTaskMap().get(jobId); - if (taskQueue == null) { - return; - } - if (taskQueue.size() == 0) { - getPendingTaskMap().remove(jobId); - return; - } - - MTMVTaskExecutor pendingTask = taskQueue.poll(); - MTMVTask status = pendingTask.getTask(); - - if (toStatus == TaskState.RUNNING) { - if (status.getTaskId().equals(changeTask.getTaskId())) { - status.setState(TaskState.RUNNING); - getRunningTaskMap().put(jobId, pendingTask); - } - } else if (toStatus == TaskState.FAILURE) { - status.setMessage(changeTask.getErrorMessage()); - status.setErrorCode(changeTask.getErrorCode()); - status.setState(TaskState.FAILURE); - addHistory(status); - } - if (taskQueue.size() == 0) { - getPendingTaskMap().remove(jobId); - } - } else if (fromStatus == TaskState.RUNNING && 
(toStatus == TaskState.SUCCESS - || toStatus == TaskState.FAILURE)) { - MTMVTaskExecutor runningTask = getRunningTaskMap().remove(jobId); - if (runningTask == null) { - return; - } - MTMVTask status = runningTask.getTask(); - if (status.getTaskId().equals(changeTask.getTaskId())) { - status.setMessage(changeTask.getErrorMessage()); - status.setErrorCode(changeTask.getErrorCode()); - status.setState(toStatus); - status.setFinishTime(changeTask.getFinishTime()); - addHistory(status); - } - } else { - LOG.warn("Illegal Task taskId:{} status transform from {} to {}", changeTask.getTaskId(), fromStatus, - toStatus); + if (MTMVUtils.getNowTimeStamp() > task.getExpireTime()) { Review Comment: Clearing expired tasks during replay will cause inconsistency across different FEs. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org