This is an automated email from the ASF dual-hosted git repository. dataroaring pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push: new 5f62a4462dd [Enhancement](wal) Add wal space back pressure (#26483) 5f62a4462dd is described below commit 5f62a4462dd670d83cd09349a31125c21686fc71 Author: abmdocrt <yukang.lian2...@gmail.com> AuthorDate: Thu Nov 9 12:29:05 2023 +0800 [Enhancement](wal) Add wal space back pressure (#26483) --- be/src/common/config.cpp | 7 +++++-- be/src/common/config.h | 3 +++ be/src/olap/wal_manager.cpp | 24 +++++++++++++++++++++++- be/src/olap/wal_manager.h | 4 ++++ be/src/olap/wal_writer.cpp | 20 +++++++++++++++++++- be/src/olap/wal_writer.h | 13 ++++++++++++- be/test/olap/wal_manager_test.cpp | 4 +++- be/test/olap/wal_reader_writer_test.cpp | 5 ++++- 8 files changed, 73 insertions(+), 7 deletions(-) diff --git a/be/src/common/config.cpp b/be/src/common/config.cpp index 29af4d5059a..0133a78ecf9 100644 --- a/be/src/common/config.cpp +++ b/be/src/common/config.cpp @@ -1118,8 +1118,11 @@ DEFINE_Bool(ignore_always_true_predicate_for_segment, "true"); // Dir of default timezone files DEFINE_String(default_tzfiles_path, "${DORIS_HOME}/zoneinfo"); -// Max size(bytes) of group commit queues, used for mem back pressure. -DEFINE_Int32(group_commit_max_queue_size, "65536"); +// Max size(bytes) of group commit queues, used for mem back pressure, defult 64M. +DEFINE_Int32(group_commit_max_queue_size, "67108864"); + +// Max size(bytes) of wal disk using, used for disk space back pressure, default 64M. +DEFINE_Int32(wal_max_disk_size, "67108864"); // Ingest binlog work pool size, -1 is disable, 0 is hardware concurrency DEFINE_Int32(ingest_binlog_work_pool_size, "-1"); diff --git a/be/src/common/config.h b/be/src/common/config.h index f0fdf58558c..d9276aef9a5 100644 --- a/be/src/common/config.h +++ b/be/src/common/config.h @@ -1195,6 +1195,9 @@ DECLARE_String(default_tzfiles_path); // Max size(bytes) of group commit queues, used for mem back pressure. DECLARE_Int32(group_commit_max_queue_size); +// Max size(bytes) of wal disk using, used for disk space back pressure. +DECLARE_Int32(wal_max_disk_size); + // Ingest binlog work pool size DECLARE_Int32(ingest_binlog_work_pool_size); diff --git a/be/src/olap/wal_manager.cpp b/be/src/olap/wal_manager.cpp index cf84c4a9661..921f7da8e38 100644 --- a/be/src/olap/wal_manager.cpp +++ b/be/src/olap/wal_manager.cpp @@ -19,10 +19,15 @@ #include <thrift/protocol/TDebugProtocol.h> +#include <atomic> #include <chrono> +#include <cstdint> #include <filesystem> +#include <memory> +#include <utility> #include "io/fs/local_file_system.h" +#include "olap/wal_writer.h" #include "runtime/client_cache.h" #include "runtime/fragment_mgr.h" #include "runtime/plan_fragment_executor.h" @@ -35,11 +40,13 @@ namespace doris { WalManager::WalManager(ExecEnv* exec_env, const std::string& wal_dir_list) : _exec_env(exec_env), _stop_background_threads_latch(1) { doris::vectorized::WalReader::string_split(wal_dir_list, ",", _wal_dirs); + _all_wal_disk_bytes = std::make_shared<std::atomic_size_t>(0); } WalManager::~WalManager() { LOG(INFO) << "WalManager is destoried"; } + void WalManager::stop() { _stop = true; _stop_background_threads_latch.count_down(); @@ -117,8 +124,12 @@ Status WalManager::create_wal_writer(int64_t wal_id, std::shared_ptr<WalWriter>& RETURN_IF_ERROR(io::global_local_filesystem()->create_directory(base_path)); } LOG(INFO) << "create wal " << wal_path; - wal_writer = std::make_shared<WalWriter>(wal_path); + wal_writer = std::make_shared<WalWriter>(wal_path, _all_wal_disk_bytes); RETURN_IF_ERROR(wal_writer->init()); + { + std::lock_guard<std::shared_mutex> wrlock(_wal_lock); + _wal_id_to_writer_map.emplace(wal_id, wal_writer); + } return Status::OK(); } @@ -241,6 +252,17 @@ size_t WalManager::get_wal_table_size(const std::string& table_id) { Status WalManager::delete_wal(int64_t wal_id) { { std::lock_guard<std::shared_mutex> wrlock(_wal_lock); + if (_wal_id_to_writer_map.find(wal_id) != _wal_id_to_writer_map.end()) { + _all_wal_disk_bytes->store( + _all_wal_disk_bytes->fetch_sub(_wal_id_to_writer_map[wal_id]->disk_bytes(), + std::memory_order_relaxed), + std::memory_order_relaxed); + _wal_id_to_writer_map[wal_id]->cv.notify_one(); + _wal_id_to_writer_map.erase(wal_id); + } + if (_wal_id_to_writer_map.empty()) { + CHECK_EQ(_all_wal_disk_bytes->load(std::memory_order_relaxed), 0); + } std::string wal_path = _wal_path_map[wal_id]; RETURN_IF_ERROR(io::global_local_filesystem()->delete_file(wal_path)); LOG(INFO) << "delete file=" << wal_path; diff --git a/be/src/olap/wal_manager.h b/be/src/olap/wal_manager.h index f5b49f6ddaf..d8fa4a70527 100644 --- a/be/src/olap/wal_manager.h +++ b/be/src/olap/wal_manager.h @@ -15,6 +15,8 @@ // specific language governing permissions and limitations // under the License. +#include <memory> + #include "common/config.h" #include "gen_cpp/FrontendService.h" #include "gen_cpp/FrontendService_types.h" @@ -56,6 +58,8 @@ private: std::vector<std::string> _wal_dirs; std::shared_mutex _wal_lock; std::unordered_map<int64_t, std::string> _wal_path_map; + std::unordered_map<int64_t, std::shared_ptr<WalWriter>> _wal_id_to_writer_map; + std::shared_ptr<std::atomic_size_t> _all_wal_disk_bytes; bool _stop = false; }; } // namespace doris \ No newline at end of file diff --git a/be/src/olap/wal_writer.cpp b/be/src/olap/wal_writer.cpp index 7cd427453bb..085c0f0e31f 100644 --- a/be/src/olap/wal_writer.cpp +++ b/be/src/olap/wal_writer.cpp @@ -17,6 +17,9 @@ #include "olap/wal_writer.h" +#include <atomic> + +#include "common/config.h" #include "io/fs/file_writer.h" #include "io/fs/local_file_system.h" #include "io/fs/path.h" @@ -25,7 +28,12 @@ namespace doris { -WalWriter::WalWriter(const std::string& file_name) : _file_name(file_name) {} +WalWriter::WalWriter(const std::string& file_name, + const std::shared_ptr<std::atomic_size_t>& all_wal_disk_bytes) + : _file_name(file_name), + _count(0), + _disk_bytes(0), + _all_wal_disk_bytes(all_wal_disk_bytes) {} WalWriter::~WalWriter() {} @@ -44,6 +52,12 @@ Status WalWriter::finalize() { } Status WalWriter::append_blocks(const PBlockArray& blocks) { + { + std::unique_lock l(_mutex); + while (_all_wal_disk_bytes->load(std::memory_order_relaxed) > config::wal_max_disk_size) { + cv.wait_for(l, std::chrono::milliseconds(WalWriter::MAX_WAL_WRITE_WAIT_TIME)); + } + } size_t total_size = 0; for (const auto& block : blocks) { total_size += LENGTH_SIZE + block->ByteSizeLong() + CHECKSUM_SIZE; @@ -62,6 +76,10 @@ Status WalWriter::append_blocks(const PBlockArray& blocks) { offset += CHECKSUM_SIZE; } DCHECK(offset == total_size); + _disk_bytes += total_size; + _all_wal_disk_bytes->store( + _all_wal_disk_bytes->fetch_add(total_size, std::memory_order_relaxed), + std::memory_order_relaxed); // write rows RETURN_IF_ERROR(_file_writer->append({row_binary, offset})); _count++; diff --git a/be/src/olap/wal_writer.h b/be/src/olap/wal_writer.h index 12fd84f258f..9fd0a396788 100644 --- a/be/src/olap/wal_writer.h +++ b/be/src/olap/wal_writer.h @@ -17,9 +17,13 @@ #pragma once +#include <atomic> +#include <memory> + #include "common/status.h" #include "gen_cpp/internal_service.pb.h" #include "io/fs/file_reader_writer_fwd.h" +#include "util/lock.h" namespace doris { @@ -27,23 +31,30 @@ using PBlockArray = std::vector<PBlock*>; class WalWriter { public: - explicit WalWriter(const std::string& file_name); + explicit WalWriter(const std::string& file_name, + const std::shared_ptr<std::atomic_size_t>& all_wal_disk_bytes); ~WalWriter(); Status init(); Status finalize(); Status append_blocks(const PBlockArray& blocks); + size_t disk_bytes() const { return _disk_bytes; }; std::string file_name() { return _file_name; }; static const int64_t LENGTH_SIZE = 8; static const int64_t CHECKSUM_SIZE = 4; + doris::ConditionVariable cv; private: + static constexpr size_t MAX_WAL_WRITE_WAIT_TIME = 1000; std::string _file_name; io::FileWriterPtr _file_writer; int64_t _count; int64_t _batch; + size_t _disk_bytes; + std::shared_ptr<std::atomic_size_t> _all_wal_disk_bytes; + doris::Mutex _mutex; }; } // namespace doris \ No newline at end of file diff --git a/be/test/olap/wal_manager_test.cpp b/be/test/olap/wal_manager_test.cpp index fbb5fdbf03f..ec387680a63 100644 --- a/be/test/olap/wal_manager_test.cpp +++ b/be/test/olap/wal_manager_test.cpp @@ -78,7 +78,9 @@ public: void prepare() { static_cast<void>(io::global_local_filesystem()->create_directory(wal_dir)); } void createWal(const std::string& wal_path) { - auto wal_writer = WalWriter(wal_path); + std::shared_ptr<std::atomic_size_t> _all_wal_disk_bytes = + std::make_shared<std::atomic_size_t>(0); + auto wal_writer = WalWriter(wal_path, _all_wal_disk_bytes); static_cast<void>(wal_writer.init()); static_cast<void>(wal_writer.finalize()); } diff --git a/be/test/olap/wal_reader_writer_test.cpp b/be/test/olap/wal_reader_writer_test.cpp index 71c822013a2..09460477e38 100644 --- a/be/test/olap/wal_reader_writer_test.cpp +++ b/be/test/olap/wal_reader_writer_test.cpp @@ -17,6 +17,7 @@ #include <gtest/gtest.h> #include <filesystem> +#include <memory> #include "agent/be_exec_version_manager.h" #include "common/object_pool.h" @@ -89,7 +90,9 @@ void generate_block(PBlock& pblock, int row_index) { TEST_F(WalReaderWriterTest, TestWriteAndRead1) { std::string file_name = _s_test_data_path + "/abcd123.txt"; - auto wal_writer = WalWriter(file_name); + std::shared_ptr<std::atomic_size_t> _all_wal_disk_bytes = + std::make_shared<std::atomic_size_t>(0); + auto wal_writer = WalWriter(file_name, _all_wal_disk_bytes); static_cast<void>(wal_writer.init()); size_t file_len = 0; int64_t file_size = -1; --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org