This is an automated email from the ASF dual-hosted git repository. diwu pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push: new b6dea0afdb7 [Feature](Job)STARTS and AT allow setting current_timestamp (#30593) b6dea0afdb7 is described below commit b6dea0afdb7af818be57bc6ea44164a18844c02a Author: Calvin Kirs <k...@apache.org> AuthorDate: Fri Feb 2 14:10:41 2024 +0800 [Feature](Job)STARTS and AT allow setting current_timestamp (#30593) --- fe/fe-core/src/main/cup/sql_parser.cup | 19 ++++++++++++++++-- .../org/apache/doris/analysis/CreateJobStmt.java | 15 +++++++++++--- .../doris/job/base/JobExecutionConfiguration.java | 4 +++- .../suites/job_p0/test_base_insert_job.groovy | 23 ++++++++++++---------- 4 files changed, 45 insertions(+), 16 deletions(-) diff --git a/fe/fe-core/src/main/cup/sql_parser.cup b/fe/fe-core/src/main/cup/sql_parser.cup index ea0bfac3d51..708986bcbce 100644 --- a/fe/fe-core/src/main/cup/sql_parser.cup +++ b/fe/fe-core/src/main/cup/sql_parser.cup @@ -834,6 +834,7 @@ nonterminal ArrayList<String> opt_common_hints; nonterminal String optional_on_ident; nonterminal String opt_job_starts; nonterminal String opt_job_ends; +nonterminal String job_at_time; nonterminal ColocateGroupName colocate_group_name; nonterminal LoadTask.MergeType opt_merge_type, opt_with_merge_type; @@ -2596,7 +2597,7 @@ create_job_stmt ::= CreateJobStmt stmt = new CreateJobStmt(jobLabel,org.apache.doris.job.base.JobExecuteType.STREAMING,atTime,null,null,null,null,comment,executeSql); RESULT = stmt; :} */ - | KW_CREATE KW_JOB job_label:jobLabel KW_ON KW_SCHEDULE KW_AT STRING_LITERAL:atTime opt_comment:comment KW_DO stmt:executeSql + | KW_CREATE KW_JOB job_label:jobLabel KW_ON KW_SCHEDULE job_at_time:atTime opt_comment:comment KW_DO stmt:executeSql {: CreateJobStmt stmt = new CreateJobStmt(jobLabel,org.apache.doris.job.base.JobExecuteType.ONE_TIME,atTime,null,null,null,null,comment,executeSql); RESULT = stmt; @@ -2610,8 +2611,22 @@ create_job_stmt ::= {: RESULT = startTime; :} + |KW_STARTS KW_CURRENT_TIMESTAMP + {: + RESULT = CreateJobStmt.CURRENT_TIMESTAMP_STRING; + :} + ; + + job_at_time ::= + | KW_AT STRING_LITERAL:atTime + {: + RESULT = atTime; + :} + |KW_AT KW_CURRENT_TIMESTAMP + {: + RESULT = CreateJobStmt.CURRENT_TIMESTAMP_STRING; + :} ; - opt_job_ends ::= {: RESULT = null; diff --git a/fe/fe-core/src/main/java/org/apache/doris/analysis/CreateJobStmt.java b/fe/fe-core/src/main/java/org/apache/doris/analysis/CreateJobStmt.java index 17e2185bf5e..d8618caae84 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/analysis/CreateJobStmt.java +++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/CreateJobStmt.java @@ -82,6 +82,8 @@ public class CreateJobStmt extends DdlStmt { private final String endsTimeStamp; private final String comment; + + public static final String CURRENT_TIMESTAMP_STRING = "current_timestamp"; private JobExecuteType executeType; // exclude job name prefix, which is used by inner job @@ -122,7 +124,11 @@ public class CreateJobStmt extends DdlStmt { TimerDefinition timerDefinition = new TimerDefinition(); if (null != onceJobStartTimestamp) { - timerDefinition.setStartTimeMs(TimeUtils.timeStringToLong(onceJobStartTimestamp)); + if (onceJobStartTimestamp.equalsIgnoreCase(CURRENT_TIMESTAMP_STRING)) { + jobExecutionConfiguration.setImmediate(true); + } else { + timerDefinition.setStartTimeMs(TimeUtils.timeStringToLong(onceJobStartTimestamp)); + } } if (null != interval) { timerDefinition.setInterval(interval); @@ -139,7 +145,11 @@ public class CreateJobStmt extends DdlStmt { timerDefinition.setIntervalUnit(intervalUnit); } if (null != startsTimeStamp) { - timerDefinition.setStartTimeMs(TimeUtils.timeStringToLong(startsTimeStamp)); + if (startsTimeStamp.equalsIgnoreCase(CURRENT_TIMESTAMP_STRING)) { + jobExecutionConfiguration.setImmediate(true); + } else { + timerDefinition.setStartTimeMs(TimeUtils.timeStringToLong(startsTimeStamp)); + } } if (null != endsTimeStamp) { timerDefinition.setEndTimeMs(TimeUtils.timeStringToLong(endsTimeStamp)); @@ -158,7 +168,6 @@ public class CreateJobStmt extends DdlStmt { jobExecutionConfiguration, System.currentTimeMillis(), executeSql); - //job.checkJobParams(); jobInstance = job; } diff --git a/fe/fe-core/src/main/java/org/apache/doris/job/base/JobExecutionConfiguration.java b/fe/fe-core/src/main/java/org/apache/doris/job/base/JobExecutionConfiguration.java index 0b44073464f..553db9e966f 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/job/base/JobExecutionConfiguration.java +++ b/fe/fe-core/src/main/java/org/apache/doris/job/base/JobExecutionConfiguration.java @@ -137,8 +137,10 @@ public class JobExecutionConfiguration { long jobStartTimeMs = timerDefinition.getStartTimeMs(); if (isImmediate()) { jobStartTimeMs += intervalValue; + if (jobStartTimeMs > endTimeMs) { + return delayTimeSeconds; + } } - return getExecutionDelaySeconds(startTimeMs, endTimeMs, jobStartTimeMs, intervalValue, currentTimeMs); } diff --git a/regression-test/suites/job_p0/test_base_insert_job.groovy b/regression-test/suites/job_p0/test_base_insert_job.groovy index d9ebb832152..6ba917bd576 100644 --- a/regression-test/suites/job_p0/test_base_insert_job.groovy +++ b/regression-test/suites/job_p0/test_base_insert_job.groovy @@ -111,16 +111,10 @@ suite("test_base_insert_job") { "replication_allocation" = "tag.location.default: 1" ); """ - // Enlarge this parameter to avoid other factors that cause time verification to fail when submitting. - def currentMs = System.currentTimeMillis() + 20000; - def dateTime = LocalDateTime.ofInstant(Instant.ofEpochMilli(currentMs), ZoneId.systemDefault()); - - def formatter = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss"); - def startTime = dateTime.format(formatter); def dataCount = sql """select count(*) from ${tableName}""" assert dataCount.get(0).get(0) == 0 sql """ - CREATE JOB ${jobName} ON SCHEDULE at '${startTime}' comment 'test for test&68686781jbjbhj//ncsa' DO insert into ${tableName} values ('2023-07-19', sleep(10000), 1001); + CREATE JOB ${jobName} ON SCHEDULE at current_timestamp comment 'test for test&68686781jbjbhj//ncsa' DO insert into ${tableName} values ('2023-07-19', sleep(10000), 1001); """ Thread.sleep(25000) @@ -153,9 +147,18 @@ suite("test_base_insert_job") { //assert comment assert oncejob.get(0).get(1) == "test for test&68686781jbjbhj//ncsa" sql """ - DROP JOB IF EXISTS where jobname = '${jobName}' + DROP JOB IF EXISTS where jobname = 'press' """ + sql """ + CREATE JOB press ON SCHEDULE every 10 hour starts CURRENT_TIMESTAMP comment 'test for test&68686781jbjbhj//ncsa' DO insert into ${tableName} values ('2023-07-19', 99, 99); + """ + Thread.sleep(2500) + def recurringTableDatas = sql """ select count(1) from ${tableName} where user_id=99 and type=99 """ + assert recurringTableDatas.get(0).get(0) == 1 + sql """ + DROP JOB IF EXISTS where jobname = '${jobName}' + """ sql """ CREATE JOB ${jobName} ON SCHEDULE every 1 second comment 'test for test&68686781jbjbhj//ncsa' DO insert into ${tableName} values ('2023-07-19', sleep(10000), 1001); """ @@ -198,7 +201,7 @@ suite("test_base_insert_job") { // assert not support stmt try { sql """ - CREATE JOB ${jobName} ON SCHEDULE at '${startTime}' comment 'test' DO update ${tableName} set type=2 where type=1; + CREATE JOB ${jobName} ON SCHEDULE at current_timestamp comment 'test' DO update ${tableName} set type=2 where type=1; """ } catch (Exception e) { assert e.getMessage().contains("Not support UpdateStmt type in job") @@ -206,7 +209,7 @@ suite("test_base_insert_job") { // assert start time greater than current time try { sql """ - CREATE JOB ${jobName} ON SCHEDULE at '${startTime}' comment 'test' DO insert into ${tableName} (timestamp, type, user_id) values ('2023-03-18','1','12213'); + CREATE JOB ${jobName} ON SCHEDULE at '2023-11-13 14:18:07' comment 'test' DO insert into ${tableName} (timestamp, type, user_id) values ('2023-03-18','1','12213'); """ } catch (Exception e) { assert e.getMessage().contains("startTimeMs must be greater than current time") --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org