github-actions[bot] commented on code in PR #29560:
URL: https://github.com/apache/doris/pull/29560#discussion_r1442570565


##########
be/src/olap/wal/wal_manager.cpp:
##########
@@ -507,4 +427,114 @@ std::string WalManager::_get_base_wal_path(const 
std::string& wal_path_str) {
     return wal_path.string();
 }
 
+Status WalManager::delete_wal(int64_t table_id, int64_t wal_id, size_t 
block_queue_pre_allocated) {
+    std::string wal_path;
+    {
+        std::lock_guard<std::shared_mutex> wrlock(_wal_path_lock);
+        auto it = _wal_path_map.find(wal_id);
+        if (it != _wal_path_map.end()) {
+            wal_path = it->second;
+            
RETURN_IF_ERROR(io::global_local_filesystem()->delete_file(wal_path));
+            LOG(INFO) << "delete file=" << wal_path;
+            _wal_path_map.erase(wal_id);
+            erase_wal_queue(table_id, wal_id);
+        }
+    }
+    RETURN_IF_ERROR(update_wal_dir_pre_allocated(_get_base_wal_path(wal_path), 
0,
+                                                 block_queue_pre_allocated));
+    return Status::OK();
+}
+
+Status WalManager::rename_to_tmp_path(const std::string wal, int64_t table_id, 
int64_t wal_id) {
+    io::Path wal_path = wal;
+    std::list<std::string> path_element;
+    for (int i = 0; i < 3; ++i) {
+        if (!wal_path.has_parent_path()) {
+            return Status::InternalError("parent path is not enough when 
rename " + wal);
+        }
+        path_element.push_front(wal_path.filename().string());
+        wal_path = wal_path.parent_path();
+    }
+    wal_path.append(_tmp);
+    for (auto path : path_element) {
+        wal_path.append(path);
+    }
+    bool exists = false;
+    
RETURN_IF_ERROR(io::global_local_filesystem()->exists(wal_path.parent_path(), 
&exists));
+    if (!exists) {
+        
RETURN_IF_ERROR(io::global_local_filesystem()->create_directory(wal_path.parent_path()));
+    }
+    auto res = std::rename(wal.c_str(), wal_path.string().c_str());
+    if (res < 0) {
+        return Status::InternalError("rename fail on path " + wal);
+    }
+    LOG(INFO) << "rename wal from " << wal << " to " << wal_path.string();
+    erase_wal_queue(table_id, wal_id);
+    return Status::OK();
+}
+
+Status WalManager::add_wal_cv_map(int64_t wal_id, std::shared_ptr<std::mutex> 
lock,

Review Comment:
   warning: method 'add_wal_cv_map' can be made static [readability-convert-member-functions-to-static]
   
   ```suggestion
   static Status WalManager::add_wal_cv_map(int64_t wal_id, std::shared_ptr<std::mutex> lock,
   ```
   



##########
be/src/olap/wal/wal_manager.cpp:
##########
@@ -507,4 +427,114 @@
     return wal_path.string();
 }
 
+Status WalManager::delete_wal(int64_t table_id, int64_t wal_id, size_t 
block_queue_pre_allocated) {
+    std::string wal_path;
+    {
+        std::lock_guard<std::shared_mutex> wrlock(_wal_path_lock);
+        auto it = _wal_path_map.find(wal_id);
+        if (it != _wal_path_map.end()) {
+            wal_path = it->second;
+            
RETURN_IF_ERROR(io::global_local_filesystem()->delete_file(wal_path));
+            LOG(INFO) << "delete file=" << wal_path;
+            _wal_path_map.erase(wal_id);
+            erase_wal_queue(table_id, wal_id);
+        }
+    }
+    RETURN_IF_ERROR(update_wal_dir_pre_allocated(_get_base_wal_path(wal_path), 
0,
+                                                 block_queue_pre_allocated));
+    return Status::OK();
+}
+
+Status WalManager::rename_to_tmp_path(const std::string wal, int64_t table_id, 
int64_t wal_id) {
+    io::Path wal_path = wal;
+    std::list<std::string> path_element;
+    for (int i = 0; i < 3; ++i) {
+        if (!wal_path.has_parent_path()) {
+            return Status::InternalError("parent path is not enough when 
rename " + wal);
+        }
+        path_element.push_front(wal_path.filename().string());
+        wal_path = wal_path.parent_path();
+    }
+    wal_path.append(_tmp);
+    for (auto path : path_element) {
+        wal_path.append(path);
+    }
+    bool exists = false;
+    
RETURN_IF_ERROR(io::global_local_filesystem()->exists(wal_path.parent_path(), 
&exists));
+    if (!exists) {
+        
RETURN_IF_ERROR(io::global_local_filesystem()->create_directory(wal_path.parent_path()));
+    }
+    auto res = std::rename(wal.c_str(), wal_path.string().c_str());
+    if (res < 0) {
+        return Status::InternalError("rename fail on path " + wal);
+    }
+    LOG(INFO) << "rename wal from " << wal << " to " << wal_path.string();
+    erase_wal_queue(table_id, wal_id);
+    return Status::OK();
+}
+
+Status WalManager::add_wal_cv_map(int64_t wal_id, std::shared_ptr<std::mutex> 
lock,
+                                  std::shared_ptr<std::condition_variable> cv) 
{
+    std::lock_guard<std::shared_mutex> wrlock(_wal_cv_lock);
+    auto it = _wal_cv_map.find(wal_id);
+    if (it != _wal_cv_map.end()) {
+        return Status::InternalError("wal {} is already in _wal_cv_map ", 
wal_id);
+    }
+    auto pair = std::make_pair(lock, cv);
+    _wal_cv_map.emplace(wal_id, pair);
+    LOG(INFO) << "add  " << wal_id << " to _wal_cv_map";
+    return Status::OK();
+}
+
+Status WalManager::erase_wal_cv_map(int64_t wal_id) {
+    std::lock_guard<std::shared_mutex> wrlock(_wal_cv_lock);
+    if (_wal_cv_map.erase(wal_id)) {
+        LOG(INFO) << "erase " << wal_id << " from _wal_cv_map";
+    } else {
+        return Status::InternalError("fail to erase wal {} from wal_cv_map", 
wal_id);
+    }
+    return Status::OK();
+}
+
+Status WalManager::wait_replay_wal_finish(int64_t wal_id) {
+    std::shared_ptr<std::mutex> lock = nullptr;
+    std::shared_ptr<std::condition_variable> cv = nullptr;
+    auto st = get_lock_and_cv(wal_id, lock, cv);
+    if (st.ok()) {
+        std::unique_lock l(*(lock));
+        LOG(INFO) << "start wait " << wal_id;
+        if (cv->wait_for(l, std::chrono::seconds(180)) == 
std::cv_status::timeout) {
+            LOG(WARNING) << "wait for " << wal_id << " is time out";
+        }
+        LOG(INFO) << "get wal " << wal_id << ",finish wait";
+        RETURN_IF_ERROR(erase_wal_cv_map(wal_id));
+        LOG(INFO) << "erase wal " << wal_id;
+    }
+    return Status::OK();
+}
+
+Status WalManager::notify_relay_wal(int64_t wal_id) {
+    std::shared_ptr<std::mutex> lock = nullptr;
+    std::shared_ptr<std::condition_variable> cv = nullptr;
+    auto st = get_lock_and_cv(wal_id, lock, cv);
+    if (st.ok()) {
+        std::unique_lock l(*(lock));
+        cv->notify_all();
+        LOG(INFO) << "get wal " << wal_id << ",notify all";
+    }
+    return Status::OK();
+}
+
+Status WalManager::get_lock_and_cv(int64_t wal_id, 
std::shared_ptr<std::mutex>& lock,

Review Comment:
   warning: method 'get_lock_and_cv' can be made static [readability-convert-member-functions-to-static]
   
   ```suggestion
   static Status WalManager::get_lock_and_cv(int64_t wal_id, std::shared_ptr<std::mutex>& lock,
   ```
   



##########
be/test/olap/wal_manager_test.cpp:
##########
@@ -64,23 +64,37 @@ class WalManagerTest : public testing::Test {
         _env->_function_client_cache = new 
BrpcClientCache<PFunctionService_Stub>();
         _env->_stream_load_executor = StreamLoadExecutor::create_shared(_env);
         _env->_store_paths = {StorePath(std::filesystem::current_path(), 0)};
-        _env->_wal_manager = WalManager::create_shared(_env, wal_dir);
+        _env->_wal_manager = WalManager::create_shared(_env, wal_dir.string());
         k_stream_load_begin_result = TLoadTxnBeginResult();
     }
     void TearDown() override {
-        
static_cast<void>(io::global_local_filesystem()->delete_directory(wal_dir));
+        Status st = io::global_local_filesystem()->delete_directory(wal_dir);
+        if (!st.ok()) {
+            LOG(WARNING) << "fail to delete " << wal_dir.string();
+        }
         SAFE_STOP(_env->_wal_manager);
         SAFE_DELETE(_env->_function_client_cache);
         SAFE_DELETE(_env->_internal_client_cache);
         SAFE_DELETE(_env->_master_info);
     }
 
-    void prepare() { 
static_cast<void>(io::global_local_filesystem()->create_directory(wal_dir)); }
+    void prepare() {

Review Comment:
   warning: method 'prepare' can be made static [readability-convert-member-functions-to-static]
   
   ```suggestion
       static void prepare() {
   ```
   



##########
be/src/olap/wal/wal_manager.cpp:
##########
@@ -507,4 +427,114 @@
     return wal_path.string();
 }
 
+Status WalManager::delete_wal(int64_t table_id, int64_t wal_id, size_t 
block_queue_pre_allocated) {
+    std::string wal_path;
+    {
+        std::lock_guard<std::shared_mutex> wrlock(_wal_path_lock);
+        auto it = _wal_path_map.find(wal_id);
+        if (it != _wal_path_map.end()) {
+            wal_path = it->second;
+            
RETURN_IF_ERROR(io::global_local_filesystem()->delete_file(wal_path));
+            LOG(INFO) << "delete file=" << wal_path;
+            _wal_path_map.erase(wal_id);
+            erase_wal_queue(table_id, wal_id);
+        }
+    }
+    RETURN_IF_ERROR(update_wal_dir_pre_allocated(_get_base_wal_path(wal_path), 
0,
+                                                 block_queue_pre_allocated));
+    return Status::OK();
+}
+
+Status WalManager::rename_to_tmp_path(const std::string wal, int64_t table_id, 
int64_t wal_id) {
+    io::Path wal_path = wal;
+    std::list<std::string> path_element;
+    for (int i = 0; i < 3; ++i) {
+        if (!wal_path.has_parent_path()) {
+            return Status::InternalError("parent path is not enough when 
rename " + wal);
+        }
+        path_element.push_front(wal_path.filename().string());
+        wal_path = wal_path.parent_path();
+    }
+    wal_path.append(_tmp);
+    for (auto path : path_element) {
+        wal_path.append(path);
+    }
+    bool exists = false;
+    
RETURN_IF_ERROR(io::global_local_filesystem()->exists(wal_path.parent_path(), 
&exists));
+    if (!exists) {
+        
RETURN_IF_ERROR(io::global_local_filesystem()->create_directory(wal_path.parent_path()));
+    }
+    auto res = std::rename(wal.c_str(), wal_path.string().c_str());
+    if (res < 0) {
+        return Status::InternalError("rename fail on path " + wal);
+    }
+    LOG(INFO) << "rename wal from " << wal << " to " << wal_path.string();
+    erase_wal_queue(table_id, wal_id);
+    return Status::OK();
+}
+
+Status WalManager::add_wal_cv_map(int64_t wal_id, std::shared_ptr<std::mutex> 
lock,
+                                  std::shared_ptr<std::condition_variable> cv) 
{
+    std::lock_guard<std::shared_mutex> wrlock(_wal_cv_lock);
+    auto it = _wal_cv_map.find(wal_id);
+    if (it != _wal_cv_map.end()) {
+        return Status::InternalError("wal {} is already in _wal_cv_map ", 
wal_id);
+    }
+    auto pair = std::make_pair(lock, cv);
+    _wal_cv_map.emplace(wal_id, pair);
+    LOG(INFO) << "add  " << wal_id << " to _wal_cv_map";
+    return Status::OK();
+}
+
+Status WalManager::erase_wal_cv_map(int64_t wal_id) {

Review Comment:
   warning: method 'erase_wal_cv_map' can be made static [readability-convert-member-functions-to-static]
   
   be/src/olap/wal/wal_manager.h:99:
   ```diff
   -     Status erase_wal_cv_map(int64_t wal_id);
   +     static Status erase_wal_cv_map(int64_t wal_id);
   ```
   



##########
be/src/olap/wal/wal_manager.cpp:
##########
@@ -507,4 +427,114 @@
     return wal_path.string();
 }
 
+Status WalManager::delete_wal(int64_t table_id, int64_t wal_id, size_t 
block_queue_pre_allocated) {
+    std::string wal_path;
+    {
+        std::lock_guard<std::shared_mutex> wrlock(_wal_path_lock);
+        auto it = _wal_path_map.find(wal_id);
+        if (it != _wal_path_map.end()) {
+            wal_path = it->second;
+            
RETURN_IF_ERROR(io::global_local_filesystem()->delete_file(wal_path));
+            LOG(INFO) << "delete file=" << wal_path;
+            _wal_path_map.erase(wal_id);
+            erase_wal_queue(table_id, wal_id);
+        }
+    }
+    RETURN_IF_ERROR(update_wal_dir_pre_allocated(_get_base_wal_path(wal_path), 
0,
+                                                 block_queue_pre_allocated));
+    return Status::OK();
+}
+
+Status WalManager::rename_to_tmp_path(const std::string wal, int64_t table_id, 
int64_t wal_id) {
+    io::Path wal_path = wal;
+    std::list<std::string> path_element;
+    for (int i = 0; i < 3; ++i) {
+        if (!wal_path.has_parent_path()) {
+            return Status::InternalError("parent path is not enough when 
rename " + wal);
+        }
+        path_element.push_front(wal_path.filename().string());
+        wal_path = wal_path.parent_path();
+    }
+    wal_path.append(_tmp);
+    for (auto path : path_element) {
+        wal_path.append(path);
+    }
+    bool exists = false;
+    
RETURN_IF_ERROR(io::global_local_filesystem()->exists(wal_path.parent_path(), 
&exists));
+    if (!exists) {
+        
RETURN_IF_ERROR(io::global_local_filesystem()->create_directory(wal_path.parent_path()));
+    }
+    auto res = std::rename(wal.c_str(), wal_path.string().c_str());
+    if (res < 0) {
+        return Status::InternalError("rename fail on path " + wal);
+    }
+    LOG(INFO) << "rename wal from " << wal << " to " << wal_path.string();
+    erase_wal_queue(table_id, wal_id);
+    return Status::OK();
+}
+
+Status WalManager::add_wal_cv_map(int64_t wal_id, std::shared_ptr<std::mutex> 
lock,
+                                  std::shared_ptr<std::condition_variable> cv) 
{
+    std::lock_guard<std::shared_mutex> wrlock(_wal_cv_lock);
+    auto it = _wal_cv_map.find(wal_id);
+    if (it != _wal_cv_map.end()) {
+        return Status::InternalError("wal {} is already in _wal_cv_map ", 
wal_id);
+    }
+    auto pair = std::make_pair(lock, cv);
+    _wal_cv_map.emplace(wal_id, pair);
+    LOG(INFO) << "add  " << wal_id << " to _wal_cv_map";
+    return Status::OK();
+}
+
+Status WalManager::erase_wal_cv_map(int64_t wal_id) {
+    std::lock_guard<std::shared_mutex> wrlock(_wal_cv_lock);
+    if (_wal_cv_map.erase(wal_id)) {
+        LOG(INFO) << "erase " << wal_id << " from _wal_cv_map";
+    } else {
+        return Status::InternalError("fail to erase wal {} from wal_cv_map", 
wal_id);
+    }
+    return Status::OK();
+}
+
+Status WalManager::wait_replay_wal_finish(int64_t wal_id) {
+    std::shared_ptr<std::mutex> lock = nullptr;
+    std::shared_ptr<std::condition_variable> cv = nullptr;
+    auto st = get_lock_and_cv(wal_id, lock, cv);
+    if (st.ok()) {
+        std::unique_lock l(*(lock));
+        LOG(INFO) << "start wait " << wal_id;
+        if (cv->wait_for(l, std::chrono::seconds(180)) == 
std::cv_status::timeout) {
+            LOG(WARNING) << "wait for " << wal_id << " is time out";
+        }
+        LOG(INFO) << "get wal " << wal_id << ",finish wait";
+        RETURN_IF_ERROR(erase_wal_cv_map(wal_id));
+        LOG(INFO) << "erase wal " << wal_id;
+    }
+    return Status::OK();
+}
+
+Status WalManager::notify_relay_wal(int64_t wal_id) {

Review Comment:
   warning: method 'notify_relay_wal' can be made static [readability-convert-member-functions-to-static]
   
   be/src/olap/wal/wal_manager.h:103:
   ```diff
   -     Status notify_relay_wal(int64_t wal_id);
   +     static Status notify_relay_wal(int64_t wal_id);
   ```
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to