This is an automated email from the ASF dual-hosted git repository. diwu pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push: new ca55bd88ad [Fix](Job)Fix the window time is not updated when no job is registered (#23628) ca55bd88ad is described below commit ca55bd88adf3a376404b1b8b5c67b4deeba3cb54 Author: Calvin Kirs <acm_mas...@163.com> AuthorDate: Wed Aug 30 09:48:21 2023 +0800 [Fix](Job)Fix the window time is not updated when no job is registered (#23628) Fix resume job grammar definition is inconsistent Show Job task Add execution results JOB allows to define update operations --- .../Data-Definition-Statements/Create/CREATE-JOB.md | 2 +- .../sql-reference/Show-Statements/SHOW-JOB-TASK.md | 1 + .../Data-Definition-Statements/Create/CREATE-JOB.md | 2 +- .../sql-reference/Show-Statements/SHOW-JOB-TASK.md | 1 + .../src/main/java/org/apache/doris/common/Config.java | 2 +- fe/fe-core/src/main/cup/sql_parser.cup | 2 +- .../java/org/apache/doris/analysis/CreateJobStmt.java | 3 ++- .../org/apache/doris/analysis/ShowJobTaskStmt.java | 1 + .../apache/doris/scheduler/disruptor/TaskHandler.java | 10 ++++------ .../doris/scheduler/executor/SqlJobExecutor.java | 19 +++++++++++++++---- .../java/org/apache/doris/scheduler/job/JobTask.java | 7 +++++++ .../doris/scheduler/manager/JobTaskManager.java | 2 +- .../doris/scheduler/manager/TimerJobManager.java | 6 ++++++ 13 files changed, 42 insertions(+), 16 deletions(-) diff --git a/docs/en/docs/sql-manual/sql-reference/Data-Definition-Statements/Create/CREATE-JOB.md b/docs/en/docs/sql-manual/sql-reference/Data-Definition-Statements/Create/CREATE-JOB.md index 839fe0e72c..d89e0aad0d 100644 --- a/docs/en/docs/sql-manual/sql-reference/Data-Definition-Statements/Create/CREATE-JOB.md +++ b/docs/en/docs/sql-manual/sql-reference/Data-Definition-Statements/Create/CREATE-JOB.md @@ -98,7 +98,7 @@ The SCHEDULER statement is used to define the execution time, frequency and dura Used to specify the end time of the job, if not specified, it means permanent execution. - DO - It is used to specify the operation that needs to be performed when the job is triggered. Currently, all ***INSERT*** operations are supported. We will support more operations in the future. + It is used to specify the operation that needs to be performed when the job is triggered. Currently, all ***INSERT, UPDATE*** operations are supported. We will support more operations in the future. ### Example diff --git a/docs/en/docs/sql-manual/sql-reference/Show-Statements/SHOW-JOB-TASK.md b/docs/en/docs/sql-manual/sql-reference/Show-Statements/SHOW-JOB-TASK.md index c7d1f6afb0..d816059510 100644 --- a/docs/en/docs/sql-manual/sql-reference/Show-Statements/SHOW-JOB-TASK.md +++ b/docs/en/docs/sql-manual/sql-reference/Show-Statements/SHOW-JOB-TASK.md @@ -49,6 +49,7 @@ Result description: StartTime: start execution time EndTime: end time Status: status + Result: execution result ErrMsg: error message ``` diff --git a/docs/zh-CN/docs/sql-manual/sql-reference/Data-Definition-Statements/Create/CREATE-JOB.md b/docs/zh-CN/docs/sql-manual/sql-reference/Data-Definition-Statements/Create/CREATE-JOB.md index b67e054714..c06d48a4d0 100644 --- a/docs/zh-CN/docs/sql-manual/sql-reference/Data-Definition-Statements/Create/CREATE-JOB.md +++ b/docs/zh-CN/docs/sql-manual/sql-reference/Data-Definition-Statements/Create/CREATE-JOB.md @@ -99,7 +99,7 @@ SCHEDULER 语句用于定义作业的执行时间,频率以及持续时间, 用于指定作业的结束时间,如果没有指定,则表示永久执行。 - DO - 用于指定作业触发时需要执行的操作,目前支持所有的 ***INSERT*** 操作。后续我们会支持更多的操作。 + 用于指定作业触发时需要执行的操作,目前支持所有的 ***INSERT,UPDATE*** 操作。后续我们会支持更多的操作。 ### Example diff --git a/docs/zh-CN/docs/sql-manual/sql-reference/Show-Statements/SHOW-JOB-TASK.md b/docs/zh-CN/docs/sql-manual/sql-reference/Show-Statements/SHOW-JOB-TASK.md index 3ae1e24733..d1b458ea29 100644 --- a/docs/zh-CN/docs/sql-manual/sql-reference/Show-Statements/SHOW-JOB-TASK.md +++ b/docs/zh-CN/docs/sql-manual/sql-reference/Show-Statements/SHOW-JOB-TASK.md @@ -50,6 +50,7 @@ SHOW JOB TASKS FOR job_name; StartTime: 开始执行时间 EndTime: 结束时间 Status: 状态 + Result: 执行结果 ErrMsg: 错误信息 ``` diff --git a/fe/fe-common/src/main/java/org/apache/doris/common/Config.java b/fe/fe-common/src/main/java/org/apache/doris/common/Config.java index 2815dfb0ec..a683b307da 100644 --- a/fe/fe-common/src/main/java/org/apache/doris/common/Config.java +++ b/fe/fe-common/src/main/java/org/apache/doris/common/Config.java @@ -1554,7 +1554,7 @@ public class Config extends ConfigBase { public static boolean enable_pipeline_load = false; @ConfField - public static int scheduler_job_task_max_num = 10; + public static int scheduler_job_task_max_saved_count = 10; /** * The number of async tasks that can be queued. @See TaskDisruptor diff --git a/fe/fe-core/src/main/cup/sql_parser.cup b/fe/fe-core/src/main/cup/sql_parser.cup index bff2a7ff9c..5f3311ab0f 100644 --- a/fe/fe-core/src/main/cup/sql_parser.cup +++ b/fe/fe-core/src/main/cup/sql_parser.cup @@ -2680,7 +2680,7 @@ create_job_stmt ::= :} ; resume_job_stmt ::= - KW_RESUME KW_JOB job_label:jobLabel + KW_RESUME KW_JOB KW_FOR job_label:jobLabel {: RESULT = new ResumeJobStmt(jobLabel); :} diff --git a/fe/fe-core/src/main/java/org/apache/doris/analysis/CreateJobStmt.java b/fe/fe-core/src/main/java/org/apache/doris/analysis/CreateJobStmt.java index a1f9b6bd82..999d630153 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/analysis/CreateJobStmt.java +++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/CreateJobStmt.java @@ -85,7 +85,8 @@ public class CreateJobStmt extends DdlStmt { private String timezone = TimeUtils.DEFAULT_TIME_ZONE; private static final ImmutableSet<Class<? extends DdlStmt>> supportStmtSuperClass - = new ImmutableSet.Builder<Class<? extends DdlStmt>>().add(InsertStmt.class).build(); + = new ImmutableSet.Builder<Class<? extends DdlStmt>>().add(InsertStmt.class) + .add(UpdateStmt.class).build(); private static HashSet<String> supportStmtClassNamesCache = new HashSet<>(16); diff --git a/fe/fe-core/src/main/java/org/apache/doris/analysis/ShowJobTaskStmt.java b/fe/fe-core/src/main/java/org/apache/doris/analysis/ShowJobTaskStmt.java index 1eb0241cfe..e7b04d9ece 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/analysis/ShowJobTaskStmt.java +++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/ShowJobTaskStmt.java @@ -43,6 +43,7 @@ public class ShowJobTaskStmt extends ShowStmt { .add("StartTime") .add("EndTime") .add("Status") + .add("Result") .add("ErrorMsg") .build(); diff --git a/fe/fe-core/src/main/java/org/apache/doris/scheduler/disruptor/TaskHandler.java b/fe/fe-core/src/main/java/org/apache/doris/scheduler/disruptor/TaskHandler.java index 37ed214325..297537f712 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/scheduler/disruptor/TaskHandler.java +++ b/fe/fe-core/src/main/java/org/apache/doris/scheduler/disruptor/TaskHandler.java @@ -108,11 +108,7 @@ public class TaskHandler implements WorkHandler<TaskEvent> { JobTask jobTask = new JobTask(jobId); try { jobTask.setStartTimeMs(System.currentTimeMillis()); - - - // TODO: We should record the result of the event task. - //Object result = job.getExecutor().execute(); - job.getExecutor().execute(job); + Object result = job.getExecutor().execute(job); job.setLatestCompleteExecuteTimeMs(System.currentTimeMillis()); if (job.isCycleJob()) { updateJobStatusIfPastEndTime(job); @@ -120,9 +116,11 @@ public class TaskHandler implements WorkHandler<TaskEvent> { // one time job should be finished after execute updateOnceTimeJobStatus(job); } + String resultStr = Objects.isNull(result) ? "" : result.toString(); + jobTask.setExecuteResult(resultStr); jobTask.setIsSuccessful(true); } catch (Exception e) { - log.warn("Event job execute failed, jobId: {}, msg : {}", jobId, e.getMessage()); + log.warn("Job execute failed, jobId: {}, msg : {}", jobId, e.getMessage()); job.pause(e.getMessage()); jobTask.setErrorMsg(e.getMessage()); jobTask.setIsSuccessful(false); diff --git a/fe/fe-core/src/main/java/org/apache/doris/scheduler/executor/SqlJobExecutor.java b/fe/fe-core/src/main/java/org/apache/doris/scheduler/executor/SqlJobExecutor.java index 2f2495f7f8..572e895906 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/scheduler/executor/SqlJobExecutor.java +++ b/fe/fe-core/src/main/java/org/apache/doris/scheduler/executor/SqlJobExecutor.java @@ -35,6 +35,7 @@ import java.util.UUID; /** * we use this executor to execute sql job + * * @param <QueryState> the state of sql job, we can record the state of sql job */ @Slf4j @@ -50,7 +51,7 @@ public class SqlJobExecutor<QueryState> implements JobExecutor { } @Override - public QueryState execute(Job job) throws JobException { + public String execute(Job job) throws JobException { ConnectContext ctx = new ConnectContext(); ctx.setEnv(Env.getCurrentEnv()); ctx.setCluster(ClusterNamespace.getClusterNameFromFullName(job.getDbName())); @@ -66,12 +67,22 @@ public class SqlJobExecutor<QueryState> implements JobExecutor { try { StmtExecutor executor = new StmtExecutor(ctx, sql); executor.execute(queryId); - log.debug("execute sql job success, sql: {}, state is: {}", sql, ctx.getState()); - return (QueryState) ctx.getState(); + return convertExecuteResult(ctx, taskIdString); } catch (Exception e) { - log.warn("execute sql job failed, sql: {}, error: {}", sql, e); throw new JobException("execute sql job failed, sql: " + sql + ", error: " + e.getMessage()); } } + + private String convertExecuteResult(ConnectContext ctx, String queryId) throws JobException { + if (null == ctx.getState()) { + throw new JobException("execute sql job failed, sql: " + sql + ", error: response state is null"); + } + if (null != ctx.getState().getErrorCode()) { + throw new JobException("error code: " + ctx.getState().getErrorCode() + ", error msg: " + + ctx.getState().getErrorMessage()); + } + return "queryId:" + queryId + ",affectedRows : " + ctx.getState().getAffectedRows() + ", warningRows: " + + ctx.getState().getWarningRows() + ",infoMsg" + ctx.getState().getInfoMessage(); + } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/scheduler/job/JobTask.java b/fe/fe-core/src/main/java/org/apache/doris/scheduler/job/JobTask.java index 254875bb75..3bfe4ffc44 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/scheduler/job/JobTask.java +++ b/fe/fe-core/src/main/java/org/apache/doris/scheduler/job/JobTask.java @@ -44,6 +44,8 @@ public class JobTask implements Writable { private Long endTimeMs; @SerializedName("successful") private Boolean isSuccessful; + @SerializedName("executeResult") + private String executeResult; @SerializedName("errorMsg") private String errorMsg; @@ -64,6 +66,11 @@ public class JobTask implements Writable { } else { row.add(isSuccessful ? "SUCCESS" : "FAILED"); } + if (null == executeResult) { + row.add("null"); + } else { + row.add(executeResult); + } if (null == errorMsg) { row.add("null"); } else { diff --git a/fe/fe-core/src/main/java/org/apache/doris/scheduler/manager/JobTaskManager.java b/fe/fe-core/src/main/java/org/apache/doris/scheduler/manager/JobTaskManager.java index c22c89e6d5..e18a143ec7 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/scheduler/manager/JobTaskManager.java +++ b/fe/fe-core/src/main/java/org/apache/doris/scheduler/manager/JobTaskManager.java @@ -40,7 +40,7 @@ import java.util.concurrent.ConcurrentLinkedQueue; @Slf4j public class JobTaskManager implements Writable { - private static final Integer TASK_MAX_NUM = Config.scheduler_job_task_max_num; + private static final Integer TASK_MAX_NUM = Config.scheduler_job_task_max_saved_count; private ConcurrentHashMap<Long, ConcurrentLinkedQueue<JobTask>> jobTaskMap = new ConcurrentHashMap<>(16); diff --git a/fe/fe-core/src/main/java/org/apache/doris/scheduler/manager/TimerJobManager.java b/fe/fe-core/src/main/java/org/apache/doris/scheduler/manager/TimerJobManager.java index df67366430..2b3b922b37 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/scheduler/manager/TimerJobManager.java +++ b/fe/fe-core/src/main/java/org/apache/doris/scheduler/manager/TimerJobManager.java @@ -321,7 +321,13 @@ public class TimerJobManager implements Closeable, Writable { * We will get the task in the next time window, and then hand it over to the time wheel for timing trigger */ private void executeJobIdsWithinLastTenMinutesWindow() { + // if the task executes for more than 10 minutes, it will be delay, so, + // set lastBatchSchedulerTimestamp to current time + if (lastBatchSchedulerTimestamp + BATCH_SCHEDULER_INTERVAL_MILLI_SECONDS < System.currentTimeMillis()) { + this.lastBatchSchedulerTimestamp = System.currentTimeMillis(); + } if (jobMap.isEmpty()) { + this.lastBatchSchedulerTimestamp += BATCH_SCHEDULER_INTERVAL_MILLI_SECONDS; return; } jobMap.forEach((k, v) -> { --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org