This is an automated email from the ASF dual-hosted git repository.

yiguolei pushed a commit to branch branch-4.0
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/branch-4.0 by this push:
     new df7558d5d40 branch-4.0: [Fix](Compaction) Fix cumulative compaction 
pick rowsets to trim by max score after filtering #59268 (#59475)
df7558d5d40 is described below

commit df7558d5d40e17d458a479070555e8907611a4fd
Author: github-actions[bot] 
<41898282+github-actions[bot]@users.noreply.github.com>
AuthorDate: Wed Dec 31 11:47:21 2025 +0800

    branch-4.0: [Fix](Compaction) Fix cumulative compaction pick rowsets to 
trim by max score after filtering #59268 (#59475)
    
    Cherry-picked from #59268
    
    Co-authored-by: Jimmy <[email protected]>
---
 .../cloud/cloud_cumulative_compaction_policy.cpp   |   31 +-
 be/src/olap/cumulative_compaction_policy.cpp       |   25 +-
 .../cloud_cumulative_compaction_policy_test.cpp    |  122 ++
 be/test/olap/cumulative_compaction_policy_test.cpp | 1801 +++++++++++++++++++-
 4 files changed, 1967 insertions(+), 12 deletions(-)

diff --git a/be/src/cloud/cloud_cumulative_compaction_policy.cpp 
b/be/src/cloud/cloud_cumulative_compaction_policy.cpp
index 7f0259b5f02..5fee34119ac 100644
--- a/be/src/cloud/cloud_cumulative_compaction_policy.cpp
+++ b/be/src/cloud/cloud_cumulative_compaction_policy.cpp
@@ -30,6 +30,7 @@
 #include "olap/olap_common.h"
 #include "olap/tablet.h"
 #include "olap/tablet_meta.h"
+#include "util/defer_op.h"
 
 namespace doris {
 #include "common/compile_check_begin.h"
@@ -120,6 +121,26 @@ int64_t 
CloudSizeBasedCumulativeCompactionPolicy::pick_input_rowsets(
     int transient_size = 0;
     *compaction_score = 0;
     int64_t total_size = 0;
+    bool skip_trim = false; // Skip trim for Empty Rowset Compaction
+
+    // DEFER: trim input_rowsets from back if score > max_compaction_score
+    // This ensures we don't return more rowsets than allowed by 
max_compaction_score,
+    // while still collecting enough rowsets to pass min_compaction_score 
check after level_size removal.
+    // Must be placed after variable initialization and before collection loop.
+    DEFER({
+        if (skip_trim) {
+            return;
+        }
+        // Keep at least 1 rowset to avoid removing the only rowset 
(consistent with fallback branch)
+        while (input_rowsets->size() > 1 &&
+               *compaction_score > static_cast<size_t>(max_compaction_score)) {
+            auto& last_rowset = input_rowsets->back();
+            *compaction_score -= 
last_rowset->rowset_meta()->get_compaction_score();
+            total_size -= last_rowset->rowset_meta()->total_disk_size();
+            input_rowsets->pop_back();
+        }
+    });
+
     for (auto& rowset : candidate_rowsets) {
         // check whether this rowset is delete version
         if (!allow_delete && rowset->rowset_meta()->has_delete_predicate()) {
@@ -143,10 +164,8 @@ int64_t 
CloudSizeBasedCumulativeCompactionPolicy::pick_input_rowsets(
                 continue;
             }
         }
-        if (*compaction_score >= max_compaction_score) {
-            // got enough segments
-            break;
-        }
+        // Removed: max_compaction_score check here
+        // We now collect all candidate rowsets and trim from back at return 
time via DEFER
         *compaction_score += rowset->rowset_meta()->get_compaction_score();
         total_size += rowset->rowset_meta()->total_disk_size();
 
@@ -184,8 +203,10 @@ int64_t 
CloudSizeBasedCumulativeCompactionPolicy::pick_input_rowsets(
                             static_cast<double>(input_rowsets->size()) >=
                     config::empty_rowset_compaction_min_ratio) {
             // Prioritize consecutive empty rowset compaction
+            // Skip trim: empty rowset compaction has very low cost and the 
goal is to reduce rowset count
             *input_rowsets = consecutive_empty_rowsets;
             *compaction_score = consecutive_empty_rowsets.size();
+            skip_trim = true;
             return consecutive_empty_rowsets.size();
         }
     }
@@ -229,7 +250,7 @@ int64_t 
CloudSizeBasedCumulativeCompactionPolicy::pick_input_rowsets(
                 *compaction_score = max_score;
                 return transient_size;
             }
-            // Exceeding max compaction score, do compaction on all candidate 
rowsets anyway
+            // no rowset is OVERLAPPING, return all input rowsets (DEFER will 
trim to max_compaction_score)
             return transient_size;
         }
     }
diff --git a/be/src/olap/cumulative_compaction_policy.cpp 
b/be/src/olap/cumulative_compaction_policy.cpp
index 6f74f86bf9d..2fef133f769 100644
--- a/be/src/olap/cumulative_compaction_policy.cpp
+++ b/be/src/olap/cumulative_compaction_policy.cpp
@@ -29,6 +29,7 @@
 #include "olap/tablet.h"
 #include "olap/tablet_meta.h"
 #include "util/debug_points.h"
+#include "util/defer_op.h"
 
 namespace doris {
 #include "common/compile_check_begin.h"
@@ -268,6 +269,22 @@ int 
SizeBasedCumulativeCompactionPolicy::pick_input_rowsets(
     int transient_size = 0;
     *compaction_score = 0;
     int64_t total_size = 0;
+
+    // DEFER: trim input_rowsets from back if score > max_compaction_score
+    // This ensures we don't return more rowsets than allowed by 
max_compaction_score,
+    // while still collecting enough rowsets to pass min_compaction_score 
check after level_size removal.
+    // Must be placed after variable initialization and before collection loop.
+    DEFER({
+        // Keep at least 1 rowset to avoid removing the only rowset 
(consistent with fallback branch)
+        while (input_rowsets->size() > 1 &&
+               *compaction_score > static_cast<size_t>(max_compaction_score)) {
+            auto& last_rowset = input_rowsets->back();
+            *compaction_score -= 
last_rowset->rowset_meta()->get_compaction_score();
+            total_size -= last_rowset->rowset_meta()->total_disk_size();
+            input_rowsets->pop_back();
+        }
+    });
+
     for (auto& rowset : candidate_rowsets) {
         // check whether this rowset is delete version
         if (!allow_delete && rowset->rowset_meta()->has_delete_predicate()) {
@@ -291,10 +308,8 @@ int 
SizeBasedCumulativeCompactionPolicy::pick_input_rowsets(
                 continue;
             }
         }
-        if (*compaction_score >= max_compaction_score) {
-            // got enough segments
-            break;
-        }
+        // Removed: max_compaction_score check here
+        // We now collect all candidate rowsets and trim from back at return 
time via DEFER
         *compaction_score += rowset->rowset_meta()->get_compaction_score();
         total_size += rowset->rowset_meta()->total_disk_size();
 
@@ -355,7 +370,7 @@ int SizeBasedCumulativeCompactionPolicy::pick_input_rowsets(
             *compaction_score = max_score;
             return transient_size;
         }
-        // no rowset is OVERLAPPING, execute compaction on all input rowsets
+        // no rowset is OVERLAPPING, return all input rowsets (DEFER will trim 
to max_compaction_score)
         return transient_size;
     }
     input_rowsets->erase(input_rowsets->begin(), rs_begin);
diff --git a/be/test/cloud/cloud_cumulative_compaction_policy_test.cpp 
b/be/test/cloud/cloud_cumulative_compaction_policy_test.cpp
index 4805d579df8..b509056c04c 100644
--- a/be/test/cloud/cloud_cumulative_compaction_policy_test.cpp
+++ b/be/test/cloud/cloud_cumulative_compaction_policy_test.cpp
@@ -15,6 +15,8 @@
 // specific language governing permissions and limitations
 // under the License.
 
+#include "cloud/cloud_cumulative_compaction_policy.h"
+
 #include <gen_cpp/AgentService_types.h>
 #include <gen_cpp/olap_file.pb.h>
 #include <gtest/gtest-message.h>
@@ -22,6 +24,8 @@
 #include <gtest/gtest.h>
 
 #include "cloud/cloud_storage_engine.h"
+#include "cloud/config.h"
+#include "common/config.h"
 #include "gtest/gtest_pred_impl.h"
 #include "json2pb/json_to_pb.h"
 #include "olap/olap_common.h"
@@ -146,4 +150,122 @@ TEST_F(TestCloudSizeBasedCumulativeCompactionPolicy, 
new_cumulative_point) {
     Version version(1, 1);
     EXPECT_EQ(policy.new_cumulative_point(&_tablet, output_rowset, version, 
2), 6);
 }
+
+// Test case: Empty rowset compaction with skip_trim
+TEST_F(TestCloudSizeBasedCumulativeCompactionPolicy, 
pick_input_rowsets_empty_rowset_compaction) {
+    // Save original config values
+    bool orig_enable_empty_rowset_compaction = 
config::enable_empty_rowset_compaction;
+    int32_t orig_empty_rowset_compaction_min_count = 
config::empty_rowset_compaction_min_count;
+    double orig_empty_rowset_compaction_min_ratio = 
config::empty_rowset_compaction_min_ratio;
+
+    // Enable empty rowset compaction
+    config::enable_empty_rowset_compaction = true;
+    config::empty_rowset_compaction_min_count = 5;
+    config::empty_rowset_compaction_min_ratio = 0.5;
+
+    CloudTablet _tablet(_engine, _tablet_meta);
+    _tablet._base_size = 1024L * 1024 * 1024; // 1GB base
+
+    // Create candidate rowsets: 2 normal + 150 empty rowsets
+    // This tests that skip_trim = true for empty rowset compaction
+    std::vector<RowsetSharedPtr> candidate_rowsets;
+
+    // 2 normal rowsets
+    for (int i = 0; i < 2; i++) {
+        auto rowset = create_rowset(Version(i + 2, i + 2), 1, true, 1024 * 
1024); // 1MB
+        candidate_rowsets.push_back(rowset);
+    }
+
+    // 150 empty rowsets (consecutive)
+    for (int i = 0; i < 150; i++) {
+        auto rowset = create_rowset(Version(i + 4, i + 4), 0, false, 0); // 
empty
+        candidate_rowsets.push_back(rowset);
+    }
+
+    std::vector<RowsetSharedPtr> input_rowsets;
+    Version last_delete_version {-1, -1};
+    size_t compaction_score = 0;
+
+    CloudSizeBasedCumulativeCompactionPolicy policy;
+    // max=100, but empty rowset compaction should return 150 (skip_trim = 
true)
+    policy.pick_input_rowsets(&_tablet, candidate_rowsets, 100, 50, 
&input_rowsets,
+                              &last_delete_version, &compaction_score, true);
+
+    // Empty rowset compaction should return all 150 empty rowsets
+    // skip_trim = true, so no trimming even though score > max
+    EXPECT_EQ(150, input_rowsets.size());
+    EXPECT_EQ(150, compaction_score);
+
+    // Verify all returned rowsets are empty
+    for (const auto& rs : input_rowsets) {
+        EXPECT_EQ(0, rs->num_segments());
+    }
+
+    // Restore original config values
+    config::enable_empty_rowset_compaction = 
orig_enable_empty_rowset_compaction;
+    config::empty_rowset_compaction_min_count = 
orig_empty_rowset_compaction_min_count;
+    config::empty_rowset_compaction_min_ratio = 
orig_empty_rowset_compaction_min_ratio;
+}
+
+// Test case: prioritize_query_perf_in_compaction for non-DUP_KEYS table
+// This tests the branch: rs_begin == end && prioritize_query_perf && 
keys_type != DUP_KEYS
+TEST_F(TestCloudSizeBasedCumulativeCompactionPolicy, 
pick_input_rowsets_prioritize_query_perf) {
+    // Save original config value
+    bool orig_prioritize_query_perf = 
config::prioritize_query_perf_in_compaction;
+
+    // Enable prioritize_query_perf_in_compaction
+    config::prioritize_query_perf_in_compaction = true;
+
+    // Create tablet with UNIQUE keys (not DUP_KEYS)
+    TTabletSchema schema;
+    schema.keys_type = TKeysType::UNIQUE_KEYS;
+    TabletMetaSharedPtr tablet_meta(new TabletMeta(1, 2, 15673, 15674, 4, 5, 
schema, 6, {{7, 8}},
+                                                   UniqueId(9, 10), 
TTabletType::TABLET_TYPE_DISK,
+                                                   TCompressionType::LZ4F));
+
+    CloudTablet _tablet(_engine, tablet_meta);
+    // Use large base_size to get large promotion_size, ensuring total_size < 
promotion_size
+    // so we don't trigger promotion_size early return and can reach 
level_size logic
+    _tablet._base_size = 20L * 1024 * 1024 * 1024; // 20GB base, 
promotion_size ~= 1GB
+
+    // Create candidate rowsets that will ALL be removed by level_size
+    // Key: each rowset's level > remain_level after removal
+    std::vector<RowsetSharedPtr> candidate_rowsets;
+
+    // 3 rowsets with decreasing sizes, all will be removed by level_size:
+    // - 40MB: level(40)=32, remain=35, level(35)=32, 32>32? NO... need 
adjustment
+    // Let's use sizes that guarantee all removal:
+    // - 50MB: level(50)=32, after remove remain=25, level(25)=16, 32>16 -> 
remove
+    // - 20MB: level(20)=16, after remove remain=5, level(5)=4, 16>4 -> remove
+    // - 5MB:  level(5)=4, after remove remain=0, level(0)=0, 4>0 -> remove
+    auto rowset1 = create_rowset(Version(2, 2), 30, true, 50L * 1024 * 1024); 
// 50MB, score=30
+    auto rowset2 = create_rowset(Version(3, 3), 20, true, 20L * 1024 * 1024); 
// 20MB, score=20
+    auto rowset3 = create_rowset(Version(4, 4), 10, true, 5L * 1024 * 1024);  
// 5MB, score=10
+    candidate_rowsets.push_back(rowset1);
+    candidate_rowsets.push_back(rowset2);
+    candidate_rowsets.push_back(rowset3);
+
+    // total_size = 75MB < promotion_size (~1GB), enters level_size logic
+    // All 3 rowsets will be removed by level_size -> rs_begin == end
+    // With prioritize_query_perf=true and UNIQUE_KEYS, should return all 
candidates
+
+    std::vector<RowsetSharedPtr> input_rowsets;
+    Version last_delete_version {-1, -1};
+    size_t compaction_score = 0;
+
+    CloudSizeBasedCumulativeCompactionPolicy policy;
+    policy.pick_input_rowsets(&_tablet, candidate_rowsets, 100, 5, 
&input_rowsets,
+                              &last_delete_version, &compaction_score, true);
+
+    // With prioritize_query_perf enabled for non-DUP_KEYS table,
+    // when all rowsets are removed by level_size, should return all candidates
+    // (before DEFER trim)
+    // Total score = 60, max = 100, so no trimming needed
+    EXPECT_EQ(3, input_rowsets.size());
+    EXPECT_EQ(60, compaction_score);
+
+    // Restore original config value
+    config::prioritize_query_perf_in_compaction = orig_prioritize_query_perf;
+}
+
 } // namespace doris
