github-actions[bot] commented on code in PR #28757: URL: https://github.com/apache/doris/pull/28757#discussion_r1433100484
########## be/src/olap/wal_manager.cpp: ########## @@ -410,4 +419,80 @@ Status WalManager::get_wal_column_index(int64_t wal_id, std::vector<size_t>& col return Status::OK(); } +Status WalManager::add_wal_cv_map(int64_t wal_id, std::shared_ptr<std::mutex> lock, Review Comment: warning: method 'add_wal_cv_map' can be made static [readability-convert-member-functions-to-static] ```suggestion static Status WalManager::add_wal_cv_map(int64_t wal_id, std::shared_ptr<std::mutex> lock, ``` ########## be/src/olap/wal_manager.cpp: ########## @@ -410,4 +419,80 @@ return Status::OK(); } +Status WalManager::add_wal_cv_map(int64_t wal_id, std::shared_ptr<std::mutex> lock, + std::shared_ptr<std::condition_variable> cv) { + std::lock_guard<std::shared_mutex> wrlock(_wal_cv_lock); + auto it = _wal_lock_map.find(wal_id); + if (it != _wal_lock_map.end()) { + return Status::InternalError("wal {} is already in _wal_cv_map ", wal_id); + } + _wal_lock_map.emplace(wal_id, lock); + _wal_cv_map.emplace(wal_id, cv); + LOG(INFO) << "add " << wal_id << " to _wal_cv_map"; + return Status::OK(); +} +Status WalManager::erase_wal_cv_map(int64_t wal_id) { Review Comment: warning: method 'erase_wal_cv_map' can be made static [readability-convert-member-functions-to-static] be/src/olap/wal_manager.h:76: ```diff - Status erase_wal_cv_map(int64_t wal_id); + static Status erase_wal_cv_map(int64_t wal_id); ``` ########## be/src/olap/wal_manager.cpp: ########## @@ -410,4 +419,80 @@ return Status::OK(); } +Status WalManager::add_wal_cv_map(int64_t wal_id, std::shared_ptr<std::mutex> lock, + std::shared_ptr<std::condition_variable> cv) { + std::lock_guard<std::shared_mutex> wrlock(_wal_cv_lock); + auto it = _wal_lock_map.find(wal_id); + if (it != _wal_lock_map.end()) { + return Status::InternalError("wal {} is already in _wal_cv_map ", wal_id); + } + _wal_lock_map.emplace(wal_id, lock); + _wal_cv_map.emplace(wal_id, cv); + LOG(INFO) << "add " << wal_id << " to _wal_cv_map"; + return Status::OK(); +} +Status WalManager::erase_wal_cv_map(int64_t wal_id) { + std::lock_guard<std::shared_mutex> wrlock(_wal_cv_lock); + if (_wal_lock_map.erase(wal_id) && _wal_cv_map.erase(wal_id)) { + LOG(INFO) << "erase " << wal_id << " from _wal_cv_map"; + } else { + return Status::InternalError("fail to erase wal {} from wal_cv_map", wal_id); + } + return Status::OK(); +} +Status WalManager::wait_relay_wal_finish(int64_t wal_id) { + std::shared_ptr<std::mutex> lock = nullptr; + std::shared_ptr<std::condition_variable> cv = nullptr; + auto st = get_lock_and_cv(wal_id, lock, cv); + if (st.ok()) { + std::unique_lock l(*(lock)); + LOG(INFO) << "start wait " << wal_id; + if (cv->wait_for(l, std::chrono::seconds(180)) == std::cv_status::timeout) { + LOG(WARNING) << "wait for " << wal_id << " is time out"; + } + LOG(INFO) << "get wal " << wal_id << ",finish wait"; + RETURN_IF_ERROR(erase_wal_cv_map(wal_id)); + LOG(INFO) << "erase wal " << wal_id; + } + return Status::OK(); +} +Status WalManager::notify(int64_t wal_id) { + std::shared_ptr<std::mutex> lock = nullptr; + std::shared_ptr<std::condition_variable> cv = nullptr; + auto st = get_lock_and_cv(wal_id, lock, cv); + if (st.ok()) { + std::unique_lock l(*(lock)); + cv->notify_all(); + LOG(INFO) << "get wal " << wal_id << ",notify all"; + } + return Status::OK(); +} +Status WalManager::get_lock_and_cv(int64_t wal_id, std::shared_ptr<std::mutex>& lock, Review Comment: warning: method 'get_lock_and_cv' can be made static [readability-convert-member-functions-to-static] ```suggestion static Status WalManager::get_lock_and_cv(int64_t wal_id, std::shared_ptr<std::mutex>& lock, ``` ########## be/src/olap/wal_manager.cpp: ########## @@ -410,4 +419,80 @@ return Status::OK(); } +Status WalManager::add_wal_cv_map(int64_t wal_id, std::shared_ptr<std::mutex> lock, + std::shared_ptr<std::condition_variable> cv) { + std::lock_guard<std::shared_mutex> wrlock(_wal_cv_lock); + auto it = _wal_lock_map.find(wal_id); + if (it != _wal_lock_map.end()) { + return Status::InternalError("wal {} is already in _wal_cv_map ", wal_id); + } + _wal_lock_map.emplace(wal_id, lock); + _wal_cv_map.emplace(wal_id, cv); + LOG(INFO) << "add " << wal_id << " to _wal_cv_map"; + return Status::OK(); +} +Status WalManager::erase_wal_cv_map(int64_t wal_id) { + std::lock_guard<std::shared_mutex> wrlock(_wal_cv_lock); + if (_wal_lock_map.erase(wal_id) && _wal_cv_map.erase(wal_id)) { + LOG(INFO) << "erase " << wal_id << " from _wal_cv_map"; + } else { + return Status::InternalError("fail to erase wal {} from wal_cv_map", wal_id); + } + return Status::OK(); +} +Status WalManager::wait_relay_wal_finish(int64_t wal_id) { + std::shared_ptr<std::mutex> lock = nullptr; + std::shared_ptr<std::condition_variable> cv = nullptr; + auto st = get_lock_and_cv(wal_id, lock, cv); + if (st.ok()) { + std::unique_lock l(*(lock)); + LOG(INFO) << "start wait " << wal_id; + if (cv->wait_for(l, std::chrono::seconds(180)) == std::cv_status::timeout) { + LOG(WARNING) << "wait for " << wal_id << " is time out"; + } + LOG(INFO) << "get wal " << wal_id << ",finish wait"; + RETURN_IF_ERROR(erase_wal_cv_map(wal_id)); + LOG(INFO) << "erase wal " << wal_id; + } + return Status::OK(); +} +Status WalManager::notify(int64_t wal_id) { + std::shared_ptr<std::mutex> lock = nullptr; + std::shared_ptr<std::condition_variable> cv = nullptr; + auto st = get_lock_and_cv(wal_id, lock, cv); + if (st.ok()) { + std::unique_lock l(*(lock)); + cv->notify_all(); + LOG(INFO) << "get wal " << wal_id << ",notify all"; + } + return Status::OK(); +} +Status WalManager::get_lock_and_cv(int64_t wal_id, std::shared_ptr<std::mutex>& lock, + std::shared_ptr<std::condition_variable>& cv) { + std::lock_guard<std::shared_mutex> wrlock(_wal_cv_lock); + auto lock_it = _wal_lock_map.find(wal_id); + if (lock_it == _wal_lock_map.end()) { + return Status::InternalError("cannot find txn {} in wal_lock_map", wal_id); + } + lock = lock_it->second; + auto cv_it = _wal_cv_map.find(wal_id); + if (cv_it == _wal_cv_map.end()) { + return Status::InternalError("cannot find txn {} in wal_cv_map", wal_id); + } + cv = cv_it->second; + return Status::OK(); +} + +bool WalManager::find_wal_path(int64_t wal_id) { Review Comment: warning: method 'find_wal_path' can be made static [readability-convert-member-functions-to-static] be/src/olap/wal_manager.h:80: ```diff - bool find_wal_path(int64_t wal_id); + static bool find_wal_path(int64_t wal_id); ``` ########## be/src/olap/wal_manager.cpp: ########## @@ -410,4 +419,80 @@ return Status::OK(); } +Status WalManager::add_wal_cv_map(int64_t wal_id, std::shared_ptr<std::mutex> lock, + std::shared_ptr<std::condition_variable> cv) { + std::lock_guard<std::shared_mutex> wrlock(_wal_cv_lock); + auto it = _wal_lock_map.find(wal_id); + if (it != _wal_lock_map.end()) { + return Status::InternalError("wal {} is already in _wal_cv_map ", wal_id); + } + _wal_lock_map.emplace(wal_id, lock); + _wal_cv_map.emplace(wal_id, cv); + LOG(INFO) << "add " << wal_id << " to _wal_cv_map"; + return Status::OK(); +} +Status WalManager::erase_wal_cv_map(int64_t wal_id) { + std::lock_guard<std::shared_mutex> wrlock(_wal_cv_lock); + if (_wal_lock_map.erase(wal_id) && _wal_cv_map.erase(wal_id)) { + LOG(INFO) << "erase " << wal_id << " from _wal_cv_map"; + } else { + return Status::InternalError("fail to erase wal {} from wal_cv_map", wal_id); + } + return Status::OK(); +} +Status WalManager::wait_relay_wal_finish(int64_t wal_id) { + std::shared_ptr<std::mutex> lock = nullptr; + std::shared_ptr<std::condition_variable> cv = nullptr; + auto st = get_lock_and_cv(wal_id, lock, cv); + if (st.ok()) { + std::unique_lock l(*(lock)); + LOG(INFO) << "start wait " << wal_id; + if (cv->wait_for(l, std::chrono::seconds(180)) == std::cv_status::timeout) { + LOG(WARNING) << "wait for " << wal_id << " is time out"; + } + LOG(INFO) << "get wal " << wal_id << ",finish wait"; + RETURN_IF_ERROR(erase_wal_cv_map(wal_id)); + LOG(INFO) << "erase wal " << wal_id; + } + return Status::OK(); +} +Status WalManager::notify(int64_t wal_id) { Review Comment: warning: method 'notify' can be made static [readability-convert-member-functions-to-static] be/src/olap/wal_manager.h:81: ```diff - Status notify(int64_t wal_id); + static Status notify(int64_t wal_id); ``` ########## be/src/olap/wal_manager.cpp: ########## @@ -410,4 +419,80 @@ return Status::OK(); } +Status WalManager::add_wal_cv_map(int64_t wal_id, std::shared_ptr<std::mutex> lock, + std::shared_ptr<std::condition_variable> cv) { + std::lock_guard<std::shared_mutex> wrlock(_wal_cv_lock); + auto it = _wal_lock_map.find(wal_id); + if (it != _wal_lock_map.end()) { + return Status::InternalError("wal {} is already in _wal_cv_map ", wal_id); + } + _wal_lock_map.emplace(wal_id, lock); + _wal_cv_map.emplace(wal_id, cv); + LOG(INFO) << "add " << wal_id << " to _wal_cv_map"; + return Status::OK(); +} +Status WalManager::erase_wal_cv_map(int64_t wal_id) { + std::lock_guard<std::shared_mutex> wrlock(_wal_cv_lock); + if (_wal_lock_map.erase(wal_id) && _wal_cv_map.erase(wal_id)) { + LOG(INFO) << "erase " << wal_id << " from _wal_cv_map"; + } else { + return Status::InternalError("fail to erase wal {} from wal_cv_map", wal_id); + } + return Status::OK(); +} +Status WalManager::wait_relay_wal_finish(int64_t wal_id) { + std::shared_ptr<std::mutex> lock = nullptr; + std::shared_ptr<std::condition_variable> cv = nullptr; + auto st = get_lock_and_cv(wal_id, lock, cv); + if (st.ok()) { + std::unique_lock l(*(lock)); + LOG(INFO) << "start wait " << wal_id; + if (cv->wait_for(l, std::chrono::seconds(180)) == std::cv_status::timeout) { + LOG(WARNING) << "wait for " << wal_id << " is time out"; + } + LOG(INFO) << "get wal " << wal_id << ",finish wait"; + RETURN_IF_ERROR(erase_wal_cv_map(wal_id)); + LOG(INFO) << "erase wal " << wal_id; + } + return Status::OK(); +} +Status WalManager::notify(int64_t wal_id) { + std::shared_ptr<std::mutex> lock = nullptr; + std::shared_ptr<std::condition_variable> cv = nullptr; + auto st = get_lock_and_cv(wal_id, lock, cv); + if (st.ok()) { + std::unique_lock l(*(lock)); + cv->notify_all(); + LOG(INFO) << "get wal " << wal_id << ",notify all"; + } + return Status::OK(); +} +Status WalManager::get_lock_and_cv(int64_t wal_id, std::shared_ptr<std::mutex>& lock, + std::shared_ptr<std::condition_variable>& cv) { + std::lock_guard<std::shared_mutex> wrlock(_wal_cv_lock); + auto lock_it = _wal_lock_map.find(wal_id); + if (lock_it == _wal_lock_map.end()) { + return Status::InternalError("cannot find txn {} in wal_lock_map", wal_id); + } + lock = lock_it->second; + auto cv_it = _wal_cv_map.find(wal_id); + if (cv_it == _wal_cv_map.end()) { + return Status::InternalError("cannot find txn {} in wal_cv_map", wal_id); + } + cv = cv_it->second; + return Status::OK(); +} + +bool WalManager::find_wal_path(int64_t wal_id) { + std::shared_lock rdlock(_wal_lock); + auto it = _wal_path_map.find(wal_id); + if (it != _wal_path_map.end()) { + LOG(INFO) << "find wal path " << it->second; + return true; Review Comment: warning: redundant boolean literal in conditional return statement [readability-simplify-boolean-expr] be/src/olap/wal_manager.cpp:488: ```diff - if (it != _wal_path_map.end()) { - LOG(INFO) << "find wal path " << it->second; - return true; - } else { - LOG(INFO) << "not find wal path for " << wal_id; - return false; - } + return it != _wal_path_map.end(); ``` ########## be/src/runtime/runtime_state.h: ########## @@ -256,6 +256,10 @@ class RuntimeState { int64_t wal_id() { return _wal_id; } + void set_txn_id(int64_t txn_id) { _txn_id = txn_id; } + + int64_t txn_id() { return _txn_id; } Review Comment: warning: method 'txn_id' can be made const [readability-make-member-function-const] ```suggestion int64_t txn_id() const { return _txn_id; } ``` ########## be/src/olap/wal_table.h: ########## @@ -15,15 +15,22 @@ // specific language governing permissions and limitations // under the License. #pragma once +#include <event2/bufferevent.h> Review Comment: warning: 'event2/bufferevent.h' file not found [clang-diagnostic-error] ```cpp #include <event2/bufferevent.h> ^ ``` -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org