This is an automated email from the ASF dual-hosted git repository. zykkk pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push: new 232f120edc [Improve](Job)Support other types of Job query interfaces (#24172) 232f120edc is described below commit 232f120edc713955741898af94698dca247ae7de Author: Calvin Kirs <acm_mas...@163.com> AuthorDate: Tue Sep 12 13:55:56 2023 +0800 [Improve](Job)Support other types of Job query interfaces (#24172) - Support MTMV job - Task info add create time and sql - Optimize scheduling logic --- .../main/java/org/apache/doris/common/Config.java | 4 +- fe/fe-core/src/main/cup/sql_parser.cup | 6 +- .../org/apache/doris/analysis/ShowJobStmt.java | 44 +++++++++----- .../org/apache/doris/analysis/ShowJobTaskStmt.java | 29 ++++++--- .../main/java/org/apache/doris/catalog/Env.java | 1 + .../java/org/apache/doris/qe/ShowExecutor.java | 5 +- .../doris/scheduler/constants/JobCategory.java | 13 +++-- .../doris/scheduler/disruptor/TaskDisruptor.java | 20 +++---- .../doris/scheduler/disruptor/TaskEvent.java | 2 + .../doris/scheduler/disruptor/TaskHandler.java | 32 +++++++--- .../doris/scheduler/executor/JobExecutor.java | 3 +- .../doris/scheduler/executor/SqlJobExecutor.java | 14 +++-- .../scheduler/executor/TransientTaskExecutor.java | 2 +- .../ExecutorResult.java} | 29 +++++---- .../java/org/apache/doris/scheduler/job/Job.java | 13 ++++- .../org/apache/doris/scheduler/job/JobTask.java | 18 +++++- .../apache/doris/scheduler/job/TimerJobTask.java | 9 ++- .../doris/scheduler/manager/JobTaskManager.java | 26 +++++++++ .../doris/scheduler/manager/TimerJobManager.java | 68 ++++++++++++++++------ .../scheduler/disruptor/TaskDisruptorTest.java | 7 ++- .../scheduler/disruptor/TimerJobManagerTest.java | 6 +- 21 files changed, 241 insertions(+), 110 deletions(-) diff --git a/fe/fe-common/src/main/java/org/apache/doris/common/Config.java b/fe/fe-common/src/main/java/org/apache/doris/common/Config.java index 54a0e814a4..fd5258e0c0 100644 --- a/fe/fe-common/src/main/java/org/apache/doris/common/Config.java +++ b/fe/fe-common/src/main/java/org/apache/doris/common/Config.java @@ -1554,7 +1554,7 @@ public class Config extends ConfigBase { public static boolean enable_pipeline_load = false; @ConfField - public static int scheduler_job_task_max_saved_count = 10; + public static int scheduler_job_task_max_saved_count = 20; /** * The number of async tasks that can be queued. @See TaskDisruptor @@ -1568,7 +1568,7 @@ public class Config extends ConfigBase { * if we have a lot of async tasks, we need more threads to consume them. Sure, it's depends on the cpu cores. */ @ConfField - public static int async_task_consumer_thread_num = 10; + public static int async_task_consumer_thread_num = 5; // enable_workload_group should be immutable and temporarily set to mutable during the development test phase @ConfField(mutable = true, varType = VariableAnnotation.EXPERIMENTAL) diff --git a/fe/fe-core/src/main/cup/sql_parser.cup b/fe/fe-core/src/main/cup/sql_parser.cup index 94fcd2260f..99c65ffee2 100644 --- a/fe/fe-core/src/main/cup/sql_parser.cup +++ b/fe/fe-core/src/main/cup/sql_parser.cup @@ -2689,15 +2689,15 @@ resume_job_stmt ::= show_job_stmt ::= KW_SHOW KW_JOBS {: - RESULT = new ShowJobStmt(null,null); + RESULT = new ShowJobStmt(null,null,null); :} | KW_SHOW KW_JOB KW_FOR job_label:jobLabel {: - RESULT = new ShowJobStmt(jobLabel,null); + RESULT = new ShowJobStmt(null,jobLabel,null); :} | KW_SHOW KW_JOB KW_TASKS KW_FOR job_label:jobLabel {: - RESULT = new ShowJobTaskStmt(jobLabel); + RESULT = new ShowJobTaskStmt(null,jobLabel); :} ; pause_job_stmt ::= diff --git a/fe/fe-core/src/main/java/org/apache/doris/analysis/ShowJobStmt.java b/fe/fe-core/src/main/java/org/apache/doris/analysis/ShowJobStmt.java index 56fc916b2a..7ad3ce343c 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/analysis/ShowJobStmt.java +++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/ShowJobStmt.java @@ -26,18 +26,21 @@ import org.apache.doris.common.ErrorReport; import org.apache.doris.common.UserException; import org.apache.doris.qe.ConnectContext; import org.apache.doris.qe.ShowResultSetMetaData; +import org.apache.doris.scheduler.constants.JobCategory; import com.google.common.base.Strings; import com.google.common.collect.ImmutableList; +import lombok.Getter; +import org.apache.commons.lang3.StringUtils; import java.util.List; /** * SHOW JOB [FOR JobName] * eg: show event - * return all job in connection db + * return all job in connection db * eg: show event for test - * return job named test in connection db + * return job named test in connection db */ public class ShowJobStmt extends ShowStmt { @@ -57,29 +60,32 @@ public class ShowJobStmt extends ShowStmt { .add("Status") .add("LastExecuteFinishTime") .add("ErrorMsg") + .add("CreateTime") .add("Comment") .build(); + private static final String MTMV_NAME_TITLE = "mtmv_name"; + + private static final String NAME_TITLE = "name"; private final LabelName labelName; + + @Getter private String dbFullName; // optional + + @Getter + private JobCategory jobCategory; // optional + + private String jobCategoryName; // optional + + @Getter private String name; // optional + @Getter private String pattern; // optional - public ShowJobStmt(LabelName labelName, String pattern) { + public ShowJobStmt(String category, LabelName labelName, String pattern) { this.labelName = labelName; this.pattern = pattern; - } - - public String getDbFullName() { - return dbFullName; - } - - public String getName() { - return name; - } - - public String getPattern() { - return pattern; + this.jobCategoryName = category; } @Override @@ -87,6 +93,11 @@ public class ShowJobStmt extends ShowStmt { super.analyze(analyzer); checkAuth(); checkLabelName(analyzer); + if (StringUtils.isBlank(jobCategoryName)) { + this.jobCategory = JobCategory.SQL; + } else { + this.jobCategory = JobCategory.valueOf(jobCategoryName.toUpperCase()); + } } private void checkAuth() throws AnalysisException { @@ -118,6 +129,9 @@ public class ShowJobStmt extends ShowStmt { ShowResultSetMetaData.Builder builder = ShowResultSetMetaData.builder(); for (String title : TITLE_NAMES) { + if (this.jobCategory.equals(JobCategory.MTMV) && title.equals(NAME_TITLE)) { + builder.addColumn(new Column(MTMV_NAME_TITLE, ScalarType.createVarchar(30))); + } builder.addColumn(new Column(title, ScalarType.createVarchar(30))); } return builder.build(); diff --git a/fe/fe-core/src/main/java/org/apache/doris/analysis/ShowJobTaskStmt.java b/fe/fe-core/src/main/java/org/apache/doris/analysis/ShowJobTaskStmt.java index e7b04d9ece..74dca72fb7 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/analysis/ShowJobTaskStmt.java +++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/ShowJobTaskStmt.java @@ -25,9 +25,12 @@ import org.apache.doris.common.ErrorCode; import org.apache.doris.common.ErrorReport; import org.apache.doris.common.UserException; import org.apache.doris.qe.ShowResultSetMetaData; +import org.apache.doris.scheduler.constants.JobCategory; import com.google.common.base.Strings; import com.google.common.collect.ImmutableList; +import lombok.Getter; +import org.apache.commons.lang3.StringUtils; import java.util.List; @@ -40,27 +43,30 @@ public class ShowJobTaskStmt extends ShowStmt { new ImmutableList.Builder<String>() .add("JobId") .add("TaskId") + .add("CreateTime") .add("StartTime") .add("EndTime") .add("Status") + .add("ExecuteSql") .add("Result") .add("ErrorMsg") .build(); + @Getter private final LabelName labelName; + + @Getter + private JobCategory jobCategory; // optional + + private String jobCategoryName; // optional + @Getter private String dbFullName; // optional + @Getter private String name; // optional - public ShowJobTaskStmt(LabelName labelName) { + public ShowJobTaskStmt(String category, LabelName labelName) { this.labelName = labelName; - } - - public String getDbFullName() { - return dbFullName; - } - - public String getName() { - return name; + this.jobCategoryName = category; } @Override @@ -68,6 +74,11 @@ public class ShowJobTaskStmt extends ShowStmt { super.analyze(analyzer); CreateJobStmt.checkAuth(); checkLabelName(analyzer); + if (StringUtils.isBlank(jobCategoryName)) { + this.jobCategory = JobCategory.SQL; + } else { + this.jobCategory = JobCategory.valueOf(jobCategoryName.toUpperCase()); + } } private void checkLabelName(Analyzer analyzer) throws AnalysisException { diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java index 8ba9335e28..c016ada719 100755 --- a/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java +++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java @@ -1523,6 +1523,7 @@ public class Env { publishVersionDaemon.start(); // Start txn cleaner txnCleaner.start(); + timerJobManager.start(); // Alter getAlterInstance().start(); // Consistency checker diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/ShowExecutor.java b/fe/fe-core/src/main/java/org/apache/doris/qe/ShowExecutor.java index 03bdd24d0e..3b4049338b 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/qe/ShowExecutor.java +++ b/fe/fe-core/src/main/java/org/apache/doris/qe/ShowExecutor.java @@ -193,7 +193,6 @@ import org.apache.doris.mtmv.MTMVJobManager; import org.apache.doris.mtmv.metadata.MTMVJob; import org.apache.doris.mtmv.metadata.MTMVTask; import org.apache.doris.mysql.privilege.PrivPredicate; -import org.apache.doris.scheduler.constants.JobCategory; import org.apache.doris.scheduler.job.Job; import org.apache.doris.scheduler.job.JobTask; import org.apache.doris.statistics.AnalysisInfo; @@ -1441,7 +1440,7 @@ public class ShowExecutor { ShowJobTaskStmt showJobTaskStmt = (ShowJobTaskStmt) stmt; List<List<String>> rows = Lists.newArrayList(); List<Job> jobs = Env.getCurrentEnv().getJobRegister() - .getJobs(showJobTaskStmt.getDbFullName(), showJobTaskStmt.getName(), JobCategory.SQL, + .getJobs(showJobTaskStmt.getDbFullName(), showJobTaskStmt.getName(), showJobTaskStmt.getJobCategory(), null); if (CollectionUtils.isEmpty(jobs)) { resultSet = new ShowResultSet(showJobTaskStmt.getMetaData(), rows); @@ -1470,7 +1469,7 @@ public class ShowExecutor { CaseSensibility.JOB.getCaseSensibility()); } jobList = Env.getCurrentEnv().getJobRegister() - .getJobs(showJobStmt.getDbFullName(), showJobStmt.getName(), JobCategory.SQL, + .getJobs(showJobStmt.getDbFullName(), showJobStmt.getName(), showJobStmt.getJobCategory(), matcher); if (jobList.isEmpty()) { diff --git a/fe/fe-core/src/main/java/org/apache/doris/scheduler/constants/JobCategory.java b/fe/fe-core/src/main/java/org/apache/doris/scheduler/constants/JobCategory.java index eb2653b9da..72a967625e 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/scheduler/constants/JobCategory.java +++ b/fe/fe-core/src/main/java/org/apache/doris/scheduler/constants/JobCategory.java @@ -25,25 +25,26 @@ import lombok.Getter; public enum JobCategory { COMMON(1, "common"), SQL(2, "sql"), + MTMV(3, "mtmv"), ; @Getter private int code; @Getter - private String description; + private String name; - JobCategory(int code, String description) { + JobCategory(int code, String name) { this.code = code; - this.description = description; + this.name = name; } - public static JobCategory getJobCategoryByCode(int code) { + public static JobCategory getJobCategoryByName(String name) { for (JobCategory jobCategory : JobCategory.values()) { - if (jobCategory.getCode() == code) { + if (jobCategory.name.equalsIgnoreCase(name)) { return jobCategory; } } - return null; + throw new IllegalArgumentException("Unknown job category name: " + name); } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/scheduler/disruptor/TaskDisruptor.java b/fe/fe-core/src/main/java/org/apache/doris/scheduler/disruptor/TaskDisruptor.java index 3b59a5187e..dcf346e3eb 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/scheduler/disruptor/TaskDisruptor.java +++ b/fe/fe-core/src/main/java/org/apache/doris/scheduler/disruptor/TaskDisruptor.java @@ -23,8 +23,7 @@ import org.apache.doris.scheduler.manager.TimerJobManager; import org.apache.doris.scheduler.manager.TransientTaskManager; import com.lmax.disruptor.BlockingWaitStrategy; -import com.lmax.disruptor.EventTranslatorOneArg; -import com.lmax.disruptor.EventTranslatorTwoArg; +import com.lmax.disruptor.EventTranslatorThreeArg; import com.lmax.disruptor.TimeoutException; import com.lmax.disruptor.WorkHandler; import com.lmax.disruptor.dsl.Disruptor; @@ -65,12 +64,13 @@ public class TaskDisruptor implements Closeable { private boolean isClosed = false; /** - * The default {@link EventTranslatorOneArg} to use for {@link #tryPublish(Long)}. + * The default {@link EventTranslatorThreeArg} to use for {@link #tryPublish(Long, Long)}. * This is used to avoid creating a new object for each publish. */ - private static final EventTranslatorTwoArg<TaskEvent, Long, TaskType> TRANSLATOR - = (event, sequence, jobId, taskType) -> { + private static final EventTranslatorThreeArg<TaskEvent, Long, Long, TaskType> TRANSLATOR + = (event, sequence, jobId, taskId, taskType) -> { event.setId(jobId); + event.setTaskId(taskId); event.setTaskType(taskType); }; @@ -89,15 +89,15 @@ public class TaskDisruptor implements Closeable { /** * Publishes a job to the disruptor. * - * @param jobId job id + * @param jobId job id */ - public void tryPublish(Long jobId) { + public void tryPublish(Long jobId, Long taskId) { if (isClosed) { log.info("tryPublish failed, disruptor is closed, jobId: {}", jobId); return; } try { - disruptor.publishEvent(TRANSLATOR, jobId, TaskType.TimerJobTask); + disruptor.publishEvent(TRANSLATOR, jobId, taskId, TaskType.TimerJobTask); } catch (Exception e) { log.error("tryPublish failed, jobId: {}", jobId, e); } @@ -106,7 +106,7 @@ public class TaskDisruptor implements Closeable { /** * Publishes a task to the disruptor. * - * @param taskId task id + * @param taskId task id */ public void tryPublishTask(Long taskId) { if (isClosed) { @@ -114,7 +114,7 @@ public class TaskDisruptor implements Closeable { return; } try { - disruptor.publishEvent(TRANSLATOR, taskId, TaskType.TransientTask); + disruptor.publishEvent(TRANSLATOR, taskId, 0L, TaskType.TransientTask); } catch (Exception e) { log.error("tryPublish failed, taskId: {}", taskId, e); } diff --git a/fe/fe-core/src/main/java/org/apache/doris/scheduler/disruptor/TaskEvent.java b/fe/fe-core/src/main/java/org/apache/doris/scheduler/disruptor/TaskEvent.java index 6c4fcb1076..8b24b8c194 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/scheduler/disruptor/TaskEvent.java +++ b/fe/fe-core/src/main/java/org/apache/doris/scheduler/disruptor/TaskEvent.java @@ -37,6 +37,8 @@ public class TaskEvent { */ private Long id; + private Long taskId; + private TaskType taskType; public static final EventFactory<TaskEvent> FACTORY = TaskEvent::new; diff --git a/fe/fe-core/src/main/java/org/apache/doris/scheduler/disruptor/TaskHandler.java b/fe/fe-core/src/main/java/org/apache/doris/scheduler/disruptor/TaskHandler.java index 0b309ec3ce..3056502e86 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/scheduler/disruptor/TaskHandler.java +++ b/fe/fe-core/src/main/java/org/apache/doris/scheduler/disruptor/TaskHandler.java @@ -18,8 +18,10 @@ package org.apache.doris.scheduler.disruptor; import org.apache.doris.catalog.Env; +import org.apache.doris.persist.gson.GsonUtils; import org.apache.doris.scheduler.exception.JobException; import org.apache.doris.scheduler.executor.TransientTaskExecutor; +import org.apache.doris.scheduler.job.ExecutorResult; import org.apache.doris.scheduler.job.Job; import org.apache.doris.scheduler.job.JobTask; import org.apache.doris.scheduler.manager.JobTaskManager; @@ -29,8 +31,6 @@ import org.apache.doris.scheduler.manager.TransientTaskManager; import com.lmax.disruptor.WorkHandler; import lombok.extern.slf4j.Slf4j; -import java.util.Objects; - /** * This class represents a work handler for processing event tasks consumed by a Disruptor. * The work handler retrieves the associated event job and executes it if it is running. @@ -89,6 +89,8 @@ public class TaskHandler implements WorkHandler<TaskEvent> { @SuppressWarnings("checkstyle:UnusedLocalVariable") public void onTimerJobTaskHandle(TaskEvent taskEvent) { long jobId = taskEvent.getId(); + long taskId = taskEvent.getTaskId(); + long createTimeMs = jobTaskManager.pollPrepareTaskByTaskId(jobId, taskId); Job job = timerJobManager.getJob(jobId); if (job == null) { log.info("job is null, jobId: {}", jobId); @@ -99,10 +101,11 @@ public class TaskHandler implements WorkHandler<TaskEvent> { return; } log.debug("job is running, eventJobId: {}", jobId); - JobTask jobTask = new JobTask(jobId); + + JobTask jobTask = new JobTask(jobId, taskId, createTimeMs); try { jobTask.setStartTimeMs(System.currentTimeMillis()); - Object result = job.getExecutor().execute(job); + ExecutorResult result = job.getExecutor().execute(job); job.setLatestCompleteExecuteTimeMs(System.currentTimeMillis()); if (job.isCycleJob()) { updateJobStatusIfPastEndTime(job); @@ -110,14 +113,27 @@ public class TaskHandler implements WorkHandler<TaskEvent> { // one time job should be finished after execute updateOnceTimeJobStatus(job); } - String resultStr = Objects.isNull(result) ? "" : result.toString(); + if (null == result) { + log.warn("Job execute failed, jobId: {}, result is null", jobId); + jobTask.setErrorMsg("Job execute failed, result is null"); + jobTask.setIsSuccessful(false); + timerJobManager.pauseJob(jobId); + return; + } + String resultStr = GsonUtils.GSON.toJson(result.getResult()); jobTask.setExecuteResult(resultStr); - jobTask.setIsSuccessful(true); + jobTask.setIsSuccessful(result.isSuccess()); + if (!result.isSuccess()) { + log.warn("Job execute failed, jobId: {}, msg : {}", jobId, result.getExecutorSql()); + jobTask.setErrorMsg(result.getExecutorSql()); + timerJobManager.pauseJob(jobId); + } + jobTask.setExecuteSql(result.getExecutorSql()); } catch (Exception e) { log.warn("Job execute failed, jobId: {}, msg : {}", jobId, e.getMessage()); - job.pause(e.getMessage()); jobTask.setErrorMsg(e.getMessage()); jobTask.setIsSuccessful(false); + timerJobManager.pauseJob(jobId); } jobTask.setEndTimeMs(System.currentTimeMillis()); if (null == jobTaskManager) { @@ -143,7 +159,7 @@ public class TaskHandler implements WorkHandler<TaskEvent> { private void updateJobStatusIfPastEndTime(Job job) { if (job.isExpired()) { - job.finish(); + timerJobManager.finishJob(job.getJobId()); } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/scheduler/executor/JobExecutor.java b/fe/fe-core/src/main/java/org/apache/doris/scheduler/executor/JobExecutor.java index e67e62f267..a6f2e10306 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/scheduler/executor/JobExecutor.java +++ b/fe/fe-core/src/main/java/org/apache/doris/scheduler/executor/JobExecutor.java @@ -18,6 +18,7 @@ package org.apache.doris.scheduler.executor; import org.apache.doris.scheduler.exception.JobException; +import org.apache.doris.scheduler.job.ExecutorResult; import org.apache.doris.scheduler.job.Job; /** @@ -39,6 +40,6 @@ public interface JobExecutor<T> { * * @return The result of the event job execution. */ - T execute(Job job) throws JobException; + ExecutorResult execute(Job job) throws JobException; } diff --git a/fe/fe-core/src/main/java/org/apache/doris/scheduler/executor/SqlJobExecutor.java b/fe/fe-core/src/main/java/org/apache/doris/scheduler/executor/SqlJobExecutor.java index 572e895906..5441532142 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/scheduler/executor/SqlJobExecutor.java +++ b/fe/fe-core/src/main/java/org/apache/doris/scheduler/executor/SqlJobExecutor.java @@ -23,6 +23,7 @@ import org.apache.doris.cluster.ClusterNamespace; import org.apache.doris.qe.ConnectContext; import org.apache.doris.qe.StmtExecutor; import org.apache.doris.scheduler.exception.JobException; +import org.apache.doris.scheduler.job.ExecutorResult; import org.apache.doris.scheduler.job.Job; import org.apache.doris.thrift.TUniqueId; @@ -36,10 +37,9 @@ import java.util.UUID; /** * we use this executor to execute sql job * - * @param <QueryState> the state of sql job, we can record the state of sql job */ @Slf4j -public class SqlJobExecutor<QueryState> implements JobExecutor { +public class SqlJobExecutor implements JobExecutor { @Getter @Setter @@ -51,7 +51,7 @@ public class SqlJobExecutor<QueryState> implements JobExecutor { } @Override - public String execute(Job job) throws JobException { + public ExecutorResult<String> execute(Job job) throws JobException { ConnectContext ctx = new ConnectContext(); ctx.setEnv(Env.getCurrentEnv()); ctx.setCluster(ClusterNamespace.getClusterNameFromFullName(job.getDbName())); @@ -67,9 +67,12 @@ public class SqlJobExecutor<QueryState> implements JobExecutor { try { StmtExecutor executor = new StmtExecutor(ctx, sql); executor.execute(queryId); - return convertExecuteResult(ctx, taskIdString); + String result = convertExecuteResult(ctx, taskIdString); + + return new ExecutorResult<>(result, true, null, sql); } catch (Exception e) { - throw new JobException("execute sql job failed, sql: " + sql + ", error: " + e.getMessage()); + log.warn("execute sql job failed, sql: {}, error: {}", sql, e.getMessage()); + return new ExecutorResult<>(null, false, e.getMessage(), sql); } } @@ -82,6 +85,7 @@ public class SqlJobExecutor<QueryState> implements JobExecutor { throw new JobException("error code: " + ctx.getState().getErrorCode() + ", error msg: " + ctx.getState().getErrorMessage()); } + return "queryId:" + queryId + ",affectedRows : " + ctx.getState().getAffectedRows() + ", warningRows: " + ctx.getState().getWarningRows() + ",infoMsg" + ctx.getState().getInfoMessage(); } diff --git a/fe/fe-core/src/main/java/org/apache/doris/scheduler/executor/TransientTaskExecutor.java b/fe/fe-core/src/main/java/org/apache/doris/scheduler/executor/TransientTaskExecutor.java index 6f818ae841..f297753bbd 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/scheduler/executor/TransientTaskExecutor.java +++ b/fe/fe-core/src/main/java/org/apache/doris/scheduler/executor/TransientTaskExecutor.java @@ -22,7 +22,7 @@ import org.apache.doris.scheduler.exception.JobException; /** * A functional interface for executing a memory task. */ -public interface TransientTaskExecutor<T> { +public interface TransientTaskExecutor { /** * Executes the memory task. diff --git a/fe/fe-core/src/main/java/org/apache/doris/scheduler/executor/TransientTaskExecutor.java b/fe/fe-core/src/main/java/org/apache/doris/scheduler/job/ExecutorResult.java similarity index 62% copy from fe/fe-core/src/main/java/org/apache/doris/scheduler/executor/TransientTaskExecutor.java copy to fe/fe-core/src/main/java/org/apache/doris/scheduler/job/ExecutorResult.java index 6f818ae841..99df9c9e78 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/scheduler/executor/TransientTaskExecutor.java +++ b/fe/fe-core/src/main/java/org/apache/doris/scheduler/job/ExecutorResult.java @@ -15,24 +15,21 @@ // specific language governing permissions and limitations // under the License. -package org.apache.doris.scheduler.executor; +package org.apache.doris.scheduler.job; -import org.apache.doris.scheduler.exception.JobException; +import lombok.AllArgsConstructor; +import lombok.Data; -/** - * A functional interface for executing a memory task. - */ -public interface TransientTaskExecutor<T> { +@Data +@AllArgsConstructor +public class ExecutorResult<T> { - /** - * Executes the memory task. - * Exceptions will be caught internally, so there is no need to define or throw them separately. - */ - void execute() throws JobException; + private T result; - /** - * Cancel the memory task. - */ - void cancel() throws JobException; -} + private boolean success; + + private String errorMsg; + private String executorSql; + +} diff --git a/fe/fe-core/src/main/java/org/apache/doris/scheduler/job/Job.java b/fe/fe-core/src/main/java/org/apache/doris/scheduler/job/Job.java index e65cf07754..ebfdfb04a2 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/scheduler/job/Job.java +++ b/fe/fe-core/src/main/java/org/apache/doris/scheduler/job/Job.java @@ -87,6 +87,8 @@ public class Job implements Writable { */ @SerializedName("executor") private JobExecutor executor; + @SerializedName("baseName") + private String baseName; @SerializedName("user") private String user; @@ -124,6 +126,9 @@ public class Job implements Writable { @SerializedName("nextExecuteTimeMs") private Long nextExecuteTimeMs = 0L; + @SerializedName("createTimeMs") + private Long createTimeMs = System.currentTimeMillis(); + @SerializedName("comment") private String comment; @@ -209,7 +214,9 @@ public class Job implements Writable { if (endTimeMs != 0L && endTimeMs < System.currentTimeMillis()) { throw new DdlException("endTimeMs must be greater than current time"); } - + if (null != intervalUnit && null != originInterval) { + this.intervalMs = intervalUnit.getParameterValue(originInterval); + } if (isCycleJob && (intervalMs == null || intervalMs <= 0L)) { throw new DdlException("cycle job must set intervalMs"); } @@ -236,6 +243,9 @@ public class Job implements Writable { List<String> row = Lists.newArrayList(); row.add(String.valueOf(jobId)); row.add(dbName); + if (jobCategory.equals(JobCategory.MTMV)) { + row.add(baseName); + } row.add(jobName); row.add(user); row.add(timezone); @@ -256,6 +266,7 @@ public class Job implements Writable { row.add(jobStatus.name()); row.add(latestCompleteExecuteTimeMs <= 0L ? "null" : TimeUtils.longToTimeString(latestCompleteExecuteTimeMs)); row.add(errMsg == null ? "null" : errMsg); + row.add(createTimeMs <= 0L ? "null" : TimeUtils.longToTimeString(createTimeMs)); row.add(comment == null ? "null" : comment); return row; } diff --git a/fe/fe-core/src/main/java/org/apache/doris/scheduler/job/JobTask.java b/fe/fe-core/src/main/java/org/apache/doris/scheduler/job/JobTask.java index 3bfe4ffc44..04fb552e74 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/scheduler/job/JobTask.java +++ b/fe/fe-core/src/main/java/org/apache/doris/scheduler/job/JobTask.java @@ -38,27 +38,36 @@ public class JobTask implements Writable { private Long jobId; @SerializedName("taskId") private Long taskId; + @SerializedName("createTimeMs") + private Long createTimeMs; @SerializedName("startTimeMs") private Long startTimeMs; @SerializedName("endTimeMs") private Long endTimeMs; @SerializedName("successful") private Boolean isSuccessful; + + @SerializedName("executeSql") + private String executeSql; @SerializedName("executeResult") private String executeResult; @SerializedName("errorMsg") private String errorMsg; - public JobTask(Long jobId) { + public JobTask(Long jobId, Long taskId, Long createTimeMs) { //it's enough to use nanoTime to identify a task - this.taskId = System.nanoTime(); + this.taskId = taskId; this.jobId = jobId; + this.createTimeMs = createTimeMs; } public List<String> getShowInfo() { List<String> row = Lists.newArrayList(); row.add(String.valueOf(jobId)); row.add(String.valueOf(taskId)); + if (null != createTimeMs) { + row.add(TimeUtils.longToTimeString(createTimeMs)); + } row.add(TimeUtils.longToTimeString(startTimeMs)); row.add(null == endTimeMs ? "null" : TimeUtils.longToTimeString(endTimeMs)); if (endTimeMs == null) { @@ -66,6 +75,11 @@ public class JobTask implements Writable { } else { row.add(isSuccessful ? "SUCCESS" : "FAILED"); } + if (null == executeSql) { + row.add("null"); + } else { + row.add(executeSql); + } if (null == executeResult) { row.add("null"); } else { diff --git a/fe/fe-core/src/main/java/org/apache/doris/scheduler/job/TimerJobTask.java b/fe/fe-core/src/main/java/org/apache/doris/scheduler/job/TimerJobTask.java index 655df9467d..b3a199a8a6 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/scheduler/job/TimerJobTask.java +++ b/fe/fe-core/src/main/java/org/apache/doris/scheduler/job/TimerJobTask.java @@ -23,8 +23,6 @@ import io.netty.util.Timeout; import io.netty.util.TimerTask; import lombok.Getter; -import java.util.UUID; - /** * This class represents a timer task that can be scheduled by a Netty timer. * When the timer task is triggered, it produces a Job task using the Disruptor. @@ -36,16 +34,17 @@ public class TimerJobTask implements TimerTask { private final Long jobId; // more fields should be added here and record in feature - private final Long taskId = UUID.randomUUID().getMostSignificantBits(); + private final Long taskId; private final Long startTimestamp; private final TaskDisruptor taskDisruptor; - public TimerJobTask(Long jobId, Long startTimestamp, TaskDisruptor taskDisruptor) { + public TimerJobTask(Long jobId, Long taskId, Long startTimestamp, TaskDisruptor taskDisruptor) { this.jobId = jobId; this.startTimestamp = startTimestamp; this.taskDisruptor = taskDisruptor; + this.taskId = taskId; } @Override @@ -53,6 +52,6 @@ public class TimerJobTask implements TimerTask { if (timeout.isCancelled()) { return; } - taskDisruptor.tryPublish(jobId); + taskDisruptor.tryPublish(jobId, taskId); } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/scheduler/manager/JobTaskManager.java b/fe/fe-core/src/main/java/org/apache/doris/scheduler/manager/JobTaskManager.java index e18a143ec7..5117e2d417 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/scheduler/manager/JobTaskManager.java +++ b/fe/fe-core/src/main/java/org/apache/doris/scheduler/manager/JobTaskManager.java @@ -31,6 +31,7 @@ import java.io.DataOutput; import java.io.IOException; import java.util.ArrayList; import java.util.Collections; +import java.util.HashMap; import java.util.LinkedList; import java.util.List; import java.util.Map; @@ -44,6 +45,31 @@ public class JobTaskManager implements Writable { private ConcurrentHashMap<Long, ConcurrentLinkedQueue<JobTask>> jobTaskMap = new ConcurrentHashMap<>(16); + + /** + * taskId -> startTime + * used to record the start time of the task to be executed + * will clear when the task is executed + */ + private static ConcurrentHashMap<Long, Map<Long, Long>> prepareTaskCreateMsMap = new ConcurrentHashMap<>(16); + + public static void addPrepareTaskStartTime(Long jobId, Long taskId, Long startTime) { + prepareTaskCreateMsMap.computeIfAbsent(jobId, k -> new HashMap<>()); + prepareTaskCreateMsMap.get(jobId).put(taskId, startTime); + } + + public static Long pollPrepareTaskByTaskId(Long jobId, Long taskId) { + if (!prepareTaskCreateMsMap.containsKey(jobId)) { + // if the job is not in the map, return current time + return System.currentTimeMillis(); + } + return prepareTaskCreateMsMap.get(jobId).remove(taskId); + } + + public static void clearPrepareTaskByJobId(Long jobId) { + prepareTaskCreateMsMap.remove(jobId); + } + public void addJobTask(JobTask jobTask) { ConcurrentLinkedQueue<JobTask> jobTasks = jobTaskMap .computeIfAbsent(jobTask.getJobId(), k -> new ConcurrentLinkedQueue<>()); diff --git a/fe/fe-core/src/main/java/org/apache/doris/scheduler/manager/TimerJobManager.java b/fe/fe-core/src/main/java/org/apache/doris/scheduler/manager/TimerJobManager.java index 9000dba470..314b24f052 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/scheduler/manager/TimerJobManager.java +++ b/fe/fe-core/src/main/java/org/apache/doris/scheduler/manager/TimerJobManager.java @@ -23,6 +23,7 @@ import org.apache.doris.common.PatternMatcher; import org.apache.doris.common.io.Writable; import org.apache.doris.common.util.LogBuilder; import org.apache.doris.common.util.LogKey; +import org.apache.doris.common.util.TimeUtils; import org.apache.doris.scheduler.constants.JobCategory; import org.apache.doris.scheduler.constants.JobStatus; import org.apache.doris.scheduler.disruptor.TaskDisruptor; @@ -50,9 +51,7 @@ import java.util.concurrent.TimeUnit; public class TimerJobManager implements Closeable, Writable { private final ConcurrentHashMap<Long, Job> jobMap = new ConcurrentHashMap<>(128); - private long lastBatchSchedulerTimestamp; - private static final long BATCH_SCHEDULER_INTERVAL_SECONDS = 600; /** @@ -72,7 +71,7 @@ public class TimerJobManager implements Closeable, Writable { /** * scheduler tasks, it's used to scheduler job */ - private final HashedWheelTimer dorisTimer = new HashedWheelTimer(1, TimeUnit.SECONDS, 660); + private HashedWheelTimer dorisTimer; /** * Producer and Consumer model @@ -83,8 +82,18 @@ public class TimerJobManager implements Closeable, Writable { private TaskDisruptor disruptor; public TimerJobManager() { - dorisTimer.start(); this.lastBatchSchedulerTimestamp = System.currentTimeMillis(); + } + + public void start() { + dorisTimer = new HashedWheelTimer(1, TimeUnit.SECONDS, 660); + dorisTimer.start(); + Long currentTimeMs = System.currentTimeMillis(); + jobMap.forEach((jobId, job) -> { + Long nextExecuteTimeMs = findFistExecuteTime(currentTimeMs, job.getStartTimeMs(), + job.getIntervalMs(), job.isCycleJob()); + job.setNextExecuteTimeMs(nextExecuteTimeMs); + }); batchSchedulerTasks(); cycleSystemSchedulerTasks(); } @@ -99,6 +108,9 @@ public class TimerJobManager implements Closeable, Writable { } public void replayCreateJob(Job job) { + if (jobMap.containsKey(job.getJobId())) { + return; + } jobMap.putIfAbsent(job.getJobId(), job); initAndSchedulerJob(job); log.info(new LogBuilder(LogKey.SCHEDULER_JOB, job.getJobId()) @@ -252,6 +264,20 @@ public class TimerJobManager implements Closeable, Writable { Env.getCurrentEnv().getEditLog().logUpdateJob(job); } + public void finishJob(long jobId) { + Job job = jobMap.get(jobId); + if (jobMap.get(jobId) == null) { + log.warn("update job status failed, jobId: {} not exist", jobId); + } + if (jobMap.get(jobId).getJobStatus().equals(JobStatus.FINISHED)) { + return; + } + cancelJobAllTask(job.getJobId()); + job.setJobStatus(JobStatus.FINISHED); + jobMap.get(job.getJobId()).finish(); + Env.getCurrentEnv().getEditLog().logUpdateJob(job); + } + private Optional<Job> findJob(String dbName, String jobName, JobCategory jobCategory) { return jobMap.values().stream().filter(job -> checkJobMatch(job, dbName, jobName, jobCategory)).findFirst(); } @@ -297,11 +323,15 @@ public class TimerJobManager implements Closeable, Writable { } private List<Long> findTasksBetweenTime(Job job, Long endTimeEndWindow, Long nextExecuteTime) { + List<Long> jobExecuteTimes = new ArrayList<>(); if (!job.isCycleJob() && (nextExecuteTime < endTimeEndWindow)) { jobExecuteTimes.add(nextExecuteTime); return jobExecuteTimes; } + if (job.isCycleJob() && (nextExecuteTime > endTimeEndWindow)) { + return new ArrayList<>(); + } while (endTimeEndWindow >= nextExecuteTime) { if (job.isTaskTimeExceeded()) { break; @@ -345,7 +375,12 @@ public class TimerJobManager implements Closeable, Writable { * Jobs will be re-registered after the task is completed */ private void cycleSystemSchedulerTasks() { - dorisTimer.newTimeout(timeout -> batchSchedulerTasks(), BATCH_SCHEDULER_INTERVAL_SECONDS, TimeUnit.SECONDS); + log.info("re-register system scheduler tasks" + TimeUtils.longToTimeString(System.currentTimeMillis())); + dorisTimer.newTimeout(timeout -> { + batchSchedulerTasks(); + cycleSystemSchedulerTasks(); + }, BATCH_SCHEDULER_INTERVAL_SECONDS, TimeUnit.SECONDS); + } /** @@ -358,11 +393,12 @@ public class TimerJobManager implements Closeable, Writable { * delay seconds, we just can be second precision */ public void putOneTask(Long jobId, Long startExecuteTime) { - TimerJobTask task = new TimerJobTask(jobId, startExecuteTime, disruptor); if (isClosed) { - log.info("putOneTask failed, scheduler is closed, jobId: {}", task.getJobId()); + log.info("putOneTask failed, scheduler is closed, jobId: {}", jobId); return; } + long taskId = System.nanoTime(); + TimerJobTask task = new TimerJobTask(jobId, taskId, startExecuteTime, disruptor); long delay = getDelaySecond(task.getStartTimestamp()); Timeout timeout = dorisTimer.newTimeout(task, delay, TimeUnit.SECONDS); if (timeout == null) { @@ -371,11 +407,13 @@ public class TimerJobManager implements Closeable, Writable { } if (jobTimeoutMap.containsKey(task.getJobId())) { jobTimeoutMap.get(task.getJobId()).put(task.getTaskId(), timeout); + JobTaskManager.addPrepareTaskStartTime(jobId, taskId, startExecuteTime); return; } Map<Long, Timeout> timeoutMap = new ConcurrentHashMap<>(); timeoutMap.put(task.getTaskId(), timeout); jobTimeoutMap.put(task.getJobId(), timeoutMap); + JobTaskManager.addPrepareTaskStartTime(jobId, taskId, startExecuteTime); } // cancel all task for one job @@ -390,14 +428,7 @@ public class TimerJobManager implements Closeable, Writable { timeout.cancel(); } }); - } - - public void stopTask(Long jobId, Long taskId) { - if (!jobTimeoutMap.containsKey(jobId)) { - return; - } - cancelJobAllTask(jobId); - jobTimeoutMap.get(jobId).remove(taskId); + JobTaskManager.clearPrepareTaskByJobId(jobId); } // get delay time, if startTimestamp is less than now, return 0 @@ -452,7 +483,9 @@ public class TimerJobManager implements Closeable, Writable { } public void putOneJobToQueen(Long jobId) { - disruptor.tryPublish(jobId); + long taskId = System.nanoTime(); + JobTaskManager.addPrepareTaskStartTime(jobId, taskId, System.currentTimeMillis()); + disruptor.tryPublish(jobId, taskId); } @Override @@ -473,8 +506,7 @@ public class TimerJobManager implements Closeable, Writable { int size = in.readInt(); for (int i = 0; i < size; i++) { Job job = Job.readFields(in); - jobMap.put(job.getJobId(), job); - initAndSchedulerJob(job); + jobMap.putIfAbsent(job.getJobId(), job); } } } diff --git a/fe/fe-core/src/test/java/org/apache/doris/scheduler/disruptor/TaskDisruptorTest.java b/fe/fe-core/src/test/java/org/apache/doris/scheduler/disruptor/TaskDisruptorTest.java index e60587701e..878f8c5594 100644 --- a/fe/fe-core/src/test/java/org/apache/doris/scheduler/disruptor/TaskDisruptorTest.java +++ b/fe/fe-core/src/test/java/org/apache/doris/scheduler/disruptor/TaskDisruptorTest.java @@ -19,6 +19,7 @@ package org.apache.doris.scheduler.disruptor; import org.apache.doris.catalog.Env; import org.apache.doris.scheduler.executor.JobExecutor; +import org.apache.doris.scheduler.job.ExecutorResult; import org.apache.doris.scheduler.job.Job; import org.apache.doris.scheduler.manager.TimerJobManager; import org.apache.doris.scheduler.manager.TransientTaskManager; @@ -64,7 +65,7 @@ public class TaskDisruptorTest { timerJobManager.getJob(anyLong); result = job; }}; - taskDisruptor.tryPublish(job.getJobId()); + taskDisruptor.tryPublish(job.getJobId(), 1L); Awaitility.await().atMost(1, TimeUnit.SECONDS).until(() -> testEventExecuteFlag); Assertions.assertTrue(testEventExecuteFlag); } @@ -72,9 +73,9 @@ public class TaskDisruptorTest { class TestExecutor implements JobExecutor<Boolean> { @Override - public Boolean execute(Job job) { + public ExecutorResult execute(Job job) { testEventExecuteFlag = true; - return true; + return new ExecutorResult(true, true, null, "null"); } } diff --git a/fe/fe-core/src/test/java/org/apache/doris/scheduler/disruptor/TimerJobManagerTest.java b/fe/fe-core/src/test/java/org/apache/doris/scheduler/disruptor/TimerJobManagerTest.java index fd871be962..3e912b8fd8 100644 --- a/fe/fe-core/src/test/java/org/apache/doris/scheduler/disruptor/TimerJobManagerTest.java +++ b/fe/fe-core/src/test/java/org/apache/doris/scheduler/disruptor/TimerJobManagerTest.java @@ -22,6 +22,7 @@ import org.apache.doris.common.DdlException; import org.apache.doris.persist.EditLog; import org.apache.doris.scheduler.constants.JobCategory; import org.apache.doris.scheduler.executor.JobExecutor; +import org.apache.doris.scheduler.job.ExecutorResult; import org.apache.doris.scheduler.job.Job; import org.apache.doris.scheduler.manager.TimerJobManager; import org.apache.doris.scheduler.manager.TransientTaskManager; @@ -60,6 +61,7 @@ public class TimerJobManagerTest { TransientTaskManager transientTaskManager = new TransientTaskManager(); TaskDisruptor taskDisruptor = new TaskDisruptor(this.timerJobManager, transientTaskManager); this.timerJobManager.setDisruptor(taskDisruptor); + timerJobManager.start(); } @Test @@ -166,9 +168,9 @@ public class TimerJobManagerTest { class TestExecutor implements JobExecutor<Boolean> { @Override - public Boolean execute(Job job) { + public ExecutorResult execute(Job job) { log.info("test execute count:{}", testExecuteCount.incrementAndGet()); - return true; + return new ExecutorResult<>(true, true, null, ""); } } } --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org