This is an automated email from the ASF dual-hosted git repository. morningman pushed a commit to branch branch-3.0 in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-3.0 by this push: new d8aac1c72fd branch-3.0: [fix](job)Fix millisecond offset issue in time window scheduling trigger time calculation #45176 (#45352) d8aac1c72fd is described below commit d8aac1c72fdb7af30b4097da4ef47adfc3b95e0d Author: github-actions[bot] <41898282+github-actions[bot]@users.noreply.github.com> AuthorDate: Fri Dec 20 09:37:07 2024 +0800 branch-3.0: [fix](job)Fix millisecond offset issue in time window scheduling trigger time calculation #45176 (#45352) Cherry-picked from #45176 Co-authored-by: Calvin Kirs <guoqi...@selectdb.com> --- .../main/java/org/apache/doris/common/util/TimeUtils.java | 11 +++++++++++ .../apache/doris/job/base/JobExecutionConfiguration.java | 2 +- .../java/org/apache/doris/job/base/TimerDefinition.java | 7 ++++++- .../java/org/apache/doris/job/scheduler/JobScheduler.java | 13 +++++++++---- .../doris/job/base/JobExecutionConfigurationTest.java | 7 +++++++ 5 files changed, 34 insertions(+), 6 deletions(-) diff --git a/fe/fe-core/src/main/java/org/apache/doris/common/util/TimeUtils.java b/fe/fe-core/src/main/java/org/apache/doris/common/util/TimeUtils.java index e7066846c30..d88971a6e72 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/common/util/TimeUtils.java +++ b/fe/fe-core/src/main/java/org/apache/doris/common/util/TimeUtils.java @@ -257,6 +257,17 @@ public class TimeUtils { return d.getTime(); } + /** + * Converts a millisecond timestamp to a second-level timestamp. + * + * @param timestamp The millisecond timestamp to be converted. + * @return The timestamp rounded to the nearest second (in milliseconds). + */ + public static long convertToSecondTimestamp(long timestamp) { + // Divide by 1000 to convert to seconds, then multiply by 1000 to return to milliseconds with no fractional part + return (timestamp / 1000) * 1000; + } + public static long timeStringToLong(String timeStr, TimeZone timeZone) { DateTimeFormatter dateFormatTimeZone = getDatetimeFormatWithTimeZone(); dateFormatTimeZone.withZone(timeZone.toZoneId()); diff --git a/fe/fe-core/src/main/java/org/apache/doris/job/base/JobExecutionConfiguration.java b/fe/fe-core/src/main/java/org/apache/doris/job/base/JobExecutionConfiguration.java index 4c6ef4d2037..d564b114312 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/job/base/JobExecutionConfiguration.java +++ b/fe/fe-core/src/main/java/org/apache/doris/job/base/JobExecutionConfiguration.java @@ -155,7 +155,7 @@ public class JobExecutionConfiguration { return 0L; } - return (startTimeMs - currentTimeMs) / 1000; + return (startTimeMs * 1000 / 1000 - currentTimeMs) / 1000; } // Returns a list of delay times in seconds for executing the job within the specified window diff --git a/fe/fe-core/src/main/java/org/apache/doris/job/base/TimerDefinition.java b/fe/fe-core/src/main/java/org/apache/doris/job/base/TimerDefinition.java index 9068a18f693..96181877b9a 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/job/base/TimerDefinition.java +++ b/fe/fe-core/src/main/java/org/apache/doris/job/base/TimerDefinition.java @@ -17,6 +17,7 @@ package org.apache.doris.job.base; +import org.apache.doris.common.util.TimeUtils; import org.apache.doris.job.common.IntervalUnit; import com.google.gson.annotations.SerializedName; @@ -40,11 +41,15 @@ public class TimerDefinition { public void checkParams() { if (null == startTimeMs) { - startTimeMs = System.currentTimeMillis() + intervalUnit.getIntervalMs(interval); + long currentTimeMs = TimeUtils.convertToSecondTimestamp(System.currentTimeMillis()); + startTimeMs = currentTimeMs + intervalUnit.getIntervalMs(interval); } if (null != endTimeMs && endTimeMs < startTimeMs) { throw new IllegalArgumentException("endTimeMs must be greater than the start time"); } + if (null != endTimeMs) { + endTimeMs = TimeUtils.convertToSecondTimestamp(endTimeMs); + } if (null != intervalUnit) { if (null == interval) { diff --git a/fe/fe-core/src/main/java/org/apache/doris/job/scheduler/JobScheduler.java b/fe/fe-core/src/main/java/org/apache/doris/job/scheduler/JobScheduler.java index 921f333791c..2bd6fc04dac 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/job/scheduler/JobScheduler.java +++ b/fe/fe-core/src/main/java/org/apache/doris/job/scheduler/JobScheduler.java @@ -84,7 +84,8 @@ public class JobScheduler<T extends AbstractJob<?, C>, C> implements Closeable { taskDisruptorGroupManager = new TaskDisruptorGroupManager(); taskDisruptorGroupManager.init(); this.timerJobDisruptor = taskDisruptorGroupManager.getDispatchDisruptor(); - latestBatchSchedulerTimerTaskTimeMs = System.currentTimeMillis(); + long currentTimeMs = TimeUtils.convertToSecondTimestamp(System.currentTimeMillis()); + latestBatchSchedulerTimerTaskTimeMs = currentTimeMs; batchSchedulerTimerJob(); cycleSystemSchedulerTasks(); } @@ -94,7 +95,8 @@ public class JobScheduler<T extends AbstractJob<?, C>, C> implements Closeable { * Jobs will be re-registered after the task is completed */ private void cycleSystemSchedulerTasks() { - log.info("re-register system scheduler timer tasks" + TimeUtils.longToTimeString(System.currentTimeMillis())); + log.info("re-register system scheduler timer tasks, time is " + TimeUtils + .longToTimeStringWithms(System.currentTimeMillis())); timerTaskScheduler.newTimeout(timeout -> { batchSchedulerTimerJob(); cycleSystemSchedulerTasks(); @@ -144,7 +146,9 @@ public class JobScheduler<T extends AbstractJob<?, C>, C> implements Closeable { private void cycleTimerJobScheduler(T job, long startTimeWindowMs) { - List<Long> delaySeconds = job.getJobConfig().getTriggerDelayTimes(System.currentTimeMillis(), + long currentTimeMs = TimeUtils.convertToSecondTimestamp(System.currentTimeMillis()); + startTimeWindowMs = TimeUtils.convertToSecondTimestamp(startTimeWindowMs); + List<Long> delaySeconds = job.getJobConfig().getTriggerDelayTimes(currentTimeMs, startTimeWindowMs, latestBatchSchedulerTimerTaskTimeMs); if (CollectionUtils.isEmpty(delaySeconds)) { log.info("skip job {} scheduler timer job, delay seconds is empty", job.getJobName()); @@ -190,7 +194,8 @@ public class JobScheduler<T extends AbstractJob<?, C>, C> implements Closeable { long lastTimeWindowMs = latestBatchSchedulerTimerTaskTimeMs; if (latestBatchSchedulerTimerTaskTimeMs < System.currentTimeMillis()) { - this.latestBatchSchedulerTimerTaskTimeMs = System.currentTimeMillis(); + long currentTimeMs = TimeUtils.convertToSecondTimestamp(System.currentTimeMillis()); + this.latestBatchSchedulerTimerTaskTimeMs = currentTimeMs; } this.latestBatchSchedulerTimerTaskTimeMs += BATCH_SCHEDULER_INTERVAL_MILLI_SECONDS; log.info("execute timer job ids within last ten minutes window, last time window is {}", diff --git a/fe/fe-core/src/test/java/org/apache/doris/job/base/JobExecutionConfigurationTest.java b/fe/fe-core/src/test/java/org/apache/doris/job/base/JobExecutionConfigurationTest.java index cce0a93c01d..163b2494189 100644 --- a/fe/fe-core/src/test/java/org/apache/doris/job/base/JobExecutionConfigurationTest.java +++ b/fe/fe-core/src/test/java/org/apache/doris/job/base/JobExecutionConfigurationTest.java @@ -75,7 +75,14 @@ public class JobExecutionConfigurationTest { timerDefinition.setInterval(1L); Assertions.assertEquals(3, configuration.getTriggerDelayTimes(second * 5 + 10L, second * 3, second * 7).size()); Assertions.assertEquals(3, configuration.getTriggerDelayTimes(second * 5, second * 5, second * 7).size()); + timerDefinition.setStartTimeMs(1672531200000L); + timerDefinition.setIntervalUnit(IntervalUnit.MINUTE); + timerDefinition.setInterval(1L); + Assertions.assertArrayEquals(new Long[]{0L}, configuration.getTriggerDelayTimes(1672531800000L, 1672531200000L, 1672531800000L).toArray()); + + List<Long> expectDelayTimes = configuration.getTriggerDelayTimes(1672531200000L, 1672531200000L, 1672531850000L); + Assertions.assertArrayEquals(new Long[]{0L, 60L, 120L, 180L, 240L, 300L, 360L, 420L, 480L, 540L, 600L}, expectDelayTimes.toArray()); } @Test --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org