This is an automated email from the ASF dual-hosted git repository.

kxiao pushed a commit to branch branch-2.0
in repository https://gitbox.apache.org/repos/asf/doris.git

commit 3bc4fedbbfe721e51732d018bd95626f741ebef5
Author: zhannngchen <48427519+zhannngc...@users.noreply.github.com>
AuthorDate: Fri Oct 13 14:22:07 2023 +0800

    [enhancement](merge-on-write) refine tablet meta_lock usage and add some trace logs (#25124)
---
 be/src/olap/tablet.cpp      | 276 ++++++++++++++++++++++++--------------------
 be/src/olap/tablet_meta.cpp |  11 ++
 2 files changed, 160 insertions(+), 127 deletions(-)

diff --git a/be/src/olap/tablet.cpp b/be/src/olap/tablet.cpp
index a66e222fcbf..f245e121fd9 100644
--- a/be/src/olap/tablet.cpp
+++ b/be/src/olap/tablet.cpp
@@ -153,7 +153,7 @@ static bvar::Adder<uint64_t> g_tablet_pk_not_found("doris_pk", "lookup_not_found
 static bvar::PerSecond<bvar::Adder<uint64_t>> g_tablet_pk_not_found_per_second(
         "doris_pk", "lookup_not_found_per_second", &g_tablet_pk_not_found, 60);
 
-const std::chrono::seconds TRACE_TABLET_LOCK_THRESHOLD = 10s;
+const std::chrono::seconds TRACE_TABLET_LOCK_THRESHOLD = 1s;
 
 DEFINE_COUNTER_METRIC_PROTOTYPE_2ARG(flush_bytes, MetricUnit::BYTES);
 DEFINE_COUNTER_METRIC_PROTOTYPE_2ARG(flush_finish_count, 
MetricUnit::OPERATIONS);
@@ -678,153 +678,161 @@ void Tablet::_delete_stale_rowset_by_version(const Version& version) {
 
 void Tablet::delete_expired_stale_rowset() {
     int64_t now = UnixSeconds();
-    std::lock_guard<std::shared_mutex> wrlock(_meta_lock);
-    SCOPED_SIMPLE_TRACE_IF_TIMEOUT(TRACE_TABLET_LOCK_THRESHOLD);
-    // Compute the end time to delete rowsets, when a expired rowset 
createtime less then this time, it will be deleted.
-    double expired_stale_sweep_endtime =
-            ::difftime(now, config::tablet_rowset_stale_sweep_time_sec);
-    if (config::tablet_rowset_stale_sweep_by_size) {
-        expired_stale_sweep_endtime = now;
-    }
-
-    std::vector<int64_t> path_id_vec;
-    // capture the path version to delete
-    _timestamped_version_tracker.capture_expired_paths(
-            static_cast<int64_t>(expired_stale_sweep_endtime), &path_id_vec);
-
-    if (path_id_vec.empty()) {
-        return;
-    }
-
-    const RowsetSharedPtr lastest_delta = rowset_with_max_version();
-    if (lastest_delta == nullptr) {
-        LOG(WARNING) << "lastest_delta is null " << tablet_id();
-        return;
-    }
-
-    // fetch missing version before delete
-    std::vector<Version> missed_versions;
-    calc_missed_versions_unlocked(lastest_delta->end_version(), 
&missed_versions);
+    // hold write lock while processing stable rowset
+    {
+        std::lock_guard<std::shared_mutex> wrlock(_meta_lock);
+        SCOPED_SIMPLE_TRACE_IF_TIMEOUT(TRACE_TABLET_LOCK_THRESHOLD);
+        // Compute the end time to delete rowsets, when a expired rowset 
createtime less then this time, it will be deleted.
+        double expired_stale_sweep_endtime =
+                ::difftime(now, config::tablet_rowset_stale_sweep_time_sec);
+        if (config::tablet_rowset_stale_sweep_by_size) {
+            expired_stale_sweep_endtime = now;
+        }
 
-    if (!missed_versions.empty()) {
-        LOG(WARNING) << "tablet:" << full_name()
-                     << ", missed version for version:" << 
lastest_delta->end_version();
-        _print_missed_versions(missed_versions);
-        return;
-    }
+        std::vector<int64_t> path_id_vec;
+        // capture the path version to delete
+        _timestamped_version_tracker.capture_expired_paths(
+                static_cast<int64_t>(expired_stale_sweep_endtime), 
&path_id_vec);
 
-    // do check consistent operation
-    auto path_id_iter = path_id_vec.begin();
+        if (path_id_vec.empty()) {
+            return;
+        }
 
-    std::map<int64_t, PathVersionListSharedPtr> stale_version_path_map;
-    while (path_id_iter != path_id_vec.end()) {
-        PathVersionListSharedPtr version_path =
-                
_timestamped_version_tracker.fetch_and_delete_path_by_id(*path_id_iter);
+        const RowsetSharedPtr lastest_delta = rowset_with_max_version();
+        if (lastest_delta == nullptr) {
+            LOG(WARNING) << "lastest_delta is null " << tablet_id();
+            return;
+        }
 
-        Version test_version = Version(0, lastest_delta->end_version());
-        stale_version_path_map[*path_id_iter] = version_path;
+        // fetch missing version before delete
+        std::vector<Version> missed_versions;
+        calc_missed_versions_unlocked(lastest_delta->end_version(), 
&missed_versions);
 
-        Status status = capture_consistent_versions(test_version, nullptr);
-        // 1. When there is no consistent versions, we must reconstruct the 
tracker.
-        if (!status.ok()) {
-            // 2. fetch missing version after delete
-            std::vector<Version> after_missed_versions;
-            calc_missed_versions_unlocked(lastest_delta->end_version(), 
&after_missed_versions);
+        if (!missed_versions.empty()) {
+            LOG(WARNING) << "tablet:" << full_name()
+                         << ", missed version for version:" << 
lastest_delta->end_version();
+            _print_missed_versions(missed_versions);
+            return;
+        }
 
-            // 2.1 check whether missed_versions and after_missed_versions are 
the same.
-            // when they are the same, it means we can delete the path 
securely.
-            bool is_missing = missed_versions.size() != 
after_missed_versions.size();
+        // do check consistent operation
+        auto path_id_iter = path_id_vec.begin();
 
-            if (!is_missing) {
-                for (int ver_index = 0; ver_index < missed_versions.size(); 
ver_index++) {
-                    if (missed_versions[ver_index] != 
after_missed_versions[ver_index]) {
-                        is_missing = true;
-                        break;
-                    }
-                }
-            }
+        std::map<int64_t, PathVersionListSharedPtr> stale_version_path_map;
+        while (path_id_iter != path_id_vec.end()) {
+            PathVersionListSharedPtr version_path =
+                    
_timestamped_version_tracker.fetch_and_delete_path_by_id(*path_id_iter);
 
-            if (is_missing) {
-                LOG(WARNING) << "The consistent version check fails, there are 
bugs. "
-                             << "Reconstruct the tracker to recover versions 
in tablet="
-                             << tablet_id();
+            Version test_version = Version(0, lastest_delta->end_version());
+            stale_version_path_map[*path_id_iter] = version_path;
 
-                // 3. try to recover
-                
_timestamped_version_tracker.recover_versioned_tracker(stale_version_path_map);
+            Status status = capture_consistent_versions(test_version, nullptr);
+            // 1. When there is no consistent versions, we must reconstruct 
the tracker.
+            if (!status.ok()) {
+                // 2. fetch missing version after delete
+                std::vector<Version> after_missed_versions;
+                calc_missed_versions_unlocked(lastest_delta->end_version(), 
&after_missed_versions);
 
-                // 4. double check the consistent versions
-                // fetch missing version after recover
-                std::vector<Version> recover_missed_versions;
-                calc_missed_versions_unlocked(lastest_delta->end_version(),
-                                              &recover_missed_versions);
+                // 2.1 check whether missed_versions and after_missed_versions 
are the same.
+                // when they are the same, it means we can delete the path 
securely.
+                bool is_missing = missed_versions.size() != 
after_missed_versions.size();
 
-                // 4.1 check whether missed_versions and 
recover_missed_versions are the same.
-                // when they are the same, it means we recover successfully.
-                bool is_recover_missing = missed_versions.size() != 
recover_missed_versions.size();
-
-                if (!is_recover_missing) {
+                if (!is_missing) {
                     for (int ver_index = 0; ver_index < 
missed_versions.size(); ver_index++) {
-                        if (missed_versions[ver_index] != 
recover_missed_versions[ver_index]) {
-                            is_recover_missing = true;
+                        if (missed_versions[ver_index] != 
after_missed_versions[ver_index]) {
+                            is_missing = true;
                             break;
                         }
                     }
                 }
 
-                // 5. check recover fail, version is mission
-                if (is_recover_missing) {
-                    if (!config::ignore_rowset_stale_unconsistent_delete) {
-                        LOG(FATAL) << "rowset stale unconsistent delete. 
tablet= " << tablet_id();
-                    } else {
-                        LOG(WARNING) << "rowset stale unconsistent delete. 
tablet= " << tablet_id();
+                if (is_missing) {
+                    LOG(WARNING) << "The consistent version check fails, there 
are bugs. "
+                                 << "Reconstruct the tracker to recover 
versions in tablet="
+                                 << tablet_id();
+
+                    // 3. try to recover
+                    
_timestamped_version_tracker.recover_versioned_tracker(stale_version_path_map);
+
+                    // 4. double check the consistent versions
+                    // fetch missing version after recover
+                    std::vector<Version> recover_missed_versions;
+                    calc_missed_versions_unlocked(lastest_delta->end_version(),
+                                                  &recover_missed_versions);
+
+                    // 4.1 check whether missed_versions and 
recover_missed_versions are the same.
+                    // when they are the same, it means we recover 
successfully.
+                    bool is_recover_missing =
+                            missed_versions.size() != 
recover_missed_versions.size();
+
+                    if (!is_recover_missing) {
+                        for (int ver_index = 0; ver_index < 
missed_versions.size(); ver_index++) {
+                            if (missed_versions[ver_index] != 
recover_missed_versions[ver_index]) {
+                                is_recover_missing = true;
+                                break;
+                            }
+                        }
+                    }
+
+                    // 5. check recover fail, version is mission
+                    if (is_recover_missing) {
+                        if (!config::ignore_rowset_stale_unconsistent_delete) {
+                            LOG(FATAL)
+                                    << "rowset stale unconsistent delete. 
tablet= " << tablet_id();
+                        } else {
+                            LOG(WARNING)
+                                    << "rowset stale unconsistent delete. 
tablet= " << tablet_id();
+                        }
                     }
                 }
+                return;
             }
-            return;
-        }
-        path_id_iter++;
-    }
-
-    auto old_size = _stale_rs_version_map.size();
-    auto old_meta_size = _tablet_meta->all_stale_rs_metas().size();
-
-    // do delete operation
-    auto to_delete_iter = stale_version_path_map.begin();
-    while (to_delete_iter != stale_version_path_map.end()) {
-        std::vector<TimestampedVersionSharedPtr>& to_delete_version =
-                to_delete_iter->second->timestamped_versions();
-        for (auto& timestampedVersion : to_delete_version) {
-            auto it = 
_stale_rs_version_map.find(timestampedVersion->version());
-            if (it != _stale_rs_version_map.end()) {
-                // delete rowset
-                StorageEngine::instance()->add_unused_rowset(it->second);
-                _stale_rs_version_map.erase(it);
-                VLOG_NOTICE << "delete stale rowset tablet=" << full_name() << 
" version["
-                            << timestampedVersion->version().first << ","
-                            << timestampedVersion->version().second
-                            << "] move to unused_rowset success " << std::fixed
-                            << expired_stale_sweep_endtime;
-            } else {
-                LOG(WARNING) << "delete stale rowset tablet=" << full_name() 
<< " version["
-                             << timestampedVersion->version().first << ","
-                             << timestampedVersion->version().second
-                             << "] not find in stale rs version map";
+            path_id_iter++;
+        }
+
+        auto old_size = _stale_rs_version_map.size();
+        auto old_meta_size = _tablet_meta->all_stale_rs_metas().size();
+
+        // do delete operation
+        auto to_delete_iter = stale_version_path_map.begin();
+        while (to_delete_iter != stale_version_path_map.end()) {
+            std::vector<TimestampedVersionSharedPtr>& to_delete_version =
+                    to_delete_iter->second->timestamped_versions();
+            for (auto& timestampedVersion : to_delete_version) {
+                auto it = 
_stale_rs_version_map.find(timestampedVersion->version());
+                if (it != _stale_rs_version_map.end()) {
+                    // delete rowset
+                    StorageEngine::instance()->add_unused_rowset(it->second);
+                    _stale_rs_version_map.erase(it);
+                    VLOG_NOTICE << "delete stale rowset tablet=" << 
full_name() << " version["
+                                << timestampedVersion->version().first << ","
+                                << timestampedVersion->version().second
+                                << "] move to unused_rowset success " << 
std::fixed
+                                << expired_stale_sweep_endtime;
+                } else {
+                    LOG(WARNING) << "delete stale rowset tablet=" << 
full_name() << " version["
+                                 << timestampedVersion->version().first << ","
+                                 << timestampedVersion->version().second
+                                 << "] not find in stale rs version map";
+                }
+                _delete_stale_rowset_by_version(timestampedVersion->version());
             }
-            _delete_stale_rowset_by_version(timestampedVersion->version());
+            to_delete_iter++;
         }
-        to_delete_iter++;
-    }
 
-    bool reconstructed = _reconstruct_version_tracker_if_necessary();
-
-    VLOG_NOTICE << "delete stale rowset _stale_rs_version_map tablet=" << 
full_name()
-                << " current_size=" << _stale_rs_version_map.size() << " 
old_size=" << old_size
-                << " current_meta_size=" << 
_tablet_meta->all_stale_rs_metas().size()
-                << " old_meta_size=" << old_meta_size << " sweep endtime " << 
std::fixed
-                << expired_stale_sweep_endtime << ", reconstructed=" << 
reconstructed;
+        bool reconstructed = _reconstruct_version_tracker_if_necessary();
 
+        VLOG_NOTICE << "delete stale rowset _stale_rs_version_map tablet=" << 
full_name()
+                    << " current_size=" << _stale_rs_version_map.size() << " 
old_size=" << old_size
+                    << " current_meta_size=" << 
_tablet_meta->all_stale_rs_metas().size()
+                    << " old_meta_size=" << old_meta_size << " sweep endtime " 
<< std::fixed
+                    << expired_stale_sweep_endtime << ", reconstructed=" << 
reconstructed;
+    }
 #ifndef BE_TEST
-    save_meta();
+    {
+        std::shared_lock<std::shared_mutex> rlock(_meta_lock);
+        save_meta();
+    }
 #endif
 }
 
@@ -3350,8 +3358,10 @@ Status Tablet::update_delete_bitmap(const RowsetSharedPtr& rowset,
     RowsetIdUnorderedSet rowset_ids_to_del;
     int64_t cur_version = rowset->start_version();
 
+    OlapStopWatch watch;
     std::vector<segment_v2::SegmentSharedPtr> segments;
-    _load_rowset_segments(rowset, &segments);
+    RETURN_IF_ERROR(_load_rowset_segments(rowset, &segments));
+    auto t1 = watch.get_elapse_time_us();
 
     {
         std::shared_lock meta_rlock(_meta_lock);
@@ -3364,6 +3374,8 @@ Status Tablet::update_delete_bitmap(const RowsetSharedPtr& rowset,
         }
         cur_rowset_ids = all_rs_id(cur_version - 1);
     }
+    auto t2 = watch.get_elapse_time_us();
+
     _rowset_ids_difference(cur_rowset_ids, pre_rowset_ids, &rowset_ids_to_add, 
&rowset_ids_to_del);
     for (const auto& to_del : rowset_ids_to_del) {
         delete_bitmap->remove({to_del, 0, 0}, {to_del, UINT32_MAX, INT64_MAX});
@@ -3374,21 +3386,31 @@ Status Tablet::update_delete_bitmap(const RowsetSharedPtr& rowset,
         std::shared_lock meta_rlock(_meta_lock);
         specified_rowsets = get_rowset_by_ids(&rowset_ids_to_add);
     }
+    auto t3 = watch.get_elapse_time_us();
 
-    OlapStopWatch watch;
     auto token = 
StorageEngine::instance()->calc_delete_bitmap_executor()->create_token();
     RETURN_IF_ERROR(calc_delete_bitmap(rowset, segments, specified_rowsets, 
delete_bitmap,
                                        cur_version - 1, token.get(), 
rowset_writer));
     RETURN_IF_ERROR(token->wait());
     RETURN_IF_ERROR(token->get_delete_bitmap(delete_bitmap));
+
+    std::stringstream ss;
+    if (watch.get_elapse_time_us() < 1 * 1000 * 1000) {
+        ss << "cost: " << watch.get_elapse_time_us() - t3 << "(us)";
+    } else {
+        ss << "cost(us): (load segments: " << t1 << ", get all rsid: " << t2 - 
t1
+           << ", get rowsets: " << t3 - t2
+           << ", calc delete bitmap: " << watch.get_elapse_time_us() - t3 << 
")";
+    }
+
     size_t total_rows = std::accumulate(
             segments.begin(), segments.end(), 0,
             [](size_t sum, const segment_v2::SegmentSharedPtr& s) { return sum 
+= s->num_rows(); });
     LOG(INFO) << "[Publish] construct delete bitmap tablet: " << tablet_id()
               << ", rowset_ids to add: " << rowset_ids_to_add.size()
               << ", rowset_ids to del: " << rowset_ids_to_del.size()
-              << ", cur max_version: " << cur_version << ", transaction_id: " 
<< txn_id
-              << ", cost: " << watch.get_elapse_time_us() << "(us), total 
rows: " << total_rows;
+              << ", cur max_version: " << cur_version << ", transaction_id: " 
<< txn_id << ","
+              << ss.str() << " , total rows: " << total_rows;
 
     if (config::enable_merge_on_write_correctness_check && rowset->num_rows() 
!= 0) {
         // only do correctness check if the rowset has at least one row written
diff --git a/be/src/olap/tablet_meta.cpp b/be/src/olap/tablet_meta.cpp
index 612c50280d7..e1637d3d945 100644
--- a/be/src/olap/tablet_meta.cpp
+++ b/be/src/olap/tablet_meta.cpp
@@ -39,6 +39,7 @@
 #include "olap/tablet_meta_manager.h"
 #include "olap/utils.h"
 #include "util/string_util.h"
+#include "util/time.h"
 #include "util/uid_util.h"
 
 using std::string;
@@ -442,12 +443,22 @@ Status TabletMeta::_save_meta(DataDir* data_dir) {
                    << " tablet=" << full_name() << " _tablet_uid=" << 
_tablet_uid.to_string();
     }
     string meta_binary;
+
+    auto t1 = MonotonicMicros();
     RETURN_IF_ERROR(serialize(&meta_binary));
+    auto t2 = MonotonicMicros();
     Status status = TabletMetaManager::save(data_dir, tablet_id(), 
schema_hash(), meta_binary);
     if (!status.ok()) {
         LOG(FATAL) << "fail to save tablet_meta. status=" << status << ", 
tablet_id=" << tablet_id()
                    << ", schema_hash=" << schema_hash();
     }
+    auto t3 = MonotonicMicros();
+    auto cost =  t3 - t1;
+    if (cost > 1 * 1000 * 1000) {
+        LOG(INFO) << "save tablet(" << full_name() << ") meta too slow. 
serialize cost " << t2 - t1
+                  << "(us), serialized binary size: " << meta_binary.length()
+                  << "(bytes), write rocksdb cost " << t3 - t2 << "(us)";
+    }
     return status;
 }
 


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to