This is an automated email from the ASF dual-hosted git repository.

yiguolei pushed a commit to branch branch-2.1 in repository https://gitbox.apache.org/repos/asf/doris.git
commit 352c8e369451e81dff65dbcea0a3d2a0932784f6 Author: huanghaibin <284824...@qq.com> AuthorDate: Thu Feb 8 11:48:49 2024 +0800 [improvement](group_commit) Rename fail wal to tmp should only use in test P0 scenario (#30959) --- be/src/common/config.cpp | 1 + be/src/common/config.h | 1 + be/src/olap/wal/wal_info.cpp | 2 +- be/src/olap/wal/wal_info.h | 4 ++-- be/src/olap/wal/wal_reader.cpp | 14 +++++++++++--- be/src/olap/wal/wal_table.cpp | 23 +++++++++++++++-------- 6 files changed, 31 insertions(+), 14 deletions(-) diff --git a/be/src/common/config.cpp b/be/src/common/config.cpp index 873be1c5f5e..9ac754b6799 100644 --- a/be/src/common/config.cpp +++ b/be/src/common/config.cpp @@ -1119,6 +1119,7 @@ DEFINE_Int16(bitmap_serialize_version, "1"); DEFINE_String(group_commit_wal_path, ""); DEFINE_Int32(group_commit_replay_wal_retry_num, "10"); DEFINE_Int32(group_commit_replay_wal_retry_interval_seconds, "5"); +DEFINE_Int32(group_commit_replay_wal_retry_interval_max_seconds, "1800"); DEFINE_Int32(group_commit_relay_wal_threads, "10"); // This config can be set to limit thread number in group commit request fragment thread pool. DEFINE_Int32(group_commit_insert_threads, "10"); diff --git a/be/src/common/config.h b/be/src/common/config.h index 7aa18b0d614..e4ddd552a98 100644 --- a/be/src/common/config.h +++ b/be/src/common/config.h @@ -1186,6 +1186,7 @@ DECLARE_Int16(bitmap_serialize_version); DECLARE_String(group_commit_wal_path); DECLARE_Int32(group_commit_replay_wal_retry_num); DECLARE_Int32(group_commit_replay_wal_retry_interval_seconds); +DECLARE_Int32(group_commit_replay_wal_retry_interval_max_seconds); DECLARE_mInt32(group_commit_relay_wal_threads); // This config can be set to limit thread number in group commit request fragment thread pool. 
DECLARE_mInt32(group_commit_insert_threads); diff --git a/be/src/olap/wal/wal_info.cpp b/be/src/olap/wal/wal_info.cpp index 3c6fc4190b4..b517966f635 100644 --- a/be/src/olap/wal/wal_info.cpp +++ b/be/src/olap/wal/wal_info.cpp @@ -32,7 +32,7 @@ std::string WalInfo::get_wal_path() { return _wal_path; } -int64_t WalInfo::get_retry_num() { +int32_t WalInfo::get_retry_num() { return _retry_num; } diff --git a/be/src/olap/wal/wal_info.h b/be/src/olap/wal/wal_info.h index 0365e23abd4..5fadc36627c 100644 --- a/be/src/olap/wal/wal_info.h +++ b/be/src/olap/wal/wal_info.h @@ -23,7 +23,7 @@ public: WalInfo(int64_t wal_id, std::string wal_path, int64_t retry_num, int64_t start_time_ms); ~WalInfo() = default; int64_t get_wal_id(); - int64_t get_retry_num(); + int32_t get_retry_num(); int64_t get_start_time_ms(); std::string get_wal_path(); void add_retry_num(); @@ -31,7 +31,7 @@ public: private: int64_t _wal_id; std::string _wal_path; - int64_t _retry_num; + int32_t _retry_num; int64_t _start_time_ms; }; diff --git a/be/src/olap/wal/wal_reader.cpp b/be/src/olap/wal/wal_reader.cpp index bac073e3034..0dfaa18a241 100644 --- a/be/src/olap/wal/wal_reader.cpp +++ b/be/src/olap/wal/wal_reader.cpp @@ -39,14 +39,22 @@ static Status _deserialize(PBlock& block, const std::string& buf) { } Status WalReader::init() { + bool exists = false; + RETURN_IF_ERROR(io::global_local_filesystem()->exists(_file_name, &exists)); + if (!exists) { + LOG(WARNING) << "not exist wal= " << _file_name; + return Status::NotFound("wal {} doesn't exist", _file_name); + } RETURN_IF_ERROR(io::global_local_filesystem()->open_file(_file_name, &file_reader)); return Status::OK(); } Status WalReader::finalize() { - auto st = file_reader->close(); - if (!st.ok()) { - LOG(WARNING) << "fail to close wal " << _file_name; + if (file_reader != nullptr) { + auto st = file_reader->close(); + if (!st.ok()) { + LOG(WARNING) << "fail to close wal " << _file_name << " st= " << st.to_string(); + } } return Status::OK(); } diff 
--git a/be/src/olap/wal/wal_table.cpp b/be/src/olap/wal/wal_table.cpp index dabbf0596e9..9b2d1338f24 100644 --- a/be/src/olap/wal/wal_table.cpp +++ b/be/src/olap/wal/wal_table.cpp @@ -59,16 +59,15 @@ void WalTable::_pick_relay_wals() { std::vector<std::string> need_replay_wals; std::vector<std::string> need_erase_wals; for (const auto& [wal_path, wal_info] : _replay_wal_map) { - if (wal_info->get_retry_num() >= config::group_commit_replay_wal_retry_num) { + if (config::group_commit_wait_replay_wal_finish && + wal_info->get_retry_num() >= config::group_commit_replay_wal_retry_num) { LOG(WARNING) << "failed to replay wal=" << wal_path << " after retry " << wal_info->get_retry_num() << " times"; [[maybe_unused]] auto st = _exec_env->wal_mgr()->rename_to_tmp_path( wal_path, _table_id, wal_info->get_wal_id()); - if (config::group_commit_wait_replay_wal_finish) { - auto notify_st = _exec_env->wal_mgr()->notify_relay_wal(wal_info->get_wal_id()); - if (!notify_st.ok()) { - LOG(WARNING) << "notify wal " << wal_info->get_wal_id() << " fail"; - } + auto notify_st = _exec_env->wal_mgr()->notify_relay_wal(wal_info->get_wal_id()); + if (!notify_st.ok()) { + LOG(WARNING) << "notify wal " << wal_info->get_wal_id() << " fail"; } need_erase_wals.push_back(wal_path); continue; @@ -153,8 +152,16 @@ bool WalTable::_need_replay(std::shared_ptr<WalInfo> wal_info) { return true; } #ifndef BE_TEST - auto replay_interval = pow(2, wal_info->get_retry_num()) * - config::group_commit_replay_wal_retry_interval_seconds * 1000; + auto replay_interval = 0; + if (wal_info->get_retry_num() >= config::group_commit_replay_wal_retry_num) { + replay_interval = pow(2, config::group_commit_replay_wal_retry_num) * + config::group_commit_replay_wal_retry_interval_seconds * 1000 + + (wal_info->get_retry_num() - config::group_commit_replay_wal_retry_num) * + config::group_commit_replay_wal_retry_interval_max_seconds * 1000; + } else { + replay_interval = pow(2, wal_info->get_retry_num()) * + 
config::group_commit_replay_wal_retry_interval_seconds * 1000; + } return UnixMillis() - wal_info->get_start_time_ms() >= replay_interval; #else return true; --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org