This is an automated email from the ASF dual-hosted git repository.

diwu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new b6dea0afdb7 [Feature](Job)STARTS and AT allow setting current_timestamp (#30593)
b6dea0afdb7 is described below

commit b6dea0afdb7af818be57bc6ea44164a18844c02a
Author: Calvin Kirs <k...@apache.org>
AuthorDate: Fri Feb 2 14:10:41 2024 +0800

    [Feature](Job)STARTS and AT allow setting current_timestamp (#30593)
---
 fe/fe-core/src/main/cup/sql_parser.cup             | 19 ++++++++++++++++--
 .../org/apache/doris/analysis/CreateJobStmt.java   | 15 +++++++++++---
 .../doris/job/base/JobExecutionConfiguration.java  |  4 +++-
 .../suites/job_p0/test_base_insert_job.groovy      | 23 ++++++++++++----------
 4 files changed, 45 insertions(+), 16 deletions(-)

diff --git a/fe/fe-core/src/main/cup/sql_parser.cup 
b/fe/fe-core/src/main/cup/sql_parser.cup
index ea0bfac3d51..708986bcbce 100644
--- a/fe/fe-core/src/main/cup/sql_parser.cup
+++ b/fe/fe-core/src/main/cup/sql_parser.cup
@@ -834,6 +834,7 @@ nonterminal ArrayList<String> opt_common_hints;
 nonterminal String optional_on_ident;
 nonterminal String opt_job_starts;
 nonterminal String opt_job_ends;
+nonterminal String job_at_time;
 nonterminal ColocateGroupName colocate_group_name;
 
 nonterminal LoadTask.MergeType opt_merge_type, opt_with_merge_type;
@@ -2596,7 +2597,7 @@ create_job_stmt ::=
         CreateJobStmt stmt = new 
CreateJobStmt(jobLabel,org.apache.doris.job.base.JobExecuteType.STREAMING,atTime,null,null,null,null,comment,executeSql);
         RESULT = stmt;
     :}   */ 
-    | KW_CREATE KW_JOB job_label:jobLabel KW_ON KW_SCHEDULE KW_AT 
STRING_LITERAL:atTime opt_comment:comment KW_DO stmt:executeSql  
+    | KW_CREATE KW_JOB job_label:jobLabel KW_ON KW_SCHEDULE job_at_time:atTime 
opt_comment:comment KW_DO stmt:executeSql  
     {:
         CreateJobStmt stmt = new 
CreateJobStmt(jobLabel,org.apache.doris.job.base.JobExecuteType.ONE_TIME,atTime,null,null,null,null,comment,executeSql);
         RESULT = stmt;
@@ -2610,8 +2611,22 @@ create_job_stmt ::=
      {: 
         RESULT = startTime;    
      :}
+     |KW_STARTS KW_CURRENT_TIMESTAMP
+     {: 
+        RESULT = CreateJobStmt.CURRENT_TIMESTAMP_STRING;
+     :}   
+     ;
+     
+ job_at_time ::= 
+     | KW_AT STRING_LITERAL:atTime 
+     {: 
+        RESULT = atTime;    
+     :}
+     |KW_AT KW_CURRENT_TIMESTAMP
+     {: 
+        RESULT = CreateJobStmt.CURRENT_TIMESTAMP_STRING;
+     :}   
      ;
- 
  opt_job_ends ::= 
      {:
          RESULT = null;
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/analysis/CreateJobStmt.java 
b/fe/fe-core/src/main/java/org/apache/doris/analysis/CreateJobStmt.java
index 17e2185bf5e..d8618caae84 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/analysis/CreateJobStmt.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/CreateJobStmt.java
@@ -82,6 +82,8 @@ public class CreateJobStmt extends DdlStmt {
     private final String endsTimeStamp;
 
     private final String comment;
+
+    public static final String CURRENT_TIMESTAMP_STRING = "current_timestamp";
     private JobExecuteType executeType;
 
     // exclude job name prefix, which is used by inner job
@@ -122,7 +124,11 @@ public class CreateJobStmt extends DdlStmt {
         TimerDefinition timerDefinition = new TimerDefinition();
 
         if (null != onceJobStartTimestamp) {
-            
timerDefinition.setStartTimeMs(TimeUtils.timeStringToLong(onceJobStartTimestamp));
+            if 
(onceJobStartTimestamp.equalsIgnoreCase(CURRENT_TIMESTAMP_STRING)) {
+                jobExecutionConfiguration.setImmediate(true);
+            } else {
+                
timerDefinition.setStartTimeMs(TimeUtils.timeStringToLong(onceJobStartTimestamp));
+            }
         }
         if (null != interval) {
             timerDefinition.setInterval(interval);
@@ -139,7 +145,11 @@ public class CreateJobStmt extends DdlStmt {
             timerDefinition.setIntervalUnit(intervalUnit);
         }
         if (null != startsTimeStamp) {
-            
timerDefinition.setStartTimeMs(TimeUtils.timeStringToLong(startsTimeStamp));
+            if (startsTimeStamp.equalsIgnoreCase(CURRENT_TIMESTAMP_STRING)) {
+                jobExecutionConfiguration.setImmediate(true);
+            } else {
+                
timerDefinition.setStartTimeMs(TimeUtils.timeStringToLong(startsTimeStamp));
+            }
         }
         if (null != endsTimeStamp) {
             
timerDefinition.setEndTimeMs(TimeUtils.timeStringToLong(endsTimeStamp));
@@ -158,7 +168,6 @@ public class CreateJobStmt extends DdlStmt {
                 jobExecutionConfiguration,
                 System.currentTimeMillis(),
                 executeSql);
-        //job.checkJobParams();
         jobInstance = job;
     }
 
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/job/base/JobExecutionConfiguration.java
 
b/fe/fe-core/src/main/java/org/apache/doris/job/base/JobExecutionConfiguration.java
index 0b44073464f..553db9e966f 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/job/base/JobExecutionConfiguration.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/job/base/JobExecutionConfiguration.java
@@ -137,8 +137,10 @@ public class JobExecutionConfiguration {
             long jobStartTimeMs = timerDefinition.getStartTimeMs();
             if (isImmediate()) {
                 jobStartTimeMs += intervalValue;
+                if (jobStartTimeMs > endTimeMs) {
+                    return delayTimeSeconds;
+                }
             }
-
             return getExecutionDelaySeconds(startTimeMs, endTimeMs, 
jobStartTimeMs,
                     intervalValue, currentTimeMs);
         }
diff --git a/regression-test/suites/job_p0/test_base_insert_job.groovy 
b/regression-test/suites/job_p0/test_base_insert_job.groovy
index d9ebb832152..6ba917bd576 100644
--- a/regression-test/suites/job_p0/test_base_insert_job.groovy
+++ b/regression-test/suites/job_p0/test_base_insert_job.groovy
@@ -111,16 +111,10 @@ suite("test_base_insert_job") {
             "replication_allocation" = "tag.location.default: 1"
         );
         """
-    // Enlarge this parameter to avoid other factors that cause time 
verification to fail when submitting.
-    def currentMs = System.currentTimeMillis() + 20000;
-    def dateTime = LocalDateTime.ofInstant(Instant.ofEpochMilli(currentMs), 
ZoneId.systemDefault());
-
-    def formatter = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");
-    def startTime = dateTime.format(formatter);
     def dataCount = sql """select count(*) from ${tableName}"""
     assert dataCount.get(0).get(0) == 0
     sql """
-          CREATE JOB ${jobName}  ON SCHEDULE at '${startTime}'   comment 'test 
for test&68686781jbjbhj//ncsa' DO insert into ${tableName}  values  
('2023-07-19', sleep(10000), 1001);
+          CREATE JOB ${jobName}  ON SCHEDULE at current_timestamp   comment 
'test for test&68686781jbjbhj//ncsa' DO insert into ${tableName}  values  
('2023-07-19', sleep(10000), 1001);
      """
 
     Thread.sleep(25000)
@@ -153,9 +147,18 @@ suite("test_base_insert_job") {
     //assert comment
     assert oncejob.get(0).get(1) == "test for test&68686781jbjbhj//ncsa"
     sql """
-        DROP JOB IF EXISTS where jobname =  '${jobName}'
+        DROP JOB IF EXISTS where jobname =  'press'
     """
 
+    sql """
+          CREATE JOB press  ON SCHEDULE every 10 hour starts CURRENT_TIMESTAMP 
 comment 'test for test&68686781jbjbhj//ncsa' DO insert into ${tableName}  
values  ('2023-07-19', 99, 99);
+     """
+    Thread.sleep(2500)
+    def recurringTableDatas = sql """ select count(1) from ${tableName} where 
user_id=99 and type=99 """
+    assert recurringTableDatas.get(0).get(0) == 1
+    sql """
+        DROP JOB IF EXISTS where jobname =  '${jobName}'
+    """
     sql """
           CREATE JOB ${jobName}  ON SCHEDULE every 1 second   comment 'test 
for test&68686781jbjbhj//ncsa' DO insert into ${tableName}  values  
('2023-07-19', sleep(10000), 1001);
      """
@@ -198,7 +201,7 @@ suite("test_base_insert_job") {
     // assert not support stmt
     try {
         sql """
-            CREATE JOB ${jobName}  ON SCHEDULE at '${startTime}'   comment 
'test' DO update ${tableName} set type=2 where type=1;
+            CREATE JOB ${jobName}  ON SCHEDULE at current_timestamp   comment 
'test' DO update ${tableName} set type=2 where type=1;
         """
     } catch (Exception e) {
         assert e.getMessage().contains("Not support UpdateStmt type in job")
@@ -206,7 +209,7 @@ suite("test_base_insert_job") {
     // assert start time greater than current time
     try {
         sql """
-            CREATE JOB ${jobName}  ON SCHEDULE at '${startTime}'   comment 
'test' DO insert into ${tableName} (timestamp, type, user_id) values 
('2023-03-18','1','12213');
+            CREATE JOB ${jobName}  ON SCHEDULE at '2023-11-13 14:18:07'   
comment 'test' DO insert into ${tableName} (timestamp, type, user_id) values 
('2023-03-18','1','12213');
         """
     } catch (Exception e) {
         assert e.getMessage().contains("startTimeMs must be greater than 
current time")


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to