This is an automated email from the ASF dual-hosted git repository.

dataroaring pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/branch-3.0 by this push:
     new 4b9d6f9695b branch-3.0: [fix](cloud) Persist cluster_id file in Compute-Storage Decoupled mode #53147 (#53195)
4b9d6f9695b is described below

commit 4b9d6f9695b1a8beea862c0aa6643d35bfdac3ec
Author: github-actions[bot] <41898282+github-actions[bot]@users.noreply.github.com>
AuthorDate: Mon Jul 14 21:39:57 2025 +0800

    branch-3.0: [fix](cloud) Persist cluster_id file in Compute-Storage Decoupled mode #53147 (#53195)
    
    Cherry-picked from #53147
    
    Co-authored-by: yagagagaga <[email protected]>
---
 be/src/cloud/cloud_storage_engine.cpp | 77 +++++++++++++++++++++++++++++++++--
 be/src/cloud/cloud_storage_engine.h   | 12 +++---
 be/src/runtime/exec_env_init.cpp      |  2 +-
 3 files changed, 82 insertions(+), 9 deletions(-)

diff --git a/be/src/cloud/cloud_storage_engine.cpp b/be/src/cloud/cloud_storage_engine.cpp
index 74a8f5068a4..6cac3c7f8d0 100644
--- a/be/src/cloud/cloud_storage_engine.cpp
+++ b/be/src/cloud/cloud_storage_engine.cpp
@@ -89,10 +89,11 @@ int get_base_thread_num() {
     return std::min(std::max(int(num_cores * 
config::base_compaction_thread_num_factor), 1), 10);
 }
 
-CloudStorageEngine::CloudStorageEngine(const UniqueId& backend_uid)
-        : BaseStorageEngine(Type::CLOUD, backend_uid),
+CloudStorageEngine::CloudStorageEngine(const EngineOptions& options)
+        : BaseStorageEngine(Type::CLOUD, options.backend_uid),
           _meta_mgr(std::make_unique<cloud::CloudMetaMgr>()),
