github-actions[bot] commented on code in PR #27726: URL: https://github.com/apache/doris/pull/27726#discussion_r1436133347
########## be/src/olap/wal_dirs_info.cpp: ########## @@ -0,0 +1,220 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#include "olap/wal_dirs_info.h" + +#include <memory> +#include <mutex> +#include <shared_mutex> + +#include "common/status.h" +#include "io/fs/local_file_system.h" +#include "util/parse_util.h" + +namespace doris { + +std::string WalDirInfo::get_wal_dir() { + return _wal_dir; +} + +size_t WalDirInfo::get_limit() { + std::shared_lock rlock(_lock); + return _limit; +} + +size_t WalDirInfo::get_used() { + std::shared_lock rlock(_lock); + return _used; +} + +size_t WalDirInfo::get_pre_allocated() { + std::shared_lock rlock(_lock); + return _pre_allocated; +} + +Status WalDirInfo::set_limit(size_t limit) { + std::unique_lock wlock(_lock); + _limit = limit; + return Status::OK(); +} + +Status WalDirInfo::set_used(size_t used) { + std::unique_lock wlock(_lock); + _used = used; + return Status::OK(); +} + +Status WalDirInfo::set_pre_allocated(size_t pre_allocated, bool is_add_pre_allocated) { + std::unique_lock wlock(_lock); + if (is_add_pre_allocated) { + _pre_allocated += pre_allocated; + } else { + _pre_allocated -= pre_allocated; + } + return Status::OK(); +} + +size_t WalDirInfo::available() { + std::unique_lock wlock(_lock); + int64_t available = _limit - _used - _pre_allocated; + return available > 0 ? available : 0; +} + +Status WalDirInfo::update_wal_dir_limit(size_t limit) { + if (limit != static_cast<size_t>(-1)) { + RETURN_IF_ERROR(set_limit(limit)); + } else { + size_t available_bytes; + size_t disk_capacity_bytes; + RETURN_IF_ERROR(io::global_local_filesystem()->get_space_info( + _wal_dir, &disk_capacity_bytes, &available_bytes)); + bool is_percent = true; + int64_t wal_disk_limit = ParseUtil::parse_mem_spec(config::group_commit_wal_max_disk_limit, + -1, available_bytes, &is_percent); + if (wal_disk_limit <= 0) { + return Status::InternalError("Disk full! Please check your disk usage!"); + } + size_t wal_dir_size = 0; + RETURN_IF_ERROR(io::global_local_filesystem()->directory_size(_wal_dir, &wal_dir_size)); + RETURN_IF_ERROR(set_limit(wal_disk_limit)); + } + return Status::OK(); +} + +Status WalDirInfo::update_wal_dir_used(size_t used) { + if (used != static_cast<size_t>(-1)) { + RETURN_IF_ERROR(set_used(used)); + } else { + size_t wal_dir_size = 0; + RETURN_IF_ERROR(io::global_local_filesystem()->directory_size(_wal_dir, &wal_dir_size)); + RETURN_IF_ERROR(set_used(wal_dir_size)); + } + return Status::OK(); +} + +Status WalDirInfo::update_wal_dir_pre_allocated(size_t pre_allocated, bool is_add_pre_allocated) { Review Comment: warning: method 'update_wal_dir_pre_allocated' can be made static [readability-convert-member-functions-to-static] ```suggestion static Status WalDirInfo::update_wal_dir_pre_allocated(size_t pre_allocated, bool is_add_pre_allocated) { ``` ########## be/src/olap/wal_dirs_info.cpp: ########## @@ -0,0 +1,220 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#include "olap/wal_dirs_info.h" + +#include <memory> +#include <mutex> +#include <shared_mutex> + +#include "common/status.h" +#include "io/fs/local_file_system.h" +#include "util/parse_util.h" + +namespace doris { + +std::string WalDirInfo::get_wal_dir() { + return _wal_dir; +} + +size_t WalDirInfo::get_limit() { + std::shared_lock rlock(_lock); + return _limit; +} + +size_t WalDirInfo::get_used() { + std::shared_lock rlock(_lock); + return _used; +} + +size_t WalDirInfo::get_pre_allocated() { + std::shared_lock rlock(_lock); + return _pre_allocated; +} + +Status WalDirInfo::set_limit(size_t limit) { + std::unique_lock wlock(_lock); + _limit = limit; + return Status::OK(); +} + +Status WalDirInfo::set_used(size_t used) { + std::unique_lock wlock(_lock); + _used = used; + return Status::OK(); +} + +Status WalDirInfo::set_pre_allocated(size_t pre_allocated, bool is_add_pre_allocated) { + std::unique_lock wlock(_lock); + if (is_add_pre_allocated) { + _pre_allocated += pre_allocated; + } else { + _pre_allocated -= pre_allocated; + } + return Status::OK(); +} + +size_t WalDirInfo::available() { + std::unique_lock wlock(_lock); + int64_t available = _limit - _used - _pre_allocated; + return available > 0 ? available : 0; +} + +Status WalDirInfo::update_wal_dir_limit(size_t limit) { + if (limit != static_cast<size_t>(-1)) { + RETURN_IF_ERROR(set_limit(limit)); + } else { + size_t available_bytes; + size_t disk_capacity_bytes; + RETURN_IF_ERROR(io::global_local_filesystem()->get_space_info( + _wal_dir, &disk_capacity_bytes, &available_bytes)); + bool is_percent = true; + int64_t wal_disk_limit = ParseUtil::parse_mem_spec(config::group_commit_wal_max_disk_limit, + -1, available_bytes, &is_percent); + if (wal_disk_limit <= 0) { + return Status::InternalError("Disk full! Please check your disk usage!"); + } + size_t wal_dir_size = 0; + RETURN_IF_ERROR(io::global_local_filesystem()->directory_size(_wal_dir, &wal_dir_size)); + RETURN_IF_ERROR(set_limit(wal_disk_limit)); + } + return Status::OK(); +} + +Status WalDirInfo::update_wal_dir_used(size_t used) { + if (used != static_cast<size_t>(-1)) { + RETURN_IF_ERROR(set_used(used)); + } else { + size_t wal_dir_size = 0; + RETURN_IF_ERROR(io::global_local_filesystem()->directory_size(_wal_dir, &wal_dir_size)); + RETURN_IF_ERROR(set_used(wal_dir_size)); + } + return Status::OK(); +} + +Status WalDirInfo::update_wal_dir_pre_allocated(size_t pre_allocated, bool is_add_pre_allocated) { + RETURN_IF_ERROR(set_pre_allocated(pre_allocated, is_add_pre_allocated)); + return Status::OK(); +} + +Status WalDirsInfo::add(const std::string& wal_dir, size_t limit, size_t used, + size_t pre_allocated) { + for (const auto& it : _wal_dirs_info_vec) { + if (it->get_wal_dir() == wal_dir) { + return Status::InternalError("wal dir {} exists!", wal_dir); + } + } + std::unique_lock wlock(_lock); + _wal_dirs_info_vec.emplace_back( + std::make_shared<WalDirInfo>(wal_dir, limit, used, pre_allocated)); + return Status::OK(); +} + +Status WalDirsInfo::clear() { + std::unique_lock wlock(_lock); + _wal_dirs_info_vec.clear(); + return Status::OK(); +} + +std::string WalDirsInfo::get_available_random_wal_dir() { + if (_wal_dirs_info_vec.size() == 1) { + return (*_wal_dirs_info_vec.begin())->get_wal_dir(); + } else { + std::vector<std::string> available_wal_dirs; + for (const auto& wal_dir_info : _wal_dirs_info_vec) { + if (wal_dir_info->available() > wal_dir_info->get_limit() * 0.2) { + available_wal_dirs.emplace_back(wal_dir_info->get_wal_dir()); + } + } + if (available_wal_dirs.empty()) { + return (*std::min_element(_wal_dirs_info_vec.begin(), _wal_dirs_info_vec.end(), + [](const auto& info1, const auto& info2) { + return info1->available() < info2->available(); + })) + ->get_wal_dir(); + } else { + return (*std::next(_wal_dirs_info_vec.begin(), rand() % _wal_dirs_info_vec.size())) + ->get_wal_dir(); + } + } +} + +size_t WalDirsInfo::get_max_available_size() { + return _wal_dirs_info_vec.size() == 1 + ? (*_wal_dirs_info_vec.begin())->available() + : (*std::max_element(_wal_dirs_info_vec.begin(), _wal_dirs_info_vec.end(), + [](const auto& info1, const auto& info2) { + return info1->available() < info2->available(); + })) + ->available(); +} + +Status WalDirsInfo::update_wal_dir_limit(std::string wal_dir, size_t limit) { Review Comment: warning: method 'update_wal_dir_limit' can be made static [readability-convert-member-functions-to-static] ```suggestion static Status WalDirsInfo::update_wal_dir_limit(std::string wal_dir, size_t limit) { ``` ########## be/src/olap/wal_dirs_info.cpp: ########## @@ -0,0 +1,220 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#include "olap/wal_dirs_info.h" + +#include <memory> +#include <mutex> +#include <shared_mutex> + +#include "common/status.h" +#include "io/fs/local_file_system.h" +#include "util/parse_util.h" + +namespace doris { + +std::string WalDirInfo::get_wal_dir() { + return _wal_dir; +} + +size_t WalDirInfo::get_limit() { + std::shared_lock rlock(_lock); + return _limit; +} + +size_t WalDirInfo::get_used() { + std::shared_lock rlock(_lock); + return _used; +} + +size_t WalDirInfo::get_pre_allocated() { + std::shared_lock rlock(_lock); + return _pre_allocated; +} + +Status WalDirInfo::set_limit(size_t limit) { + std::unique_lock wlock(_lock); + _limit = limit; + return Status::OK(); +} + +Status WalDirInfo::set_used(size_t used) { + std::unique_lock wlock(_lock); + _used = used; + return Status::OK(); +} + +Status WalDirInfo::set_pre_allocated(size_t pre_allocated, bool is_add_pre_allocated) { + std::unique_lock wlock(_lock); + if (is_add_pre_allocated) { + _pre_allocated += pre_allocated; + } else { + _pre_allocated -= pre_allocated; + } + return Status::OK(); +} + +size_t WalDirInfo::available() { + std::unique_lock wlock(_lock); + int64_t available = _limit - _used - _pre_allocated; + return available > 0 ? available : 0; +} + +Status WalDirInfo::update_wal_dir_limit(size_t limit) { + if (limit != static_cast<size_t>(-1)) { + RETURN_IF_ERROR(set_limit(limit)); + } else { + size_t available_bytes; + size_t disk_capacity_bytes; + RETURN_IF_ERROR(io::global_local_filesystem()->get_space_info( + _wal_dir, &disk_capacity_bytes, &available_bytes)); + bool is_percent = true; + int64_t wal_disk_limit = ParseUtil::parse_mem_spec(config::group_commit_wal_max_disk_limit, + -1, available_bytes, &is_percent); + if (wal_disk_limit <= 0) { + return Status::InternalError("Disk full! Please check your disk usage!"); + } + size_t wal_dir_size = 0; + RETURN_IF_ERROR(io::global_local_filesystem()->directory_size(_wal_dir, &wal_dir_size)); + RETURN_IF_ERROR(set_limit(wal_disk_limit)); + } + return Status::OK(); +} + +Status WalDirInfo::update_wal_dir_used(size_t used) { Review Comment: warning: method 'update_wal_dir_used' can be made static [readability-convert-member-functions-to-static] ```suggestion static Status WalDirInfo::update_wal_dir_used(size_t used) { ``` ########## be/src/olap/wal_dirs_info.cpp: ########## @@ -0,0 +1,220 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#include "olap/wal_dirs_info.h" + +#include <memory> +#include <mutex> +#include <shared_mutex> + +#include "common/status.h" +#include "io/fs/local_file_system.h" +#include "util/parse_util.h" + +namespace doris { + +std::string WalDirInfo::get_wal_dir() { + return _wal_dir; +} + +size_t WalDirInfo::get_limit() { + std::shared_lock rlock(_lock); + return _limit; +} + +size_t WalDirInfo::get_used() { + std::shared_lock rlock(_lock); + return _used; +} + +size_t WalDirInfo::get_pre_allocated() { + std::shared_lock rlock(_lock); + return _pre_allocated; +} + +Status WalDirInfo::set_limit(size_t limit) { + std::unique_lock wlock(_lock); + _limit = limit; + return Status::OK(); +} + +Status WalDirInfo::set_used(size_t used) { + std::unique_lock wlock(_lock); + _used = used; + return Status::OK(); +} + +Status WalDirInfo::set_pre_allocated(size_t pre_allocated, bool is_add_pre_allocated) { + std::unique_lock wlock(_lock); + if (is_add_pre_allocated) { + _pre_allocated += pre_allocated; + } else { + _pre_allocated -= pre_allocated; + } + return Status::OK(); +} + +size_t WalDirInfo::available() { + std::unique_lock wlock(_lock); + int64_t available = _limit - _used - _pre_allocated; + return available > 0 ? available : 0; +} + +Status WalDirInfo::update_wal_dir_limit(size_t limit) { + if (limit != static_cast<size_t>(-1)) { + RETURN_IF_ERROR(set_limit(limit)); + } else { + size_t available_bytes; + size_t disk_capacity_bytes; + RETURN_IF_ERROR(io::global_local_filesystem()->get_space_info( + _wal_dir, &disk_capacity_bytes, &available_bytes)); + bool is_percent = true; + int64_t wal_disk_limit = ParseUtil::parse_mem_spec(config::group_commit_wal_max_disk_limit, + -1, available_bytes, &is_percent); + if (wal_disk_limit <= 0) { + return Status::InternalError("Disk full! Please check your disk usage!"); + } + size_t wal_dir_size = 0; + RETURN_IF_ERROR(io::global_local_filesystem()->directory_size(_wal_dir, &wal_dir_size)); + RETURN_IF_ERROR(set_limit(wal_disk_limit)); + } + return Status::OK(); +} + +Status WalDirInfo::update_wal_dir_used(size_t used) { + if (used != static_cast<size_t>(-1)) { + RETURN_IF_ERROR(set_used(used)); + } else { + size_t wal_dir_size = 0; + RETURN_IF_ERROR(io::global_local_filesystem()->directory_size(_wal_dir, &wal_dir_size)); + RETURN_IF_ERROR(set_used(wal_dir_size)); + } + return Status::OK(); +} + +Status WalDirInfo::update_wal_dir_pre_allocated(size_t pre_allocated, bool is_add_pre_allocated) { + RETURN_IF_ERROR(set_pre_allocated(pre_allocated, is_add_pre_allocated)); + return Status::OK(); +} + +Status WalDirsInfo::add(const std::string& wal_dir, size_t limit, size_t used, + size_t pre_allocated) { + for (const auto& it : _wal_dirs_info_vec) { + if (it->get_wal_dir() == wal_dir) { + return Status::InternalError("wal dir {} exists!", wal_dir); + } + } + std::unique_lock wlock(_lock); + _wal_dirs_info_vec.emplace_back( + std::make_shared<WalDirInfo>(wal_dir, limit, used, pre_allocated)); + return Status::OK(); +} + +Status WalDirsInfo::clear() { + std::unique_lock wlock(_lock); + _wal_dirs_info_vec.clear(); + return Status::OK(); +} + +std::string WalDirsInfo::get_available_random_wal_dir() { + if (_wal_dirs_info_vec.size() == 1) { + return (*_wal_dirs_info_vec.begin())->get_wal_dir(); + } else { + std::vector<std::string> available_wal_dirs; + for (const auto& wal_dir_info : _wal_dirs_info_vec) { + if (wal_dir_info->available() > wal_dir_info->get_limit() * 0.2) { + available_wal_dirs.emplace_back(wal_dir_info->get_wal_dir()); + } + } + if (available_wal_dirs.empty()) { + return (*std::min_element(_wal_dirs_info_vec.begin(), _wal_dirs_info_vec.end(), + [](const auto& info1, const auto& info2) { + return info1->available() < info2->available(); + })) + ->get_wal_dir(); + } else { + return (*std::next(_wal_dirs_info_vec.begin(), rand() % _wal_dirs_info_vec.size())) + ->get_wal_dir(); + } + } +} + +size_t WalDirsInfo::get_max_available_size() { + return _wal_dirs_info_vec.size() == 1 + ? (*_wal_dirs_info_vec.begin())->available() + : (*std::max_element(_wal_dirs_info_vec.begin(), _wal_dirs_info_vec.end(), + [](const auto& info1, const auto& info2) { + return info1->available() < info2->available(); + })) + ->available(); +} + +Status WalDirsInfo::update_wal_dir_limit(std::string wal_dir, size_t limit) { + for (const auto& wal_dir_info : _wal_dirs_info_vec) { + if (wal_dir_info->get_wal_dir() == wal_dir) { + return wal_dir_info->update_wal_dir_limit(limit); + } + } + return Status::InternalError("Can not find wal dir in wal disks info."); +} + +Status WalDirsInfo::update_all_wal_dir_limit() { Review Comment: warning: method 'update_all_wal_dir_limit' can be made static [readability-convert-member-functions-to-static] ```suggestion static Status WalDirsInfo::update_all_wal_dir_limit() { ``` ########## be/src/olap/wal_dirs_info.h: ########## @@ -0,0 +1,84 @@ + +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#pragma once + +#include <cstddef> +#include <memory> +#include <mutex> +#include <shared_mutex> +#include <unordered_map> +#include <utility> +#include <vector> + +#include "common/factory_creator.h" + +namespace doris { +class WalDirInfo { + ENABLE_FACTORY_CREATOR(WalDirInfo); + +public: + WalDirInfo(std::string wal_dir, size_t limit, size_t used, size_t pre_allocated) + : _wal_dir(std::move(wal_dir)), + _limit(limit), + _used(used), + _pre_allocated(pre_allocated) {} + std::string get_wal_dir(); + size_t get_limit(); + size_t get_used(); + size_t get_pre_allocated(); + Status set_limit(size_t limit); + Status set_used(size_t used); + Status set_pre_allocated(size_t pre_allocated, bool is_add_pre_allocated); + size_t available(); + Status update_wal_dir_limit(size_t limit = -1); + Status update_wal_dir_used(size_t used = -1); + Status update_wal_dir_pre_allocated(size_t pre_allocated, bool is_add_pre_allocated = true); Review Comment: warning: unknown type name 'Status' [clang-diagnostic-error] ```cpp Status update_wal_dir_pre_allocated(size_t pre_allocated, bool is_add_pre_allocated = true); ^ ``` ########## be/src/olap/wal_dirs_info.cpp: ########## @@ -0,0 +1,220 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#include "olap/wal_dirs_info.h" + +#include <memory> +#include <mutex> +#include <shared_mutex> + +#include "common/status.h" +#include "io/fs/local_file_system.h" +#include "util/parse_util.h" + +namespace doris { + +std::string WalDirInfo::get_wal_dir() { + return _wal_dir; +} + +size_t WalDirInfo::get_limit() { + std::shared_lock rlock(_lock); + return _limit; +} + +size_t WalDirInfo::get_used() { + std::shared_lock rlock(_lock); + return _used; +} + +size_t WalDirInfo::get_pre_allocated() { + std::shared_lock rlock(_lock); + return _pre_allocated; +} + +Status WalDirInfo::set_limit(size_t limit) { + std::unique_lock wlock(_lock); + _limit = limit; + return Status::OK(); +} + +Status WalDirInfo::set_used(size_t used) { + std::unique_lock wlock(_lock); + _used = used; + return Status::OK(); +} + +Status WalDirInfo::set_pre_allocated(size_t pre_allocated, bool is_add_pre_allocated) { + std::unique_lock wlock(_lock); + if (is_add_pre_allocated) { + _pre_allocated += pre_allocated; + } else { + _pre_allocated -= pre_allocated; + } + return Status::OK(); +} + +size_t WalDirInfo::available() { + std::unique_lock wlock(_lock); + int64_t available = _limit - _used - _pre_allocated; + return available > 0 ? available : 0; +} + +Status WalDirInfo::update_wal_dir_limit(size_t limit) { + if (limit != static_cast<size_t>(-1)) { + RETURN_IF_ERROR(set_limit(limit)); + } else { + size_t available_bytes; + size_t disk_capacity_bytes; + RETURN_IF_ERROR(io::global_local_filesystem()->get_space_info( + _wal_dir, &disk_capacity_bytes, &available_bytes)); + bool is_percent = true; + int64_t wal_disk_limit = ParseUtil::parse_mem_spec(config::group_commit_wal_max_disk_limit, + -1, available_bytes, &is_percent); + if (wal_disk_limit <= 0) { + return Status::InternalError("Disk full! Please check your disk usage!"); + } + size_t wal_dir_size = 0; + RETURN_IF_ERROR(io::global_local_filesystem()->directory_size(_wal_dir, &wal_dir_size)); + RETURN_IF_ERROR(set_limit(wal_disk_limit)); + } + return Status::OK(); +} + +Status WalDirInfo::update_wal_dir_used(size_t used) { + if (used != static_cast<size_t>(-1)) { + RETURN_IF_ERROR(set_used(used)); + } else { + size_t wal_dir_size = 0; + RETURN_IF_ERROR(io::global_local_filesystem()->directory_size(_wal_dir, &wal_dir_size)); + RETURN_IF_ERROR(set_used(wal_dir_size)); + } + return Status::OK(); +} + +Status WalDirInfo::update_wal_dir_pre_allocated(size_t pre_allocated, bool is_add_pre_allocated) { + RETURN_IF_ERROR(set_pre_allocated(pre_allocated, is_add_pre_allocated)); + return Status::OK(); +} + +Status WalDirsInfo::add(const std::string& wal_dir, size_t limit, size_t used, + size_t pre_allocated) { + for (const auto& it : _wal_dirs_info_vec) { + if (it->get_wal_dir() == wal_dir) { + return Status::InternalError("wal dir {} exists!", wal_dir); + } + } + std::unique_lock wlock(_lock); + _wal_dirs_info_vec.emplace_back( + std::make_shared<WalDirInfo>(wal_dir, limit, used, pre_allocated)); + return Status::OK(); +} + +Status WalDirsInfo::clear() { + std::unique_lock wlock(_lock); + _wal_dirs_info_vec.clear(); + return Status::OK(); +} + +std::string WalDirsInfo::get_available_random_wal_dir() { + if (_wal_dirs_info_vec.size() == 1) { + return (*_wal_dirs_info_vec.begin())->get_wal_dir(); + } else { + std::vector<std::string> available_wal_dirs; + for (const auto& wal_dir_info : _wal_dirs_info_vec) { + if (wal_dir_info->available() > wal_dir_info->get_limit() * 0.2) { + available_wal_dirs.emplace_back(wal_dir_info->get_wal_dir()); + } + } + if (available_wal_dirs.empty()) { + return (*std::min_element(_wal_dirs_info_vec.begin(), _wal_dirs_info_vec.end(), + [](const auto& info1, const auto& info2) { + return info1->available() < info2->available(); + })) + ->get_wal_dir(); + } else { + return (*std::next(_wal_dirs_info_vec.begin(), rand() % _wal_dirs_info_vec.size())) + ->get_wal_dir(); + } + } +} + +size_t WalDirsInfo::get_max_available_size() { + return _wal_dirs_info_vec.size() == 1 + ? (*_wal_dirs_info_vec.begin())->available() + : (*std::max_element(_wal_dirs_info_vec.begin(), _wal_dirs_info_vec.end(), + [](const auto& info1, const auto& info2) { + return info1->available() < info2->available(); + })) + ->available(); +} + +Status WalDirsInfo::update_wal_dir_limit(std::string wal_dir, size_t limit) { + for (const auto& wal_dir_info : _wal_dirs_info_vec) { + if (wal_dir_info->get_wal_dir() == wal_dir) { + return wal_dir_info->update_wal_dir_limit(limit); + } + } + return Status::InternalError("Can not find wal dir in wal disks info."); +} + +Status WalDirsInfo::update_all_wal_dir_limit() { + for (const auto& wal_dir_info : _wal_dirs_info_vec) { + RETURN_IF_ERROR(wal_dir_info->update_wal_dir_limit(-1)); + } + return Status::OK(); +} + +Status WalDirsInfo::update_wal_dir_used(std::string wal_dir, size_t used) { Review Comment: warning: method 'update_wal_dir_used' can be made static [readability-convert-member-functions-to-static] ```suggestion static Status WalDirsInfo::update_wal_dir_used(std::string wal_dir, size_t used) { ``` ########## be/src/olap/wal_dirs_info.h: ########## @@ -0,0 +1,84 @@ + +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#pragma once + +#include <cstddef> +#include <memory> +#include <mutex> +#include <shared_mutex> +#include <unordered_map> +#include <utility> +#include <vector> + +#include "common/factory_creator.h" + +namespace doris { +class WalDirInfo { + ENABLE_FACTORY_CREATOR(WalDirInfo); + +public: + WalDirInfo(std::string wal_dir, size_t limit, size_t used, size_t pre_allocated) + : _wal_dir(std::move(wal_dir)), + _limit(limit), + _used(used), + _pre_allocated(pre_allocated) {} + std::string get_wal_dir(); + size_t get_limit(); + size_t get_used(); + size_t get_pre_allocated(); + Status set_limit(size_t limit); + Status set_used(size_t used); + Status set_pre_allocated(size_t pre_allocated, bool is_add_pre_allocated); + size_t available(); + Status update_wal_dir_limit(size_t limit = -1); + Status update_wal_dir_used(size_t used = -1); + Status update_wal_dir_pre_allocated(size_t pre_allocated, bool is_add_pre_allocated = true); + +private: + std::string _wal_dir; + size_t _limit; + size_t _used; + size_t _pre_allocated; + std::shared_mutex _lock; +}; + +class WalDirsInfo { + ENABLE_FACTORY_CREATOR(WalDirsInfo); + +public: + WalDirsInfo() = default; + ~WalDirsInfo() = default; + Status add(const std::string& wal_dir, size_t limit, size_t used, size_t pre_allocated); + Status clear(); + std::string get_available_random_wal_dir(); + size_t get_max_available_size(); + Status update_wal_dir_limit(std::string wal_dir, size_t limit = -1); + Status update_all_wal_dir_limit(); Review Comment: warning: unknown type name 'Status' [clang-diagnostic-error] ```cpp Status update_all_wal_dir_limit(); ^ ``` ########## be/src/olap/wal_dirs_info.h: ########## @@ -0,0 +1,84 @@ + +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#pragma once + +#include <cstddef> +#include <memory> +#include <mutex> +#include <shared_mutex> +#include <unordered_map> +#include <utility> +#include <vector> + +#include "common/factory_creator.h" + +namespace doris { +class WalDirInfo { + ENABLE_FACTORY_CREATOR(WalDirInfo); + +public: + WalDirInfo(std::string wal_dir, size_t limit, size_t used, size_t pre_allocated) + : _wal_dir(std::move(wal_dir)), + _limit(limit), + _used(used), + _pre_allocated(pre_allocated) {} + std::string get_wal_dir(); + size_t get_limit(); + size_t get_used(); + size_t get_pre_allocated(); + Status set_limit(size_t limit); + Status set_used(size_t used); + Status set_pre_allocated(size_t pre_allocated, bool is_add_pre_allocated); + size_t available(); + Status update_wal_dir_limit(size_t limit = -1); + Status update_wal_dir_used(size_t used = -1); + Status update_wal_dir_pre_allocated(size_t pre_allocated, bool is_add_pre_allocated = true); + +private: + std::string _wal_dir; + size_t _limit; + size_t _used; + size_t _pre_allocated; + std::shared_mutex _lock; +}; + +class WalDirsInfo { + ENABLE_FACTORY_CREATOR(WalDirsInfo); + +public: + WalDirsInfo() = default; + ~WalDirsInfo() = default; + Status add(const std::string& wal_dir, size_t limit, size_t used, size_t pre_allocated); + Status clear(); + std::string get_available_random_wal_dir(); + size_t get_max_available_size(); + Status update_wal_dir_limit(std::string wal_dir, size_t limit = -1); + Status update_all_wal_dir_limit(); + Status update_wal_dir_used(std::string wal_dir, size_t used = -1); + Status update_all_wal_dir_used(); Review Comment: warning: unknown type name 'Status' [clang-diagnostic-error] ```cpp Status update_all_wal_dir_used(); ^ ``` ########## be/src/olap/wal_manager.cpp: ########## @@ -409,4 +480,46 @@ return Status::OK(); } +size_t WalManager::get_max_available_size() { + return _wal_dirs_info->get_max_available_size(); +} + +Status WalManager::update_wal_dir_limit(const std::string& wal_dir, size_t limit) { + return _wal_dirs_info->update_wal_dir_limit(wal_dir, limit); +} + +Status WalManager::update_wal_dir_used(const std::string& wal_dir, size_t used) { + return _wal_dirs_info->update_wal_dir_used(wal_dir, used); +} + +Status WalManager::update_wal_dir_pre_allocated(const std::string& wal_dir, size_t pre_allocated, + bool is_add_pre_allocated) { + return _wal_dirs_info->update_wal_dir_pre_allocated(wal_dir, pre_allocated, + is_add_pre_allocated); +} + +Status WalManager::_update_wal_dir_info_thread() { Review Comment: warning: method '_update_wal_dir_info_thread' can be made static [readability-convert-member-functions-to-static] be/src/olap/wal_manager.h:95: ```diff - Status _update_wal_dir_info_thread(); + static Status _update_wal_dir_info_thread(); ``` ########## be/test/olap/wal_manager_test.cpp: ########## @@ -123,4 +123,107 @@ TEST_F(WalManagerTest, recovery_normal) { ASSERT_TRUE(!std::filesystem::exists(wal_200)); ASSERT_TRUE(!std::filesystem::exists(wal_201)); } + +TEST_F(WalManagerTest, TestDynamicWalSpaceLimt) { Review Comment: warning: function 'TEST_F' exceeds recommended size/complexity thresholds [readability-function-size] ```cpp TEST_F(WalManagerTest, TestDynamicWalSpaceLimt) { ^ ``` <details> <summary>Additional context</summary> **be/test/olap/wal_manager_test.cpp:126:** 101 lines including whitespace and comments (threshold 80) ```cpp TEST_F(WalManagerTest, TestDynamicWalSpaceLimt) { ^ ``` </details> ########## be/src/olap/wal_dirs_info.cpp: ########## @@ -0,0 +1,220 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#include "olap/wal_dirs_info.h" + +#include <memory> +#include <mutex> +#include <shared_mutex> + +#include "common/status.h" +#include "io/fs/local_file_system.h" +#include "util/parse_util.h" + +namespace doris { + +std::string WalDirInfo::get_wal_dir() { + return _wal_dir; +} + +size_t WalDirInfo::get_limit() { + std::shared_lock rlock(_lock); + return _limit; +} + +size_t WalDirInfo::get_used() { + std::shared_lock rlock(_lock); + return _used; +} + +size_t WalDirInfo::get_pre_allocated() { + std::shared_lock rlock(_lock); + return _pre_allocated; +} + +Status WalDirInfo::set_limit(size_t limit) { + std::unique_lock wlock(_lock); + _limit = limit; + return Status::OK(); +} + +Status WalDirInfo::set_used(size_t used) { + std::unique_lock wlock(_lock); + _used = used; + return Status::OK(); +} + +Status WalDirInfo::set_pre_allocated(size_t pre_allocated, bool is_add_pre_allocated) { + std::unique_lock wlock(_lock); + if (is_add_pre_allocated) { + _pre_allocated += pre_allocated; + } else { + _pre_allocated -= pre_allocated; + } + return Status::OK(); +} + +size_t WalDirInfo::available() { + std::unique_lock wlock(_lock); + int64_t available = _limit - _used - _pre_allocated; + return available > 0 ? available : 0; +} + +Status WalDirInfo::update_wal_dir_limit(size_t limit) { Review Comment: warning: method 'update_wal_dir_limit' can be made static [readability-convert-member-functions-to-static] ```suggestion static Status WalDirInfo::update_wal_dir_limit(size_t limit) { ``` ########## be/src/olap/wal_dirs_info.cpp: ########## @@ -0,0 +1,220 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#include "olap/wal_dirs_info.h" + +#include <memory> +#include <mutex> +#include <shared_mutex> + +#include "common/status.h" +#include "io/fs/local_file_system.h" +#include "util/parse_util.h" + +namespace doris { + +std::string WalDirInfo::get_wal_dir() { + return _wal_dir; +} + +size_t WalDirInfo::get_limit() { + std::shared_lock rlock(_lock); + return _limit; +} + +size_t WalDirInfo::get_used() { + std::shared_lock rlock(_lock); + return _used; +} + +size_t WalDirInfo::get_pre_allocated() { + std::shared_lock rlock(_lock); + return _pre_allocated; +} + +Status WalDirInfo::set_limit(size_t limit) { + std::unique_lock wlock(_lock); + _limit = limit; + return Status::OK(); +} + +Status WalDirInfo::set_used(size_t used) { + std::unique_lock wlock(_lock); + _used = used; + return Status::OK(); +} + +Status WalDirInfo::set_pre_allocated(size_t pre_allocated, bool is_add_pre_allocated) { + std::unique_lock wlock(_lock); + if (is_add_pre_allocated) { + _pre_allocated += pre_allocated; + } else { + _pre_allocated -= pre_allocated; + } + return Status::OK(); +} + +size_t WalDirInfo::available() { + std::unique_lock wlock(_lock); + int64_t available = _limit - _used - _pre_allocated; + return available > 0 ? available : 0; +} + +Status WalDirInfo::update_wal_dir_limit(size_t limit) { + if (limit != static_cast<size_t>(-1)) { + RETURN_IF_ERROR(set_limit(limit)); + } else { + size_t available_bytes; + size_t disk_capacity_bytes; + RETURN_IF_ERROR(io::global_local_filesystem()->get_space_info( + _wal_dir, &disk_capacity_bytes, &available_bytes)); + bool is_percent = true; + int64_t wal_disk_limit = ParseUtil::parse_mem_spec(config::group_commit_wal_max_disk_limit, + -1, available_bytes, &is_percent); + if (wal_disk_limit <= 0) { + return Status::InternalError("Disk full! Please check your disk usage!"); + } + size_t wal_dir_size = 0; + RETURN_IF_ERROR(io::global_local_filesystem()->directory_size(_wal_dir, &wal_dir_size)); + RETURN_IF_ERROR(set_limit(wal_disk_limit)); + } + return Status::OK(); +} + +Status WalDirInfo::update_wal_dir_used(size_t used) { + if (used != static_cast<size_t>(-1)) { + RETURN_IF_ERROR(set_used(used)); + } else { + size_t wal_dir_size = 0; + RETURN_IF_ERROR(io::global_local_filesystem()->directory_size(_wal_dir, &wal_dir_size)); + RETURN_IF_ERROR(set_used(wal_dir_size)); + } + return Status::OK(); +} + +Status WalDirInfo::update_wal_dir_pre_allocated(size_t pre_allocated, bool is_add_pre_allocated) { + RETURN_IF_ERROR(set_pre_allocated(pre_allocated, is_add_pre_allocated)); + return Status::OK(); +} + +Status WalDirsInfo::add(const std::string& wal_dir, size_t limit, size_t used, + size_t pre_allocated) { + for (const auto& it : _wal_dirs_info_vec) { + if (it->get_wal_dir() == wal_dir) { + return Status::InternalError("wal dir {} exists!", wal_dir); + } + } + std::unique_lock wlock(_lock); + _wal_dirs_info_vec.emplace_back( + std::make_shared<WalDirInfo>(wal_dir, limit, used, pre_allocated)); + return Status::OK(); +} + +Status WalDirsInfo::clear() { + std::unique_lock wlock(_lock); + _wal_dirs_info_vec.clear(); + return Status::OK(); +} + +std::string WalDirsInfo::get_available_random_wal_dir() { + if (_wal_dirs_info_vec.size() == 1) { + return (*_wal_dirs_info_vec.begin())->get_wal_dir(); + } else { + std::vector<std::string> available_wal_dirs; + for (const auto& wal_dir_info : _wal_dirs_info_vec) { + if (wal_dir_info->available() > wal_dir_info->get_limit() * 0.2) { + available_wal_dirs.emplace_back(wal_dir_info->get_wal_dir()); + } + } + if (available_wal_dirs.empty()) { + return (*std::min_element(_wal_dirs_info_vec.begin(), _wal_dirs_info_vec.end(), + [](const auto& info1, const auto& info2) { + return info1->available() < info2->available(); + })) + ->get_wal_dir(); + } else { + return (*std::next(_wal_dirs_info_vec.begin(), rand() % _wal_dirs_info_vec.size())) + ->get_wal_dir(); + } + } +} + +size_t WalDirsInfo::get_max_available_size() { + return _wal_dirs_info_vec.size() == 1 + ? (*_wal_dirs_info_vec.begin())->available() + : (*std::max_element(_wal_dirs_info_vec.begin(), _wal_dirs_info_vec.end(), + [](const auto& info1, const auto& info2) { + return info1->available() < info2->available(); + })) + ->available(); +} + +Status WalDirsInfo::update_wal_dir_limit(std::string wal_dir, size_t limit) { + for (const auto& wal_dir_info : _wal_dirs_info_vec) { + if (wal_dir_info->get_wal_dir() == wal_dir) { + return wal_dir_info->update_wal_dir_limit(limit); + } + } + return Status::InternalError("Can not find wal dir in wal disks info."); +} + +Status WalDirsInfo::update_all_wal_dir_limit() { + for (const auto& wal_dir_info : _wal_dirs_info_vec) { + RETURN_IF_ERROR(wal_dir_info->update_wal_dir_limit(-1)); + } + return Status::OK(); +} + +Status WalDirsInfo::update_wal_dir_used(std::string wal_dir, size_t used) { + for (const auto& wal_dir_info : _wal_dirs_info_vec) { + if (wal_dir_info->get_wal_dir() == wal_dir) { + return wal_dir_info->update_wal_dir_used(used); + } + } + return Status::InternalError("Can not find wal dir in wal disks info."); +} + +Status WalDirsInfo::update_all_wal_dir_used() { + for (const auto& wal_dir_info : _wal_dirs_info_vec) { + RETURN_IF_ERROR(wal_dir_info->update_wal_dir_used(-1)); + } + return Status::OK(); +} + +Status WalDirsInfo::update_wal_dir_pre_allocated(std::string wal_dir, size_t pre_allocated, + bool is_add_pre_allocated) { + for (const auto& wal_dir_info : _wal_dirs_info_vec) { + if (wal_dir_info->get_wal_dir() == wal_dir) { + return wal_dir_info->update_wal_dir_pre_allocated(pre_allocated, is_add_pre_allocated); + } + } + return Status::InternalError("Can not find wal dir in wal disks info."); +} + +Status WalDirsInfo::get_wal_dir_available_size(const std::string& wal_dir, Review Comment: warning: method 'get_wal_dir_available_size' can be made static [readability-convert-member-functions-to-static] ```suggestion static Status WalDirsInfo::get_wal_dir_available_size(const std::string& wal_dir, ``` ########## be/src/olap/wal_dirs_info.h: ########## @@ -0,0 +1,84 @@ + +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#pragma once + +#include <cstddef> +#include <memory> +#include <mutex> +#include <shared_mutex> +#include <unordered_map> +#include <utility> +#include <vector> + +#include "common/factory_creator.h" + +namespace doris { +class WalDirInfo { + ENABLE_FACTORY_CREATOR(WalDirInfo); + +public: + WalDirInfo(std::string wal_dir, size_t limit, size_t used, size_t pre_allocated) + : _wal_dir(std::move(wal_dir)), + _limit(limit), + _used(used), + _pre_allocated(pre_allocated) {} + std::string get_wal_dir(); + size_t get_limit(); + size_t get_used(); + size_t get_pre_allocated(); + Status set_limit(size_t limit); + Status set_used(size_t used); + Status set_pre_allocated(size_t pre_allocated, bool is_add_pre_allocated); + size_t available(); + Status update_wal_dir_limit(size_t limit = -1); + Status update_wal_dir_used(size_t used = -1); Review Comment: warning: unknown type name 'Status' [clang-diagnostic-error] ```cpp Status update_wal_dir_used(size_t used = -1); ^ ``` ########## be/src/olap/wal_dirs_info.h: ########## @@ -0,0 +1,84 @@ + +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#pragma once + +#include <cstddef> +#include <memory> +#include <mutex> +#include <shared_mutex> +#include <unordered_map> +#include <utility> +#include <vector> + +#include "common/factory_creator.h" + +namespace doris { +class WalDirInfo { + ENABLE_FACTORY_CREATOR(WalDirInfo); + +public: + WalDirInfo(std::string wal_dir, size_t limit, size_t used, size_t pre_allocated) + : _wal_dir(std::move(wal_dir)), + _limit(limit), + _used(used), + _pre_allocated(pre_allocated) {} + std::string get_wal_dir(); + size_t get_limit(); + size_t get_used(); + size_t get_pre_allocated(); + Status set_limit(size_t limit); + Status set_used(size_t used); + Status set_pre_allocated(size_t pre_allocated, bool is_add_pre_allocated); + size_t available(); + Status update_wal_dir_limit(size_t limit = -1); + Status update_wal_dir_used(size_t used = -1); + Status update_wal_dir_pre_allocated(size_t pre_allocated, bool is_add_pre_allocated = true); + +private: + std::string _wal_dir; + size_t _limit; + size_t _used; + size_t _pre_allocated; + std::shared_mutex _lock; +}; + +class WalDirsInfo { + ENABLE_FACTORY_CREATOR(WalDirsInfo); + +public: + WalDirsInfo() = default; + ~WalDirsInfo() = default; + Status add(const std::string& wal_dir, size_t limit, size_t used, size_t pre_allocated); + Status clear(); + std::string get_available_random_wal_dir(); + size_t get_max_available_size(); + Status update_wal_dir_limit(std::string wal_dir, size_t limit = -1); + Status update_all_wal_dir_limit(); + Status update_wal_dir_used(std::string wal_dir, size_t used = -1); Review Comment: warning: unknown type name 'Status' [clang-diagnostic-error] ```cpp Status update_wal_dir_used(std::string wal_dir, size_t used = -1); ^ ``` ########## be/src/olap/wal_dirs_info.cpp: ########## @@ -0,0 +1,220 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#include "olap/wal_dirs_info.h" + +#include <memory> +#include <mutex> +#include <shared_mutex> + +#include "common/status.h" +#include "io/fs/local_file_system.h" +#include "util/parse_util.h" + +namespace doris { + +std::string WalDirInfo::get_wal_dir() { + return _wal_dir; +} + +size_t WalDirInfo::get_limit() { + std::shared_lock rlock(_lock); + return _limit; +} + +size_t WalDirInfo::get_used() { + std::shared_lock rlock(_lock); + return _used; +} + +size_t WalDirInfo::get_pre_allocated() { + std::shared_lock rlock(_lock); + return _pre_allocated; +} + +Status WalDirInfo::set_limit(size_t limit) { + std::unique_lock wlock(_lock); + _limit = limit; + return Status::OK(); +} + +Status WalDirInfo::set_used(size_t used) { + std::unique_lock wlock(_lock); + _used = used; + return Status::OK(); +} + +Status WalDirInfo::set_pre_allocated(size_t pre_allocated, bool is_add_pre_allocated) { + std::unique_lock wlock(_lock); + if (is_add_pre_allocated) { + _pre_allocated += pre_allocated; + } else { + _pre_allocated -= pre_allocated; + } + return Status::OK(); +} + +size_t WalDirInfo::available() { + std::unique_lock wlock(_lock); + int64_t available = _limit - _used - _pre_allocated; + return available > 0 ? available : 0; +} + +Status WalDirInfo::update_wal_dir_limit(size_t limit) { + if (limit != static_cast<size_t>(-1)) { + RETURN_IF_ERROR(set_limit(limit)); + } else { + size_t available_bytes; + size_t disk_capacity_bytes; + RETURN_IF_ERROR(io::global_local_filesystem()->get_space_info( + _wal_dir, &disk_capacity_bytes, &available_bytes)); + bool is_percent = true; + int64_t wal_disk_limit = ParseUtil::parse_mem_spec(config::group_commit_wal_max_disk_limit, + -1, available_bytes, &is_percent); + if (wal_disk_limit <= 0) { + return Status::InternalError("Disk full! Please check your disk usage!"); + } + size_t wal_dir_size = 0; + RETURN_IF_ERROR(io::global_local_filesystem()->directory_size(_wal_dir, &wal_dir_size)); + RETURN_IF_ERROR(set_limit(wal_disk_limit)); + } + return Status::OK(); +} + +Status WalDirInfo::update_wal_dir_used(size_t used) { + if (used != static_cast<size_t>(-1)) { + RETURN_IF_ERROR(set_used(used)); + } else { + size_t wal_dir_size = 0; + RETURN_IF_ERROR(io::global_local_filesystem()->directory_size(_wal_dir, &wal_dir_size)); + RETURN_IF_ERROR(set_used(wal_dir_size)); + } + return Status::OK(); +} + +Status WalDirInfo::update_wal_dir_pre_allocated(size_t pre_allocated, bool is_add_pre_allocated) { + RETURN_IF_ERROR(set_pre_allocated(pre_allocated, is_add_pre_allocated)); + return Status::OK(); +} + +Status WalDirsInfo::add(const std::string& wal_dir, size_t limit, size_t used, + size_t pre_allocated) { + for (const auto& it : _wal_dirs_info_vec) { + if (it->get_wal_dir() == wal_dir) { + return Status::InternalError("wal dir {} exists!", wal_dir); + } + } + std::unique_lock wlock(_lock); + _wal_dirs_info_vec.emplace_back( + std::make_shared<WalDirInfo>(wal_dir, limit, used, pre_allocated)); + return Status::OK(); +} + +Status WalDirsInfo::clear() { + std::unique_lock wlock(_lock); + _wal_dirs_info_vec.clear(); + return Status::OK(); +} + +std::string WalDirsInfo::get_available_random_wal_dir() { + if (_wal_dirs_info_vec.size() == 1) { + return (*_wal_dirs_info_vec.begin())->get_wal_dir(); + } else { + std::vector<std::string> available_wal_dirs; + for (const auto& wal_dir_info : _wal_dirs_info_vec) { + if (wal_dir_info->available() > wal_dir_info->get_limit() * 0.2) { + available_wal_dirs.emplace_back(wal_dir_info->get_wal_dir()); + } + } + if (available_wal_dirs.empty()) { + return (*std::min_element(_wal_dirs_info_vec.begin(), _wal_dirs_info_vec.end(), + [](const auto& info1, const auto& info2) { + return info1->available() < info2->available(); + })) + ->get_wal_dir(); + } else { + return (*std::next(_wal_dirs_info_vec.begin(), rand() % _wal_dirs_info_vec.size())) + ->get_wal_dir(); + } + } +} + +size_t WalDirsInfo::get_max_available_size() { + return _wal_dirs_info_vec.size() == 1 + ? (*_wal_dirs_info_vec.begin())->available() + : (*std::max_element(_wal_dirs_info_vec.begin(), _wal_dirs_info_vec.end(), + [](const auto& info1, const auto& info2) { + return info1->available() < info2->available(); + })) + ->available(); +} + +Status WalDirsInfo::update_wal_dir_limit(std::string wal_dir, size_t limit) { + for (const auto& wal_dir_info : _wal_dirs_info_vec) { + if (wal_dir_info->get_wal_dir() == wal_dir) { + return wal_dir_info->update_wal_dir_limit(limit); + } + } + return Status::InternalError("Can not find wal dir in wal disks info."); +} + +Status WalDirsInfo::update_all_wal_dir_limit() { + for (const auto& wal_dir_info : _wal_dirs_info_vec) { + RETURN_IF_ERROR(wal_dir_info->update_wal_dir_limit(-1)); + } + return Status::OK(); +} + +Status WalDirsInfo::update_wal_dir_used(std::string wal_dir, size_t used) { + for (const auto& wal_dir_info : _wal_dirs_info_vec) { + if (wal_dir_info->get_wal_dir() == wal_dir) { + return wal_dir_info->update_wal_dir_used(used); + } + } + return Status::InternalError("Can not find wal dir in wal disks info."); +} + +Status WalDirsInfo::update_all_wal_dir_used() { Review Comment: warning: method 'update_all_wal_dir_used' can be made static [readability-convert-member-functions-to-static] ```suggestion static Status WalDirsInfo::update_all_wal_dir_used() { ``` ########## be/src/olap/wal_dirs_info.cpp: ########## @@ -0,0 +1,220 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#include "olap/wal_dirs_info.h" + +#include <memory> +#include <mutex> +#include <shared_mutex> + +#include "common/status.h" +#include "io/fs/local_file_system.h" +#include "util/parse_util.h" + +namespace doris { + +std::string WalDirInfo::get_wal_dir() { + return _wal_dir; +} + +size_t WalDirInfo::get_limit() { + std::shared_lock rlock(_lock); + return _limit; +} + +size_t WalDirInfo::get_used() { + std::shared_lock rlock(_lock); + return _used; +} + +size_t WalDirInfo::get_pre_allocated() { + std::shared_lock rlock(_lock); + return _pre_allocated; +} + +Status WalDirInfo::set_limit(size_t limit) { + std::unique_lock wlock(_lock); + _limit = limit; + return Status::OK(); +} + +Status WalDirInfo::set_used(size_t used) { + std::unique_lock wlock(_lock); + _used = used; + return Status::OK(); +} + +Status WalDirInfo::set_pre_allocated(size_t pre_allocated, bool is_add_pre_allocated) { + std::unique_lock wlock(_lock); + if (is_add_pre_allocated) { + _pre_allocated += pre_allocated; + } else { + _pre_allocated -= pre_allocated; + } + return Status::OK(); +} + +size_t WalDirInfo::available() { + std::unique_lock wlock(_lock); + int64_t available = _limit - _used - _pre_allocated; + return available > 0 ? available : 0; +} + +Status WalDirInfo::update_wal_dir_limit(size_t limit) { + if (limit != static_cast<size_t>(-1)) { + RETURN_IF_ERROR(set_limit(limit)); + } else { + size_t available_bytes; + size_t disk_capacity_bytes; + RETURN_IF_ERROR(io::global_local_filesystem()->get_space_info( + _wal_dir, &disk_capacity_bytes, &available_bytes)); + bool is_percent = true; + int64_t wal_disk_limit = ParseUtil::parse_mem_spec(config::group_commit_wal_max_disk_limit, + -1, available_bytes, &is_percent); + if (wal_disk_limit <= 0) { + return Status::InternalError("Disk full! Please check your disk usage!"); + } + size_t wal_dir_size = 0; + RETURN_IF_ERROR(io::global_local_filesystem()->directory_size(_wal_dir, &wal_dir_size)); + RETURN_IF_ERROR(set_limit(wal_disk_limit)); + } + return Status::OK(); +} + +Status WalDirInfo::update_wal_dir_used(size_t used) { + if (used != static_cast<size_t>(-1)) { + RETURN_IF_ERROR(set_used(used)); + } else { + size_t wal_dir_size = 0; + RETURN_IF_ERROR(io::global_local_filesystem()->directory_size(_wal_dir, &wal_dir_size)); + RETURN_IF_ERROR(set_used(wal_dir_size)); + } + return Status::OK(); +} + +Status WalDirInfo::update_wal_dir_pre_allocated(size_t pre_allocated, bool is_add_pre_allocated) { + RETURN_IF_ERROR(set_pre_allocated(pre_allocated, is_add_pre_allocated)); + return Status::OK(); +} + +Status WalDirsInfo::add(const std::string& wal_dir, size_t limit, size_t used, + size_t pre_allocated) { + for (const auto& it : _wal_dirs_info_vec) { + if (it->get_wal_dir() == wal_dir) { + return Status::InternalError("wal dir {} exists!", wal_dir); + } + } + std::unique_lock wlock(_lock); + _wal_dirs_info_vec.emplace_back( + std::make_shared<WalDirInfo>(wal_dir, limit, used, pre_allocated)); + return Status::OK(); +} + +Status WalDirsInfo::clear() { + std::unique_lock wlock(_lock); + _wal_dirs_info_vec.clear(); + return Status::OK(); +} + +std::string WalDirsInfo::get_available_random_wal_dir() { + if (_wal_dirs_info_vec.size() == 1) { + return (*_wal_dirs_info_vec.begin())->get_wal_dir(); + } else { + std::vector<std::string> available_wal_dirs; + for (const auto& wal_dir_info : _wal_dirs_info_vec) { + if (wal_dir_info->available() > wal_dir_info->get_limit() * 0.2) { + available_wal_dirs.emplace_back(wal_dir_info->get_wal_dir()); + } + } + if (available_wal_dirs.empty()) { + return (*std::min_element(_wal_dirs_info_vec.begin(), _wal_dirs_info_vec.end(), + [](const auto& info1, const auto& info2) { + return info1->available() < info2->available(); + })) + ->get_wal_dir(); + } else { + return (*std::next(_wal_dirs_info_vec.begin(), rand() % _wal_dirs_info_vec.size())) + ->get_wal_dir(); + } + } +} + +size_t WalDirsInfo::get_max_available_size() { + return _wal_dirs_info_vec.size() == 1 + ? (*_wal_dirs_info_vec.begin())->available() + : (*std::max_element(_wal_dirs_info_vec.begin(), _wal_dirs_info_vec.end(), + [](const auto& info1, const auto& info2) { + return info1->available() < info2->available(); + })) + ->available(); +} + +Status WalDirsInfo::update_wal_dir_limit(std::string wal_dir, size_t limit) { + for (const auto& wal_dir_info : _wal_dirs_info_vec) { + if (wal_dir_info->get_wal_dir() == wal_dir) { + return wal_dir_info->update_wal_dir_limit(limit); + } + } + return Status::InternalError("Can not find wal dir in wal disks info."); +} + +Status WalDirsInfo::update_all_wal_dir_limit() { + for (const auto& wal_dir_info : _wal_dirs_info_vec) { + RETURN_IF_ERROR(wal_dir_info->update_wal_dir_limit(-1)); + } + return Status::OK(); +} + +Status WalDirsInfo::update_wal_dir_used(std::string wal_dir, size_t used) { + for (const auto& wal_dir_info : _wal_dirs_info_vec) { + if (wal_dir_info->get_wal_dir() == wal_dir) { + return wal_dir_info->update_wal_dir_used(used); + } + } + return Status::InternalError("Can not find wal dir in wal disks info."); +} + +Status WalDirsInfo::update_all_wal_dir_used() { + for (const auto& wal_dir_info : _wal_dirs_info_vec) { + RETURN_IF_ERROR(wal_dir_info->update_wal_dir_used(-1)); + } + return Status::OK(); +} + +Status WalDirsInfo::update_wal_dir_pre_allocated(std::string wal_dir, size_t pre_allocated, Review Comment: warning: method 'update_wal_dir_pre_allocated' can be made static [readability-convert-member-functions-to-static] ```suggestion static Status WalDirsInfo::update_wal_dir_pre_allocated(std::string wal_dir, size_t pre_allocated, ``` ########## be/src/olap/wal_dirs_info.h: ########## @@ -0,0 +1,84 @@ + +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#pragma once + +#include <cstddef> +#include <memory> +#include <mutex> +#include <shared_mutex> +#include <unordered_map> +#include <utility> +#include <vector> + +#include "common/factory_creator.h" + +namespace doris { +class WalDirInfo { + ENABLE_FACTORY_CREATOR(WalDirInfo); + +public: + WalDirInfo(std::string wal_dir, size_t limit, size_t used, size_t pre_allocated) + : _wal_dir(std::move(wal_dir)), + _limit(limit), + _used(used), + _pre_allocated(pre_allocated) {} + std::string get_wal_dir(); + size_t get_limit(); + size_t get_used(); + size_t get_pre_allocated(); + Status set_limit(size_t limit); + Status set_used(size_t used); + Status set_pre_allocated(size_t pre_allocated, bool is_add_pre_allocated); + size_t available(); + Status update_wal_dir_limit(size_t limit = -1); Review Comment: warning: unknown type name 'Status' [clang-diagnostic-error] ```cpp Status update_wal_dir_limit(size_t limit = -1); ^ ``` ########## be/src/olap/wal_dirs_info.h: ########## @@ -0,0 +1,84 @@ + +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#pragma once + +#include <cstddef> +#include <memory> +#include <mutex> +#include <shared_mutex> +#include <unordered_map> +#include <utility> +#include <vector> + +#include "common/factory_creator.h" + +namespace doris { +class WalDirInfo { + ENABLE_FACTORY_CREATOR(WalDirInfo); + +public: + WalDirInfo(std::string wal_dir, size_t limit, size_t used, size_t pre_allocated) + : _wal_dir(std::move(wal_dir)), + _limit(limit), + _used(used), + _pre_allocated(pre_allocated) {} + std::string get_wal_dir(); + size_t get_limit(); + size_t get_used(); + size_t get_pre_allocated(); + Status set_limit(size_t limit); + Status set_used(size_t used); + Status set_pre_allocated(size_t pre_allocated, bool is_add_pre_allocated); + size_t available(); + Status update_wal_dir_limit(size_t limit = -1); + Status update_wal_dir_used(size_t used = -1); + Status update_wal_dir_pre_allocated(size_t pre_allocated, bool is_add_pre_allocated = true); + +private: + std::string _wal_dir; + size_t _limit; + size_t _used; + size_t _pre_allocated; + std::shared_mutex _lock; +}; + +class WalDirsInfo { + ENABLE_FACTORY_CREATOR(WalDirsInfo); + +public: + WalDirsInfo() = default; + ~WalDirsInfo() = default; + Status add(const std::string& wal_dir, size_t limit, size_t used, size_t pre_allocated); + Status clear(); + std::string get_available_random_wal_dir(); + size_t get_max_available_size(); + Status update_wal_dir_limit(std::string wal_dir, size_t limit = -1); + Status update_all_wal_dir_limit(); + Status update_wal_dir_used(std::string wal_dir, size_t used = -1); + Status update_all_wal_dir_used(); + Status update_wal_dir_pre_allocated(std::string wal_dir, size_t pre_allocated, + bool is_add_pre_allocated); + Status get_wal_dir_available_size(const std::string& wal_dir, size_t* available_bytes); Review Comment: warning: unknown type name 'Status' [clang-diagnostic-error] ```cpp Status get_wal_dir_available_size(const std::string& wal_dir, size_t* available_bytes); ^ ``` ########## be/src/olap/wal_dirs_info.h: ########## @@ -0,0 +1,84 @@ + +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#pragma once + +#include <cstddef> +#include <memory> +#include <mutex> +#include <shared_mutex> +#include <unordered_map> +#include <utility> +#include <vector> + +#include "common/factory_creator.h" + +namespace doris { +class WalDirInfo { + ENABLE_FACTORY_CREATOR(WalDirInfo); + +public: + WalDirInfo(std::string wal_dir, size_t limit, size_t used, size_t pre_allocated) + : _wal_dir(std::move(wal_dir)), + _limit(limit), + _used(used), + _pre_allocated(pre_allocated) {} + std::string get_wal_dir(); + size_t get_limit(); + size_t get_used(); + size_t get_pre_allocated(); + Status set_limit(size_t limit); + Status set_used(size_t used); + Status set_pre_allocated(size_t pre_allocated, bool is_add_pre_allocated); + size_t available(); + Status update_wal_dir_limit(size_t limit = -1); + Status update_wal_dir_used(size_t used = -1); + Status update_wal_dir_pre_allocated(size_t pre_allocated, bool is_add_pre_allocated = true); + +private: + std::string _wal_dir; + size_t _limit; + size_t _used; + size_t _pre_allocated; + std::shared_mutex _lock; +}; + +class WalDirsInfo { + ENABLE_FACTORY_CREATOR(WalDirsInfo); + +public: + WalDirsInfo() = default; + ~WalDirsInfo() = default; + Status add(const std::string& wal_dir, size_t limit, size_t used, size_t pre_allocated); + Status clear(); + std::string get_available_random_wal_dir(); + size_t get_max_available_size(); + Status update_wal_dir_limit(std::string wal_dir, size_t limit = -1); Review Comment: warning: unknown type name 'Status' [clang-diagnostic-error] ```cpp Status update_wal_dir_limit(std::string wal_dir, size_t limit = -1); ^ ``` ########## be/src/olap/wal_dirs_info.h: ########## @@ -0,0 +1,84 @@ + +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#pragma once + +#include <cstddef> +#include <memory> +#include <mutex> +#include <shared_mutex> +#include <unordered_map> +#include <utility> +#include <vector> + +#include "common/factory_creator.h" + +namespace doris { +class WalDirInfo { + ENABLE_FACTORY_CREATOR(WalDirInfo); + +public: + WalDirInfo(std::string wal_dir, size_t limit, size_t used, size_t pre_allocated) + : _wal_dir(std::move(wal_dir)), + _limit(limit), + _used(used), + _pre_allocated(pre_allocated) {} + std::string get_wal_dir(); + size_t get_limit(); + size_t get_used(); + size_t get_pre_allocated(); + Status set_limit(size_t limit); + Status set_used(size_t used); + Status set_pre_allocated(size_t pre_allocated, bool is_add_pre_allocated); + size_t available(); + Status update_wal_dir_limit(size_t limit = -1); + Status update_wal_dir_used(size_t used = -1); + Status update_wal_dir_pre_allocated(size_t pre_allocated, bool is_add_pre_allocated = true); + +private: + std::string _wal_dir; + size_t _limit; + size_t _used; + size_t _pre_allocated; + std::shared_mutex _lock; +}; + +class WalDirsInfo { + ENABLE_FACTORY_CREATOR(WalDirsInfo); + +public: + WalDirsInfo() = default; + ~WalDirsInfo() = default; + Status add(const std::string& wal_dir, size_t limit, size_t used, size_t pre_allocated); + Status clear(); + std::string get_available_random_wal_dir(); + size_t get_max_available_size(); + Status update_wal_dir_limit(std::string wal_dir, size_t limit = -1); + Status update_all_wal_dir_limit(); + Status update_wal_dir_used(std::string wal_dir, size_t used = -1); + Status update_all_wal_dir_used(); + Status update_wal_dir_pre_allocated(std::string wal_dir, size_t pre_allocated, Review Comment: warning: unknown type name 'Status' [clang-diagnostic-error] ```cpp Status update_wal_dir_pre_allocated(std::string wal_dir, size_t pre_allocated, ^ ``` ########## be/src/olap/wal_manager.cpp: ########## @@ -60,12 +70,46 @@ void WalManager::stop() { if (_replay_thread) { _replay_thread->join(); } + if (_update_wal_dirs_info_thread) { + _update_wal_dirs_info_thread->join(); + } _thread_pool->shutdown(); LOG(INFO) << "WalManager is stopped"; } } Status WalManager::init() { + RETURN_IF_ERROR(_init_wal_dirs_conf()); + RETURN_IF_ERROR(_init_wal_dirs()); + RETURN_IF_ERROR(_init_wal_dirs_info()); + return Thread::create( + "WalMgr", "replay_wal", [this]() { static_cast<void>(this->replay()); }, + &_replay_thread); +} + +Status WalManager::_init_wal_dirs_conf() { Review Comment: warning: method '_init_wal_dirs_conf' can be made static [readability-convert-member-functions-to-static] be/src/olap/wal_manager.h:90: ```diff - Status _init_wal_dirs_conf(); + static Status _init_wal_dirs_conf(); ``` ########## be/src/olap/wal_dirs_info.h: ########## @@ -0,0 +1,84 @@ + +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#pragma once + +#include <cstddef> +#include <memory> +#include <mutex> +#include <shared_mutex> +#include <unordered_map> +#include <utility> +#include <vector> + +#include "common/factory_creator.h" + +namespace doris { +class WalDirInfo { + ENABLE_FACTORY_CREATOR(WalDirInfo); + +public: + WalDirInfo(std::string wal_dir, size_t limit, size_t used, size_t pre_allocated) + : _wal_dir(std::move(wal_dir)), + _limit(limit), + _used(used), + _pre_allocated(pre_allocated) {} + std::string get_wal_dir(); + size_t get_limit(); + size_t get_used(); + size_t get_pre_allocated(); + Status set_limit(size_t limit); + Status set_used(size_t used); + Status set_pre_allocated(size_t pre_allocated, bool is_add_pre_allocated); + size_t available(); + Status update_wal_dir_limit(size_t limit = -1); + Status update_wal_dir_used(size_t used = -1); + Status update_wal_dir_pre_allocated(size_t pre_allocated, bool is_add_pre_allocated = true); + +private: + std::string _wal_dir; + size_t _limit; + size_t _used; + size_t _pre_allocated; + std::shared_mutex _lock; +}; + +class WalDirsInfo { + ENABLE_FACTORY_CREATOR(WalDirsInfo); + +public: + WalDirsInfo() = default; + ~WalDirsInfo() = default; + Status add(const std::string& wal_dir, size_t limit, size_t used, size_t pre_allocated); + Status clear(); Review Comment: warning: unknown type name 'Status' [clang-diagnostic-error] ```cpp Status clear(); ^ ``` ########## be/src/olap/wal_manager.cpp: ########## @@ -344,31 +426,20 @@ } } -Status WalManager::delete_wal(int64_t wal_id) { +Status WalManager::delete_wal(int64_t wal_id, size_t block_queue_pre_allocated) { Review Comment: warning: method 'delete_wal' can be made static [readability-convert-member-functions-to-static] be/src/olap/wal_manager.h:58: ```diff - Status delete_wal(int64_t wal_id, size_t block_queue_pre_allocated = 0); + static Status delete_wal(int64_t wal_id, size_t block_queue_pre_allocated = 0); ``` ########## be/src/olap/wal_manager.cpp: ########## @@ -60,12 +70,46 @@ if (_replay_thread) { _replay_thread->join(); } + if (_update_wal_dirs_info_thread) { + _update_wal_dirs_info_thread->join(); + } _thread_pool->shutdown(); LOG(INFO) << "WalManager is stopped"; } } Status WalManager::init() { + RETURN_IF_ERROR(_init_wal_dirs_conf()); + RETURN_IF_ERROR(_init_wal_dirs()); + RETURN_IF_ERROR(_init_wal_dirs_info()); + return Thread::create( + "WalMgr", "replay_wal", [this]() { static_cast<void>(this->replay()); }, + &_replay_thread); +} + +Status WalManager::_init_wal_dirs_conf() { + std::vector<std::string> tmp_dirs; + if (_wal_dirs.empty()) { + // default case. + for (const StorePath& path : ExecEnv::GetInstance()->store_paths()) { + tmp_dirs.emplace_back(path.path + "/wal"); + } + } else { + // user config must be absolute path. + for (const std::string& wal_dir : _wal_dirs) { + if (std::filesystem::path(wal_dir).is_absolute()) { + tmp_dirs.emplace_back(wal_dir); + } else { + return Status::InternalError( + "BE config group_commit_replay_wal_dir has to be absolute path!"); + } + } + } + _wal_dirs = tmp_dirs; + return Status::OK(); +} + +Status WalManager::_init_wal_dirs() { Review Comment: warning: method '_init_wal_dirs' can be made static [readability-convert-member-functions-to-static] be/src/olap/wal_manager.h:91: ```diff - Status _init_wal_dirs(); + static Status _init_wal_dirs(); ``` -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org