This is an automated email from the ASF dual-hosted git repository.

yiguolei pushed a commit to branch branch-2.1
in repository https://gitbox.apache.org/repos/asf/doris.git

commit 352c8e369451e81dff65dbcea0a3d2a0932784f6
Author: huanghaibin <284824...@qq.com>
AuthorDate: Thu Feb 8 11:48:49 2024 +0800

    [improvement](group_commit)  Rename fail wal to tmp should only use in test P0 scenario (#30959)
---
 be/src/common/config.cpp       |  1 +
 be/src/common/config.h         |  1 +
 be/src/olap/wal/wal_info.cpp   |  2 +-
 be/src/olap/wal/wal_info.h     |  4 ++--
 be/src/olap/wal/wal_reader.cpp | 14 +++++++++++---
 be/src/olap/wal/wal_table.cpp  | 23 +++++++++++++++--------
 6 files changed, 31 insertions(+), 14 deletions(-)

diff --git a/be/src/common/config.cpp b/be/src/common/config.cpp
index 873be1c5f5e..9ac754b6799 100644
--- a/be/src/common/config.cpp
+++ b/be/src/common/config.cpp
@@ -1119,6 +1119,7 @@ DEFINE_Int16(bitmap_serialize_version, "1");
 DEFINE_String(group_commit_wal_path, "");
 DEFINE_Int32(group_commit_replay_wal_retry_num, "10");
 DEFINE_Int32(group_commit_replay_wal_retry_interval_seconds, "5");
+DEFINE_Int32(group_commit_replay_wal_retry_interval_max_seconds, "1800");
 DEFINE_Int32(group_commit_relay_wal_threads, "10");
 // This config can be set to limit thread number in group commit request fragment thread pool.
 DEFINE_Int32(group_commit_insert_threads, "10");
diff --git a/be/src/common/config.h b/be/src/common/config.h
index 7aa18b0d614..e4ddd552a98 100644
--- a/be/src/common/config.h
+++ b/be/src/common/config.h
@@ -1186,6 +1186,7 @@ DECLARE_Int16(bitmap_serialize_version);
 DECLARE_String(group_commit_wal_path);
 DECLARE_Int32(group_commit_replay_wal_retry_num);
 DECLARE_Int32(group_commit_replay_wal_retry_interval_seconds);
+DECLARE_Int32(group_commit_replay_wal_retry_interval_max_seconds);
 DECLARE_mInt32(group_commit_relay_wal_threads);
 // This config can be set to limit thread number in group commit request fragment thread pool.
 DECLARE_mInt32(group_commit_insert_threads);
diff --git a/be/src/olap/wal/wal_info.cpp b/be/src/olap/wal/wal_info.cpp
index 3c6fc4190b4..b517966f635 100644
--- a/be/src/olap/wal/wal_info.cpp
+++ b/be/src/olap/wal/wal_info.cpp
@@ -32,7 +32,7 @@ std::string WalInfo::get_wal_path() {
     return _wal_path;
 }
 
-int64_t WalInfo::get_retry_num() {
+int32_t WalInfo::get_retry_num() {
     return _retry_num;
 }
 
diff --git a/be/src/olap/wal/wal_info.h b/be/src/olap/wal/wal_info.h
index 0365e23abd4..5fadc36627c 100644
--- a/be/src/olap/wal/wal_info.h
+++ b/be/src/olap/wal/wal_info.h
@@ -23,7 +23,7 @@ public:
     WalInfo(int64_t wal_id, std::string wal_path, int64_t retry_num, int64_t start_time_ms);
     ~WalInfo() = default;
     int64_t get_wal_id();
-    int64_t get_retry_num();
+    int32_t get_retry_num();
     int64_t get_start_time_ms();
     std::string get_wal_path();
     void add_retry_num();
@@ -31,7 +31,7 @@ public:
 private:
     int64_t _wal_id;
     std::string _wal_path;
-    int64_t _retry_num;
+    int32_t _retry_num;
     int64_t _start_time_ms;
 };
 
diff --git a/be/src/olap/wal/wal_reader.cpp b/be/src/olap/wal/wal_reader.cpp
index bac073e3034..0dfaa18a241 100644
--- a/be/src/olap/wal/wal_reader.cpp
+++ b/be/src/olap/wal/wal_reader.cpp
@@ -39,14 +39,22 @@ static Status _deserialize(PBlock& block, const std::string& buf) {
 }
 
 Status WalReader::init() {
+    bool exists = false;
+    RETURN_IF_ERROR(io::global_local_filesystem()->exists(_file_name, &exists));
+    if (!exists) {
+        LOG(WARNING) << "not exist wal= " << _file_name;
+        return Status::NotFound("wal {} doesn't exist", _file_name);
+    }
     RETURN_IF_ERROR(io::global_local_filesystem()->open_file(_file_name, &file_reader));
     return Status::OK();
 }
 
 Status WalReader::finalize() {
-    auto st = file_reader->close();
-    if (!st.ok()) {
-        LOG(WARNING) << "fail to close wal " << _file_name;
+    if (file_reader != nullptr) {
+        auto st = file_reader->close();
+        if (!st.ok()) {
+            LOG(WARNING) << "fail to close wal " << _file_name << " st= " << st.to_string();
+        }
     }
     return Status::OK();
 }
diff --git a/be/src/olap/wal/wal_table.cpp b/be/src/olap/wal/wal_table.cpp
index dabbf0596e9..9b2d1338f24 100644
--- a/be/src/olap/wal/wal_table.cpp
+++ b/be/src/olap/wal/wal_table.cpp
@@ -59,16 +59,15 @@ void WalTable::_pick_relay_wals() {
     std::vector<std::string> need_replay_wals;
     std::vector<std::string> need_erase_wals;
     for (const auto& [wal_path, wal_info] : _replay_wal_map) {
-        if (wal_info->get_retry_num() >= config::group_commit_replay_wal_retry_num) {
+        if (config::group_commit_wait_replay_wal_finish &&
+            wal_info->get_retry_num() >= config::group_commit_replay_wal_retry_num) {
             LOG(WARNING) << "failed to replay wal=" << wal_path << " after retry "
                          << wal_info->get_retry_num() << " times";
             [[maybe_unused]] auto st = _exec_env->wal_mgr()->rename_to_tmp_path(
                     wal_path, _table_id, wal_info->get_wal_id());
-            if (config::group_commit_wait_replay_wal_finish) {
-                auto notify_st = _exec_env->wal_mgr()->notify_relay_wal(wal_info->get_wal_id());
-                if (!notify_st.ok()) {
-                    LOG(WARNING) << "notify wal " << wal_info->get_wal_id() << " fail";
-                }
+            auto notify_st = _exec_env->wal_mgr()->notify_relay_wal(wal_info->get_wal_id());
+            if (!notify_st.ok()) {
+                LOG(WARNING) << "notify wal " << wal_info->get_wal_id() << " fail";
             }
             need_erase_wals.push_back(wal_path);
             continue;
@@ -153,8 +152,16 @@ bool WalTable::_need_replay(std::shared_ptr<WalInfo> wal_info) {
         return true;
     }
 #ifndef BE_TEST
-    auto replay_interval = pow(2, wal_info->get_retry_num()) *
-                           config::group_commit_replay_wal_retry_interval_seconds * 1000;
+    auto replay_interval = 0;
+    if (wal_info->get_retry_num() >= config::group_commit_replay_wal_retry_num) {
+        replay_interval = pow(2, config::group_commit_replay_wal_retry_num) *
+                                  config::group_commit_replay_wal_retry_interval_seconds * 1000 +
+                          (wal_info->get_retry_num() - config::group_commit_replay_wal_retry_num) *
+                                  config::group_commit_replay_wal_retry_interval_max_seconds * 1000;
+    } else {
+        replay_interval = pow(2, wal_info->get_retry_num()) *
+                          config::group_commit_replay_wal_retry_interval_seconds * 1000;
+    }
     return UnixMillis() - wal_info->get_start_time_ms() >= replay_interval;
 #else
     return true;


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to