This is an automated email from the ASF dual-hosted git repository. dataroaring pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push: new 5da1025adef [fix](compaction) fix time series compaction time threshold (#50306) 5da1025adef is described below commit 5da1025adef87ab9c232ca3a90d64c2526ab58aa Author: Sun Chenyang <suncheny...@selectdb.com> AuthorDate: Thu Apr 24 15:56:07 2025 +0800 [fix](compaction) fix time series compaction time threshold (#50306) fix time series compaction time threshold --- .../cumulative_compaction_time_series_policy.cpp | 62 +++++++++++++--------- ...mulative_compaction_time_series_policy_test.cpp | 11 ++-- .../test_time_series_compaction_policy.groovy | 30 +++++++++++ 3 files changed, 76 insertions(+), 27 deletions(-) diff --git a/be/src/olap/cumulative_compaction_time_series_policy.cpp b/be/src/olap/cumulative_compaction_time_series_policy.cpp index ebaf79f5abc..7d7ce9e732f 100644 --- a/be/src/olap/cumulative_compaction_time_series_policy.cpp +++ b/be/src/olap/cumulative_compaction_time_series_policy.cpp @@ -46,8 +46,9 @@ uint32_t TimeSeriesCumulativeCompactionPolicy::calc_cumulative_compaction_score( } int64_t compaction_file_count = tablet->tablet_meta()->time_series_compaction_file_count_threshold(); - int64_t compaction_time_threshold = + int64_t compaction_time_threshold_seconds = tablet->tablet_meta()->time_series_compaction_time_threshold_seconds(); + int64_t earliest_rowset_creation_time = INT64_MAX; int64_t level0_total_size = 0; RowsetMetaSharedPtr first_meta; @@ -79,6 +80,9 @@ uint32_t TimeSeriesCumulativeCompactionPolicy::calc_cumulative_compaction_score( } else { checked_rs_metas.push_back(rs_meta); } + if (rs_meta->creation_time() < earliest_rowset_creation_time) { + earliest_rowset_creation_time = rs_meta->creation_time(); + } } } @@ -103,24 +107,22 @@ uint32_t TimeSeriesCumulativeCompactionPolicy::calc_cumulative_compaction_score( return score; } - int64_t now = UnixMillis(); - int64_t last_cumu = tablet->last_cumu_compaction_success_time(); - if (last_cumu != 0) { - int64_t cumu_interval = now - last_cumu; + // current time in seconds + int64_t now = time(nullptr); + + if (earliest_rowset_creation_time < now) { + int64_t cumu_interval = now - earliest_rowset_creation_time; // Condition 3: the time interval between compactions exceeds the value specified by parameter _compaction_time_threshold_second - if (cumu_interval > (compaction_time_threshold * 1000) && score > 0) { + if (cumu_interval > compaction_time_threshold_seconds && score > 0) { return score; } - } else if (score > 0) { - // If the compaction process has not been successfully executed, - // the condition for triggering compaction based on the last successful compaction time (condition 3) will never be met - tablet->set_last_cumu_compaction_success_time(now); } if (compaction_level >= 2) { int64_t continuous_size = 0; std::vector<RowsetMetaSharedPtr> level1_rowsets; + int64_t earliest_level1_rowset_creation_time = INT64_MAX; for (const auto& rs_meta : checked_rs_metas) { if (rs_meta->compaction_level() == 0) { break; @@ -137,12 +139,15 @@ uint32_t TimeSeriesCumulativeCompactionPolicy::calc_cumulative_compaction_score( return level1_rowsets.size(); } } + if (rs_meta->creation_time() < earliest_level1_rowset_creation_time) { + earliest_level1_rowset_creation_time = rs_meta->creation_time(); + } } - // Condition 5: level1 achieve compaction_time_threshold - if (last_cumu != 0 && level1_rowsets.size() >= 2) { - int64_t cumu_interval = now - last_cumu; - if (cumu_interval > compaction_time_threshold * 10 * 1000) { + // Condition 5: level1 achieve compaction_time_threshold_seconds + if (level1_rowsets.size() >= 2) { + int64_t cumu_interval = now - earliest_level1_rowset_creation_time; + if (cumu_interval > compaction_time_threshold_seconds * 10) { return level1_rowsets.size(); } } @@ -196,7 +201,9 @@ void TimeSeriesCumulativeCompactionPolicy::calculate_cumulative_point( CHECK((*base_rowset_meta)->start_version() == 0); int64_t prev_version = -1; - int64_t now = UnixSeconds(); + + // current time in seconds + int64_t now = time(nullptr); for (const RowsetMetaSharedPtr& rs : existing_rss) { if (rs->version().first > prev_version + 1) { // There is a hole, do not continue @@ -272,7 +279,7 @@ int32_t TimeSeriesCumulativeCompactionPolicy::pick_input_rowsets( } int64_t compaction_file_count = tablet->tablet_meta()->time_series_compaction_file_count_threshold(); - int64_t compaction_time_threshold = + int64_t compaction_time_threshold_seconds = tablet->tablet_meta()->time_series_compaction_time_threshold_seconds(); int transient_size = 0; @@ -347,10 +354,17 @@ int32_t TimeSeriesCumulativeCompactionPolicy::pick_input_rowsets( } // Condition 3: the time interval between compactions exceeds the value specified by parameter compaction_time_threshold_second - int64_t now = UnixMillis(); - if (last_cumu != 0) { - int64_t cumu_interval = now - last_cumu; - if (cumu_interval > (compaction_time_threshold * 1000) && transient_size > 0) { + // current time in seconds + int64_t now = time(nullptr); + if (!input_rowsets->empty()) { + LOG_EVERY_N(INFO, 1000) << "tablet is: " << tablet->tablet_id() << ", now: " << now + << ", earliest rowset creation time: " + << input_rowsets->front()->rowset_meta()->creation_time() + << ", compaction_time_threshold_seconds: " + << compaction_time_threshold_seconds + << ", rowset count: " << transient_size; + int64_t cumu_interval = now - input_rowsets->front()->rowset_meta()->creation_time(); + if (cumu_interval > compaction_time_threshold_seconds && transient_size > 0) { return transient_size; } } @@ -385,10 +399,10 @@ int32_t TimeSeriesCumulativeCompactionPolicy::pick_input_rowsets( } }) - // Condition 5: level1 achieve compaction_time_threshold - if (last_cumu != 0 && level1_rowsets.size() >= 2) { - int64_t cumu_interval = now - last_cumu; - if (cumu_interval > compaction_time_threshold * 10 * 1000) { + // Condition 5: level1 achieve compaction_time_threshold_seconds + if (level1_rowsets.size() >= 2) { + int64_t cumu_interval = now - level1_rowsets.front()->rowset_meta()->creation_time(); + if (cumu_interval > compaction_time_threshold_seconds * 10) { input_rowsets->swap(level1_rowsets); return input_rowsets->size(); } diff --git a/be/test/olap/cumulative_compaction_time_series_policy_test.cpp b/be/test/olap/cumulative_compaction_time_series_policy_test.cpp index 386a7a2d211..43f70957226 100644 --- a/be/test/olap/cumulative_compaction_time_series_policy_test.cpp +++ b/be/test/olap/cumulative_compaction_time_series_policy_test.cpp @@ -77,7 +77,7 @@ public: json2pb::JsonToProtoMessage(_json_rowset_meta, &rowset_meta_pb); rowset_meta_pb.set_start_version(start); rowset_meta_pb.set_end_version(end); - rowset_meta_pb.set_creation_time(10000); + rowset_meta_pb.set_creation_time(time(nullptr)); pb1->init_from_pb(rowset_meta_pb); pb1->set_total_disk_size(41); @@ -536,8 +536,13 @@ TEST_F(TestTimeSeriesCumulativeCompactionPolicy, pick_input_rowsets_time_interva new Tablet(_engine, _tablet_meta, nullptr, CUMULATIVE_TIME_SERIES_POLICY)); static_cast<void>(_tablet->init()); _tablet->calculate_cumulative_point(); - int64_t now = UnixMillis(); - _tablet->set_last_cumu_compaction_success_time(now - 3700 * 1000); + + _tablet->_tablet_meta->set_time_series_compaction_time_threshold_seconds(1); + std::this_thread::sleep_for(std::chrono::seconds(2)); + + int score = + _tablet->_cumulative_compaction_policy->calc_cumulative_compaction_score(_tablet.get()); + EXPECT_EQ(3, score); auto candidate_rowsets = _tablet->pick_candidate_rowsets_to_cumulative_compaction(); diff --git a/regression-test/suites/compaction/test_time_series_compaction_policy.groovy b/regression-test/suites/compaction/test_time_series_compaction_policy.groovy index d211ba98bdd..69c8fb97fa7 100644 --- a/regression-test/suites/compaction/test_time_series_compaction_policy.groovy +++ b/regression-test/suites/compaction/test_time_series_compaction_policy.groovy @@ -120,4 +120,34 @@ suite("test_time_series_compaction_polciy", "p0") { rowsetCount = get_rowset_count.call(tablets); assert (rowsetCount == 11 * replicaNum) qt_sql_3 """ select count() from ${tableName}""" + + sql """ DROP TABLE IF EXISTS ${tableName}; """ + sql """ + CREATE TABLE ${tableName} ( + `id` int(11) NULL, + `name` varchar(255) NULL, + `hobbies` text NULL, + `score` int(11) NULL + ) ENGINE=OLAP + DUPLICATE KEY(`id`) + COMMENT 'OLAP' + DISTRIBUTED BY HASH(`id`) BUCKETS 1 + PROPERTIES ( + "replication_num" = "1", + "disable_auto_compaction" = "true", + "compaction_policy" = "time_series", + "time_series_compaction_time_threshold_seconds" = "70" + ); + """ + + sql """ INSERT INTO ${tableName} VALUES (1, "andy", "andy love apple", 100); """ + sql """ INSERT INTO ${tableName} VALUES (1, "bason", "bason hate pear", 99); """ + sql """ INSERT INTO ${tableName} VALUES (1, "andy", "andy love apple", 100); """ + sql """ INSERT INTO ${tableName} VALUES (1, "bason", "bason hate pear", 99); """ + sql """ INSERT INTO ${tableName} VALUES (1, "andy", "andy love apple", 100); """ + sql """ INSERT INTO ${tableName} VALUES (100, "andy", "andy love apple", 100); """ + sql """ INSERT INTO ${tableName} VALUES (100, "bason", "bason hate pear", 99); """ + + Thread.sleep(75000) + trigger_and_wait_compaction(tableName, "cumulative") } --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org