This is an automated email from the ASF dual-hosted git repository.

zykkk pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new 08c78a11359 [Feature](Job)Support manual and refactor some execution 
logic (#26082)
08c78a11359 is described below

commit 08c78a11359e3d0c9e6bc89cf32d4d9456c6c4d5
Author: Calvin Kirs <acm_mas...@163.com>
AuthorDate: Tue Oct 31 20:35:55 2023 +0800

    [Feature](Job)Support manual and refactor some execution logic (#26082)
    
    Supports manually triggered JOBs and Tasks
    Optimize JOB&TASK display logic
    Refactor the executor to support context passing
---
 fe/fe-core/src/main/cup/sql_parser.cup             |  6 +-
 .../org/apache/doris/analysis/CreateJobStmt.java   | 36 +++++-----
 .../org/apache/doris/analysis/ShowJobStmt.java     | 11 +--
 .../org/apache/doris/analysis/ShowJobTaskStmt.java |  4 +-
 .../main/java/org/apache/doris/catalog/Env.java    |  5 +-
 .../java/org/apache/doris/qe/ShowExecutor.java     |  7 +-
 .../apache/doris/scheduler/constants/JobType.java  |  7 +-
 .../apache/doris/scheduler/constants/TaskType.java | 16 ++++-
 .../doris/scheduler/disruptor/TaskDisruptor.java   | 34 +++++++--
 .../doris/scheduler/disruptor/TaskHandler.java     | 26 ++++---
 .../scheduler/executor/AbstractJobExecutor.java    | 54 ++++++++++++++
 .../doris/scheduler/executor/JobExecutor.java      |  9 +--
 .../doris/scheduler/executor/SqlJobExecutor.java   | 28 +++-----
 .../java/org/apache/doris/scheduler/job/Job.java   | 82 +++++++++++++---------
 .../org/apache/doris/scheduler/job/JobTask.java    | 37 +++++++++-
 .../doris/scheduler/manager/JobTaskManager.java    | 17 +++--
 .../doris/scheduler/manager/TimerJobManager.java   | 75 +++++++++++++++-----
 .../scheduler/registry/PersistentJobRegister.java  | 13 ++++
 .../doris/scheduler/registry/TimerJobRegister.java |  5 ++
 .../apache/doris/scheduler/disruptor/JobTest.java  |  3 +-
 .../scheduler/disruptor/TaskDisruptorTest.java     | 13 +++-
 .../scheduler/disruptor/TimerJobManagerTest.java   | 14 ++--
 22 files changed, 352 insertions(+), 150 deletions(-)

diff --git a/fe/fe-core/src/main/cup/sql_parser.cup 
b/fe/fe-core/src/main/cup/sql_parser.cup
index 80a71b7490d..5933fcffbcb 100644
--- a/fe/fe-core/src/main/cup/sql_parser.cup
+++ b/fe/fe-core/src/main/cup/sql_parser.cup
@@ -2551,17 +2551,17 @@ resource_desc ::=
 create_job_stmt ::=
      KW_CREATE KW_JOB job_label:jobLabel KW_ON KW_SCHEDULER KW_EVERY 
INTEGER_LITERAL:time_interval ident:time_unit opt_job_starts:startsTime 
opt_job_ends:endsTime opt_comment:comment KW_DO stmt:executeSql  
     {:
-        CreateJobStmt stmt = new 
CreateJobStmt(jobLabel,null,false,time_interval,time_unit, startsTime, 
endsTime,comment,executeSql);
+        CreateJobStmt stmt = new 
CreateJobStmt(jobLabel,"RECURRING",null,time_interval,time_unit, startsTime, 
endsTime,comment,executeSql);
         RESULT = stmt;
     :}
     | KW_CREATE KW_JOB job_label:jobLabel KW_ON KW_SCHEDULER KW_STREAMING 
KW_AT STRING_LITERAL:atTime opt_comment:comment KW_DO stmt:executeSql  
     {:
-        CreateJobStmt stmt = new 
CreateJobStmt(jobLabel,atTime,true,null,null,null,null,comment,executeSql);
+        CreateJobStmt stmt = new 
CreateJobStmt(jobLabel,"STREAMING",atTime,null,null,null,null,comment,executeSql);
         RESULT = stmt;
     :}    
     | KW_CREATE KW_JOB job_label:jobLabel KW_ON KW_SCHEDULER KW_AT 
STRING_LITERAL:atTime opt_comment:comment KW_DO stmt:executeSql  
     {:
-        CreateJobStmt stmt = new 
CreateJobStmt(jobLabel,atTime,false,null,null,null,null,comment,executeSql);
+        CreateJobStmt stmt = new 
CreateJobStmt(jobLabel,"ONE_TIME",atTime,null,null,null,null,comment,executeSql);
         RESULT = stmt;
     :}
     ;                           
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/analysis/CreateJobStmt.java 
b/fe/fe-core/src/main/java/org/apache/doris/analysis/CreateJobStmt.java
index 999d630153d..57f976712c9 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/analysis/CreateJobStmt.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/CreateJobStmt.java
@@ -28,6 +28,7 @@ import org.apache.doris.qe.ConnectContext;
 import org.apache.doris.scheduler.common.IntervalUnit;
 import org.apache.doris.scheduler.constants.JobCategory;
 import org.apache.doris.scheduler.constants.JobStatus;
+import org.apache.doris.scheduler.constants.JobType;
 import org.apache.doris.scheduler.executor.SqlJobExecutor;
 import org.apache.doris.scheduler.job.Job;
 
@@ -41,22 +42,21 @@ import java.util.HashSet;
 /**
  * syntax:
  * CREATE
- *     [DEFINER = user]
- *     JOB
- *     event_name
- *     ON SCHEDULE schedule
- *     [COMMENT 'string']
- *     DO event_body;
+ * [DEFINER = user]
+ * JOB
+ * event_name
+ * ON SCHEDULE schedule
+ * [COMMENT 'string']
+ * DO event_body;
  * schedule: {
- *    [STREAMING] AT timestamp
- *   | EVERY interval
- *     [STARTS timestamp ]
- *     [ENDS timestamp ]
+ * [STREAMING] AT timestamp
+ * | EVERY interval
+ * [STARTS timestamp ]
+ * [ENDS timestamp ]
  * }
  * interval:
- *     quantity { DAY | HOUR | MINUTE |
- *               WEEK | SECOND }
- *
+ * quantity { DAY | HOUR | MINUTE |
+ * WEEK | SECOND }
  */
 @Slf4j
 public class CreateJobStmt extends DdlStmt {
@@ -90,7 +90,7 @@ public class CreateJobStmt extends DdlStmt {
 
     private static HashSet<String> supportStmtClassNamesCache = new 
HashSet<>(16);
 
-    public CreateJobStmt(LabelName labelName, String onceJobStartTimestamp, 
Boolean isStreamingJob,
+    public CreateJobStmt(LabelName labelName, String jobTypeName, String 
onceJobStartTimestamp,
                          Long interval, String intervalTimeUnit,
                          String startsTimeStamp, String endsTimeStamp, String 
comment, StatementBase doStmt) {
         this.labelName = labelName;
@@ -102,7 +102,8 @@ public class CreateJobStmt extends DdlStmt {
         this.comment = comment;
         this.stmt = doStmt;
         this.job = new Job();
-        job.setStreamingJob(isStreamingJob);
+        JobType jobType = JobType.valueOf(jobTypeName.toUpperCase());
+        job.setJobType(jobType);
     }
 
     private String parseExecuteSql(String sql) throws AnalysisException {
@@ -136,7 +137,7 @@ public class CreateJobStmt extends DdlStmt {
         job.setTimezone(timezone);
         job.setComment(comment);
         //todo support user define
-        job.setUser("root");
+        job.setUser(ConnectContext.get().getQualifiedUser());
         job.setJobStatus(JobStatus.RUNNING);
         job.setJobCategory(JobCategory.SQL);
         analyzerSqlStmt();
@@ -172,7 +173,6 @@ public class CreateJobStmt extends DdlStmt {
 
 
     private void analyzerCycleJob() throws UserException {
-        job.setCycleJob(true);
         if (null == interval) {
             throw new AnalysisException("interval is null");
         }
@@ -214,8 +214,6 @@ public class CreateJobStmt extends DdlStmt {
 
 
     private void analyzerOnceTimeJob() throws UserException {
-        job.setCycleJob(false);
-
         job.setIntervalMs(0L);
 
         long executeAtTimeMillis = 
TimeUtils.timeStringToLong(onceJobStartTimestamp);
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/analysis/ShowJobStmt.java 
b/fe/fe-core/src/main/java/org/apache/doris/analysis/ShowJobStmt.java
index 7ad3ce343c5..42fb1c508fc 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/analysis/ShowJobStmt.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/ShowJobStmt.java
@@ -47,19 +47,12 @@ public class ShowJobStmt extends ShowStmt {
     private static final ImmutableList<String> TITLE_NAMES =
             new ImmutableList.Builder<String>()
                     .add("Id")
-                    .add("Db")
                     .add("Name")
                     .add("Definer")
-                    .add("TimeZone")
                     .add("ExecuteType")
-                    .add("ExecuteAt")
-                    .add("ExecuteInterval")
-                    .add("ExecuteIntervalUnit")
-                    .add("Starts")
-                    .add("Ends")
+                    .add("RecurringStrategy")
                     .add("Status")
-                    .add("LastExecuteFinishTime")
-                    .add("ErrorMsg")
+                    .add("lastExecuteTaskStatus")
                     .add("CreateTime")
                     .add("Comment")
                     .build();
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/analysis/ShowJobTaskStmt.java 
b/fe/fe-core/src/main/java/org/apache/doris/analysis/ShowJobTaskStmt.java
index 118530341ff..db3fb2ef3cd 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/analysis/ShowJobTaskStmt.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/ShowJobTaskStmt.java
@@ -41,8 +41,9 @@ public class ShowJobTaskStmt extends ShowStmt {
 
     private static final ImmutableList<String> TITLE_NAMES =
             new ImmutableList.Builder<String>()
-                    .add("JobId")
                     .add("TaskId")
+                    .add("JobId")
+                    .add("JobName")
                     .add("CreateTime")
                     .add("StartTime")
                     .add("EndTime")
@@ -50,6 +51,7 @@ public class ShowJobTaskStmt extends ShowStmt {
                     .add("ExecuteSql")
                     .add("Result")
                     .add("ErrorMsg")
+                    .add("TaskType")
                     .build();
 
     @Getter
diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java 
b/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java
index 9383c8b9266..f1cdf47b33f 100755
--- a/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java
@@ -355,6 +355,8 @@ public class Env {
     private TimerJobManager timerJobManager;
     private TransientTaskManager transientTaskManager;
     private JobTaskManager jobTaskManager;
+
+    private TaskDisruptor taskDisruptor;
     private MasterDaemon labelCleaner; // To clean old LabelInfo, 
ExportJobInfos
     private MasterDaemon txnCleaner; // To clean aborted or timeout txns
     private Daemon feDiskUpdater;  // Update fe disk info
@@ -629,7 +631,7 @@ public class Env {
         this.jobTaskManager = new JobTaskManager();
         this.timerJobManager = new TimerJobManager();
         this.transientTaskManager = new TransientTaskManager();
-        TaskDisruptor taskDisruptor = new TaskDisruptor(this.timerJobManager, 
this.transientTaskManager);
+        this.taskDisruptor = new TaskDisruptor(this.timerJobManager, 
this.transientTaskManager);
         this.timerJobManager.setDisruptor(taskDisruptor);
         this.transientTaskManager.setDisruptor(taskDisruptor);
         this.persistentJobRegister = new TimerJobRegister(timerJobManager);
@@ -1532,6 +1534,7 @@ public class Env {
         publishVersionDaemon.start();
         // Start txn cleaner
         txnCleaner.start();
+        taskDisruptor.start();
         timerJobManager.start();
         // Alter
         getAlterInstance().start();
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/ShowExecutor.java 
b/fe/fe-core/src/main/java/org/apache/doris/qe/ShowExecutor.java
index 8c72d260876..2aadab78852 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/ShowExecutor.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/ShowExecutor.java
@@ -1428,14 +1428,15 @@ public class ShowExecutor {
             resultSet = new ShowResultSet(showJobTaskStmt.getMetaData(), rows);
             return;
         }
-        long jobId = jobs.get(0).getJobId();
+        Job job = jobs.get(0);
+        long jobId = job.getJobId();
         List<JobTask> jobTasks = 
Env.getCurrentEnv().getJobTaskManager().getJobTasks(jobId);
         if (CollectionUtils.isEmpty(jobTasks)) {
             resultSet = new ShowResultSet(showJobTaskStmt.getMetaData(), rows);
             return;
         }
-        for (JobTask job : jobTasks) {
-            rows.add(job.getShowInfo());
+        for (JobTask jobTask : jobTasks) {
+            rows.add(jobTask.getShowInfo(job.getJobName()));
         }
         resultSet = new ShowResultSet(showJobTaskStmt.getMetaData(), rows);
     }
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/scheduler/constants/JobType.java 
b/fe/fe-core/src/main/java/org/apache/doris/scheduler/constants/JobType.java
index 4f4467c989a..58f681c4061 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/scheduler/constants/JobType.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/scheduler/constants/JobType.java
@@ -29,5 +29,10 @@ public enum JobType {
     /**
      * JOB_TYPE_STREAMING is used to identify the streaming job.
      */
-    STREAMING
+    STREAMING,
+
+    /**
+     * The job will be executed manually and need to be triggered by the user.
+     */
+    MANUAL
 }
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/scheduler/constants/TaskType.java 
b/fe/fe-core/src/main/java/org/apache/doris/scheduler/constants/TaskType.java
index 525dbe8adaa..996d72584ff 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/scheduler/constants/TaskType.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/scheduler/constants/TaskType.java
@@ -18,7 +18,19 @@
 package org.apache.doris.scheduler.constants;
 
 public enum TaskType {
-    TimerJobTask,
+    /**
+     *  Usually don't require persistence and are used in various asynchronous 
tasks, such as export tasks.
+     *  the life cycle of this kind of task is not managed by JOB-scheduler.
+     */
+    TRANSIENT_TASK,
 
-    TransientTask
+    /**
+     * Tasks generated by scheduled jobs.
+     */
+    SCHEDULER_JOB_TASK,
+
+    /**
+     * Tasks generated by manual jobs.
+     */
+    MANUAL_JOB_TASK
 }
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/scheduler/disruptor/TaskDisruptor.java
 
b/fe/fe-core/src/main/java/org/apache/doris/scheduler/disruptor/TaskDisruptor.java
index a8d98831f21..0e3f8e618d7 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/scheduler/disruptor/TaskDisruptor.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/scheduler/disruptor/TaskDisruptor.java
@@ -18,7 +18,6 @@
 package org.apache.doris.scheduler.disruptor;
 
 import org.apache.doris.common.Config;
-import org.apache.doris.common.CustomThreadFactory;
 import org.apache.doris.scheduler.constants.TaskType;
 import org.apache.doris.scheduler.manager.TimerJobManager;
 import org.apache.doris.scheduler.manager.TransientTaskManager;
@@ -29,6 +28,7 @@ import com.lmax.disruptor.TimeoutException;
 import com.lmax.disruptor.WorkHandler;
 import com.lmax.disruptor.dsl.Disruptor;
 import com.lmax.disruptor.dsl.ProducerType;
+import com.lmax.disruptor.util.DaemonThreadFactory;
 import lombok.extern.slf4j.Slf4j;
 
 import java.io.Closeable;
@@ -47,10 +47,13 @@ import java.util.concurrent.TimeUnit;
 @Slf4j
 public class TaskDisruptor implements Closeable {
 
-    private final Disruptor<TaskEvent> disruptor;
+    private  Disruptor<TaskEvent> disruptor;
+
+    private TimerJobManager timerJobManager;
+    private TransientTaskManager transientTaskManager;
     private static final int DEFAULT_RING_BUFFER_SIZE = 
Config.async_task_queen_size;
 
-    private static int consumerThreadCount = 
Config.async_task_consumer_thread_num;
+    private static final int consumerThreadCount = 
Config.async_task_consumer_thread_num;
 
     /**
      * The default timeout for {@link #close()} in seconds.
@@ -75,7 +78,12 @@ public class TaskDisruptor implements Closeable {
             };
 
     public TaskDisruptor(TimerJobManager timerJobManager, TransientTaskManager 
transientTaskManager) {
-        ThreadFactory producerThreadFactory = new 
CustomThreadFactory("task-disruptor-producer");
+        this.timerJobManager = timerJobManager;
+        this.transientTaskManager = transientTaskManager;
+    }
+
+    public void start() {
+        ThreadFactory producerThreadFactory = DaemonThreadFactory.INSTANCE;
         disruptor = new Disruptor<>(TaskEvent.FACTORY, 
DEFAULT_RING_BUFFER_SIZE, producerThreadFactory,
                 ProducerType.SINGLE, new BlockingWaitStrategy());
         WorkHandler<TaskEvent>[] workers = new 
TaskHandler[consumerThreadCount];
@@ -88,16 +96,29 @@ public class TaskDisruptor implements Closeable {
 
     /**
      * Publishes a job to the disruptor.
+     * Default task type is {@link TaskType#SCHEDULER_JOB_TASK}
      *
      * @param jobId job id
      */
     public void tryPublish(Long jobId, Long taskId) {
+        this.tryPublish(jobId, taskId, TaskType.SCHEDULER_JOB_TASK);
+    }
+
+
+    /**
+     * Publishes a job task to the disruptor.
+     *
+     * @param jobId    job id, describe which job this task belongs to
+     * @param taskId   task id, it's linked to job id, we can get job detail 
by task id
+     * @param taskType {@link TaskType}
+     */
+    public void tryPublish(Long jobId, Long taskId, TaskType taskType) {
         if (isClosed) {
             log.info("tryPublish failed, disruptor is closed, jobId: {}", 
jobId);
             return;
         }
         try {
-            disruptor.publishEvent(TRANSLATOR, jobId, taskId, 
TaskType.TimerJobTask);
+            disruptor.publishEvent(TRANSLATOR, jobId, taskId, taskType);
         } catch (Exception e) {
             log.error("tryPublish failed, jobId: {}", jobId, e);
         }
@@ -105,6 +126,7 @@ public class TaskDisruptor implements Closeable {
 
     /**
      * Publishes a task to the disruptor.
+     * Default task type is {@link TaskType#TRANSIENT_TASK}
      *
      * @param taskId task id
      */
@@ -114,7 +136,7 @@ public class TaskDisruptor implements Closeable {
             return;
         }
         try {
-            disruptor.publishEvent(TRANSLATOR, taskId, 0L, 
TaskType.TransientTask);
+            disruptor.publishEvent(TRANSLATOR, taskId, 0L, 
TaskType.TRANSIENT_TASK);
         } catch (Exception e) {
             log.error("tryPublish failed, taskId: {}", taskId, e);
         }
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/scheduler/disruptor/TaskHandler.java
 
b/fe/fe-core/src/main/java/org/apache/doris/scheduler/disruptor/TaskHandler.java
index 9daf3b7ee45..6005fa1bd40 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/scheduler/disruptor/TaskHandler.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/scheduler/disruptor/TaskHandler.java
@@ -19,6 +19,7 @@ package org.apache.doris.scheduler.disruptor;
 
 import org.apache.doris.catalog.Env;
 import org.apache.doris.persist.gson.GsonUtils;
+import org.apache.doris.scheduler.constants.JobType;
 import org.apache.doris.scheduler.exception.JobException;
 import org.apache.doris.scheduler.executor.TransientTaskExecutor;
 import org.apache.doris.scheduler.job.ExecutorResult;
@@ -70,13 +71,15 @@ public class TaskHandler implements WorkHandler<TaskEvent> {
     @Override
     public void onEvent(TaskEvent event) {
         switch (event.getTaskType()) {
-            case TimerJobTask:
+            case SCHEDULER_JOB_TASK:
+            case MANUAL_JOB_TASK:
                 onTimerJobTaskHandle(event);
                 break;
-            case TransientTask:
+            case TRANSIENT_TASK:
                 onTransientTaskHandle(event);
                 break;
             default:
+                log.warn("unknown task type: {}", event.getTaskType());
                 break;
         }
     }
@@ -90,7 +93,11 @@ public class TaskHandler implements WorkHandler<TaskEvent> {
     public void onTimerJobTaskHandle(TaskEvent taskEvent) {
         long jobId = taskEvent.getId();
         long taskId = taskEvent.getTaskId();
-        long createTimeMs = jobTaskManager.pollPrepareTaskByTaskId(jobId, 
taskId);
+        JobTask jobTask = jobTaskManager.pollPrepareTaskByTaskId(jobId, 
taskId);
+        if (jobTask == null) {
+            log.warn("jobTask is null, maybe it's cancel, jobId: {}, taskId: 
{}", jobId, taskId);
+            return;
+        }
         Job job = timerJobManager.getJob(jobId);
         if (job == null) {
             log.info("job is null, jobId: {}", jobId);
@@ -102,12 +109,12 @@ public class TaskHandler implements 
WorkHandler<TaskEvent> {
         }
         log.debug("job is running, eventJobId: {}", jobId);
 
-        JobTask jobTask = new JobTask(jobId, taskId, createTimeMs);
+
         try {
             jobTask.setStartTimeMs(System.currentTimeMillis());
-            ExecutorResult result = job.getExecutor().execute(job);
+            ExecutorResult result = job.getExecutor().execute(job, 
jobTask.getContextData());
             job.setLatestCompleteExecuteTimeMs(System.currentTimeMillis());
-            if (job.isCycleJob()) {
+            if (job.getJobType().equals(JobType.RECURRING)) {
                 updateJobStatusIfPastEndTime(job);
             } else {
                 // one time job should be finished after execute
@@ -117,7 +124,7 @@ public class TaskHandler implements WorkHandler<TaskEvent> {
                 log.warn("Job execute failed, jobId: {}, result is null", 
jobId);
                 jobTask.setErrorMsg("Job execute failed, result is null");
                 jobTask.setIsSuccessful(false);
-                timerJobManager.pauseJob(jobId);
+                timerJobManager.setJobLatestStatus(jobId, false);
                 return;
             }
             String resultStr = GsonUtils.GSON.toJson(result.getResult());
@@ -126,14 +133,12 @@ public class TaskHandler implements 
WorkHandler<TaskEvent> {
             if (!result.isSuccess()) {
                 log.warn("Job execute failed, jobId: {}, msg : {}", jobId, 
result.getExecutorSql());
                 jobTask.setErrorMsg(result.getErrorMsg());
-                timerJobManager.pauseJob(jobId);
             }
             jobTask.setExecuteSql(result.getExecutorSql());
         } catch (Exception e) {
             log.warn("Job execute failed, jobId: {}, msg : {}", jobId, 
e.getMessage());
             jobTask.setErrorMsg(e.getMessage());
             jobTask.setIsSuccessful(false);
-            timerJobManager.pauseJob(jobId);
         }
         jobTask.setEndTimeMs(System.currentTimeMillis());
         if (null == jobTaskManager) {
@@ -141,6 +146,7 @@ public class TaskHandler implements WorkHandler<TaskEvent> {
         }
         boolean isPersistent = job.getJobCategory().isPersistent();
         jobTaskManager.addJobTask(jobTask, isPersistent);
+        timerJobManager.setJobLatestStatus(jobId, jobTask.getIsSuccessful());
     }
 
     public void onTransientTaskHandle(TaskEvent taskEvent) {
@@ -165,7 +171,7 @@ public class TaskHandler implements WorkHandler<TaskEvent> {
     }
 
     private void updateOnceTimeJobStatus(Job job) {
-        if (job.isStreamingJob()) {
+        if (job.getJobType().equals(JobType.STREAMING)) {
             timerJobManager.putOneJobToQueen(job.getJobId());
             return;
         }
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/scheduler/executor/AbstractJobExecutor.java
 
b/fe/fe-core/src/main/java/org/apache/doris/scheduler/executor/AbstractJobExecutor.java
new file mode 100644
index 00000000000..7a70c963bd1
--- /dev/null
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/scheduler/executor/AbstractJobExecutor.java
@@ -0,0 +1,54 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.scheduler.executor;
+
+import org.apache.doris.analysis.UserIdentity;
+import org.apache.doris.catalog.Env;
+import org.apache.doris.cluster.ClusterNamespace;
+import org.apache.doris.qe.ConnectContext;
+import org.apache.doris.scheduler.job.Job;
+import org.apache.doris.thrift.TUniqueId;
+
+import lombok.Getter;
+
+import java.util.UUID;
+
+@Getter
+public abstract class AbstractJobExecutor<T, C> implements JobExecutor<T, C> {
+
+    protected ConnectContext createContext(Job job) {
+        ConnectContext ctx = new ConnectContext();
+        ctx.setEnv(Env.getCurrentEnv());
+        
ctx.setCluster(ClusterNamespace.getClusterNameFromFullName(job.getDbName()));
+        ctx.setDatabase(job.getDbName());
+        ctx.setQualifiedUser(job.getUser());
+        
ctx.setCurrentUserIdentity(UserIdentity.createAnalyzedUserIdentWithIp(job.getUser(),
 "%"));
+        ctx.getState().reset();
+        ctx.setThreadLocalInfo();
+        return ctx;
+    }
+
+    protected String generateTaskId() {
+        return UUID.randomUUID().toString();
+    }
+
+    protected TUniqueId generateQueryId(String taskIdString) {
+        UUID taskId = UUID.fromString(taskIdString);
+        return new TUniqueId(taskId.getMostSignificantBits(), 
taskId.getLeastSignificantBits());
+    }
+}
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/scheduler/executor/JobExecutor.java 
b/fe/fe-core/src/main/java/org/apache/doris/scheduler/executor/JobExecutor.java
index a6f2e10306f..40aebc8f6ad 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/scheduler/executor/JobExecutor.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/scheduler/executor/JobExecutor.java
@@ -28,18 +28,19 @@ import org.apache.doris.scheduler.job.Job;
  * We use Gson to serialize and deserialize JobExecutor. so the implementation 
of JobExecutor needs to be serializable.
  * You can see @org.apache.doris.persist.gson.GsonUtils.java for details.When 
you implement JobExecutor,pls make sure
  * you can serialize and deserialize it.
- *
- * @param <T> The result type of the event job execution.
  */
 @FunctionalInterface
-public interface JobExecutor<T> {
+public interface JobExecutor<T, C> {
 
     /**
      * Executes the event job and returns the result.
      * Exceptions will be caught internally, so there is no need to define or 
throw them separately.
      *
+     * @param job         The event job to execute.
+     * @param dataContext The data context of the event job. if you need to 
pass parameters to the event job,
+     *                    you can use it.
      * @return The result of the event job execution.
      */
-    ExecutorResult execute(Job job) throws JobException;
+    ExecutorResult<T> execute(Job job, C dataContext) throws JobException;
 }
 
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/scheduler/executor/SqlJobExecutor.java
 
b/fe/fe-core/src/main/java/org/apache/doris/scheduler/executor/SqlJobExecutor.java
index 3df2a6fd9a2..546eac9a768 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/scheduler/executor/SqlJobExecutor.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/scheduler/executor/SqlJobExecutor.java
@@ -17,9 +17,6 @@
 
 package org.apache.doris.scheduler.executor;
 
-import org.apache.doris.analysis.UserIdentity;
-import org.apache.doris.catalog.Env;
-import org.apache.doris.cluster.ClusterNamespace;
 import org.apache.doris.qe.ConnectContext;
 import org.apache.doris.qe.StmtExecutor;
 import org.apache.doris.scheduler.exception.JobException;
@@ -32,15 +29,15 @@ import lombok.Getter;
 import lombok.Setter;
 import lombok.extern.slf4j.Slf4j;
 
-import java.util.UUID;
+import java.util.Map;
 
 /**
  * we use this executor to execute sql job
  */
+@Getter
 @Slf4j
-public class SqlJobExecutor implements JobExecutor {
+public class SqlJobExecutor extends AbstractJobExecutor<String, Map<String, 
Object>> {
 
-    @Getter
     @Setter
     @SerializedName(value = "sql")
     private String sql;
@@ -50,24 +47,14 @@ public class SqlJobExecutor implements JobExecutor {
     }
 
     @Override
-    public ExecutorResult<String> execute(Job job) throws JobException {
-        ConnectContext ctx = new ConnectContext();
-        ctx.setEnv(Env.getCurrentEnv());
-        
ctx.setCluster(ClusterNamespace.getClusterNameFromFullName(job.getDbName()));
-        ctx.setDatabase(job.getDbName());
-        ctx.setQualifiedUser(job.getUser());
-        
ctx.setCurrentUserIdentity(UserIdentity.createAnalyzedUserIdentWithIp(job.getUser(),
 "%"));
-        ctx.getState().reset();
-        ctx.setThreadLocalInfo();
-        String taskIdString = UUID.randomUUID().toString();
-        UUID taskId = UUID.fromString(taskIdString);
-        TUniqueId queryId = new TUniqueId(taskId.getMostSignificantBits(), 
taskId.getLeastSignificantBits());
-        ctx.setQueryId(queryId);
+    public ExecutorResult<String> execute(Job job, Map<String, Object> 
dataContext) throws JobException {
+        ConnectContext ctx = createContext(job);
+        String taskIdString = generateTaskId();
+        TUniqueId queryId = generateQueryId(taskIdString);
         try {
             StmtExecutor executor = new StmtExecutor(ctx, sql);
             executor.execute(queryId);
             String result = convertExecuteResult(ctx, taskIdString);
-
             return new ExecutorResult<>(result, true, null, sql);
         } catch (Exception e) {
             log.warn("execute sql job failed, job id :{}, sql: {}, error: {}", 
job.getJobId(), sql, e);
@@ -88,4 +75,5 @@ public class SqlJobExecutor implements JobExecutor {
         return "queryId:" + queryId + ",affectedRows : " + 
ctx.getState().getAffectedRows() + ", warningRows: "
                 + ctx.getState().getWarningRows() + ",infoMsg" + 
ctx.getState().getInfoMessage();
     }
+
 }
diff --git a/fe/fe-core/src/main/java/org/apache/doris/scheduler/job/Job.java 
b/fe/fe-core/src/main/java/org/apache/doris/scheduler/job/Job.java
index 6d39f1cd8d9..7f3fd0884b7 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/scheduler/job/Job.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/scheduler/job/Job.java
@@ -80,6 +80,9 @@ public class Job implements Writable {
     @SerializedName("jobStatus")
     private JobStatus jobStatus;
 
+    @SerializedName("jobType")
+    private JobType jobType = JobType.RECURRING;
+
     /**
      * The executor of the job.
      *
@@ -93,12 +96,6 @@ public class Job implements Writable {
     @SerializedName("user")
     private String user;
 
-    @SerializedName("isCycleJob")
-    private boolean isCycleJob = false;
-
-    @SerializedName("isStreamingJob")
-    private boolean isStreamingJob = false;
-
     @SerializedName("intervalMs")
     private Long intervalMs = 0L;
     @SerializedName("startTimeMs")
@@ -129,6 +126,8 @@ public class Job implements Writable {
     @SerializedName("createTimeMs")
     private Long createTimeMs = System.currentTimeMillis();
 
+    private Boolean lastExecuteTaskStatus;
+
     @SerializedName("comment")
     private String comment;
 
@@ -206,6 +205,18 @@ public class Job implements Writable {
     }
 
     public void checkJobParam() throws DdlException {
+        if (null == jobCategory) {
+            throw new DdlException("jobCategory must be set");
+        }
+        if (null == executor) {
+            throw new DdlException("Job executor must be set");
+        }
+        if (null == jobType) {
+            throw new DdlException("Job type must be set");
+        }
+        if (jobType.equals(JobType.MANUAL)) {
+            return;
+        }
         if (startTimeMs != 0L && startTimeMs < System.currentTimeMillis()) {
             throw new DdlException("startTimeMs must be greater than current 
time");
         }
@@ -221,15 +232,10 @@ public class Job implements Writable {
         if (null != intervalUnit && null != originInterval) {
             this.intervalMs = intervalUnit.getParameterValue(originInterval);
         }
-        if (isCycleJob && (intervalMs == null || intervalMs <= 0L)) {
+        if (jobType.equals(JobType.RECURRING) && (intervalMs == null || 
intervalMs <= 0L)) {
             throw new DdlException("cycle job must set intervalMs");
         }
-        if (null == jobCategory) {
-            throw new DdlException("jobCategory must be set");
-        }
-        if (null == executor) {
-            throw new DdlException("Job executor must be set");
-        }
+
     }
 
 
@@ -246,33 +252,41 @@ public class Job implements Writable {
     public List<String> getShowInfo() {
         List<String> row = Lists.newArrayList();
         row.add(String.valueOf(jobId));
-        row.add(dbName);
-        if (jobCategory.equals(JobCategory.MTMV)) {
-            row.add(baseName);
-        }
         row.add(jobName);
         row.add(user);
-        row.add(timezone);
-        if (isCycleJob) {
-            row.add(JobType.RECURRING.name());
-        } else {
-            if (isStreamingJob) {
-                row.add(JobType.STREAMING.name());
-            } else {
-                row.add(JobType.ONE_TIME.name());
-            }
-        }
-        row.add(isCycleJob ? "null" : TimeUtils.longToTimeString(startTimeMs));
-        row.add(isCycleJob ? originInterval.toString() : "null");
-        row.add(isCycleJob ? intervalUnit.name() : "null");
-        row.add(isCycleJob && startTimeMs > 0 ? 
TimeUtils.longToTimeString(startTimeMs) : "null");
-        row.add(isCycleJob && endTimeMs > 0 ? 
TimeUtils.longToTimeString(endTimeMs) : "null");
+        row.add(jobType.name());
+
+        row.add(convertRecurringStrategyToString());
         row.add(jobStatus.name());
-        row.add(latestCompleteExecuteTimeMs <= 0L ? "null" : 
TimeUtils.longToTimeString(latestCompleteExecuteTimeMs));
-        row.add(errMsg == null ? "null" : errMsg);
+        row.add(null == lastExecuteTaskStatus ? "null" : 
lastExecuteTaskStatus.toString());
         row.add(createTimeMs <= 0L ? "null" : 
TimeUtils.longToTimeString(createTimeMs));
         row.add(comment == null ? "null" : comment);
         return row;
     }
 
+    private String convertRecurringStrategyToString() {
+        if (jobType.equals(JobType.MANUAL)) {
+            return "MANUAL TRIGGER";
+        }
+        switch (jobType) {
+            case ONE_TIME:
+                return "AT " + TimeUtils.longToTimeString(startTimeMs);
+            case RECURRING:
+                String result = "EVERY " + originInterval + " " + 
intervalUnit.name();
+                if (startTimeMs > 0) {
+                    result += " STARTS " + 
TimeUtils.longToTimeString(startTimeMs);
+                }
+                if (endTimeMs > 0) {
+                    result += " ENDS " + TimeUtils.longToTimeString(endTimeMs);
+                }
+                return result;
+            case STREAMING:
+                return "STREAMING" + (startTimeMs > 0 ? " AT " + 
TimeUtils.longToTimeString(startTimeMs) : "");
+            case MANUAL:
+                return "MANUAL TRIGGER";
+            default:
+                return "UNKNOWN";
+        }
+    }
+
 }
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/scheduler/job/JobTask.java 
b/fe/fe-core/src/main/java/org/apache/doris/scheduler/job/JobTask.java
index 04fb552e74b..1f8aac58285 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/scheduler/job/JobTask.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/scheduler/job/JobTask.java
@@ -21,10 +21,12 @@ import org.apache.doris.common.io.Text;
 import org.apache.doris.common.io.Writable;
 import org.apache.doris.common.util.TimeUtils;
 import org.apache.doris.persist.gson.GsonUtils;
+import org.apache.doris.scheduler.constants.TaskType;
 
 import com.google.common.collect.Lists;
 import com.google.gson.annotations.SerializedName;
 import lombok.Data;
+import lombok.extern.slf4j.Slf4j;
 
 import java.io.DataInput;
 import java.io.DataOutput;
@@ -32,7 +34,8 @@ import java.io.IOException;
 import java.util.List;
 
 @Data
-public class JobTask implements Writable {
+@Slf4j
+public class JobTask<T> implements Writable {
 
     @SerializedName("jobId")
     private Long jobId;
@@ -54,6 +57,21 @@ public class JobTask implements Writable {
     @SerializedName("errorMsg")
     private String errorMsg;
 
+    @SerializedName("contextDataStr")
+    private String contextDataStr;
+
+    @SerializedName("taskType")
+    private TaskType taskType = TaskType.SCHEDULER_JOB_TASK;
+
+    /**
+     * Some parameters specific to the current task that need to be used to 
execute the task
+     * eg: sql task, sql it's: select * from table where id = 1 order by id 
desc limit ${limit} offset ${offset}
+     * contextData is a map, key1 is limit, value is 10,key2 is offset, value 
is 1
+     * when execute the task, we will replace the ${limit} to 10, ${offset} to 
1
+     * so to execute sql is: select * from table where id = 1 order by id desc 
limit 10 offset 1.
+     */
+    private T contextData;
+
     public JobTask(Long jobId, Long taskId, Long createTimeMs) {
         //it's enough to use nanoTime to identify a task
         this.taskId = taskId;
@@ -61,10 +79,22 @@ public class JobTask implements Writable {
         this.createTimeMs = createTimeMs;
     }
 
-    public List<String> getShowInfo() {
+    public JobTask(Long jobId, Long taskId, Long createTimeMs, T contextData) {
+        this(jobId, taskId, createTimeMs);
+        this.contextData = contextData;
+        try {
+            this.contextDataStr = GsonUtils.GSON.toJson(contextData);
+        } catch (Exception e) {
+            this.contextDataStr = null;
+            log.error("contextData serialize failed, jobId: {}, taskId: {}", 
jobId, taskId, e);
+        }
+    }
+
+    public List<String> getShowInfo(String jobName) {
         List<String> row = Lists.newArrayList();
-        row.add(String.valueOf(jobId));
         row.add(String.valueOf(taskId));
+        row.add(String.valueOf(jobId));
+        row.add(jobName);
         if (null != createTimeMs) {
             row.add(TimeUtils.longToTimeString(createTimeMs));
         }
@@ -90,6 +120,7 @@ public class JobTask implements Writable {
         } else {
             row.add(errorMsg);
         }
+        row.add(taskType.name());
         return row;
     }
 
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/scheduler/manager/JobTaskManager.java
 
b/fe/fe-core/src/main/java/org/apache/doris/scheduler/manager/JobTaskManager.java
index 7c739ba4603..6f7cd839074 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/scheduler/manager/JobTaskManager.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/scheduler/manager/JobTaskManager.java
@@ -51,17 +51,20 @@ public class JobTaskManager implements Writable {
      * used to record the start time of the task to be executed
      * will clear when the task is executed
      */
-    private static ConcurrentHashMap<Long, Map<Long, Long>> 
prepareTaskCreateMsMap = new ConcurrentHashMap<>(16);
+    private static ConcurrentHashMap<Long, Map<Long, JobTask>> 
prepareTaskCreateMsMap = new ConcurrentHashMap<>(16);
 
-    public static void addPrepareTaskStartTime(Long jobId, Long taskId, Long 
startTime) {
+    public static void addPrepareTask(JobTask jobTask) {
+        long jobId = jobTask.getJobId();
+        long taskId = jobTask.getTaskId();
         prepareTaskCreateMsMap.computeIfAbsent(jobId, k -> new HashMap<>());
-        prepareTaskCreateMsMap.get(jobId).put(taskId, startTime);
+        prepareTaskCreateMsMap.get(jobId).put(taskId, jobTask);
     }
 
-    public static Long pollPrepareTaskByTaskId(Long jobId, Long taskId) {
-        if (!prepareTaskCreateMsMap.containsKey(jobId)) {
-            // if the job is not in the map, return current time
-            return System.currentTimeMillis();
+    public static JobTask pollPrepareTaskByTaskId(Long jobId, Long taskId) {
+        if (!prepareTaskCreateMsMap.containsKey(jobId) || 
!prepareTaskCreateMsMap.get(jobId).containsKey(taskId)) {
+            // if the job is not in the map, return new JobTask
+            // return new JobTask(jobId, taskId, System.currentTimeMillis()); 
fixme
+            return null;
         }
         return prepareTaskCreateMsMap.get(jobId).remove(taskId);
     }
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/scheduler/manager/TimerJobManager.java
 
b/fe/fe-core/src/main/java/org/apache/doris/scheduler/manager/TimerJobManager.java
index c7a728cf049..33ac7d5f940 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/scheduler/manager/TimerJobManager.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/scheduler/manager/TimerJobManager.java
@@ -19,7 +19,6 @@ package org.apache.doris.scheduler.manager;
 
 import org.apache.doris.catalog.Env;
 import org.apache.doris.common.Config;
-import org.apache.doris.common.CustomThreadFactory;
 import org.apache.doris.common.DdlException;
 import org.apache.doris.common.PatternMatcher;
 import org.apache.doris.common.io.Writable;
@@ -28,8 +27,11 @@ import org.apache.doris.common.util.LogKey;
 import org.apache.doris.common.util.TimeUtils;
 import org.apache.doris.scheduler.constants.JobCategory;
 import org.apache.doris.scheduler.constants.JobStatus;
+import org.apache.doris.scheduler.constants.JobType;
+import org.apache.doris.scheduler.constants.TaskType;
 import org.apache.doris.scheduler.disruptor.TaskDisruptor;
 import org.apache.doris.scheduler.job.Job;
+import org.apache.doris.scheduler.job.JobTask;
 import org.apache.doris.scheduler.job.TimerJobTask;
 
 import io.netty.util.HashedWheelTimer;
@@ -88,13 +90,12 @@ public class TimerJobManager implements Closeable, Writable 
{
     }
 
     public void start() {
-        dorisTimer = new HashedWheelTimer(new 
CustomThreadFactory("hashed-wheel-timer"),
-                1, TimeUnit.SECONDS, 660);
+        dorisTimer = new HashedWheelTimer(1, TimeUnit.SECONDS, 660);
         dorisTimer.start();
         Long currentTimeMs = System.currentTimeMillis();
         jobMap.forEach((jobId, job) -> {
             Long nextExecuteTimeMs = findFistExecuteTime(currentTimeMs, 
job.getStartTimeMs(),
-                    job.getIntervalMs(), job.isCycleJob());
+                    job.getIntervalMs(), job.getJobType());
             job.setNextExecuteTimeMs(nextExecuteTimeMs);
         });
         batchSchedulerTasks();
@@ -152,18 +153,18 @@ public class TimerJobManager implements Closeable, 
Writable {
     }
 
     private void initAndSchedulerJob(Job job) {
-        if (!job.getJobStatus().equals(JobStatus.RUNNING)) {
+        if (!job.getJobStatus().equals(JobStatus.RUNNING) || 
job.getJobType().equals(JobType.MANUAL)) {
             return;
         }
 
         Long currentTimeMs = System.currentTimeMillis();
         Long nextExecuteTimeMs = findFistExecuteTime(currentTimeMs, 
job.getStartTimeMs(),
-                job.getIntervalMs(), job.isCycleJob());
+                job.getIntervalMs(), job.getJobType());
         job.setNextExecuteTimeMs(nextExecuteTimeMs);
         if (job.getNextExecuteTimeMs() < lastBatchSchedulerTimestamp) {
             List<Long> executeTimestamp = findTasksBetweenTime(job,
                     lastBatchSchedulerTimestamp,
-                    job.getNextExecuteTimeMs());
+                    job.getNextExecuteTimeMs(), job.getJobType());
             if (!executeTimestamp.isEmpty()) {
                 for (Long timestamp : executeTimestamp) {
                     putOneTask(job.getJobId(), timestamp);
@@ -172,7 +173,7 @@ public class TimerJobManager implements Closeable, Writable 
{
         }
     }
 
-    private Long findFistExecuteTime(Long currentTimeMs, Long startTimeMs, 
Long intervalMs, boolean isCycleJob) {
+    private Long findFistExecuteTime(Long currentTimeMs, Long startTimeMs, 
Long intervalMs, JobType jobType) {
         // if job not delay, first execute time is start time
         if (startTimeMs != 0L && startTimeMs > currentTimeMs) {
             return startTimeMs;
@@ -182,13 +183,30 @@ public class TimerJobManager implements Closeable, 
Writable {
             return currentTimeMs;
         }
         // if it's cycle job and not set start tine, first execute time is 
current time + interval
-        if (isCycleJob && startTimeMs == 0L) {
+        if (jobType.equals(JobType.RECURRING) && startTimeMs == 0L) {
             return currentTimeMs + intervalMs;
         }
         // if it's not cycle job and already delay, first execute time is 
current time
         return currentTimeMs;
     }
 
+    public <T> boolean immediateExecuteTask(Long jobId, T taskContextData) 
throws DdlException {
+        Job job = jobMap.get(jobId);
+        if (job == null) {
+            log.warn("immediateExecuteTask failed, jobId: {} not exist", 
jobId);
+            return false;
+        }
+        if (!job.getJobStatus().equals(JobStatus.RUNNING)) {
+            log.warn("immediateExecuteTask failed, jobId: {} is not running", 
jobId);
+            return false;
+        }
+        JobTask jobTask = createInitialTask(jobId, taskContextData);
+        jobTask.setTaskType(TaskType.MANUAL_JOB_TASK);
+        JobTaskManager.addPrepareTask(jobTask);
+        disruptor.tryPublish(jobId, jobTask.getTaskId(), 
TaskType.MANUAL_JOB_TASK);
+        return true;
+    }
+
     public void unregisterJob(Long jobId) {
         jobMap.remove(jobId);
     }
@@ -204,6 +222,14 @@ public class TimerJobManager implements Closeable, 
Writable {
         pauseJob(job);
     }
 
+    public void setJobLatestStatus(long jobId, boolean status) {
+        Job job = jobMap.get(jobId);
+        if (jobMap.get(jobId) == null) {
+            log.warn("pauseJob failed, jobId: {} not exist", jobId);
+        }
+        job.setLastExecuteTaskStatus(status);
+    }
+
     public void stopJob(String dbName, String jobName, JobCategory 
jobCategory) throws DdlException {
         Optional<Job> optionalJob = findJob(dbName, jobName, jobCategory);
 
@@ -326,14 +352,14 @@ public class TimerJobManager implements Closeable, 
Writable {
         executeJobIdsWithinLastTenMinutesWindow();
     }
 
-    private List<Long> findTasksBetweenTime(Job job, Long endTimeEndWindow, 
Long nextExecuteTime) {
+    private List<Long> findTasksBetweenTime(Job job, Long endTimeEndWindow, 
Long nextExecuteTime, JobType jobType) {
 
         List<Long> jobExecuteTimes = new ArrayList<>();
-        if (!job.isCycleJob() && (nextExecuteTime < endTimeEndWindow)) {
+        if (!jobType.equals(JobType.RECURRING) && (nextExecuteTime < 
endTimeEndWindow)) {
             jobExecuteTimes.add(nextExecuteTime);
             return jobExecuteTimes;
         }
-        if (job.isCycleJob() && (nextExecuteTime > endTimeEndWindow)) {
+        if (jobType.equals(JobType.RECURRING) && (nextExecuteTime > 
endTimeEndWindow)) {
             return new ArrayList<>();
         }
         while (endTimeEndWindow >= nextExecuteTime) {
@@ -360,11 +386,11 @@ public class TimerJobManager implements Closeable, 
Writable {
             return;
         }
         jobMap.forEach((k, v) -> {
-            if (v.isRunning() && (v.getNextExecuteTimeMs()
+            if (!v.getJobType().equals(JobType.MANUAL) && v.isRunning() && 
(v.getNextExecuteTimeMs()
                     + v.getIntervalMs() < lastBatchSchedulerTimestamp)) {
                 List<Long> executeTimes = findTasksBetweenTime(
                         v, lastBatchSchedulerTimestamp,
-                        v.getNextExecuteTimeMs());
+                        v.getNextExecuteTimeMs(), v.getJobType());
                 if (!executeTimes.isEmpty()) {
                     for (Long executeTime : executeTimes) {
                         putOneTask(v.getJobId(), executeTime);
@@ -402,7 +428,8 @@ public class TimerJobManager implements Closeable, Writable 
{
             log.info("putOneTask failed, scheduler is closed, jobId: {}", 
jobId);
             return;
         }
-        long taskId = System.nanoTime();
+        JobTask jobTask = createAsyncInitialTask(jobId, startExecuteTime);
+        long taskId = jobTask.getTaskId();
         TimerJobTask task = new TimerJobTask(jobId, taskId, startExecuteTime, 
disruptor);
         long delay = getDelaySecond(task.getStartTimestamp());
         Timeout timeout = dorisTimer.newTimeout(task, delay, TimeUnit.SECONDS);
@@ -412,13 +439,13 @@ public class TimerJobManager implements Closeable, 
Writable {
         }
         if (jobTimeoutMap.containsKey(task.getJobId())) {
             jobTimeoutMap.get(task.getJobId()).put(task.getTaskId(), timeout);
-            JobTaskManager.addPrepareTaskStartTime(jobId, taskId, 
startExecuteTime);
+            JobTaskManager.addPrepareTask(jobTask);
             return;
         }
         Map<Long, Timeout> timeoutMap = new ConcurrentHashMap<>();
         timeoutMap.put(task.getTaskId(), timeout);
         jobTimeoutMap.put(task.getJobId(), timeoutMap);
-        JobTaskManager.addPrepareTaskStartTime(jobId, taskId, 
startExecuteTime);
+        JobTaskManager.addPrepareTask(jobTask);
     }
 
     // cancel all task for one job
@@ -488,9 +515,19 @@ public class TimerJobManager implements Closeable, 
Writable {
     }
 
     public void putOneJobToQueen(Long jobId) {
+        JobTask jobTask = createInitialTask(jobId, null);
+        JobTaskManager.addPrepareTask(jobTask);
+        disruptor.tryPublish(jobId, jobTask.getTaskId());
+    }
+
+    private JobTask createAsyncInitialTask(long jobId, long createTimeMs) {
+        long taskId = System.nanoTime();
+        return new JobTask(jobId, taskId, createTimeMs);
+    }
+
+    private <T> JobTask createInitialTask(long jobId, T context) {
         long taskId = System.nanoTime();
-        JobTaskManager.addPrepareTaskStartTime(jobId, taskId, 
System.currentTimeMillis());
-        disruptor.tryPublish(jobId, taskId);
+        return new JobTask(jobId, taskId, System.currentTimeMillis(), context);
     }
 
     @Override
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/scheduler/registry/PersistentJobRegister.java
 
b/fe/fe-core/src/main/java/org/apache/doris/scheduler/registry/PersistentJobRegister.java
index 4ee17bb8df0..f1a901299f7 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/scheduler/registry/PersistentJobRegister.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/scheduler/registry/PersistentJobRegister.java
@@ -110,6 +110,19 @@ public interface PersistentJobRegister {
 
     Long registerJob(Job job) throws DdlException;
 
+
+    /**
+     * execute job task immediately,this method will not change job status and 
don't affect scheduler job
+     * this task type should set to {@link 
org.apache.doris.scheduler.constants.TaskType#MANUAL_JOB_TASK}
+     *
+     * @param jobId       job id
+     * @param contextData if you need to pass parameters to the task,
+     * @param <T>         context data type
+     * @return true if execute success, false if execute failed,
+     * if job is not exist or job is not running, or job not support manual 
execute, return false
+     */
+    <T> boolean immediateExecuteTask(Long jobId, T contextData) throws 
DdlException;
+
     List<Job> getJobs(String dbFullName, String jobName, JobCategory 
jobCategory, PatternMatcher matcher);
 
     /**
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/scheduler/registry/TimerJobRegister.java
 
b/fe/fe-core/src/main/java/org/apache/doris/scheduler/registry/TimerJobRegister.java
index 5ce802eff72..f8ab59e5d54 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/scheduler/registry/TimerJobRegister.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/scheduler/registry/TimerJobRegister.java
@@ -68,6 +68,11 @@ public class TimerJobRegister implements 
PersistentJobRegister {
         return timerJobManager.registerJob(job);
     }
 
+    @Override
+    public <T> boolean immediateExecuteTask(Long jobId, T data) throws 
DdlException {
+        return timerJobManager.immediateExecuteTask(jobId, data);
+    }
+
     @Override
     public void pauseJob(Long jobId) {
         timerJobManager.pauseJob(jobId);
diff --git 
a/fe/fe-core/src/test/java/org/apache/doris/scheduler/disruptor/JobTest.java 
b/fe/fe-core/src/test/java/org/apache/doris/scheduler/disruptor/JobTest.java
index ad5f740907c..57fbfda6cf9 100644
--- a/fe/fe-core/src/test/java/org/apache/doris/scheduler/disruptor/JobTest.java
+++ b/fe/fe-core/src/test/java/org/apache/doris/scheduler/disruptor/JobTest.java
@@ -19,6 +19,7 @@ package org.apache.doris.scheduler.disruptor;
 
 import org.apache.doris.scheduler.common.IntervalUnit;
 import org.apache.doris.scheduler.constants.JobCategory;
+import org.apache.doris.scheduler.constants.JobType;
 import org.apache.doris.scheduler.executor.SqlJobExecutor;
 import org.apache.doris.scheduler.job.Job;
 
@@ -42,7 +43,7 @@ public class JobTest {
     public static void init() {
         SqlJobExecutor sqlJobExecutor = new SqlJobExecutor("insert into test 
values(1);");
         job = new Job("insertTest", 1000L, System.currentTimeMillis(), 
System.currentTimeMillis() + 100000, sqlJobExecutor);
-        job.setCycleJob(true);
+        job.setJobType(JobType.RECURRING);
         job.setComment("test");
         job.setOriginInterval(10L);
         job.setIntervalUnit(IntervalUnit.SECOND);
diff --git 
a/fe/fe-core/src/test/java/org/apache/doris/scheduler/disruptor/TaskDisruptorTest.java
 
b/fe/fe-core/src/test/java/org/apache/doris/scheduler/disruptor/TaskDisruptorTest.java
index 1036285c47b..b8482b11fff 100644
--- 
a/fe/fe-core/src/test/java/org/apache/doris/scheduler/disruptor/TaskDisruptorTest.java
+++ 
b/fe/fe-core/src/test/java/org/apache/doris/scheduler/disruptor/TaskDisruptorTest.java
@@ -19,9 +19,12 @@ package org.apache.doris.scheduler.disruptor;
 
 import org.apache.doris.catalog.Env;
 import org.apache.doris.scheduler.constants.JobCategory;
+import org.apache.doris.scheduler.exception.JobException;
 import org.apache.doris.scheduler.executor.JobExecutor;
 import org.apache.doris.scheduler.job.ExecutorResult;
 import org.apache.doris.scheduler.job.Job;
+import org.apache.doris.scheduler.job.JobTask;
+import org.apache.doris.scheduler.manager.JobTaskManager;
 import org.apache.doris.scheduler.manager.TimerJobManager;
 import org.apache.doris.scheduler.manager.TransientTaskManager;
 
@@ -56,26 +59,30 @@ public class TaskDisruptorTest {
     @BeforeEach
     public void init() {
         taskDisruptor = new TaskDisruptor(timerJobManager, 
transientTaskManager);
+        taskDisruptor.start();
     }
 
     @Test
     void testPublishEventAndConsumer() {
         Job job = new Job("test", 6000L, null,
                 null, new TestExecutor());
+        JobTask jobTask = new JobTask(job.getJobId(), 1L, 
System.currentTimeMillis());
+        JobTaskManager.addPrepareTask(jobTask);
         job.setJobCategory(JobCategory.COMMON);
         new Expectations() {{
                 timerJobManager.getJob(anyLong);
                 result = job;
             }};
         taskDisruptor.tryPublish(job.getJobId(), 1L);
-        Awaitility.await().atMost(1, TimeUnit.SECONDS).until(() -> 
testEventExecuteFlag);
+        Awaitility.await().atMost(3, TimeUnit.SECONDS).until(() -> 
testEventExecuteFlag);
         Assertions.assertTrue(testEventExecuteFlag);
     }
 
 
-    class TestExecutor implements JobExecutor<Boolean> {
+    class TestExecutor implements JobExecutor<Boolean, String> {
+
         @Override
-        public ExecutorResult execute(Job job) {
+        public ExecutorResult<Boolean> execute(Job job, String dataContext) 
throws JobException {
             testEventExecuteFlag = true;
             return new ExecutorResult(true, true, null, "null");
         }
diff --git 
a/fe/fe-core/src/test/java/org/apache/doris/scheduler/disruptor/TimerJobManagerTest.java
 
b/fe/fe-core/src/test/java/org/apache/doris/scheduler/disruptor/TimerJobManagerTest.java
index 3e912b8fd85..5fcf242fd13 100644
--- 
a/fe/fe-core/src/test/java/org/apache/doris/scheduler/disruptor/TimerJobManagerTest.java
+++ 
b/fe/fe-core/src/test/java/org/apache/doris/scheduler/disruptor/TimerJobManagerTest.java
@@ -21,9 +21,12 @@ import org.apache.doris.catalog.Env;
 import org.apache.doris.common.DdlException;
 import org.apache.doris.persist.EditLog;
 import org.apache.doris.scheduler.constants.JobCategory;
+import org.apache.doris.scheduler.constants.JobType;
+import org.apache.doris.scheduler.exception.JobException;
 import org.apache.doris.scheduler.executor.JobExecutor;
 import org.apache.doris.scheduler.job.ExecutorResult;
 import org.apache.doris.scheduler.job.Job;
+import org.apache.doris.scheduler.job.JobTask;
 import org.apache.doris.scheduler.manager.TimerJobManager;
 import org.apache.doris.scheduler.manager.TransientTaskManager;
 
@@ -51,16 +54,18 @@ public class TimerJobManagerTest {
     private static AtomicInteger testExecuteCount = new AtomicInteger(0);
     Job job = new Job("test", 6000L, null,
             null, new TestExecutor());
+    JobTask jobTask = new JobTask(job.getJobId(), 1L, 
System.currentTimeMillis());
 
     @BeforeEach
     public void init() {
-        job.setCycleJob(true);
+        job.setJobType(JobType.RECURRING);
         job.setJobCategory(JobCategory.COMMON);
         testExecuteCount.set(0);
         timerJobManager = new TimerJobManager();
         TransientTaskManager transientTaskManager = new TransientTaskManager();
         TaskDisruptor taskDisruptor = new TaskDisruptor(this.timerJobManager, 
transientTaskManager);
         this.timerJobManager.setDisruptor(taskDisruptor);
+        taskDisruptor.start();
         timerJobManager.start();
     }
 
@@ -153,7 +158,7 @@ public class TimerJobManagerTest {
         long startTimestamp = System.currentTimeMillis() + 3000L;
         job.setIntervalMs(0L);
         job.setStartTimeMs(startTimestamp);
-        job.setCycleJob(false);
+        job.setJobType(JobType.ONE_TIME);
         timerJobManager.registerJob(job);
         //consider the time of the first execution and give some buffer time
         Awaitility.await().atMost(14, TimeUnit.SECONDS).until(() -> 
System.currentTimeMillis()
@@ -166,9 +171,10 @@ public class TimerJobManagerTest {
         timerJobManager.close();
     }
 
-    class TestExecutor implements JobExecutor<Boolean> {
+    class TestExecutor implements JobExecutor<Boolean, String> {
+
         @Override
-        public ExecutorResult execute(Job job) {
+        public ExecutorResult<Boolean> execute(Job job, String dataContext) 
throws JobException {
             log.info("test execute count:{}", 
testExecuteCount.incrementAndGet());
             return new ExecutorResult<>(true, true, null, "");
         }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to