This is an automated email from the ASF dual-hosted git repository. kirs pushed a commit to branch branch-2.1 in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-2.1 by this push: new 961d2c9af55 [feat](job)Internal job cancellation immediately and the strong association with the STARTS parameter (#36805) (#38110) 961d2c9af55 is described below commit 961d2c9af55ae7e8cba30608952111975076c8c0 Author: Calvin Kirs <k...@apache.org> AuthorDate: Thu Sep 5 16:28:35 2024 +0800 [feat](job)Internal job cancellation immediately and the strong association with the STARTS parameter (#36805) (#38110) … ## Proposed changes For internal tasks, such as MTMV, the start time may already be set, or the time may be adjusted immediately. <!--Describe your changes.--> (cherry picked from commit 904a6c0fc1a804520285533de874fe4d0ffff2c1) ## Proposed changes Issue Number: close #36805 <!--Describe your changes.--> --- .../src/main/java/org/apache/doris/analysis/CreateJobStmt.java | 3 +++ .../java/org/apache/doris/job/base/JobExecutionConfiguration.java | 8 +++----- .../src/main/java/org/apache/doris/job/base/TimerDefinition.java | 8 +------- .../main/java/org/apache/doris/job/scheduler/JobScheduler.java | 4 ++++ .../org/apache/doris/job/base/JobExecutionConfigurationTest.java | 2 +- 5 files changed, 12 insertions(+), 13 deletions(-) diff --git a/fe/fe-core/src/main/java/org/apache/doris/analysis/CreateJobStmt.java b/fe/fe-core/src/main/java/org/apache/doris/analysis/CreateJobStmt.java index 367d03fa867..8a8db0a3d1e 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/analysis/CreateJobStmt.java +++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/CreateJobStmt.java @@ -128,6 +128,7 @@ public class CreateJobStmt extends DdlStmt { if (null != onceJobStartTimestamp) { if (onceJobStartTimestamp.equalsIgnoreCase(CURRENT_TIMESTAMP_STRING)) { jobExecutionConfiguration.setImmediate(true); + timerDefinition.setStartTimeMs(System.currentTimeMillis()); } else { timerDefinition.setStartTimeMs(TimeUtils.timeStringToLong(onceJobStartTimestamp)); } @@ -149,6 +150,8 @@ public class CreateJobStmt extends DdlStmt { if (null != startsTimeStamp) { if (startsTimeStamp.equalsIgnoreCase(CURRENT_TIMESTAMP_STRING)) { jobExecutionConfiguration.setImmediate(true); + //To avoid immediate re-scheduling, set the start time of the timer 100ms before the current time. + timerDefinition.setStartTimeMs(System.currentTimeMillis()); } else { timerDefinition.setStartTimeMs(TimeUtils.timeStringToLong(startsTimeStamp)); } diff --git a/fe/fe-core/src/main/java/org/apache/doris/job/base/JobExecutionConfiguration.java b/fe/fe-core/src/main/java/org/apache/doris/job/base/JobExecutionConfiguration.java index 46bc2c71ea2..301222d5434 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/job/base/JobExecutionConfiguration.java +++ b/fe/fe-core/src/main/java/org/apache/doris/job/base/JobExecutionConfiguration.java @@ -57,9 +57,7 @@ public class JobExecutionConfiguration { if (executeType == JobExecuteType.INSTANT || executeType == JobExecuteType.MANUAL) { return; } - - checkTimerDefinition(immediate); - + checkTimerDefinition(); if (executeType == JobExecuteType.ONE_TIME) { validateStartTimeMs(); return; @@ -80,12 +78,12 @@ public class JobExecutionConfiguration { } } - private void checkTimerDefinition(boolean immediate) { + private void checkTimerDefinition() { if (timerDefinition == null) { throw new IllegalArgumentException( "timerDefinition cannot be null when executeType is not instant or manual"); } - timerDefinition.checkParams(immediate); + timerDefinition.checkParams(); } private void validateStartTimeMs() { diff --git a/fe/fe-core/src/main/java/org/apache/doris/job/base/TimerDefinition.java b/fe/fe-core/src/main/java/org/apache/doris/job/base/TimerDefinition.java index bcff4216c6e..9068a18f693 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/job/base/TimerDefinition.java +++ b/fe/fe-core/src/main/java/org/apache/doris/job/base/TimerDefinition.java @@ -38,13 +38,7 @@ public class TimerDefinition { private Long latestSchedulerTimeMs; - public void checkParams(boolean immediate) { - if (null != startTimeMs && immediate) { - throw new IllegalArgumentException("startTimeMs must be null when immediate is true"); - } - if (null == startTimeMs && immediate) { - startTimeMs = System.currentTimeMillis(); - } + public void checkParams() { if (null == startTimeMs) { startTimeMs = System.currentTimeMillis() + intervalUnit.getIntervalMs(interval); } diff --git a/fe/fe-core/src/main/java/org/apache/doris/job/scheduler/JobScheduler.java b/fe/fe-core/src/main/java/org/apache/doris/job/scheduler/JobScheduler.java index 5ba88c6e3ce..33d12c30a4b 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/job/scheduler/JobScheduler.java +++ b/fe/fe-core/src/main/java/org/apache/doris/job/scheduler/JobScheduler.java @@ -124,6 +124,10 @@ public class JobScheduler<T extends AbstractJob<?, C>, C> implements Closeable { schedulerInstantJob(job, TaskType.SCHEDULED, null); } } + if (job.getJobConfig().isImmediate() && JobExecuteType.ONE_TIME.equals(job.getJobConfig().getExecuteType())) { + schedulerInstantJob(job, TaskType.SCHEDULED, null); + return; + } //RECURRING job and immediate is true if (job.getJobConfig().isImmediate()) { job.getJobConfig().getTimerDefinition().setLatestSchedulerTimeMs(System.currentTimeMillis()); diff --git a/fe/fe-core/src/test/java/org/apache/doris/job/base/JobExecutionConfigurationTest.java b/fe/fe-core/src/test/java/org/apache/doris/job/base/JobExecutionConfigurationTest.java index 6d01f09c5ea..24c486baff8 100644 --- a/fe/fe-core/src/test/java/org/apache/doris/job/base/JobExecutionConfigurationTest.java +++ b/fe/fe-core/src/test/java/org/apache/doris/job/base/JobExecutionConfigurationTest.java @@ -75,8 +75,8 @@ public class JobExecutionConfigurationTest { JobExecutionConfiguration configuration = new JobExecutionConfiguration(); configuration.setExecuteType(JobExecuteType.ONE_TIME); configuration.setImmediate(true); - configuration.setImmediate(true); TimerDefinition timerDefinition = new TimerDefinition(); + timerDefinition.setStartTimeMs(0L); configuration.setTimerDefinition(timerDefinition); configuration.checkParams(); } --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org