This is an automated email from the ASF dual-hosted git repository.

yiguolei pushed a commit to branch branch-2.1
in repository https://gitbox.apache.org/repos/asf/doris.git

commit d525f576e196507a1064480eebae42404926d3b9
Author: Lightman <31928846+lchangli...@users.noreply.github.com>
AuthorDate: Thu Jan 11 20:41:31 2024 +0800

    [improve] Use lru cache to count the number of column in tablet schema to control memory (#29668)
---
 be/src/common/config.cpp                  |  1 +
 be/src/common/config.h                    |  2 +
 be/src/olap/base_tablet.cpp               |  1 -
 be/src/olap/rowset/beta_rowset_writer.cpp |  2 +-
 be/src/olap/rowset/rowset_meta.cpp        | 38 ++++++++++++---
 be/src/olap/rowset/rowset_meta.h          | 10 ++++
 be/src/olap/snapshot_manager.cpp          |  5 +-
 be/src/olap/tablet.cpp                    |  3 +-
 be/src/olap/tablet_schema_cache.cpp       | 81 +++++++++++--------------------
 be/src/olap/tablet_schema_cache.h         | 39 +++++----------
 be/src/runtime/exec_env_init.cpp          |  5 +-
 be/src/runtime/memory/cache_policy.h      |  5 +-
 be/test/olap/rowset/rowset_meta_test.cpp  |  2 +-
 be/test/testutil/run_all_tests.cpp        |  7 ++-
 14 files changed, 101 insertions(+), 100 deletions(-)

diff --git a/be/src/common/config.cpp b/be/src/common/config.cpp
index e32dee42a42..89ef077f886 100644
--- a/be/src/common/config.cpp
+++ b/be/src/common/config.cpp
@@ -1129,6 +1129,7 @@ DEFINE_Bool(group_commit_wait_replay_wal_finish, "false");
 
 DEFINE_mInt32(scan_thread_nice_value, "0");
 DEFINE_mInt32(tablet_schema_cache_recycle_interval, "3600");
+DEFINE_mInt32(tablet_schema_cache_capacity, "102400");
 
 DEFINE_Bool(exit_on_exception, "false");
 // This config controls whether the s3 file writer would flush cache asynchronously
diff --git a/be/src/common/config.h b/be/src/common/config.h
index 0f67c64bb13..750c75ab5f3 100644
--- a/be/src/common/config.h
+++ b/be/src/common/config.h
@@ -1201,6 +1201,8 @@ DECLARE_Bool(group_commit_wait_replay_wal_finish);
 DECLARE_Int32(scan_thread_nice_value);
 // Used to modify the recycle interval of tablet schema cache
 DECLARE_mInt32(tablet_schema_cache_recycle_interval);
+// Granularity is at the column level
+DECLARE_mInt32(tablet_schema_cache_capacity);
 
 // Use `LOG(FATAL)` to replace `throw` when true
 DECLARE_mBool(exit_on_exception);
diff --git a/be/src/olap/base_tablet.cpp b/be/src/olap/base_tablet.cpp
index 011a901ba5b..43f514906cf 100644
--- a/be/src/olap/base_tablet.cpp
+++ b/be/src/olap/base_tablet.cpp
@@ -34,7 +34,6 @@ DEFINE_COUNTER_METRIC_PROTOTYPE_2ARG(flush_bytes, 
MetricUnit::BYTES);
 DEFINE_COUNTER_METRIC_PROTOTYPE_2ARG(flush_finish_count, 
MetricUnit::OPERATIONS);
 
 BaseTablet::BaseTablet(TabletMetaSharedPtr tablet_meta) : 
_tablet_meta(std::move(tablet_meta)) {
-    
TabletSchemaCache::instance()->insert(_tablet_meta->tablet_schema()->to_key());
     _metric_entity = 
DorisMetrics::instance()->metric_registry()->register_entity(
             fmt::format("Tablet.{}", tablet_id()), {{"tablet_id", 
std::to_string(tablet_id())}},
             MetricEntityType::kTablet);
diff --git a/be/src/olap/rowset/beta_rowset_writer.cpp 
b/be/src/olap/rowset/beta_rowset_writer.cpp
index ce30a03c800..04980816a5f 100644
--- a/be/src/olap/rowset/beta_rowset_writer.cpp
+++ b/be/src/olap/rowset/beta_rowset_writer.cpp
@@ -654,7 +654,7 @@ void 
BaseBetaRowsetWriter::_build_rowset_meta(std::shared_ptr<RowsetMeta> rowset
 
 RowsetSharedPtr BaseBetaRowsetWriter::_build_tmp() {
     std::shared_ptr<RowsetMeta> rowset_meta_ = std::make_shared<RowsetMeta>();
-    *rowset_meta_ = *_rowset_meta;
+    rowset_meta_->init(_rowset_meta.get());
     _build_rowset_meta(rowset_meta_);
 
     RowsetSharedPtr rowset;
diff --git a/be/src/olap/rowset/rowset_meta.cpp 
b/be/src/olap/rowset/rowset_meta.cpp
index b78ecd424b3..7f4798f97e9 100644
--- a/be/src/olap/rowset/rowset_meta.cpp
+++ b/be/src/olap/rowset/rowset_meta.cpp
@@ -22,14 +22,20 @@
 #include "io/fs/local_file_system.h"
 #include "json2pb/json_to_pb.h"
 #include "json2pb/pb_to_json.h"
+#include "olap/lru_cache.h"
 #include "olap/olap_common.h"
 #include "olap/storage_policy.h"
+#include "olap/tablet_fwd.h"
 #include "olap/tablet_schema.h"
 #include "olap/tablet_schema_cache.h"
 
 namespace doris {
 
-RowsetMeta::~RowsetMeta() = default;
+RowsetMeta::~RowsetMeta() {
+    if (_handle) {
+        TabletSchemaCache::instance()->release(_handle);
+    }
+}
 
 bool RowsetMeta::init(const std::string& pb_rowset_meta) {
     bool ret = _deserialize_from_pb(pb_rowset_meta);
@@ -40,16 +46,21 @@ bool RowsetMeta::init(const std::string& pb_rowset_meta) {
     return true;
 }
 
+bool RowsetMeta::init(const RowsetMeta* rowset_meta) {
+    RowsetMetaPB rowset_meta_pb;
+    rowset_meta->to_rowset_pb(&rowset_meta_pb);
+    return init_from_pb(rowset_meta_pb);
+}
+
 bool RowsetMeta::init_from_pb(const RowsetMetaPB& rowset_meta_pb) {
     if (rowset_meta_pb.has_tablet_schema()) {
-        _schema = TabletSchemaCache::instance()->insert(
-                rowset_meta_pb.tablet_schema().SerializeAsString());
+        set_tablet_schema(rowset_meta_pb.tablet_schema());
     }
     // Release ownership of TabletSchemaPB from `rowset_meta_pb` and then set it back to `rowset_meta_pb`,
     // this won't break const semantics of `rowset_meta_pb`, because `rowset_meta_pb` is not changed
     // before and after call this method.
     auto& mut_rowset_meta_pb = const_cast<RowsetMetaPB&>(rowset_meta_pb);
-    auto schema = mut_rowset_meta_pb.release_tablet_schema();
+    auto* schema = mut_rowset_meta_pb.release_tablet_schema();
     _rowset_meta_pb = mut_rowset_meta_pb;
     mut_rowset_meta_pb.set_allocated_tablet_schema(schema);
     _init();
@@ -107,7 +118,21 @@ RowsetMetaPB RowsetMeta::get_rowset_pb() {
 }
 
 void RowsetMeta::set_tablet_schema(const TabletSchemaSPtr& tablet_schema) {
-    _schema = TabletSchemaCache::instance()->insert(tablet_schema->to_key());
+    if (_handle) {
+        TabletSchemaCache::instance()->release(_handle);
+    }
+    auto pair = TabletSchemaCache::instance()->insert(tablet_schema->to_key());
+    _handle = pair.first;
+    _schema = pair.second;
+}
+
+void RowsetMeta::set_tablet_schema(const TabletSchemaPB& tablet_schema) {
+    if (_handle) {
+        TabletSchemaCache::instance()->release(_handle);
+    }
+    auto pair = 
TabletSchemaCache::instance()->insert(tablet_schema.SerializeAsString());
+    _handle = pair.first;
+    _schema = pair.second;
 }
 
 bool RowsetMeta::_deserialize_from_pb(const std::string& value) {
@@ -116,8 +141,7 @@ bool RowsetMeta::_deserialize_from_pb(const std::string& 
value) {
         return false;
     }
     if (rowset_meta_pb.has_tablet_schema()) {
-        _schema = TabletSchemaCache::instance()->insert(
-                rowset_meta_pb.tablet_schema().SerializeAsString());
+        set_tablet_schema(rowset_meta_pb.tablet_schema());
         rowset_meta_pb.clear_tablet_schema();
     }
     _rowset_meta_pb = rowset_meta_pb;
diff --git a/be/src/olap/rowset/rowset_meta.h b/be/src/olap/rowset/rowset_meta.h
index 2f59e646591..7e1dfaa57c3 100644
--- a/be/src/olap/rowset/rowset_meta.h
+++ b/be/src/olap/rowset/rowset_meta.h
@@ -28,15 +28,19 @@
 #include "olap/olap_common.h"
 #include "olap/rowset/rowset_fwd.h"
 #include "olap/tablet_fwd.h"
+#include "runtime/memory/lru_cache_policy.h"
 
 namespace doris {
 
 class RowsetMeta {
 public:
+    RowsetMeta() = default;
     ~RowsetMeta();
 
     bool init(const std::string& pb_rowset_meta);
 
+    bool init(const RowsetMeta* rowset_meta);
+
     bool init_from_pb(const RowsetMetaPB& rowset_meta_pb);
 
     bool init_from_json(const std::string& json_rowset_meta);
@@ -296,9 +300,14 @@ public:
     int64_t newest_write_timestamp() const { return 
_rowset_meta_pb.newest_write_timestamp(); }
 
     void set_tablet_schema(const TabletSchemaSPtr& tablet_schema);
+    void set_tablet_schema(const TabletSchemaPB& tablet_schema);
 
     const TabletSchemaSPtr& tablet_schema() { return _schema; }
 
+    // Because the member field '_handle' is a raw pointer, use member func 'init' to replace copy ctor
+    RowsetMeta(const RowsetMeta&) = delete;
+    RowsetMeta operator=(const RowsetMeta&) = delete;
+
 private:
     bool _deserialize_from_pb(const std::string& value);
 
@@ -313,6 +322,7 @@ private:
 private:
     RowsetMetaPB _rowset_meta_pb;
     TabletSchemaSPtr _schema;
+    Cache::Handle* _handle = nullptr;
     RowsetId _rowset_id;
     io::FileSystemSPtr _fs;
     bool _is_removed_from_rowset_meta = false;
diff --git a/be/src/olap/snapshot_manager.cpp b/be/src/olap/snapshot_manager.cpp
index c97d5cc4aae..7cfcdcfba6f 100644
--- a/be/src/olap/snapshot_manager.cpp
+++ b/be/src/olap/snapshot_manager.cpp
@@ -160,9 +160,8 @@ Result<std::vector<PendingRowsetGuard>> 
SnapshotManager::convert_rowset_ids(
         new_tablet_meta_pb.set_partition_id(partition_id);
     }
     new_tablet_meta_pb.set_schema_hash(schema_hash);
-    TabletSchemaSPtr tablet_schema;
-    tablet_schema =
-            
TabletSchemaCache::instance()->insert(new_tablet_meta_pb.schema().SerializeAsString());
+    TabletSchemaSPtr tablet_schema = std::make_shared<TabletSchema>();
+    tablet_schema->init_from_pb(new_tablet_meta_pb.schema());
 
     std::unordered_map<Version, RowsetMetaPB*, HashOfVersion> rs_version_map;
     std::unordered_map<RowsetId, RowsetId, HashOfRowsetId> rowset_id_mapping;
diff --git a/be/src/olap/tablet.cpp b/be/src/olap/tablet.cpp
index c7eb4b24041..720cd5b2350 100644
--- a/be/src/olap/tablet.cpp
+++ b/be/src/olap/tablet.cpp
@@ -2143,7 +2143,8 @@ Status Tablet::_cooldown_data(RowsetSharedPtr rowset) {
               << ", tp=" << old_rowset->data_disk_size() / duration.count();
 
     // gen a new rowset
-    auto new_rowset_meta = 
std::make_shared<RowsetMeta>(*old_rowset->rowset_meta());
+    auto new_rowset_meta = std::make_shared<RowsetMeta>();
+    new_rowset_meta->init(old_rowset->rowset_meta().get());
     new_rowset_meta->set_rowset_id(new_rowset_id);
     new_rowset_meta->set_fs(dest_fs);
     new_rowset_meta->set_creation_time(time(nullptr));
diff --git a/be/src/olap/tablet_schema_cache.cpp 
b/be/src/olap/tablet_schema_cache.cpp
index ba354b6f252..a1d3bc662ac 100644
--- a/be/src/olap/tablet_schema_cache.cpp
+++ b/be/src/olap/tablet_schema_cache.cpp
@@ -17,71 +17,48 @@
 
 #include "olap/tablet_schema_cache.h"
 
-#include "bvar/bvar.h"
+#include <gen_cpp/olap_file.pb.h>
 
-namespace doris {
+#include "bvar/bvar.h"
+#include "olap/tablet_schema.h"
 
 bvar::Adder<int64_t> g_tablet_schema_cache_count("tablet_schema_cache_count");
 bvar::Adder<int64_t> 
g_tablet_schema_cache_columns_count("tablet_schema_cache_columns_count");
 
-TabletSchemaSPtr TabletSchemaCache::insert(const std::string& key) {
-    std::lock_guard guard(_mtx);
-    auto iter = _cache.find(key);
-    if (iter == _cache.end()) {
-        TabletSchemaSPtr tablet_schema_ptr = std::make_shared<TabletSchema>();
+namespace doris {
+
+std::pair<Cache::Handle*, TabletSchemaSPtr> TabletSchemaCache::insert(const 
std::string& key) {
+    auto* lru_handle = cache()->lookup(key);
+    TabletSchemaSPtr tablet_schema_ptr;
+    if (lru_handle) {
+        auto* value = (CacheValue*)cache()->value(lru_handle);
+        value->last_visit_time = UnixMillis();
+        tablet_schema_ptr = value->tablet_schema;
+    } else {
+        auto* value = new CacheValue;
+        value->last_visit_time = UnixMillis();
+        tablet_schema_ptr = std::make_shared<TabletSchema>();
         TabletSchemaPB pb;
         pb.ParseFromString(key);
         tablet_schema_ptr->init_from_pb(pb);
-        _cache[key] = tablet_schema_ptr;
+        value->tablet_schema = tablet_schema_ptr;
+        auto deleter = [](const doris::CacheKey& key, void* value) {
+            auto* cache_value = (CacheValue*)value;
+            g_tablet_schema_cache_count << -1;
+            g_tablet_schema_cache_columns_count << 
-cache_value->tablet_schema->num_columns();
+            delete cache_value;
+        };
+        lru_handle = cache()->insert(key, value, 
tablet_schema_ptr->num_columns(), deleter,
+                                     CachePriority::NORMAL, 0);
         g_tablet_schema_cache_count << 1;
         g_tablet_schema_cache_columns_count << 
tablet_schema_ptr->num_columns();
-        return tablet_schema_ptr;
-    }
-    return iter->second;
-}
-
-void TabletSchemaCache::start() {
-    std::thread t(&TabletSchemaCache::_recycle, this);
-    t.detach();
-    LOG(INFO) << "TabletSchemaCache started";
-}
-
-void TabletSchemaCache::stop() {
-    _should_stop = true;
-    while (!_is_stopped) {
-        std::this_thread::sleep_for(std::chrono::seconds(1));
     }
-    LOG(INFO) << "TabletSchemaCache stopped";
+    DCHECK(lru_handle != nullptr);
+    return std::make_pair(lru_handle, tablet_schema_ptr);
 }
 
-/**
- * @brief recycle when TabletSchemaSPtr use_count equals 1.
- */
-void TabletSchemaCache::_recycle() {
-    int64_t check_interval = 5;
-    int64_t left_second = config::tablet_schema_cache_recycle_interval;
-    while (!_should_stop) {
-        if (left_second > 0) {
-            std::this_thread::sleep_for(std::chrono::seconds(check_interval));
-            left_second -= check_interval;
-            continue;
-        } else {
-            left_second = config::tablet_schema_cache_recycle_interval;
-        }
-
-        std::lock_guard guard(_mtx);
-        LOG(INFO) << "Tablet Schema Cache Capacity " << _cache.size();
-        for (auto iter = _cache.begin(), last = _cache.end(); iter != last;) {
-            if (iter->second.unique()) {
-                g_tablet_schema_cache_count << -1;
-                g_tablet_schema_cache_columns_count << 
-iter->second->num_columns();
-                iter = _cache.erase(iter);
-            } else {
-                ++iter;
-            }
-        }
-    }
-    _is_stopped = true;
+void TabletSchemaCache::release(Cache::Handle* lru_handle) {
+    cache()->release(lru_handle);
 }
 
 } // namespace doris
diff --git a/be/src/olap/tablet_schema_cache.h 
b/be/src/olap/tablet_schema_cache.h
index 93798983c8e..5824f3c1870 100644
--- a/be/src/olap/tablet_schema_cache.h
+++ b/be/src/olap/tablet_schema_cache.h
@@ -17,24 +17,20 @@
 
 #pragma once
 
-#include <gen_cpp/olap_file.pb.h>
-
-#include <memory>
-#include <mutex>
-#include <unordered_map>
-
-#include "olap/tablet_schema.h"
+#include "olap/tablet_fwd.h"
 #include "runtime/exec_env.h"
-#include "util/doris_metrics.h"
+#include "runtime/memory/lru_cache_policy.h"
 
 namespace doris {
 
-class TabletSchemaCache {
+class TabletSchemaCache : public LRUCachePolicy {
 public:
-    ~TabletSchemaCache() = default;
+    TabletSchemaCache(size_t capacity)
+            : LRUCachePolicy(CachePolicy::CacheType::TABLET_SCHEMA_CACHE, 
capacity,
+                             LRUCacheType::NUMBER, 
config::tablet_schema_cache_recycle_interval) {}
 
-    static TabletSchemaCache* create_global_schema_cache() {
-        TabletSchemaCache* res = new TabletSchemaCache();
+    static TabletSchemaCache* create_global_schema_cache(size_t capacity) {
+        auto* res = new TabletSchemaCache(capacity);
         return res;
     }
 
@@ -42,23 +38,14 @@ public:
         return ExecEnv::GetInstance()->get_tablet_schema_cache();
     }
 
-    TabletSchemaSPtr insert(const std::string& key);
-
-    void start();
+    std::pair<Cache::Handle*, TabletSchemaSPtr> insert(const std::string& key);
 
-    void stop();
-
-private:
-    /**
-     * @brief recycle when TabletSchemaSPtr use_count equals 1.
-     */
-    void _recycle();
+    void release(Cache::Handle*);
 
 private:
-    std::mutex _mtx;
-    std::unordered_map<std::string, TabletSchemaSPtr> _cache;
-    std::atomic_bool _should_stop = {false};
-    std::atomic_bool _is_stopped = {false};
+    struct CacheValue : public LRUCacheValueBase {
+        TabletSchemaSPtr tablet_schema;
+    };
 };
 
 } // namespace doris
diff --git a/be/src/runtime/exec_env_init.cpp b/be/src/runtime/exec_env_init.cpp
index bcfa3616658..adfd525a950 100644
--- a/be/src/runtime/exec_env_init.cpp
+++ b/be/src/runtime/exec_env_init.cpp
@@ -250,8 +250,8 @@ Status ExecEnv::_init(const std::vector<StorePath>& 
store_paths,
     _heartbeat_flags = new HeartbeatFlags();
     _register_metrics();
 
-    _tablet_schema_cache = TabletSchemaCache::create_global_schema_cache();
-    _tablet_schema_cache->start();
+    _tablet_schema_cache =
+            
TabletSchemaCache::create_global_schema_cache(config::tablet_schema_cache_capacity);
 
     // S3 buffer pool
     _s3_buffer_pool = new io::S3FileBufferPool();
@@ -526,7 +526,6 @@ void ExecEnv::destroy() {
 
     SAFE_STOP(_wal_manager);
     _wal_manager.reset();
-    SAFE_STOP(_tablet_schema_cache);
     SAFE_STOP(_load_channel_mgr);
     SAFE_STOP(_scanner_scheduler);
     SAFE_STOP(_broker_mgr);
diff --git a/be/src/runtime/memory/cache_policy.h 
b/be/src/runtime/memory/cache_policy.h
index 0a57f838bfa..e965802ed2b 100644
--- a/be/src/runtime/memory/cache_policy.h
+++ b/be/src/runtime/memory/cache_policy.h
@@ -40,7 +40,8 @@ public:
         TABLET_VERSION_CACHE = 10,
         LAST_SUCCESS_CHANNEL_CACHE = 11,
         COMMON_OBJ_LRU_CACHE = 12,
-        FOR_UT = 13
+        FOR_UT = 13,
+        TABLET_SCHEMA_CACHE = 14,
     };
 
     static std::string type_string(CacheType type) {
@@ -73,6 +74,8 @@ public:
             return "CommonObjLRUCache";
         case CacheType::FOR_UT:
             return "ForUT";
+        case CacheType::TABLET_SCHEMA_CACHE:
+            return "TabletSchemaCache";
         default:
             LOG(FATAL) << "not match type of cache policy :" << 
static_cast<int>(type);
         }
diff --git a/be/test/olap/rowset/rowset_meta_test.cpp 
b/be/test/olap/rowset/rowset_meta_test.cpp
index e8f891de68e..881ce383810 100644
--- a/be/test/olap/rowset/rowset_meta_test.cpp
+++ b/be/test/olap/rowset/rowset_meta_test.cpp
@@ -72,7 +72,7 @@ private:
     std::string _json_rowset_meta;
 };
 
-void do_check(RowsetMeta rowset_meta) {
+void do_check(const RowsetMeta& rowset_meta) {
     RowsetId rowset_id;
     rowset_id.init(540081);
     EXPECT_EQ(rowset_id, rowset_meta.rowset_id());
diff --git a/be/test/testutil/run_all_tests.cpp 
b/be/test/testutil/run_all_tests.cpp
index fe514d1a387..945315f5e57 100644
--- a/be/test/testutil/run_all_tests.cpp
+++ b/be/test/testutil/run_all_tests.cpp
@@ -43,15 +43,15 @@ int main(int argc, char** argv) {
     doris::ExecEnv::GetInstance()->init_mem_tracker();
     doris::thread_context()->thread_mem_tracker_mgr->init();
     
doris::ExecEnv::GetInstance()->set_cache_manager(doris::CacheManager::create_global_instance());
-    doris::ExecEnv::GetInstance()->set_tablet_schema_cache(
-            doris::TabletSchemaCache::create_global_schema_cache());
-    doris::ExecEnv::GetInstance()->get_tablet_schema_cache()->start();
     
doris::ExecEnv::GetInstance()->set_dummy_lru_cache(std::make_shared<doris::DummyLRUCache>());
     doris::ExecEnv::GetInstance()->set_storage_page_cache(
             doris::StoragePageCache::create_global_cache(1 << 30, 10, 0));
     doris::ExecEnv::GetInstance()->set_segment_loader(new 
doris::SegmentLoader(1000));
     std::string conf = std::string(getenv("DORIS_HOME")) + "/conf/be.conf";
     auto st = doris::config::init(conf.c_str(), false);
+    doris::ExecEnv::GetInstance()->set_tablet_schema_cache(
+            doris::TabletSchemaCache::create_global_schema_cache(
+                    doris::config::tablet_schema_cache_capacity));
     LOG(INFO) << "init config " << st;
 
     doris::init_glog("be-test");
@@ -66,6 +66,5 @@ int main(int argc, char** argv) {
     doris::global_test_http_host = "http://127.0.0.1:" + std::to_string(service->get_real_port());
 
     int res = RUN_ALL_TESTS();
-    doris::ExecEnv::GetInstance()->get_tablet_schema_cache()->stop();
     return res;
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org

Reply via email to