This is an automated email from the ASF dual-hosted git repository.

dataroaring pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new 5da1025adef [fix](compaction) fix time series compaction time threshold (#50306)
5da1025adef is described below

commit 5da1025adef87ab9c232ca3a90d64c2526ab58aa
Author: Sun Chenyang <suncheny...@selectdb.com>
AuthorDate: Thu Apr 24 15:56:07 2025 +0800

    [fix](compaction) fix time series compaction time threshold (#50306)
    
    
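    Fix the time series compaction time threshold: measure elapsed time in
    seconds from the creation time of the earliest candidate rowset, instead
    of from the last successful cumulative compaction time. The level-1
    condition likewise uses the earliest level-1 rowset and 10x the
    threshold.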
---
 .../cumulative_compaction_time_series_policy.cpp   | 62 +++++++++++++---------
 ...mulative_compaction_time_series_policy_test.cpp | 11 ++--
 .../test_time_series_compaction_policy.groovy      | 30 +++++++++++
 3 files changed, 76 insertions(+), 27 deletions(-)

diff --git a/be/src/olap/cumulative_compaction_time_series_policy.cpp 
b/be/src/olap/cumulative_compaction_time_series_policy.cpp
index ebaf79f5abc..7d7ce9e732f 100644
--- a/be/src/olap/cumulative_compaction_time_series_policy.cpp
+++ b/be/src/olap/cumulative_compaction_time_series_policy.cpp
@@ -46,8 +46,9 @@ uint32_t 
TimeSeriesCumulativeCompactionPolicy::calc_cumulative_compaction_score(
     }
     int64_t compaction_file_count =
             
tablet->tablet_meta()->time_series_compaction_file_count_threshold();
-    int64_t compaction_time_threshold =
+    int64_t compaction_time_threshold_seconds =
             
tablet->tablet_meta()->time_series_compaction_time_threshold_seconds();
+    int64_t earliest_rowset_creation_time = INT64_MAX;
 
     int64_t level0_total_size = 0;
     RowsetMetaSharedPtr first_meta;
@@ -79,6 +80,9 @@ uint32_t 
TimeSeriesCumulativeCompactionPolicy::calc_cumulative_compaction_score(
             } else {
                 checked_rs_metas.push_back(rs_meta);
             }
+            if (rs_meta->creation_time() < earliest_rowset_creation_time) {
+                earliest_rowset_creation_time = rs_meta->creation_time();
+            }
         }
     }
 
@@ -103,24 +107,22 @@ uint32_t 
TimeSeriesCumulativeCompactionPolicy::calc_cumulative_compaction_score(
         return score;
     }
 
-    int64_t now = UnixMillis();
-    int64_t last_cumu = tablet->last_cumu_compaction_success_time();
-    if (last_cumu != 0) {
-        int64_t cumu_interval = now - last_cumu;
+    // current time in seconds
+    int64_t now = time(nullptr);
+
+    if (earliest_rowset_creation_time < now) {
+        int64_t cumu_interval = now - earliest_rowset_creation_time;
 
         // Condition 3: the time interval between compactions exceeds the 
value specified by parameter _compaction_time_threshold_second
-        if (cumu_interval > (compaction_time_threshold * 1000) && score > 0) {
+        if (cumu_interval > compaction_time_threshold_seconds && score > 0) {
             return score;
         }
-    } else if (score > 0) {
-        // If the compaction process has not been successfully executed,
-        // the condition for triggering compaction based on the last 
successful compaction time (condition 3) will never be met
-        tablet->set_last_cumu_compaction_success_time(now);
     }
 
     if (compaction_level >= 2) {
         int64_t continuous_size = 0;
         std::vector<RowsetMetaSharedPtr> level1_rowsets;
+        int64_t earliest_level1_rowset_creation_time = INT64_MAX;
         for (const auto& rs_meta : checked_rs_metas) {
             if (rs_meta->compaction_level() == 0) {
                 break;
@@ -137,12 +139,15 @@ uint32_t 
TimeSeriesCumulativeCompactionPolicy::calc_cumulative_compaction_score(
                     return level1_rowsets.size();
                 }
             }
+            if (rs_meta->creation_time() < 
earliest_level1_rowset_creation_time) {
+                earliest_level1_rowset_creation_time = 
rs_meta->creation_time();
+            }
         }
 
-        // Condition 5: level1 achieve compaction_time_threshold
-        if (last_cumu != 0 && level1_rowsets.size() >= 2) {
-            int64_t cumu_interval = now - last_cumu;
-            if (cumu_interval > compaction_time_threshold * 10 * 1000) {
+        // Condition 5: level1 achieve compaction_time_threshold_seconds
+        if (level1_rowsets.size() >= 2) {
+            int64_t cumu_interval = now - earliest_level1_rowset_creation_time;
+            if (cumu_interval > compaction_time_threshold_seconds * 10) {
                 return level1_rowsets.size();
             }
         }
@@ -196,7 +201,9 @@ void 
TimeSeriesCumulativeCompactionPolicy::calculate_cumulative_point(
     CHECK((*base_rowset_meta)->start_version() == 0);
 
     int64_t prev_version = -1;
-    int64_t now = UnixSeconds();
+
+    // current time in seconds
+    int64_t now = time(nullptr);
     for (const RowsetMetaSharedPtr& rs : existing_rss) {
         if (rs->version().first > prev_version + 1) {
             // There is a hole, do not continue
@@ -272,7 +279,7 @@ int32_t 
TimeSeriesCumulativeCompactionPolicy::pick_input_rowsets(
     }
     int64_t compaction_file_count =
             
tablet->tablet_meta()->time_series_compaction_file_count_threshold();
-    int64_t compaction_time_threshold =
+    int64_t compaction_time_threshold_seconds =
             
tablet->tablet_meta()->time_series_compaction_time_threshold_seconds();
 
     int transient_size = 0;
@@ -347,10 +354,17 @@ int32_t 
TimeSeriesCumulativeCompactionPolicy::pick_input_rowsets(
     }
 
     // Condition 3: the time interval between compactions exceeds the value 
specified by parameter compaction_time_threshold_second
-    int64_t now = UnixMillis();
-    if (last_cumu != 0) {
-        int64_t cumu_interval = now - last_cumu;
-        if (cumu_interval > (compaction_time_threshold * 1000) && 
transient_size > 0) {
+    // current time in seconds
+    int64_t now = time(nullptr);
+    if (!input_rowsets->empty()) {
+        LOG_EVERY_N(INFO, 1000) << "tablet is: " << tablet->tablet_id() << ", 
now: " << now
+                                << ", earliest rowset creation time: "
+                                << 
input_rowsets->front()->rowset_meta()->creation_time()
+                                << ", compaction_time_threshold_seconds: "
+                                << compaction_time_threshold_seconds
+                                << ", rowset count: " << transient_size;
+        int64_t cumu_interval = now - 
input_rowsets->front()->rowset_meta()->creation_time();
+        if (cumu_interval > compaction_time_threshold_seconds && 
transient_size > 0) {
             return transient_size;
         }
     }
@@ -385,10 +399,10 @@ int32_t 
TimeSeriesCumulativeCompactionPolicy::pick_input_rowsets(
             }
         })
 
-        // Condition 5: level1 achieve compaction_time_threshold
-        if (last_cumu != 0 && level1_rowsets.size() >= 2) {
-            int64_t cumu_interval = now - last_cumu;
-            if (cumu_interval > compaction_time_threshold * 10 * 1000) {
+        // Condition 5: level1 achieve compaction_time_threshold_seconds
+        if (level1_rowsets.size() >= 2) {
+            int64_t cumu_interval = now - 
level1_rowsets.front()->rowset_meta()->creation_time();
+            if (cumu_interval > compaction_time_threshold_seconds * 10) {
                 input_rowsets->swap(level1_rowsets);
                 return input_rowsets->size();
             }
diff --git a/be/test/olap/cumulative_compaction_time_series_policy_test.cpp 
b/be/test/olap/cumulative_compaction_time_series_policy_test.cpp
index 386a7a2d211..43f70957226 100644
--- a/be/test/olap/cumulative_compaction_time_series_policy_test.cpp
+++ b/be/test/olap/cumulative_compaction_time_series_policy_test.cpp
@@ -77,7 +77,7 @@ public:
         json2pb::JsonToProtoMessage(_json_rowset_meta, &rowset_meta_pb);
         rowset_meta_pb.set_start_version(start);
         rowset_meta_pb.set_end_version(end);
-        rowset_meta_pb.set_creation_time(10000);
+        rowset_meta_pb.set_creation_time(time(nullptr));
 
         pb1->init_from_pb(rowset_meta_pb);
         pb1->set_total_disk_size(41);
@@ -536,8 +536,13 @@ TEST_F(TestTimeSeriesCumulativeCompactionPolicy, 
pick_input_rowsets_time_interva
             new Tablet(_engine, _tablet_meta, nullptr, 
CUMULATIVE_TIME_SERIES_POLICY));
     static_cast<void>(_tablet->init());
     _tablet->calculate_cumulative_point();
-    int64_t now = UnixMillis();
-    _tablet->set_last_cumu_compaction_success_time(now - 3700 * 1000);
+
+    
_tablet->_tablet_meta->set_time_series_compaction_time_threshold_seconds(1);
+    std::this_thread::sleep_for(std::chrono::seconds(2));
+
+    int score =
+            
_tablet->_cumulative_compaction_policy->calc_cumulative_compaction_score(_tablet.get());
+    EXPECT_EQ(3, score);
 
     auto candidate_rowsets = 
_tablet->pick_candidate_rowsets_to_cumulative_compaction();
 
diff --git 
a/regression-test/suites/compaction/test_time_series_compaction_policy.groovy 
b/regression-test/suites/compaction/test_time_series_compaction_policy.groovy
index d211ba98bdd..69c8fb97fa7 100644
--- 
a/regression-test/suites/compaction/test_time_series_compaction_policy.groovy
+++ 
b/regression-test/suites/compaction/test_time_series_compaction_policy.groovy
@@ -120,4 +120,34 @@ suite("test_time_series_compaction_polciy", "p0") {
     rowsetCount = get_rowset_count.call(tablets);
     assert (rowsetCount == 11 * replicaNum)
     qt_sql_3 """ select count() from ${tableName}"""
+
+    sql """ DROP TABLE IF EXISTS ${tableName}; """
+    sql """
+        CREATE TABLE ${tableName} (
+            `id` int(11) NULL,
+            `name` varchar(255) NULL,
+            `hobbies` text NULL,
+            `score` int(11) NULL
+        ) ENGINE=OLAP
+        DUPLICATE KEY(`id`)
+        COMMENT 'OLAP'
+        DISTRIBUTED BY HASH(`id`) BUCKETS 1
+        PROPERTIES (
+            "replication_num" = "1",
+            "disable_auto_compaction" = "true",
+            "compaction_policy" = "time_series",
+            "time_series_compaction_time_threshold_seconds" = "70"
+        );
+    """
+
+    sql """ INSERT INTO ${tableName} VALUES (1, "andy", "andy love apple", 
100); """
+    sql """ INSERT INTO ${tableName} VALUES (1, "bason", "bason hate pear", 
99); """
+    sql """ INSERT INTO ${tableName} VALUES (1, "andy", "andy love apple", 
100); """
+    sql """ INSERT INTO ${tableName} VALUES (1, "bason", "bason hate pear", 
99); """
+    sql """ INSERT INTO ${tableName} VALUES (1, "andy", "andy love apple", 
100); """
+    sql """ INSERT INTO ${tableName} VALUES (100, "andy", "andy love apple", 
100); """
+    sql """ INSERT INTO ${tableName} VALUES (100, "bason", "bason hate pear", 
99); """
+
+    Thread.sleep(75000)
+    trigger_and_wait_compaction(tableName, "cumulative")
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to