This is an automated email from the ASF dual-hosted git repository.

gavinchou pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new fc890e590d0 [Enhancement](recycler) Add valid s3 vault config for recycler (#47723)
fc890e590d0 is described below

commit fc890e590d0609a7d9f64a3edc5a363c85b36721
Author: abmdocrt <lianyuk...@selectdb.com>
AuthorDate: Wed Feb 26 17:53:43 2025 +0800

    [Enhancement](recycler) Add valid s3 vault config for recycler (#47723)
    
    In the current multi-vault mode, deleting vaults is not allowed. This
    causes the recycler to still attempt accessing invalid vaults when deleting
    data, slowing down the deletion process. The HDFS accessor automatically
    checks the connectivity of vaults during initialization and only
    initializes connected HDFS vaults. However, S3 does not check if vaults are
    connected and initializes all vaults, including invalid ones. This PR adds
    a configuration that allows users to specify valid S3 [...]
---
 cloud/src/common/config.h                    |   3 +
 cloud/src/meta-service/meta_service_helper.h |  64 ++++++++-----
 cloud/src/recycler/recycler.cpp              |  28 +++++-
 cloud/src/recycler/s3_accessor.cpp           |   4 +-
 cloud/test/recycler_test.cpp                 | 134 +++++++++++++++++++++++++++
 5 files changed, 204 insertions(+), 29 deletions(-)

diff --git a/cloud/src/common/config.h b/cloud/src/common/config.h
index c8a0eba3968..4db2ee37836 100644
--- a/cloud/src/common/config.h
+++ b/cloud/src/common/config.h
@@ -257,4 +257,7 @@ CONF_Bool(enable_check_instance_id, "true");
 
 // Check if ip eq 127.0.0.1, ms/recycler exit
 CONF_Bool(enable_loopback_address_for_ms, "false");
+// Which vaults should be recycled. If empty, recycle all vaults.
+// Comma-separated list: recycler_storage_vault_white_list="aaa,bbb,ccc"
+CONF_Strings(recycler_storage_vault_white_list, "");
 } // namespace doris::cloud::config
diff --git a/cloud/src/meta-service/meta_service_helper.h b/cloud/src/meta-service/meta_service_helper.h
index 7d626dd1186..cd9ed2f7f1d 100644
--- a/cloud/src/meta-service/meta_service_helper.h
+++ b/cloud/src/meta-service/meta_service_helper.h
@@ -52,6 +52,46 @@ inline std::string md5(const std::string& str) {
     return ss.str();
 }
 
+/**
+ * Encrypts all "sk" values in the given debug string with MD5 hashes.
+ * 
+ * Assumptions:
+ * - Input string contains one or more occurrences of "sk: " followed by a value in double quotes.
+ * - An md5() function exists that takes a std::string and returns its MD5 hash as a string.
+ * 
+ * @param debug_string Input string containing "sk: " fields to be encrypted.
+ * @return A new string with all "sk" values replaced by their MD5 hashes.
+ * 
+ * Behavior:
+ * 1. Searches for all occurrences of "sk: " in the input string.
+ * 2. For each occurrence, extracts the value between double quotes.
+ * 3. Replaces the original value with "md5: " followed by its MD5 hash.
+ * 4. Returns the modified string with all "sk" values encrypted.
+ */
+inline std::string encryt_sk(std::string debug_string) {
+    // Start position for searching "sk" fields
+    size_t pos = 0;
+    // Iterate through the string and find all occurrences of "sk: "
+    while ((pos = debug_string.find("sk: ", pos)) != std::string::npos) {
+        // Find the start and end of the "sk" value (assumed to be within quotes)
+        // Start after the quote
+        size_t sk_value_start = debug_string.find('\"', pos) + 1;
+        // End at the next quote
+        size_t sk_value_end = debug_string.find('\"', sk_value_start);
+
+        // Extract the "sk" value
+        std::string sk_value = debug_string.substr(sk_value_start, 
sk_value_end - sk_value_start);
+        // Encrypt the "sk" value with MD5
+        std::string encrypted_sk = "md5: " + md5(sk_value);
+
+        // Replace the original "sk" value with the encrypted MD5 value
+        debug_string.replace(sk_value_start, sk_value_end - sk_value_start, 
encrypted_sk);
+        // Move the position to the end of the current "sk" field and continue searching
+        pos = sk_value_end;
+    }
+    return debug_string;
+}
+
 template <class Request>
 void begin_rpc(std::string_view func_name, brpc::Controller* ctrl, const 
Request* req) {
     if constexpr (std::is_same_v<Request, CreateRowsetRequest>) {
@@ -133,29 +173,7 @@ void finish_rpc(std::string_view func_name, brpc::Controller* ctrl, Response* re
                   << " status=" << res->status().ShortDebugString();
     } else if constexpr (std::is_same_v<Response, GetObjStoreInfoResponse> ||
                          std::is_same_v<Response, GetStageResponse>) {
-        std::string debug_string = res->DebugString();
-        // Start position for searching "sk" fields
-        size_t pos = 0;
-        // Iterate through the string and find all occurrences of "sk: "
-        while ((pos = debug_string.find("sk: ", pos)) != std::string::npos) {
-            // Find the start and end of the "sk" value (assumed to be within quotes)
-            // Start after the quote
-            size_t sk_value_start = debug_string.find('\"', pos) + 1;
-            // End at the next quote
-            size_t sk_value_end = debug_string.find('\"', sk_value_start);
-
-            // Extract the "sk" value
-            std::string sk_value =
-                    debug_string.substr(sk_value_start, sk_value_end - 
sk_value_start);
-            // Encrypt the "sk" value with MD5
-            std::string encrypted_sk = "md5: " + md5(sk_value);
-
-            // Replace the original "sk" value with the encrypted MD5 value
-            debug_string.replace(sk_value_start, sk_value_end - 
sk_value_start, encrypted_sk);
-
-            // Move the position to the end of the current "sk" field and continue searching
-            pos = sk_value_end;
-        }
+        std::string debug_string = encryt_sk(res->DebugString());
         TEST_SYNC_POINT_CALLBACK("sk_finish_rpc", &debug_string);
         LOG(INFO) << "finish " << func_name << " from " << ctrl->remote_side()
                   << " response=" << debug_string;
diff --git a/cloud/src/recycler/recycler.cpp b/cloud/src/recycler/recycler.cpp
index 52597fad04b..40c328d8945 100644
--- a/cloud/src/recycler/recycler.cpp
+++ b/cloud/src/recycler/recycler.cpp
@@ -27,11 +27,13 @@
 #include <cstddef>
 #include <cstdint>
 #include <deque>
+#include <numeric>
 #include <string>
 #include <string_view>
 
 #include "common/stopwatch.h"
 #include "meta-service/meta_service.h"
+#include "meta-service/meta_service_helper.h"
 #include "meta-service/meta_service_schema.h"
 #include "meta-service/txn_kv.h"
 #include "meta-service/txn_kv_error.h"
@@ -533,9 +535,27 @@ int InstanceRecycler::init_storage_vault_accessors() {
             LOG(WARNING) << "malformed storage vault, unable to deserialize 
key=" << hex(k);
             return -1;
         }
+        std::string recycler_storage_vault_white_list = accumulate(
+                config::recycler_storage_vault_white_list.begin(),
+                config::recycler_storage_vault_white_list.end(), std::string(),
+                [](std::string a, std::string b) { return a + (a.empty() ? "" 
: ",") + b; });
+        LOG_INFO("config::recycler_storage_vault_white_list")
+                .tag("", recycler_storage_vault_white_list);
+        if (!config::recycler_storage_vault_white_list.empty()) {
+            if (auto it = 
std::find(config::recycler_storage_vault_white_list.begin(),
+                                    
config::recycler_storage_vault_white_list.end(), vault.name());
+                it == config::recycler_storage_vault_white_list.end()) {
+                LOG_WARNING(
+                        "failed to init accessor for vault because this vault 
is not in "
+                        "config::recycler_storage_vault_white_list. ")
+                        .tag(" vault name:", vault.name())
+                        .tag(" config::recycler_storage_vault_white_list:",
+                             recycler_storage_vault_white_list);
+                continue;
+            }
+        }
         
TEST_SYNC_POINT_CALLBACK("InstanceRecycler::init_storage_vault_accessors.mock_vault",
                                  &accessor_map_, &vault);
-
         if (vault.has_hdfs_info()) {
             auto accessor = std::make_shared<HdfsAccessor>(vault.hdfs_info());
             int ret = accessor->init();
@@ -558,17 +578,17 @@ int InstanceRecycler::init_storage_vault_accessors() {
             }
 
             std::shared_ptr<S3Accessor> accessor;
-            int ret = S3Accessor::create(std::move(*s3_conf), &accessor);
+            int ret = S3Accessor::create(*s3_conf, &accessor);
             if (ret != 0) {
                 LOG(WARNING) << "failed to init s3 accessor. instance_id=" << 
instance_id_
                              << " resource_id=" << vault.id() << " name=" << 
vault.name()
                              << " ret=" << ret
-                             << " s3_vault=" << 
vault.obj_info().ShortDebugString();
+                             << " s3_vault=" << 
encryt_sk(vault.obj_info().ShortDebugString());
                 continue;
             }
             LOG(INFO) << "succeed to init s3 accessor. instance_id=" << 
instance_id_
                       << " resource_id=" << vault.id() << " name=" << 
vault.name() << " ret=" << ret
-                      << " s3_vault=" << vault.obj_info().ShortDebugString();
+                      << " s3_vault=" << 
encryt_sk(vault.obj_info().ShortDebugString());
             accessor_map_.emplace(vault.id(), std::move(accessor));
         }
     }
diff --git a/cloud/src/recycler/s3_accessor.cpp b/cloud/src/recycler/s3_accessor.cpp
index 0ef150e20d1..5356ddf38af 100644
--- a/cloud/src/recycler/s3_accessor.cpp
+++ b/cloud/src/recycler/s3_accessor.cpp
@@ -228,10 +228,10 @@ int S3Accessor::create(S3Conf conf, std::shared_ptr<S3Accessor>* accessor) {
     TEST_SYNC_POINT_RETURN_WITH_VALUE("S3Accessor::init.s3_init_failed", 
(int)-1);
     switch (conf.provider) {
     case S3Conf::GCS:
-        *accessor = std::make_shared<GcsAccessor>(std::move(conf));
+        *accessor = std::make_shared<GcsAccessor>(conf);
         break;
     default:
-        *accessor = std::make_shared<S3Accessor>(std::move(conf));
+        *accessor = std::make_shared<S3Accessor>(conf);
         break;
     }
 
diff --git a/cloud/test/recycler_test.cpp b/cloud/test/recycler_test.cpp
index b6143c9545e..8efa3378461 100644
--- a/cloud/test/recycler_test.cpp
+++ b/cloud/test/recycler_test.cpp
@@ -3616,6 +3616,140 @@ TEST(RecyclerTest, init_all_vault_accessors_failed_test) {
     EXPECT_EQ(recycler.init(), -2);
 }
 
+TEST(RecyclerTest, recycler_storage_vault_white_list_test) {
+    auto* sp = SyncPoint::get_instance();
+    std::unique_ptr<int, std::function<void(int*)>> defer((int*)0x01, 
[&sp](int*) {
+        sp->clear_all_call_backs();
+        sp->clear_trace();
+        sp->disable_processing();
+    });
+
+    auto txn_kv = std::make_shared<MemTxnKv>();
+    EXPECT_EQ(txn_kv->init(), 0);
+    std::unique_ptr<Transaction> txn;
+    ASSERT_EQ(txn_kv->create_txn(&txn), TxnErrorCode::TXN_OK);
+    std::string key;
+    std::string val;
+
+    InstanceKeyInfo key_info {"test_instance"};
+    instance_key(key_info, &key);
+    InstanceInfoPB instance;
+    instance.set_instance_id("GetObjStoreInfoTestInstance");
+    {
+        ObjectStoreInfoPB obj_info;
+        StorageVaultPB vault;
+        obj_info.set_id("id");
+        obj_info.set_ak("ak");
+        obj_info.set_sk("sk");
+        obj_info.set_provider(ObjectStoreInfoPB_Provider_COS);
+        vault.mutable_obj_info()->MergeFrom(obj_info);
+        vault.set_name("s3_1");
+        vault.set_id("s3_1");
+        instance.add_storage_vault_names(vault.name());
+        instance.add_resource_ids(vault.id());
+        txn->put(storage_vault_key({instance.instance_id(), "1"}), 
vault.SerializeAsString());
+    }
+
+    {
+        ObjectStoreInfoPB obj_info;
+        StorageVaultPB vault;
+        obj_info.set_id("id");
+        obj_info.set_ak("ak");
+        obj_info.set_sk("sk");
+        obj_info.set_provider(ObjectStoreInfoPB_Provider_COS);
+        vault.mutable_obj_info()->MergeFrom(obj_info);
+        vault.set_name("s3_2");
+        vault.set_id("s3_2");
+        instance.add_storage_vault_names(vault.name());
+        instance.add_resource_ids(vault.id());
+        instance.set_instance_id("GetObjStoreInfoTestInstance");
+        txn->put(storage_vault_key({instance.instance_id(), "2"}), 
vault.SerializeAsString());
+    }
+
+    {
+        HdfsBuildConf hdfs_build_conf;
+        StorageVaultPB vault;
+        hdfs_build_conf.set_fs_name("fs_name");
+        hdfs_build_conf.set_user("root");
+        HdfsVaultInfo hdfs_info;
+        hdfs_info.set_prefix("root_path");
+        hdfs_info.mutable_build_conf()->MergeFrom(hdfs_build_conf);
+        vault.mutable_hdfs_info()->MergeFrom(hdfs_info);
+        vault.set_name("hdfs_1");
+        vault.set_id("hdfs_1");
+        instance.add_storage_vault_names(vault.name());
+        instance.add_resource_ids(vault.id());
+        instance.set_instance_id("GetObjStoreInfoTestInstance");
+        txn->put(storage_vault_key({instance.instance_id(), "3"}), 
vault.SerializeAsString());
+    }
+
+    {
+        HdfsBuildConf hdfs_build_conf;
+        StorageVaultPB vault;
+        hdfs_build_conf.set_fs_name("fs_name");
+        hdfs_build_conf.set_user("root");
+        HdfsVaultInfo hdfs_info;
+        hdfs_info.set_prefix("root_path");
+        hdfs_info.mutable_build_conf()->MergeFrom(hdfs_build_conf);
+        vault.mutable_hdfs_info()->MergeFrom(hdfs_info);
+        vault.set_name("hdfs_2");
+        vault.set_id("hdfs_2");
+        instance.add_storage_vault_names(vault.name());
+        instance.add_resource_ids(vault.id());
+        instance.set_instance_id("GetObjStoreInfoTestInstance");
+        txn->put(storage_vault_key({instance.instance_id(), "4"}), 
vault.SerializeAsString());
+    }
+
+    auto accessor = std::make_shared<MockAccessor>();
+    EXPECT_EQ(accessor->put_file("data/0/test.csv", ""), 0);
+    sp->set_call_back("HdfsAccessor::init.hdfs_init_failed", [](auto&& args) {
+        auto* ret = try_any_cast_ret<int>(args);
+        ret->first = 0;
+        ret->second = true;
+    });
+    sp->set_call_back(
+            "InstanceRecycler::init_storage_vault_accessors.mock_vault", 
[&accessor](auto&& args) {
+                auto* map = try_any_cast<
+                        std::unordered_map<std::string, 
std::shared_ptr<StorageVaultAccessor>>*>(
+                        args[0]);
+                auto* vault = try_any_cast<StorageVaultPB*>(args[1]);
+                map->emplace(vault->id(), accessor);
+            });
+    sp->enable_processing();
+
+    val = instance.SerializeAsString();
+    txn->put(key, val);
+    EXPECT_EQ(txn->commit(), TxnErrorCode::TXN_OK);
+
+    EXPECT_EQ(accessor->exists("data/0/test.csv"), 0);
+
+    {
+        config::recycler_storage_vault_white_list = {};
+        InstanceRecycler recycler(txn_kv, instance, thread_group,
+                                  std::make_shared<TxnLazyCommitter>(txn_kv));
+        EXPECT_EQ(recycler.init(), 0);
+        EXPECT_EQ(recycler.accessor_map_.size(), 4);
+    }
+
+    {
+        config::recycler_storage_vault_white_list = {"s3_1", "s3_2", "hdfs_1", 
"hdfs_2"};
+        InstanceRecycler recycler(txn_kv, instance, thread_group,
+                                  std::make_shared<TxnLazyCommitter>(txn_kv));
+        EXPECT_EQ(recycler.init(), 0);
+        EXPECT_EQ(recycler.accessor_map_.size(), 4);
+    }
+
+    {
+        config::recycler_storage_vault_white_list = {"s3_1", "hdfs_1"};
+        InstanceRecycler recycler(txn_kv, instance, thread_group,
+                                  std::make_shared<TxnLazyCommitter>(txn_kv));
+        EXPECT_EQ(recycler.init(), 0);
+        EXPECT_EQ(recycler.accessor_map_.size(), 2);
+        
EXPECT_EQ(recycler.accessor_map_.at("s3_1")->exists("data/0/test.csv"), 0);
+        
EXPECT_EQ(recycler.accessor_map_.at("hdfs_1")->exists("data/0/test.csv"), 0);
+    }
+}
+
 TEST(RecyclerTest, delete_tmp_rowset_data_with_idx_v1) {
     auto* sp = SyncPoint::get_instance();
     std::unique_ptr<int, std::function<void(int*)>> defer((int*)0x01, 
[&sp](int*) {


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to