This is an automated email from the ASF dual-hosted git repository.

morningman pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/branch-3.0 by this push:
     new d8aac1c72fd branch-3.0: [fix](job)Fix millisecond offset issue in time 
window scheduling trigger time calculation #45176 (#45352)
d8aac1c72fd is described below

commit d8aac1c72fdb7af30b4097da4ef47adfc3b95e0d
Author: github-actions[bot] 
<41898282+github-actions[bot]@users.noreply.github.com>
AuthorDate: Fri Dec 20 09:37:07 2024 +0800

    branch-3.0: [fix](job)Fix millisecond offset issue in time window 
scheduling trigger time calculation #45176 (#45352)
    
    Cherry-picked from #45176
    
    Co-authored-by: Calvin Kirs <guoqi...@selectdb.com>
---
 .../main/java/org/apache/doris/common/util/TimeUtils.java   | 11 +++++++++++
 .../apache/doris/job/base/JobExecutionConfiguration.java    |  2 +-
 .../java/org/apache/doris/job/base/TimerDefinition.java     |  7 ++++++-
 .../java/org/apache/doris/job/scheduler/JobScheduler.java   | 13 +++++++++----
 .../doris/job/base/JobExecutionConfigurationTest.java       |  7 +++++++
 5 files changed, 34 insertions(+), 6 deletions(-)

diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/common/util/TimeUtils.java 
b/fe/fe-core/src/main/java/org/apache/doris/common/util/TimeUtils.java
index e7066846c30..d88971a6e72 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/common/util/TimeUtils.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/common/util/TimeUtils.java
@@ -257,6 +257,17 @@ public class TimeUtils {
         return d.getTime();
     }
 
+    /**
+     * Converts a millisecond timestamp to a second-level timestamp.
+     *
+     * @param timestamp The millisecond timestamp to be converted.
+     * @return The timestamp rounded to the nearest second (in milliseconds).
+     */
+    public static long convertToSecondTimestamp(long timestamp) {
+        // Divide by 1000 to convert to seconds, then multiply by 1000 to 
return to milliseconds with no fractional part
+        return (timestamp / 1000) * 1000;
+    }
+
     public static long timeStringToLong(String timeStr, TimeZone timeZone) {
         DateTimeFormatter dateFormatTimeZone = getDatetimeFormatWithTimeZone();
         dateFormatTimeZone.withZone(timeZone.toZoneId());
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/job/base/JobExecutionConfiguration.java
 
b/fe/fe-core/src/main/java/org/apache/doris/job/base/JobExecutionConfiguration.java
index 4c6ef4d2037..d564b114312 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/job/base/JobExecutionConfiguration.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/job/base/JobExecutionConfiguration.java
@@ -155,7 +155,7 @@ public class JobExecutionConfiguration {
             return 0L;
         }
 
-        return (startTimeMs - currentTimeMs) / 1000;
+        return (startTimeMs * 1000 / 1000 - currentTimeMs) / 1000;
     }
 
     // Returns a list of delay times in seconds for executing the job within 
the specified window
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/job/base/TimerDefinition.java 
b/fe/fe-core/src/main/java/org/apache/doris/job/base/TimerDefinition.java
index 9068a18f693..96181877b9a 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/job/base/TimerDefinition.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/job/base/TimerDefinition.java
@@ -17,6 +17,7 @@
 
 package org.apache.doris.job.base;
 
+import org.apache.doris.common.util.TimeUtils;
 import org.apache.doris.job.common.IntervalUnit;
 
 import com.google.gson.annotations.SerializedName;