-          _tablet_mgr(std::make_unique<CloudTabletMgr>(*this)) {
+          _tablet_mgr(std::make_unique<CloudTabletMgr>(*this)),
+          _options(options) {
     _cumulative_compaction_policies[CUMULATIVE_SIZE_BASED_POLICY] =
             std::make_shared<CloudSizeBasedCumulativeCompactionPolicy>();
     _cumulative_compaction_policies[CUMULATIVE_TIME_SERIES_POLICY] =
@@ -226,6 +227,9 @@ Status CloudStorageEngine::open() {
             
init_stream_load_recorder(ExecEnv::GetInstance()->store_paths()[0].path),
             "init StreamLoadRecorder failed");
 
+    // check cluster id
+    RETURN_NOT_OK_STATUS_WITH_WARN(_check_all_root_path_cluster_id(), "fail to 
check cluster id");
+
     return ThreadPoolBuilder("SyncLoadForTabletsThreadPool")
             .set_max_threads(config::sync_load_for_tablets_thread)
             .set_min_threads(config::sync_load_for_tablets_thread)
@@ -1141,5 +1145,94 @@ Status CloudStorageEngine::unregister_compaction_stop_token(CloudTabletSPtr tabl
     return Status::OK();
 }
 
+// Validates the cluster_id files under every storage root path against
+// config::cluster_id and initializes _effective_cluster_id accordingly:
+// - no non-empty file anywhere: a configured id (!= -1) is persisted to every
+//   root path via set_cluster_id(); otherwise the id stays -1;
+// - root paths holding different ids: InternalError;
+// - persisted id differing from a configured config::cluster_id: Corruption;
+// - otherwise the persisted id is adopted and written to root paths lacking it.
+Status CloudStorageEngine::_check_all_root_path_cluster_id() {
+    // Collect the distinct ids persisted across all root paths (empty files are skipped).
+    std::set<int32_t> cluster_ids;
+    for (const auto& path : _options.store_paths) {
+        auto cluster_id_path = fmt::format("{}/{}", path.path, CLUSTER_ID_PREFIX);
+        bool exists = false;
+        RETURN_IF_ERROR(io::global_local_filesystem()->exists(cluster_id_path, &exists));
+        if (exists) {
+            io::FileReaderSPtr reader;
+            RETURN_IF_ERROR(io::global_local_filesystem()->open_file(cluster_id_path, &reader));
+            size_t fsize = reader->size();
+            if (fsize > 0) {
+                std::string content;
+                content.resize(fsize, '\0');
+                size_t bytes_read = 0;
+                RETURN_IF_ERROR(reader->read_at(0, {content.data(), fsize}, &bytes_read));
+                DCHECK_EQ(fsize, bytes_read);
+                int32_t tmp_cluster_id = -1;
+                try {
+                    tmp_cluster_id = std::stoi(content);
+                } catch (const std::exception& e) {
+                    // std::stoi throws on non-numeric or out-of-range text; surface that as a
+                    // Status instead of letting the exception escape engine startup.
+                    return Status::Corruption("invalid content in cluster id file {}: {}",
+                                              cluster_id_path, e.what());
+                }
+                cluster_ids.insert(tmp_cluster_id);
+            }
+        }
+    }
+    _effective_cluster_id = config::cluster_id;
+    if (cluster_ids.empty()) {
+        // First start: no cluster id has been persisted yet.
+        if (_effective_cluster_id == -1) {
+            // No configured cluster id either, so there is nothing to persist.
+            return Status::OK();
+        }
+        // Persist the configured cluster id to every root path. Returning here is required:
+        // the checks below dereference cluster_ids.begin(), which is invalid on an empty set.
+        return set_cluster_id(_effective_cluster_id);
+    }
+    if (cluster_ids.size() > 1) {
+        return Status::InternalError(
+                "All root paths must have the same cluster id, but you have "
+                "different cluster ids: {}",
+                fmt::join(cluster_ids, ", "));
+    }
+    const int32_t persisted_cluster_id = *cluster_ids.begin();
+    if (_effective_cluster_id != -1 && persisted_cluster_id != _effective_cluster_id) {
+        RETURN_NOT_OK_STATUS_WITH_WARN(
+                Status::Corruption("multiple cluster ids is not equal. config::cluster_id={}, "
+                                   "storage path cluster_id={}",
+                                   _effective_cluster_id, persisted_cluster_id),
+                "cluster id not equal");
+    }
+    // Adopt the persisted id, so it takes effect even when config::cluster_id is -1, and
+    // write it to any root path that does not have a cluster id file yet.
+    return set_cluster_id(persisted_cluster_id);
+}
+
+// Makes `cluster_id` the effective cluster id and writes it to the cluster id file of
+// every storage root path that does not have one yet. Existing files are not rewritten,
+// even when they hold a different id.
+Status CloudStorageEngine::set_cluster_id(int32_t cluster_id) {
+    // Serializes concurrent callers while they check for and create the files.
+    std::lock_guard<std::mutex> l(_store_lock);
+    for (const auto& path : _options.store_paths) {
+        auto cluster_id_path = fmt::format("{}/{}", path.path, CLUSTER_ID_PREFIX);
+        bool exists = false;
+        RETURN_IF_ERROR(io::global_local_filesystem()->exists(cluster_id_path, &exists));
+        if (!exists) {
+            io::FileWriterPtr file_writer;
+            RETURN_IF_ERROR(
+                    io::global_local_filesystem()->create_file(cluster_id_path, &file_writer));
+            RETURN_IF_ERROR(file_writer->append(std::to_string(cluster_id)));
+            RETURN_IF_ERROR(file_writer->close());
+        }
+    }
+    _effective_cluster_id = cluster_id;
+    return Status::OK();
+}
+
 #include "common/compile_check_end.h"
 } // namespace doris
diff --git a/be/src/cloud/cloud_storage_engine.h b/be/src/cloud/cloud_storage_engine.h
index f21e443a77e..b7aa2da94e8 100644
--- a/be/src/cloud/cloud_storage_engine.h
+++ b/be/src/cloud/cloud_storage_engine.h
@@ -18,6 +18,7 @@
 #pragma once
 
 #include <memory>
+#include <mutex>
 
 //#include "cloud/cloud_cumulative_compaction.h"
 //#include "cloud/cloud_base_compaction.h"
@@ -51,7 +52,7 @@ class CloudCompactionStopToken;
 
 class CloudStorageEngine final : public BaseStorageEngine {
 public:
-    CloudStorageEngine(const UniqueId& backend_uid);
+    CloudStorageEngine(const EngineOptions& options);
 
     ~CloudStorageEngine() override;
 
@@ -64,10 +65,7 @@ public:
 
     Status start_bg_threads() override;
 
-    Status set_cluster_id(int32_t cluster_id) override {
-        _effective_cluster_id = cluster_id;
-        return Status::OK();
-    }
+    Status set_cluster_id(int32_t cluster_id) override;
 
     cloud::CloudMetaMgr& meta_mgr() const { return *_meta_mgr; }
 
@@ -169,6 +167,7 @@ private:
     Status _request_tablet_global_compaction_lock(ReaderType compaction_type,
                                                   const CloudTabletSPtr& 
tablet,
                                                   
std::shared_ptr<CloudCompactionMixin> compaction);
+    Status _check_all_root_path_cluster_id();
     void _lease_compaction_thread_callback();
     void _check_tablet_delete_bitmap_score_callback();
 
@@ -221,6 +220,9 @@ private:
     CumuPolices _cumulative_compaction_policies;
 
     std::atomic_bool first_sync_storage_vault {true};
+
+    EngineOptions _options;
+    std::mutex _store_lock;
 };
 
 } // namespace doris
diff --git a/be/src/runtime/exec_env_init.cpp b/be/src/runtime/exec_env_init.cpp
index 8f30211602a..1d1ad2ee184 100644
--- a/be/src/runtime/exec_env_init.cpp
+++ b/be/src/runtime/exec_env_init.cpp
@@ -327,7 +327,7 @@ Status ExecEnv::_init(const std::vector<StorePath>& 
store_paths,
     if (config::is_cloud_mode()) {
         std::cout << "start BE in cloud mode, cloud_unique_id: " << 
config::cloud_unique_id
                   << ", meta_service_endpoint: " << 
config::meta_service_endpoint << std::endl;
-        _storage_engine = 
std::make_unique<CloudStorageEngine>(options.backend_uid);
+        _storage_engine = std::make_unique<CloudStorageEngine>(options);
     } else {
         std::cout << "start BE in local mode" << std::endl;
         _storage_engine = std::make_unique<StorageEngine>(options);


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to