This is an automated email from the ASF dual-hosted git repository.

kirs pushed a commit to branch branch-2.1
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/branch-2.1 by this push:
     new 961d2c9af55 [feat](job)Internal job cancellation immediately and the 
strong association with the STARTS parameter (#36805) (#38110)
961d2c9af55 is described below

commit 961d2c9af55ae7e8cba30608952111975076c8c0
Author: Calvin Kirs <k...@apache.org>
AuthorDate: Thu Sep 5 16:28:35 2024 +0800

    [feat](job)Internal job cancellation immediately and the strong association 
with the STARTS parameter (#36805) (#38110)
    
    …
    
    ## Proposed changes
    
    For internal jobs, such as MTMV, a start time may already be set even
    when the job is also marked to run immediately, so an immediate job is
    no longer required to have an unset start time.
    
    <!--Describe your changes.-->
    
    (cherry picked from commit 904a6c0fc1a804520285533de874fe4d0ffff2c1)
    
    ## Proposed changes
    
    Issue Number: close #36805
    
    <!--Describe your changes.-->
---
 .../src/main/java/org/apache/doris/analysis/CreateJobStmt.java    | 3 +++
 .../java/org/apache/doris/job/base/JobExecutionConfiguration.java | 8 +++-----
 .../src/main/java/org/apache/doris/job/base/TimerDefinition.java  | 8 +-------
 .../main/java/org/apache/doris/job/scheduler/JobScheduler.java    | 4 ++++
 .../org/apache/doris/job/base/JobExecutionConfigurationTest.java  | 2 +-
 5 files changed, 12 insertions(+), 13 deletions(-)

diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/analysis/CreateJobStmt.java 
b/fe/fe-core/src/main/java/org/apache/doris/analysis/CreateJobStmt.java
index 367d03fa867..8a8db0a3d1e 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/analysis/CreateJobStmt.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/CreateJobStmt.java
@@ -128,6 +128,7 @@ public class CreateJobStmt extends DdlStmt {
         if (null != onceJobStartTimestamp) {
             if 
(onceJobStartTimestamp.equalsIgnoreCase(CURRENT_TIMESTAMP_STRING)) {
                 jobExecutionConfiguration.setImmediate(true);
+                timerDefinition.setStartTimeMs(System.currentTimeMillis());
             } else {
                 
timerDefinition.setStartTimeMs(TimeUtils.timeStringToLong(onceJobStartTimestamp));
             }
@@ -149,6 +150,8 @@ public class CreateJobStmt extends DdlStmt {
         if (null != startsTimeStamp) {
             if (startsTimeStamp.equalsIgnoreCase(CURRENT_TIMESTAMP_STRING)) {
                 jobExecutionConfiguration.setImmediate(true);
+                //To avoid immediate re-scheduling, set the start time of the 
timer to the current time.
+                timerDefinition.setStartTimeMs(System.currentTimeMillis());
             } else {
                 
timerDefinition.setStartTimeMs(TimeUtils.timeStringToLong(startsTimeStamp));
             }
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/job/base/JobExecutionConfiguration.java
 
b/fe/fe-core/src/main/java/org/apache/doris/job/base/JobExecutionConfiguration.java
index 46bc2c71ea2..301222d5434 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/job/base/JobExecutionConfiguration.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/job/base/JobExecutionConfiguration.java
@@ -57,9 +57,7 @@ public class JobExecutionConfiguration {
         if (executeType == JobExecuteType.INSTANT || executeType == 
JobExecuteType.MANUAL) {
             return;
         }
-
-        checkTimerDefinition(immediate);
-
+        checkTimerDefinition();
         if (executeType == JobExecuteType.ONE_TIME) {
             validateStartTimeMs();
             return;
@@ -80,12 +78,12 @@ public class JobExecutionConfiguration {
         }
     }
 
-    private void checkTimerDefinition(boolean immediate) {
+    private void checkTimerDefinition() {
         if (timerDefinition == null) {
             throw new IllegalArgumentException(
                     "timerDefinition cannot be null when executeType is not 
instant or manual");
         }
-        timerDefinition.checkParams(immediate);
+        timerDefinition.checkParams();
     }
 
     private void validateStartTimeMs() {
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/job/base/TimerDefinition.java 
b/fe/fe-core/src/main/java/org/apache/doris/job/base/TimerDefinition.java
index bcff4216c6e..9068a18f693 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/job/base/TimerDefinition.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/job/base/TimerDefinition.java
@@ -38,13 +38,7 @@ public class TimerDefinition {
     private Long latestSchedulerTimeMs;
 
 
-    public void checkParams(boolean immediate) {
-        if (null != startTimeMs && immediate) {
-            throw new IllegalArgumentException("startTimeMs must be null when 
immediate is true");
-        }
-        if (null == startTimeMs && immediate) {
-            startTimeMs = System.currentTimeMillis();
-        }
+    public void checkParams() {
         if (null == startTimeMs) {
             startTimeMs = System.currentTimeMillis() + 
intervalUnit.getIntervalMs(interval);
         }
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/job/scheduler/JobScheduler.java 
b/fe/fe-core/src/main/java/org/apache/doris/job/scheduler/JobScheduler.java
index 5ba88c6e3ce..33d12c30a4b 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/job/scheduler/JobScheduler.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/job/scheduler/JobScheduler.java
@@ -124,6 +124,10 @@ public class JobScheduler<T extends AbstractJob<?, C>, C> 
implements Closeable {
                 schedulerInstantJob(job, TaskType.SCHEDULED, null);
             }
         }
+        if (job.getJobConfig().isImmediate() && 
JobExecuteType.ONE_TIME.equals(job.getJobConfig().getExecuteType())) {
+            schedulerInstantJob(job, TaskType.SCHEDULED, null);
+            return;
+        }
         //RECURRING job and  immediate is true
         if (job.getJobConfig().isImmediate()) {
             
job.getJobConfig().getTimerDefinition().setLatestSchedulerTimeMs(System.currentTimeMillis());
diff --git 
a/fe/fe-core/src/test/java/org/apache/doris/job/base/JobExecutionConfigurationTest.java
 
b/fe/fe-core/src/test/java/org/apache/doris/job/base/JobExecutionConfigurationTest.java
index 6d01f09c5ea..24c486baff8 100644
--- 
a/fe/fe-core/src/test/java/org/apache/doris/job/base/JobExecutionConfigurationTest.java
+++ 
b/fe/fe-core/src/test/java/org/apache/doris/job/base/JobExecutionConfigurationTest.java
@@ -75,8 +75,8 @@ public class JobExecutionConfigurationTest {
         JobExecutionConfiguration configuration = new 
JobExecutionConfiguration();
         configuration.setExecuteType(JobExecuteType.ONE_TIME);
         configuration.setImmediate(true);
-        configuration.setImmediate(true);
         TimerDefinition timerDefinition = new TimerDefinition();
+        timerDefinition.setStartTimeMs(0L);
         configuration.setTimerDefinition(timerDefinition);
         configuration.checkParams();
     }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to