This is an automated email from the ASF dual-hosted git repository. morningman pushed a commit to branch branch-2.0-alpha in repository https://gitbox.apache.org/repos/asf/doris.git
commit 814f83e4534c887d048fcaca1ea39819a573c382 Author: yiguolei <676222...@qq.com> AuthorDate: Sat Apr 22 10:15:51 2023 +0800 [bugfix](memleak) UserFunctionCache may have memory leak during close (#18913) * [bugfix](memleak) UserFunctionCache may have memory leak during close * [bugfix](memleak) UserFunctionCache may have memory leak during close --------- Co-authored-by: yiguolei <yiguo...@gmail.com> --- be/src/runtime/user_function_cache.cpp | 124 ++++++--------------------------- be/src/runtime/user_function_cache.h | 25 ++----- 2 files changed, 28 insertions(+), 121 deletions(-) diff --git a/be/src/runtime/user_function_cache.cpp b/be/src/runtime/user_function_cache.cpp index 312e9ca1f2..b912fbae24 100644 --- a/be/src/runtime/user_function_cache.cpp +++ b/be/src/runtime/user_function_cache.cpp @@ -22,6 +22,7 @@ #include <vector> #include "common/config.h" +#include "common/factory_creator.h" #include "common/status.h" #include "gutil/strings/split.h" #include "http/http_client.h" @@ -38,16 +39,12 @@ static const int kLibShardNum = 128; // function cache entry, store information for struct UserFunctionCacheEntry { + ENABLE_FACTORY_CREATOR(UserFunctionCacheEntry); UserFunctionCacheEntry(int64_t fid_, const std::string& checksum_, const std::string& lib_file_, LibType type) : function_id(fid_), checksum(checksum_), lib_file(lib_file_), type(type) {} ~UserFunctionCacheEntry(); - void ref() { _refs.fetch_add(1); } - - // If unref() returns true, this object should be delete - bool unref() { return _refs.fetch_sub(1) == 1; } - int64_t function_id = 0; // used to check if this library is valid. std::string checksum; @@ -78,9 +75,6 @@ struct UserFunctionCacheEntry { std::unordered_map<std::string, void*> fptr_map; LibType type; - -private: - std::atomic<int> _refs {0}; }; UserFunctionCacheEntry::~UserFunctionCacheEntry() { @@ -104,9 +98,6 @@ UserFunctionCache::~UserFunctionCache() { while (it != _entry_map.end()) { auto entry = it->second; it = _entry_map.erase(it); - if (entry->unref()) { - delete entry; - } } } @@ -152,11 +143,9 @@ Status UserFunctionCache::_load_entry_from_lib(const std::string& dir, const std return Status::InternalError("duplicate function id"); } // create a cache entry and put it into entry map - UserFunctionCacheEntry* entry = - new UserFunctionCacheEntry(function_id, checksum, dir + "/" + file, lib_type); + std::shared_ptr<UserFunctionCacheEntry> entry = UserFunctionCacheEntry::create_shared( + function_id, checksum, dir + "/" + file, lib_type); entry->is_downloaded = true; - - entry->ref(); _entry_map[function_id] = entry; return Status::OK(); @@ -194,64 +183,11 @@ std::string get_real_symbol(const std::string& symbol) { return str2; } -Status UserFunctionCache::get_function_ptr(int64_t fid, const std::string& orig_symbol, - const std::string& url, const std::string& checksum, - void** fn_ptr, UserFunctionCacheEntry** output_entry) { - auto symbol = get_real_symbol(orig_symbol); - if (fid == 0) { - // Just loading a function ptr in the current process. No need to take any locks. - RETURN_IF_ERROR(dynamic_lookup(_current_process_handle, symbol.c_str(), fn_ptr)); - return Status::OK(); - } - - // if we need to unref entry - bool need_unref_entry = false; - UserFunctionCacheEntry* entry = nullptr; - // find the library entry for this function. If *output_entry is not null - // find symbol in it without to get other entry - if (output_entry != nullptr && *output_entry != nullptr) { - entry = *output_entry; - } else { - RETURN_IF_ERROR(_get_cache_entry(fid, url, checksum, &entry, LibType::SO)); - need_unref_entry = true; - } - - Status status; - { - std::lock_guard<SpinLock> l(entry->map_lock); - // now, we have the library entry, we need to lock it to find symbol - auto it = entry->fptr_map.find(symbol); - if (it != entry->fptr_map.end()) { - *fn_ptr = it->second; - } else { - status = dynamic_lookup(entry->lib_handle, symbol.c_str(), fn_ptr); - if (status.ok()) { - entry->fptr_map.emplace(symbol, *fn_ptr); - } else { - LOG(WARNING) << "fail to lookup symbol in library, symbol=" << symbol - << ", file=" << entry->lib_file; - } - } - } - - if (status.ok() && output_entry != nullptr && *output_entry == nullptr) { - *output_entry = entry; - need_unref_entry = false; - } - - if (need_unref_entry) { - if (entry->unref()) { - delete entry; - } - } - - return status; -} - Status UserFunctionCache::_get_cache_entry(int64_t fid, const std::string& url, const std::string& checksum, - UserFunctionCacheEntry** output_entry, LibType type) { - UserFunctionCacheEntry* entry = nullptr; + std::shared_ptr<UserFunctionCacheEntry>& output_entry, + LibType type) { + std::shared_ptr<UserFunctionCacheEntry> entry = nullptr; std::string file_name = _get_file_name_from_url(url); { std::lock_guard<std::mutex> l(_cache_lock); @@ -259,12 +195,10 @@ Status UserFunctionCache::_get_cache_entry(int64_t fid, const std::string& url, if (it != _entry_map.end()) { entry = it->second; } else { - entry = new UserFunctionCacheEntry( + entry = UserFunctionCacheEntry::create_shared( fid, checksum, _make_lib_file(fid, checksum, type, file_name), type); - entry->ref(); _entry_map.emplace(fid, entry); } - entry->ref(); } auto st = _load_cache_entry(url, entry); if (!st.ok()) { @@ -275,28 +209,21 @@ Status UserFunctionCache::_get_cache_entry(int64_t fid, const std::string& url, return st; } - *output_entry = entry; + output_entry = entry; return Status::OK(); } -void UserFunctionCache::_destroy_cache_entry(UserFunctionCacheEntry* entry) { +void UserFunctionCache::_destroy_cache_entry(std::shared_ptr<UserFunctionCacheEntry> entry) { // 1. we remove cache entry from entry map - size_t num_removed = 0; - { - std::lock_guard<std::mutex> l(_cache_lock); - num_removed = _entry_map.erase(entry->function_id); - } - if (num_removed > 0) { - entry->unref(); - } + std::lock_guard<std::mutex> l(_cache_lock); + // set should delete flag to true, so that the jar file will be removed when + // the entry is removed from map, and deconstruct method is called. entry->should_delete_library.store(true); - // now we need to drop - if (entry->unref()) { - delete entry; - } + _entry_map.erase(entry->function_id); } -Status UserFunctionCache::_load_cache_entry(const std::string& url, UserFunctionCacheEntry* entry) { +Status UserFunctionCache::_load_cache_entry(const std::string& url, + std::shared_ptr<UserFunctionCacheEntry> entry) { if (entry->is_loaded.load()) { return Status::OK(); } @@ -316,7 +243,8 @@ Status UserFunctionCache::_load_cache_entry(const std::string& url, UserFunction } // entry's lock must be held -Status UserFunctionCache::_download_lib(const std::string& url, UserFunctionCacheEntry* entry) { +Status UserFunctionCache::_download_lib(const std::string& url, + std::shared_ptr<UserFunctionCacheEntry> entry) { DCHECK(!entry->is_downloaded); // get local path to save library @@ -389,7 +317,8 @@ std::string UserFunctionCache::_get_file_name_from_url(const std::string& url) c } // entry's lock must be held -Status UserFunctionCache::_load_cache_entry_internal(UserFunctionCacheEntry* entry) { +Status UserFunctionCache::_load_cache_entry_internal( + std::shared_ptr<UserFunctionCacheEntry> entry) { RETURN_IF_ERROR(dynamic_open(entry->lib_file.c_str(), &entry->lib_handle)); entry->is_loaded.store(true); return Status::OK(); @@ -408,19 +337,10 @@ std::string UserFunctionCache::_make_lib_file(int64_t function_id, const std::st return ss.str(); } -void UserFunctionCache::release_entry(UserFunctionCacheEntry* entry) { - if (entry == nullptr) { - return; - } - if (entry->unref()) { - delete entry; - } -} - Status UserFunctionCache::get_jarpath(int64_t fid, const std::string& url, const std::string& checksum, std::string* libpath) { - UserFunctionCacheEntry* entry = nullptr; - RETURN_IF_ERROR(_get_cache_entry(fid, url, checksum, &entry, LibType::JAR)); + std::shared_ptr<UserFunctionCacheEntry> entry = nullptr; + RETURN_IF_ERROR(_get_cache_entry(fid, url, checksum, entry, LibType::JAR)); *libpath = entry->lib_file; return Status::OK(); } diff --git a/be/src/runtime/user_function_cache.h b/be/src/runtime/user_function_cache.h index b3c4aa7e80..f49b6d216d 100644 --- a/be/src/runtime/user_function_cache.h +++ b/be/src/runtime/user_function_cache.h @@ -54,19 +54,6 @@ public: static UserFunctionCache* instance(); - // Return function pointer for given fid and symbol. - // If fid is 0, lookup symbol from this doris-be process. - // Otherwise find symbol in UserFunction's library. - // Found function pointer is returned in fn_ptr, and cache entry - // is returned by entry. Client must call release_entry to release - // cache entry if didn't need it. - // If *entry is not true means that we should find symbol in this - // entry. - Status get_function_ptr(int64_t fid, const std::string& symbol, const std::string& url, - const std::string& checksum, void** fn_ptr, - UserFunctionCacheEntry** entry); - void release_entry(UserFunctionCacheEntry* entry); - Status get_jarpath(int64_t fid, const std::string& url, const std::string& checksum, std::string* libpath); @@ -74,14 +61,14 @@ private: Status _load_cached_lib(); Status _load_entry_from_lib(const std::string& dir, const std::string& file); Status _get_cache_entry(int64_t fid, const std::string& url, const std::string& checksum, - UserFunctionCacheEntry** output_entry, LibType type); - Status _load_cache_entry(const std::string& url, UserFunctionCacheEntry* entry); - Status _download_lib(const std::string& url, UserFunctionCacheEntry* entry); - Status _load_cache_entry_internal(UserFunctionCacheEntry* entry); + std::shared_ptr<UserFunctionCacheEntry>& output_entry, LibType type); + Status _load_cache_entry(const std::string& url, std::shared_ptr<UserFunctionCacheEntry> entry); + Status _download_lib(const std::string& url, std::shared_ptr<UserFunctionCacheEntry> entry); + Status _load_cache_entry_internal(std::shared_ptr<UserFunctionCacheEntry> entry); std::string _make_lib_file(int64_t function_id, const std::string& checksum, LibType type, const std::string& file_name); - void _destroy_cache_entry(UserFunctionCacheEntry* entry); + void _destroy_cache_entry(std::shared_ptr<UserFunctionCacheEntry> entry); std::string _get_real_url(const std::string& url); std::string _get_file_name_from_url(const std::string& url) const; @@ -91,7 +78,7 @@ private: void* _current_process_handle = nullptr; std::mutex _cache_lock; - std::unordered_map<int64_t, UserFunctionCacheEntry*> _entry_map; + std::unordered_map<int64_t, std::shared_ptr<UserFunctionCacheEntry>> _entry_map; }; } // namespace doris --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org