This is an automated email from the ASF dual-hosted git repository.

gavinchou pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new bac58079acb [fix](restore) Add a local snapshot lock to protect snapshot dir (#47279)
bac58079acb is described below

commit bac58079acb571aef1c6dcaf3b83cb928990ec90
Author: walter <maoch...@selectdb.com>
AuthorDate: Tue Jan 21 21:18:25 2025 +0800

    [fix](restore) Add a local snapshot lock to protect snapshot dir (#47279)
    
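    Serialize access to each local snapshot directory so that concurrent snapshot upload, download, move, and release operations cannot modify the same directory at the same time.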
---
 be/src/olap/snapshot_manager.cpp   |  33 +++++++++++-
 be/src/olap/snapshot_manager.h     |  51 +++++++++++++++++++
 be/src/runtime/snapshot_loader.cpp | 100 ++++++++++++++++++-------------------
 3 files changed, 132 insertions(+), 52 deletions(-)

diff --git a/be/src/olap/snapshot_manager.cpp b/be/src/olap/snapshot_manager.cpp
index 8202feb68c6..7f0e94274d9 100644
--- a/be/src/olap/snapshot_manager.cpp
+++ b/be/src/olap/snapshot_manager.cpp
@@ -67,6 +67,35 @@ using std::vector;
 namespace doris {
 using namespace ErrorCode;
 
+LocalSnapshotLockGuard LocalSnapshotLock::acquire(const std::string& path) {
+    std::unique_lock<std::mutex> l(_lock);
+    auto& ctx = _local_snapshot_contexts[path];
+    while (ctx._is_locked) {
+        ctx._waiting_count++;
+        ctx._cv.wait(l);
+        ctx._waiting_count--;
+    }
+
+    ctx._is_locked = true;
+    return {path};
+}
+
+void LocalSnapshotLock::release(const std::string& path) {
+    std::lock_guard<std::mutex> l(_lock);
+    auto iter = _local_snapshot_contexts.find(path);
+    if (iter == _local_snapshot_contexts.end()) {
+        return;
+    }
+
+    auto& ctx = iter->second;
+    ctx._is_locked = false;
+    if (ctx._waiting_count > 0) {
+        ctx._cv.notify_one();
+    } else {
+        _local_snapshot_contexts.erase(iter);
+    }
+}
+
 SnapshotManager::SnapshotManager(StorageEngine& engine) : _engine(engine) {
     _mem_tracker =
             MemTrackerLimiter::create_shared(MemTrackerLimiter::Type::OTHER, 
"SnapshotManager");
@@ -118,6 +147,8 @@ Status SnapshotManager::make_snapshot(const 
TSnapshotRequest& request, string* s
 }
 
 Status SnapshotManager::release_snapshot(const string& snapshot_path) {
+    auto local_snapshot_guard = 
LocalSnapshotLock::instance().acquire(snapshot_path);
+
     // If the requested snapshot_path is located in the root/snapshot folder, 
it is considered legal and can be deleted.
     // Otherwise, it is considered an illegal request and returns an error 
result.
     SCOPED_ATTACH_TASK(_mem_tracker);
@@ -448,7 +479,7 @@ Status SnapshotManager::_create_snapshot_files(const 
TabletSharedPtr& ref_tablet
                 }
             }
             // be would definitely set it as true no matter has missed version 
or not
-            // but it would take no effets on the following range loop
+            // but it would take no effects on the following range loop
             if (!is_single_rowset_clone && request.__isset.missing_version) {
                 for (int64_t missed_version : request.missing_version) {
                     Version version = {missed_version, missed_version};
diff --git a/be/src/olap/snapshot_manager.h b/be/src/olap/snapshot_manager.h
index dd10f7f3550..668bb860e1b 100644
--- a/be/src/olap/snapshot_manager.h
+++ b/be/src/olap/snapshot_manager.h
@@ -17,7 +17,9 @@
 
 #pragma once
 
+#include <condition_variable>
 #include <memory>
+#include <mutex>
 #include <string>
 #include <vector>
 
@@ -33,6 +35,55 @@ struct RowsetId;
 class StorageEngine;
 class MemTrackerLimiter;
 
+class LocalSnapshotLockGuard;
+
+// A simple lock to protect the local snapshot path.
+class LocalSnapshotLock {
+    friend class LocalSnapshotLockGuard;
+
+public:
+    LocalSnapshotLock() = default;
+    ~LocalSnapshotLock() = default;
+    LocalSnapshotLock(const LocalSnapshotLock&) = delete;
+    LocalSnapshotLock& operator=(const LocalSnapshotLock&) = delete;
+
+    static LocalSnapshotLock& instance() {
+        static LocalSnapshotLock instance;
+        return instance;
+    }
+
+    // Acquire the lock for the specified path. It will block if the lock is 
already held by another.
+    LocalSnapshotLockGuard acquire(const std::string& path);
+
+private:
+    void release(const std::string& path);
+
+    class LocalSnapshotContext {
+    public:
+        bool _is_locked = false;
+        size_t _waiting_count = 0;
+        std::condition_variable _cv;
+
+        LocalSnapshotContext() = default;
+        LocalSnapshotContext(const LocalSnapshotContext&) = delete;
+        LocalSnapshotContext& operator=(const LocalSnapshotContext&) = delete;
+    };
+
+    std::mutex _lock;
+    std::unordered_map<std::string, LocalSnapshotContext> 
_local_snapshot_contexts;
+};
+
+class LocalSnapshotLockGuard {
+public:
+    LocalSnapshotLockGuard(std::string path) : _snapshot_path(std::move(path)) 
{}
+    LocalSnapshotLockGuard(const LocalSnapshotLockGuard&) = delete;
+    LocalSnapshotLockGuard& operator=(const LocalSnapshotLockGuard&) = delete;
+    ~LocalSnapshotLockGuard() { 
LocalSnapshotLock::instance().release(_snapshot_path); }
+
+private:
+    std::string _snapshot_path;
+};
+
 class SnapshotManager {
 public:
     SnapshotManager(StorageEngine& engine);
diff --git a/be/src/runtime/snapshot_loader.cpp 
b/be/src/runtime/snapshot_loader.cpp
index c5b27c82305..422aecad37a 100644
--- a/be/src/runtime/snapshot_loader.cpp
+++ b/be/src/runtime/snapshot_loader.cpp
@@ -28,6 +28,7 @@
 #include <gen_cpp/Types_types.h>
 
 #include <algorithm>
+#include <condition_variable>
 #include <cstring>
 #include <filesystem>
 #include <istream>
@@ -146,6 +147,9 @@ Status SnapshotLoader::upload(const std::map<std::string, 
std::string>& src_to_d
         const std::string& src_path = iter.first;
         const std::string& dest_path = iter.second;
 
+        // Take a lock to protect the local snapshot path.
+        auto local_snapshot_guard = 
LocalSnapshotLock::instance().acquire(src_path);
+
         int64_t tablet_id = 0;
         int32_t schema_hash = 0;
         RETURN_IF_ERROR(
@@ -242,6 +246,9 @@ Status SnapshotLoader::download(const std::map<std::string, 
std::string>& src_to
         const std::string& remote_path = iter.first;
         const std::string& local_path = iter.second;
 
+        // Take a lock to protect the local snapshot path.
+        auto local_snapshot_guard = 
LocalSnapshotLock::instance().acquire(local_path);
+
         int64_t local_tablet_id = 0;
         int32_t schema_hash = 0;
         
RETURN_IF_ERROR(_get_tablet_id_and_schema_hash_from_file_path(local_path, 
&local_tablet_id,
@@ -397,8 +404,6 @@ Status SnapshotLoader::download(const std::map<std::string, 
std::string>& src_to
 Status SnapshotLoader::remote_http_download(
         const std::vector<TRemoteTabletSnapshot>& remote_tablet_snapshots,
         std::vector<int64_t>* downloaded_tablet_ids) {
-    LOG(INFO) << fmt::format("begin to download snapshots via http. job: {}, 
task id: {}", _job_id,
-                             _task_id);
     constexpr uint32_t kListRemoteFileTimeout = 15;
     constexpr uint32_t kDownloadFileMaxRetry = 3;
     constexpr uint32_t kGetLengthTimeout = 10;
@@ -408,35 +413,39 @@ Status SnapshotLoader::remote_http_download(
     RETURN_IF_ERROR(_report_every(0, &tmp_counter, 0, 0, 
TTaskType::type::DOWNLOAD));
     Status status = Status::OK();
 
-    // Step before, validate all remote
-
-    // Step 1: Validate local tablet snapshot paths
+    int report_counter = 0;
+    int finished_num = 0;
+    int total_num = remote_tablet_snapshots.size();
     for (const auto& remote_tablet_snapshot : remote_tablet_snapshots) {
-        const auto& path = remote_tablet_snapshot.local_snapshot_path;
+        const auto& local_path = remote_tablet_snapshot.local_snapshot_path;
+        const auto& remote_path = remote_tablet_snapshot.remote_snapshot_path;
+        LOG(INFO) << fmt::format(
+                "download snapshots via http. job: {}, task id: {}, local dir: 
{}, remote dir: {}",
+                _job_id, _task_id, local_path, remote_path);
+
+        // Take a lock to protect the local snapshot path.
+        auto local_snapshot_guard = 
LocalSnapshotLock::instance().acquire(local_path);
+
+        // Step 1: Validate local tablet snapshot paths
         bool res = true;
-        RETURN_IF_ERROR(io::global_local_filesystem()->is_directory(path, 
&res));
+        
RETURN_IF_ERROR(io::global_local_filesystem()->is_directory(local_path, &res));
         if (!res) {
             std::stringstream ss;
             auto err_msg =
-                    fmt::format("snapshot path is not directory or does not 
exist: {}", path);
+                    fmt::format("snapshot path is not directory or does not 
exist: {}", local_path);
             LOG(WARNING) << err_msg;
             return Status::RuntimeError(err_msg);
         }
-    }
 
-    // Step 2: get all local files
-    struct LocalFileStat {
-        uint64_t size;
-        std::string md5;
-    };
-    std::unordered_map<std::string, std::unordered_map<std::string, 
LocalFileStat>> local_files_map;
-    for (const auto& remote_tablet_snapshot : remote_tablet_snapshots) {
-        const auto& local_path = remote_tablet_snapshot.local_snapshot_path;
-        std::vector<std::string> local_files;
-        RETURN_IF_ERROR(_get_existing_files_from_local(local_path, 
&local_files));
-
-        auto& local_filestat = local_files_map[local_path];
-        for (auto& local_file : local_files) {
+        // Step 2: get all local files
+        struct LocalFileStat {
+            uint64_t size;
+            std::string md5;
+        };
+        std::unordered_map<std::string, LocalFileStat> local_files;
+        std::vector<std::string> existing_files;
+        RETURN_IF_ERROR(_get_existing_files_from_local(local_path, 
&existing_files));
+        for (auto& local_file : existing_files) {
             // add file size
             std::string local_file_path = local_path + "/" + local_file;
             std::error_code ec;
@@ -453,27 +462,20 @@ Status SnapshotLoader::remote_http_download(
                              << " md5sum: " << status.to_string();
                 return status;
             }
-            local_filestat[local_file] = {local_file_size, md5};
+            local_files[local_file] = {local_file_size, md5};
         }
-    }
-
-    // Step 3: Validate remote tablet snapshot paths && remote files map
-    // key is remote snapshot paths, value is filelist
-    // get all these use http download action
-    // 
http://172.16.0.14:6781/api/_tablet/_download?token=e804dd27-86da-4072-af58-70724075d2a4&file=/home/ubuntu/doris_master/output/be/storage/snapshot/20230410102306.9.180//2774718/217609978/2774718.hdr
-    int report_counter = 0;
-    int total_num = remote_tablet_snapshots.size();
-    int finished_num = 0;
-    struct RemoteFileStat {
-        std::string url;
-        std::string md5;
-        uint64_t size;
-    };
-    std::unordered_map<std::string, std::unordered_map<std::string, 
RemoteFileStat>>
-            remote_files_map;
-    for (const auto& remote_tablet_snapshot : remote_tablet_snapshots) {
-        const auto& remote_path = remote_tablet_snapshot.remote_snapshot_path;
-        auto& remote_files = remote_files_map[remote_path];
+        existing_files.clear();
+
+        // Step 3: Validate remote tablet snapshot paths && remote files map
+        // key is remote snapshot paths, value is filelist
+        // get all these use http download action
+        // 
http://172.16.0.14:6781/api/_tablet/_download?token=e804dd27-86da-4072-af58-70724075d2a4&file=/home/ubuntu/doris_master/output/be/storage/snapshot/20230410102306.9.180//2774718/217609978/2774718.hdr
+        struct RemoteFileStat {
+            std::string url;
+            std::string md5;
+            uint64_t size;
+        };
+        std::unordered_map<std::string, RemoteFileStat> remote_files;
         const auto& token = remote_tablet_snapshot.remote_token;
         const auto& remote_be_addr = remote_tablet_snapshot.remote_be_addr;
 
@@ -516,19 +518,11 @@ Status SnapshotLoader::remote_http_download(
 
             remote_files[filename] = RemoteFileStat {remote_file_url, 
file_md5, file_size};
         }
-    }
 
-    // Step 4: Compare local and remote files && get all need download files
-    for (const auto& remote_tablet_snapshot : remote_tablet_snapshots) {
+        // Step 4: Compare local and remote files && get all need download 
files
         RETURN_IF_ERROR(_report_every(10, &report_counter, finished_num, 
total_num,
                                       TTaskType::type::DOWNLOAD));
 
-        const auto& remote_path = remote_tablet_snapshot.remote_snapshot_path;
-        const auto& local_path = remote_tablet_snapshot.local_snapshot_path;
-        auto& remote_files = remote_files_map[remote_path];
-        auto& local_files = local_files_map[local_path];
-        auto remote_tablet_id = remote_tablet_snapshot.remote_tablet_id;
-
         // get all need download files
         std::vector<std::string> need_download_files;
         for (const auto& [remote_file, remote_filestat] : remote_files) {
@@ -656,6 +650,7 @@ Status SnapshotLoader::remote_http_download(
         if (total_time_ms > 0) {
             copy_rate = total_file_size / ((double)total_time_ms) / 1000;
         }
+        auto remote_tablet_id = remote_tablet_snapshot.remote_tablet_id;
         LOG(INFO) << fmt::format(
                 "succeed to copy remote tablet {} to local tablet {}, total 
file size: {} B, cost: "
                 "{} ms, rate: {} MB/s",
@@ -705,6 +700,9 @@ Status SnapshotLoader::remote_http_download(
 // MUST hold tablet's header lock, push lock, cumulative lock and base 
compaction lock
 Status SnapshotLoader::move(const std::string& snapshot_path, TabletSharedPtr 
tablet,
                             bool overwrite) {
+    // Take a lock to protect the local snapshot path.
+    auto local_snapshot_guard = 
LocalSnapshotLock::instance().acquire(snapshot_path);
+
     auto tablet_path = tablet->tablet_path();
     auto store_path = tablet->data_dir()->path();
     LOG(INFO) << "begin to move snapshot files. from: " << snapshot_path << ", 
to: " << tablet_path


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to