diff --git a/be/test/olap/cumulative_compaction_policy_test.cpp 
b/be/test/olap/cumulative_compaction_policy_test.cpp
index 1fc735296ce..e6a6c005d6b 100644
--- a/be/test/olap/cumulative_compaction_policy_test.cpp
+++ b/be/test/olap/cumulative_compaction_policy_test.cpp
@@ -468,8 +468,10 @@ TEST_F(TestSizeBasedCumulativeCompactionPolicy, 
pick_input_rowsets_normal) {
             _tablet.get(), candidate_rowsets, 10, 5, &input_rowsets, 
&last_delete_version,
             &compaction_score, config::enable_delete_when_cumu_compaction);
 
-    EXPECT_EQ(4, input_rowsets.size());
-    EXPECT_EQ(12, compaction_score);
+    // After fix: max=10, total score=12, trim from back until score <= max
+    // 4 rowsets with score=3 each = 12, trim 1 -> 3 rowsets with score=9
+    EXPECT_EQ(3, input_rowsets.size());
+    EXPECT_EQ(9, compaction_score);
     EXPECT_EQ(-1, last_delete_version.first);
     EXPECT_EQ(-1, last_delete_version.second);
 }
@@ -736,6 +738,1801 @@ TEST_F(TestSizeBasedCumulativeCompactionPolicy, 
_level_size) {
     EXPECT_EQ(1 << 19, policy->_level_size((1 << 20) - 100));
 }
 
