This is an automated email from the ASF dual-hosted git repository.
diwu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new ca55bd88ad [Fix](Job)Fix the window time is not updated when no job is
registered (#23628)
ca55bd88ad is described below
commit ca55bd88adf3a376404b1b8b5c67b4deeba3cb54
Author: Calvin Kirs <[email protected]>
AuthorDate: Wed Aug 30 09:48:21 2023 +0800
[Fix](Job)Fix the window time is not updated when no job is registered
(#23628)
Fix the inconsistent grammar definition of the RESUME JOB statement
SHOW JOB TASKS now includes the execution result of each task
JOB now allows UPDATE operations to be defined
---
.../Data-Definition-Statements/Create/CREATE-JOB.md | 2 +-
.../sql-reference/Show-Statements/SHOW-JOB-TASK.md | 1 +
.../Data-Definition-Statements/Create/CREATE-JOB.md | 2 +-
.../sql-reference/Show-Statements/SHOW-JOB-TASK.md | 1 +
.../src/main/java/org/apache/doris/common/Config.java | 2 +-
fe/fe-core/src/main/cup/sql_parser.cup | 2 +-
.../java/org/apache/doris/analysis/CreateJobStmt.java | 3 ++-
.../org/apache/doris/analysis/ShowJobTaskStmt.java | 1 +
.../apache/doris/scheduler/disruptor/TaskHandler.java | 10 ++++------
.../doris/scheduler/executor/SqlJobExecutor.java | 19 +++++++++++++++----
.../java/org/apache/doris/scheduler/job/JobTask.java | 7 +++++++
.../doris/scheduler/manager/JobTaskManager.java | 2 +-
.../doris/scheduler/manager/TimerJobManager.java | 6 ++++++
13 files changed, 42 insertions(+), 16 deletions(-)
diff --git
a/docs/en/docs/sql-manual/sql-reference/Data-Definition-Statements/Create/CREATE-JOB.md
b/docs/en/docs/sql-manual/sql-reference/Data-Definition-Statements/Create/CREATE-JOB.md
index 839fe0e72c..d89e0aad0d 100644
---
a/docs/en/docs/sql-manual/sql-reference/Data-Definition-Statements/Create/CREATE-JOB.md
+++
b/docs/en/docs/sql-manual/sql-reference/Data-Definition-Statements/Create/CREATE-JOB.md
@@ -98,7 +98,7 @@ The SCHEDULER statement is used to define the execution time,
frequency and dura
Used to specify the end time of the job, if not specified, it means
permanent execution.
- DO
- It is used to specify the operation that needs to be performed when the job
is triggered. Currently, all ***INSERT*** operations are supported. We will
support more operations in the future.
+ It is used to specify the operation that needs to be performed when the job
is triggered. Currently, all ***INSERT, UPDATE*** operations are supported. We
will support more operations in the future.
### Example
diff --git
a/docs/en/docs/sql-manual/sql-reference/Show-Statements/SHOW-JOB-TASK.md
b/docs/en/docs/sql-manual/sql-reference/Show-Statements/SHOW-JOB-TASK.md
index c7d1f6afb0..d816059510 100644
--- a/docs/en/docs/sql-manual/sql-reference/Show-Statements/SHOW-JOB-TASK.md
+++ b/docs/en/docs/sql-manual/sql-reference/Show-Statements/SHOW-JOB-TASK.md
@@ -49,6 +49,7 @@ Result description:
StartTime: start execution time
EndTime: end time
Status: status
+ Result: execution result
ErrMsg: error message
```
diff --git
a/docs/zh-CN/docs/sql-manual/sql-reference/Data-Definition-Statements/Create/CREATE-JOB.md
b/docs/zh-CN/docs/sql-manual/sql-reference/Data-Definition-Statements/Create/CREATE-JOB.md
index b67e054714..c06d48a4d0 100644
---
a/docs/zh-CN/docs/sql-manual/sql-reference/Data-Definition-Statements/Create/CREATE-JOB.md
+++
b/docs/zh-CN/docs/sql-manual/sql-reference/Data-Definition-Statements/Create/CREATE-JOB.md
@@ -99,7 +99,7 @@ SCHEDULER 语句用于定义作业的执行时间,频率以及持续时间,
用于指定作业的结束时间,如果没有指定,则表示永久执行。
- DO
- 用于指定作业触发时需要执行的操作,目前支持所有的 ***INSERT*** 操作。后续我们会支持更多的操作。
+ 用于指定作业触发时需要执行的操作,目前支持所有的 ***INSERT,UPDATE*** 操作。后续我们会支持更多的操作。
### Example
diff --git
a/docs/zh-CN/docs/sql-manual/sql-reference/Show-Statements/SHOW-JOB-TASK.md
b/docs/zh-CN/docs/sql-manual/sql-reference/Show-Statements/SHOW-JOB-TASK.md
index 3ae1e24733..d1b458ea29 100644
--- a/docs/zh-CN/docs/sql-manual/sql-reference/Show-Statements/SHOW-JOB-TASK.md
+++ b/docs/zh-CN/docs/sql-manual/sql-reference/Show-Statements/SHOW-JOB-TASK.md
@@ -50,6 +50,7 @@ SHOW JOB TASKS FOR job_name;
StartTime: 开始执行时间
EndTime: 结束时间
Status: 状态
+ Result: 执行结果
ErrMsg: 错误信息
```
diff --git a/fe/fe-common/src/main/java/org/apache/doris/common/Config.java
b/fe/fe-common/src/main/java/org/apache/doris/common/Config.java
index 2815dfb0ec..a683b307da 100644
--- a/fe/fe-common/src/main/java/org/apache/doris/common/Config.java
+++ b/fe/fe-common/src/main/java/org/apache/doris/common/Config.java
@@ -1554,7 +1554,7 @@ public class Config extends ConfigBase {
public static boolean enable_pipeline_load = false;
@ConfField
- public static int scheduler_job_task_max_num = 10;
+ public static int scheduler_job_task_max_saved_count = 10;
/**
* The number of async tasks that can be queued. @See TaskDisruptor
diff --git a/fe/fe-core/src/main/cup/sql_parser.cup
b/fe/fe-core/src/main/cup/sql_parser.cup
index bff2a7ff9c..5f3311ab0f 100644
--- a/fe/fe-core/src/main/cup/sql_parser.cup
+++ b/fe/fe-core/src/main/cup/sql_parser.cup
@@ -2680,7 +2680,7 @@ create_job_stmt ::=
:}
;
resume_job_stmt ::=
- KW_RESUME KW_JOB job_label:jobLabel
+ KW_RESUME KW_JOB KW_FOR job_label:jobLabel
{:
RESULT = new ResumeJobStmt(jobLabel);
:}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/analysis/CreateJobStmt.java
b/fe/fe-core/src/main/java/org/apache/doris/analysis/CreateJobStmt.java
index a1f9b6bd82..999d630153 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/analysis/CreateJobStmt.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/CreateJobStmt.java
@@ -85,7 +85,8 @@ public class CreateJobStmt extends DdlStmt {
private String timezone = TimeUtils.DEFAULT_TIME_ZONE;
private static final ImmutableSet<Class<? extends DdlStmt>>
supportStmtSuperClass
- = new ImmutableSet.Builder<Class<? extends
DdlStmt>>().add(InsertStmt.class).build();
+ = new ImmutableSet.Builder<Class<? extends
DdlStmt>>().add(InsertStmt.class)
+ .add(UpdateStmt.class).build();
private static HashSet<String> supportStmtClassNamesCache = new
HashSet<>(16);
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/analysis/ShowJobTaskStmt.java
b/fe/fe-core/src/main/java/org/apache/doris/analysis/ShowJobTaskStmt.java
index 1eb0241cfe..e7b04d9ece 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/analysis/ShowJobTaskStmt.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/ShowJobTaskStmt.java
@@ -43,6 +43,7 @@ public class ShowJobTaskStmt extends ShowStmt {
.add("StartTime")
.add("EndTime")
.add("Status")
+ .add("Result")
.add("ErrorMsg")
.build();
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/scheduler/disruptor/TaskHandler.java
b/fe/fe-core/src/main/java/org/apache/doris/scheduler/disruptor/TaskHandler.java
index 37ed214325..297537f712 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/scheduler/disruptor/TaskHandler.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/scheduler/disruptor/TaskHandler.java
@@ -108,11 +108,7 @@ public class TaskHandler implements WorkHandler<TaskEvent>
{
JobTask jobTask = new JobTask(jobId);
try {
jobTask.setStartTimeMs(System.currentTimeMillis());
-
-
- // TODO: We should record the result of the event task.
- //Object result = job.getExecutor().execute();
- job.getExecutor().execute(job);
+ Object result = job.getExecutor().execute(job);
job.setLatestCompleteExecuteTimeMs(System.currentTimeMillis());
if (job.isCycleJob()) {
updateJobStatusIfPastEndTime(job);
@@ -120,9 +116,11 @@ public class TaskHandler implements WorkHandler<TaskEvent>
{
// one time job should be finished after execute
updateOnceTimeJobStatus(job);
}
+ String resultStr = Objects.isNull(result) ? "" : result.toString();
+ jobTask.setExecuteResult(resultStr);
jobTask.setIsSuccessful(true);
} catch (Exception e) {
- log.warn("Event job execute failed, jobId: {}, msg : {}", jobId,
e.getMessage());
+ log.warn("Job execute failed, jobId: {}, msg : {}", jobId,
e.getMessage());
job.pause(e.getMessage());
jobTask.setErrorMsg(e.getMessage());
jobTask.setIsSuccessful(false);
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/scheduler/executor/SqlJobExecutor.java
b/fe/fe-core/src/main/java/org/apache/doris/scheduler/executor/SqlJobExecutor.java
index 2f2495f7f8..572e895906 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/scheduler/executor/SqlJobExecutor.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/scheduler/executor/SqlJobExecutor.java
@@ -35,6 +35,7 @@ import java.util.UUID;
/**
* we use this executor to execute sql job
+ *
* @param <QueryState> the state of sql job, we can record the state of sql job
*/
@Slf4j
@@ -50,7 +51,7 @@ public class SqlJobExecutor<QueryState> implements
JobExecutor {
}
@Override
- public QueryState execute(Job job) throws JobException {
+ public String execute(Job job) throws JobException {
ConnectContext ctx = new ConnectContext();
ctx.setEnv(Env.getCurrentEnv());
ctx.setCluster(ClusterNamespace.getClusterNameFromFullName(job.getDbName()));
@@ -66,12 +67,22 @@ public class SqlJobExecutor<QueryState> implements
JobExecutor {
try {
StmtExecutor executor = new StmtExecutor(ctx, sql);
executor.execute(queryId);
- log.debug("execute sql job success, sql: {}, state is: {}", sql,
ctx.getState());
- return (QueryState) ctx.getState();
+ return convertExecuteResult(ctx, taskIdString);
} catch (Exception e) {
- log.warn("execute sql job failed, sql: {}, error: {}", sql, e);
throw new JobException("execute sql job failed, sql: " + sql + ",
error: " + e.getMessage());
}
}
+
+ private String convertExecuteResult(ConnectContext ctx, String queryId)
throws JobException {
+ if (null == ctx.getState()) {
+ throw new JobException("execute sql job failed, sql: " + sql + ",
error: response state is null");
+ }
+ if (null != ctx.getState().getErrorCode()) {
+ throw new JobException("error code: " +
ctx.getState().getErrorCode() + ", error msg: "
+ + ctx.getState().getErrorMessage());
+ }
+ return "queryId:" + queryId + ",affectedRows : " +
ctx.getState().getAffectedRows() + ", warningRows: "
+ + ctx.getState().getWarningRows() + ",infoMsg" +
ctx.getState().getInfoMessage();
+ }
}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/scheduler/job/JobTask.java
b/fe/fe-core/src/main/java/org/apache/doris/scheduler/job/JobTask.java
index 254875bb75..3bfe4ffc44 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/scheduler/job/JobTask.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/scheduler/job/JobTask.java
@@ -44,6 +44,8 @@ public class JobTask implements Writable {
private Long endTimeMs;
@SerializedName("successful")
private Boolean isSuccessful;
+ @SerializedName("executeResult")
+ private String executeResult;
@SerializedName("errorMsg")
private String errorMsg;
@@ -64,6 +66,11 @@ public class JobTask implements Writable {
} else {
row.add(isSuccessful ? "SUCCESS" : "FAILED");
}
+ if (null == executeResult) {
+ row.add("null");
+ } else {
+ row.add(executeResult);
+ }
if (null == errorMsg) {
row.add("null");
} else {
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/scheduler/manager/JobTaskManager.java
b/fe/fe-core/src/main/java/org/apache/doris/scheduler/manager/JobTaskManager.java
index c22c89e6d5..e18a143ec7 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/scheduler/manager/JobTaskManager.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/scheduler/manager/JobTaskManager.java
@@ -40,7 +40,7 @@ import java.util.concurrent.ConcurrentLinkedQueue;
@Slf4j
public class JobTaskManager implements Writable {
- private static final Integer TASK_MAX_NUM =
Config.scheduler_job_task_max_num;
+ private static final Integer TASK_MAX_NUM =
Config.scheduler_job_task_max_saved_count;
private ConcurrentHashMap<Long, ConcurrentLinkedQueue<JobTask>> jobTaskMap
= new ConcurrentHashMap<>(16);
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/scheduler/manager/TimerJobManager.java
b/fe/fe-core/src/main/java/org/apache/doris/scheduler/manager/TimerJobManager.java
index df67366430..2b3b922b37 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/scheduler/manager/TimerJobManager.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/scheduler/manager/TimerJobManager.java
@@ -321,7 +321,13 @@ public class TimerJobManager implements Closeable,
Writable {
* We will get the task in the next time window, and then hand it over to
the time wheel for timing trigger
*/
private void executeJobIdsWithinLastTenMinutesWindow() {
+ // if the task executes for more than 10 minutes, it will be delay, so,
+ // set lastBatchSchedulerTimestamp to current time
+ if (lastBatchSchedulerTimestamp +
BATCH_SCHEDULER_INTERVAL_MILLI_SECONDS < System.currentTimeMillis()) {
+ this.lastBatchSchedulerTimestamp = System.currentTimeMillis();
+ }
if (jobMap.isEmpty()) {
+ this.lastBatchSchedulerTimestamp +=
BATCH_SCHEDULER_INTERVAL_MILLI_SECONDS;
return;
}
jobMap.forEach((k, v) -> {
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]