This is an automated email from the ASF dual-hosted git repository.

zykkk pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push: new 08c78a11359 [Feature](Job)Support manual and refactor some execution logic (#26082) 08c78a11359 is described below commit 08c78a11359e3d0c9e6bc89cf32d4d9456c6c4d5 Author: Calvin Kirs <acm_mas...@163.com> AuthorDate: Tue Oct 31 20:35:55 2023 +0800 [Feature](Job)Support manual and refactor some execution logic (#26082) Supports manually triggered JOBs and Tasks Optimize JOB&TASK display logic Refactor the executor to support context passing --- fe/fe-core/src/main/cup/sql_parser.cup | 6 +- .../org/apache/doris/analysis/CreateJobStmt.java | 36 +++++----- .../org/apache/doris/analysis/ShowJobStmt.java | 11 +-- .../org/apache/doris/analysis/ShowJobTaskStmt.java | 4 +- .../main/java/org/apache/doris/catalog/Env.java | 5 +- .../java/org/apache/doris/qe/ShowExecutor.java | 7 +- .../apache/doris/scheduler/constants/JobType.java | 7 +- .../apache/doris/scheduler/constants/TaskType.java | 16 ++++- .../doris/scheduler/disruptor/TaskDisruptor.java | 34 +++++++-- .../doris/scheduler/disruptor/TaskHandler.java | 26 ++++--- .../scheduler/executor/AbstractJobExecutor.java | 54 ++++++++++++++ .../doris/scheduler/executor/JobExecutor.java | 9 +-- .../doris/scheduler/executor/SqlJobExecutor.java | 28 +++----- .../java/org/apache/doris/scheduler/job/Job.java | 82 +++++++++++++--------- .../org/apache/doris/scheduler/job/JobTask.java | 37 +++++++++- .../doris/scheduler/manager/JobTaskManager.java | 17 +++-- .../doris/scheduler/manager/TimerJobManager.java | 75 +++++++++++++++----- .../scheduler/registry/PersistentJobRegister.java | 13 ++++ .../doris/scheduler/registry/TimerJobRegister.java | 5 ++ .../apache/doris/scheduler/disruptor/JobTest.java | 3 +- .../scheduler/disruptor/TaskDisruptorTest.java | 13 +++- .../scheduler/disruptor/TimerJobManagerTest.java | 14 ++-- 22 files changed, 352 insertions(+), 150 deletions(-) diff --git a/fe/fe-core/src/main/cup/sql_parser.cup b/fe/fe-core/src/main/cup/sql_parser.cup 
index 80a71b7490d..5933fcffbcb 100644 --- a/fe/fe-core/src/main/cup/sql_parser.cup +++ b/fe/fe-core/src/main/cup/sql_parser.cup @@ -2551,17 +2551,17 @@ resource_desc ::= create_job_stmt ::= KW_CREATE KW_JOB job_label:jobLabel KW_ON KW_SCHEDULER KW_EVERY INTEGER_LITERAL:time_interval ident:time_unit opt_job_starts:startsTime opt_job_ends:endsTime opt_comment:comment KW_DO stmt:executeSql {: - CreateJobStmt stmt = new CreateJobStmt(jobLabel,null,false,time_interval,time_unit, startsTime, endsTime,comment,executeSql); + CreateJobStmt stmt = new CreateJobStmt(jobLabel,"RECURRING",null,time_interval,time_unit, startsTime, endsTime,comment,executeSql); RESULT = stmt; :} | KW_CREATE KW_JOB job_label:jobLabel KW_ON KW_SCHEDULER KW_STREAMING KW_AT STRING_LITERAL:atTime opt_comment:comment KW_DO stmt:executeSql {: - CreateJobStmt stmt = new CreateJobStmt(jobLabel,atTime,true,null,null,null,null,comment,executeSql); + CreateJobStmt stmt = new CreateJobStmt(jobLabel,"STREAMING",atTime,null,null,null,null,comment,executeSql); RESULT = stmt; :} | KW_CREATE KW_JOB job_label:jobLabel KW_ON KW_SCHEDULER KW_AT STRING_LITERAL:atTime opt_comment:comment KW_DO stmt:executeSql {: - CreateJobStmt stmt = new CreateJobStmt(jobLabel,atTime,false,null,null,null,null,comment,executeSql); + CreateJobStmt stmt = new CreateJobStmt(jobLabel,"ONE_TIME",atTime,null,null,null,null,comment,executeSql); RESULT = stmt; :} ; diff --git a/fe/fe-core/src/main/java/org/apache/doris/analysis/CreateJobStmt.java b/fe/fe-core/src/main/java/org/apache/doris/analysis/CreateJobStmt.java index 999d630153d..57f976712c9 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/analysis/CreateJobStmt.java +++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/CreateJobStmt.java @@ -28,6 +28,7 @@ import org.apache.doris.qe.ConnectContext; import org.apache.doris.scheduler.common.IntervalUnit; import org.apache.doris.scheduler.constants.JobCategory; import org.apache.doris.scheduler.constants.JobStatus; +import 
org.apache.doris.scheduler.constants.JobType; import org.apache.doris.scheduler.executor.SqlJobExecutor; import org.apache.doris.scheduler.job.Job; @@ -41,22 +42,21 @@ import java.util.HashSet; /** * syntax: * CREATE - * [DEFINER = user] - * JOB - * event_name - * ON SCHEDULE schedule - * [COMMENT 'string'] - * DO event_body; + * [DEFINER = user] + * JOB + * event_name + * ON SCHEDULE schedule + * [COMMENT 'string'] + * DO event_body; * schedule: { - * [STREAMING] AT timestamp - * | EVERY interval - * [STARTS timestamp ] - * [ENDS timestamp ] + * [STREAMING] AT timestamp + * | EVERY interval + * [STARTS timestamp ] + * [ENDS timestamp ] * } * interval: - * quantity { DAY | HOUR | MINUTE | - * WEEK | SECOND } - * + * quantity { DAY | HOUR | MINUTE | + * WEEK | SECOND } */ @Slf4j public class CreateJobStmt extends DdlStmt { @@ -90,7 +90,7 @@ public class CreateJobStmt extends DdlStmt { private static HashSet<String> supportStmtClassNamesCache = new HashSet<>(16); - public CreateJobStmt(LabelName labelName, String onceJobStartTimestamp, Boolean isStreamingJob, + public CreateJobStmt(LabelName labelName, String jobTypeName, String onceJobStartTimestamp, Long interval, String intervalTimeUnit, String startsTimeStamp, String endsTimeStamp, String comment, StatementBase doStmt) { this.labelName = labelName; @@ -102,7 +102,8 @@ public class CreateJobStmt extends DdlStmt { this.comment = comment; this.stmt = doStmt; this.job = new Job(); - job.setStreamingJob(isStreamingJob); + JobType jobType = JobType.valueOf(jobTypeName.toUpperCase()); + job.setJobType(jobType); } private String parseExecuteSql(String sql) throws AnalysisException { @@ -136,7 +137,7 @@ public class CreateJobStmt extends DdlStmt { job.setTimezone(timezone); job.setComment(comment); //todo support user define - job.setUser("root"); + job.setUser(ConnectContext.get().getQualifiedUser()); job.setJobStatus(JobStatus.RUNNING); job.setJobCategory(JobCategory.SQL); analyzerSqlStmt(); @@ -172,7 +173,6 @@ public 
class CreateJobStmt extends DdlStmt { private void analyzerCycleJob() throws UserException { - job.setCycleJob(true); if (null == interval) { throw new AnalysisException("interval is null"); } @@ -214,8 +214,6 @@ public class CreateJobStmt extends DdlStmt { private void analyzerOnceTimeJob() throws UserException { - job.setCycleJob(false); - job.setIntervalMs(0L); long executeAtTimeMillis = TimeUtils.timeStringToLong(onceJobStartTimestamp); diff --git a/fe/fe-core/src/main/java/org/apache/doris/analysis/ShowJobStmt.java b/fe/fe-core/src/main/java/org/apache/doris/analysis/ShowJobStmt.java index 7ad3ce343c5..42fb1c508fc 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/analysis/ShowJobStmt.java +++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/ShowJobStmt.java @@ -47,19 +47,12 @@ public class ShowJobStmt extends ShowStmt { private static final ImmutableList<String> TITLE_NAMES = new ImmutableList.Builder<String>() .add("Id") - .add("Db") .add("Name") .add("Definer") - .add("TimeZone") .add("ExecuteType") - .add("ExecuteAt") - .add("ExecuteInterval") - .add("ExecuteIntervalUnit") - .add("Starts") - .add("Ends") + .add("RecurringStrategy") .add("Status") - .add("LastExecuteFinishTime") - .add("ErrorMsg") + .add("lastExecuteTaskStatus") .add("CreateTime") .add("Comment") .build(); diff --git a/fe/fe-core/src/main/java/org/apache/doris/analysis/ShowJobTaskStmt.java b/fe/fe-core/src/main/java/org/apache/doris/analysis/ShowJobTaskStmt.java index 118530341ff..db3fb2ef3cd 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/analysis/ShowJobTaskStmt.java +++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/ShowJobTaskStmt.java @@ -41,8 +41,9 @@ public class ShowJobTaskStmt extends ShowStmt { private static final ImmutableList<String> TITLE_NAMES = new ImmutableList.Builder<String>() - .add("JobId") .add("TaskId") + .add("JobId") + .add("JobName") .add("CreateTime") .add("StartTime") .add("EndTime") @@ -50,6 +51,7 @@ public class ShowJobTaskStmt extends 
ShowStmt { .add("ExecuteSql") .add("Result") .add("ErrorMsg") + .add("TaskType") .build(); @Getter diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java index 9383c8b9266..f1cdf47b33f 100755 --- a/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java +++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java @@ -355,6 +355,8 @@ public class Env { private TimerJobManager timerJobManager; private TransientTaskManager transientTaskManager; private JobTaskManager jobTaskManager; + + private TaskDisruptor taskDisruptor; private MasterDaemon labelCleaner; // To clean old LabelInfo, ExportJobInfos private MasterDaemon txnCleaner; // To clean aborted or timeout txns private Daemon feDiskUpdater; // Update fe disk info @@ -629,7 +631,7 @@ public class Env { this.jobTaskManager = new JobTaskManager(); this.timerJobManager = new TimerJobManager(); this.transientTaskManager = new TransientTaskManager(); - TaskDisruptor taskDisruptor = new TaskDisruptor(this.timerJobManager, this.transientTaskManager); + this.taskDisruptor = new TaskDisruptor(this.timerJobManager, this.transientTaskManager); this.timerJobManager.setDisruptor(taskDisruptor); this.transientTaskManager.setDisruptor(taskDisruptor); this.persistentJobRegister = new TimerJobRegister(timerJobManager); @@ -1532,6 +1534,7 @@ public class Env { publishVersionDaemon.start(); // Start txn cleaner txnCleaner.start(); + taskDisruptor.start(); timerJobManager.start(); // Alter getAlterInstance().start(); diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/ShowExecutor.java b/fe/fe-core/src/main/java/org/apache/doris/qe/ShowExecutor.java index 8c72d260876..2aadab78852 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/qe/ShowExecutor.java +++ b/fe/fe-core/src/main/java/org/apache/doris/qe/ShowExecutor.java @@ -1428,14 +1428,15 @@ public class ShowExecutor { resultSet = new ShowResultSet(showJobTaskStmt.getMetaData(), rows); 
return; } - long jobId = jobs.get(0).getJobId(); + Job job = jobs.get(0); + long jobId = job.getJobId(); List<JobTask> jobTasks = Env.getCurrentEnv().getJobTaskManager().getJobTasks(jobId); if (CollectionUtils.isEmpty(jobTasks)) { resultSet = new ShowResultSet(showJobTaskStmt.getMetaData(), rows); return; } - for (JobTask job : jobTasks) { - rows.add(job.getShowInfo()); + for (JobTask jobTask : jobTasks) { + rows.add(jobTask.getShowInfo(job.getJobName())); } resultSet = new ShowResultSet(showJobTaskStmt.getMetaData(), rows); } diff --git a/fe/fe-core/src/main/java/org/apache/doris/scheduler/constants/JobType.java b/fe/fe-core/src/main/java/org/apache/doris/scheduler/constants/JobType.java index 4f4467c989a..58f681c4061 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/scheduler/constants/JobType.java +++ b/fe/fe-core/src/main/java/org/apache/doris/scheduler/constants/JobType.java @@ -29,5 +29,10 @@ public enum JobType { /** * JOB_TYPE_STREAMING is used to identify the streaming job. */ - STREAMING + STREAMING, + + /** + * The job will be executed manually and need to be triggered by the user. + */ + MANUAL } diff --git a/fe/fe-core/src/main/java/org/apache/doris/scheduler/constants/TaskType.java b/fe/fe-core/src/main/java/org/apache/doris/scheduler/constants/TaskType.java index 525dbe8adaa..996d72584ff 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/scheduler/constants/TaskType.java +++ b/fe/fe-core/src/main/java/org/apache/doris/scheduler/constants/TaskType.java @@ -18,7 +18,19 @@ package org.apache.doris.scheduler.constants; public enum TaskType { - TimerJobTask, + /** + * Usually don't require persistence and are used in various asynchronous tasks, such as export tasks. + * the life cycle of this kind of task is not managed by JOB-scheduler. + */ + TRANSIENT_TASK, - TransientTask + /** + * Tasks generated by scheduled jobs. + */ + SCHEDULER_JOB_TASK, + + /** + * Tasks generated by manual jobs. 
+ */ + MANUAL_JOB_TASK } diff --git a/fe/fe-core/src/main/java/org/apache/doris/scheduler/disruptor/TaskDisruptor.java b/fe/fe-core/src/main/java/org/apache/doris/scheduler/disruptor/TaskDisruptor.java index a8d98831f21..0e3f8e618d7 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/scheduler/disruptor/TaskDisruptor.java +++ b/fe/fe-core/src/main/java/org/apache/doris/scheduler/disruptor/TaskDisruptor.java @@ -18,7 +18,6 @@ package org.apache.doris.scheduler.disruptor; import org.apache.doris.common.Config; -import org.apache.doris.common.CustomThreadFactory; import org.apache.doris.scheduler.constants.TaskType; import org.apache.doris.scheduler.manager.TimerJobManager; import org.apache.doris.scheduler.manager.TransientTaskManager; @@ -29,6 +28,7 @@ import com.lmax.disruptor.TimeoutException; import com.lmax.disruptor.WorkHandler; import com.lmax.disruptor.dsl.Disruptor; import com.lmax.disruptor.dsl.ProducerType; +import com.lmax.disruptor.util.DaemonThreadFactory; import lombok.extern.slf4j.Slf4j; import java.io.Closeable; @@ -47,10 +47,13 @@ import java.util.concurrent.TimeUnit; @Slf4j public class TaskDisruptor implements Closeable { - private final Disruptor<TaskEvent> disruptor; + private Disruptor<TaskEvent> disruptor; + + private TimerJobManager timerJobManager; + private TransientTaskManager transientTaskManager; private static final int DEFAULT_RING_BUFFER_SIZE = Config.async_task_queen_size; - private static int consumerThreadCount = Config.async_task_consumer_thread_num; + private static final int consumerThreadCount = Config.async_task_consumer_thread_num; /** * The default timeout for {@link #close()} in seconds. 
@@ -75,7 +78,12 @@ public class TaskDisruptor implements Closeable { }; public TaskDisruptor(TimerJobManager timerJobManager, TransientTaskManager transientTaskManager) { - ThreadFactory producerThreadFactory = new CustomThreadFactory("task-disruptor-producer"); + this.timerJobManager = timerJobManager; + this.transientTaskManager = transientTaskManager; + } + + public void start() { + ThreadFactory producerThreadFactory = DaemonThreadFactory.INSTANCE; disruptor = new Disruptor<>(TaskEvent.FACTORY, DEFAULT_RING_BUFFER_SIZE, producerThreadFactory, ProducerType.SINGLE, new BlockingWaitStrategy()); WorkHandler<TaskEvent>[] workers = new TaskHandler[consumerThreadCount]; @@ -88,16 +96,29 @@ public class TaskDisruptor implements Closeable { /** * Publishes a job to the disruptor. + * Default task type is {@link TaskType#SCHEDULER_JOB_TASK} * * @param jobId job id */ public void tryPublish(Long jobId, Long taskId) { + this.tryPublish(jobId, taskId, TaskType.SCHEDULER_JOB_TASK); + } + + + /** + * Publishes a job task to the disruptor. + * + * @param jobId job id, describe which job this task belongs to + * @param taskId task id, it's linked to job id, we can get job detail by task id + * @param taskType {@link TaskType} + */ + public void tryPublish(Long jobId, Long taskId, TaskType taskType) { if (isClosed) { log.info("tryPublish failed, disruptor is closed, jobId: {}", jobId); return; } try { - disruptor.publishEvent(TRANSLATOR, jobId, taskId, TaskType.TimerJobTask); + disruptor.publishEvent(TRANSLATOR, jobId, taskId, taskType); } catch (Exception e) { log.error("tryPublish failed, jobId: {}", jobId, e); } @@ -105,6 +126,7 @@ public class TaskDisruptor implements Closeable { /** * Publishes a task to the disruptor. 
+ * Default task type is {@link TaskType#TRANSIENT_TASK} * * @param taskId task id */ @@ -114,7 +136,7 @@ public class TaskDisruptor implements Closeable { return; } try { - disruptor.publishEvent(TRANSLATOR, taskId, 0L, TaskType.TransientTask); + disruptor.publishEvent(TRANSLATOR, taskId, 0L, TaskType.TRANSIENT_TASK); } catch (Exception e) { log.error("tryPublish failed, taskId: {}", taskId, e); } diff --git a/fe/fe-core/src/main/java/org/apache/doris/scheduler/disruptor/TaskHandler.java b/fe/fe-core/src/main/java/org/apache/doris/scheduler/disruptor/TaskHandler.java index 9daf3b7ee45..6005fa1bd40 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/scheduler/disruptor/TaskHandler.java +++ b/fe/fe-core/src/main/java/org/apache/doris/scheduler/disruptor/TaskHandler.java @@ -19,6 +19,7 @@ package org.apache.doris.scheduler.disruptor; import org.apache.doris.catalog.Env; import org.apache.doris.persist.gson.GsonUtils; +import org.apache.doris.scheduler.constants.JobType; import org.apache.doris.scheduler.exception.JobException; import org.apache.doris.scheduler.executor.TransientTaskExecutor; import org.apache.doris.scheduler.job.ExecutorResult; @@ -70,13 +71,15 @@ public class TaskHandler implements WorkHandler<TaskEvent> { @Override public void onEvent(TaskEvent event) { switch (event.getTaskType()) { - case TimerJobTask: + case SCHEDULER_JOB_TASK: + case MANUAL_JOB_TASK: onTimerJobTaskHandle(event); break; - case TransientTask: + case TRANSIENT_TASK: onTransientTaskHandle(event); break; default: + log.warn("unknown task type: {}", event.getTaskType()); break; } } @@ -90,7 +93,11 @@ public class TaskHandler implements WorkHandler<TaskEvent> { public void onTimerJobTaskHandle(TaskEvent taskEvent) { long jobId = taskEvent.getId(); long taskId = taskEvent.getTaskId(); - long createTimeMs = jobTaskManager.pollPrepareTaskByTaskId(jobId, taskId); + JobTask jobTask = jobTaskManager.pollPrepareTaskByTaskId(jobId, taskId); + if (jobTask == null) { + log.warn("jobTask is 
null, maybe it's cancel, jobId: {}, taskId: {}", jobId, taskId); + return; + } Job job = timerJobManager.getJob(jobId); if (job == null) { log.info("job is null, jobId: {}", jobId); @@ -102,12 +109,12 @@ public class TaskHandler implements WorkHandler<TaskEvent> { } log.debug("job is running, eventJobId: {}", jobId); - JobTask jobTask = new JobTask(jobId, taskId, createTimeMs); + try { jobTask.setStartTimeMs(System.currentTimeMillis()); - ExecutorResult result = job.getExecutor().execute(job); + ExecutorResult result = job.getExecutor().execute(job, jobTask.getContextData()); job.setLatestCompleteExecuteTimeMs(System.currentTimeMillis()); - if (job.isCycleJob()) { + if (job.getJobType().equals(JobType.RECURRING)) { updateJobStatusIfPastEndTime(job); } else { // one time job should be finished after execute @@ -117,7 +124,7 @@ public class TaskHandler implements WorkHandler<TaskEvent> { log.warn("Job execute failed, jobId: {}, result is null", jobId); jobTask.setErrorMsg("Job execute failed, result is null"); jobTask.setIsSuccessful(false); - timerJobManager.pauseJob(jobId); + timerJobManager.setJobLatestStatus(jobId, false); return; } String resultStr = GsonUtils.GSON.toJson(result.getResult()); @@ -126,14 +133,12 @@ public class TaskHandler implements WorkHandler<TaskEvent> { if (!result.isSuccess()) { log.warn("Job execute failed, jobId: {}, msg : {}", jobId, result.getExecutorSql()); jobTask.setErrorMsg(result.getErrorMsg()); - timerJobManager.pauseJob(jobId); } jobTask.setExecuteSql(result.getExecutorSql()); } catch (Exception e) { log.warn("Job execute failed, jobId: {}, msg : {}", jobId, e.getMessage()); jobTask.setErrorMsg(e.getMessage()); jobTask.setIsSuccessful(false); - timerJobManager.pauseJob(jobId); } jobTask.setEndTimeMs(System.currentTimeMillis()); if (null == jobTaskManager) { @@ -141,6 +146,7 @@ public class TaskHandler implements WorkHandler<TaskEvent> { } boolean isPersistent = job.getJobCategory().isPersistent(); 
jobTaskManager.addJobTask(jobTask, isPersistent); + timerJobManager.setJobLatestStatus(jobId, jobTask.getIsSuccessful()); } public void onTransientTaskHandle(TaskEvent taskEvent) { @@ -165,7 +171,7 @@ public class TaskHandler implements WorkHandler<TaskEvent> { } private void updateOnceTimeJobStatus(Job job) { - if (job.isStreamingJob()) { + if (job.getJobType().equals(JobType.STREAMING)) { timerJobManager.putOneJobToQueen(job.getJobId()); return; } diff --git a/fe/fe-core/src/main/java/org/apache/doris/scheduler/executor/AbstractJobExecutor.java b/fe/fe-core/src/main/java/org/apache/doris/scheduler/executor/AbstractJobExecutor.java new file mode 100644 index 00000000000..7a70c963bd1 --- /dev/null +++ b/fe/fe-core/src/main/java/org/apache/doris/scheduler/executor/AbstractJobExecutor.java @@ -0,0 +1,54 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +package org.apache.doris.scheduler.executor; + +import org.apache.doris.analysis.UserIdentity; +import org.apache.doris.catalog.Env; +import org.apache.doris.cluster.ClusterNamespace; +import org.apache.doris.qe.ConnectContext; +import org.apache.doris.scheduler.job.Job; +import org.apache.doris.thrift.TUniqueId; + +import lombok.Getter; + +import java.util.UUID; + +@Getter +public abstract class AbstractJobExecutor<T, C> implements JobExecutor<T, C> { + + protected ConnectContext createContext(Job job) { + ConnectContext ctx = new ConnectContext(); + ctx.setEnv(Env.getCurrentEnv()); + ctx.setCluster(ClusterNamespace.getClusterNameFromFullName(job.getDbName())); + ctx.setDatabase(job.getDbName()); + ctx.setQualifiedUser(job.getUser()); + ctx.setCurrentUserIdentity(UserIdentity.createAnalyzedUserIdentWithIp(job.getUser(), "%")); + ctx.getState().reset(); + ctx.setThreadLocalInfo(); + return ctx; + } + + protected String generateTaskId() { + return UUID.randomUUID().toString(); + } + + protected TUniqueId generateQueryId(String taskIdString) { + UUID taskId = UUID.fromString(taskIdString); + return new TUniqueId(taskId.getMostSignificantBits(), taskId.getLeastSignificantBits()); + } +} diff --git a/fe/fe-core/src/main/java/org/apache/doris/scheduler/executor/JobExecutor.java b/fe/fe-core/src/main/java/org/apache/doris/scheduler/executor/JobExecutor.java index a6f2e10306f..40aebc8f6ad 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/scheduler/executor/JobExecutor.java +++ b/fe/fe-core/src/main/java/org/apache/doris/scheduler/executor/JobExecutor.java @@ -28,18 +28,19 @@ import org.apache.doris.scheduler.job.Job; * We use Gson to serialize and deserialize JobExecutor. so the implementation of JobExecutor needs to be serializable. * You can see @org.apache.doris.persist.gson.GsonUtils.java for details.When you implement JobExecutor,pls make sure * you can serialize and deserialize it. - * - * @param <T> The result type of the event job execution. 
*/ @FunctionalInterface -public interface JobExecutor<T> { +public interface JobExecutor<T, C> { /** * Executes the event job and returns the result. * Exceptions will be caught internally, so there is no need to define or throw them separately. * + * @param job The event job to execute. + * @param dataContext The data context of the event job. if you need to pass parameters to the event job, + * you can use it. * @return The result of the event job execution. */ - ExecutorResult execute(Job job) throws JobException; + ExecutorResult<T> execute(Job job, C dataContext) throws JobException; } diff --git a/fe/fe-core/src/main/java/org/apache/doris/scheduler/executor/SqlJobExecutor.java b/fe/fe-core/src/main/java/org/apache/doris/scheduler/executor/SqlJobExecutor.java index 3df2a6fd9a2..546eac9a768 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/scheduler/executor/SqlJobExecutor.java +++ b/fe/fe-core/src/main/java/org/apache/doris/scheduler/executor/SqlJobExecutor.java @@ -17,9 +17,6 @@ package org.apache.doris.scheduler.executor; -import org.apache.doris.analysis.UserIdentity; -import org.apache.doris.catalog.Env; -import org.apache.doris.cluster.ClusterNamespace; import org.apache.doris.qe.ConnectContext; import org.apache.doris.qe.StmtExecutor; import org.apache.doris.scheduler.exception.JobException; @@ -32,15 +29,15 @@ import lombok.Getter; import lombok.Setter; import lombok.extern.slf4j.Slf4j; -import java.util.UUID; +import java.util.Map; /** * we use this executor to execute sql job */ +@Getter @Slf4j -public class SqlJobExecutor implements JobExecutor { +public class SqlJobExecutor extends AbstractJobExecutor<String, Map<String, Object>> { - @Getter @Setter @SerializedName(value = "sql") private String sql; @@ -50,24 +47,14 @@ public class SqlJobExecutor implements JobExecutor { } @Override - public ExecutorResult<String> execute(Job job) throws JobException { - ConnectContext ctx = new ConnectContext(); - ctx.setEnv(Env.getCurrentEnv()); - 
ctx.setCluster(ClusterNamespace.getClusterNameFromFullName(job.getDbName())); - ctx.setDatabase(job.getDbName()); - ctx.setQualifiedUser(job.getUser()); - ctx.setCurrentUserIdentity(UserIdentity.createAnalyzedUserIdentWithIp(job.getUser(), "%")); - ctx.getState().reset(); - ctx.setThreadLocalInfo(); - String taskIdString = UUID.randomUUID().toString(); - UUID taskId = UUID.fromString(taskIdString); - TUniqueId queryId = new TUniqueId(taskId.getMostSignificantBits(), taskId.getLeastSignificantBits()); - ctx.setQueryId(queryId); + public ExecutorResult<String> execute(Job job, Map<String, Object> dataContext) throws JobException { + ConnectContext ctx = createContext(job); + String taskIdString = generateTaskId(); + TUniqueId queryId = generateQueryId(taskIdString); try { StmtExecutor executor = new StmtExecutor(ctx, sql); executor.execute(queryId); String result = convertExecuteResult(ctx, taskIdString); - return new ExecutorResult<>(result, true, null, sql); } catch (Exception e) { log.warn("execute sql job failed, job id :{}, sql: {}, error: {}", job.getJobId(), sql, e); @@ -88,4 +75,5 @@ public class SqlJobExecutor implements JobExecutor { return "queryId:" + queryId + ",affectedRows : " + ctx.getState().getAffectedRows() + ", warningRows: " + ctx.getState().getWarningRows() + ",infoMsg" + ctx.getState().getInfoMessage(); } + } diff --git a/fe/fe-core/src/main/java/org/apache/doris/scheduler/job/Job.java b/fe/fe-core/src/main/java/org/apache/doris/scheduler/job/Job.java index 6d39f1cd8d9..7f3fd0884b7 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/scheduler/job/Job.java +++ b/fe/fe-core/src/main/java/org/apache/doris/scheduler/job/Job.java @@ -80,6 +80,9 @@ public class Job implements Writable { @SerializedName("jobStatus") private JobStatus jobStatus; + @SerializedName("jobType") + private JobType jobType = JobType.RECURRING; + /** * The executor of the job. 
* @@ -93,12 +96,6 @@ public class Job implements Writable { @SerializedName("user") private String user; - @SerializedName("isCycleJob") - private boolean isCycleJob = false; - - @SerializedName("isStreamingJob") - private boolean isStreamingJob = false; - @SerializedName("intervalMs") private Long intervalMs = 0L; @SerializedName("startTimeMs") @@ -129,6 +126,8 @@ public class Job implements Writable { @SerializedName("createTimeMs") private Long createTimeMs = System.currentTimeMillis(); + private Boolean lastExecuteTaskStatus; + @SerializedName("comment") private String comment; @@ -206,6 +205,18 @@ public class Job implements Writable { } public void checkJobParam() throws DdlException { + if (null == jobCategory) { + throw new DdlException("jobCategory must be set"); + } + if (null == executor) { + throw new DdlException("Job executor must be set"); + } + if (null == jobType) { + throw new DdlException("Job type must be set"); + } + if (jobType.equals(JobType.MANUAL)) { + return; + } if (startTimeMs != 0L && startTimeMs < System.currentTimeMillis()) { throw new DdlException("startTimeMs must be greater than current time"); } @@ -221,15 +232,10 @@ public class Job implements Writable { if (null != intervalUnit && null != originInterval) { this.intervalMs = intervalUnit.getParameterValue(originInterval); } - if (isCycleJob && (intervalMs == null || intervalMs <= 0L)) { + if (jobType.equals(JobType.RECURRING) && (intervalMs == null || intervalMs <= 0L)) { throw new DdlException("cycle job must set intervalMs"); } - if (null == jobCategory) { - throw new DdlException("jobCategory must be set"); - } - if (null == executor) { - throw new DdlException("Job executor must be set"); - } + } @@ -246,33 +252,41 @@ public class Job implements Writable { public List<String> getShowInfo() { List<String> row = Lists.newArrayList(); row.add(String.valueOf(jobId)); - row.add(dbName); - if (jobCategory.equals(JobCategory.MTMV)) { - row.add(baseName); - } row.add(jobName); 
row.add(user); - row.add(timezone); - if (isCycleJob) { - row.add(JobType.RECURRING.name()); - } else { - if (isStreamingJob) { - row.add(JobType.STREAMING.name()); - } else { - row.add(JobType.ONE_TIME.name()); - } - } - row.add(isCycleJob ? "null" : TimeUtils.longToTimeString(startTimeMs)); - row.add(isCycleJob ? originInterval.toString() : "null"); - row.add(isCycleJob ? intervalUnit.name() : "null"); - row.add(isCycleJob && startTimeMs > 0 ? TimeUtils.longToTimeString(startTimeMs) : "null"); - row.add(isCycleJob && endTimeMs > 0 ? TimeUtils.longToTimeString(endTimeMs) : "null"); + row.add(jobType.name()); + + row.add(convertRecurringStrategyToString()); row.add(jobStatus.name()); - row.add(latestCompleteExecuteTimeMs <= 0L ? "null" : TimeUtils.longToTimeString(latestCompleteExecuteTimeMs)); - row.add(errMsg == null ? "null" : errMsg); + row.add(null == lastExecuteTaskStatus ? "null" : lastExecuteTaskStatus.toString()); row.add(createTimeMs <= 0L ? "null" : TimeUtils.longToTimeString(createTimeMs)); row.add(comment == null ? "null" : comment); return row; } + private String convertRecurringStrategyToString() { + if (jobType.equals(JobType.MANUAL)) { + return "MANUAL TRIGGER"; + } + switch (jobType) { + case ONE_TIME: + return "AT " + TimeUtils.longToTimeString(startTimeMs); + case RECURRING: + String result = "EVERY " + originInterval + " " + intervalUnit.name(); + if (startTimeMs > 0) { + result += " STARTS " + TimeUtils.longToTimeString(startTimeMs); + } + if (endTimeMs > 0) { + result += " ENDS " + TimeUtils.longToTimeString(endTimeMs); + } + return result; + case STREAMING: + return "STREAMING" + (startTimeMs > 0 ? 
" AT " + TimeUtils.longToTimeString(startTimeMs) : ""); + case MANUAL: + return "MANUAL TRIGGER"; + default: + return "UNKNOWN"; + } + } + } diff --git a/fe/fe-core/src/main/java/org/apache/doris/scheduler/job/JobTask.java b/fe/fe-core/src/main/java/org/apache/doris/scheduler/job/JobTask.java index 04fb552e74b..1f8aac58285 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/scheduler/job/JobTask.java +++ b/fe/fe-core/src/main/java/org/apache/doris/scheduler/job/JobTask.java @@ -21,10 +21,12 @@ import org.apache.doris.common.io.Text; import org.apache.doris.common.io.Writable; import org.apache.doris.common.util.TimeUtils; import org.apache.doris.persist.gson.GsonUtils; +import org.apache.doris.scheduler.constants.TaskType; import com.google.common.collect.Lists; import com.google.gson.annotations.SerializedName; import lombok.Data; +import lombok.extern.slf4j.Slf4j; import java.io.DataInput; import java.io.DataOutput; @@ -32,7 +34,8 @@ import java.io.IOException; import java.util.List; @Data -public class JobTask implements Writable { +@Slf4j +public class JobTask<T> implements Writable { @SerializedName("jobId") private Long jobId; @@ -54,6 +57,21 @@ public class JobTask implements Writable { @SerializedName("errorMsg") private String errorMsg; + @SerializedName("contextDataStr") + private String contextDataStr; + + @SerializedName("taskType") + private TaskType taskType = TaskType.SCHEDULER_JOB_TASK; + + /** + * Some parameters specific to the current task that need to be used to execute the task + * eg: sql task, sql it's: select * from table where id = 1 order by id desc limit ${limit} offset ${offset} + * contextData is a map, key1 is limit, value is 10,key2 is offset, value is 1 + * when execute the task, we will replace the ${limit} to 10, ${offset} to 1 + * so to execute sql is: select * from table where id = 1 order by id desc limit 10 offset 1. 
+ */ + private T contextData; + public JobTask(Long jobId, Long taskId, Long createTimeMs) { //it's enough to use nanoTime to identify a task this.taskId = taskId; @@ -61,10 +79,22 @@ public class JobTask implements Writable { this.createTimeMs = createTimeMs; } - public List<String> getShowInfo() { + public JobTask(Long jobId, Long taskId, Long createTimeMs, T contextData) { + this(jobId, taskId, createTimeMs); + this.contextData = contextData; + try { + this.contextDataStr = GsonUtils.GSON.toJson(contextData); + } catch (Exception e) { + this.contextDataStr = null; + log.error("contextData serialize failed, jobId: {}, taskId: {}", jobId, taskId, e); + } + } + + public List<String> getShowInfo(String jobName) { List<String> row = Lists.newArrayList(); - row.add(String.valueOf(jobId)); row.add(String.valueOf(taskId)); + row.add(String.valueOf(jobId)); + row.add(jobName); if (null != createTimeMs) { row.add(TimeUtils.longToTimeString(createTimeMs)); } @@ -90,6 +120,7 @@ public class JobTask implements Writable { } else { row.add(errorMsg); } + row.add(taskType.name()); return row; } diff --git a/fe/fe-core/src/main/java/org/apache/doris/scheduler/manager/JobTaskManager.java b/fe/fe-core/src/main/java/org/apache/doris/scheduler/manager/JobTaskManager.java index 7c739ba4603..6f7cd839074 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/scheduler/manager/JobTaskManager.java +++ b/fe/fe-core/src/main/java/org/apache/doris/scheduler/manager/JobTaskManager.java @@ -51,17 +51,20 @@ public class JobTaskManager implements Writable { * used to record the start time of the task to be executed * will clear when the task is executed */ - private static ConcurrentHashMap<Long, Map<Long, Long>> prepareTaskCreateMsMap = new ConcurrentHashMap<>(16); + private static ConcurrentHashMap<Long, Map<Long, JobTask>> prepareTaskCreateMsMap = new ConcurrentHashMap<>(16); - public static void addPrepareTaskStartTime(Long jobId, Long taskId, Long startTime) { + public static void 
addPrepareTask(JobTask jobTask) { + long jobId = jobTask.getJobId(); + long taskId = jobTask.getTaskId(); prepareTaskCreateMsMap.computeIfAbsent(jobId, k -> new HashMap<>()); - prepareTaskCreateMsMap.get(jobId).put(taskId, startTime); + prepareTaskCreateMsMap.get(jobId).put(taskId, jobTask); } - public static Long pollPrepareTaskByTaskId(Long jobId, Long taskId) { - if (!prepareTaskCreateMsMap.containsKey(jobId)) { - // if the job is not in the map, return current time - return System.currentTimeMillis(); + public static JobTask pollPrepareTaskByTaskId(Long jobId, Long taskId) { + if (!prepareTaskCreateMsMap.containsKey(jobId) || !prepareTaskCreateMsMap.get(jobId).containsKey(taskId)) { + // if the job is not in the map, return new JobTask + // return new JobTask(jobId, taskId, System.currentTimeMillis()); fixme + return null; } return prepareTaskCreateMsMap.get(jobId).remove(taskId); } diff --git a/fe/fe-core/src/main/java/org/apache/doris/scheduler/manager/TimerJobManager.java b/fe/fe-core/src/main/java/org/apache/doris/scheduler/manager/TimerJobManager.java index c7a728cf049..33ac7d5f940 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/scheduler/manager/TimerJobManager.java +++ b/fe/fe-core/src/main/java/org/apache/doris/scheduler/manager/TimerJobManager.java @@ -19,7 +19,6 @@ package org.apache.doris.scheduler.manager; import org.apache.doris.catalog.Env; import org.apache.doris.common.Config; -import org.apache.doris.common.CustomThreadFactory; import org.apache.doris.common.DdlException; import org.apache.doris.common.PatternMatcher; import org.apache.doris.common.io.Writable; @@ -28,8 +27,11 @@ import org.apache.doris.common.util.LogKey; import org.apache.doris.common.util.TimeUtils; import org.apache.doris.scheduler.constants.JobCategory; import org.apache.doris.scheduler.constants.JobStatus; +import org.apache.doris.scheduler.constants.JobType; +import org.apache.doris.scheduler.constants.TaskType; import 
org.apache.doris.scheduler.disruptor.TaskDisruptor; import org.apache.doris.scheduler.job.Job; +import org.apache.doris.scheduler.job.JobTask; import org.apache.doris.scheduler.job.TimerJobTask; import io.netty.util.HashedWheelTimer; @@ -88,13 +90,12 @@ public class TimerJobManager implements Closeable, Writable { } public void start() { - dorisTimer = new HashedWheelTimer(new CustomThreadFactory("hashed-wheel-timer"), - 1, TimeUnit.SECONDS, 660); + dorisTimer = new HashedWheelTimer(1, TimeUnit.SECONDS, 660); dorisTimer.start(); Long currentTimeMs = System.currentTimeMillis(); jobMap.forEach((jobId, job) -> { Long nextExecuteTimeMs = findFistExecuteTime(currentTimeMs, job.getStartTimeMs(), - job.getIntervalMs(), job.isCycleJob()); + job.getIntervalMs(), job.getJobType()); job.setNextExecuteTimeMs(nextExecuteTimeMs); }); batchSchedulerTasks(); @@ -152,18 +153,18 @@ public class TimerJobManager implements Closeable, Writable { } private void initAndSchedulerJob(Job job) { - if (!job.getJobStatus().equals(JobStatus.RUNNING)) { + if (!job.getJobStatus().equals(JobStatus.RUNNING) || job.getJobType().equals(JobType.MANUAL)) { return; } Long currentTimeMs = System.currentTimeMillis(); Long nextExecuteTimeMs = findFistExecuteTime(currentTimeMs, job.getStartTimeMs(), - job.getIntervalMs(), job.isCycleJob()); + job.getIntervalMs(), job.getJobType()); job.setNextExecuteTimeMs(nextExecuteTimeMs); if (job.getNextExecuteTimeMs() < lastBatchSchedulerTimestamp) { List<Long> executeTimestamp = findTasksBetweenTime(job, lastBatchSchedulerTimestamp, - job.getNextExecuteTimeMs()); + job.getNextExecuteTimeMs(), job.getJobType()); if (!executeTimestamp.isEmpty()) { for (Long timestamp : executeTimestamp) { putOneTask(job.getJobId(), timestamp); @@ -172,7 +173,7 @@ public class TimerJobManager implements Closeable, Writable { } } - private Long findFistExecuteTime(Long currentTimeMs, Long startTimeMs, Long intervalMs, boolean isCycleJob) { + private Long findFistExecuteTime(Long 
currentTimeMs, Long startTimeMs, Long intervalMs, JobType jobType) { // if job not delay, first execute time is start time if (startTimeMs != 0L && startTimeMs > currentTimeMs) { return startTimeMs; @@ -182,13 +183,30 @@ public class TimerJobManager implements Closeable, Writable { return currentTimeMs; } // if it's cycle job and not set start tine, first execute time is current time + interval - if (isCycleJob && startTimeMs == 0L) { + if (jobType.equals(JobType.RECURRING) && startTimeMs == 0L) { return currentTimeMs + intervalMs; } // if it's not cycle job and already delay, first execute time is current time return currentTimeMs; } + public <T> boolean immediateExecuteTask(Long jobId, T taskContextData) throws DdlException { + Job job = jobMap.get(jobId); + if (job == null) { + log.warn("immediateExecuteTask failed, jobId: {} not exist", jobId); + return false; + } + if (!job.getJobStatus().equals(JobStatus.RUNNING)) { + log.warn("immediateExecuteTask failed, jobId: {} is not running", jobId); + return false; + } + JobTask jobTask = createInitialTask(jobId, taskContextData); + jobTask.setTaskType(TaskType.MANUAL_JOB_TASK); + JobTaskManager.addPrepareTask(jobTask); + disruptor.tryPublish(jobId, jobTask.getTaskId(), TaskType.MANUAL_JOB_TASK); + return true; + } + public void unregisterJob(Long jobId) { jobMap.remove(jobId); } @@ -204,6 +222,14 @@ public class TimerJobManager implements Closeable, Writable { pauseJob(job); } + public void setJobLatestStatus(long jobId, boolean status) { + Job job = jobMap.get(jobId); + if (jobMap.get(jobId) == null) { + log.warn("pauseJob failed, jobId: {} not exist", jobId); + } + job.setLastExecuteTaskStatus(status); + } + public void stopJob(String dbName, String jobName, JobCategory jobCategory) throws DdlException { Optional<Job> optionalJob = findJob(dbName, jobName, jobCategory); @@ -326,14 +352,14 @@ public class TimerJobManager implements Closeable, Writable { executeJobIdsWithinLastTenMinutesWindow(); } - private 
List<Long> findTasksBetweenTime(Job job, Long endTimeEndWindow, Long nextExecuteTime) { + private List<Long> findTasksBetweenTime(Job job, Long endTimeEndWindow, Long nextExecuteTime, JobType jobType) { List<Long> jobExecuteTimes = new ArrayList<>(); - if (!job.isCycleJob() && (nextExecuteTime < endTimeEndWindow)) { + if (!jobType.equals(JobType.RECURRING) && (nextExecuteTime < endTimeEndWindow)) { jobExecuteTimes.add(nextExecuteTime); return jobExecuteTimes; } - if (job.isCycleJob() && (nextExecuteTime > endTimeEndWindow)) { + if (jobType.equals(JobType.RECURRING) && (nextExecuteTime > endTimeEndWindow)) { return new ArrayList<>(); } while (endTimeEndWindow >= nextExecuteTime) { @@ -360,11 +386,11 @@ public class TimerJobManager implements Closeable, Writable { return; } jobMap.forEach((k, v) -> { - if (v.isRunning() && (v.getNextExecuteTimeMs() + if (!v.getJobType().equals(JobType.MANUAL) && v.isRunning() && (v.getNextExecuteTimeMs() + v.getIntervalMs() < lastBatchSchedulerTimestamp)) { List<Long> executeTimes = findTasksBetweenTime( v, lastBatchSchedulerTimestamp, - v.getNextExecuteTimeMs()); + v.getNextExecuteTimeMs(), v.getJobType()); if (!executeTimes.isEmpty()) { for (Long executeTime : executeTimes) { putOneTask(v.getJobId(), executeTime); @@ -402,7 +428,8 @@ public class TimerJobManager implements Closeable, Writable { log.info("putOneTask failed, scheduler is closed, jobId: {}", jobId); return; } - long taskId = System.nanoTime(); + JobTask jobTask = createAsyncInitialTask(jobId, startExecuteTime); + long taskId = jobTask.getTaskId(); TimerJobTask task = new TimerJobTask(jobId, taskId, startExecuteTime, disruptor); long delay = getDelaySecond(task.getStartTimestamp()); Timeout timeout = dorisTimer.newTimeout(task, delay, TimeUnit.SECONDS); @@ -412,13 +439,13 @@ public class TimerJobManager implements Closeable, Writable { } if (jobTimeoutMap.containsKey(task.getJobId())) { jobTimeoutMap.get(task.getJobId()).put(task.getTaskId(), timeout); - 
JobTaskManager.addPrepareTaskStartTime(jobId, taskId, startExecuteTime); + JobTaskManager.addPrepareTask(jobTask); return; } Map<Long, Timeout> timeoutMap = new ConcurrentHashMap<>(); timeoutMap.put(task.getTaskId(), timeout); jobTimeoutMap.put(task.getJobId(), timeoutMap); - JobTaskManager.addPrepareTaskStartTime(jobId, taskId, startExecuteTime); + JobTaskManager.addPrepareTask(jobTask); } // cancel all task for one job @@ -488,9 +515,19 @@ public class TimerJobManager implements Closeable, Writable { } public void putOneJobToQueen(Long jobId) { + JobTask jobTask = createInitialTask(jobId, null); + JobTaskManager.addPrepareTask(jobTask); + disruptor.tryPublish(jobId, jobTask.getTaskId()); + } + + private JobTask createAsyncInitialTask(long jobId, long createTimeMs) { + long taskId = System.nanoTime(); + return new JobTask(jobId, taskId, createTimeMs); + } + + private <T> JobTask createInitialTask(long jobId, T context) { long taskId = System.nanoTime(); - JobTaskManager.addPrepareTaskStartTime(jobId, taskId, System.currentTimeMillis()); - disruptor.tryPublish(jobId, taskId); + return new JobTask(jobId, taskId, System.currentTimeMillis(), context); } @Override diff --git a/fe/fe-core/src/main/java/org/apache/doris/scheduler/registry/PersistentJobRegister.java b/fe/fe-core/src/main/java/org/apache/doris/scheduler/registry/PersistentJobRegister.java index 4ee17bb8df0..f1a901299f7 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/scheduler/registry/PersistentJobRegister.java +++ b/fe/fe-core/src/main/java/org/apache/doris/scheduler/registry/PersistentJobRegister.java @@ -110,6 +110,19 @@ public interface PersistentJobRegister { Long registerJob(Job job) throws DdlException; + + /** + * execute job task immediately,this method will not change job status and don't affect scheduler job + * this task type should set to {@link org.apache.doris.scheduler.constants.TaskType#MANUAL_JOB_TASK} + * + * @param jobId job id + * @param contextData if you need to pass 
parameters to the task, + * @param <T> context data type + * @return true if execute success, false if execute failed, + * if job is not exist or job is not running, or job not support manual execute, return false + */ + <T> boolean immediateExecuteTask(Long jobId, T contextData) throws DdlException; + List<Job> getJobs(String dbFullName, String jobName, JobCategory jobCategory, PatternMatcher matcher); /** diff --git a/fe/fe-core/src/main/java/org/apache/doris/scheduler/registry/TimerJobRegister.java b/fe/fe-core/src/main/java/org/apache/doris/scheduler/registry/TimerJobRegister.java index 5ce802eff72..f8ab59e5d54 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/scheduler/registry/TimerJobRegister.java +++ b/fe/fe-core/src/main/java/org/apache/doris/scheduler/registry/TimerJobRegister.java @@ -68,6 +68,11 @@ public class TimerJobRegister implements PersistentJobRegister { return timerJobManager.registerJob(job); } + @Override + public <T> boolean immediateExecuteTask(Long jobId, T data) throws DdlException { + return timerJobManager.immediateExecuteTask(jobId, data); + } + @Override public void pauseJob(Long jobId) { timerJobManager.pauseJob(jobId); diff --git a/fe/fe-core/src/test/java/org/apache/doris/scheduler/disruptor/JobTest.java b/fe/fe-core/src/test/java/org/apache/doris/scheduler/disruptor/JobTest.java index ad5f740907c..57fbfda6cf9 100644 --- a/fe/fe-core/src/test/java/org/apache/doris/scheduler/disruptor/JobTest.java +++ b/fe/fe-core/src/test/java/org/apache/doris/scheduler/disruptor/JobTest.java @@ -19,6 +19,7 @@ package org.apache.doris.scheduler.disruptor; import org.apache.doris.scheduler.common.IntervalUnit; import org.apache.doris.scheduler.constants.JobCategory; +import org.apache.doris.scheduler.constants.JobType; import org.apache.doris.scheduler.executor.SqlJobExecutor; import org.apache.doris.scheduler.job.Job; @@ -42,7 +43,7 @@ public class JobTest { public static void init() { SqlJobExecutor sqlJobExecutor = new SqlJobExecutor("insert 
into test values(1);"); job = new Job("insertTest", 1000L, System.currentTimeMillis(), System.currentTimeMillis() + 100000, sqlJobExecutor); - job.setCycleJob(true); + job.setJobType(JobType.RECURRING); job.setComment("test"); job.setOriginInterval(10L); job.setIntervalUnit(IntervalUnit.SECOND); diff --git a/fe/fe-core/src/test/java/org/apache/doris/scheduler/disruptor/TaskDisruptorTest.java b/fe/fe-core/src/test/java/org/apache/doris/scheduler/disruptor/TaskDisruptorTest.java index 1036285c47b..b8482b11fff 100644 --- a/fe/fe-core/src/test/java/org/apache/doris/scheduler/disruptor/TaskDisruptorTest.java +++ b/fe/fe-core/src/test/java/org/apache/doris/scheduler/disruptor/TaskDisruptorTest.java @@ -19,9 +19,12 @@ package org.apache.doris.scheduler.disruptor; import org.apache.doris.catalog.Env; import org.apache.doris.scheduler.constants.JobCategory; +import org.apache.doris.scheduler.exception.JobException; import org.apache.doris.scheduler.executor.JobExecutor; import org.apache.doris.scheduler.job.ExecutorResult; import org.apache.doris.scheduler.job.Job; +import org.apache.doris.scheduler.job.JobTask; +import org.apache.doris.scheduler.manager.JobTaskManager; import org.apache.doris.scheduler.manager.TimerJobManager; import org.apache.doris.scheduler.manager.TransientTaskManager; @@ -56,26 +59,30 @@ public class TaskDisruptorTest { @BeforeEach public void init() { taskDisruptor = new TaskDisruptor(timerJobManager, transientTaskManager); + taskDisruptor.start(); } @Test void testPublishEventAndConsumer() { Job job = new Job("test", 6000L, null, null, new TestExecutor()); + JobTask jobTask = new JobTask(job.getJobId(), 1L, System.currentTimeMillis()); + JobTaskManager.addPrepareTask(jobTask); job.setJobCategory(JobCategory.COMMON); new Expectations() {{ timerJobManager.getJob(anyLong); result = job; }}; taskDisruptor.tryPublish(job.getJobId(), 1L); - Awaitility.await().atMost(1, TimeUnit.SECONDS).until(() -> testEventExecuteFlag); + Awaitility.await().atMost(3, 
TimeUnit.SECONDS).until(() -> testEventExecuteFlag); Assertions.assertTrue(testEventExecuteFlag); } - class TestExecutor implements JobExecutor<Boolean> { + class TestExecutor implements JobExecutor<Boolean, String> { + @Override - public ExecutorResult execute(Job job) { + public ExecutorResult<Boolean> execute(Job job, String dataContext) throws JobException { testEventExecuteFlag = true; return new ExecutorResult(true, true, null, "null"); } diff --git a/fe/fe-core/src/test/java/org/apache/doris/scheduler/disruptor/TimerJobManagerTest.java b/fe/fe-core/src/test/java/org/apache/doris/scheduler/disruptor/TimerJobManagerTest.java index 3e912b8fd85..5fcf242fd13 100644 --- a/fe/fe-core/src/test/java/org/apache/doris/scheduler/disruptor/TimerJobManagerTest.java +++ b/fe/fe-core/src/test/java/org/apache/doris/scheduler/disruptor/TimerJobManagerTest.java @@ -21,9 +21,12 @@ import org.apache.doris.catalog.Env; import org.apache.doris.common.DdlException; import org.apache.doris.persist.EditLog; import org.apache.doris.scheduler.constants.JobCategory; +import org.apache.doris.scheduler.constants.JobType; +import org.apache.doris.scheduler.exception.JobException; import org.apache.doris.scheduler.executor.JobExecutor; import org.apache.doris.scheduler.job.ExecutorResult; import org.apache.doris.scheduler.job.Job; +import org.apache.doris.scheduler.job.JobTask; import org.apache.doris.scheduler.manager.TimerJobManager; import org.apache.doris.scheduler.manager.TransientTaskManager; @@ -51,16 +54,18 @@ public class TimerJobManagerTest { private static AtomicInteger testExecuteCount = new AtomicInteger(0); Job job = new Job("test", 6000L, null, null, new TestExecutor()); + JobTask jobTask = new JobTask(job.getJobId(), 1L, System.currentTimeMillis()); @BeforeEach public void init() { - job.setCycleJob(true); + job.setJobType(JobType.RECURRING); job.setJobCategory(JobCategory.COMMON); testExecuteCount.set(0); timerJobManager = new TimerJobManager(); TransientTaskManager 
transientTaskManager = new TransientTaskManager(); TaskDisruptor taskDisruptor = new TaskDisruptor(this.timerJobManager, transientTaskManager); this.timerJobManager.setDisruptor(taskDisruptor); + taskDisruptor.start(); timerJobManager.start(); } @@ -153,7 +158,7 @@ public class TimerJobManagerTest { long startTimestamp = System.currentTimeMillis() + 3000L; job.setIntervalMs(0L); job.setStartTimeMs(startTimestamp); - job.setCycleJob(false); + job.setJobType(JobType.ONE_TIME); timerJobManager.registerJob(job); //consider the time of the first execution and give some buffer time Awaitility.await().atMost(14, TimeUnit.SECONDS).until(() -> System.currentTimeMillis() @@ -166,9 +171,10 @@ public class TimerJobManagerTest { timerJobManager.close(); } - class TestExecutor implements JobExecutor<Boolean> { + class TestExecutor implements JobExecutor<Boolean, String> { + @Override - public ExecutorResult execute(Job job) { + public ExecutorResult<Boolean> execute(Job job, String dataContext) throws JobException { log.info("test execute count:{}", testExecuteCount.incrementAndGet()); return new ExecutorResult<>(true, true, null, ""); } --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org