This is an automated email from the ASF dual-hosted git repository. yiguolei pushed a commit to branch branch-2.1 in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-2.1 by this push: new c4d0e1e6935 branch-2.1: [fix](job scheduler) specifies both startTime and immediate, it will trigger one fewer task execution #50624 (#50897) c4d0e1e6935 is described below commit c4d0e1e6935f3a83a8ee4d9d254c19b7f8b09a2b Author: github-actions[bot] <41898282+github-actions[bot]@users.noreply.github.com> AuthorDate: Wed May 14 22:59:48 2025 +0800 branch-2.1: [fix](job scheduler) specifies both startTime and immediate, it will trigger one fewer task execution #50624 (#50897) Cherry-picked from #50624 Co-authored-by: zhangdong <zhangd...@selectdb.com> --- .../doris/job/base/JobExecutionConfiguration.java | 10 +-- .../job/base/JobExecutionConfigurationTest.java | 11 ++- .../data/mtmv_p0/test_immediate_starttime_mtmv.out | Bin 0 -> 134 bytes .../mtmv_p0/test_immediate_starttime_mtmv.groovy | 82 +++++++++++++++++++++ 4 files changed, 95 insertions(+), 8 deletions(-) diff --git a/fe/fe-core/src/main/java/org/apache/doris/job/base/JobExecutionConfiguration.java b/fe/fe-core/src/main/java/org/apache/doris/job/base/JobExecutionConfiguration.java index 80e8b0cf5e3..629c88c19b5 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/job/base/JobExecutionConfiguration.java +++ b/fe/fe-core/src/main/java/org/apache/doris/job/base/JobExecutionConfiguration.java @@ -136,12 +136,6 @@ public class JobExecutionConfiguration { } long intervalValue = timerDefinition.getIntervalUnit().getIntervalMs(timerDefinition.getInterval()); long jobStartTimeMs = timerDefinition.getStartTimeMs(); - if (isImmediate()) { - jobStartTimeMs += intervalValue; - if (jobStartTimeMs > endTimeMs) { - return delayTimeSeconds; - } - } return getExecutionDelaySeconds(startTimeMs, endTimeMs, jobStartTimeMs, intervalValue, currentTimeMs); } @@ -171,6 +165,10 @@ public class JobExecutionConfiguration { long firstTriggerTime = windowStartTimeMs + (intervalMs - ((windowStartTimeMs - startTimeMs) % intervalMs)) % intervalMs; + // should filter result which smaller than start time + if (firstTriggerTime < startTimeMs) { + firstTriggerTime = startTimeMs; + } if (firstTriggerTime < currentTimeMs) { // Calculate how many intervals to add to get the largest trigger time < currentTimeMs long intervalsToAdd = (currentTimeMs - firstTriggerTime) / intervalMs; diff --git a/fe/fe-core/src/test/java/org/apache/doris/job/base/JobExecutionConfigurationTest.java b/fe/fe-core/src/test/java/org/apache/doris/job/base/JobExecutionConfigurationTest.java index fb0600b281f..8196c000959 100644 --- a/fe/fe-core/src/test/java/org/apache/doris/job/base/JobExecutionConfigurationTest.java +++ b/fe/fe-core/src/test/java/org/apache/doris/job/base/JobExecutionConfigurationTest.java @@ -53,13 +53,20 @@ public class JobExecutionConfigurationTest { configuration.setExecuteType(JobExecuteType.RECURRING); TimerDefinition timerDefinition = new TimerDefinition(); - timerDefinition.setStartTimeMs(100000L); // Start time set to 1 second in the future - timerDefinition.setInterval(10L); // Interval set to 10 milliseconds + timerDefinition.setStartTimeMs(700000L); // Start time set to 700 second in the future + timerDefinition.setInterval(10L); // Interval set to 10 minute timerDefinition.setIntervalUnit(IntervalUnit.MINUTE); configuration.setTimerDefinition(timerDefinition); List<Long> delayTimes = configuration.getTriggerDelayTimes( 0L, 0L, 1100000L); + // test should filter result which smaller than start time + Assertions.assertEquals(1, delayTimes.size()); + Assertions.assertArrayEquals(new Long[]{700L}, delayTimes.toArray()); + + timerDefinition.setStartTimeMs(100000L); // Start time set to 100 second in the future + delayTimes = configuration.getTriggerDelayTimes( + 0L, 0L, 1100000L); Assertions.assertEquals(2, delayTimes.size()); Assertions.assertArrayEquals(new Long[]{100L, 700L}, delayTimes.toArray()); diff --git a/regression-test/data/mtmv_p0/test_immediate_starttime_mtmv.out b/regression-test/data/mtmv_p0/test_immediate_starttime_mtmv.out new file mode 100644 index 00000000000..79ac76f677c Binary files /dev/null and b/regression-test/data/mtmv_p0/test_immediate_starttime_mtmv.out differ diff --git a/regression-test/suites/mtmv_p0/test_immediate_starttime_mtmv.groovy b/regression-test/suites/mtmv_p0/test_immediate_starttime_mtmv.groovy new file mode 100644 index 00000000000..8732eb20fda --- /dev/null +++ b/regression-test/suites/mtmv_p0/test_immediate_starttime_mtmv.groovy @@ -0,0 +1,82 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +import java.time.LocalDateTime; +import java.time.format.DateTimeFormatter; +import java.time.Instant; +import java.time.ZoneId; +import org.junit.Assert; + +suite("test_immediate_starttime_mtmv","mtmv") { + String suiteName = "test_immediate_starttime_mtmv" + String tableName = "${suiteName}_table" + String mvName = "${suiteName}_mv" + + sql """drop table if exists `${tableName}`""" + sql """drop materialized view if exists ${mvName};""" + + sql """ + CREATE TABLE ${tableName} + ( + k2 INT, + k3 varchar(32) + ) + DISTRIBUTED BY HASH(k2) BUCKETS 2 + PROPERTIES ( + "replication_num" = "1" + ); + """ + sql """ + insert into ${tableName} values (2,1),(2,2); + """ + def currentMs = System.currentTimeMillis() + 10000; + def dateTime = LocalDateTime.ofInstant(Instant.ofEpochMilli(currentMs), ZoneId.systemDefault()); + def formatter = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss"); + def startTime= dateTime.format(formatter); + sql """ + CREATE MATERIALIZED VIEW ${mvName} + REFRESH AUTO ON SCHEDULE EVERY 1 DAY STARTS '${startTime}' + DISTRIBUTED BY hash(k2) BUCKETS 2 + PROPERTIES ( + 'replication_num' = '1' + ) + AS + SELECT * from ${tableName}; + """ + Thread.sleep(20000) + order_qt_immediate "SELECT count(*) from tasks('type'='mv') where MvName='${mvName}'" + + sql """drop materialized view if exists ${mvName};""" + currentMs = System.currentTimeMillis() + 10000; + dateTime = LocalDateTime.ofInstant(Instant.ofEpochMilli(currentMs), ZoneId.systemDefault()); + formatter = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss"); + startTime= dateTime.format(formatter); + sql """ + CREATE MATERIALIZED VIEW ${mvName} + build deferred REFRESH AUTO ON SCHEDULE EVERY 1 DAY STARTS '${startTime}' + DISTRIBUTED BY hash(k2) BUCKETS 2 + PROPERTIES ( + 'replication_num' = '1' + ) + AS + SELECT * from ${tableName}; + """ + Thread.sleep(20000) + order_qt_deferred "SELECT count(*) from tasks('type'='mv') where MvName='${mvName}'" + sql """drop table if exists `${tableName}`""" + sql """drop materialized view if exists ${mvName};""" +} --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org