github-actions[bot] commented on code in PR #27726: URL: https://github.com/apache/doris/pull/27726#discussion_r1434468281
########## be/src/olap/wal_manager.cpp: ########## @@ -66,6 +72,37 @@ void WalManager::stop() { } Status WalManager::init() { + RETURN_IF_ERROR(_init_wal_dirs_conf()); + RETURN_IF_ERROR(_init_wal_dirs()); + RETURN_IF_ERROR(_init_wal_disk_info()); + return Thread::create( + "WalMgr", "replay_wal", [this]() { static_cast<void>(this->replay()); }, + &_replay_thread); +} + +Status WalManager::_init_wal_dirs_conf() { Review Comment: warning: method '_init_wal_dirs_conf' can be made static [readability-convert-member-functions-to-static] be/src/olap/wal_manager.h:101: ```diff - Status _init_wal_dirs_conf(); + static Status _init_wal_dirs_conf(); ``` ########## be/src/olap/wal_manager.cpp: ########## @@ -410,4 +469,106 @@ return Status::OK(); } +bool WalManager::is_wal_disk_space_enough() { + // if all disks space usage < 80% + std::shared_lock l(_wal_disk_info_lock); + for (const auto& it : _wal_disk_info_map) { + size_t limit = it.second->limit; + size_t available = it.second->available(); + if (available >= limit * 0.8) { + return false; + } + } + return true; +} + +const string& WalManager::get_min_disk_usage_wal_dir() { + std::shared_lock l(_wal_disk_info_lock); + return _wal_dirs.size() == 1 + ? _wal_dirs[0] + : *std::min_element(_wal_dirs.begin(), _wal_dirs.end(), + [this](const std::string& dir1, const std::string& dir2) { + return _wal_disk_info_map[dir1]->available() < + _wal_disk_info_map[dir2]->available(); + }); +} + +const string& WalManager::get_random_wal_dir() { + std::shared_lock l(_wal_disk_info_lock); + return _wal_disk_info_map.size() == 1 + ? 
_wal_disk_info_map.begin()->first + : std::next(_wal_disk_info_map.begin(), rand() % _wal_disk_info_map.size()) + ->first; +} + +size_t WalManager::get_max_available_size() { Review Comment: warning: method 'get_max_available_size' can be made static [readability-convert-member-functions-to-static] be/src/olap/wal_manager.h:97: ```diff - size_t get_max_available_size(); + static size_t get_max_available_size(); ``` ########## be/src/runtime/group_commit_mgr.cpp: ########## @@ -472,4 +484,52 @@ Status LoadBlockQueue::close_wal() { } return Status::OK(); } + +bool LoadBlockQueue::is_wal_disk_space_enough( Review Comment: warning: method 'is_wal_disk_space_enough' can be made static [readability-convert-member-functions-to-static] ```suggestion static bool LoadBlockQueue::is_wal_disk_space_enough( ``` ########## be/src/olap/wal_manager.cpp: ########## @@ -66,6 +72,37 @@ } Status WalManager::init() { + RETURN_IF_ERROR(_init_wal_dirs_conf()); + RETURN_IF_ERROR(_init_wal_dirs()); + RETURN_IF_ERROR(_init_wal_disk_info()); + return Thread::create( + "WalMgr", "replay_wal", [this]() { static_cast<void>(this->replay()); }, + &_replay_thread); +} + +Status WalManager::_init_wal_dirs_conf() { + std::vector<std::string> tmp_dirs; + if (_wal_dirs.empty()) { + // default case. + for (const StorePath& path : ExecEnv::GetInstance()->store_paths()) { + tmp_dirs.emplace_back(path.path + "/wal"); + } + } else { + // user config must be absolute path. 
+ for (const std::string& wal_dir : _wal_dirs) { + if (std::filesystem::path(wal_dir).is_absolute()) { + tmp_dirs.emplace_back(wal_dir); + } else { + return Status::InternalError( + "BE config group_commit_replay_wal_dir has to be absolute path!"); + } + } + } + _wal_dirs = tmp_dirs; + return Status::OK(); +} + +Status WalManager::_init_wal_dirs() { Review Comment: warning: method '_init_wal_dirs' can be made static [readability-convert-member-functions-to-static] be/src/olap/wal_manager.h:102: ```diff - Status _init_wal_dirs(); + static Status _init_wal_dirs(); ``` ########## be/src/olap/wal_manager.cpp: ########## @@ -80,9 +117,41 @@ } RETURN_IF_ERROR(scan_wals(wal_dir)); } - return Thread::create( - "WalMgr", "replay_wal", [this]() { static_cast<void>(this->replay()); }, - &_replay_thread); + return Status::OK(); +} + +Status WalManager::_init_wal_disk_info() { Review Comment: warning: method '_init_wal_disk_info' can be made static [readability-convert-member-functions-to-static] be/src/olap/wal_manager.h:103: ```diff - Status _init_wal_disk_info(); + static Status _init_wal_disk_info(); ``` ########## be/src/olap/wal_manager.cpp: ########## @@ -410,4 +469,106 @@ return Status::OK(); } +bool WalManager::is_wal_disk_space_enough() { + // if all disks space usage < 80% + std::shared_lock l(_wal_disk_info_lock); + for (const auto& it : _wal_disk_info_map) { + size_t limit = it.second->limit; + size_t available = it.second->available(); + if (available >= limit * 0.8) { + return false; + } + } + return true; +} + +const string& WalManager::get_min_disk_usage_wal_dir() { + std::shared_lock l(_wal_disk_info_lock); + return _wal_dirs.size() == 1 + ? 
_wal_dirs[0] + : *std::min_element(_wal_dirs.begin(), _wal_dirs.end(), + [this](const std::string& dir1, const std::string& dir2) { + return _wal_disk_info_map[dir1]->available() < + _wal_disk_info_map[dir2]->available(); + }); +} + +const string& WalManager::get_random_wal_dir() { + std::shared_lock l(_wal_disk_info_lock); + return _wal_disk_info_map.size() == 1 + ? _wal_disk_info_map.begin()->first + : std::next(_wal_disk_info_map.begin(), rand() % _wal_disk_info_map.size()) + ->first; +} + +size_t WalManager::get_max_available_size() { + std::shared_lock l(_wal_disk_info_lock); + return _wal_disk_info_map.size() == 1 + ? _wal_disk_info_map.begin()->second->available() + : std::max_element(_wal_disk_info_map.begin(), _wal_disk_info_map.end(), + [](const auto& map1, const auto& map2) { + return map1.second->available() < + map2.second->available(); + }) + ->second->available(); +} + +std::shared_mutex& WalManager::lock_for_wal_disk_info() { + return _wal_disk_info_lock; +} + +Status WalManager::update_wal_disk_info_map(std::shared_mutex& wal_disk_info_lock, Review Comment: warning: method 'update_wal_disk_info_map' can be made static [readability-convert-member-functions-to-static] ```suggestion static Status WalManager::update_wal_disk_info_map(std::shared_mutex& wal_disk_info_lock, ``` ########## be/src/olap/wal_manager.cpp: ########## @@ -410,4 +469,106 @@ return Status::OK(); } +bool WalManager::is_wal_disk_space_enough() { Review Comment: warning: method 'is_wal_disk_space_enough' can be made static [readability-convert-member-functions-to-static] be/src/olap/wal_manager.h:95: ```diff - bool is_wal_disk_space_enough(); + static bool is_wal_disk_space_enough(); ``` ########## be/src/olap/wal_manager.cpp: ########## @@ -410,4 +469,106 @@ return Status::OK(); } +bool WalManager::is_wal_disk_space_enough() { + // if all disks space usage < 80% + std::shared_lock l(_wal_disk_info_lock); + for (const auto& it : _wal_disk_info_map) { + size_t limit = 
it.second->limit; + size_t available = it.second->available(); + if (available >= limit * 0.8) { + return false; + } + } + return true; +} + +const string& WalManager::get_min_disk_usage_wal_dir() { + std::shared_lock l(_wal_disk_info_lock); + return _wal_dirs.size() == 1 + ? _wal_dirs[0] + : *std::min_element(_wal_dirs.begin(), _wal_dirs.end(), + [this](const std::string& dir1, const std::string& dir2) { + return _wal_disk_info_map[dir1]->available() < + _wal_disk_info_map[dir2]->available(); + }); +} + +const string& WalManager::get_random_wal_dir() { + std::shared_lock l(_wal_disk_info_lock); + return _wal_disk_info_map.size() == 1 + ? _wal_disk_info_map.begin()->first + : std::next(_wal_disk_info_map.begin(), rand() % _wal_disk_info_map.size()) + ->first; +} + +size_t WalManager::get_max_available_size() { + std::shared_lock l(_wal_disk_info_lock); + return _wal_disk_info_map.size() == 1 + ? _wal_disk_info_map.begin()->second->available() + : std::max_element(_wal_disk_info_map.begin(), _wal_disk_info_map.end(), + [](const auto& map1, const auto& map2) { + return map1.second->available() < + map2.second->available(); + }) + ->second->available(); +} + +std::shared_mutex& WalManager::lock_for_wal_disk_info() { + return _wal_disk_info_lock; +} + +Status WalManager::update_wal_disk_info_map(std::shared_mutex& wal_disk_info_lock, + std::string wal_dir, size_t limit, size_t used, + size_t pre_allocated) { + if (_wal_disk_info_map.find(wal_dir) != _wal_disk_info_map.end()) { + std::unique_lock l(wal_disk_info_lock); + if (limit != static_cast<size_t>(-1)) { + _wal_disk_info_map[wal_dir]->limit = limit; + } else { + size_t available_bytes; + size_t disk_capacity_bytes; + RETURN_IF_ERROR(io::global_local_filesystem()->get_space_info( + wal_dir, &disk_capacity_bytes, &available_bytes)); + bool is_percent = true; + int64_t wal_disk_limit = ParseUtil::parse_mem_spec( + config::group_commit_wal_max_disk_limit, -1, available_bytes, &is_percent); + if (wal_disk_limit <= 0) { 
+ return Status::InternalError("Disk full! Please check your disk usage!"); + } + int64_t wal_dir_size = 0; + RETURN_IF_ERROR(io::global_local_filesystem()->file_size(wal_dir, &wal_dir_size)); + wal_disk_limit += wal_dir_size; + _wal_disk_info_map[wal_dir]->limit = wal_disk_limit; + } + if (used != static_cast<size_t>(-1)) { + _wal_disk_info_map[wal_dir]->used = used; + } else { + int64_t wal_dir_size = 0; + RETURN_IF_ERROR(io::global_local_filesystem()->file_size(wal_dir, &wal_dir_size)); + _wal_disk_info_map[wal_dir]->used = wal_dir_size; + } + if (pre_allocated != static_cast<size_t>(-1)) { + _wal_disk_info_map[wal_dir]->pre_allocated = pre_allocated; + } + } else { + return Status::InternalError("Can not find wal dir in wal disk info map."); + } + return Status::OK(); +} + +Status WalManager::get_wal_disk_available_size(std::shared_mutex& wal_disk_info_lock, Review Comment: warning: method 'get_wal_disk_available_size' can be made static [readability-convert-member-functions-to-static] ```suggestion static Status WalManager::get_wal_disk_available_size(std::shared_mutex& wal_disk_info_lock, ``` ########## be/src/runtime/group_commit_mgr.cpp: ########## @@ -472,4 +484,52 @@ } return Status::OK(); } + +bool LoadBlockQueue::is_wal_disk_space_enough( + const std::vector<std::shared_ptr<vectorized::Block>>& blocks, const TUniqueId& load_id) { + size_t blocks_size = 0; + for (auto block : blocks) { + blocks_size += block->bytes(); + } + size_t content_length = 0; + Status st = ExecEnv::GetInstance()->group_commit_mgr()->get_load_info(load_id, &content_length); + if (!st.ok()) { + LOG(WARNING) << "failed to get load info!"; + } + size_t pre_allocated = blocks_size > 0 ? 
blocks_size : content_length; + auto* wal_mgr = ExecEnv::GetInstance()->wal_mgr(); + std::shared_mutex& wal_disk_info_lock = wal_mgr->lock_for_wal_disk_info(); + size_t available_bytes = 0; + st = wal_mgr->get_wal_disk_available_size(wal_disk_info_lock, wal_dir_to_disk_usage.first, + &available_bytes); + if (!st.ok()) { + LOG(WARNING) << "get wal disk available size filed!"; + } + if (pre_allocated < available_bytes) { + std::shared_mutex& l = wal_mgr->lock_for_wal_disk_info(); + RETURN_IF_ERROR(wal_mgr->update_wal_disk_info_map(l, wal_dir_to_disk_usage.first, -1, -1, + pre_allocated)); + return true; + } else { + return false; + } +} + +Status GroupCommitMgr::update_load_info(TUniqueId load_id, size_t content_length) { Review Comment: warning: method 'update_load_info' can be made static [readability-convert-member-functions-to-static] be/src/runtime/group_commit_mgr.h:157: ```diff - Status update_load_info(TUniqueId load_id, size_t content_length); + static Status update_load_info(TUniqueId load_id, size_t content_length); ``` ########## be/src/runtime/group_commit_mgr.cpp: ########## @@ -472,4 +484,52 @@ } return Status::OK(); } + +bool LoadBlockQueue::is_wal_disk_space_enough( + const std::vector<std::shared_ptr<vectorized::Block>>& blocks, const TUniqueId& load_id) { + size_t blocks_size = 0; + for (auto block : blocks) { + blocks_size += block->bytes(); + } + size_t content_length = 0; + Status st = ExecEnv::GetInstance()->group_commit_mgr()->get_load_info(load_id, &content_length); + if (!st.ok()) { + LOG(WARNING) << "failed to get load info!"; + } + size_t pre_allocated = blocks_size > 0 ? 
blocks_size : content_length; + auto* wal_mgr = ExecEnv::GetInstance()->wal_mgr(); + std::shared_mutex& wal_disk_info_lock = wal_mgr->lock_for_wal_disk_info(); + size_t available_bytes = 0; + st = wal_mgr->get_wal_disk_available_size(wal_disk_info_lock, wal_dir_to_disk_usage.first, + &available_bytes); + if (!st.ok()) { + LOG(WARNING) << "get wal disk available size filed!"; + } + if (pre_allocated < available_bytes) { + std::shared_mutex& l = wal_mgr->lock_for_wal_disk_info(); + RETURN_IF_ERROR(wal_mgr->update_wal_disk_info_map(l, wal_dir_to_disk_usage.first, -1, -1, + pre_allocated)); + return true; + } else { + return false; + } +} + +Status GroupCommitMgr::update_load_info(TUniqueId load_id, size_t content_length) { + std::unique_lock l(_load_info_lock); + if (_load_id_to_content_length_map.find(load_id) != _load_id_to_content_length_map.end()) { + return Status::InternalError("txn id already exists!"); + } + _load_id_to_content_length_map.insert(std::make_pair(load_id, content_length)); + return Status::OK(); +} + +Status GroupCommitMgr::get_load_info(TUniqueId load_id, size_t* content_length) { Review Comment: warning: method 'get_load_info' can be made static [readability-convert-member-functions-to-static] be/src/runtime/group_commit_mgr.h:158: ```diff - Status get_load_info(TUniqueId load_id, size_t* content_length); + static Status get_load_info(TUniqueId load_id, size_t* content_length); ``` -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org For queries about this service, please contact Infrastructure at: users@infra.apache.org --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org For additional commands, e-mail: commits-help@doris.apache.org