This is an automated email from the ASF dual-hosted git repository.

dataroaring pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git

The following commit(s) were added to refs/heads/master by this push:
     new d138b523f8d  [improvement](create tablet) backend create tablet round robin among … (#29818)
d138b523f8d is described below

commit d138b523f8da74ce710d5dec3c6beb4b33670f13
Author: deardeng <565620...@qq.com>
AuthorDate: Thu Jan 18 14:46:55 2024 +0800

    [improvement](create tablet) backend create tablet round robin among … (#29818)
---
 be/src/agent/task_worker_pool.cpp                  |   2 +-
 be/src/common/config.cpp                           |   3 +
 be/src/common/config.h                             |   3 +
 be/src/olap/storage_engine.cpp                     | 135 ++++++++++++-----
 be/src/olap/storage_engine.h                       |  89 +++++++----
 be/src/olap/task/engine_clone_task.cpp             |   2 +-
 be/src/runtime/memory/cache_policy.h               |   3 +
 .../test_partition_create_tablet_rr.groovy         | 162 +++++++++++++++++++++
 8 files changed, 331 insertions(+), 68 deletions(-)

diff --git a/be/src/agent/task_worker_pool.cpp b/be/src/agent/task_worker_pool.cpp
index f40fe73758f..7d11b6c75e0 100644
--- a/be/src/agent/task_worker_pool.cpp
+++ b/be/src/agent/task_worker_pool.cpp
@@ -257,7 +257,7 @@ Status check_migrate_request(StorageEngine& engine, const TStorageMediumMigrateR
                                      storage_medium);
     }
     // get a random store of specified storage medium
-    auto stores = engine.get_stores_for_create_tablet(storage_medium);
+    auto stores = engine.get_stores_for_create_tablet(tablet->partition_id(), storage_medium);
     if (stores.empty()) {
         return Status::InternalError("failed to get root path for create tablet");
     }
diff --git a/be/src/common/config.cpp b/be/src/common/config.cpp
index 8b7edb86065..04e537cd324 100644
--- a/be/src/common/config.cpp
+++ b/be/src/common/config.cpp
@@ -1166,6 +1166,9 @@ DEFINE_mInt32(report_query_statistics_interval_ms, "3000");
 // 30s
 DEFINE_mInt32(query_statistics_reserve_timeout_ms, "30000");
 
+// create tablet in partition random robin idx lru size, default 10000
+DEFINE_Int32(partition_disk_index_lru_size, "10000");
+
 // clang-format off
 #ifdef BE_TEST
 // test s3
diff --git a/be/src/common/config.h b/be/src/common/config.h
index 6d5b54a270a..661192c1178 100644
--- a/be/src/common/config.h
+++ b/be/src/common/config.h
@@ -1242,6 +1242,9 @@ DECLARE_Int32(ignore_invalid_partition_id_rowset_num);
 DECLARE_mInt32(report_query_statistics_interval_ms);
 DECLARE_mInt32(query_statistics_reserve_timeout_ms);
 
+// create tablet in partition random robin idx lru size, default 10000
+DECLARE_Int32(partition_disk_index_lru_size);
+
 #ifdef BE_TEST
 // test s3
 DECLARE_String(test_s3_resource);
diff --git a/be/src/olap/storage_engine.cpp b/be/src/olap/storage_engine.cpp
index be987de3271..82ab02ea39e 100644
--- a/be/src/olap/storage_engine.cpp
+++ b/be/src/olap/storage_engine.cpp
@@ -18,6 +18,7 @@
 #include "olap/storage_engine.h"
 
 // IWYU pragma: no_include <bthread/errno.h>
+#include <assert.h>
 #include <errno.h> // IWYU pragma: keep
 #include <fmt/format.h>
 #include <gen_cpp/AgentService_types.h>
@@ -95,7 +96,8 @@ using std::vector;
 
 namespace doris {
 using namespace ErrorCode;
-
+extern void get_round_robin_stores(int64 curr_index, const std::vector<DirInfo>& dir_infos,
+                                   std::vector<DataDir*>& stores);
 DEFINE_GAUGE_METRIC_PROTOTYPE_2ARG(unused_rowsets_count, MetricUnit::ROWSETS);
 
 static Status _validate_options(const EngineOptions& options) {
@@ -129,7 +131,10 @@ StorageEngine::StorageEngine(const EngineOptions& options)
           _memtable_flush_executor(nullptr),
           _calc_delete_bitmap_executor(nullptr),
           _default_rowset_type(BETA_ROWSET),
-          _stream_load_recorder(nullptr) {
+          _heartbeat_flags(nullptr),
+          _stream_load_recorder(nullptr),
+          _create_tablet_idx_lru_cache(
+                  new 
CreateTabletIdxCache(config::partition_disk_index_lru_size)) { REGISTER_HOOK_METRIC(unused_rowsets_count, [this]() { // std::lock_guard<std::mutex> lock(_gc_mutex); return _unused_rowsets.size(); @@ -430,46 +435,80 @@ Status StorageEngine::set_cluster_id(int32_t cluster_id) { return Status::OK(); } +StorageEngine::DiskRemainingLevel get_available_level(double disk_usage_percent) { + assert(disk_usage_percent <= 1); + if (disk_usage_percent < 0.7) { + return StorageEngine::DiskRemainingLevel::LOW; + } else if (disk_usage_percent < 0.85) { + return StorageEngine::DiskRemainingLevel::MID; + } + return StorageEngine::DiskRemainingLevel::HIGH; +} + +int StorageEngine::_get_and_set_next_disk_index(int64 partition_id, + TStorageMedium::type storage_medium) { + auto key = CreateTabletIdxCache::get_key(partition_id, storage_medium); + int curr_index = _create_tablet_idx_lru_cache->get_index(key); + // -1, lru can't find key + if (curr_index == -1) { + curr_index = std::max(0, _last_use_index[storage_medium] + 1); + } + _last_use_index[storage_medium] = curr_index; + _create_tablet_idx_lru_cache->set_index(key, std::max(0, curr_index + 1)); + return curr_index; +} + +void StorageEngine::_get_candidate_stores(TStorageMedium::type storage_medium, + std::vector<DirInfo>& dir_infos) { + for (auto& it : _store_map) { + DataDir* data_dir = it.second.get(); + if (data_dir->is_used()) { + if ((_available_storage_medium_type_count == 1 || + data_dir->storage_medium() == storage_medium) && + !data_dir->reach_capacity_limit(0)) { + DirInfo dir_info; + dir_info.data_dir = data_dir; + dir_info.available_level = get_available_level(data_dir->get_usage(0)); + dir_infos.push_back(dir_info); + } + } + } +} + std::vector<DataDir*> StorageEngine::get_stores_for_create_tablet( - TStorageMedium::type storage_medium) { + int64 partition_id, TStorageMedium::type storage_medium) { + std::vector<DirInfo> dir_infos; + int curr_index = 0; std::vector<DataDir*> stores; { std::lock_guard<std::mutex> l(_store_lock); - for (auto&& [_, store] : _store_map) { - if (store->is_used()) { - if ((_available_storage_medium_type_count == 1 || - store->storage_medium() == storage_medium) && - !store->reach_capacity_limit(0)) { - stores.push_back(store.get()); - } - } - } + curr_index = _get_and_set_next_disk_index(partition_id, storage_medium); + _get_candidate_stores(storage_medium, dir_infos); } - std::sort(stores.begin(), stores.end(), - [](DataDir* a, DataDir* b) { return a->get_usage(0) < b->get_usage(0); }); + std::sort(dir_infos.begin(), dir_infos.end()); + get_round_robin_stores(curr_index, dir_infos, stores); - size_t seventy_percent_index = stores.size(); - size_t eighty_five_percent_index = stores.size(); - for (size_t index = 0; index < stores.size(); index++) { - // If the usage of the store is less than 70%, we choose disk randomly. 
- if (stores[index]->get_usage(0) > 0.7 && seventy_percent_index == stores.size()) { - seventy_percent_index = index; + return stores; +} + +// maintain in stores LOW,MID,HIGH level round robin +void get_round_robin_stores(int64 curr_index, const std::vector<DirInfo>& dir_infos, + std::vector<DataDir*>& stores) { + for (size_t i = 0; i < dir_infos.size();) { + size_t end = i + 1; + while (end < dir_infos.size() && + dir_infos[i].available_level == dir_infos[end].available_level) { + end++; } - if (stores[index]->get_usage(0) > 0.85 && eighty_five_percent_index == stores.size()) { - eighty_five_percent_index = index; - break; + // data dirs [i, end) have the same tablet size, round robin range [i, end) + size_t count = end - i; + for (size_t k = 0; k < count; k++) { + size_t index = i + (k + curr_index) % count; + stores.push_back(dir_infos[index].data_dir); } + i = end; } - - std::random_device rd; - std::mt19937 g(rd()); - std::shuffle(stores.begin(), stores.begin() + seventy_percent_index, g); - std::shuffle(stores.begin() + seventy_percent_index, stores.begin() + eighty_five_percent_index, - g); - std::shuffle(stores.begin() + eighty_five_percent_index, stores.end(), g); - - return stores; } DataDir* StorageEngine::get_store(const std::string& path) { @@ -1034,7 +1073,7 @@ Status StorageEngine::create_tablet(const TCreateTabletReq& request, RuntimeProf std::vector<DataDir*> stores; { SCOPED_TIMER(ADD_TIMER(profile, "GetStores")); - stores = get_stores_for_create_tablet(request.storage_medium); + stores = get_stores_for_create_tablet(request.partition_id, request.storage_medium); } if (stores.empty()) { return Status::Error<CE_CMD_PARAMS_ERROR>( @@ -1044,7 +1083,8 @@ Status StorageEngine::create_tablet(const TCreateTabletReq& request, RuntimeProf } Status StorageEngine::obtain_shard_path(TStorageMedium::type storage_medium, int64_t path_hash, - std::string* shard_path, DataDir** store) { + std::string* shard_path, DataDir** store, + int64_t partition_id) { LOG(INFO) << "begin to process obtain root path. 
storage_medium=" << storage_medium; if (shard_path == nullptr) { @@ -1052,7 +1092,7 @@ Status StorageEngine::obtain_shard_path(TStorageMedium::type storage_medium, int "invalid output parameter which is null pointer."); } - auto stores = get_stores_for_create_tablet(storage_medium); + auto stores = get_stores_for_create_tablet(partition_id, storage_medium); if (stores.empty()) { return Status::Error<NO_AVAILABLE_ROOT_PATH>( "no available disk can be used to create tablet."); @@ -1343,4 +1383,29 @@ void StorageEngine::_decrease_low_priority_task_nums(DataDir* dir) { } } +int CreateTabletIdxCache::get_index(const std::string& key) { + auto lru_handle = cache()->lookup(key); + if (lru_handle) { + Defer release([cache = cache(), lru_handle] { cache->release(lru_handle); }); + auto value = (CacheValue*)cache()->value(lru_handle); + value->last_visit_time = UnixMillis(); + VLOG_DEBUG << "use create tablet idx cache key=" << key << " value=" << value->idx; + return value->idx; + } + return -1; +} + +void CreateTabletIdxCache::set_index(const std::string& key, int next_idx) { + assert(next_idx >= 0); + CacheValue* value = new CacheValue; + value->last_visit_time = UnixMillis(); + value->idx = next_idx; + auto deleter = [](const doris::CacheKey& key, void* value) { + CacheValue* cache_value = (CacheValue*)value; + delete cache_value; + }; + auto lru_handle = cache()->insert(key, value, 1, deleter, CachePriority::NORMAL, sizeof(int)); + cache()->release(lru_handle); +} + } // namespace doris diff --git a/be/src/olap/storage_engine.h b/be/src/olap/storage_engine.h index eca5212b5aa..002d7d67159 100644 --- a/be/src/olap/storage_engine.h +++ b/be/src/olap/storage_engine.h @@ -70,6 +70,8 @@ class Thread; class ThreadPool; class TxnManager; class ReportWorker; +class CreateTabletIdxCache; +struct DirInfo; using SegCompactionCandidates = std::vector<segment_v2::SegmentSharedPtr>; using SegCompactionCandidatesSharedPtr = std::shared_ptr<SegCompactionCandidates>; @@ -83,6 +85,8 @@ public: StorageEngine(const EngineOptions& options); ~StorageEngine(); + enum class DiskRemainingLevel { LOW, MID, HIGH }; + [[nodiscard]] Status open(); static StorageEngine* instance() { return ExecEnv::GetInstance()->get_storage_engine(); } @@ -104,9 +108,11 @@ public: int64_t get_file_or_directory_size(const std::string& file_path); - // get root path for creating tablet. The returned vector of root path should be random, + // get root path for creating tablet. The returned vector of root path should be round robin, // for avoiding that all the tablet would be deployed one disk. - std::vector<DataDir*> get_stores_for_create_tablet(TStorageMedium::type storage_medium); + std::vector<DataDir*> get_stores_for_create_tablet(int64 partition_id, + TStorageMedium::type storage_medium); + DataDir* get_store(const std::string& path); uint32_t available_storage_medium_type_count() const { @@ -124,7 +130,7 @@ public: // @param [out] shard_path choose an available root_path to clone new tablet // @return error code Status obtain_shard_path(TStorageMedium::type storage_medium, int64_t path_hash, - std::string* shared_path, DataDir** store); + std::string* shared_path, DataDir** store, int64_t partition_id); // Load new tablet to make it effective. 
// @@ -328,36 +334,12 @@ private: void _decrease_low_priority_task_nums(DataDir* dir); -private: - struct CompactionCandidate { - CompactionCandidate(uint32_t nicumulative_compaction_, int64_t tablet_id_, uint32_t index_) - : nice(nicumulative_compaction_), tablet_id(tablet_id_), disk_index(index_) {} - uint32_t nice; // priority - int64_t tablet_id; - uint32_t disk_index = -1; - }; - - // In descending order - struct CompactionCandidateComparator { - bool operator()(const CompactionCandidate& a, const CompactionCandidate& b) { - return a.nice > b.nice; - } - }; + void _get_candidate_stores(TStorageMedium::type storage_medium, + std::vector<DirInfo>& dir_infos); - struct CompactionDiskStat { - CompactionDiskStat(std::string path, uint32_t index, bool used) - : storage_path(path), - disk_index(index), - task_running(0), - task_remaining(0), - is_used(used) {} - const std::string storage_path; - const uint32_t disk_index; - uint32_t task_running; - uint32_t task_remaining; - bool is_used; - }; + int _get_and_set_next_disk_index(int64 partition_id, TStorageMedium::type storage_medium); +private: EngineOptions _options; std::mutex _store_lock; std::mutex _trash_sweep_lock; @@ -488,6 +470,51 @@ private: bool _clear_segment_cache = false; std::atomic<bool> _need_clean_trash {false}; + + // next index for create tablet + std::map<TStorageMedium::type, int> _last_use_index; + + std::unique_ptr<CreateTabletIdxCache> _create_tablet_idx_lru_cache; + + DISALLOW_COPY_AND_ASSIGN(StorageEngine); +}; + +// lru cache for create tabelt round robin in disks +// key: partitionId_medium +// value: index +class CreateTabletIdxCache : public LRUCachePolicy { +public: + // get key, delimiter with DELIMITER '-' + static std::string get_key(int64_t partition_id, TStorageMedium::type medium) { + return fmt::format("{}-{}", partition_id, medium); + } + + // -1 not found key in lru + int get_index(const std::string& key); + + void set_index(const std::string& key, int next_idx); + + struct CacheValue : public LRUCacheValueBase { + int idx = 0; + }; + + CreateTabletIdxCache(size_t capacity) + : LRUCachePolicy(CachePolicy::CacheType::CREATE_TABLET_RR_IDX_CACHE, capacity, + LRUCacheType::NUMBER, + /*stale_sweep_time_s*/ 30 * 60) {} +}; + +struct DirInfo { + DataDir* data_dir; + + StorageEngine::DiskRemainingLevel available_level; + + bool operator<(const DirInfo& other) const { + if (available_level != other.available_level) { + return available_level < other.available_level; + } + return data_dir->path_hash() < other.data_dir->path_hash(); + } }; } // namespace doris diff --git a/be/src/olap/task/engine_clone_task.cpp b/be/src/olap/task/engine_clone_task.cpp index 75509bb2635..d8c7a54cb74 100644 --- a/be/src/olap/task/engine_clone_task.cpp +++ b/be/src/olap/task/engine_clone_task.cpp @@ -254,7 +254,7 @@ Status EngineCloneTask::_do_clone() { DataDir* store = nullptr; RETURN_IF_ERROR(StorageEngine::instance()->obtain_shard_path( _clone_req.storage_medium, _clone_req.dest_path_hash, &local_shard_root_path, - &store)); + &store, _clone_req.partition_id)); auto tablet_dir = fmt::format("{}/{}/{}", local_shard_root_path, _clone_req.tablet_id, _clone_req.schema_hash); diff --git a/be/src/runtime/memory/cache_policy.h b/be/src/runtime/memory/cache_policy.h index e965802ed2b..9a9f2c36e84 100644 --- a/be/src/runtime/memory/cache_policy.h +++ b/be/src/runtime/memory/cache_policy.h @@ -42,6 +42,7 @@ public: COMMON_OBJ_LRU_CACHE = 12, FOR_UT = 13, TABLET_SCHEMA_CACHE = 14, + CREATE_TABLET_RR_IDX_CACHE = 15 }; static std::string 
type_string(CacheType type) { @@ -76,6 +77,8 @@ public: return "ForUT"; case CacheType::TABLET_SCHEMA_CACHE: return "TabletSchemaCache"; + case CacheType::CREATE_TABLET_RR_IDX_CACHE: + return "CreateTabletRRIdxCache"; default: LOG(FATAL) << "not match type of cache policy :" << static_cast<int>(type); } diff --git a/regression-test/suites/partition_p0/test_partition_create_tablet_rr.groovy b/regression-test/suites/partition_p0/test_partition_create_tablet_rr.groovy new file mode 100644 index 00000000000..f7e77f06f38 --- /dev/null +++ b/regression-test/suites/partition_p0/test_partition_create_tablet_rr.groovy @@ -0,0 +1,162 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. +import org.apache.doris.regression.suite.ClusterOptions +import org.apache.doris.regression.util.NodeType +import org.apache.doris.regression.suite.SuiteCluster + +suite("test_partition_create_tablet_rr") { + def options = new ClusterOptions() + options.beNum = 1 + options.feConfigs.add('disable_balance=true') + def partition_disk_index_lru_size = 50 + options.beConfigs += [ + 'report_tablet_interval_seconds=1', + 'report_disk_state_interval_seconds=1', + "partition_disk_index_lru_size=$partition_disk_index_lru_size" + ] + options.beDisks = ['HDD=4','SSD=4'] + options.enableDebugPoints() + + def checkTabletOnDiskTabletNumEq = {tbl -> + sleep 5000 + + def tablets = sql_return_maparray "SHOW TABLETS FROM $tbl" + def pathTabletNum = [:] + tablets.each { + def num = pathTabletNum.get(it.PathHash) + if (num) { + pathTabletNum.put(it.PathHash, ++num) + } else { + pathTabletNum.put(it.PathHash, 1) + } + } + + log.info("table ${tbl} tablet in path ${pathTabletNum.values()}") + def count = pathTabletNum.values().stream().distinct().count() + assertEquals(count, 1) + } + + docker(options) { + sleep 2000 + def single_hdd_tbl = "single_HDD_tbl" + def single_ssd_tbl = "single_SDD_tbl" + def single_partition_tbl = "single_partition_tbl" + sql """drop table if exists $single_hdd_tbl""" + sql """drop table if exists $single_ssd_tbl""" + sql """drop table if exists $single_partition_tbl""" + for (def j = 0; j < partition_disk_index_lru_size + 10; j++) { + def tbl = single_partition_tbl + j.toString() + sql """drop table if exists $tbl""" + } + try { + // 1. test single partition table + // a. create 1 table, has 100 buckets + // b. 
check disk's tablet num + + sql """ + CREATE TABLE $single_hdd_tbl ( + `k1` int(11) NULL, + `k2` int(11) NULL + ) DUPLICATE KEY(`k1`, `k2`) + COMMENT 'OLAP' + DISTRIBUTED BY HASH(`k1`) BUCKETS 12000 + PROPERTIES ( + "replication_num"="1", + "storage_medium" = "HDD" + ); + """ + + checkTabletOnDiskTabletNumEq single_hdd_tbl + + sql """ + CREATE TABLE $single_ssd_tbl ( + `k1` int(11) NULL, + `k2` int(11) NULL + ) DUPLICATE KEY(`k1`, `k2`) + COMMENT 'OLAP' + DISTRIBUTED BY HASH(`k1`) BUCKETS 12000 + PROPERTIES ( + "replication_num"="1", + "storage_medium" = "SSD" + ); + """ + checkTabletOnDiskTabletNumEq single_ssd_tbl + + sql """ + CREATE TABLE $single_partition_tbl + ( + k1 DATE, + k2 DECIMAL(10, 2) DEFAULT "10.5", + k3 CHAR(10) COMMENT "string column", + k4 INT NOT NULL DEFAULT "1" COMMENT "int column" + ) + DUPLICATE KEY(k1, k2) + PARTITION BY RANGE(k1) + ( + PARTITION p1 VALUES LESS THAN ("2020-02-01"), + PARTITION p2 VALUES LESS THAN ("2020-03-01"), + PARTITION p3 VALUES LESS THAN ("2020-04-01") + ) + DISTRIBUTED BY HASH(k1) BUCKETS 320 + PROPERTIES ( + "replication_num" = "1" + ); + """ + checkTabletOnDiskTabletNumEq single_partition_tbl + + // 2. test multi thread create single partition tables + // a. multi thread create partition_disk_index_lru_size + 10 table + // b. check disk's tablet num + def futures = [] + for (def i = 0; i < partition_disk_index_lru_size + 10; i++) { + def tblMulti = single_partition_tbl + i.toString() + futures.add(thread { + sql """ + CREATE TABLE $tblMulti + ( + k1 DATE, + k2 DECIMAL(10, 2) DEFAULT "10.5", + k3 CHAR(10) COMMENT "string column", + k4 INT NOT NULL DEFAULT "1" COMMENT "int column" + ) + DUPLICATE KEY(k1, k2) + PARTITION BY RANGE(k1) + ( + PARTITION p1 VALUES LESS THAN ("2020-02-01"), + PARTITION p2 VALUES LESS THAN ("2020-03-01"), + PARTITION p3 VALUES LESS THAN ("2020-04-01") + ) + DISTRIBUTED BY HASH(k1) BUCKETS 320 + PROPERTIES ( + "replication_num" = "1" + ); + """ + checkTabletOnDiskTabletNumEq tblMulti + }) + } + futures.each { it.get() } + } finally { + sql """drop table if exists $single_hdd_tbl""" + sql """drop table if exists $single_ssd_tbl""" + sql """drop table if exists $single_partition_tbl""" + for (def j = 0; j < partition_disk_index_lru_size + 10; j++) { + def tbl = single_partition_tbl + j.toString() + sql """drop table if exists $tbl""" + } + } + } +} --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org
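
A note for readers skimming the storage_engine.cpp hunks above: tablet placement now happens in two steps. Each candidate data dir is bucketed by disk usage into LOW (< 70%), MID (< 85%) or HIGH remaining level, the buckets are ordered LOW first, and each bucket is rotated by a per-partition counter so consecutive tablets of one partition land on different disks. The standalone C++ sketch below mirrors that ordering; Dir, usage and round_robin_order are simplified stand-ins for the patch's DataDir/DirInfo and get_round_robin_stores, not the actual Doris classes, and the real code additionally breaks ties inside a bucket by path_hash.

#include <algorithm>
#include <cstdint>
#include <string>
#include <vector>

enum class Level { LOW, MID, HIGH }; // LOW = least used, preferred first

struct Dir {
    std::string path;   // stand-in for DataDir*
    double usage = 0.0; // fraction of capacity in use, 0.0 .. 1.0
    Level level() const {
        // same thresholds as get_available_level() in the patch
        if (usage < 0.7) return Level::LOW;
        if (usage < 0.85) return Level::MID;
        return Level::HIGH;
    }
};

// Sort dirs by usage level, then rotate each equal-level group by curr_index
// so calls with increasing indexes walk the group round robin.
std::vector<Dir> round_robin_order(std::vector<Dir> dirs, int64_t curr_index) {
    std::stable_sort(dirs.begin(), dirs.end(),
                     [](const Dir& a, const Dir& b) { return a.level() < b.level(); });
    std::vector<Dir> out;
    out.reserve(dirs.size());
    for (size_t i = 0; i < dirs.size();) {
        size_t end = i + 1;
        while (end < dirs.size() && dirs[end].level() == dirs[i].level()) {
            end++;
        }
        size_t count = end - i; // dirs [i, end) share one level
        for (size_t k = 0; k < count; k++) {
            out.push_back(dirs[i + (k + static_cast<size_t>(curr_index)) % count]);
        }
        i = end;
    }
    return out;
}

For example, with two half-empty disks d1, d2 and one 90%-full disk d3, curr_index 0 yields d1, d2, d3 and curr_index 1 yields d2, d1, d3: the healthy disks alternate at the front of the preference list and the nearly full one stays last.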
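The rotation offset itself is remembered per (partition id, storage medium) in the new CreateTabletIdxCache, keyed as "partitionId-medium". Below is a minimal sketch of that bookkeeping under one simplifying assumption: a plain std::unordered_map stands in for the bounded LRU cache (partition_disk_index_lru_size entries) that the patch uses, so it never evicts.

#include <cstdint>
#include <map>
#include <string>
#include <unordered_map>

class TabletIdxCacheSketch {
public:
    // Return the offset to use for this create-tablet request and remember
    // offset + 1 for the next request of the same partition and medium.
    int next_index(int64_t partition_id, int medium) {
        std::string key = std::to_string(partition_id) + "-" + std::to_string(medium);
        auto it = _idx_by_key.find(key);
        int curr;
        if (it == _idx_by_key.end()) {
            // Miss: continue after the last index handed out for this medium,
            // so fresh partitions do not all start on the first disk.
            curr = _last_use_index[medium] + 1;
        } else {
            curr = it->second;
        }
        _last_use_index[medium] = curr;
        _idx_by_key[key] = curr + 1;
        return curr;
    }

private:
    std::unordered_map<std::string, int> _idx_by_key; // patch: bounded LRU cache
    std::map<int, int> _last_use_index;               // last offset per medium
};

Because the key contains the partition id, tablets of one partition keep striping across disks even when create-tablet requests for many partitions interleave; the LRU bound in the real cache only caps memory for partitions that have not created tablets recently, which is why the regression test above creates partition_disk_index_lru_size + 10 tables to exercise eviction.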