This is an automated email from the ASF dual-hosted git repository.

diwu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new ca55bd88ad [Fix](Job)Fix the window time is not updated when no job is 
registered (#23628)
ca55bd88ad is described below

commit ca55bd88adf3a376404b1b8b5c67b4deeba3cb54
Author: Calvin Kirs <acm_mas...@163.com>
AuthorDate: Wed Aug 30 09:48:21 2023 +0800

    [Fix](Job)Fix the window time is not updated when no job is registered 
(#23628)
    
    Fix resume job grammar definition is inconsistent
    Show Job task Add execution results
    JOB allows to define update operations
---
 .../Data-Definition-Statements/Create/CREATE-JOB.md   |  2 +-
 .../sql-reference/Show-Statements/SHOW-JOB-TASK.md    |  1 +
 .../Data-Definition-Statements/Create/CREATE-JOB.md   |  2 +-
 .../sql-reference/Show-Statements/SHOW-JOB-TASK.md    |  1 +
 .../src/main/java/org/apache/doris/common/Config.java |  2 +-
 fe/fe-core/src/main/cup/sql_parser.cup                |  2 +-
 .../java/org/apache/doris/analysis/CreateJobStmt.java |  3 ++-
 .../org/apache/doris/analysis/ShowJobTaskStmt.java    |  1 +
 .../apache/doris/scheduler/disruptor/TaskHandler.java | 10 ++++------
 .../doris/scheduler/executor/SqlJobExecutor.java      | 19 +++++++++++++++----
 .../java/org/apache/doris/scheduler/job/JobTask.java  |  7 +++++++
 .../doris/scheduler/manager/JobTaskManager.java       |  2 +-
 .../doris/scheduler/manager/TimerJobManager.java      |  6 ++++++
 13 files changed, 42 insertions(+), 16 deletions(-)

diff --git 
a/docs/en/docs/sql-manual/sql-reference/Data-Definition-Statements/Create/CREATE-JOB.md
 
b/docs/en/docs/sql-manual/sql-reference/Data-Definition-Statements/Create/CREATE-JOB.md
index 839fe0e72c..d89e0aad0d 100644
--- 
a/docs/en/docs/sql-manual/sql-reference/Data-Definition-Statements/Create/CREATE-JOB.md
+++ 
b/docs/en/docs/sql-manual/sql-reference/Data-Definition-Statements/Create/CREATE-JOB.md
@@ -98,7 +98,7 @@ The SCHEDULER statement is used to define the execution time, 
frequency and dura
        Used to specify the end time of the job, if not specified, it means 
permanent execution.
 - DO
 
-  It is used to specify the operation that needs to be performed when the job 
is triggered. Currently, all ***INSERT*** operations are supported. We will 
support more operations in the future.
+  It is used to specify the operation that needs to be performed when the job 
is triggered. Currently, all ***INSERT, UPDATE*** operations are supported. We 
will support more operations in the future.
 
 ### Example
 
diff --git 
a/docs/en/docs/sql-manual/sql-reference/Show-Statements/SHOW-JOB-TASK.md 
b/docs/en/docs/sql-manual/sql-reference/Show-Statements/SHOW-JOB-TASK.md
index c7d1f6afb0..d816059510 100644
--- a/docs/en/docs/sql-manual/sql-reference/Show-Statements/SHOW-JOB-TASK.md
+++ b/docs/en/docs/sql-manual/sql-reference/Show-Statements/SHOW-JOB-TASK.md
@@ -49,6 +49,7 @@ Result description:
                         StartTime: start execution time
                           EndTime: end time
                            Status: status
+                           Result: execution result
                            ErrMsg: error message
 ```
 
diff --git 
a/docs/zh-CN/docs/sql-manual/sql-reference/Data-Definition-Statements/Create/CREATE-JOB.md
 
b/docs/zh-CN/docs/sql-manual/sql-reference/Data-Definition-Statements/Create/CREATE-JOB.md
index b67e054714..c06d48a4d0 100644
--- 
a/docs/zh-CN/docs/sql-manual/sql-reference/Data-Definition-Statements/Create/CREATE-JOB.md
+++ 
b/docs/zh-CN/docs/sql-manual/sql-reference/Data-Definition-Statements/Create/CREATE-JOB.md
@@ -99,7 +99,7 @@ SCHEDULER 语句用于定义作业的执行时间,频率以及持续时间,
       用于指定作业的结束时间,如果没有指定,则表示永久执行。
 - DO
  
-   用于指定作业触发时需要执行的操作,目前支持所有的 ***INSERT*** 操作。后续我们会支持更多的操作。
+   用于指定作业触发时需要执行的操作,目前支持所有的 ***INSERT,UPDATE*** 操作。后续我们会支持更多的操作。
 
 ### Example
 
diff --git 
a/docs/zh-CN/docs/sql-manual/sql-reference/Show-Statements/SHOW-JOB-TASK.md 
b/docs/zh-CN/docs/sql-manual/sql-reference/Show-Statements/SHOW-JOB-TASK.md
index 3ae1e24733..d1b458ea29 100644
--- a/docs/zh-CN/docs/sql-manual/sql-reference/Show-Statements/SHOW-JOB-TASK.md
+++ b/docs/zh-CN/docs/sql-manual/sql-reference/Show-Statements/SHOW-JOB-TASK.md
@@ -50,6 +50,7 @@ SHOW JOB TASKS FOR job_name;
                        StartTime: 开始执行时间
                          EndTime: 结束时间
                           Status: 状态
+                          Result: 执行结果
                           ErrMsg: 错误信息
 ```
 
diff --git a/fe/fe-common/src/main/java/org/apache/doris/common/Config.java 
b/fe/fe-common/src/main/java/org/apache/doris/common/Config.java
index 2815dfb0ec..a683b307da 100644
--- a/fe/fe-common/src/main/java/org/apache/doris/common/Config.java
+++ b/fe/fe-common/src/main/java/org/apache/doris/common/Config.java
@@ -1554,7 +1554,7 @@ public class Config extends ConfigBase {
     public static boolean enable_pipeline_load = false;
 
     @ConfField
-    public static int scheduler_job_task_max_num = 10;
+    public static int scheduler_job_task_max_saved_count = 10;
 
     /**
      * The number of async tasks that can be queued. @See TaskDisruptor
diff --git a/fe/fe-core/src/main/cup/sql_parser.cup 
b/fe/fe-core/src/main/cup/sql_parser.cup
index bff2a7ff9c..5f3311ab0f 100644
--- a/fe/fe-core/src/main/cup/sql_parser.cup
+++ b/fe/fe-core/src/main/cup/sql_parser.cup
@@ -2680,7 +2680,7 @@ create_job_stmt ::=
      :}
      ;       
 resume_job_stmt ::=
-    KW_RESUME KW_JOB job_label:jobLabel
+    KW_RESUME KW_JOB  KW_FOR job_label:jobLabel
     {:
         RESULT = new ResumeJobStmt(jobLabel);
     :}
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/analysis/CreateJobStmt.java 
b/fe/fe-core/src/main/java/org/apache/doris/analysis/CreateJobStmt.java
index a1f9b6bd82..999d630153 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/analysis/CreateJobStmt.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/CreateJobStmt.java
@@ -85,7 +85,8 @@ public class CreateJobStmt extends DdlStmt {
     private String timezone = TimeUtils.DEFAULT_TIME_ZONE;
 
     private static final ImmutableSet<Class<? extends DdlStmt>> 
supportStmtSuperClass
-            = new ImmutableSet.Builder<Class<? extends 
DdlStmt>>().add(InsertStmt.class).build();
+            = new ImmutableSet.Builder<Class<? extends 
DdlStmt>>().add(InsertStmt.class)
+            .add(UpdateStmt.class).build();
 
     private static HashSet<String> supportStmtClassNamesCache = new 
HashSet<>(16);
 
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/analysis/ShowJobTaskStmt.java 
b/fe/fe-core/src/main/java/org/apache/doris/analysis/ShowJobTaskStmt.java
index 1eb0241cfe..e7b04d9ece 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/analysis/ShowJobTaskStmt.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/ShowJobTaskStmt.java
@@ -43,6 +43,7 @@ public class ShowJobTaskStmt extends ShowStmt {
                     .add("StartTime")
                     .add("EndTime")
                     .add("Status")
+                    .add("Result")
                     .add("ErrorMsg")
                     .build();
 
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/scheduler/disruptor/TaskHandler.java
 
b/fe/fe-core/src/main/java/org/apache/doris/scheduler/disruptor/TaskHandler.java
index 37ed214325..297537f712 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/scheduler/disruptor/TaskHandler.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/scheduler/disruptor/TaskHandler.java
@@ -108,11 +108,7 @@ public class TaskHandler implements WorkHandler<TaskEvent> 
{
         JobTask jobTask = new JobTask(jobId);
         try {
             jobTask.setStartTimeMs(System.currentTimeMillis());
-
-
-            // TODO: We should record the result of the event task.
-            //Object result = job.getExecutor().execute();
-            job.getExecutor().execute(job);
+            Object result = job.getExecutor().execute(job);
             job.setLatestCompleteExecuteTimeMs(System.currentTimeMillis());
             if (job.isCycleJob()) {
                 updateJobStatusIfPastEndTime(job);
@@ -120,9 +116,11 @@ public class TaskHandler implements WorkHandler<TaskEvent> 
{
                 // one time job should be finished after execute
                 updateOnceTimeJobStatus(job);
             }
+            String resultStr = Objects.isNull(result) ? "" : result.toString();
+            jobTask.setExecuteResult(resultStr);
             jobTask.setIsSuccessful(true);
         } catch (Exception e) {
-            log.warn("Event job execute failed, jobId: {}, msg : {}", jobId, 
e.getMessage());
+            log.warn("Job execute failed, jobId: {}, msg : {}", jobId, 
e.getMessage());
             job.pause(e.getMessage());
             jobTask.setErrorMsg(e.getMessage());
             jobTask.setIsSuccessful(false);
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/scheduler/executor/SqlJobExecutor.java
 
b/fe/fe-core/src/main/java/org/apache/doris/scheduler/executor/SqlJobExecutor.java
index 2f2495f7f8..572e895906 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/scheduler/executor/SqlJobExecutor.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/scheduler/executor/SqlJobExecutor.java
@@ -35,6 +35,7 @@ import java.util.UUID;
 
 /**
  * we use this executor to execute sql job
+ *
  * @param <QueryState> the state of sql job, we can record the state of sql job
  */
 @Slf4j
@@ -50,7 +51,7 @@ public class SqlJobExecutor<QueryState> implements 
JobExecutor {
     }
 
     @Override
-    public QueryState execute(Job job) throws JobException {
+    public String execute(Job job) throws JobException {
         ConnectContext ctx = new ConnectContext();
         ctx.setEnv(Env.getCurrentEnv());
         
ctx.setCluster(ClusterNamespace.getClusterNameFromFullName(job.getDbName()));
@@ -66,12 +67,22 @@ public class SqlJobExecutor<QueryState> implements 
JobExecutor {
         try {
             StmtExecutor executor = new StmtExecutor(ctx, sql);
             executor.execute(queryId);
-            log.debug("execute sql job success, sql: {}, state is: {}", sql, 
ctx.getState());
-            return (QueryState) ctx.getState();
+            return convertExecuteResult(ctx, taskIdString);
         } catch (Exception e) {
-            log.warn("execute sql job failed, sql: {}, error: {}", sql, e);
             throw new JobException("execute sql job failed, sql: " + sql + ", 
error: " + e.getMessage());
         }
 
     }
+
+    private String convertExecuteResult(ConnectContext ctx, String queryId) 
throws JobException {
+        if (null == ctx.getState()) {
+            throw new JobException("execute sql job failed, sql: " + sql + ", 
error:  response state is null");
+        }
+        if (null != ctx.getState().getErrorCode()) {
+            throw new JobException("error code: " + 
ctx.getState().getErrorCode() + ", error msg: "
+                    + ctx.getState().getErrorMessage());
+        }
+        return "queryId:" + queryId + ",affectedRows : " + 
ctx.getState().getAffectedRows() + ", warningRows: "
+                + ctx.getState().getWarningRows() + ",infoMsg" + 
ctx.getState().getInfoMessage();
+    }
 }
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/scheduler/job/JobTask.java 
b/fe/fe-core/src/main/java/org/apache/doris/scheduler/job/JobTask.java
index 254875bb75..3bfe4ffc44 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/scheduler/job/JobTask.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/scheduler/job/JobTask.java
@@ -44,6 +44,8 @@ public class JobTask implements Writable {
     private Long endTimeMs;
     @SerializedName("successful")
     private Boolean isSuccessful;
+    @SerializedName("executeResult")
+    private String executeResult;
     @SerializedName("errorMsg")
     private String errorMsg;
 
@@ -64,6 +66,11 @@ public class JobTask implements Writable {
         } else {
             row.add(isSuccessful ? "SUCCESS" : "FAILED");
         }
+        if (null == executeResult) {
+            row.add("null");
+        } else {
+            row.add(executeResult);
+        }
         if (null == errorMsg) {
             row.add("null");
         } else {
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/scheduler/manager/JobTaskManager.java
 
b/fe/fe-core/src/main/java/org/apache/doris/scheduler/manager/JobTaskManager.java
index c22c89e6d5..e18a143ec7 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/scheduler/manager/JobTaskManager.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/scheduler/manager/JobTaskManager.java
@@ -40,7 +40,7 @@ import java.util.concurrent.ConcurrentLinkedQueue;
 @Slf4j
 public class JobTaskManager implements Writable {
 
-    private static final Integer TASK_MAX_NUM = 
Config.scheduler_job_task_max_num;
+    private static final Integer TASK_MAX_NUM = 
Config.scheduler_job_task_max_saved_count;
 
     private ConcurrentHashMap<Long, ConcurrentLinkedQueue<JobTask>> jobTaskMap 
= new ConcurrentHashMap<>(16);
 
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/scheduler/manager/TimerJobManager.java
 
b/fe/fe-core/src/main/java/org/apache/doris/scheduler/manager/TimerJobManager.java
index df67366430..2b3b922b37 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/scheduler/manager/TimerJobManager.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/scheduler/manager/TimerJobManager.java
@@ -321,7 +321,13 @@ public class TimerJobManager implements Closeable, 
Writable {
      * We will get the task in the next time window, and then hand it over to 
the time wheel for timing trigger
      */
     private void executeJobIdsWithinLastTenMinutesWindow() {
+        // if the task executes for more than 10 minutes, it will be delay, so,
+        // set lastBatchSchedulerTimestamp to current time
+        if (lastBatchSchedulerTimestamp + 
BATCH_SCHEDULER_INTERVAL_MILLI_SECONDS < System.currentTimeMillis()) {
+            this.lastBatchSchedulerTimestamp = System.currentTimeMillis();
+        }
         if (jobMap.isEmpty()) {
+            this.lastBatchSchedulerTimestamp += 
BATCH_SCHEDULER_INTERVAL_MILLI_SECONDS;
             return;
         }
         jobMap.forEach((k, v) -> {


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to