This is an automated email from the ASF dual-hosted git repository. yiguolei pushed a commit to branch branch-2.1 in repository https://gitbox.apache.org/repos/asf/doris.git
commit d525f576e196507a1064480eebae42404926d3b9 Author: Lightman <31928846+lchangli...@users.noreply.github.com> AuthorDate: Thu Jan 11 20:41:31 2024 +0800 [improve] Use lru cache to count the number of column in tablet schema to control memory (#29668) --- be/src/common/config.cpp | 1 + be/src/common/config.h | 2 + be/src/olap/base_tablet.cpp | 1 - be/src/olap/rowset/beta_rowset_writer.cpp | 2 +- be/src/olap/rowset/rowset_meta.cpp | 38 ++++++++++++--- be/src/olap/rowset/rowset_meta.h | 10 ++++ be/src/olap/snapshot_manager.cpp | 5 +- be/src/olap/tablet.cpp | 3 +- be/src/olap/tablet_schema_cache.cpp | 81 +++++++++++-------------------- be/src/olap/tablet_schema_cache.h | 39 +++++---------- be/src/runtime/exec_env_init.cpp | 5 +- be/src/runtime/memory/cache_policy.h | 5 +- be/test/olap/rowset/rowset_meta_test.cpp | 2 +- be/test/testutil/run_all_tests.cpp | 7 ++- 14 files changed, 101 insertions(+), 100 deletions(-) diff --git a/be/src/common/config.cpp b/be/src/common/config.cpp index e32dee42a42..89ef077f886 100644 --- a/be/src/common/config.cpp +++ b/be/src/common/config.cpp @@ -1129,6 +1129,7 @@ DEFINE_Bool(group_commit_wait_replay_wal_finish, "false"); DEFINE_mInt32(scan_thread_nice_value, "0"); DEFINE_mInt32(tablet_schema_cache_recycle_interval, "3600"); +DEFINE_mInt32(tablet_schema_cache_capacity, "102400"); DEFINE_Bool(exit_on_exception, "false"); // This config controls whether the s3 file writer would flush cache asynchronously diff --git a/be/src/common/config.h b/be/src/common/config.h index 0f67c64bb13..750c75ab5f3 100644 --- a/be/src/common/config.h +++ b/be/src/common/config.h @@ -1201,6 +1201,8 @@ DECLARE_Bool(group_commit_wait_replay_wal_finish); DECLARE_Int32(scan_thread_nice_value); // Used to modify the recycle interval of tablet schema cache DECLARE_mInt32(tablet_schema_cache_recycle_interval); +// Granularity is at the column level +DECLARE_mInt32(tablet_schema_cache_capacity); // Use `LOG(FATAL)` to replace `throw` when true DECLARE_mBool(exit_on_exception); diff --git a/be/src/olap/base_tablet.cpp b/be/src/olap/base_tablet.cpp index 011a901ba5b..43f514906cf 100644 --- a/be/src/olap/base_tablet.cpp +++ b/be/src/olap/base_tablet.cpp @@ -34,7 +34,6 @@ DEFINE_COUNTER_METRIC_PROTOTYPE_2ARG(flush_bytes, MetricUnit::BYTES); DEFINE_COUNTER_METRIC_PROTOTYPE_2ARG(flush_finish_count, MetricUnit::OPERATIONS); BaseTablet::BaseTablet(TabletMetaSharedPtr tablet_meta) : _tablet_meta(std::move(tablet_meta)) { - TabletSchemaCache::instance()->insert(_tablet_meta->tablet_schema()->to_key()); _metric_entity = DorisMetrics::instance()->metric_registry()->register_entity( fmt::format("Tablet.{}", tablet_id()), {{"tablet_id", std::to_string(tablet_id())}}, MetricEntityType::kTablet); diff --git a/be/src/olap/rowset/beta_rowset_writer.cpp b/be/src/olap/rowset/beta_rowset_writer.cpp index ce30a03c800..04980816a5f 100644 --- a/be/src/olap/rowset/beta_rowset_writer.cpp +++ b/be/src/olap/rowset/beta_rowset_writer.cpp @@ -654,7 +654,7 @@ void BaseBetaRowsetWriter::_build_rowset_meta(std::shared_ptr<RowsetMeta> rowset RowsetSharedPtr BaseBetaRowsetWriter::_build_tmp() { std::shared_ptr<RowsetMeta> rowset_meta_ = std::make_shared<RowsetMeta>(); - *rowset_meta_ = *_rowset_meta; + rowset_meta_->init(_rowset_meta.get()); _build_rowset_meta(rowset_meta_); RowsetSharedPtr rowset; diff --git a/be/src/olap/rowset/rowset_meta.cpp b/be/src/olap/rowset/rowset_meta.cpp index b78ecd424b3..7f4798f97e9 100644 --- a/be/src/olap/rowset/rowset_meta.cpp +++ b/be/src/olap/rowset/rowset_meta.cpp @@ -22,14 +22,20 @@ #include "io/fs/local_file_system.h" #include "json2pb/json_to_pb.h" #include "json2pb/pb_to_json.h" +#include "olap/lru_cache.h" #include "olap/olap_common.h" #include "olap/storage_policy.h" +#include "olap/tablet_fwd.h" #include "olap/tablet_schema.h" #include "olap/tablet_schema_cache.h" namespace doris { -RowsetMeta::~RowsetMeta() = default; +RowsetMeta::~RowsetMeta() { + if (_handle) { + TabletSchemaCache::instance()->release(_handle); + } +} bool RowsetMeta::init(const std::string& pb_rowset_meta) { bool ret = _deserialize_from_pb(pb_rowset_meta); @@ -40,16 +46,21 @@ bool RowsetMeta::init(const std::string& pb_rowset_meta) { return true; } +bool RowsetMeta::init(const RowsetMeta* rowset_meta) { + RowsetMetaPB rowset_meta_pb; + rowset_meta->to_rowset_pb(&rowset_meta_pb); + return init_from_pb(rowset_meta_pb); +} + bool RowsetMeta::init_from_pb(const RowsetMetaPB& rowset_meta_pb) { if (rowset_meta_pb.has_tablet_schema()) { - _schema = TabletSchemaCache::instance()->insert( - rowset_meta_pb.tablet_schema().SerializeAsString()); + set_tablet_schema(rowset_meta_pb.tablet_schema()); } // Release ownership of TabletSchemaPB from `rowset_meta_pb` and then set it back to `rowset_meta_pb`, // this won't break const semantics of `rowset_meta_pb`, because `rowset_meta_pb` is not changed // before and after call this method. auto& mut_rowset_meta_pb = const_cast<RowsetMetaPB&>(rowset_meta_pb); - auto schema = mut_rowset_meta_pb.release_tablet_schema(); + auto* schema = mut_rowset_meta_pb.release_tablet_schema(); _rowset_meta_pb = mut_rowset_meta_pb; mut_rowset_meta_pb.set_allocated_tablet_schema(schema); _init(); @@ -107,7 +118,21 @@ RowsetMetaPB RowsetMeta::get_rowset_pb() { } void RowsetMeta::set_tablet_schema(const TabletSchemaSPtr& tablet_schema) { - _schema = TabletSchemaCache::instance()->insert(tablet_schema->to_key()); + if (_handle) { + TabletSchemaCache::instance()->release(_handle); + } + auto pair = TabletSchemaCache::instance()->insert(tablet_schema->to_key()); + _handle = pair.first; + _schema = pair.second; +} + +void RowsetMeta::set_tablet_schema(const TabletSchemaPB& tablet_schema) { + if (_handle) { + TabletSchemaCache::instance()->release(_handle); + } + auto pair = TabletSchemaCache::instance()->insert(tablet_schema.SerializeAsString()); + _handle = pair.first; + _schema = pair.second; } bool RowsetMeta::_deserialize_from_pb(const std::string& value) { @@ -116,8 +141,7 @@ bool RowsetMeta::_deserialize_from_pb(const std::string& value) { return false; } if (rowset_meta_pb.has_tablet_schema()) { - _schema = TabletSchemaCache::instance()->insert( - rowset_meta_pb.tablet_schema().SerializeAsString()); + set_tablet_schema(rowset_meta_pb.tablet_schema()); rowset_meta_pb.clear_tablet_schema(); } _rowset_meta_pb = rowset_meta_pb; diff --git a/be/src/olap/rowset/rowset_meta.h b/be/src/olap/rowset/rowset_meta.h index 2f59e646591..7e1dfaa57c3 100644 --- a/be/src/olap/rowset/rowset_meta.h +++ b/be/src/olap/rowset/rowset_meta.h @@ -28,15 +28,19 @@ #include "olap/olap_common.h" #include "olap/rowset/rowset_fwd.h" #include "olap/tablet_fwd.h" +#include "runtime/memory/lru_cache_policy.h" namespace doris { class RowsetMeta { public: + RowsetMeta() = default; ~RowsetMeta(); bool init(const std::string& pb_rowset_meta); + bool init(const RowsetMeta* rowset_meta); + bool init_from_pb(const RowsetMetaPB& rowset_meta_pb); bool init_from_json(const std::string& json_rowset_meta); @@ -296,9 +300,14 @@ public: int64_t newest_write_timestamp() const { return _rowset_meta_pb.newest_write_timestamp(); } void set_tablet_schema(const TabletSchemaSPtr& tablet_schema); + void set_tablet_schema(const TabletSchemaPB& tablet_schema); const TabletSchemaSPtr& tablet_schema() { return _schema; } + // Because the member field '_handle' is a raw pointer, use member func 'init' to replace copy ctor + RowsetMeta(const RowsetMeta&) = delete; + RowsetMeta operator=(const RowsetMeta&) = delete; + private: bool _deserialize_from_pb(const std::string& value); @@ -313,6 +322,7 @@ private: private: RowsetMetaPB _rowset_meta_pb; TabletSchemaSPtr _schema; + Cache::Handle* _handle = nullptr; RowsetId _rowset_id; io::FileSystemSPtr _fs; bool _is_removed_from_rowset_meta = false; diff --git a/be/src/olap/snapshot_manager.cpp b/be/src/olap/snapshot_manager.cpp index c97d5cc4aae..7cfcdcfba6f 100644 --- a/be/src/olap/snapshot_manager.cpp +++ b/be/src/olap/snapshot_manager.cpp @@ -160,9 +160,8 @@ Result<std::vector<PendingRowsetGuard>> SnapshotManager::convert_rowset_ids( new_tablet_meta_pb.set_partition_id(partition_id); } new_tablet_meta_pb.set_schema_hash(schema_hash); - TabletSchemaSPtr tablet_schema; - tablet_schema = - TabletSchemaCache::instance()->insert(new_tablet_meta_pb.schema().SerializeAsString()); + TabletSchemaSPtr tablet_schema = std::make_shared<TabletSchema>(); + tablet_schema->init_from_pb(new_tablet_meta_pb.schema()); std::unordered_map<Version, RowsetMetaPB*, HashOfVersion> rs_version_map; std::unordered_map<RowsetId, RowsetId, HashOfRowsetId> rowset_id_mapping; diff --git a/be/src/olap/tablet.cpp b/be/src/olap/tablet.cpp index c7eb4b24041..720cd5b2350 100644 --- a/be/src/olap/tablet.cpp +++ b/be/src/olap/tablet.cpp @@ -2143,7 +2143,8 @@ Status Tablet::_cooldown_data(RowsetSharedPtr rowset) { << ", tp=" << old_rowset->data_disk_size() / duration.count(); // gen a new rowset - auto new_rowset_meta = std::make_shared<RowsetMeta>(*old_rowset->rowset_meta()); + auto new_rowset_meta = std::make_shared<RowsetMeta>(); + new_rowset_meta->init(old_rowset->rowset_meta().get()); new_rowset_meta->set_rowset_id(new_rowset_id); new_rowset_meta->set_fs(dest_fs); new_rowset_meta->set_creation_time(time(nullptr)); diff --git a/be/src/olap/tablet_schema_cache.cpp b/be/src/olap/tablet_schema_cache.cpp index ba354b6f252..a1d3bc662ac 100644 --- a/be/src/olap/tablet_schema_cache.cpp +++ b/be/src/olap/tablet_schema_cache.cpp @@ -17,71 +17,48 @@ #include "olap/tablet_schema_cache.h" -#include "bvar/bvar.h" +#include <gen_cpp/olap_file.pb.h> -namespace doris { +#include "bvar/bvar.h" +#include "olap/tablet_schema.h" bvar::Adder<int64_t> g_tablet_schema_cache_count("tablet_schema_cache_count"); bvar::Adder<int64_t> g_tablet_schema_cache_columns_count("tablet_schema_cache_columns_count"); -TabletSchemaSPtr TabletSchemaCache::insert(const std::string& key) { - std::lock_guard guard(_mtx); - auto iter = _cache.find(key); - if (iter == _cache.end()) { - TabletSchemaSPtr tablet_schema_ptr = std::make_shared<TabletSchema>(); +namespace doris { + +std::pair<Cache::Handle*, TabletSchemaSPtr> TabletSchemaCache::insert(const std::string& key) { + auto* lru_handle = cache()->lookup(key); + TabletSchemaSPtr tablet_schema_ptr; + if (lru_handle) { + auto* value = (CacheValue*)cache()->value(lru_handle); + value->last_visit_time = UnixMillis(); + tablet_schema_ptr = value->tablet_schema; + } else { + auto* value = new CacheValue; + value->last_visit_time = UnixMillis(); + tablet_schema_ptr = std::make_shared<TabletSchema>(); TabletSchemaPB pb; pb.ParseFromString(key); tablet_schema_ptr->init_from_pb(pb); - _cache[key] = tablet_schema_ptr; + value->tablet_schema = tablet_schema_ptr; + auto deleter = [](const doris::CacheKey& key, void* value) { + auto* cache_value = (CacheValue*)value; + g_tablet_schema_cache_count << -1; + g_tablet_schema_cache_columns_count << -cache_value->tablet_schema->num_columns(); + delete cache_value; + }; + lru_handle = cache()->insert(key, value, tablet_schema_ptr->num_columns(), deleter, + CachePriority::NORMAL, 0); g_tablet_schema_cache_count << 1; g_tablet_schema_cache_columns_count << tablet_schema_ptr->num_columns(); - return tablet_schema_ptr; - } - return iter->second; -} - -void TabletSchemaCache::start() { - std::thread t(&TabletSchemaCache::_recycle, this); - t.detach(); - LOG(INFO) << "TabletSchemaCache started"; -} - -void TabletSchemaCache::stop() { - _should_stop = true; - while (!_is_stopped) { - std::this_thread::sleep_for(std::chrono::seconds(1)); } - LOG(INFO) << "TabletSchemaCache stopped"; + DCHECK(lru_handle != nullptr); + return std::make_pair(lru_handle, tablet_schema_ptr); } -/** - * @brief recycle when TabletSchemaSPtr use_count equals 1. - */ -void TabletSchemaCache::_recycle() { - int64_t check_interval = 5; - int64_t left_second = config::tablet_schema_cache_recycle_interval; - while (!_should_stop) { - if (left_second > 0) { - std::this_thread::sleep_for(std::chrono::seconds(check_interval)); - left_second -= check_interval; - continue; - } else { - left_second = config::tablet_schema_cache_recycle_interval; - } - - std::lock_guard guard(_mtx); - LOG(INFO) << "Tablet Schema Cache Capacity " << _cache.size(); - for (auto iter = _cache.begin(), last = _cache.end(); iter != last;) { - if (iter->second.unique()) { - g_tablet_schema_cache_count << -1; - g_tablet_schema_cache_columns_count << -iter->second->num_columns(); - iter = _cache.erase(iter); - } else { - ++iter; - } - } - } - _is_stopped = true; +void TabletSchemaCache::release(Cache::Handle* lru_handle) { + cache()->release(lru_handle); } } // namespace doris diff --git a/be/src/olap/tablet_schema_cache.h b/be/src/olap/tablet_schema_cache.h index 93798983c8e..5824f3c1870 100644 --- a/be/src/olap/tablet_schema_cache.h +++ b/be/src/olap/tablet_schema_cache.h @@ -17,24 +17,20 @@ #pragma once -#include <gen_cpp/olap_file.pb.h> - -#include <memory> -#include <mutex> -#include <unordered_map> - -#include "olap/tablet_schema.h" +#include "olap/tablet_fwd.h" #include "runtime/exec_env.h" -#include "util/doris_metrics.h" +#include "runtime/memory/lru_cache_policy.h" namespace doris { -class TabletSchemaCache { +class TabletSchemaCache : public LRUCachePolicy { public: - ~TabletSchemaCache() = default; + TabletSchemaCache(size_t capacity) + : LRUCachePolicy(CachePolicy::CacheType::TABLET_SCHEMA_CACHE, capacity, + LRUCacheType::NUMBER, config::tablet_schema_cache_recycle_interval) {} - static TabletSchemaCache* create_global_schema_cache() { - TabletSchemaCache* res = new TabletSchemaCache(); + static TabletSchemaCache* create_global_schema_cache(size_t capacity) { + auto* res = new TabletSchemaCache(capacity); return res; } @@ -42,23 +38,14 @@ public: return ExecEnv::GetInstance()->get_tablet_schema_cache(); } - TabletSchemaSPtr insert(const std::string& key); - - void start(); + std::pair<Cache::Handle*, TabletSchemaSPtr> insert(const std::string& key); - void stop(); - -private: - /** - * @brief recycle when TabletSchemaSPtr use_count equals 1. - */ - void _recycle(); + void release(Cache::Handle*); private: - std::mutex _mtx; - std::unordered_map<std::string, TabletSchemaSPtr> _cache; - std::atomic_bool _should_stop = {false}; - std::atomic_bool _is_stopped = {false}; + struct CacheValue : public LRUCacheValueBase { + TabletSchemaSPtr tablet_schema; + }; }; } // namespace doris diff --git a/be/src/runtime/exec_env_init.cpp b/be/src/runtime/exec_env_init.cpp index bcfa3616658..adfd525a950 100644 --- a/be/src/runtime/exec_env_init.cpp +++ b/be/src/runtime/exec_env_init.cpp @@ -250,8 +250,8 @@ Status ExecEnv::_init(const std::vector<StorePath>& store_paths, _heartbeat_flags = new HeartbeatFlags(); _register_metrics(); - _tablet_schema_cache = TabletSchemaCache::create_global_schema_cache(); - _tablet_schema_cache->start(); + _tablet_schema_cache = + TabletSchemaCache::create_global_schema_cache(config::tablet_schema_cache_capacity); // S3 buffer pool _s3_buffer_pool = new io::S3FileBufferPool(); @@ -526,7 +526,6 @@ void ExecEnv::destroy() { SAFE_STOP(_wal_manager); _wal_manager.reset(); - SAFE_STOP(_tablet_schema_cache); SAFE_STOP(_load_channel_mgr); SAFE_STOP(_scanner_scheduler); SAFE_STOP(_broker_mgr); diff --git a/be/src/runtime/memory/cache_policy.h b/be/src/runtime/memory/cache_policy.h index 0a57f838bfa..e965802ed2b 100644 --- a/be/src/runtime/memory/cache_policy.h +++ b/be/src/runtime/memory/cache_policy.h @@ -40,7 +40,8 @@ public: TABLET_VERSION_CACHE = 10, LAST_SUCCESS_CHANNEL_CACHE = 11, COMMON_OBJ_LRU_CACHE = 12, - FOR_UT = 13 + FOR_UT = 13, + TABLET_SCHEMA_CACHE = 14, }; static std::string type_string(CacheType type) { @@ -73,6 +74,8 @@ public: return "CommonObjLRUCache"; case CacheType::FOR_UT: return "ForUT"; + case CacheType::TABLET_SCHEMA_CACHE: + return "TabletSchemaCache"; default: LOG(FATAL) << "not match type of cache policy :" << static_cast<int>(type); } diff --git a/be/test/olap/rowset/rowset_meta_test.cpp b/be/test/olap/rowset/rowset_meta_test.cpp index e8f891de68e..881ce383810 100644 --- a/be/test/olap/rowset/rowset_meta_test.cpp +++ b/be/test/olap/rowset/rowset_meta_test.cpp @@ -72,7 +72,7 @@ private: std::string _json_rowset_meta; }; -void do_check(RowsetMeta rowset_meta) { +void do_check(const RowsetMeta& rowset_meta) { RowsetId rowset_id; rowset_id.init(540081); EXPECT_EQ(rowset_id, rowset_meta.rowset_id()); diff --git a/be/test/testutil/run_all_tests.cpp b/be/test/testutil/run_all_tests.cpp index fe514d1a387..945315f5e57 100644 --- a/be/test/testutil/run_all_tests.cpp +++ b/be/test/testutil/run_all_tests.cpp @@ -43,15 +43,15 @@ int main(int argc, char** argv) { doris::ExecEnv::GetInstance()->init_mem_tracker(); doris::thread_context()->thread_mem_tracker_mgr->init(); doris::ExecEnv::GetInstance()->set_cache_manager(doris::CacheManager::create_global_instance()); - doris::ExecEnv::GetInstance()->set_tablet_schema_cache( - doris::TabletSchemaCache::create_global_schema_cache()); - doris::ExecEnv::GetInstance()->get_tablet_schema_cache()->start(); doris::ExecEnv::GetInstance()->set_dummy_lru_cache(std::make_shared<doris::DummyLRUCache>()); doris::ExecEnv::GetInstance()->set_storage_page_cache( doris::StoragePageCache::create_global_cache(1 << 30, 10, 0)); doris::ExecEnv::GetInstance()->set_segment_loader(new doris::SegmentLoader(1000)); std::string conf = std::string(getenv("DORIS_HOME")) + "/conf/be.conf"; auto st = doris::config::init(conf.c_str(), false); + doris::ExecEnv::GetInstance()->set_tablet_schema_cache( + doris::TabletSchemaCache::create_global_schema_cache( + doris::config::tablet_schema_cache_capacity)); LOG(INFO) << "init config " << st; doris::init_glog("be-test"); @@ -66,6 +66,5 @@ int main(int argc, char** argv) { doris::global_test_http_host = "http://127.0.0.1:" + std::to_string(service->get_real_port()); int res = RUN_ALL_TESTS(); - doris::ExecEnv::GetInstance()->get_tablet_schema_cache()->stop(); return res; } --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org