This is an automated email from the ASF dual-hosted git repository.

yiguolei pushed a commit to branch branch-2.1
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/branch-2.1 by this push:
     new df26475e1a5 [Enhancement](compaction) enable the compaction producer to generate multiple compaction tasks in a single run (#45411) (#46160)
df26475e1a5 is described below

commit df26475e1a5bf71c4c6f8880d9dd482ff29d6dfe
Author: Luwei <lu...@selectdb.com>
AuthorDate: Tue Dec 31 09:51:43 2024 +0800

    [Enhancement](compaction) enable the compaction producer to generate multiple compaction tasks in a single run (#45411) (#46160)
    
    pick master #45411
---
 be/src/common/config.cpp         |   2 +
 be/src/common/config.h           |   2 +
 be/src/olap/olap_server.cpp      |  14 +--
 be/src/olap/tablet_manager.cpp   |  52 ++++++++--
 be/src/olap/tablet_manager.h     |   2 +-
 be/test/olap/tablet_mgr_test.cpp | 217 +++++++++++++++++++++++++++++++++++++++
 6 files changed, 276 insertions(+), 13 deletions(-)

diff --git a/be/src/common/config.cpp b/be/src/common/config.cpp
index c9917afecb0..783d1d65922 100644
--- a/be/src/common/config.cpp
+++ b/be/src/common/config.cpp
@@ -1349,6 +1349,8 @@ DEFINE_mBool(enable_pipeline_task_leakage_detect, 
"false");
 DEFINE_Bool(force_regenerate_rowsetid_on_start_error, "false");
 DEFINE_mBool(enable_sleep_between_delete_cumu_compaction, "false");
 
+DEFINE_mInt32(compaction_num_per_round, "1");
+
 // clang-format off
 #ifdef BE_TEST
 // test s3
diff --git a/be/src/common/config.h b/be/src/common/config.h
index 91b8f6bcb6d..303aecd2a23 100644
--- a/be/src/common/config.h
+++ b/be/src/common/config.h
@@ -1419,6 +1419,8 @@ DECLARE_mInt32(lz4_compression_block_size);
 DECLARE_mBool(enable_pipeline_task_leakage_detect);
 DECLARE_Bool(force_regenerate_rowsetid_on_start_error);
 
+DECLARE_mInt32(compaction_num_per_round);
+
 // Enable sleep 5s between delete cumulative compaction.
 DECLARE_mBool(enable_sleep_between_delete_cumu_compaction);
 
diff --git a/be/src/olap/olap_server.cpp b/be/src/olap/olap_server.cpp
index f3fdd552164..71c8fb38681 100644
--- a/be/src/olap/olap_server.cpp
+++ b/be/src/olap/olap_server.cpp
@@ -919,11 +919,11 @@ std::vector<TabletSharedPtr> 
StorageEngine::_generate_compaction_tasks(
             continue;
         }
 
-        // Even if need_pick_tablet is false, we still need to call 
find_best_tablet_to_compaction(),
+        // Even if need_pick_tablet is false, we still need to call 
find_best_tablets_to_compaction(),
         // So that we can update the max_compaction_score metric.
         if (!data_dir->reach_capacity_limit(0)) {
             uint32_t disk_max_score = 0;
-            TabletSharedPtr tablet = 
_tablet_manager->find_best_tablet_to_compaction(
+            std::vector<TabletSharedPtr> tablets = 
_tablet_manager->find_best_tablets_to_compaction(
                     compaction_type, data_dir,
                     compaction_type == CompactionType::CUMULATIVE_COMPACTION
                             ? copied_cumu_map[data_dir]
@@ -932,11 +932,13 @@ std::vector<TabletSharedPtr> 
StorageEngine::_generate_compaction_tasks(
             int concurrent_num = get_concurrent_per_disk(disk_max_score, 
thread_per_disk);
             need_pick_tablet = need_generate_compaction_tasks(
                     count, concurrent_num, compaction_type, 
copied_cumu_map[data_dir].empty());
-            if (tablet != nullptr) {
-                if (need_pick_tablet) {
-                    tablets_compaction.emplace_back(tablet);
+            for (const auto& tablet : tablets) {
+                if (tablet != nullptr) {
+                    if (need_pick_tablet) {
+                        tablets_compaction.emplace_back(tablet);
+                    }
+                    max_compaction_score = std::max(max_compaction_score, 
disk_max_score);
                 }
-                max_compaction_score = std::max(max_compaction_score, 
disk_max_score);
             }
         }
     }
diff --git a/be/src/olap/tablet_manager.cpp b/be/src/olap/tablet_manager.cpp
index 23d2b11fc45..7f9ab2aad4c 100644
--- a/be/src/olap/tablet_manager.cpp
+++ b/be/src/olap/tablet_manager.cpp
@@ -728,7 +728,12 @@ void TabletManager::get_tablet_stat(TTabletStatResult* 
result) {
     result->__set_tablet_stat_list(*local_cache);
 }
 
-TabletSharedPtr TabletManager::find_best_tablet_to_compaction(
+struct TabletScore {
+    TabletSharedPtr tablet_ptr;
+    int score;
+};
+
+std::vector<TabletSharedPtr> TabletManager::find_best_tablets_to_compaction(
         CompactionType compaction_type, DataDir* data_dir,
         const std::unordered_set<TabletSharedPtr>& 
tablet_submitted_compaction, uint32_t* score,
         const std::unordered_map<std::string_view, 
std::shared_ptr<CumulativeCompactionPolicy>>&
@@ -739,6 +744,9 @@ TabletSharedPtr 
TabletManager::find_best_tablet_to_compaction(
     uint32_t highest_score = 0;
     uint32_t compaction_score = 0;
     TabletSharedPtr best_tablet;
+    auto cmp = [](TabletScore left, TabletScore right) { return left.score > 
right.score; };
+    std::priority_queue<TabletScore, std::vector<TabletScore>, decltype(cmp)> 
top_tablets(cmp);
+
     auto handler = [&](const TabletSharedPtr& tablet_ptr) {
         if 
(tablet_ptr->tablet_meta()->tablet_schema()->disable_auto_compaction()) {
             LOG_EVERY_N(INFO, 500) << "Tablet " << tablet_ptr->tablet_id()
@@ -794,23 +802,55 @@ TabletSharedPtr 
TabletManager::find_best_tablet_to_compaction(
         if (current_compaction_score < 5) {
             tablet_ptr->set_skip_compaction(true, compaction_type, 
UnixSeconds());
         }
-        if (current_compaction_score > highest_score) {
-            highest_score = current_compaction_score;
-            compaction_score = current_compaction_score;
-            best_tablet = tablet_ptr;
+
+        if (config::compaction_num_per_round > 1) {
+            TabletScore ts;
+            ts.score = current_compaction_score;
+            ts.tablet_ptr = tablet_ptr;
+            if ((top_tablets.size() >= config::compaction_num_per_round &&
+                 current_compaction_score > top_tablets.top().score) ||
+                top_tablets.size() < config::compaction_num_per_round) {
+                top_tablets.push(ts);
+                if (top_tablets.size() > config::compaction_num_per_round) {
+                    top_tablets.pop();
+                }
+                if (current_compaction_score > highest_score) {
+                    highest_score = current_compaction_score;
+                    compaction_score = current_compaction_score;
+                }
+            }
+        } else {
+            if (current_compaction_score > highest_score) {
+                highest_score = current_compaction_score;
+                compaction_score = current_compaction_score;
+                best_tablet = tablet_ptr;
+            }
         }
     };
 
     for_each_tablet(handler, filter_all_tablets);
+    std::vector<TabletSharedPtr> picked_tablet;
     if (best_tablet != nullptr) {
         VLOG_CRITICAL << "Found the best tablet for compaction. "
                       << "compaction_type=" << compaction_type_str
                       << ", tablet_id=" << best_tablet->tablet_id() << ", 
path=" << data_dir->path()
                       << ", compaction_score=" << compaction_score
                       << ", highest_score=" << highest_score;
+        picked_tablet.emplace_back(std::move(best_tablet));
         *score = compaction_score;
     }
-    return best_tablet;
+
+    std::vector<TabletSharedPtr> reverse_top_tablets;
+    while (!top_tablets.empty()) {
+        reverse_top_tablets.emplace_back(top_tablets.top().tablet_ptr);
+        top_tablets.pop();
+    }
+
+    for (auto it = reverse_top_tablets.rbegin(); it != 
reverse_top_tablets.rend(); ++it) {
+        picked_tablet.emplace_back(*it);
+    }
+
+    return picked_tablet;
 }
 
 Status TabletManager::load_tablet_from_meta(DataDir* data_dir, TTabletId 
tablet_id,
diff --git a/be/src/olap/tablet_manager.h b/be/src/olap/tablet_manager.h
index 5f6b8f31bd8..f4f3765df65 100644
--- a/be/src/olap/tablet_manager.h
+++ b/be/src/olap/tablet_manager.h
@@ -73,7 +73,7 @@ public:
     // If `is_drop_table_or_partition` is true, we need to remove all remote 
rowsets in this tablet.
     Status drop_tablet(TTabletId tablet_id, TReplicaId replica_id, bool 
is_drop_table_or_partition);
 
-    TabletSharedPtr find_best_tablet_to_compaction(
+    std::vector<TabletSharedPtr> find_best_tablets_to_compaction(
             CompactionType compaction_type, DataDir* data_dir,
             const std::unordered_set<TabletSharedPtr>& 
tablet_submitted_compaction, uint32_t* score,
             const std::unordered_map<std::string_view, 
std::shared_ptr<CumulativeCompactionPolicy>>&
diff --git a/be/test/olap/tablet_mgr_test.cpp b/be/test/olap/tablet_mgr_test.cpp
index af5bf285fc8..830ca135123 100644
--- a/be/test/olap/tablet_mgr_test.cpp
+++ b/be/test/olap/tablet_mgr_test.cpp
@@ -32,10 +32,13 @@
 #include "common/status.h"
 #include "gtest/gtest_pred_impl.h"
 #include "io/fs/local_file_system.h"
+#include "olap/cumulative_compaction_policy.h"
+#include "olap/cumulative_compaction_time_series_policy.h"
 #include "olap/data_dir.h"
 #include "olap/olap_common.h"
 #include "olap/olap_define.h"
 #include "olap/options.h"
+#include "olap/rowset/beta_rowset.h"
 #include "olap/storage_engine.h"
 #include "olap/tablet.h"
 #include "olap/tablet_manager.h"
@@ -84,6 +87,7 @@ public:
         SAFE_DELETE(k_engine);
         ExecEnv::GetInstance()->set_storage_engine(nullptr);
         _tablet_mgr = nullptr;
+        config::compaction_num_per_round = 1;
     }
     StorageEngine* k_engine = nullptr;
 
@@ -341,4 +345,217 @@ TEST_F(TabletMgrTest, GetRowsetId) {
     }
 }
 
+TEST_F(TabletMgrTest, FindTabletWithCompact) {
+    auto create_tablet = [this](int64_t tablet_id, bool enable_single_compact, 
int rowset_size) {
+        std::vector<TColumn> cols;
+        TColumn col1;
+        col1.column_type.type = TPrimitiveType::SMALLINT;
+        col1.__set_column_name("col1");
+        col1.__set_is_key(true);
+        cols.push_back(col1);
+
+        TColumn col2;
+        col2.column_type.type = TPrimitiveType::INT;
+        col2.__set_column_name(SEQUENCE_COL);
+        col2.__set_is_key(false);
+        col2.__set_aggregation_type(TAggregationType::REPLACE);
+        cols.push_back(col2);
+
+        TColumn col3;
+        col3.column_type.type = TPrimitiveType::INT;
+        col3.__set_column_name("v1");
+        col3.__set_is_key(false);
+        col3.__set_aggregation_type(TAggregationType::REPLACE);
+        cols.push_back(col3);
+
+        RuntimeProfile profile("CreateTablet");
+        TTabletSchema tablet_schema;
+        tablet_schema.__set_short_key_column_count(1);
+        tablet_schema.__set_schema_hash(3333);
+        tablet_schema.__set_keys_type(TKeysType::UNIQUE_KEYS);
+        tablet_schema.__set_storage_type(TStorageType::COLUMN);
+        tablet_schema.__set_columns(cols);
+        tablet_schema.__set_sequence_col_idx(1);
+        
tablet_schema.__set_enable_single_replica_compaction(enable_single_compact);
+        TCreateTabletReq create_tablet_req;
+        create_tablet_req.__set_tablet_schema(tablet_schema);
+        create_tablet_req.__set_tablet_id(tablet_id);
+        create_tablet_req.__set_version(1);
+        create_tablet_req.__set_replica_id(tablet_id * 10);
+        std::vector<DataDir*> data_dirs;
+        data_dirs.push_back(_data_dir);
+        Status create_st = _tablet_mgr->create_tablet(create_tablet_req, 
data_dirs, &profile);
+        ASSERT_TRUE(create_st.ok()) << create_st;
+
+        TabletSharedPtr tablet = _tablet_mgr->get_tablet(tablet_id);
+        ASSERT_TRUE(tablet);
+        // check dir exist
+        bool dir_exist = false;
+        Status exist_st = 
io::global_local_filesystem()->exists(tablet->tablet_path(), &dir_exist);
+        ASSERT_TRUE(exist_st.ok()) << exist_st;
+        ASSERT_TRUE(dir_exist);
+        // check meta has this tablet
+        TabletMetaSharedPtr new_tablet_meta(new TabletMeta());
+        Status check_meta_st =
+                TabletMetaManager::get_meta(_data_dir, tablet_id, 3333, 
new_tablet_meta);
+        ASSERT_TRUE(check_meta_st.ok()) << check_meta_st;
+        // insert into rowset
+        auto create_rowset = [=, this](int64_t start, int64 end) {
+            auto rowset_meta = std::make_shared<RowsetMeta>();
+            Version version(start, end);
+            rowset_meta->set_version(version);
+            rowset_meta->set_tablet_id(tablet->tablet_id());
+            rowset_meta->set_tablet_uid(tablet->tablet_uid());
+            rowset_meta->set_rowset_id(k_engine->next_rowset_id());
+            return std::make_shared<BetaRowset>(tablet->tablet_schema(), 
tablet->tablet_path(),
+                                                std::move(rowset_meta));
+        };
+        auto st = tablet->init();
+        ASSERT_TRUE(st.ok()) << st;
+        for (int i = 2; i <= rowset_size; ++i) {
+            auto rs = create_rowset(i, i);
+            auto st = tablet->add_inc_rowset(rs);
+            ASSERT_TRUE(st.ok()) << st;
+        }
+    };
+
+    int rowset_size = 5;
+
+    // create 10 tablets
+    for (int64_t id = 1; id <= 10; ++id) {
+        create_tablet(id, false, rowset_size++);
+    }
+
+    std::unordered_set<TabletSharedPtr> cumu_set;
+    std::unordered_map<std::string_view, 
std::shared_ptr<CumulativeCompactionPolicy>>
+            cumulative_compaction_policies;
+    cumulative_compaction_policies[CUMULATIVE_SIZE_BASED_POLICY] =
+            
CumulativeCompactionPolicyFactory::create_cumulative_compaction_policy(
+                    CUMULATIVE_SIZE_BASED_POLICY);
+    cumulative_compaction_policies[CUMULATIVE_TIME_SERIES_POLICY] =
+            
CumulativeCompactionPolicyFactory::create_cumulative_compaction_policy(
+                    CUMULATIVE_TIME_SERIES_POLICY);
+    uint32_t score = 0;
+    auto compact_tablets = _tablet_mgr->find_best_tablets_to_compaction(
+            CompactionType::CUMULATIVE_COMPACTION, _data_dir, cumu_set, &score,
+            cumulative_compaction_policies);
+    ASSERT_EQ(compact_tablets.size(), 1);
+    ASSERT_EQ(compact_tablets[0]->tablet_id(), 10);
+    ASSERT_EQ(score, 13);
+
+    // create 10 tablets enable single compact
+    // 5 tablets do cumu compaction, 5 tablets do single compaction
+    // if BE_TEST is defined, tablet_id % 2 == 0 means that tablet needs to do 
single compact
+    for (int64_t id = 11; id <= 20; ++id) {
+        create_tablet(id, true, rowset_size++);
+    }
+
+    compact_tablets = _tablet_mgr->find_best_tablets_to_compaction(
+            CompactionType::CUMULATIVE_COMPACTION, _data_dir, cumu_set, &score,
+            cumulative_compaction_policies);
+    ASSERT_EQ(compact_tablets.size(), 1);
+    ASSERT_EQ(compact_tablets[0]->tablet_id(), 20);
+    ASSERT_EQ(score, 23);
+
+    create_tablet(21, false, rowset_size++);
+
+    compact_tablets = _tablet_mgr->find_best_tablets_to_compaction(
+            CompactionType::CUMULATIVE_COMPACTION, _data_dir, cumu_set, &score,
+            cumulative_compaction_policies);
+    ASSERT_EQ(compact_tablets.size(), 1);
+    ASSERT_EQ(compact_tablets[0]->tablet_id(), 21);
+    ASSERT_EQ(score, 24);
+
+    // drop all tablets
+    for (int64_t id = 1; id <= 21; ++id) {
+        Status drop_st = _tablet_mgr->drop_tablet(id, id * 10, false);
+        ASSERT_TRUE(drop_st.ok()) << drop_st;
+    }
+
+    {
+        config::compaction_num_per_round = 10;
+        for (int64_t i = 1; i <= 100; ++i) {
+            create_tablet(10000 + i, false, i);
+        }
+
+        compact_tablets = _tablet_mgr->find_best_tablets_to_compaction(
+                CompactionType::CUMULATIVE_COMPACTION, _data_dir, cumu_set, 
&score,
+                cumulative_compaction_policies);
+        ASSERT_EQ(compact_tablets.size(), 10);
+        int index = 0;
+        for (auto t : compact_tablets) {
+            ASSERT_EQ(t->tablet_id(), 10100 - index);
+            ASSERT_EQ(t->calc_compaction_score(
+                              CompactionType::CUMULATIVE_COMPACTION,
+                              
cumulative_compaction_policies[CUMULATIVE_SIZE_BASED_POLICY]),
+                      99 - index);
+            index++;
+        }
+        config::compaction_num_per_round = 1;
+        // drop all tablets
+        for (int64_t id = 10001; id <= 10100; ++id) {
+            Status drop_st = _tablet_mgr->drop_tablet(id, id * 10, false);
+            ASSERT_TRUE(drop_st.ok()) << drop_st;
+        }
+    }
+
+    {
+        config::compaction_num_per_round = 10;
+        for (int64_t i = 1; i <= 100; ++i) {
+            create_tablet(20000 + i, false, i);
+        }
+
+        compact_tablets = _tablet_mgr->find_best_tablets_to_compaction(
+                CompactionType::CUMULATIVE_COMPACTION, _data_dir, cumu_set, 
&score,
+                cumulative_compaction_policies);
+        ASSERT_EQ(compact_tablets.size(), 10);
+        for (int i = 0; i < 10; ++i) {
+            ASSERT_EQ(compact_tablets[i]->tablet_id(), 20100 - i);
+            ASSERT_EQ(compact_tablets[i]->calc_compaction_score(
+                              CompactionType::CUMULATIVE_COMPACTION,
+                              
cumulative_compaction_policies[CUMULATIVE_SIZE_BASED_POLICY]),
+                      99 - i);
+        }
+
+        config::compaction_num_per_round = 1;
+        // drop all tablets
+        for (int64_t id = 20001; id <= 20100; ++id) {
+            Status drop_st = _tablet_mgr->drop_tablet(id, id * 10, false);
+            ASSERT_TRUE(drop_st.ok()) << drop_st;
+        }
+
+        Status drop_st = _tablet_mgr->drop_tablet(20102, 20102 * 10, false);
+        ASSERT_TRUE(drop_st.ok()) << drop_st;
+    }
+
+    {
+        config::compaction_num_per_round = 10;
+        for (int64_t i = 1; i <= 5; ++i) {
+            create_tablet(30000 + i, false, i + 5);
+        }
+
+        compact_tablets = _tablet_mgr->find_best_tablets_to_compaction(
+                CompactionType::CUMULATIVE_COMPACTION, _data_dir, cumu_set, 
&score,
+                cumulative_compaction_policies);
+        ASSERT_EQ(compact_tablets.size(), 5);
+        for (int i = 0; i < 5; ++i) {
+            ASSERT_EQ(compact_tablets[i]->tablet_id(), 30000 + 5 - i);
+            ASSERT_EQ(compact_tablets[i]->calc_compaction_score(
+                              CompactionType::CUMULATIVE_COMPACTION,
+                              
cumulative_compaction_policies[CUMULATIVE_SIZE_BASED_POLICY]),
+                      9 - i);
+        }
+
+        config::compaction_num_per_round = 1;
+        // drop all tablets
+        for (int64_t id = 30001; id <= 30005; ++id) {
+            Status drop_st = _tablet_mgr->drop_tablet(id, id * 10, false);
+            ASSERT_TRUE(drop_st.ok()) << drop_st;
+        }
+    }
+
+    Status trash_st = _tablet_mgr->start_trash_sweep();
+    ASSERT_TRUE(trash_st.ok()) << trash_st;
+}
+
 } // namespace doris


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to