This is an automated email from the ASF dual-hosted git repository. kxiao pushed a commit to branch branch-2.0-beta in repository https://gitbox.apache.org/repos/asf/doris.git
commit f60d7d2429104e8cd23dd669b052da0794f41230 Author: zhangdong <493738...@qq.com> AuthorDate: Thu Jun 8 10:46:25 2023 +0800 [feature-wip](MTMV) Sync finish status only for tasks (#20441) MTMV tasks keep finish status only to reduce the loss caused by logging. After changes, unfinished tasks will be lost directly when FE master restarts. --- .../org/apache/doris/journal/JournalEntity.java | 3 +- .../java/org/apache/doris/mtmv/MTMVJobManager.java | 19 +-- .../apache/doris/mtmv/MTMVTaskExecutorPool.java | 6 - .../org/apache/doris/mtmv/MTMVTaskManager.java | 141 ++------------------- .../apache/doris/mtmv/metadata/ChangeMTMVTask.java | 131 ------------------- .../doris/mtmv/metadata/MTMVCheckpointData.java | 20 +-- .../java/org/apache/doris/persist/EditLog.java | 7 - .../org/apache/doris/persist/OperationType.java | 1 + 8 files changed, 16 insertions(+), 312 deletions(-) diff --git a/fe/fe-core/src/main/java/org/apache/doris/journal/JournalEntity.java b/fe/fe-core/src/main/java/org/apache/doris/journal/JournalEntity.java index 37a7f1a1a6..6b1dc9e06c 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/journal/JournalEntity.java +++ b/fe/fe-core/src/main/java/org/apache/doris/journal/JournalEntity.java @@ -55,7 +55,6 @@ import org.apache.doris.load.loadv2.LoadJobFinalOperation; import org.apache.doris.load.routineload.RoutineLoadJob; import org.apache.doris.load.sync.SyncJob; import org.apache.doris.mtmv.metadata.ChangeMTMVJob; -import org.apache.doris.mtmv.metadata.ChangeMTMVTask; import org.apache.doris.mtmv.metadata.DropMTMVJob; import org.apache.doris.mtmv.metadata.DropMTMVTask; import org.apache.doris.mtmv.metadata.MTMVJob; @@ -762,7 +761,7 @@ public class JournalEntity implements Writable { break; } case OperationType.OP_CHANGE_MTMV_TASK: { - data = ChangeMTMVTask.read(in); + Text.readString(in); isRead = true; break; } diff --git a/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVJobManager.java b/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVJobManager.java index ae3d9007a6..f1d976a7f0 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVJobManager.java +++ b/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVJobManager.java @@ -26,10 +26,8 @@ import org.apache.doris.metric.Metric; import org.apache.doris.metric.MetricLabel; import org.apache.doris.metric.MetricRepo; import org.apache.doris.mtmv.MTMVUtils.JobState; -import org.apache.doris.mtmv.MTMVUtils.TaskRetryPolicy; import org.apache.doris.mtmv.MTMVUtils.TriggerMode; import org.apache.doris.mtmv.metadata.ChangeMTMVJob; -import org.apache.doris.mtmv.metadata.ChangeMTMVTask; import org.apache.doris.mtmv.metadata.MTMVCheckpointData; import org.apache.doris.mtmv.metadata.MTMVJob; import org.apache.doris.mtmv.metadata.MTMVJob.JobSchedule; @@ -88,8 +86,6 @@ public class MTMVJobManager { public void start() { if (isStarted.compareAndSet(false, true)) { - taskManager.clearUnfinishedTasks(); - // check the scheduler before using it // since it may be shutdown when master change to follower without process shutdown. if (periodScheduler.isShutdown()) { @@ -219,11 +215,9 @@ public class MTMVJobManager { periodFutureMap.put(job.getId(), future); periodNum++; } else if (job.getTriggerMode() == TriggerMode.ONCE) { - if (job.getRetryPolicy() == TaskRetryPolicy.ALWAYS || job.getRetryPolicy() == TaskRetryPolicy.TIMES) { - MTMVTaskExecuteParams executeOption = new MTMVTaskExecuteParams(); - submitJobTask(job.getName(), executeOption); - onceNum++; - } + MTMVTaskExecuteParams executeOption = new MTMVTaskExecuteParams(); + submitJobTask(job.getName(), executeOption); + onceNum++; } } LOG.info("Register {} period jobs and {} once jobs in the total {} jobs.", periodNum, onceNum, num); @@ -477,10 +471,6 @@ public class MTMVJobManager { taskManager.replayCreateJobTask(task); } - public void replayUpdateTask(ChangeMTMVTask changeTask) { - taskManager.replayUpdateTask(changeTask); - } - public void replayDropJobTasks(List<String> taskIds) { taskManager.dropTasks(taskIds, true); } @@ -527,7 +517,7 @@ public class MTMVJobManager { public long write(DataOutputStream dos, long checksum) throws IOException { MTMVCheckpointData data = new MTMVCheckpointData(); data.jobs = new ArrayList<>(nameToJobMap.values()); - data.tasks = taskManager.showTasks(null); + data.tasks = Lists.newArrayList(taskManager.getHistoryTasks()); String s = GsonUtils.GSON.toJson(data); Text.writeString(dos, s); return checksum; @@ -553,7 +543,6 @@ public class MTMVJobManager { return mtmvJobManager; } - // for test only public MTMVTaskManager getTaskManager() { return taskManager; } diff --git a/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVTaskExecutorPool.java b/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVTaskExecutorPool.java index f68f68cbaf..0c63c287e1 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVTaskExecutorPool.java +++ b/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVTaskExecutorPool.java @@ -17,9 +17,7 @@ package org.apache.doris.mtmv; -import org.apache.doris.catalog.Env; import org.apache.doris.mtmv.MTMVUtils.TaskState; -import org.apache.doris.mtmv.metadata.ChangeMTMVTask; import org.apache.doris.mtmv.metadata.MTMVTask; import org.apache.logging.log4j.LogManager; @@ -78,10 +76,6 @@ public class MTMVTaskExecutorPool { task.setErrorCode(-1); } task.setFinishTime(MTMVUtils.getNowTimeStamp()); - - ChangeMTMVTask changeTask = new ChangeMTMVTask(taskExecutor.getJob().getId(), task, TaskState.RUNNING, - task.getState()); - Env.getCurrentEnv().getEditLog().logChangeMTMVTask(changeTask); }); taskExecutor.setFuture(future); } diff --git a/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVTaskManager.java b/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVTaskManager.java index cbac71b9bb..3396b30bbb 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVTaskManager.java +++ b/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVTaskManager.java @@ -24,8 +24,6 @@ import org.apache.doris.mtmv.MTMVUtils.JobState; import org.apache.doris.mtmv.MTMVUtils.TaskState; import org.apache.doris.mtmv.MTMVUtils.TriggerMode; import org.apache.doris.mtmv.metadata.ChangeMTMVJob; -import org.apache.doris.mtmv.metadata.ChangeMTMVTask; -import org.apache.doris.mtmv.metadata.MTMVJob; import org.apache.doris.mtmv.metadata.MTMVTask; import org.apache.doris.qe.ConnectContext; @@ -127,7 +125,6 @@ public class MTMVTaskManager { MTMVTask task = taskExecutor.initTask(taskId, MTMVUtils.getNowTimeStamp()); task.setPriority(params.getPriority()); LOG.info("Submit a mtmv task with id: {} of the job {}.", taskId, taskExecutor.getJob().getName()); - Env.getCurrentEnv().getEditLog().logCreateMTMVTask(task); arrangeToPendingTask(taskExecutor); return MTMVUtils.TaskSubmitStatus.SUBMITTED; } @@ -201,7 +198,7 @@ public class MTMVTaskManager { if (finalState == TaskState.FAILURE) { failedTaskCount.incrementAndGet(); } - changeAndLogTaskStatus(taskExecutor.getJobId(), taskExecutor.getTask(), TaskState.RUNNING, finalState); + Env.getCurrentEnv().getEditLog().logCreateMTMVTask(taskExecutor.getTask()); TriggerMode triggerMode = taskExecutor.getJob().getTriggerMode(); if (triggerMode == TriggerMode.ONCE) { @@ -239,19 +236,12 @@ public class MTMVTaskManager { MTMVTaskExecutor pendingTaskExecutor = taskQueue.poll(); taskExecutorPool.executeTask(pendingTaskExecutor); runningTaskMap.put(jobId, pendingTaskExecutor); - // change status from PENDING to Running - changeAndLogTaskStatus(jobId, pendingTaskExecutor.getTask(), TaskState.PENDING, TaskState.RUNNING); currentRunning++; } } } } - private void changeAndLogTaskStatus(long jobId, MTMVTask task, TaskState fromStatus, TaskState toStatus) { - ChangeMTMVTask changeTask = new ChangeMTMVTask(jobId, task, fromStatus, toStatus); - Env.getCurrentEnv().getEditLog().logChangeMTMVTask(changeTask); - } - public boolean tryLock() { try { return reentrantLock.tryLock(5, TimeUnit.SECONDS); @@ -328,86 +318,7 @@ public class MTMVTaskManager { } public void replayCreateJobTask(MTMVTask task) { - if (task.getState() == TaskState.SUCCESS || task.getState() == TaskState.FAILURE) { - if (MTMVUtils.getNowTimeStamp() > task.getExpireTime()) { - return; - } - } - - switch (task.getState()) { - case PENDING: - String jobName = task.getJobName(); - MTMVJob job = mtmvJobManager.getJob(jobName); - if (job == null) { - LOG.warn("fail to obtain task name {} because task is null", jobName); - return; - } - MTMVTaskExecutor taskExecutor = MTMVUtils.buildTask(job); - taskExecutor.setTask(task); - arrangeToPendingTask(taskExecutor); - break; - case RUNNING: - task.setState(TaskState.FAILURE); - addHistory(task); - break; - case FAILURE: - case SUCCESS: - addHistory(task); - break; - default: - break; - } - } - - public void replayUpdateTask(ChangeMTMVTask changeTask) { - TaskState fromStatus = changeTask.getFromStatus(); - TaskState toStatus = changeTask.getToStatus(); - Long jobId = changeTask.getJobId(); - if (fromStatus == TaskState.PENDING) { - Queue<MTMVTaskExecutor> taskQueue = getPendingTaskMap().get(jobId); - if (taskQueue == null) { - return; - } - if (taskQueue.size() == 0) { - getPendingTaskMap().remove(jobId); - return; - } - - MTMVTaskExecutor pendingTask = taskQueue.poll(); - MTMVTask status = pendingTask.getTask(); - - if (toStatus == TaskState.RUNNING) { - if (status.getTaskId().equals(changeTask.getTaskId())) { - status.setState(TaskState.RUNNING); - getRunningTaskMap().put(jobId, pendingTask); - } - } else if (toStatus == TaskState.FAILURE) { - status.setMessage(changeTask.getErrorMessage()); - status.setErrorCode(changeTask.getErrorCode()); - status.setState(TaskState.FAILURE); - addHistory(status); - } - if (taskQueue.size() == 0) { - getPendingTaskMap().remove(jobId); - } - } else if (fromStatus == TaskState.RUNNING && (toStatus == TaskState.SUCCESS - || toStatus == TaskState.FAILURE)) { - MTMVTaskExecutor runningTask = getRunningTaskMap().remove(jobId); - if (runningTask == null) { - return; - } - MTMVTask status = runningTask.getTask(); - if (status.getTaskId().equals(changeTask.getTaskId())) { - status.setMessage(changeTask.getErrorMessage()); - status.setErrorCode(changeTask.getErrorCode()); - status.setState(toStatus); - status.setFinishTime(changeTask.getFinishTime()); - addHistory(status); - } - } else { - LOG.warn("Illegal Task taskId:{} status transform from {} to {}", changeTask.getTaskId(), fromStatus, - toStatus); - } + addHistory(task); } public void clearTasksByJobName(String jobName, boolean isReplay) { @@ -462,11 +373,13 @@ public class MTMVTaskManager { Set<String> taskSet = new HashSet<>(taskIds); // Pending tasks will be clear directly. So we don't drop it again here. // Check the running task since the task was killed but was not move to the history queue. - for (long key : runningTaskMap.keySet()) { - MTMVTaskExecutor executor = runningTaskMap.get(key); - // runningTaskMap may be removed in the runningIterator - if (executor != null && taskSet.contains(executor.getTask().getTaskId())) { - runningTaskMap.remove(key); + if (!isReplay) { + for (long key : runningTaskMap.keySet()) { + MTMVTaskExecutor executor = runningTaskMap.get(key); + // runningTaskMap may be removed in the runningIterator + if (executor != null && taskSet.contains(executor.getTask().getTaskId())) { + runningTaskMap.remove(key); + } } } // Try to remove history tasks. @@ -479,40 +392,4 @@ public class MTMVTaskManager { } LOG.info("drop task history:{}", taskIds); } - - public void clearUnfinishedTasks() { - if (!tryLock()) { - return; - } - try { - Iterator<Long> pendingIter = getPendingTaskMap().keySet().iterator(); - while (pendingIter.hasNext()) { - Queue<MTMVTaskExecutor> tasks = getPendingTaskMap().get(pendingIter.next()); - while (!tasks.isEmpty()) { - MTMVTaskExecutor taskExecutor = tasks.poll(); - taskExecutor.getTask().setMessage("Fe abort the task"); - taskExecutor.getTask().setErrorCode(-1); - taskExecutor.getTask().setState(TaskState.FAILURE); - addHistory(taskExecutor.getTask()); - changeAndLogTaskStatus(taskExecutor.getJobId(), taskExecutor.getTask(), TaskState.PENDING, - TaskState.FAILURE); - } - pendingIter.remove(); - } - Iterator<Long> runningIter = getRunningTaskMap().keySet().iterator(); - while (runningIter.hasNext()) { - MTMVTaskExecutor taskExecutor = getRunningTaskMap().get(runningIter.next()); - taskExecutor.getTask().setMessage("Fe abort the task"); - taskExecutor.getTask().setErrorCode(-1); - taskExecutor.getTask().setState(TaskState.FAILURE); - taskExecutor.getTask().setFinishTime(MTMVUtils.getNowTimeStamp()); - runningIter.remove(); - addHistory(taskExecutor.getTask()); - changeAndLogTaskStatus(taskExecutor.getJobId(), taskExecutor.getTask(), TaskState.RUNNING, - TaskState.FAILURE); - } - } finally { - unlock(); - } - } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/mtmv/metadata/ChangeMTMVTask.java b/fe/fe-core/src/main/java/org/apache/doris/mtmv/metadata/ChangeMTMVTask.java deleted file mode 100644 index 416cc123d9..0000000000 --- a/fe/fe-core/src/main/java/org/apache/doris/mtmv/metadata/ChangeMTMVTask.java +++ /dev/null @@ -1,131 +0,0 @@ -// Licensed to the Apache Software Foundation (ASF) under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. - -package org.apache.doris.mtmv.metadata; - -import org.apache.doris.common.io.Text; -import org.apache.doris.common.io.Writable; -import org.apache.doris.mtmv.MTMVUtils.TaskState; -import org.apache.doris.persist.gson.GsonUtils; - -import com.google.gson.annotations.SerializedName; - -import java.io.DataInput; -import java.io.DataOutput; -import java.io.IOException; - -public class ChangeMTMVTask implements Writable { - - @SerializedName("jobId") - private long jobId; - - @SerializedName("taskId") - private String taskId; - - @SerializedName("finishTime") - private long finishTime; - - @SerializedName("fromStatus") - TaskState fromStatus; - - @SerializedName("toStatus") - TaskState toStatus; - - @SerializedName("errorCode") - private int errorCode = -1; - - @SerializedName("errorMessage") - private String errorMessage = ""; - - - public ChangeMTMVTask(long jobId, MTMVTask task, TaskState fromStatus, TaskState toStatus) { - this.jobId = jobId; - this.taskId = task.getTaskId(); - this.fromStatus = fromStatus; - this.toStatus = toStatus; - this.finishTime = task.getFinishTime(); - errorCode = task.getErrorCode(); - errorMessage = task.getMessage(); - } - - public long getJobId() { - return jobId; - } - - public void setJobId(long jobId) { - this.jobId = jobId; - } - - public String getTaskId() { - return taskId; - } - - public void setTaskId(String taskId) { - this.taskId = taskId; - } - - public TaskState getFromStatus() { - return fromStatus; - } - - public void setFromStatus(TaskState fromStatus) { - this.fromStatus = fromStatus; - } - - public TaskState getToStatus() { - return toStatus; - } - - public void setToStatus(TaskState toStatus) { - this.toStatus = toStatus; - } - - public int getErrorCode() { - return errorCode; - } - - public void setErrorCode(int errorCode) { - this.errorCode = errorCode; - } - - public String getErrorMessage() { - return errorMessage; - } - - public void setErrorMessage(String errorMessage) { - this.errorMessage = errorMessage; - } - - public long getFinishTime() { - return finishTime; - } - - public void setFinishTime(long finishTime) { - this.finishTime = finishTime; - } - - public static ChangeMTMVTask read(DataInput in) throws IOException { - String json = Text.readString(in); - return GsonUtils.GSON.fromJson(json, ChangeMTMVTask.class); - } - - @Override - public void write(DataOutput out) throws IOException { - String json = GsonUtils.GSON.toJson(this); - Text.writeString(out, json); - } -} diff --git a/fe/fe-core/src/main/java/org/apache/doris/mtmv/metadata/MTMVCheckpointData.java b/fe/fe-core/src/main/java/org/apache/doris/mtmv/metadata/MTMVCheckpointData.java index 163faed034..933ce31cf2 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/mtmv/metadata/MTMVCheckpointData.java +++ b/fe/fe-core/src/main/java/org/apache/doris/mtmv/metadata/MTMVCheckpointData.java @@ -17,32 +17,14 @@ package org.apache.doris.mtmv.metadata; -import org.apache.doris.common.io.Text; -import org.apache.doris.common.io.Writable; -import org.apache.doris.persist.gson.GsonUtils; - import com.google.gson.annotations.SerializedName; -import java.io.DataInput; -import java.io.DataOutput; -import java.io.IOException; import java.util.List; -public class MTMVCheckpointData implements Writable { +public class MTMVCheckpointData { @SerializedName("jobs") public List<MTMVJob> jobs; @SerializedName("tasks") public List<MTMVTask> tasks; - - @Override - public void write(DataOutput out) throws IOException { - String json = GsonUtils.GSON.toJson(this); - Text.writeString(out, json); - } - - public static MTMVCheckpointData read(DataInput in) throws IOException { - String json = Text.readString(in); - return GsonUtils.GSON.fromJson(json, MTMVCheckpointData.class); - } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/persist/EditLog.java b/fe/fe-core/src/main/java/org/apache/doris/persist/EditLog.java index c64882632f..4f3d137442 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/persist/EditLog.java +++ b/fe/fe-core/src/main/java/org/apache/doris/persist/EditLog.java @@ -69,7 +69,6 @@ import org.apache.doris.load.sync.SyncJob; import org.apache.doris.meta.MetaContext; import org.apache.doris.metric.MetricRepo; import org.apache.doris.mtmv.metadata.ChangeMTMVJob; -import org.apache.doris.mtmv.metadata.ChangeMTMVTask; import org.apache.doris.mtmv.metadata.DropMTMVJob; import org.apache.doris.mtmv.metadata.DropMTMVTask; import org.apache.doris.mtmv.metadata.MTMVJob; @@ -917,8 +916,6 @@ public class EditLog { break; } case OperationType.OP_CHANGE_MTMV_TASK: { - final ChangeMTMVTask changeTask = (ChangeMTMVTask) journal.getData(); - env.getMTMVJobManager().replayUpdateTask(changeTask); break; } case OperationType.OP_DROP_MTMV_TASK: { @@ -1694,10 +1691,6 @@ public class EditLog { logEdit(OperationType.OP_CREATE_MTMV_TASK, task); } - public void logChangeMTMVTask(ChangeMTMVTask changeTaskRecord) { - logEdit(OperationType.OP_CHANGE_MTMV_TASK, changeTaskRecord); - } - public void logDropMTMVTasks(List<String> taskIds) { logEdit(OperationType.OP_DROP_MTMV_TASK, new DropMTMVTask(taskIds)); } diff --git a/fe/fe-core/src/main/java/org/apache/doris/persist/OperationType.java b/fe/fe-core/src/main/java/org/apache/doris/persist/OperationType.java index 32f7d958e8..e24055445d 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/persist/OperationType.java +++ b/fe/fe-core/src/main/java/org/apache/doris/persist/OperationType.java @@ -266,6 +266,7 @@ public class OperationType { public static final short OP_CREATE_MTMV_TASK = 340; public static final short OP_DROP_MTMV_TASK = 341; + @Deprecated public static final short OP_CHANGE_MTMV_TASK = 342; public static final short OP_ALTER_MTMV_STMT = 345; --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org