SaintBacchus commented on code in PR #20441:
URL: https://github.com/apache/doris/pull/20441#discussion_r1220944326


##########
fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVTaskManager.java:
##########
@@ -328,86 +318,10 @@ public MTMVTask getTask(String taskId) throws 
AnalysisException {
     }
 
     public void replayCreateJobTask(MTMVTask task) {
-        if (task.getState() == TaskState.SUCCESS || task.getState() == 
TaskState.FAILURE) {
-            if (MTMVUtils.getNowTimeStamp() > task.getExpireTime()) {
-                return;
-            }
-        }
-
-        switch (task.getState()) {
-            case PENDING:
-                String jobName = task.getJobName();
-                MTMVJob job = mtmvJobManager.getJob(jobName);
-                if (job == null) {
-                    LOG.warn("fail to obtain task name {} because task is 
null", jobName);
-                    return;
-                }
-                MTMVTaskExecutor taskExecutor = MTMVUtils.buildTask(job);
-                taskExecutor.setTask(task);
-                arrangeToPendingTask(taskExecutor);
-                break;
-            case RUNNING:
-                task.setState(TaskState.FAILURE);
-                addHistory(task);
-                break;
-            case FAILURE:
-            case SUCCESS:
-                addHistory(task);
-                break;
-            default:
-                break;
-        }
-    }
-
-    public void replayUpdateTask(ChangeMTMVTask changeTask) {
-        TaskState fromStatus = changeTask.getFromStatus();
-        TaskState toStatus = changeTask.getToStatus();
-        Long jobId = changeTask.getJobId();
-        if (fromStatus == TaskState.PENDING) {
-            Queue<MTMVTaskExecutor> taskQueue = getPendingTaskMap().get(jobId);
-            if (taskQueue == null) {
-                return;
-            }
-            if (taskQueue.size() == 0) {
-                getPendingTaskMap().remove(jobId);
-                return;
-            }
-
-            MTMVTaskExecutor pendingTask = taskQueue.poll();
-            MTMVTask status = pendingTask.getTask();
-
-            if (toStatus == TaskState.RUNNING) {
-                if (status.getTaskId().equals(changeTask.getTaskId())) {
-                    status.setState(TaskState.RUNNING);
-                    getRunningTaskMap().put(jobId, pendingTask);
-                }
-            } else if (toStatus == TaskState.FAILURE) {
-                status.setMessage(changeTask.getErrorMessage());
-                status.setErrorCode(changeTask.getErrorCode());
-                status.setState(TaskState.FAILURE);
-                addHistory(status);
-            }
-            if (taskQueue.size() == 0) {
-                getPendingTaskMap().remove(jobId);
-            }
-        } else if (fromStatus == TaskState.RUNNING && (toStatus == 
TaskState.SUCCESS
-                || toStatus == TaskState.FAILURE)) {
-            MTMVTaskExecutor runningTask = getRunningTaskMap().remove(jobId);
-            if (runningTask == null) {
-                return;
-            }
-            MTMVTask status = runningTask.getTask();
-            if (status.getTaskId().equals(changeTask.getTaskId())) {
-                status.setMessage(changeTask.getErrorMessage());
-                status.setErrorCode(changeTask.getErrorCode());
-                status.setState(toStatus);
-                status.setFinishTime(changeTask.getFinishTime());
-                addHistory(status);
-            }
-        } else {
-            LOG.warn("Illegal  Task taskId:{} status transform from {} to {}", 
changeTask.getTaskId(), fromStatus,
-                    toStatus);
+        if (MTMVUtils.getNowTimeStamp() > task.getExpireTime()) {

Review Comment:
   Clearing expired tasks during replay will cause inconsistency between different FEs.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to