+// Test case: Large head rowsets with high score removed by level_size,
+// but now we collect all rowsets and trim from back, so small rowsets can 
pass min check
+TEST_F(TestSizeBasedCumulativeCompactionPolicy, 
pick_input_rowsets_large_head_with_high_score) {
+    // Scenario from the bug:
+    // - Head rowsets have high score but get removed by level_size
+    // - Previously, max_compaction_score check in collection phase limited 
collected rowsets
+    // - After fix, we collect all and trim from back
+    //
+    // Key: total_size must be < promotion_size (64MB) to enter level_size 
logic
+    std::vector<RowsetMetaSharedPtr> rs_metas;
+
+    // Base rowset: version 0-1, 1GB, NONOVERLAPPING
+    RowsetMetaSharedPtr ptr1(new RowsetMeta());
+    init_rs_meta(ptr1, 0, 1);
+    ptr1->set_total_disk_size(1024L * 1024 * 1024); // 1GB
+    ptr1->set_segments_overlap(NONOVERLAPPING);
+    rs_metas.push_back(ptr1);
+
+    // Large rowset with high score: version 2-2, 35MB, 80 segments (score=80)
+    // level(35MB) = 32MB, will be removed by level_size (35 > 22 remain)
+    RowsetMetaSharedPtr ptr2(new RowsetMeta());
+    init_rs_meta(ptr2, 2, 2);
+    ptr2->set_total_disk_size(35L * 1024 * 1024); // 35MB
+    ptr2->set_num_segments(80);
+    ptr2->set_segments_overlap(OVERLAPPING);
+    rs_metas.push_back(ptr2);
+
+    // Another rowset: version 3-3, 15MB, 1 segment (score=1)
+    // level(15MB) = 8MB, will also be removed by level_size (15 > 7 remain)
+    RowsetMetaSharedPtr ptr3(new RowsetMeta());
+    init_rs_meta(ptr3, 3, 3);
+    ptr3->set_total_disk_size(15L * 1024 * 1024); // 15MB
+    ptr3->set_num_segments(1);
+    ptr3->set_segments_overlap(OVERLAPPING);
+    rs_metas.push_back(ptr3);
+
+    // 70 small rowsets: version 4-73, each 100KB, score=1
+    // total small = 7MB, total_size = 35+15+7 = 57MB < promotion_size(64MB)
+    // After level_size removes head rowsets, these should pass min check 
(score=70 >= 50)
+    for (int i = 4; i <= 73; i++) {
+        RowsetMetaSharedPtr ptr(new RowsetMeta());
+        init_rs_meta(ptr, i, i);
+        ptr->set_total_disk_size(100 * 1024); // 100KB
+        ptr->set_num_segments(1);
+        ptr->set_segments_overlap(OVERLAPPING);
+        rs_metas.push_back(ptr);
+    }
+
+    for (auto& rowset : rs_metas) {
+        static_cast<void>(_tablet_meta->add_rs_meta(rowset));
+    }
+
+    TabletSharedPtr _tablet(
+            new Tablet(_engine, _tablet_meta, nullptr, 
CUMULATIVE_SIZE_BASED_POLICY));
+    static_cast<void>(_tablet->init());
+    _tablet->calculate_cumulative_point();
+
+    // cumulative_point should be 2 (after base rowset)
+    EXPECT_EQ(2, _tablet->cumulative_layer_point());
+
+    auto candidate_rowsets = 
_tablet->pick_candidate_rowsets_to_cumulative_compaction();
+    // Should have 72 candidate rowsets (version 2-73)
+    EXPECT_EQ(72, candidate_rowsets.size());
+
+    std::vector<RowsetSharedPtr> input_rowsets;
+    Version last_delete_version {-1, -1};
+    size_t compaction_score = 0;
+
+    // max=100, min=50
+    // total_size = 57MB < promotion_size(64MB), enters level_size logic
+    // level_size removes rowsets 2,3 (35MB, 15MB), remaining 70 small rowsets
+    // score after level_size = 70 >= min(50), trim to score <= 100
+    _tablet->_cumulative_compaction_policy->pick_input_rowsets(
+            _tablet.get(), candidate_rowsets, 100, 50, &input_rowsets, 
&last_delete_version,
+            &compaction_score, config::enable_delete_when_cumu_compaction);
+
+    // Should select rowsets and trim to max score
+    EXPECT_GT(input_rowsets.size(), 0);
+    EXPECT_LE(compaction_score, 100);
+    // The first rowset in result should not be the large head rowsets (they 
are removed by level_size)
+    if (!input_rowsets.empty()) {
+        EXPECT_GE(input_rowsets.front()->start_version(), 4);
+    }
+}
+
+// Test case: Verify trimming from back when score exceeds max
+TEST_F(TestSizeBasedCumulativeCompactionPolicy, 
pick_input_rowsets_trim_from_back) {
+    std::vector<RowsetMetaSharedPtr> rs_metas;
+
+    // Base rowset
+    RowsetMetaSharedPtr ptr1(new RowsetMeta());
+    init_rs_meta(ptr1, 0, 1);
+    ptr1->set_total_disk_size(1024L * 1024 * 1024);
+    ptr1->set_segments_overlap(NONOVERLAPPING);
+    rs_metas.push_back(ptr1);
+
+    // 150 small rowsets, each with score=1
+    for (int i = 2; i <= 151; i++) {
+        RowsetMetaSharedPtr ptr(new RowsetMeta());
+        init_rs_meta(ptr, i, i);
+        ptr->set_total_disk_size(1 * 1024 * 1024); // 1MB
+        ptr->set_num_segments(1);
+        ptr->set_segments_overlap(OVERLAPPING);
+        rs_metas.push_back(ptr);
+    }
+
+    for (auto& rowset : rs_metas) {
+        static_cast<void>(_tablet_meta->add_rs_meta(rowset));
+    }
+
+    TabletSharedPtr _tablet(
+            new Tablet(_engine, _tablet_meta, nullptr, 
CUMULATIVE_SIZE_BASED_POLICY));
+    static_cast<void>(_tablet->init());
+    _tablet->calculate_cumulative_point();
+
+    auto candidate_rowsets = 
_tablet->pick_candidate_rowsets_to_cumulative_compaction();
+    EXPECT_EQ(150, candidate_rowsets.size());
+
+    std::vector<RowsetSharedPtr> input_rowsets;
+    Version last_delete_version {-1, -1};
+    size_t compaction_score = 0;
+
+    // max=100, min=5
+    _tablet->_cumulative_compaction_policy->pick_input_rowsets(
+            _tablet.get(), candidate_rowsets, 100, 5, &input_rowsets, 
&last_delete_version,
+            &compaction_score, config::enable_delete_when_cumu_compaction);
+
+    // Should trim to max=100
+    EXPECT_EQ(100, input_rowsets.size());
+    EXPECT_EQ(100, compaction_score);
+    // Verify we kept the earlier versions (trimmed from back)
+    EXPECT_EQ(2, input_rowsets.front()->start_version());
+    EXPECT_EQ(101, input_rowsets.back()->end_version());
+}
+
+// Test case: Score exactly at max, should not trim
+TEST_F(TestSizeBasedCumulativeCompactionPolicy, 
pick_input_rowsets_score_exactly_max) {
+    std::vector<RowsetMetaSharedPtr> rs_metas;
+
+    // Base rowset
+    RowsetMetaSharedPtr ptr1(new RowsetMeta());
+    init_rs_meta(ptr1, 0, 1);
+    ptr1->set_total_disk_size(1024L * 1024 * 1024);
+    ptr1->set_segments_overlap(NONOVERLAPPING);
+    rs_metas.push_back(ptr1);
+
+    // Exactly 100 small rowsets, each with score=1
+    for (int i = 2; i <= 101; i++) {
+        RowsetMetaSharedPtr ptr(new RowsetMeta());
+        init_rs_meta(ptr, i, i);
+        ptr->set_total_disk_size(1 * 1024 * 1024); // 1MB
+        ptr->set_num_segments(1);
+        ptr->set_segments_overlap(OVERLAPPING);
+        rs_metas.push_back(ptr);
+    }
+
+    for (auto& rowset : rs_metas) {
+        static_cast<void>(_tablet_meta->add_rs_meta(rowset));
+    }
+
+    TabletSharedPtr _tablet(
+            new Tablet(_engine, _tablet_meta, nullptr, 
CUMULATIVE_SIZE_BASED_POLICY));
+    static_cast<void>(_tablet->init());
+    _tablet->calculate_cumulative_point();
+
+    auto candidate_rowsets = 
_tablet->pick_candidate_rowsets_to_cumulative_compaction();
+
+    std::vector<RowsetSharedPtr> input_rowsets;
+    Version last_delete_version {-1, -1};
+    size_t compaction_score = 0;
+
+    // max=100, collected score=100, should not trim
+    _tablet->_cumulative_compaction_policy->pick_input_rowsets(
+            _tablet.get(), candidate_rowsets, 100, 5, &input_rowsets, 
&last_delete_version,
+            &compaction_score, config::enable_delete_when_cumu_compaction);
+
+    EXPECT_EQ(100, input_rowsets.size());
+    EXPECT_EQ(100, compaction_score);
+}
+
+// Test case: Single rowset with score > max should be kept (not trimmed to 
empty)
+TEST_F(TestSizeBasedCumulativeCompactionPolicy, 
pick_input_rowsets_single_rowset_high_score) {
+    std::vector<RowsetMetaSharedPtr> rs_metas;
+
+    // Base rowset
+    RowsetMetaSharedPtr ptr1(new RowsetMeta());
+    init_rs_meta(ptr1, 0, 1);
+    ptr1->set_total_disk_size(1024L * 1024 * 1024);
+    ptr1->set_segments_overlap(NONOVERLAPPING);
+    rs_metas.push_back(ptr1);
+
+    // Single rowset with very high score (150 segments)
+    RowsetMetaSharedPtr ptr2(new RowsetMeta());
+    init_rs_meta(ptr2, 2, 2);
+    ptr2->set_total_disk_size(100L * 1024 * 1024); // 100MB, passes min_size 
check
+    ptr2->set_num_segments(150);
+    ptr2->set_segments_overlap(OVERLAPPING);
+    rs_metas.push_back(ptr2);
+
+    for (auto& rowset : rs_metas) {
+        static_cast<void>(_tablet_meta->add_rs_meta(rowset));
+    }
+
+    TabletSharedPtr _tablet(
+            new Tablet(_engine, _tablet_meta, nullptr, 
CUMULATIVE_SIZE_BASED_POLICY));
+    static_cast<void>(_tablet->init());
+    _tablet->calculate_cumulative_point();
+
+    auto candidate_rowsets = 
_tablet->pick_candidate_rowsets_to_cumulative_compaction();
+
+    std::vector<RowsetSharedPtr> input_rowsets;
+    Version last_delete_version {-1, -1};
+    size_t compaction_score = 0;
+
+    // max=100, but single rowset has score=150
+    // Should keep the single rowset (size > 1 protection)
+    _tablet->_cumulative_compaction_policy->pick_input_rowsets(
+            _tablet.get(), candidate_rowsets, 100, 5, &input_rowsets, 
&last_delete_version,
+            &compaction_score, config::enable_delete_when_cumu_compaction);
+
+    // Should keep the single rowset even though score > max
+    EXPECT_EQ(1, input_rowsets.size());
+    EXPECT_EQ(150, compaction_score);
+}
+
+// Test case: Fallback branch - all rowsets removed by level_size but score >= 
max
+TEST_F(TestSizeBasedCumulativeCompactionPolicy, 
pick_input_rowsets_fallback_all_removed) {
+    std::vector<RowsetMetaSharedPtr> rs_metas;
+
+    // Base rowset: 1GB (promotion_size = 64MB with this base)
+    RowsetMetaSharedPtr ptr1(new RowsetMeta());
+    init_rs_meta(ptr1, 0, 1);
+    ptr1->set_total_disk_size(1024L * 1024 * 1024); // 1GB
+    ptr1->set_segments_overlap(NONOVERLAPPING);
+    rs_metas.push_back(ptr1);
+
+    // Large rowset with high score: 40MB, 50 segments
+    // level(40MB) = 32MB, will be removed (32MB > remain_level after removal)
+    RowsetMetaSharedPtr ptr2(new RowsetMeta());
+    init_rs_meta(ptr2, 2, 2);
+    ptr2->set_total_disk_size(40L * 1024 * 1024);
+    ptr2->set_num_segments(50);
+    ptr2->set_segments_overlap(OVERLAPPING);
+    rs_metas.push_back(ptr2);
+
+    // Another rowset: 15MB, 30 segments
+    // level(15MB) = 8MB, will be removed
+    RowsetMetaSharedPtr ptr3(new RowsetMeta());
+    init_rs_meta(ptr3, 3, 3);
+    ptr3->set_total_disk_size(15L * 1024 * 1024);
+    ptr3->set_num_segments(30);
+    ptr3->set_segments_overlap(OVERLAPPING);
+    rs_metas.push_back(ptr3);
+
+    // Another rowset: 5MB, 25 segments
+    // level(5MB) = 4MB, will be removed
+    RowsetMetaSharedPtr ptr4(new RowsetMeta());
+    init_rs_meta(ptr4, 4, 4);
+    ptr4->set_total_disk_size(5L * 1024 * 1024);
+    ptr4->set_num_segments(25);
+    ptr4->set_segments_overlap(OVERLAPPING);
+    rs_metas.push_back(ptr4);
+
+    // total_size = 60MB < promotion_size(64MB), enters level_size logic
+    // level_size calculation:
+    // - total=60MB, rowset2: level(40)=32, remain=20, level(20)=16, 32>16, 
remove
+    // - total=20MB, rowset3: level(15)=8, remain=5, level(5)=4, 8>4, remove
+    // - total=5MB, rowset4: level(5)=4, remain=0, level(0)=0, 4>0, remove
+    // All removed! score=105 >= max(100), triggers fallback
+
+    for (auto& rowset : rs_metas) {
+        static_cast<void>(_tablet_meta->add_rs_meta(rowset));
+    }
+
+    TabletSharedPtr _tablet(
+            new Tablet(_engine, _tablet_meta, nullptr, 
CUMULATIVE_SIZE_BASED_POLICY));
+    static_cast<void>(_tablet->init());
+    _tablet->calculate_cumulative_point();
+
+    auto candidate_rowsets = 
_tablet->pick_candidate_rowsets_to_cumulative_compaction();
+
+    std::vector<RowsetSharedPtr> input_rowsets;
+    Version last_delete_version {-1, -1};
+    size_t compaction_score = 0;
+
+    // All rowsets removed by level_size, score (105) >= max (100)
+    // Fallback should select the rowset with max score (50 segments)
+    _tablet->_cumulative_compaction_policy->pick_input_rowsets(
+            _tablet.get(), candidate_rowsets, 100, 5, &input_rowsets, 
&last_delete_version,
+            &compaction_score, config::enable_delete_when_cumu_compaction);
+
+    // Fallback selects single rowset with max score
+    EXPECT_EQ(1, input_rowsets.size());
+    EXPECT_EQ(50, compaction_score);
+    EXPECT_EQ(2, input_rowsets.front()->start_version());
+}
+
+// Test case: Trim after promotion_size early return
+TEST_F(TestSizeBasedCumulativeCompactionPolicy, 
pick_input_rowsets_trim_after_promotion_size) {
+    std::vector<RowsetMetaSharedPtr> rs_metas;
+
+    // Base rowset: 1GB
+    RowsetMetaSharedPtr ptr1(new RowsetMeta());
+    init_rs_meta(ptr1, 0, 1);
+    ptr1->set_total_disk_size(1024L * 1024 * 1024);
+    ptr1->set_segments_overlap(NONOVERLAPPING);
+    rs_metas.push_back(ptr1);
+
+    // 200 rowsets, each 1MB with score=1
+    // Total size = 200MB, which is >= promotion_size (64MB default)
+    // Total score = 200, which is > max (100)
+    for (int i = 2; i <= 201; i++) {
+        RowsetMetaSharedPtr ptr(new RowsetMeta());
+        init_rs_meta(ptr, i, i);
+        ptr->set_total_disk_size(1 * 1024 * 1024); // 1MB
+        ptr->set_num_segments(1);
+        ptr->set_segments_overlap(OVERLAPPING);
+        rs_metas.push_back(ptr);
+    }
+
+    for (auto& rowset : rs_metas) {
+        static_cast<void>(_tablet_meta->add_rs_meta(rowset));
+    }
+
+    // Set promotion_min_size to 64MB so 200MB > promotion_size
+    config::compaction_promotion_min_size_mbytes = 64;
+
+    TabletSharedPtr _tablet(
+            new Tablet(_engine, _tablet_meta, nullptr, 
CUMULATIVE_SIZE_BASED_POLICY));
+    static_cast<void>(_tablet->init());
+    _tablet->calculate_cumulative_point();
+
+    auto candidate_rowsets = 
_tablet->pick_candidate_rowsets_to_cumulative_compaction();
+
+    std::vector<RowsetSharedPtr> input_rowsets;
+    Version last_delete_version {-1, -1};
+    size_t compaction_score = 0;
+
+    _tablet->_cumulative_compaction_policy->pick_input_rowsets(
+            _tablet.get(), candidate_rowsets, 100, 5, &input_rowsets, 
&last_delete_version,
+            &compaction_score, config::enable_delete_when_cumu_compaction);
+
+    // Should trigger promotion_size early return but still trim to max
+    EXPECT_LE(compaction_score, 100);
+    EXPECT_EQ(100, input_rowsets.size());
+}
+
+// Test case: Trim with varying scores (high score rowsets at tail)
+TEST_F(TestSizeBasedCumulativeCompactionPolicy, 
pick_input_rowsets_trim_high_score_tail) {
+    std::vector<RowsetMetaSharedPtr> rs_metas;
+
+    // Base rowset
+    RowsetMetaSharedPtr ptr1(new RowsetMeta());
+    init_rs_meta(ptr1, 0, 1);
+    ptr1->set_total_disk_size(1024L * 1024 * 1024);
+    ptr1->set_segments_overlap(NONOVERLAPPING);
+    rs_metas.push_back(ptr1);
+
+    // 95 small rowsets with score=1 each
+    for (int i = 2; i <= 96; i++) {
+        RowsetMetaSharedPtr ptr(new RowsetMeta());
+        init_rs_meta(ptr, i, i);
+        ptr->set_total_disk_size(1 * 1024 * 1024); // 1MB
+        ptr->set_num_segments(1);
+        ptr->set_segments_overlap(OVERLAPPING);
+        rs_metas.push_back(ptr);
+    }
+
+    // 1 rowset at tail with score=10
+    RowsetMetaSharedPtr ptr_high(new RowsetMeta());
+    init_rs_meta(ptr_high, 97, 97);
+    ptr_high->set_total_disk_size(10 * 1024 * 1024); // 10MB
+    ptr_high->set_num_segments(10);
+    ptr_high->set_segments_overlap(OVERLAPPING);
+    rs_metas.push_back(ptr_high);
+
+    for (auto& rowset : rs_metas) {
+        static_cast<void>(_tablet_meta->add_rs_meta(rowset));
+    }
+
+    TabletSharedPtr _tablet(
+            new Tablet(_engine, _tablet_meta, nullptr, 
CUMULATIVE_SIZE_BASED_POLICY));
+    static_cast<void>(_tablet->init());
+    _tablet->calculate_cumulative_point();
+
+    auto candidate_rowsets = 
_tablet->pick_candidate_rowsets_to_cumulative_compaction();
+
+    std::vector<RowsetSharedPtr> input_rowsets;
+    Version last_delete_version {-1, -1};
+    size_t compaction_score = 0;
+
+    // Total score = 95 + 10 = 105, max = 100
+    // Should trim the tail rowset (score=10), leaving 95 rowsets with score=95
+    _tablet->_cumulative_compaction_policy->pick_input_rowsets(
+            _tablet.get(), candidate_rowsets, 100, 5, &input_rowsets, 
&last_delete_version,
+            &compaction_score, config::enable_delete_when_cumu_compaction);
+
+    // After trimming high-score tail, we get score <= 100
+    EXPECT_LE(compaction_score, 100);
+    // The high score rowset at version 97 should be trimmed
+    if (!input_rowsets.empty()) {
+        EXPECT_LT(input_rowsets.back()->end_version(), 97);
+    }
+}
+
+// Test case: Trim after delete version early return
+TEST_F(TestSizeBasedCumulativeCompactionPolicy, 
pick_input_rowsets_trim_after_delete_version) {
+    config::enable_delete_when_cumu_compaction = false;
+    std::vector<RowsetMetaSharedPtr> rs_metas;
+
+    // Base rowset
+    RowsetMetaSharedPtr ptr1(new RowsetMeta());
+    init_rs_meta(ptr1, 0, 1);
+    ptr1->set_total_disk_size(1024L * 1024 * 1024);
+    ptr1->set_segments_overlap(NONOVERLAPPING);
+    rs_metas.push_back(ptr1);
+
+    // 150 small rowsets before delete version, each with score=1
+    for (int i = 2; i <= 151; i++) {
+        RowsetMetaSharedPtr ptr(new RowsetMeta());
+        init_rs_meta(ptr, i, i);
+        ptr->set_total_disk_size(1 * 1024 * 1024); // 1MB
+        ptr->set_num_segments(1);
+        ptr->set_segments_overlap(OVERLAPPING);
+        rs_metas.push_back(ptr);
+    }
+
+    // Delete version at 152
+    RowsetMetaSharedPtr ptr_del(new RowsetMeta());
+    init_rs_meta(ptr_del, 152, 152);
+    DeletePredicatePB del;
+    del.add_sub_predicates("a = 1");
+    del.set_version(152);
+    ptr_del->set_delete_predicate(del);
+    ptr_del->set_segments_overlap(OVERLAP_UNKNOWN);
+    rs_metas.push_back(ptr_del);
+
+    // More rowsets after delete version
+    for (int i = 153; i <= 160; i++) {
+        RowsetMetaSharedPtr ptr(new RowsetMeta());
+        init_rs_meta(ptr, i, i);
+        ptr->set_total_disk_size(1 * 1024 * 1024);
+        ptr->set_num_segments(1);
+        ptr->set_segments_overlap(OVERLAPPING);
+        rs_metas.push_back(ptr);
+    }
+
+    for (auto& rowset : rs_metas) {
+        static_cast<void>(_tablet_meta->add_rs_meta(rowset));
+    }
+
+    TabletSharedPtr _tablet(
+            new Tablet(_engine, _tablet_meta, nullptr, 
CUMULATIVE_SIZE_BASED_POLICY));
+    static_cast<void>(_tablet->init());
+    _tablet->calculate_cumulative_point();
+
+    auto candidate_rowsets = 
_tablet->pick_candidate_rowsets_to_cumulative_compaction();
+
+    std::vector<RowsetSharedPtr> input_rowsets;
+    Version last_delete_version {-1, -1};
+    size_t compaction_score = 0;
+
+    // max=100, rowsets before delete have score=150
+    // Should stop at delete version and trim to max
+    _tablet->_cumulative_compaction_policy->pick_input_rowsets(
+            _tablet.get(), candidate_rowsets, 100, 5, &input_rowsets, 
&last_delete_version,
+            &compaction_score, config::enable_delete_when_cumu_compaction);
+
+    // Should trim to max=100
+    EXPECT_LE(compaction_score, 100);
+    EXPECT_EQ(152, last_delete_version.first);
+    // All selected rowsets should be before delete version
+    for (auto& rs : input_rowsets) {
+        EXPECT_LT(rs->end_version(), 152);
+    }
+}
+
+// Test case: Empty rowsets mixed with normal rowsets
+TEST_F(TestSizeBasedCumulativeCompactionPolicy, 
pick_input_rowsets_with_empty_rowsets) {
+    std::vector<RowsetMetaSharedPtr> rs_metas;
+
+    // Base rowset: 1GB
+    RowsetMetaSharedPtr ptr1(new RowsetMeta());
+    init_rs_meta(ptr1, 0, 1);
+    ptr1->set_total_disk_size(1024L * 1024 * 1024);
+    ptr1->set_segments_overlap(NONOVERLAPPING);
+    rs_metas.push_back(ptr1);
+
+    // Large rowset with high score: 35MB, 80 segments (score=80)
+    // Will be removed by level_size
+    RowsetMetaSharedPtr ptr2(new RowsetMeta());
+    init_rs_meta(ptr2, 2, 2);
+    ptr2->set_total_disk_size(35L * 1024 * 1024); // 35MB
+    ptr2->set_num_segments(80);
+    ptr2->set_segments_overlap(OVERLAPPING);
+    rs_metas.push_back(ptr2);
+
+    // 50 empty rowsets (score=1 each because NONOVERLAPPING)
+    for (int i = 3; i <= 52; i++) {
+        RowsetMetaSharedPtr ptr(new RowsetMeta());
+        init_rs_meta(ptr, i, i);
+        ptr->set_total_disk_size(0);
+        ptr->set_num_segments(0);
+        ptr->set_segments_overlap(NONOVERLAPPING); // score=1 for 
NONOVERLAPPING
+        rs_metas.push_back(ptr);
+    }
+
+    // 100 small rowsets (score=1 each)
+    for (int i = 53; i <= 152; i++) {
+        RowsetMetaSharedPtr ptr(new RowsetMeta());
+        init_rs_meta(ptr, i, i);
+        ptr->set_total_disk_size(200 * 1024); // 200KB
+        ptr->set_num_segments(1);
+        ptr->set_segments_overlap(OVERLAPPING);
+        rs_metas.push_back(ptr);
+    }
+
+    // total_size = 30MB + 0 + 20MB = 50MB < promotion_size(64MB)
+
+    for (auto& rowset : rs_metas) {
+        static_cast<void>(_tablet_meta->add_rs_meta(rowset));
+    }
+
+    TabletSharedPtr _tablet(
+            new Tablet(_engine, _tablet_meta, nullptr, 
CUMULATIVE_SIZE_BASED_POLICY));
+    static_cast<void>(_tablet->init());
+    _tablet->calculate_cumulative_point();
+
+    auto candidate_rowsets = 
_tablet->pick_candidate_rowsets_to_cumulative_compaction();
+    // Total: 1 large + 50 empty + 100 small = 151 rowsets
+
+    std::vector<RowsetSharedPtr> input_rowsets;
+    Version last_delete_version {-1, -1};
+    size_t compaction_score = 0;
+
+    // max=100, min=50
+    // Total score = 80 + 50 + 100 = 230 (empty rowsets have score=1 because 
NONOVERLAPPING)
+    // After level_size removes large rowset, remaining = 50 empty + 100 
small, score = 150
+    // Trim to score <= 100
+    _tablet->_cumulative_compaction_policy->pick_input_rowsets(
+            _tablet.get(), candidate_rowsets, 100, 50, &input_rowsets, 
&last_delete_version,
+            &compaction_score, config::enable_delete_when_cumu_compaction);
+
+    // Should select rowsets after level_size removal and trim
+    EXPECT_GT(input_rowsets.size(), 0);
+    EXPECT_LE(compaction_score, 100);
+    // First rowset should not be the large one (removed by level_size)
+    if (!input_rowsets.empty()) {
+        EXPECT_GE(input_rowsets.front()->start_version(), 3);
+    }
+}
+
+// Test case: Score below min after level_size removal, should return empty
+TEST_F(TestSizeBasedCumulativeCompactionPolicy, 
pick_input_rowsets_below_min_after_level_size) {
+    std::vector<RowsetMetaSharedPtr> rs_metas;
+
+    // Base rowset: 20GB
+    RowsetMetaSharedPtr ptr1(new RowsetMeta());
+    init_rs_meta(ptr1, 0, 1);
+    ptr1->set_total_disk_size(20L * 1024 * 1024 * 1024);
+    ptr1->set_segments_overlap(NONOVERLAPPING);
+    rs_metas.push_back(ptr1);
+
+    // Large rowset: 200MB, 80 segments (score=80)
+    // Will be removed by level_size
+    RowsetMetaSharedPtr ptr2(new RowsetMeta());
+    init_rs_meta(ptr2, 2, 2);
+    ptr2->set_total_disk_size(200L * 1024 * 1024);
+    ptr2->set_num_segments(80);
+    ptr2->set_segments_overlap(OVERLAPPING);
+    rs_metas.push_back(ptr2);
+
+    // 10 small rowsets (score=1 each, total=10)
+    // After level_size removes large, remaining score=10 < min=50
+    for (int i = 3; i <= 12; i++) {
+        RowsetMetaSharedPtr ptr(new RowsetMeta());
+        init_rs_meta(ptr, i, i);
+        ptr->set_total_disk_size(100 * 1024); // 100KB
+        ptr->set_num_segments(1);
+        ptr->set_segments_overlap(OVERLAPPING);
+        rs_metas.push_back(ptr);
+    }
+
+    for (auto& rowset : rs_metas) {
+        static_cast<void>(_tablet_meta->add_rs_meta(rowset));
+    }
+
+    TabletSharedPtr _tablet(
+            new Tablet(_engine, _tablet_meta, nullptr, 
CUMULATIVE_SIZE_BASED_POLICY));
+    static_cast<void>(_tablet->init());
+    _tablet->calculate_cumulative_point();
+
+    auto candidate_rowsets = 
_tablet->pick_candidate_rowsets_to_cumulative_compaction();
+
+    std::vector<RowsetSharedPtr> input_rowsets;
+    Version last_delete_version {-1, -1};
+    size_t compaction_score = 0;
+
+    // max=100, min=50
+    // Total score = 90, but after level_size removes large rowset, remaining 
= 10 < 50
+    // Also size < min_size (64MB), so should return empty
+    _tablet->_cumulative_compaction_policy->pick_input_rowsets(
+            _tablet.get(), candidate_rowsets, 100, 50, &input_rowsets, 
&last_delete_version,
+            &compaction_score, config::enable_delete_when_cumu_compaction);
+
+    // Should return empty since score < min and size < min_size
+    EXPECT_EQ(0, input_rowsets.size());
+    EXPECT_EQ(0, compaction_score);
+}
+
+// Test case: Min equals max configuration
+TEST_F(TestSizeBasedCumulativeCompactionPolicy, 
pick_input_rowsets_min_equals_max) {
+    std::vector<RowsetMetaSharedPtr> rs_metas;
+
+    // Base rowset
+    RowsetMetaSharedPtr ptr1(new RowsetMeta());
+    init_rs_meta(ptr1, 0, 1);
+    ptr1->set_total_disk_size(1024L * 1024 * 1024);
+    ptr1->set_segments_overlap(NONOVERLAPPING);
+    rs_metas.push_back(ptr1);
+
+    // 150 small rowsets, each with score=1
+    for (int i = 2; i <= 151; i++) {
+        RowsetMetaSharedPtr ptr(new RowsetMeta());
+        init_rs_meta(ptr, i, i);
+        ptr->set_total_disk_size(1 * 1024 * 1024); // 1MB
+        ptr->set_num_segments(1);
+        ptr->set_segments_overlap(OVERLAPPING);
+        rs_metas.push_back(ptr);
+    }
+
+    for (auto& rowset : rs_metas) {
+        static_cast<void>(_tablet_meta->add_rs_meta(rowset));
+    }
+
+    TabletSharedPtr _tablet(
+            new Tablet(_engine, _tablet_meta, nullptr, 
CUMULATIVE_SIZE_BASED_POLICY));
+    static_cast<void>(_tablet->init());
+    _tablet->calculate_cumulative_point();
+
+    auto candidate_rowsets = 
_tablet->pick_candidate_rowsets_to_cumulative_compaction();
+
+    std::vector<RowsetSharedPtr> input_rowsets;
+    Version last_delete_version {-1, -1};
+    size_t compaction_score = 0;
+
+    // min=max=100, should still work correctly
+    _tablet->_cumulative_compaction_policy->pick_input_rowsets(
+            _tablet.get(), candidate_rowsets, 100, 100, &input_rowsets, 
&last_delete_version,
+            &compaction_score, config::enable_delete_when_cumu_compaction);
+
+    // Should select exactly 100 rowsets (trim to max)
+    EXPECT_EQ(100, input_rowsets.size());
+    EXPECT_EQ(100, compaction_score);
+}
+
+// Test case: Very small gap between min and max
+TEST_F(TestSizeBasedCumulativeCompactionPolicy, 
pick_input_rowsets_small_gap_min_max) {
+    std::vector<RowsetMetaSharedPtr> rs_metas;
+
+    // Base rowset
+    RowsetMetaSharedPtr ptr1(new RowsetMeta());
+    init_rs_meta(ptr1, 0, 1);
+    ptr1->set_total_disk_size(1024L * 1024 * 1024);
+    ptr1->set_segments_overlap(NONOVERLAPPING);
+    rs_metas.push_back(ptr1);
+
+    // 150 small rowsets, each with score=1
+    for (int i = 2; i <= 151; i++) {
+        RowsetMetaSharedPtr ptr(new RowsetMeta());
+        init_rs_meta(ptr, i, i);
+        ptr->set_total_disk_size(1 * 1024 * 1024); // 1MB
+        ptr->set_num_segments(1);
+        ptr->set_segments_overlap(OVERLAPPING);
+        rs_metas.push_back(ptr);
+    }
+
+    for (auto& rowset : rs_metas) {
+        static_cast<void>(_tablet_meta->add_rs_meta(rowset));
+    }
+
+    TabletSharedPtr _tablet(
+            new Tablet(_engine, _tablet_meta, nullptr, 
CUMULATIVE_SIZE_BASED_POLICY));
+    static_cast<void>(_tablet->init());
+    _tablet->calculate_cumulative_point();
+
+    auto candidate_rowsets = 
_tablet->pick_candidate_rowsets_to_cumulative_compaction();
+
+    std::vector<RowsetSharedPtr> input_rowsets;
+    Version last_delete_version {-1, -1};
+    size_t compaction_score = 0;
+
+    // min=95, max=100, very small gap
+    _tablet->_cumulative_compaction_policy->pick_input_rowsets(
+            _tablet.get(), candidate_rowsets, 100, 95, &input_rowsets, 
&last_delete_version,
+            &compaction_score, config::enable_delete_when_cumu_compaction);
+
+    // Should select exactly 100 rowsets (trim to max)
+    EXPECT_EQ(100, input_rowsets.size());
+    EXPECT_EQ(100, compaction_score);
+}
+
+// Test case: Bug scenario with empty rowsets (full version)
+TEST_F(TestSizeBasedCumulativeCompactionPolicy, 
pick_input_rowsets_large_head_with_empty_rowsets) {
+    std::vector<RowsetMetaSharedPtr> rs_metas;
+
+    // Base rowset: 1GB
+    RowsetMetaSharedPtr ptr1(new RowsetMeta());
+    init_rs_meta(ptr1, 0, 1);
+    ptr1->set_total_disk_size(1024L * 1024 * 1024);
+    ptr1->set_segments_overlap(NONOVERLAPPING);
+    rs_metas.push_back(ptr1);
+
+    // Large rowset with high score: 35MB, 79 segments (score=79)
+    // Will be removed by level_size
+    RowsetMetaSharedPtr ptr2(new RowsetMeta());
+    init_rs_meta(ptr2, 2, 2);
+    ptr2->set_total_disk_size(35L * 1024 * 1024); // 35MB
+    ptr2->set_num_segments(79);
+    ptr2->set_segments_overlap(OVERLAPPING);
+    rs_metas.push_back(ptr2);
+
+    // Another large rowset: 15MB, 1 segment (score=1)
+    // Will also be removed by level_size
+    RowsetMetaSharedPtr ptr3(new RowsetMeta());
+    init_rs_meta(ptr3, 3, 3);
+    ptr3->set_total_disk_size(15L * 1024 * 1024); // 15MB
+    ptr3->set_num_segments(1);
+    ptr3->set_segments_overlap(OVERLAPPING);
+    rs_metas.push_back(ptr3);
+
+    // 50 empty rowsets (score=1 each because NONOVERLAPPING)
+    for (int i = 4; i <= 53; i++) {
+        RowsetMetaSharedPtr ptr(new RowsetMeta());
+        init_rs_meta(ptr, i, i);
+        ptr->set_total_disk_size(0);
+        ptr->set_num_segments(0);
+        ptr->set_segments_overlap(NONOVERLAPPING); // score=1
+        rs_metas.push_back(ptr);
+    }
+
+    // 70 small rowsets (score=1 each)
+    for (int i = 54; i <= 123; i++) {
+        RowsetMetaSharedPtr ptr(new RowsetMeta());
+        init_rs_meta(ptr, i, i);
+        ptr->set_total_disk_size(100 * 1024); // 100KB
+        ptr->set_num_segments(1);
+        ptr->set_segments_overlap(OVERLAPPING);
+        rs_metas.push_back(ptr);
+    }
+
+    // total_size = 30MB + 15MB + 0 + 7MB = 52MB < promotion_size(64MB)
+
+    for (auto& rowset : rs_metas) {
+        static_cast<void>(_tablet_meta->add_rs_meta(rowset));
+    }
+
+    TabletSharedPtr _tablet(
+            new Tablet(_engine, _tablet_meta, nullptr, 
CUMULATIVE_SIZE_BASED_POLICY));
+    static_cast<void>(_tablet->init());
+    _tablet->calculate_cumulative_point();
+
+    auto candidate_rowsets = 
_tablet->pick_candidate_rowsets_to_cumulative_compaction();
+
+    std::vector<RowsetSharedPtr> input_rowsets;
+    Version last_delete_version {-1, -1};
+    size_t compaction_score = 0;
+
+    // max=100, min=50
+    // Total: 2 large (score=80) + 50 empty (score=50) + 70 small (score=70) = 
200
+    // After level_size removes 2 large: 50 empty + 70 small, score=120 >= 50
+    // Trim to score <= 100
+    _tablet->_cumulative_compaction_policy->pick_input_rowsets(
+            _tablet.get(), candidate_rowsets, 100, 50, &input_rowsets, 
&last_delete_version,
+            &compaction_score, config::enable_delete_when_cumu_compaction);
+
+    // Should select rowsets and trim to max
+    EXPECT_GT(input_rowsets.size(), 0);
+    EXPECT_LE(compaction_score, 100);
+    // First rowset should be empty rowset (large ones removed by level_size)
+    if (!input_rowsets.empty()) {
+        EXPECT_GE(input_rowsets.front()->start_version(), 4);
+    }
+}
+
+// Test case: Score exactly at min
+TEST_F(TestSizeBasedCumulativeCompactionPolicy, 
pick_input_rowsets_score_exactly_min) {
+    std::vector<RowsetMetaSharedPtr> rs_metas;
+
+    // Base rowset
+    RowsetMetaSharedPtr ptr1(new RowsetMeta());
+    init_rs_meta(ptr1, 0, 1);
+    ptr1->set_total_disk_size(1024L * 1024 * 1024);
+    ptr1->set_segments_overlap(NONOVERLAPPING);
+    rs_metas.push_back(ptr1);
+
+    // 50 small rowsets, each with score=1, total score=50=min
+    for (int i = 2; i <= 51; i++) {
+        RowsetMetaSharedPtr ptr(new RowsetMeta());
+        init_rs_meta(ptr, i, i);
+        ptr->set_total_disk_size(1 * 1024 * 1024); // 1MB
+        ptr->set_num_segments(1);
+        ptr->set_segments_overlap(OVERLAPPING);
+        rs_metas.push_back(ptr);
+    }
+
+    for (auto& rowset : rs_metas) {
+        static_cast<void>(_tablet_meta->add_rs_meta(rowset));
+    }
+
+    TabletSharedPtr _tablet(
+            new Tablet(_engine, _tablet_meta, nullptr, 
CUMULATIVE_SIZE_BASED_POLICY));
+    static_cast<void>(_tablet->init());
+    _tablet->calculate_cumulative_point();
+
+    auto candidate_rowsets = 
_tablet->pick_candidate_rowsets_to_cumulative_compaction();
+
+    std::vector<RowsetSharedPtr> input_rowsets;
+    Version last_delete_version {-1, -1};
+    size_t compaction_score = 0;
+
+    // min=50, score=50, should pass min check
+    _tablet->_cumulative_compaction_policy->pick_input_rowsets(
+            _tablet.get(), candidate_rowsets, 100, 50, &input_rowsets, 
&last_delete_version,
+            &compaction_score, config::enable_delete_when_cumu_compaction);
+
+    EXPECT_EQ(50, input_rowsets.size());
+    EXPECT_EQ(50, compaction_score);
+}
+
+// Test case: Score one above max
+TEST_F(TestSizeBasedCumulativeCompactionPolicy, 
pick_input_rowsets_score_one_above_max) {
+    std::vector<RowsetMetaSharedPtr> rs_metas;
+
+    // Base rowset
+    RowsetMetaSharedPtr ptr1(new RowsetMeta());
+    init_rs_meta(ptr1, 0, 1);
+    ptr1->set_total_disk_size(1024L * 1024 * 1024);
+    ptr1->set_segments_overlap(NONOVERLAPPING);
+    rs_metas.push_back(ptr1);
+
+    // 101 small rowsets, each with score=1, total score=101
+    for (int i = 2; i <= 102; i++) {
+        RowsetMetaSharedPtr ptr(new RowsetMeta());
+        init_rs_meta(ptr, i, i);
+        ptr->set_total_disk_size(1 * 1024 * 1024); // 1MB
+        ptr->set_num_segments(1);
+        ptr->set_segments_overlap(OVERLAPPING);
+        rs_metas.push_back(ptr);
+    }
+
+    for (auto& rowset : rs_metas) {
+        static_cast<void>(_tablet_meta->add_rs_meta(rowset));
+    }
+
+    TabletSharedPtr _tablet(
+            new Tablet(_engine, _tablet_meta, nullptr, 
CUMULATIVE_SIZE_BASED_POLICY));
+    static_cast<void>(_tablet->init());
+    _tablet->calculate_cumulative_point();
+
+    auto candidate_rowsets = 
_tablet->pick_candidate_rowsets_to_cumulative_compaction();
+
+    std::vector<RowsetSharedPtr> input_rowsets;
+    Version last_delete_version {-1, -1};
+    size_t compaction_score = 0;
+
+    // max=100, score=101, should trim 1 rowset from back
+    _tablet->_cumulative_compaction_policy->pick_input_rowsets(
+            _tablet.get(), candidate_rowsets, 100, 5, &input_rowsets, 
&last_delete_version,
+            &compaction_score, config::enable_delete_when_cumu_compaction);
+
+    EXPECT_EQ(100, input_rowsets.size());
+    EXPECT_EQ(100, compaction_score);
+    EXPECT_EQ(101, input_rowsets.back()->end_version());
+}
+
+// Test case: Only empty rowsets
+// Note: Empty rowsets with NONOVERLAPPING have score=1 (not 0!)
+TEST_F(TestSizeBasedCumulativeCompactionPolicy, 
pick_input_rowsets_only_empty_rowsets) {
+    std::vector<RowsetMetaSharedPtr> rs_metas;
+
+    // Base rowset
+    RowsetMetaSharedPtr ptr1(new RowsetMeta());
+    init_rs_meta(ptr1, 0, 1);
+    ptr1->set_total_disk_size(1024L * 1024 * 1024);
+    ptr1->set_segments_overlap(NONOVERLAPPING);
+    rs_metas.push_back(ptr1);
+
+    // 100 empty rowsets (score=1 each because NONOVERLAPPING)
+    for (int i = 2; i <= 101; i++) {
+        RowsetMetaSharedPtr ptr(new RowsetMeta());
+        init_rs_meta(ptr, i, i);
+        ptr->set_total_disk_size(0);
+        ptr->set_num_segments(0);
+        ptr->set_segments_overlap(NONOVERLAPPING); // score=1
+        rs_metas.push_back(ptr);
+    }
+
+    for (auto& rowset : rs_metas) {
+        static_cast<void>(_tablet_meta->add_rs_meta(rowset));
+    }
+
+    TabletSharedPtr _tablet(
+            new Tablet(_engine, _tablet_meta, nullptr, 
CUMULATIVE_SIZE_BASED_POLICY));
+    static_cast<void>(_tablet->init());
+    _tablet->calculate_cumulative_point();
+
+    auto candidate_rowsets = 
_tablet->pick_candidate_rowsets_to_cumulative_compaction();
+
+    std::vector<RowsetSharedPtr> input_rowsets;
+    Version last_delete_version {-1, -1};
+    size_t compaction_score = 0;
+
+    // 100 empty rowsets, score=100 >= min=5
+    // size=0 < min_size, but score >= min so pass min check
+    // Trim to max=100
+    _tablet->_cumulative_compaction_policy->pick_input_rowsets(
+            _tablet.get(), candidate_rowsets, 100, 5, &input_rowsets, 
&last_delete_version,
+            &compaction_score, config::enable_delete_when_cumu_compaction);
+
+    // All 100 empty rowsets selected, score=100
+    EXPECT_EQ(100, input_rowsets.size());
+    EXPECT_EQ(100, compaction_score);
+}
+
+// Test case: Large count of empty rowsets mixed with normal
+TEST_F(TestSizeBasedCumulativeCompactionPolicy, 
pick_input_rowsets_large_count_empty_rowsets) {
+    std::vector<RowsetMetaSharedPtr> rs_metas;
+
+    // Base rowset
+    RowsetMetaSharedPtr ptr1(new RowsetMeta());
+    init_rs_meta(ptr1, 0, 1);
+    ptr1->set_total_disk_size(1024L * 1024 * 1024);
+    ptr1->set_segments_overlap(NONOVERLAPPING);
+    rs_metas.push_back(ptr1);
+
+    // 500 empty rowsets (score=0)
+    for (int i = 2; i <= 501; i++) {
+        RowsetMetaSharedPtr ptr(new RowsetMeta());
+        init_rs_meta(ptr, i, i);
+        ptr->set_total_disk_size(0);
+        ptr->set_num_segments(0);
+        ptr->set_segments_overlap(NONOVERLAPPING);
+        rs_metas.push_back(ptr);
+    }
+
+    // 100 normal rowsets (score=1 each)
+    for (int i = 502; i <= 601; i++) {
+        RowsetMetaSharedPtr ptr(new RowsetMeta());
+        init_rs_meta(ptr, i, i);
+        ptr->set_total_disk_size(1 * 1024 * 1024); // 1MB
+        ptr->set_num_segments(1);
+        ptr->set_segments_overlap(OVERLAPPING);
+        rs_metas.push_back(ptr);
+    }
+
+    for (auto& rowset : rs_metas) {
+        static_cast<void>(_tablet_meta->add_rs_meta(rowset));
+    }
+
+    TabletSharedPtr _tablet(
+            new Tablet(_engine, _tablet_meta, nullptr, 
CUMULATIVE_SIZE_BASED_POLICY));
+    static_cast<void>(_tablet->init());
+    _tablet->calculate_cumulative_point();
+
+    auto candidate_rowsets = 
_tablet->pick_candidate_rowsets_to_cumulative_compaction();
+    // Should have 600 candidate rowsets
+    EXPECT_EQ(600, candidate_rowsets.size());
+
+    std::vector<RowsetSharedPtr> input_rowsets;
+    Version last_delete_version {-1, -1};
+    size_t compaction_score = 0;
+
+    // Total score = 500 (empty rowsets score=1) + 100 = 600
+    // Increase max to 600 to allow collecting all
+    _tablet->_cumulative_compaction_policy->pick_input_rowsets(
+            _tablet.get(), candidate_rowsets, 600, 5, &input_rowsets, 
&last_delete_version,
+            &compaction_score, config::enable_delete_when_cumu_compaction);
+
+    // Should select all 600 rowsets
+    EXPECT_EQ(600, input_rowsets.size());
+    EXPECT_EQ(600, compaction_score);
+}
+
+// Test case: Single non-overlapping rowset
+TEST_F(TestSizeBasedCumulativeCompactionPolicy, 
pick_input_rowsets_single_non_overlapping) {
+    std::vector<RowsetMetaSharedPtr> rs_metas;
+
+    // Base rowset
+    RowsetMetaSharedPtr ptr1(new RowsetMeta());
+    init_rs_meta(ptr1, 0, 1);
+    ptr1->set_total_disk_size(1024L * 1024 * 1024);
+    ptr1->set_segments_overlap(NONOVERLAPPING);
+    rs_metas.push_back(ptr1);
+
+    // Single non-overlapping rowset with size >= min_size
+    RowsetMetaSharedPtr ptr2(new RowsetMeta());
+    init_rs_meta(ptr2, 2, 2);
+    ptr2->set_total_disk_size(100L * 1024 * 1024); // 100MB >= 64MB min_size
+    ptr2->set_num_segments(1);
+    ptr2->set_segments_overlap(NONOVERLAPPING);
+    rs_metas.push_back(ptr2);
+
+    for (auto& rowset : rs_metas) {
+        static_cast<void>(_tablet_meta->add_rs_meta(rowset));
+    }
+
+    TabletSharedPtr _tablet(
+            new Tablet(_engine, _tablet_meta, nullptr, 
CUMULATIVE_SIZE_BASED_POLICY));
+    static_cast<void>(_tablet->init());
+    _tablet->calculate_cumulative_point();
+
+    auto candidate_rowsets = 
_tablet->pick_candidate_rowsets_to_cumulative_compaction();
+
+    std::vector<RowsetSharedPtr> input_rowsets;
+    Version last_delete_version {-1, -1};
+    size_t compaction_score = 0;
+
+    _tablet->_cumulative_compaction_policy->pick_input_rowsets(
+            _tablet.get(), candidate_rowsets, 100, 5, &input_rowsets, 
&last_delete_version,
+            &compaction_score, config::enable_delete_when_cumu_compaction);
+
+    // Single non-overlapping rowset doesn't need compaction
+    EXPECT_EQ(0, input_rowsets.size());
+    EXPECT_EQ(0, compaction_score);
+}
+
+// Test case: Fallback when all removed by level_size but score < max
+TEST_F(TestSizeBasedCumulativeCompactionPolicy, 
pick_input_rowsets_fallback_score_below_max) {
+    std::vector<RowsetMetaSharedPtr> rs_metas;
+
+    // Base rowset: 20GB
+    RowsetMetaSharedPtr ptr1(new RowsetMeta());
+    init_rs_meta(ptr1, 0, 1);
+    ptr1->set_total_disk_size(20L * 1024 * 1024 * 1024);
+    ptr1->set_segments_overlap(NONOVERLAPPING);
+    rs_metas.push_back(ptr1);
+
+    // Large rowset: 200MB, 30 segments (score=30)
+    RowsetMetaSharedPtr ptr2(new RowsetMeta());
+    init_rs_meta(ptr2, 2, 2);
+    ptr2->set_total_disk_size(200L * 1024 * 1024);
+    ptr2->set_num_segments(30);
+    ptr2->set_segments_overlap(OVERLAPPING);
+    rs_metas.push_back(ptr2);
+
+    // Another large rowset: 100MB, 20 segments (score=20)
+    RowsetMetaSharedPtr ptr3(new RowsetMeta());
+    init_rs_meta(ptr3, 3, 3);
+    ptr3->set_total_disk_size(100L * 1024 * 1024);
+    ptr3->set_num_segments(20);
+    ptr3->set_segments_overlap(OVERLAPPING);
+    rs_metas.push_back(ptr3);
+
+    for (auto& rowset : rs_metas) {
+        static_cast<void>(_tablet_meta->add_rs_meta(rowset));
+    }
+
+    TabletSharedPtr _tablet(
+            new Tablet(_engine, _tablet_meta, nullptr, 
CUMULATIVE_SIZE_BASED_POLICY));
+    static_cast<void>(_tablet->init());
+    _tablet->calculate_cumulative_point();
+
+    auto candidate_rowsets = 
_tablet->pick_candidate_rowsets_to_cumulative_compaction();
+
+    std::vector<RowsetSharedPtr> input_rowsets;
+    Version last_delete_version {-1, -1};
+    size_t compaction_score = 0;
+
+    // All rowsets removed by level_size, score=50 < max=100
+    // Does not trigger fallback, goes to normal flow with empty result
+    _tablet->_cumulative_compaction_policy->pick_input_rowsets(
+            _tablet.get(), candidate_rowsets, 100, 5, &input_rowsets, 
&last_delete_version,
+            &compaction_score, config::enable_delete_when_cumu_compaction);
+
+    // After level_size removes all, result is empty (no fallback since score 
< max)
+    EXPECT_EQ(0, input_rowsets.size());
+    EXPECT_EQ(0, compaction_score);
+}
+
+// Test case: level_size removes large head, then trim after
+TEST_F(TestSizeBasedCumulativeCompactionPolicy, 
pick_input_rowsets_trim_after_level_size) {
+    std::vector<RowsetMetaSharedPtr> rs_metas;
+
+    // Base rowset
+    RowsetMetaSharedPtr ptr1(new RowsetMeta());
+    init_rs_meta(ptr1, 0, 1);
+    ptr1->set_total_disk_size(1024L * 1024 * 1024);
+    ptr1->set_segments_overlap(NONOVERLAPPING);
+    rs_metas.push_back(ptr1);
+
+    // Large rowset: 35MB, 50 segments (will be removed by level_size)
+    RowsetMetaSharedPtr ptr2(new RowsetMeta());
+    init_rs_meta(ptr2, 2, 2);
+    ptr2->set_total_disk_size(35L * 1024 * 1024);
+    ptr2->set_num_segments(50);
+    ptr2->set_segments_overlap(OVERLAPPING);
+    rs_metas.push_back(ptr2);
+
+    // 150 small rowsets (score=1 each)
+    for (int i = 3; i <= 152; i++) {
+        RowsetMetaSharedPtr ptr(new RowsetMeta());
+        init_rs_meta(ptr, i, i);
+        ptr->set_total_disk_size(100 * 1024); // 100KB
+        ptr->set_num_segments(1);
+        ptr->set_segments_overlap(OVERLAPPING);
+        rs_metas.push_back(ptr);
+    }
+
+    for (auto& rowset : rs_metas) {
+        static_cast<void>(_tablet_meta->add_rs_meta(rowset));
+    }
+
+    TabletSharedPtr _tablet(
+            new Tablet(_engine, _tablet_meta, nullptr, 
CUMULATIVE_SIZE_BASED_POLICY));
+    static_cast<void>(_tablet->init());
+    _tablet->calculate_cumulative_point();
+
+    auto candidate_rowsets = 
_tablet->pick_candidate_rowsets_to_cumulative_compaction();
+
+    std::vector<RowsetSharedPtr> input_rowsets;
+    Version last_delete_version {-1, -1};
+    size_t compaction_score = 0;
+
+    // Total score = 50 + 150 = 200
+    // After level_size removes large: score = 150 > max = 100
+    // Should trim to 100
+    _tablet->_cumulative_compaction_policy->pick_input_rowsets(
+            _tablet.get(), candidate_rowsets, 100, 5, &input_rowsets, 
&last_delete_version,
+            &compaction_score, config::enable_delete_when_cumu_compaction);
+
+    EXPECT_EQ(100, input_rowsets.size());
+    EXPECT_EQ(100, compaction_score);
+    // First rowset should be version 3 (large one removed)
+    EXPECT_EQ(3, input_rowsets.front()->start_version());
+}
+
+// Test case: Size exactly at min_size
+TEST_F(TestSizeBasedCumulativeCompactionPolicy, 
pick_input_rowsets_size_exactly_min_size) {
+    std::vector<RowsetMetaSharedPtr> rs_metas;
+
+    // Base rowset
+    RowsetMetaSharedPtr ptr1(new RowsetMeta());
+    init_rs_meta(ptr1, 0, 1);
+    ptr1->set_total_disk_size(1024L * 1024 * 1024);
+    ptr1->set_segments_overlap(NONOVERLAPPING);
+    rs_metas.push_back(ptr1);
+
+    // Rowsets with total size = 64MB (exactly min_size)
+    // 64 rowsets, each 1MB
+    for (int i = 2; i <= 65; i++) {
+        RowsetMetaSharedPtr ptr(new RowsetMeta());
+        init_rs_meta(ptr, i, i);
+        ptr->set_total_disk_size(1 * 1024 * 1024); // 1MB
+        ptr->set_num_segments(1);
+        ptr->set_segments_overlap(OVERLAPPING);
+        rs_metas.push_back(ptr);
+    }
+
+    for (auto& rowset : rs_metas) {
+        static_cast<void>(_tablet_meta->add_rs_meta(rowset));
+    }
+
+    TabletSharedPtr _tablet(
+            new Tablet(_engine, _tablet_meta, nullptr, 
CUMULATIVE_SIZE_BASED_POLICY));
+    static_cast<void>(_tablet->init());
+    _tablet->calculate_cumulative_point();
+
+    auto candidate_rowsets = 
_tablet->pick_candidate_rowsets_to_cumulative_compaction();
+
+    std::vector<RowsetSharedPtr> input_rowsets;
+    Version last_delete_version {-1, -1};
+    size_t compaction_score = 0;
+
+    // size = 64MB = min_size, score = 64 > min = 50
+    // Should pass min check
+    _tablet->_cumulative_compaction_policy->pick_input_rowsets(
+            _tablet.get(), candidate_rowsets, 100, 50, &input_rowsets, 
&last_delete_version,
+            &compaction_score, config::enable_delete_when_cumu_compaction);
+
+    EXPECT_EQ(64, input_rowsets.size());
+    EXPECT_EQ(64, compaction_score);
+}
+
+// Test case: min=0 configuration
+TEST_F(TestSizeBasedCumulativeCompactionPolicy, 
pick_input_rowsets_zero_min_score) {
+    std::vector<RowsetMetaSharedPtr> rs_metas;
+
+    // Base rowset
+    RowsetMetaSharedPtr ptr1(new RowsetMeta());
+    init_rs_meta(ptr1, 0, 1);
+    ptr1->set_total_disk_size(1024L * 1024 * 1024);
+    ptr1->set_segments_overlap(NONOVERLAPPING);
+    rs_metas.push_back(ptr1);
+
+    // 3 small rowsets
+    for (int i = 2; i <= 4; i++) {
+        RowsetMetaSharedPtr ptr(new RowsetMeta());
+        init_rs_meta(ptr, i, i);
+        ptr->set_total_disk_size(100 * 1024); // 100KB, size < min_size
+        ptr->set_num_segments(1);
+        ptr->set_segments_overlap(OVERLAPPING);
+        rs_metas.push_back(ptr);
+    }
+
+    for (auto& rowset : rs_metas) {
+        static_cast<void>(_tablet_meta->add_rs_meta(rowset));
+    }
+
+    TabletSharedPtr _tablet(
+            new Tablet(_engine, _tablet_meta, nullptr, 
CUMULATIVE_SIZE_BASED_POLICY));
+    static_cast<void>(_tablet->init());
+    _tablet->calculate_cumulative_point();
+
+    auto candidate_rowsets = 
_tablet->pick_candidate_rowsets_to_cumulative_compaction();
+
+    std::vector<RowsetSharedPtr> input_rowsets;
+    Version last_delete_version {-1, -1};
+    size_t compaction_score = 0;
+
+    // min=0, score=3 >= 0, but size < min_size
+    // When min=0, always pass score check, but still need size check
+    _tablet->_cumulative_compaction_policy->pick_input_rowsets(
+            _tablet.get(), candidate_rowsets, 100, 0, &input_rowsets, 
&last_delete_version,
+            &compaction_score, config::enable_delete_when_cumu_compaction);
+
+    // score >= min (0), pass min check even if size < min_size
+    EXPECT_EQ(3, input_rowsets.size());
+    EXPECT_EQ(3, compaction_score);
+}
+
+// Test case: All large rowsets with similar size (should keep all)
+TEST_F(TestSizeBasedCumulativeCompactionPolicy, 
pick_input_rowsets_keep_all_large) {
+    config::compaction_promotion_ratio = 1.0;
+    std::vector<RowsetMetaSharedPtr> rs_metas;
+
+    // Base rowset
+    RowsetMetaSharedPtr ptr1(new RowsetMeta());
+    init_rs_meta(ptr1, 0, 1);
+    ptr1->set_total_disk_size(1024L * 1024 * 1024);
+    ptr1->set_segments_overlap(NONOVERLAPPING);
+    rs_metas.push_back(ptr1);
+
+    // 3 large rowsets with similar size (100MB each)
+    for (int i = 2; i <= 4; i++) {
+        RowsetMetaSharedPtr ptr(new RowsetMeta());
+        init_rs_meta(ptr, i, i);
+        ptr->set_total_disk_size(100L * 1024 * 1024); // 100MB each
+        ptr->set_num_segments(1);
+        ptr->set_segments_overlap(OVERLAPPING);
+        rs_metas.push_back(ptr);
+    }
+
+    for (auto& rowset : rs_metas) {
+        static_cast<void>(_tablet_meta->add_rs_meta(rowset));
+    }
+
+    TabletSharedPtr _tablet(
+            new Tablet(_engine, _tablet_meta, nullptr, 
CUMULATIVE_SIZE_BASED_POLICY));
+    static_cast<void>(_tablet->init());
+    _tablet->calculate_cumulative_point();
+
+    auto candidate_rowsets = 
_tablet->pick_candidate_rowsets_to_cumulative_compaction();
+
+    std::vector<RowsetSharedPtr> input_rowsets;
+    Version last_delete_version {-1, -1};
+    size_t compaction_score = 0;
+
+    _tablet->_cumulative_compaction_policy->pick_input_rowsets(
+            _tablet.get(), candidate_rowsets, 100, 50, &input_rowsets, 
&last_delete_version,
+            &compaction_score, config::enable_delete_when_cumu_compaction);
+
+    // All 3 large rowsets should be kept (similar level_size)
+    // total_size = 300MB, level(300MB) = 256MB
+    // level(100MB) = 64MB, level(200MB) = 128MB, 64MB <= 128MB, so keep all
+    EXPECT_EQ(3, input_rowsets.size());
+    EXPECT_EQ(3, compaction_score);
+}
+
+// Test case: Size exactly one below min_size
+TEST_F(TestSizeBasedCumulativeCompactionPolicy, 
pick_input_rowsets_size_one_below_min) {
+    std::vector<RowsetMetaSharedPtr> rs_metas;
+
+    // Base rowset
+    RowsetMetaSharedPtr ptr1(new RowsetMeta());
+    init_rs_meta(ptr1, 0, 1);
+    ptr1->set_total_disk_size(1024L * 1024 * 1024);
+    ptr1->set_segments_overlap(NONOVERLAPPING);
+    rs_metas.push_back(ptr1);
+
+    // Small rowsets with total size = min_size - 1 byte
+    // min_size = 64MB = 67108864 bytes
+    // Create 10 rowsets, each about 6.7MB, total = 67108863 bytes (1 byte 
less than min_size)
+    int64_t target_total = 64L * 1024 * 1024 - 1; // 64MB - 1 byte
+    int64_t per_rowset = target_total / 10;
+    int64_t remainder = target_total % 10;
+
+    for (int i = 2; i <= 11; i++) {
+        RowsetMetaSharedPtr ptr(new RowsetMeta());
+        init_rs_meta(ptr, i, i);
+        int64_t size = per_rowset;
+        if (i == 11) size += remainder; // Add remainder to last rowset
+        ptr->set_total_disk_size(size);
+        ptr->set_num_segments(1);
+        ptr->set_segments_overlap(OVERLAPPING);
+        rs_metas.push_back(ptr);
+    }
+
+    for (auto& rowset : rs_metas) {
+        static_cast<void>(_tablet_meta->add_rs_meta(rowset));
+    }
+
+    TabletSharedPtr _tablet(
+            new Tablet(_engine, _tablet_meta, nullptr, 
CUMULATIVE_SIZE_BASED_POLICY));
+    static_cast<void>(_tablet->init());
+    _tablet->calculate_cumulative_point();
+
+    auto candidate_rowsets = 
_tablet->pick_candidate_rowsets_to_cumulative_compaction();
+
+    std::vector<RowsetSharedPtr> input_rowsets;
+    Version last_delete_version {-1, -1};
+    size_t compaction_score = 0;
+
+    // score=10 < min=50, size < min_size, should be cleared
+    _tablet->_cumulative_compaction_policy->pick_input_rowsets(
+            _tablet.get(), candidate_rowsets, 100, 50, &input_rowsets, 
&last_delete_version,
+            &compaction_score, config::enable_delete_when_cumu_compaction);
+
+    EXPECT_EQ(0, input_rowsets.size());
+    EXPECT_EQ(0, compaction_score);
+}
+
+// Test case: min greater than max (abnormal configuration)
+TEST_F(TestSizeBasedCumulativeCompactionPolicy, 
pick_input_rowsets_min_greater_than_max) {
+    std::vector<RowsetMetaSharedPtr> rs_metas;
+
+    // Base rowset
+    RowsetMetaSharedPtr ptr1(new RowsetMeta());
+    init_rs_meta(ptr1, 0, 1);
+    ptr1->set_total_disk_size(1024L * 1024 * 1024);
+    ptr1->set_segments_overlap(NONOVERLAPPING);
+    rs_metas.push_back(ptr1);
+
+    // 150 small rowsets, score=150
+    for (int i = 2; i <= 151; i++) {
+        RowsetMetaSharedPtr ptr(new RowsetMeta());
+        init_rs_meta(ptr, i, i);
+        ptr->set_total_disk_size(1L * 1024 * 1024); // 1MB each
+        ptr->set_num_segments(1);
+        ptr->set_segments_overlap(OVERLAPPING);
+        rs_metas.push_back(ptr);
+    }
+
+    for (auto& rowset : rs_metas) {
+        static_cast<void>(_tablet_meta->add_rs_meta(rowset));
+    }
+
+    TabletSharedPtr _tablet(
+            new Tablet(_engine, _tablet_meta, nullptr, 
CUMULATIVE_SIZE_BASED_POLICY));
+    static_cast<void>(_tablet->init());
+    _tablet->calculate_cumulative_point();
+
+    auto candidate_rowsets = 
_tablet->pick_candidate_rowsets_to_cumulative_compaction();
+
+    std::vector<RowsetSharedPtr> input_rowsets;
+    Version last_delete_version {-1, -1};
+    size_t compaction_score = 0;
+
+    // Abnormal config: min=200, max=100
+    // After trim, score=100, but min check happens before trim
+    // So if score >= min before trim, it will pass
+    // Here score=150 < 200, will be cleared by min check
+    _tablet->_cumulative_compaction_policy->pick_input_rowsets(
+            _tablet.get(), candidate_rowsets, 100, 200, &input_rowsets, 
&last_delete_version,
+            &compaction_score, config::enable_delete_when_cumu_compaction);
+
+    // score=150 < min=200, and size=150MB >= min_size=64MB
+    // Size check passes, so min score check is bypassed
+    // Result: 100 rowsets after trim
+    EXPECT_EQ(100, input_rowsets.size());
+    EXPECT_EQ(100, compaction_score);
+}
+
+// Test case: Multiple delete versions
+TEST_F(TestSizeBasedCumulativeCompactionPolicy, 
pick_input_rowsets_multiple_delete_versions) {
+    std::vector<RowsetMetaSharedPtr> rs_metas;
+
+    // Base rowset
+    RowsetMetaSharedPtr ptr1(new RowsetMeta());
+    init_rs_meta(ptr1, 0, 1);
+    ptr1->set_total_disk_size(1024L * 1024 * 1024);
+    ptr1->set_segments_overlap(NONOVERLAPPING);
+    rs_metas.push_back(ptr1);
+
+    // 5 normal rowsets
+    for (int i = 2; i <= 6; i++) {
+        RowsetMetaSharedPtr ptr(new RowsetMeta());
+        init_rs_meta(ptr, i, i);
+        ptr->set_total_disk_size(20L * 1024 * 1024); // 20MB each
+        ptr->set_num_segments(1);
+        ptr->set_segments_overlap(OVERLAPPING);
+        rs_metas.push_back(ptr);
+    }
+
+    // First delete version at version 7
+    RowsetMetaSharedPtr ptr_del1(new RowsetMeta());
+    init_rs_meta(ptr_del1, 7, 7);
+    ptr_del1->set_total_disk_size(1024);
+    ptr_del1->set_num_segments(0);
+    ptr_del1->set_segments_overlap(NONOVERLAPPING);
+    DeletePredicatePB del_pred1;
+    del_pred1.set_version(7);
+    ptr_del1->set_delete_predicate(del_pred1);
+    rs_metas.push_back(ptr_del1);
+
+    // 3 normal rowsets
+    for (int i = 8; i <= 10; i++) {
+        RowsetMetaSharedPtr ptr(new RowsetMeta());
+        init_rs_meta(ptr, i, i);
+        ptr->set_total_disk_size(20L * 1024 * 1024);
+        ptr->set_num_segments(1);
+        ptr->set_segments_overlap(OVERLAPPING);
+        rs_metas.push_back(ptr);
+    }
+
+    // Second delete version at version 11
+    RowsetMetaSharedPtr ptr_del2(new RowsetMeta());
+    init_rs_meta(ptr_del2, 11, 11);
+    ptr_del2->set_total_disk_size(1024);
+    ptr_del2->set_num_segments(0);
+    ptr_del2->set_segments_overlap(NONOVERLAPPING);
+    DeletePredicatePB del_pred2;
+    del_pred2.set_version(11);
+    ptr_del2->set_delete_predicate(del_pred2);
+    rs_metas.push_back(ptr_del2);
+
+    // 3 more normal rowsets
+    for (int i = 12; i <= 14; i++) {
+        RowsetMetaSharedPtr ptr(new RowsetMeta());
+        init_rs_meta(ptr, i, i);
+        ptr->set_total_disk_size(20L * 1024 * 1024);
+        ptr->set_num_segments(1);
+        ptr->set_segments_overlap(OVERLAPPING);
+        rs_metas.push_back(ptr);
+    }
+
+    for (auto& rowset : rs_metas) {
+        static_cast<void>(_tablet_meta->add_rs_meta(rowset));
+    }
+
+    TabletSharedPtr _tablet(
+            new Tablet(_engine, _tablet_meta, nullptr, 
CUMULATIVE_SIZE_BASED_POLICY));
+    static_cast<void>(_tablet->init());
+    _tablet->calculate_cumulative_point();
+
+    auto candidate_rowsets = 
_tablet->pick_candidate_rowsets_to_cumulative_compaction();
+
+    std::vector<RowsetSharedPtr> input_rowsets;
+    Version last_delete_version {-1, -1};
+    size_t compaction_score = 0;
+
+    // Should only select rowsets before first delete version
+    _tablet->_cumulative_compaction_policy->pick_input_rowsets(
+            _tablet.get(), candidate_rowsets, 100, 3, &input_rowsets, 
&last_delete_version,
+            &compaction_score, config::enable_delete_when_cumu_compaction);
+
+    // Only first 5 rowsets before delete version 7
+    EXPECT_EQ(5, input_rowsets.size());
+    EXPECT_EQ(5, compaction_score);
+    EXPECT_EQ(7, last_delete_version.first);
+}
+
+// Test case: Level size exactly half
+TEST_F(TestSizeBasedCumulativeCompactionPolicy, 
pick_input_rowsets_level_size_exactly_half) {
+    config::compaction_promotion_ratio = 1.0;
+    std::vector<RowsetMetaSharedPtr> rs_metas;
+
+    // Base rowset
+    RowsetMetaSharedPtr ptr1(new RowsetMeta());
+    init_rs_meta(ptr1, 0, 1);
+    ptr1->set_total_disk_size(1024L * 1024 * 1024);
+    ptr1->set_segments_overlap(NONOVERLAPPING);
+    rs_metas.push_back(ptr1);
+
+    // Two rowsets with exactly same size (64MB each)
+    // total = 128MB, level(128MB) = 128MB
+    // level(64MB) = 64MB, level(64MB) = 64MB
+    // 64MB <= 64MB, so keep both
+    for (int i = 2; i <= 3; i++) {
+        RowsetMetaSharedPtr ptr(new RowsetMeta());
+        init_rs_meta(ptr, i, i);
+        ptr->set_total_disk_size(64L * 1024 * 1024); // 64MB each
+        ptr->set_num_segments(1);
+        ptr->set_segments_overlap(OVERLAPPING);
+        rs_metas.push_back(ptr);
+    }
+
+    for (auto& rowset : rs_metas) {
+        static_cast<void>(_tablet_meta->add_rs_meta(rowset));
+    }
+
+    TabletSharedPtr _tablet(
+            new Tablet(_engine, _tablet_meta, nullptr, 
CUMULATIVE_SIZE_BASED_POLICY));
+    static_cast<void>(_tablet->init());
+    _tablet->calculate_cumulative_point();
+
+    auto candidate_rowsets = 
_tablet->pick_candidate_rowsets_to_cumulative_compaction();
+
+    std::vector<RowsetSharedPtr> input_rowsets;
+    Version last_delete_version {-1, -1};
+    size_t compaction_score = 0;
+
+    _tablet->_cumulative_compaction_policy->pick_input_rowsets(
+            _tablet.get(), candidate_rowsets, 100, 2, &input_rowsets, 
&last_delete_version,
+            &compaction_score, config::enable_delete_when_cumu_compaction);
+
+    // Both rowsets should be kept (same level)
+    EXPECT_EQ(2, input_rowsets.size());
+    EXPECT_EQ(2, compaction_score);
+}
+
+// Test case: Tiny rowsets (< 1KB)
+TEST_F(TestSizeBasedCumulativeCompactionPolicy, 
pick_input_rowsets_tiny_rowsets) {
+    std::vector<RowsetMetaSharedPtr> rs_metas;
+
+    // Base rowset
+    RowsetMetaSharedPtr ptr1(new RowsetMeta());
+    init_rs_meta(ptr1, 0, 1);
+    ptr1->set_total_disk_size(1024L * 1024 * 1024);
+    ptr1->set_segments_overlap(NONOVERLAPPING);
+    rs_metas.push_back(ptr1);
+
+    // 100 tiny rowsets (500 bytes each)
+    for (int i = 2; i <= 101; i++) {
+        RowsetMetaSharedPtr ptr(new RowsetMeta());
+        init_rs_meta(ptr, i, i);
+        ptr->set_total_disk_size(500); // 500 bytes
+        ptr->set_num_segments(1);
+        ptr->set_segments_overlap(OVERLAPPING);
+        rs_metas.push_back(ptr);
+    }
+
+    for (auto& rowset : rs_metas) {
+        static_cast<void>(_tablet_meta->add_rs_meta(rowset));
+    }
+
+    TabletSharedPtr _tablet(
+            new Tablet(_engine, _tablet_meta, nullptr, 
CUMULATIVE_SIZE_BASED_POLICY));
+    static_cast<void>(_tablet->init());
+    _tablet->calculate_cumulative_point();
+
+    auto candidate_rowsets = 
_tablet->pick_candidate_rowsets_to_cumulative_compaction();
+
+    std::vector<RowsetSharedPtr> input_rowsets;
+    Version last_delete_version {-1, -1};
+    size_t compaction_score = 0;
+
+    _tablet->_cumulative_compaction_policy->pick_input_rowsets(
+            _tablet.get(), candidate_rowsets, 100, 50, &input_rowsets, 
&last_delete_version,
+            &compaction_score, config::enable_delete_when_cumu_compaction);
+
+    // All tiny rowsets have level_size = 0, so all should be kept
+    // But trim from back due to max_compaction_score
+    // total_size = 50KB < min_size, score = 100 >= min, size < min_size -> 
cleared
+    // Wait, score >= min OR size >= min_size to pass
+    // Here score = 100 >= 50, so should pass min check
+    // Then trim to 100
+    EXPECT_EQ(100, input_rowsets.size());
+    EXPECT_EQ(100, compaction_score);
+}
+
+// Test case: Trim result below min (min check happens before trim)
+TEST_F(TestSizeBasedCumulativeCompactionPolicy, 
pick_input_rowsets_trim_result_below_min) {
+    std::vector<RowsetMetaSharedPtr> rs_metas;
+
+    // Base rowset
+    RowsetMetaSharedPtr ptr1(new RowsetMeta());
+    init_rs_meta(ptr1, 0, 1);
+    ptr1->set_total_disk_size(1024L * 1024 * 1024);
+    ptr1->set_segments_overlap(NONOVERLAPPING);
+    rs_metas.push_back(ptr1);
+
+    // 120 rowsets, each 1MB, score=1
+    // total score = 120, after trim to max=100, score = 100
+    // min = 90, after trim score = 100 >= 90
+    // But test the case where trim could bring it below min conceptually
+    for (int i = 2; i <= 121; i++) {
+        RowsetMetaSharedPtr ptr(new RowsetMeta());
+        init_rs_meta(ptr, i, i);
+        ptr->set_total_disk_size(1L * 1024 * 1024); // 1MB
+        ptr->set_num_segments(1);
+        ptr->set_segments_overlap(OVERLAPPING);
+        rs_metas.push_back(ptr);
+    }
+
+    for (auto& rowset : rs_metas) {
+        static_cast<void>(_tablet_meta->add_rs_meta(rowset));
+    }
+
+    TabletSharedPtr _tablet(
+            new Tablet(_engine, _tablet_meta, nullptr, 
CUMULATIVE_SIZE_BASED_POLICY));
+    static_cast<void>(_tablet->init());
+    _tablet->calculate_cumulative_point();
+
+    auto candidate_rowsets = 
_tablet->pick_candidate_rowsets_to_cumulative_compaction();
+
+    std::vector<RowsetSharedPtr> input_rowsets;
+    Version last_delete_version {-1, -1};
+    size_t compaction_score = 0;
+
+    // min=90, max=100, initial score=120
+    // After trim, score=100 >= 90, still passes
+    _tablet->_cumulative_compaction_policy->pick_input_rowsets(
+            _tablet.get(), candidate_rowsets, 100, 90, &input_rowsets, 
&last_delete_version,
+            &compaction_score, config::enable_delete_when_cumu_compaction);
+
+    EXPECT_EQ(100, input_rowsets.size());
+    EXPECT_EQ(100, compaction_score);
+}
+
+// Test case: Single rowset with score exceeding max (fallback allows it)
+TEST_F(TestSizeBasedCumulativeCompactionPolicy, 
pick_input_rowsets_single_rowset_exceeds_max) {
+    std::vector<RowsetMetaSharedPtr> rs_metas;
+
+    // Base rowset
+    RowsetMetaSharedPtr ptr1(new RowsetMeta());
+    init_rs_meta(ptr1, 0, 1);
+    ptr1->set_total_disk_size(1024L * 1024 * 1024);
+    ptr1->set_segments_overlap(NONOVERLAPPING);
+    rs_metas.push_back(ptr1);
+
+    // Single large rowset with very high score (150 segments)
+    RowsetMetaSharedPtr ptr2(new RowsetMeta());
+    init_rs_meta(ptr2, 2, 2);
+    ptr2->set_total_disk_size(200L * 1024 * 1024); // 200MB
+    ptr2->set_num_segments(150);                   // score = 150
+    ptr2->set_segments_overlap(OVERLAPPING);
+    rs_metas.push_back(ptr2);
+
+    for (auto& rowset : rs_metas) {
+        static_cast<void>(_tablet_meta->add_rs_meta(rowset));
+    }
+
+    TabletSharedPtr _tablet(
+            new Tablet(_engine, _tablet_meta, nullptr, 
CUMULATIVE_SIZE_BASED_POLICY));
+    static_cast<void>(_tablet->init());
+    _tablet->calculate_cumulative_point();
+
+    auto candidate_rowsets = 
_tablet->pick_candidate_rowsets_to_cumulative_compaction();
+
+    std::vector<RowsetSharedPtr> input_rowsets;
+    Version last_delete_version {-1, -1};
+    size_t compaction_score = 0;
+
+    // max=100, but single rowset score=150
+    // Due to size() > 1 protection in DEFER, won't be trimmed
+    _tablet->_cumulative_compaction_policy->pick_input_rowsets(
+            _tablet.get(), candidate_rowsets, 100, 50, &input_rowsets, 
&last_delete_version,
+            &compaction_score, config::enable_delete_when_cumu_compaction);
+
+    // Single rowset kept, score = 150 (exceeds max, but allowed due to size 
protection)
+    EXPECT_EQ(1, input_rowsets.size());
+    EXPECT_EQ(150, compaction_score);
+}
+
+// Large rowsets removed by level_size, small ones remain and get trimmed
+TEST_F(TestSizeBasedCumulativeCompactionPolicy, 
pick_input_rowsets_fallback_no_overlapping) {
+    std::vector<RowsetMetaSharedPtr> rs_metas;
+
+    // Base rowset: 1GB
+    RowsetMetaSharedPtr ptr1(new RowsetMeta());
+    init_rs_meta(ptr1, 0, 1);
+    ptr1->set_total_disk_size(1024L * 1024 * 1024);
+    ptr1->set_segments_overlap(NONOVERLAPPING);
+    rs_metas.push_back(ptr1);
+
+    // 3 large rowsets (40MB, 15MB, 5MB) removed by level_size + 100 small 
ones remain
+    RowsetMetaSharedPtr ptr2(new RowsetMeta());
+    init_rs_meta(ptr2, 2, 2);
+    ptr2->set_total_disk_size(40L * 1024 * 1024);
+    ptr2->set_num_segments(1);
+    ptr2->set_segments_overlap(OVERLAPPING);
+    rs_metas.push_back(ptr2);
+
+    RowsetMetaSharedPtr ptr3(new RowsetMeta());
+    init_rs_meta(ptr3, 3, 3);
+    ptr3->set_total_disk_size(15L * 1024 * 1024);
+    ptr3->set_num_segments(1);
+    ptr3->set_segments_overlap(OVERLAPPING);
+    rs_metas.push_back(ptr3);
+
+    RowsetMetaSharedPtr ptr4(new RowsetMeta());
+    init_rs_meta(ptr4, 4, 4);
+    ptr4->set_total_disk_size(5L * 1024 * 1024);
+    ptr4->set_num_segments(1);
+    ptr4->set_segments_overlap(OVERLAPPING);
+    rs_metas.push_back(ptr4);
+
+    for (int i = 5; i <= 104; i++) {
+        RowsetMetaSharedPtr ptr(new RowsetMeta());
+        init_rs_meta(ptr, i, i);
+        ptr->set_total_disk_size(10 * 1024);
+        ptr->set_num_segments(1);
+        ptr->set_segments_overlap(OVERLAPPING);
+        rs_metas.push_back(ptr);
+    }
+
+    for (auto& rowset : rs_metas) {
+        static_cast<void>(_tablet_meta->add_rs_meta(rowset));
+    }
+
+    TabletSharedPtr _tablet(
+            new Tablet(_engine, _tablet_meta, nullptr, 
CUMULATIVE_SIZE_BASED_POLICY));
+    static_cast<void>(_tablet->init());
+    _tablet->calculate_cumulative_point();
+
+    auto candidate_rowsets = 
_tablet->pick_candidate_rowsets_to_cumulative_compaction();
+
+    std::vector<RowsetSharedPtr> input_rowsets;
+    Version last_delete_version {-1, -1};
+    size_t compaction_score = 0;
+
+    _tablet->_cumulative_compaction_policy->pick_input_rowsets(
+            _tablet.get(), candidate_rowsets, 100, 5, &input_rowsets, 
&last_delete_version,
+            &compaction_score, config::enable_delete_when_cumu_compaction);
+
+    EXPECT_EQ(100, input_rowsets.size());
+    EXPECT_EQ(100, compaction_score);
+    EXPECT_EQ(5, input_rowsets.front()->start_version());
+}
+
+// Fallback: all rowsets removed by level_size, score >= max, no high-score 
rowset found
+// -> returns all, DEFER trims to max
+TEST_F(TestSizeBasedCumulativeCompactionPolicy, 
pick_input_rowsets_fallback_no_high_score_rowset) {
+    std::vector<RowsetMetaSharedPtr> rs_metas;
+
+    RowsetMetaSharedPtr ptr1(new RowsetMeta());
+    init_rs_meta(ptr1, 0, 1);
+    ptr1->set_total_disk_size(1024L * 1024 * 1024);
+    ptr1->set_segments_overlap(NONOVERLAPPING);
+    rs_metas.push_back(ptr1);
+
+    // 4 NONOVERLAPPING rowsets (score=1 each), all removed by level_size
+    // max=3, score=4 >= max triggers fallback, no score > 1 found
+    RowsetMetaSharedPtr ptr2(new RowsetMeta());
+    init_rs_meta(ptr2, 2, 2);
+    ptr2->set_total_disk_size(40L * 1024 * 1024);
+    ptr2->set_num_segments(1);
+    ptr2->set_segments_overlap(NONOVERLAPPING);
+    rs_metas.push_back(ptr2);
+
+    RowsetMetaSharedPtr ptr3(new RowsetMeta());
+    init_rs_meta(ptr3, 3, 3);
+    ptr3->set_total_disk_size(15L * 1024 * 1024);
+    ptr3->set_num_segments(1);
+    ptr3->set_segments_overlap(NONOVERLAPPING);
+    rs_metas.push_back(ptr3);
+
+    RowsetMetaSharedPtr ptr4(new RowsetMeta());
+    init_rs_meta(ptr4, 4, 4);
+    ptr4->set_total_disk_size(5L * 1024 * 1024);
+    ptr4->set_num_segments(1);
+    ptr4->set_segments_overlap(NONOVERLAPPING);
+    rs_metas.push_back(ptr4);
+
+    RowsetMetaSharedPtr ptr5(new RowsetMeta());
+    init_rs_meta(ptr5, 5, 5);
+    ptr5->set_total_disk_size(1L * 1024 * 1024);
+    ptr5->set_num_segments(1);
+    ptr5->set_segments_overlap(NONOVERLAPPING);
+    rs_metas.push_back(ptr5);
+
+    for (auto& rowset : rs_metas) {
+        static_cast<void>(_tablet_meta->add_rs_meta(rowset));
+    }
+
+    TabletSharedPtr _tablet(
+            new Tablet(_engine, _tablet_meta, nullptr, 
CUMULATIVE_SIZE_BASED_POLICY));
+    static_cast<void>(_tablet->init());
+    _tablet->calculate_cumulative_point();
+
+    auto candidate_rowsets = 
_tablet->pick_candidate_rowsets_to_cumulative_compaction();
+
+    std::vector<RowsetSharedPtr> input_rowsets;
+    Version last_delete_version {-1, -1};
+    size_t compaction_score = 0;
+
+    _tablet->_cumulative_compaction_policy->pick_input_rowsets(
+            _tablet.get(), candidate_rowsets, 3, 1, &input_rowsets, 
&last_delete_version,
+            &compaction_score, config::enable_delete_when_cumu_compaction);
+
+    EXPECT_EQ(3, input_rowsets.size());
+    EXPECT_EQ(3, compaction_score);
+}
+
 } // namespace doris
 
 // @brief Test Stub


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to