This is an automated email from the ASF dual-hosted git repository.

zykkk pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new 232f120edc [Improve](Job)Support other types of Job query interfaces 
(#24172)
232f120edc is described below

commit 232f120edc713955741898af94698dca247ae7de
Author: Calvin Kirs <acm_mas...@163.com>
AuthorDate: Tue Sep 12 13:55:56 2023 +0800

    [Improve](Job)Support other types of Job query interfaces (#24172)
    
    - Support MTMV job
    - Add create time and SQL to task info
    - Optimize scheduling logic
---
 .../main/java/org/apache/doris/common/Config.java  |  4 +-
 fe/fe-core/src/main/cup/sql_parser.cup             |  6 +-
 .../org/apache/doris/analysis/ShowJobStmt.java     | 44 +++++++++-----
 .../org/apache/doris/analysis/ShowJobTaskStmt.java | 29 ++++++---
 .../main/java/org/apache/doris/catalog/Env.java    |  1 +
 .../java/org/apache/doris/qe/ShowExecutor.java     |  5 +-
 .../doris/scheduler/constants/JobCategory.java     | 13 +++--
 .../doris/scheduler/disruptor/TaskDisruptor.java   | 20 +++----
 .../doris/scheduler/disruptor/TaskEvent.java       |  2 +
 .../doris/scheduler/disruptor/TaskHandler.java     | 32 +++++++---
 .../doris/scheduler/executor/JobExecutor.java      |  3 +-
 .../doris/scheduler/executor/SqlJobExecutor.java   | 14 +++--
 .../scheduler/executor/TransientTaskExecutor.java  |  2 +-
 .../ExecutorResult.java}                           | 29 +++++----
 .../java/org/apache/doris/scheduler/job/Job.java   | 13 ++++-
 .../org/apache/doris/scheduler/job/JobTask.java    | 18 +++++-
 .../apache/doris/scheduler/job/TimerJobTask.java   |  9 ++-
 .../doris/scheduler/manager/JobTaskManager.java    | 26 +++++++++
 .../doris/scheduler/manager/TimerJobManager.java   | 68 ++++++++++++++++------
 .../scheduler/disruptor/TaskDisruptorTest.java     |  7 ++-
 .../scheduler/disruptor/TimerJobManagerTest.java   |  6 +-
 21 files changed, 241 insertions(+), 110 deletions(-)

diff --git a/fe/fe-common/src/main/java/org/apache/doris/common/Config.java 
b/fe/fe-common/src/main/java/org/apache/doris/common/Config.java
index 54a0e814a4..fd5258e0c0 100644
--- a/fe/fe-common/src/main/java/org/apache/doris/common/Config.java
+++ b/fe/fe-common/src/main/java/org/apache/doris/common/Config.java
@@ -1554,7 +1554,7 @@ public class Config extends ConfigBase {
     public static boolean enable_pipeline_load = false;
 
     @ConfField
-    public static int scheduler_job_task_max_saved_count = 10;
+    public static int scheduler_job_task_max_saved_count = 20;
 
     /**
      * The number of async tasks that can be queued. @See TaskDisruptor
@@ -1568,7 +1568,7 @@ public class Config extends ConfigBase {
      * if we have a lot of async tasks, we need more threads to consume them. 
Sure, it's depends on the cpu cores.
      */
     @ConfField
-    public static int async_task_consumer_thread_num = 10;
+    public static int async_task_consumer_thread_num = 5;
 
     // enable_workload_group should be immutable and temporarily set to 
mutable during the development test phase
     @ConfField(mutable = true, varType = VariableAnnotation.EXPERIMENTAL)
diff --git a/fe/fe-core/src/main/cup/sql_parser.cup 
b/fe/fe-core/src/main/cup/sql_parser.cup
index 94fcd2260f..99c65ffee2 100644
--- a/fe/fe-core/src/main/cup/sql_parser.cup
+++ b/fe/fe-core/src/main/cup/sql_parser.cup
@@ -2689,15 +2689,15 @@ resume_job_stmt ::=
 show_job_stmt ::=
     KW_SHOW KW_JOBS
     {:
-        RESULT = new ShowJobStmt(null,null);
+        RESULT = new ShowJobStmt(null,null,null);
     :}
     | KW_SHOW  KW_JOB KW_FOR job_label:jobLabel
     {:
-        RESULT = new ShowJobStmt(jobLabel,null);
+        RESULT = new ShowJobStmt(null,jobLabel,null);
     :}
     | KW_SHOW KW_JOB KW_TASKS KW_FOR job_label:jobLabel
     {:
-        RESULT = new ShowJobTaskStmt(jobLabel);
+        RESULT = new ShowJobTaskStmt(null,jobLabel);
     :}    
     ;
 pause_job_stmt ::=
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/analysis/ShowJobStmt.java 
b/fe/fe-core/src/main/java/org/apache/doris/analysis/ShowJobStmt.java
index 56fc916b2a..7ad3ce343c 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/analysis/ShowJobStmt.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/ShowJobStmt.java
@@ -26,18 +26,21 @@ import org.apache.doris.common.ErrorReport;
 import org.apache.doris.common.UserException;
 import org.apache.doris.qe.ConnectContext;
 import org.apache.doris.qe.ShowResultSetMetaData;
+import org.apache.doris.scheduler.constants.JobCategory;
 
 import com.google.common.base.Strings;
 import com.google.common.collect.ImmutableList;
+import lombok.Getter;
+import org.apache.commons.lang3.StringUtils;
 
 import java.util.List;
 
 /**
  * SHOW JOB [FOR JobName]
  * eg: show event
- *     return all job in connection db
+ * return all job in connection db
  * eg: show event for test
- *     return job named test in connection db
+ * return job named test in connection db
  */
 public class ShowJobStmt extends ShowStmt {
 
@@ -57,29 +60,32 @@ public class ShowJobStmt extends ShowStmt {
                     .add("Status")
                     .add("LastExecuteFinishTime")
                     .add("ErrorMsg")
+                    .add("CreateTime")
                     .add("Comment")
                     .build();
 
+    private static final String MTMV_NAME_TITLE = "mtmv_name";
+
+    private static final String NAME_TITLE = "name";
     private final LabelName labelName;
+
+    @Getter
     private String dbFullName; // optional
+
+    @Getter
+    private JobCategory jobCategory; // optional
+
+    private String jobCategoryName; // optional
+
+    @Getter
     private String name; // optional
+    @Getter
     private String pattern; // optional
 
-    public ShowJobStmt(LabelName labelName, String pattern) {
+    public ShowJobStmt(String category, LabelName labelName, String pattern) {
         this.labelName = labelName;
         this.pattern = pattern;
-    }
-
-    public String getDbFullName() {
-        return dbFullName;
-    }
-
-    public String getName() {
-        return name;
-    }
-
-    public String getPattern() {
-        return pattern;
+        this.jobCategoryName = category;
     }
 
     @Override
@@ -87,6 +93,11 @@ public class ShowJobStmt extends ShowStmt {
         super.analyze(analyzer);
         checkAuth();
         checkLabelName(analyzer);
+        if (StringUtils.isBlank(jobCategoryName)) {
+            this.jobCategory = JobCategory.SQL;
+        } else {
+            this.jobCategory = 
JobCategory.valueOf(jobCategoryName.toUpperCase());
+        }
     }
 
     private void checkAuth() throws AnalysisException {
@@ -118,6 +129,9 @@ public class ShowJobStmt extends ShowStmt {
         ShowResultSetMetaData.Builder builder = 
ShowResultSetMetaData.builder();
 
         for (String title : TITLE_NAMES) {
+            if (this.jobCategory.equals(JobCategory.MTMV) && 
title.equals(NAME_TITLE)) {
+                builder.addColumn(new Column(MTMV_NAME_TITLE, 
ScalarType.createVarchar(30)));
+            }
             builder.addColumn(new Column(title, ScalarType.createVarchar(30)));
         }
         return builder.build();
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/analysis/ShowJobTaskStmt.java 
b/fe/fe-core/src/main/java/org/apache/doris/analysis/ShowJobTaskStmt.java
index e7b04d9ece..74dca72fb7 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/analysis/ShowJobTaskStmt.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/ShowJobTaskStmt.java
@@ -25,9 +25,12 @@ import org.apache.doris.common.ErrorCode;
 import org.apache.doris.common.ErrorReport;
 import org.apache.doris.common.UserException;
 import org.apache.doris.qe.ShowResultSetMetaData;
+import org.apache.doris.scheduler.constants.JobCategory;
 
 import com.google.common.base.Strings;
 import com.google.common.collect.ImmutableList;
+import lombok.Getter;
+import org.apache.commons.lang3.StringUtils;
 
 import java.util.List;
 
@@ -40,27 +43,30 @@ public class ShowJobTaskStmt extends ShowStmt {
             new ImmutableList.Builder<String>()
                     .add("JobId")
                     .add("TaskId")
+                    .add("CreateTime")
                     .add("StartTime")
                     .add("EndTime")
                     .add("Status")
+                    .add("ExecuteSql")
                     .add("Result")
                     .add("ErrorMsg")
                     .build();
 
+    @Getter
     private final LabelName labelName;
+
+    @Getter
+    private JobCategory jobCategory; // optional
+
+    private String jobCategoryName; // optional
+    @Getter
     private String dbFullName; // optional
+    @Getter
     private String name; // optional
 
-    public ShowJobTaskStmt(LabelName labelName) {
+    public ShowJobTaskStmt(String category, LabelName labelName) {
         this.labelName = labelName;
-    }
-
-    public String getDbFullName() {
-        return dbFullName;
-    }
-
-    public String getName() {
-        return name;
+        this.jobCategoryName = category;
     }
 
     @Override
@@ -68,6 +74,11 @@ public class ShowJobTaskStmt extends ShowStmt {
         super.analyze(analyzer);
         CreateJobStmt.checkAuth();
         checkLabelName(analyzer);
+        if (StringUtils.isBlank(jobCategoryName)) {
+            this.jobCategory = JobCategory.SQL;
+        } else {
+            this.jobCategory = 
JobCategory.valueOf(jobCategoryName.toUpperCase());
+        }
     }
 
     private void checkLabelName(Analyzer analyzer) throws AnalysisException {
diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java 
b/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java
index 8ba9335e28..c016ada719 100755
--- a/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java
@@ -1523,6 +1523,7 @@ public class Env {
         publishVersionDaemon.start();
         // Start txn cleaner
         txnCleaner.start();
+        timerJobManager.start();
         // Alter
         getAlterInstance().start();
         // Consistency checker
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/ShowExecutor.java 
b/fe/fe-core/src/main/java/org/apache/doris/qe/ShowExecutor.java
index 03bdd24d0e..3b4049338b 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/ShowExecutor.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/ShowExecutor.java
@@ -193,7 +193,6 @@ import org.apache.doris.mtmv.MTMVJobManager;
 import org.apache.doris.mtmv.metadata.MTMVJob;
 import org.apache.doris.mtmv.metadata.MTMVTask;
 import org.apache.doris.mysql.privilege.PrivPredicate;
-import org.apache.doris.scheduler.constants.JobCategory;
 import org.apache.doris.scheduler.job.Job;
 import org.apache.doris.scheduler.job.JobTask;
 import org.apache.doris.statistics.AnalysisInfo;
@@ -1441,7 +1440,7 @@ public class ShowExecutor {
         ShowJobTaskStmt showJobTaskStmt = (ShowJobTaskStmt) stmt;
         List<List<String>> rows = Lists.newArrayList();
         List<Job> jobs = Env.getCurrentEnv().getJobRegister()
-                .getJobs(showJobTaskStmt.getDbFullName(), 
showJobTaskStmt.getName(), JobCategory.SQL,
+                .getJobs(showJobTaskStmt.getDbFullName(), 
showJobTaskStmt.getName(), showJobTaskStmt.getJobCategory(),
                         null);
         if (CollectionUtils.isEmpty(jobs)) {
             resultSet = new ShowResultSet(showJobTaskStmt.getMetaData(), rows);
@@ -1470,7 +1469,7 @@ public class ShowExecutor {
                     CaseSensibility.JOB.getCaseSensibility());
         }
         jobList = Env.getCurrentEnv().getJobRegister()
-                .getJobs(showJobStmt.getDbFullName(), showJobStmt.getName(), 
JobCategory.SQL,
+                .getJobs(showJobStmt.getDbFullName(), showJobStmt.getName(), 
showJobStmt.getJobCategory(),
                         matcher);
 
         if (jobList.isEmpty()) {
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/scheduler/constants/JobCategory.java
 
b/fe/fe-core/src/main/java/org/apache/doris/scheduler/constants/JobCategory.java
index eb2653b9da..72a967625e 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/scheduler/constants/JobCategory.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/scheduler/constants/JobCategory.java
@@ -25,25 +25,26 @@ import lombok.Getter;
 public enum JobCategory {
     COMMON(1, "common"),
     SQL(2, "sql"),
+    MTMV(3, "mtmv"),
     ;
 
     @Getter
     private int code;
 
     @Getter
-    private String description;
+    private String name;
 
-    JobCategory(int code, String description) {
+    JobCategory(int code, String name) {
         this.code = code;
-        this.description = description;
+        this.name = name;
     }
 
-    public static JobCategory getJobCategoryByCode(int code) {
+    public static JobCategory getJobCategoryByName(String name) {
         for (JobCategory jobCategory : JobCategory.values()) {
-            if (jobCategory.getCode() == code) {
+            if (jobCategory.name.equalsIgnoreCase(name)) {
                 return jobCategory;
             }
         }
-        return null;
+        throw new IllegalArgumentException("Unknown job category name: " + 
name);
     }
 }
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/scheduler/disruptor/TaskDisruptor.java
 
b/fe/fe-core/src/main/java/org/apache/doris/scheduler/disruptor/TaskDisruptor.java
index 3b59a5187e..dcf346e3eb 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/scheduler/disruptor/TaskDisruptor.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/scheduler/disruptor/TaskDisruptor.java
@@ -23,8 +23,7 @@ import org.apache.doris.scheduler.manager.TimerJobManager;
 import org.apache.doris.scheduler.manager.TransientTaskManager;
 
 import com.lmax.disruptor.BlockingWaitStrategy;
-import com.lmax.disruptor.EventTranslatorOneArg;
-import com.lmax.disruptor.EventTranslatorTwoArg;
+import com.lmax.disruptor.EventTranslatorThreeArg;
 import com.lmax.disruptor.TimeoutException;
 import com.lmax.disruptor.WorkHandler;
 import com.lmax.disruptor.dsl.Disruptor;
@@ -65,12 +64,13 @@ public class TaskDisruptor implements Closeable {
     private boolean isClosed = false;
 
     /**
-     * The default {@link EventTranslatorOneArg} to use for {@link 
#tryPublish(Long)}.
+     * The default {@link EventTranslatorThreeArg} to use for {@link 
#tryPublish(Long, Long)}.
      * This is used to avoid creating a new object for each publish.
      */
-    private static final EventTranslatorTwoArg<TaskEvent, Long, TaskType> 
TRANSLATOR
-            = (event, sequence, jobId, taskType) -> {
+    private static final EventTranslatorThreeArg<TaskEvent, Long, Long, 
TaskType> TRANSLATOR
+            = (event, sequence, jobId, taskId, taskType) -> {
                 event.setId(jobId);
+                event.setTaskId(taskId);
                 event.setTaskType(taskType);
             };
 
@@ -89,15 +89,15 @@ public class TaskDisruptor implements Closeable {
     /**
      * Publishes a job to the disruptor.
      *
-     * @param jobId  job id
+     * @param jobId job id
      */
-    public void tryPublish(Long jobId) {
+    public void tryPublish(Long jobId, Long taskId) {
         if (isClosed) {
             log.info("tryPublish failed, disruptor is closed, jobId: {}", 
jobId);
             return;
         }
         try {
-            disruptor.publishEvent(TRANSLATOR, jobId, TaskType.TimerJobTask);
+            disruptor.publishEvent(TRANSLATOR, jobId, taskId, 
TaskType.TimerJobTask);
         } catch (Exception e) {
             log.error("tryPublish failed, jobId: {}", jobId, e);
         }
@@ -106,7 +106,7 @@ public class TaskDisruptor implements Closeable {
     /**
      * Publishes a task to the disruptor.
      *
-     * @param taskId  task id
+     * @param taskId task id
      */
     public void tryPublishTask(Long taskId) {
         if (isClosed) {
@@ -114,7 +114,7 @@ public class TaskDisruptor implements Closeable {
             return;
         }
         try {
-            disruptor.publishEvent(TRANSLATOR, taskId, TaskType.TransientTask);
+            disruptor.publishEvent(TRANSLATOR, taskId, 0L, 
TaskType.TransientTask);
         } catch (Exception e) {
             log.error("tryPublish failed, taskId: {}", taskId, e);
         }
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/scheduler/disruptor/TaskEvent.java 
b/fe/fe-core/src/main/java/org/apache/doris/scheduler/disruptor/TaskEvent.java
index 6c4fcb1076..8b24b8c194 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/scheduler/disruptor/TaskEvent.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/scheduler/disruptor/TaskEvent.java
@@ -37,6 +37,8 @@ public class TaskEvent {
      */
     private Long id;
 
+    private Long taskId;
+
     private TaskType taskType;
 
     public static final EventFactory<TaskEvent> FACTORY = TaskEvent::new;
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/scheduler/disruptor/TaskHandler.java
 
b/fe/fe-core/src/main/java/org/apache/doris/scheduler/disruptor/TaskHandler.java
index 0b309ec3ce..3056502e86 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/scheduler/disruptor/TaskHandler.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/scheduler/disruptor/TaskHandler.java
@@ -18,8 +18,10 @@
 package org.apache.doris.scheduler.disruptor;
 
 import org.apache.doris.catalog.Env;
+import org.apache.doris.persist.gson.GsonUtils;
 import org.apache.doris.scheduler.exception.JobException;
 import org.apache.doris.scheduler.executor.TransientTaskExecutor;
+import org.apache.doris.scheduler.job.ExecutorResult;
 import org.apache.doris.scheduler.job.Job;
 import org.apache.doris.scheduler.job.JobTask;
 import org.apache.doris.scheduler.manager.JobTaskManager;
@@ -29,8 +31,6 @@ import 
org.apache.doris.scheduler.manager.TransientTaskManager;
 import com.lmax.disruptor.WorkHandler;
 import lombok.extern.slf4j.Slf4j;
 
-import java.util.Objects;
-
 /**
  * This class represents a work handler for processing event tasks consumed by 
a Disruptor.
  * The work handler retrieves the associated event job and executes it if it 
is running.
@@ -89,6 +89,8 @@ public class TaskHandler implements WorkHandler<TaskEvent> {
     @SuppressWarnings("checkstyle:UnusedLocalVariable")
     public void onTimerJobTaskHandle(TaskEvent taskEvent) {
         long jobId = taskEvent.getId();
+        long taskId = taskEvent.getTaskId();
+        long createTimeMs = jobTaskManager.pollPrepareTaskByTaskId(jobId, 
taskId);
         Job job = timerJobManager.getJob(jobId);
         if (job == null) {
             log.info("job is null, jobId: {}", jobId);
@@ -99,10 +101,11 @@ public class TaskHandler implements WorkHandler<TaskEvent> 
{
             return;
         }
         log.debug("job is running, eventJobId: {}", jobId);
-        JobTask jobTask = new JobTask(jobId);
+
+        JobTask jobTask = new JobTask(jobId, taskId, createTimeMs);
         try {
             jobTask.setStartTimeMs(System.currentTimeMillis());
-            Object result = job.getExecutor().execute(job);
+            ExecutorResult result = job.getExecutor().execute(job);
             job.setLatestCompleteExecuteTimeMs(System.currentTimeMillis());
             if (job.isCycleJob()) {
                 updateJobStatusIfPastEndTime(job);
@@ -110,14 +113,27 @@ public class TaskHandler implements 
WorkHandler<TaskEvent> {
                 // one time job should be finished after execute
                 updateOnceTimeJobStatus(job);
             }
-            String resultStr = Objects.isNull(result) ? "" : result.toString();
+            if (null == result) {
+                log.warn("Job execute failed, jobId: {}, result is null", 
jobId);
+                jobTask.setErrorMsg("Job execute failed, result is null");
+                jobTask.setIsSuccessful(false);
+                timerJobManager.pauseJob(jobId);
+                return;
+            }
+            String resultStr = GsonUtils.GSON.toJson(result.getResult());
             jobTask.setExecuteResult(resultStr);
-            jobTask.setIsSuccessful(true);
+            jobTask.setIsSuccessful(result.isSuccess());
+            if (!result.isSuccess()) {
+                log.warn("Job execute failed, jobId: {}, msg : {}", jobId, 
result.getExecutorSql());
+                jobTask.setErrorMsg(result.getExecutorSql());
+                timerJobManager.pauseJob(jobId);
+            }
+            jobTask.setExecuteSql(result.getExecutorSql());
         } catch (Exception e) {
             log.warn("Job execute failed, jobId: {}, msg : {}", jobId, 
e.getMessage());
-            job.pause(e.getMessage());
             jobTask.setErrorMsg(e.getMessage());
             jobTask.setIsSuccessful(false);
+            timerJobManager.pauseJob(jobId);
         }
         jobTask.setEndTimeMs(System.currentTimeMillis());
         if (null == jobTaskManager) {
@@ -143,7 +159,7 @@ public class TaskHandler implements WorkHandler<TaskEvent> {
 
     private void updateJobStatusIfPastEndTime(Job job) {
         if (job.isExpired()) {
-            job.finish();
+            timerJobManager.finishJob(job.getJobId());
         }
     }
 
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/scheduler/executor/JobExecutor.java 
b/fe/fe-core/src/main/java/org/apache/doris/scheduler/executor/JobExecutor.java
index e67e62f267..a6f2e10306 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/scheduler/executor/JobExecutor.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/scheduler/executor/JobExecutor.java
@@ -18,6 +18,7 @@
 package org.apache.doris.scheduler.executor;
 
 import org.apache.doris.scheduler.exception.JobException;
+import org.apache.doris.scheduler.job.ExecutorResult;
 import org.apache.doris.scheduler.job.Job;
 
 /**
@@ -39,6 +40,6 @@ public interface JobExecutor<T> {
      *
      * @return The result of the event job execution.
      */
-    T execute(Job job) throws JobException;
+    ExecutorResult execute(Job job) throws JobException;
 }
 
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/scheduler/executor/SqlJobExecutor.java
 
b/fe/fe-core/src/main/java/org/apache/doris/scheduler/executor/SqlJobExecutor.java
index 572e895906..5441532142 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/scheduler/executor/SqlJobExecutor.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/scheduler/executor/SqlJobExecutor.java
@@ -23,6 +23,7 @@ import org.apache.doris.cluster.ClusterNamespace;
 import org.apache.doris.qe.ConnectContext;
 import org.apache.doris.qe.StmtExecutor;
 import org.apache.doris.scheduler.exception.JobException;
+import org.apache.doris.scheduler.job.ExecutorResult;
 import org.apache.doris.scheduler.job.Job;
 import org.apache.doris.thrift.TUniqueId;
 
@@ -36,10 +37,9 @@ import java.util.UUID;
 /**
  * we use this executor to execute sql job
  *
- * @param <QueryState> the state of sql job, we can record the state of sql job
  */
 @Slf4j
-public class SqlJobExecutor<QueryState> implements JobExecutor {
+public class SqlJobExecutor implements JobExecutor {
 
     @Getter
     @Setter
@@ -51,7 +51,7 @@ public class SqlJobExecutor<QueryState> implements 
JobExecutor {
     }
 
     @Override
-    public String execute(Job job) throws JobException {
+    public ExecutorResult<String> execute(Job job) throws JobException {
         ConnectContext ctx = new ConnectContext();
         ctx.setEnv(Env.getCurrentEnv());
         
ctx.setCluster(ClusterNamespace.getClusterNameFromFullName(job.getDbName()));
@@ -67,9 +67,12 @@ public class SqlJobExecutor<QueryState> implements 
JobExecutor {
         try {
             StmtExecutor executor = new StmtExecutor(ctx, sql);
             executor.execute(queryId);
-            return convertExecuteResult(ctx, taskIdString);
+            String result = convertExecuteResult(ctx, taskIdString);
+
+            return new ExecutorResult<>(result, true, null, sql);
         } catch (Exception e) {
-            throw new JobException("execute sql job failed, sql: " + sql + ", 
error: " + e.getMessage());
+            log.warn("execute sql job failed, sql: {}, error: {}", sql, 
e.getMessage());
+            return new ExecutorResult<>(null, false, e.getMessage(), sql);
         }
 
     }
@@ -82,6 +85,7 @@ public class SqlJobExecutor<QueryState> implements 
JobExecutor {
             throw new JobException("error code: " + 
ctx.getState().getErrorCode() + ", error msg: "
                     + ctx.getState().getErrorMessage());
         }
+
         return "queryId:" + queryId + ",affectedRows : " + 
ctx.getState().getAffectedRows() + ", warningRows: "
                 + ctx.getState().getWarningRows() + ",infoMsg" + 
ctx.getState().getInfoMessage();
     }
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/scheduler/executor/TransientTaskExecutor.java
 
b/fe/fe-core/src/main/java/org/apache/doris/scheduler/executor/TransientTaskExecutor.java
index 6f818ae841..f297753bbd 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/scheduler/executor/TransientTaskExecutor.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/scheduler/executor/TransientTaskExecutor.java
@@ -22,7 +22,7 @@ import org.apache.doris.scheduler.exception.JobException;
 /**
  * A functional interface for executing a memory task.
  */
-public interface TransientTaskExecutor<T> {
+public interface TransientTaskExecutor {
 
     /**
      * Executes the memory task.
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/scheduler/executor/TransientTaskExecutor.java
 b/fe/fe-core/src/main/java/org/apache/doris/scheduler/job/ExecutorResult.java
similarity index 62%
copy from 
fe/fe-core/src/main/java/org/apache/doris/scheduler/executor/TransientTaskExecutor.java
copy to 
fe/fe-core/src/main/java/org/apache/doris/scheduler/job/ExecutorResult.java
index 6f818ae841..99df9c9e78 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/scheduler/executor/TransientTaskExecutor.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/scheduler/job/ExecutorResult.java
@@ -15,24 +15,21 @@
 // specific language governing permissions and limitations
 // under the License.
 
-package org.apache.doris.scheduler.executor;
+package org.apache.doris.scheduler.job;
 
-import org.apache.doris.scheduler.exception.JobException;
+import lombok.AllArgsConstructor;
+import lombok.Data;
 
-/**
- * A functional interface for executing a memory task.
- */
-public interface TransientTaskExecutor<T> {
+@Data
+@AllArgsConstructor
+public class ExecutorResult<T> {
 
-    /**
-     * Executes the memory task.
-     * Exceptions will be caught internally, so there is no need to define or 
throw them separately.
-     */
-    void execute() throws JobException;
+    private T result;
 
-    /**
-     * Cancel the memory task.
-     */
-    void cancel() throws JobException;
-}
+    private boolean success;
+
+    private String errorMsg;
 
+    private String executorSql;
+
+}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/scheduler/job/Job.java 
b/fe/fe-core/src/main/java/org/apache/doris/scheduler/job/Job.java
index e65cf07754..ebfdfb04a2 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/scheduler/job/Job.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/scheduler/job/Job.java
@@ -87,6 +87,8 @@ public class Job implements Writable {
      */
     @SerializedName("executor")
     private JobExecutor executor;
+    @SerializedName("baseName")
+    private String baseName;
 
     @SerializedName("user")
     private String user;
@@ -124,6 +126,9 @@ public class Job implements Writable {
     @SerializedName("nextExecuteTimeMs")
     private Long nextExecuteTimeMs = 0L;
 
+    @SerializedName("createTimeMs")
+    private Long createTimeMs = System.currentTimeMillis();
+
     @SerializedName("comment")
     private String comment;
 
@@ -209,7 +214,9 @@ public class Job implements Writable {
         if (endTimeMs != 0L && endTimeMs < System.currentTimeMillis()) {
             throw new DdlException("endTimeMs must be greater than current 
time");
         }
-
+        if (null != intervalUnit && null != originInterval) {
+            this.intervalMs = intervalUnit.getParameterValue(originInterval);
+        }
         if (isCycleJob && (intervalMs == null || intervalMs <= 0L)) {
             throw new DdlException("cycle job must set intervalMs");
         }
@@ -236,6 +243,9 @@ public class Job implements Writable {
         List<String> row = Lists.newArrayList();
         row.add(String.valueOf(jobId));
         row.add(dbName);
+        if (jobCategory.equals(JobCategory.MTMV)) {
+            row.add(baseName);
+        }
         row.add(jobName);
         row.add(user);
         row.add(timezone);
@@ -256,6 +266,7 @@ public class Job implements Writable {
         row.add(jobStatus.name());
         row.add(latestCompleteExecuteTimeMs <= 0L ? "null" : 
TimeUtils.longToTimeString(latestCompleteExecuteTimeMs));
         row.add(errMsg == null ? "null" : errMsg);
+        row.add(createTimeMs <= 0L ? "null" : 
TimeUtils.longToTimeString(createTimeMs));
         row.add(comment == null ? "null" : comment);
         return row;
     }
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/scheduler/job/JobTask.java 
b/fe/fe-core/src/main/java/org/apache/doris/scheduler/job/JobTask.java
index 3bfe4ffc44..04fb552e74 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/scheduler/job/JobTask.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/scheduler/job/JobTask.java
@@ -38,27 +38,36 @@ public class JobTask implements Writable {
     private Long jobId;
     @SerializedName("taskId")
     private Long taskId;
+    @SerializedName("createTimeMs")
+    private Long createTimeMs;
     @SerializedName("startTimeMs")
     private Long startTimeMs;
     @SerializedName("endTimeMs")
     private Long endTimeMs;
     @SerializedName("successful")
     private Boolean isSuccessful;
+
+    @SerializedName("executeSql")
+    private String executeSql;
     @SerializedName("executeResult")
     private String executeResult;
     @SerializedName("errorMsg")
     private String errorMsg;
 
-    public JobTask(Long jobId) {
+    public JobTask(Long jobId, Long taskId, Long createTimeMs) {
         //it's enough to use nanoTime to identify a task
-        this.taskId = System.nanoTime();
+        this.taskId = taskId;
         this.jobId = jobId;
+        this.createTimeMs = createTimeMs;
     }
 
     public List<String> getShowInfo() {
         List<String> row = Lists.newArrayList();
         row.add(String.valueOf(jobId));
         row.add(String.valueOf(taskId));
+        if (null != createTimeMs) {
+            row.add(TimeUtils.longToTimeString(createTimeMs));
+        }
         row.add(TimeUtils.longToTimeString(startTimeMs));
         row.add(null == endTimeMs ? "null" : 
TimeUtils.longToTimeString(endTimeMs));
         if (endTimeMs == null) {
@@ -66,6 +75,11 @@ public class JobTask implements Writable {
         } else {
             row.add(isSuccessful ? "SUCCESS" : "FAILED");
         }
+        if (null == executeSql) {
+            row.add("null");
+        } else {
+            row.add(executeSql);
+        }
         if (null == executeResult) {
             row.add("null");
         } else {
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/scheduler/job/TimerJobTask.java 
b/fe/fe-core/src/main/java/org/apache/doris/scheduler/job/TimerJobTask.java
index 655df9467d..b3a199a8a6 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/scheduler/job/TimerJobTask.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/scheduler/job/TimerJobTask.java
@@ -23,8 +23,6 @@ import io.netty.util.Timeout;
 import io.netty.util.TimerTask;
 import lombok.Getter;
 
-import java.util.UUID;
-
 /**
  * This class represents a timer task that can be scheduled by a Netty timer.
  * When the timer task is triggered, it produces a Job task using the 
Disruptor.
@@ -36,16 +34,17 @@ public class TimerJobTask implements TimerTask {
     private final Long jobId;
 
     // more fields should be added here and recorded in the future
-    private final Long taskId = UUID.randomUUID().getMostSignificantBits();
+    private final Long taskId;
 
     private final Long startTimestamp;
 
     private final TaskDisruptor taskDisruptor;
 
+    /**
+     * Creates a timer task for one scheduled execution of a job.
+     *
+     * @param jobId          id of the job to trigger
+     * @param taskId         id of this execution; passed to the disruptor together with jobId when the timer fires
+     * @param startTimestamp time at which the task should start
+     * @param taskDisruptor  disruptor used to publish the task when the timer fires
+     */
-    public TimerJobTask(Long jobId, Long startTimestamp, TaskDisruptor 
taskDisruptor) {
+    public TimerJobTask(Long jobId, Long taskId, Long startTimestamp, 
TaskDisruptor taskDisruptor) {
         this.jobId = jobId;
         this.startTimestamp = startTimestamp;
         this.taskDisruptor = taskDisruptor;
+        this.taskId = taskId;
     }
 
     @Override
@@ -53,6 +52,6 @@ public class TimerJobTask implements TimerTask {
         if (timeout.isCancelled()) {
             return;
         }
-        taskDisruptor.tryPublish(jobId);
+        taskDisruptor.tryPublish(jobId, taskId);
     }
 }
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/scheduler/manager/JobTaskManager.java
 
b/fe/fe-core/src/main/java/org/apache/doris/scheduler/manager/JobTaskManager.java
index e18a143ec7..5117e2d417 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/scheduler/manager/JobTaskManager.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/scheduler/manager/JobTaskManager.java
@@ -31,6 +31,7 @@ import java.io.DataOutput;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Collections;
+import java.util.HashMap;
 import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
@@ -44,6 +45,31 @@ public class JobTaskManager implements Writable {
 
     private ConcurrentHashMap<Long, ConcurrentLinkedQueue<JobTask>> jobTaskMap 
= new ConcurrentHashMap<>(16);
 
+
+    /**
+     * jobId -> (taskId -> timestamp)
+     * Filled by addPrepareTaskStartTime for tasks that have been prepared but not executed yet.
+     * A single entry is removed by pollPrepareTaskByTaskId; all entries of one job are removed
+     * by clearPrepareTaskByJobId.
+     */
+    private static ConcurrentHashMap<Long, Map<Long, Long>> 
prepareTaskCreateMsMap = new ConcurrentHashMap<>(16);
+
+    /**
+     * Records the timestamp of a task that has been prepared for the given job.
+     *
+     * @param jobId     id of the job the task belongs to
+     * @param taskId    id of the prepared task
+     * @param startTime timestamp to remember for the task
+     */
+    public static void addPrepareTaskStartTime(Long jobId, Long taskId, Long startTime) {
+        // Use the map returned by computeIfAbsent instead of a second get(): a separate get()
+        // returns null (and then throws) if clearPrepareTaskByJobId removes the job in between.
+        // The per-job map is wrapped with synchronizedMap because this static state is written
+        // here and removed from in pollPrepareTaskByTaskId, presumably on different threads;
+        // unlike ConcurrentHashMap it still accepts null values, as the plain HashMap did.
+        prepareTaskCreateMsMap
+                .computeIfAbsent(jobId, k -> Collections.synchronizedMap(new HashMap<>()))
+                .put(taskId, startTime);
+    }
+
+    /**
+     * Removes and returns the timestamp recorded for a prepared task.
+     *
+     * @param jobId  id of the job the task belongs to
+     * @param taskId id of the task
+     * @return the recorded timestamp, or the current time in milliseconds when nothing is
+     *         recorded for this job/task pair
+     */
+    public static Long pollPrepareTaskByTaskId(Long jobId, Long taskId) {
+        // One lookup instead of containsKey() + get(): the job entry may be removed by
+        // clearPrepareTaskByJobId between the two calls.
+        Map<Long, Long> taskTimes = prepareTaskCreateMsMap.get(jobId);
+        Long recordedTime = (taskTimes == null) ? null : taskTimes.remove(taskId);
+        // A missing task gets the same fallback as a missing job instead of a null result.
+        return recordedTime != null ? recordedTime : Long.valueOf(System.currentTimeMillis());
+    }
+
+    /**
+     * Drops every recorded task timestamp of the given job.
+     *
+     * @param jobId id of the job whose entries are removed
+     */
+    public static void clearPrepareTaskByJobId(Long jobId) {
+        prepareTaskCreateMsMap.remove(jobId);
+    }
+
     public void addJobTask(JobTask jobTask) {
         ConcurrentLinkedQueue<JobTask> jobTasks = jobTaskMap
                 .computeIfAbsent(jobTask.getJobId(), k -> new 
ConcurrentLinkedQueue<>());
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/scheduler/manager/TimerJobManager.java
 
b/fe/fe-core/src/main/java/org/apache/doris/scheduler/manager/TimerJobManager.java
index 9000dba470..314b24f052 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/scheduler/manager/TimerJobManager.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/scheduler/manager/TimerJobManager.java
@@ -23,6 +23,7 @@ import org.apache.doris.common.PatternMatcher;
 import org.apache.doris.common.io.Writable;
 import org.apache.doris.common.util.LogBuilder;
 import org.apache.doris.common.util.LogKey;
+import org.apache.doris.common.util.TimeUtils;
 import org.apache.doris.scheduler.constants.JobCategory;
 import org.apache.doris.scheduler.constants.JobStatus;
 import org.apache.doris.scheduler.disruptor.TaskDisruptor;
@@ -50,9 +51,7 @@ import java.util.concurrent.TimeUnit;
 public class TimerJobManager implements Closeable, Writable {
 
     private final ConcurrentHashMap<Long, Job> jobMap = new 
ConcurrentHashMap<>(128);
-
     private long lastBatchSchedulerTimestamp;
-
     private static final long BATCH_SCHEDULER_INTERVAL_SECONDS = 600;
 
     /**
@@ -72,7 +71,7 @@ public class TimerJobManager implements Closeable, Writable {
     /**
      * scheduler tasks, it's used to scheduler job
      */
-    private final HashedWheelTimer dorisTimer = new HashedWheelTimer(1, 
TimeUnit.SECONDS, 660);
+    private HashedWheelTimer dorisTimer;
 
     /**
      * Producer and Consumer model
@@ -83,8 +82,18 @@ public class TimerJobManager implements Closeable, Writable {
     private TaskDisruptor disruptor;
 
+    /**
+     * Only records the construction time as the last batch-scheduling timestamp.
+     * The timer itself is created and started in start().
+     */
     public TimerJobManager() {
-        dorisTimer.start();
         this.lastBatchSchedulerTimestamp = System.currentTimeMillis();
+    }
+
+    /**
+     * Creates and starts the timer, recomputes the next execute time of every job currently
+     * in jobMap, runs one batch scheduling pass and registers the recurring scheduling task.
+     */
+    public void start() {
+        // 1 second per tick, 660 ticks per wheel
+        dorisTimer = new HashedWheelTimer(1, TimeUnit.SECONDS, 660);
+        dorisTimer.start();
+        Long currentTimeMs = System.currentTimeMillis();
+        // all jobs are evaluated against the same "now"
+        jobMap.forEach((jobId, job) -> {
+            Long nextExecuteTimeMs = findFistExecuteTime(currentTimeMs, 
job.getStartTimeMs(),
+                    job.getIntervalMs(), job.isCycleJob());
+            job.setNextExecuteTimeMs(nextExecuteTimeMs);
+        });
         batchSchedulerTasks();
         cycleSystemSchedulerTasks();
     }
@@ -99,6 +108,9 @@ public class TimerJobManager implements Closeable, Writable {
     }
 
     public void replayCreateJob(Job job) {
+        if (jobMap.containsKey(job.getJobId())) {
+            return;
+        }
         jobMap.putIfAbsent(job.getJobId(), job);
         initAndSchedulerJob(job);
         log.info(new LogBuilder(LogKey.SCHEDULER_JOB, job.getJobId())
@@ -252,6 +264,20 @@ public class TimerJobManager implements Closeable, 
Writable {
         Env.getCurrentEnv().getEditLog().logUpdateJob(job);
     }
 
+    /**
+     * Marks a job as finished: cancels all of its tasks, sets its status to
+     * {@link JobStatus#FINISHED}, calls finish() on the job and writes the update to the edit log.
+     * Does nothing when the job does not exist or is already finished.
+     *
+     * @param jobId id of the job to finish
+     */
+    public void finishJob(long jobId) {
+        Job job = jobMap.get(jobId);
+        if (job == null) {
+            log.warn("update job status failed, jobId: {} not exist", jobId);
+            // Stop here: the status check below would otherwise dereference null.
+            return;
+        }
+        if (job.getJobStatus().equals(JobStatus.FINISHED)) {
+            return;
+        }
+        cancelJobAllTask(job.getJobId());
+        job.setJobStatus(JobStatus.FINISHED);
+        // Work on the instance fetched above rather than looking it up in jobMap again.
+        job.finish();
+        Env.getCurrentEnv().getEditLog().logUpdateJob(job);
+    }
+
+    /**
+     * Returns the first job in jobMap for which checkJobMatch(job, dbName, jobName, jobCategory)
+     * is true, or an empty Optional when no job matches.
+     */
     private Optional<Job> findJob(String dbName, String jobName, JobCategory 
jobCategory) {
         return jobMap.values().stream().filter(job -> checkJobMatch(job, 
dbName, jobName, jobCategory)).findFirst();
     }
@@ -297,11 +323,15 @@ public class TimerJobManager implements Closeable, 
Writable {
     }
 
     private List<Long> findTasksBetweenTime(Job job, Long endTimeEndWindow, 
Long nextExecuteTime) {
+
         List<Long> jobExecuteTimes = new ArrayList<>();
         if (!job.isCycleJob() && (nextExecuteTime < endTimeEndWindow)) {
             jobExecuteTimes.add(nextExecuteTime);
             return jobExecuteTimes;
         }
+        if (job.isCycleJob() && (nextExecuteTime > endTimeEndWindow)) {
+            return new ArrayList<>();
+        }
         while (endTimeEndWindow >= nextExecuteTime) {
             if (job.isTaskTimeExceeded()) {
                 break;
@@ -345,7 +375,12 @@ public class TimerJobManager implements Closeable, 
Writable {
      * Jobs will be re-registered after the task is completed
      */
     private void cycleSystemSchedulerTasks() {
-        dorisTimer.newTimeout(timeout -> batchSchedulerTasks(), 
BATCH_SCHEDULER_INTERVAL_SECONDS, TimeUnit.SECONDS);
+        // Parameterized message: plain concatenation glued the timestamp onto the last word.
+        log.info("re-register system scheduler tasks, time: {}",
+                TimeUtils.longToTimeString(System.currentTimeMillis()));
+        dorisTimer.newTimeout(timeout -> {
+            try {
+                batchSchedulerTasks();
+            } finally {
+                // Re-register even when this batch throws; otherwise one failed batch
+                // would skip the call below and periodic scheduling would stop.
+                cycleSystemSchedulerTasks();
+            }
+        }, BATCH_SCHEDULER_INTERVAL_SECONDS, TimeUnit.SECONDS);
     }
 
     /**
@@ -358,11 +393,12 @@ public class TimerJobManager implements Closeable, 
Writable {
      *                         delay seconds, we just can be second precision
      */
     public void putOneTask(Long jobId, Long startExecuteTime) {
-        TimerJobTask task = new TimerJobTask(jobId, startExecuteTime, 
disruptor);
         if (isClosed) {
-            log.info("putOneTask failed, scheduler is closed, jobId: {}", 
task.getJobId());
+            log.info("putOneTask failed, scheduler is closed, jobId: {}", 
jobId);
             return;
         }
+        long taskId = System.nanoTime();
+        TimerJobTask task = new TimerJobTask(jobId, taskId, startExecuteTime, 
disruptor);
         long delay = getDelaySecond(task.getStartTimestamp());
         Timeout timeout = dorisTimer.newTimeout(task, delay, TimeUnit.SECONDS);
         if (timeout == null) {
@@ -371,11 +407,13 @@ public class TimerJobManager implements Closeable, 
Writable {
         }
         if (jobTimeoutMap.containsKey(task.getJobId())) {
             jobTimeoutMap.get(task.getJobId()).put(task.getTaskId(), timeout);
+            JobTaskManager.addPrepareTaskStartTime(jobId, taskId, 
startExecuteTime);
             return;
         }
         Map<Long, Timeout> timeoutMap = new ConcurrentHashMap<>();
         timeoutMap.put(task.getTaskId(), timeout);
         jobTimeoutMap.put(task.getJobId(), timeoutMap);
+        JobTaskManager.addPrepareTaskStartTime(jobId, taskId, 
startExecuteTime);
     }
 
     // cancel all task for one job
@@ -390,14 +428,7 @@ public class TimerJobManager implements Closeable, 
Writable {
                 timeout.cancel();
             }
         });
-    }
-
-    public void stopTask(Long jobId, Long taskId) {
-        if (!jobTimeoutMap.containsKey(jobId)) {
-            return;
-        }
-        cancelJobAllTask(jobId);
-        jobTimeoutMap.get(jobId).remove(taskId);
+        JobTaskManager.clearPrepareTaskByJobId(jobId);
     }
 
     // get delay time, if startTimestamp is less than now, return 0
@@ -452,7 +483,9 @@ public class TimerJobManager implements Closeable, Writable 
{
     }
 
+    /**
+     * Publishes one execution of the given job to the disruptor right away.
+     * A task id is taken from System.nanoTime() and the current time is recorded for it in
+     * JobTaskManager before publishing.
+     *
+     * @param jobId id of the job to execute
+     */
     public void putOneJobToQueen(Long jobId) {
-        disruptor.tryPublish(jobId);
+        long taskId = System.nanoTime();
+        JobTaskManager.addPrepareTaskStartTime(jobId, taskId, 
System.currentTimeMillis());
+        disruptor.tryPublish(jobId, taskId);
     }
 
     @Override
@@ -473,8 +506,7 @@ public class TimerJobManager implements Closeable, Writable 
{
         int size = in.readInt();
         for (int i = 0; i < size; i++) {
             Job job = Job.readFields(in);
-            jobMap.put(job.getJobId(), job);
-            initAndSchedulerJob(job);
+            jobMap.putIfAbsent(job.getJobId(), job);
         }
     }
 }
diff --git 
a/fe/fe-core/src/test/java/org/apache/doris/scheduler/disruptor/TaskDisruptorTest.java
 
b/fe/fe-core/src/test/java/org/apache/doris/scheduler/disruptor/TaskDisruptorTest.java
index e60587701e..878f8c5594 100644
--- 
a/fe/fe-core/src/test/java/org/apache/doris/scheduler/disruptor/TaskDisruptorTest.java
+++ 
b/fe/fe-core/src/test/java/org/apache/doris/scheduler/disruptor/TaskDisruptorTest.java
@@ -19,6 +19,7 @@ package org.apache.doris.scheduler.disruptor;
 
 import org.apache.doris.catalog.Env;
 import org.apache.doris.scheduler.executor.JobExecutor;
+import org.apache.doris.scheduler.job.ExecutorResult;
 import org.apache.doris.scheduler.job.Job;
 import org.apache.doris.scheduler.manager.TimerJobManager;
 import org.apache.doris.scheduler.manager.TransientTaskManager;
@@ -64,7 +65,7 @@ public class TaskDisruptorTest {
                 timerJobManager.getJob(anyLong);
                 result = job;
             }};
-        taskDisruptor.tryPublish(job.getJobId());
+        taskDisruptor.tryPublish(job.getJobId(), 1L);
         Awaitility.await().atMost(1, TimeUnit.SECONDS).until(() -> 
testEventExecuteFlag);
         Assertions.assertTrue(testEventExecuteFlag);
     }
@@ -72,9 +73,9 @@ public class TaskDisruptorTest {
 
     class TestExecutor implements JobExecutor<Boolean> {
         @Override
-        public Boolean execute(Job job) {
+        public ExecutorResult execute(Job job) {
             testEventExecuteFlag = true;
-            return true;
+            return new ExecutorResult(true, true, null, "null");
         }
     }
 
diff --git 
a/fe/fe-core/src/test/java/org/apache/doris/scheduler/disruptor/TimerJobManagerTest.java
 
b/fe/fe-core/src/test/java/org/apache/doris/scheduler/disruptor/TimerJobManagerTest.java
index fd871be962..3e912b8fd8 100644
--- 
a/fe/fe-core/src/test/java/org/apache/doris/scheduler/disruptor/TimerJobManagerTest.java
+++ 
b/fe/fe-core/src/test/java/org/apache/doris/scheduler/disruptor/TimerJobManagerTest.java
@@ -22,6 +22,7 @@ import org.apache.doris.common.DdlException;
 import org.apache.doris.persist.EditLog;
 import org.apache.doris.scheduler.constants.JobCategory;
 import org.apache.doris.scheduler.executor.JobExecutor;
+import org.apache.doris.scheduler.job.ExecutorResult;
 import org.apache.doris.scheduler.job.Job;
 import org.apache.doris.scheduler.manager.TimerJobManager;
 import org.apache.doris.scheduler.manager.TransientTaskManager;
@@ -60,6 +61,7 @@ public class TimerJobManagerTest {
         TransientTaskManager transientTaskManager = new TransientTaskManager();
         TaskDisruptor taskDisruptor = new TaskDisruptor(this.timerJobManager, 
transientTaskManager);
         this.timerJobManager.setDisruptor(taskDisruptor);
+        timerJobManager.start();
     }
 
     @Test
@@ -166,9 +168,9 @@ public class TimerJobManagerTest {
 
+    /** Test executor that increments testExecuteCount and logs the new value on every run. */
     class TestExecutor implements JobExecutor<Boolean> {
         @Override
-        public Boolean execute(Job job) {
+        public ExecutorResult execute(Job job) {
             log.info("test execute count:{}", 
testExecuteCount.incrementAndGet());
-            return true;
+            return new ExecutorResult<>(true, true, null, "");
         }
     }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org


Reply via email to