@@ -40,11 +41,15 @@ public class TimerDefinition {
 
     public void checkParams() {
         if (null == startTimeMs) {
-            startTimeMs = System.currentTimeMillis() + 
intervalUnit.getIntervalMs(interval);
+            long currentTimeMs = 
TimeUtils.convertToSecondTimestamp(System.currentTimeMillis());
+            startTimeMs = currentTimeMs + intervalUnit.getIntervalMs(interval);
         }
         if (null != endTimeMs && endTimeMs < startTimeMs) {
             throw new IllegalArgumentException("endTimeMs must be greater than 
the start time");
         }
+        if (null != endTimeMs) {
+            endTimeMs = TimeUtils.convertToSecondTimestamp(endTimeMs);
+        }
 
         if (null != intervalUnit) {
             if (null == interval) {
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/job/scheduler/JobScheduler.java 
b/fe/fe-core/src/main/java/org/apache/doris/job/scheduler/JobScheduler.java
index 921f333791c..2bd6fc04dac 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/job/scheduler/JobScheduler.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/job/scheduler/JobScheduler.java
@@ -84,7 +84,8 @@ public class JobScheduler<T extends AbstractJob<?, C>, C> 
implements Closeable {
         taskDisruptorGroupManager = new TaskDisruptorGroupManager();
         taskDisruptorGroupManager.init();
         this.timerJobDisruptor = 
taskDisruptorGroupManager.getDispatchDisruptor();
-        latestBatchSchedulerTimerTaskTimeMs = System.currentTimeMillis();
+        long currentTimeMs = 
TimeUtils.convertToSecondTimestamp(System.currentTimeMillis());
+        latestBatchSchedulerTimerTaskTimeMs = currentTimeMs;
         batchSchedulerTimerJob();
         cycleSystemSchedulerTasks();
     }
@@ -94,7 +95,8 @@ public class JobScheduler<T extends AbstractJob<?, C>, C> 
implements Closeable {
      * Jobs will be re-registered after the task is completed
      */
     private void cycleSystemSchedulerTasks() {
-        log.info("re-register system scheduler timer tasks" + 
TimeUtils.longToTimeString(System.currentTimeMillis()));
+        log.info("re-register system scheduler timer tasks, time is " + 
TimeUtils
+                .longToTimeStringWithms(System.currentTimeMillis()));
         timerTaskScheduler.newTimeout(timeout -> {
             batchSchedulerTimerJob();
             cycleSystemSchedulerTasks();
@@ -144,7 +146,9 @@ public class JobScheduler<T extends AbstractJob<?, C>, C> 
implements Closeable {
 
 
     private void cycleTimerJobScheduler(T job, long startTimeWindowMs) {
-        List<Long> delaySeconds = 
job.getJobConfig().getTriggerDelayTimes(System.currentTimeMillis(),
+        long currentTimeMs = 
TimeUtils.convertToSecondTimestamp(System.currentTimeMillis());
+        startTimeWindowMs = 
TimeUtils.convertToSecondTimestamp(startTimeWindowMs);
+        List<Long> delaySeconds = 
job.getJobConfig().getTriggerDelayTimes(currentTimeMs,
                 startTimeWindowMs, latestBatchSchedulerTimerTaskTimeMs);
         if (CollectionUtils.isEmpty(delaySeconds)) {
             log.info("skip job {} scheduler timer job, delay seconds is 
empty", job.getJobName());
@@ -190,7 +194,8 @@ public class JobScheduler<T extends AbstractJob<?, C>, C> 
implements Closeable {
 
         long lastTimeWindowMs = latestBatchSchedulerTimerTaskTimeMs;
         if (latestBatchSchedulerTimerTaskTimeMs < System.currentTimeMillis()) {
-            this.latestBatchSchedulerTimerTaskTimeMs = 
System.currentTimeMillis();
+            long currentTimeMs = 
TimeUtils.convertToSecondTimestamp(System.currentTimeMillis());
+            this.latestBatchSchedulerTimerTaskTimeMs = currentTimeMs;
         }
         this.latestBatchSchedulerTimerTaskTimeMs += 
BATCH_SCHEDULER_INTERVAL_MILLI_SECONDS;
         log.info("execute timer job ids within last ten minutes window, last 
time window is {}",
diff --git 
a/fe/fe-core/src/test/java/org/apache/doris/job/base/JobExecutionConfigurationTest.java
 
b/fe/fe-core/src/test/java/org/apache/doris/job/base/JobExecutionConfigurationTest.java
index cce0a93c01d..163b2494189 100644
--- 
a/fe/fe-core/src/test/java/org/apache/doris/job/base/JobExecutionConfigurationTest.java
+++ 
b/fe/fe-core/src/test/java/org/apache/doris/job/base/JobExecutionConfigurationTest.java
@@ -75,7 +75,14 @@ public class JobExecutionConfigurationTest {
         timerDefinition.setInterval(1L);
         Assertions.assertEquals(3, configuration.getTriggerDelayTimes(second * 
5 + 10L, second * 3, second * 7).size());
         Assertions.assertEquals(3, configuration.getTriggerDelayTimes(second * 
5, second * 5, second * 7).size());
+        timerDefinition.setStartTimeMs(1672531200000L);
+        timerDefinition.setIntervalUnit(IntervalUnit.MINUTE);
+        timerDefinition.setInterval(1L);
+        Assertions.assertArrayEquals(new Long[]{0L}, 
configuration.getTriggerDelayTimes(1672531800000L, 1672531200000L, 
1672531800000L).toArray());
+
+        List<Long> expectDelayTimes = 
configuration.getTriggerDelayTimes(1672531200000L, 1672531200000L, 
1672531850000L);
 
+        Assertions.assertArrayEquals(new Long[]{0L, 60L, 120L, 180L, 240L, 
300L, 360L, 420L, 480L, 540L, 600L}, expectDelayTimes.toArray());
     }
 
     @Test


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to