This is an automated email from the ASF dual-hosted git repository.

dataroaring pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/branch-3.0 by this push:
     new fbcd8ec2c98 branch-3.0: [Enhancement](recycler) Add valid s3 vault config for recycler #47723 (#48375)
fbcd8ec2c98 is described below

commit fbcd8ec2c98f0085458c5f105f6eaed570026a12
Author: github-actions[bot] <41898282+github-actions[bot]@users.noreply.github.com>
AuthorDate: Thu Feb 27 17:58:48 2025 +0800

    branch-3.0: [Enhancement](recycler) Add valid s3 vault config for recycler #47723 (#48375)
    
    Cherry-picked from #47723
    
    Co-authored-by: abmdocrt <lianyuk...@selectdb.com>
---
 cloud/src/common/config.h                    |   3 +
 cloud/src/meta-service/meta_service_helper.h |  64 ++++++++-----
 cloud/src/recycler/recycler.cpp              |  28 +++++-
 cloud/src/recycler/s3_accessor.cpp           |   4 +-
 cloud/test/recycler_test.cpp                 | 134 +++++++++++++++++++++++++++
 5 files changed, 204 insertions(+), 29 deletions(-)

diff --git a/cloud/src/common/config.h b/cloud/src/common/config.h
index 1823ad90bf2..a0c469e820d 100644
--- a/cloud/src/common/config.h
+++ b/cloud/src/common/config.h
@@ -252,4 +252,7 @@ CONF_Bool(enable_check_instance_id, "true");
 
 // Check if ip eq 127.0.0.1, ms/recycler exit
 CONF_Bool(enable_loopback_address_for_ms, "false");
+// Which vaults should be recycled. If empty, recycle all vaults.
+// Comma-separated list: recycler_storage_vault_white_list="aaa,bbb,ccc"
+CONF_Strings(recycler_storage_vault_white_list, "");
 } // namespace doris::cloud::config
diff --git a/cloud/src/meta-service/meta_service_helper.h b/cloud/src/meta-service/meta_service_helper.h
index 7d626dd1186..cd9ed2f7f1d 100644
--- a/cloud/src/meta-service/meta_service_helper.h
+++ b/cloud/src/meta-service/meta_service_helper.h
@@ -52,6 +52,46 @@ inline std::string md5(const std::string& str) {
     return ss.str();
 }
 
+/**
+ * Masks all "sk" values in the given debug string by replacing them with MD5 hashes.
+ * 
+ * Assumptions:
+ * - Input string contains one or more occurrences of "sk: " followed by a value in double quotes.
+ * - An md5() function exists that takes a std::string and returns its MD5 hash as a string.
+ * 
+ * @param debug_string Input string containing "sk: " fields to be masked.
+ * @return A new string with all "sk" values replaced by their MD5 hashes.
+ * 
+ * Behavior:
+ * 1. Searches for all occurrences of "sk: " in the input string.
+ * 2. For each occurrence, extracts the value between double quotes.
+ * 3. Replaces the original value with "md5: " followed by its MD5 hash.
+ * 4. Returns the modified string with all "sk" values masked.
+ */
+inline std::string encryt_sk(std::string debug_string) {
+    // Start position for searching "sk" fields
+    size_t pos = 0;
+    // Iterate through the string and find all occurrences of "sk: "
+    while ((pos = debug_string.find("sk: ", pos)) != std::string::npos) {
+        // Find the start and end of the "sk" value (assumed to be within 
quotes)
+        // Start after the quote
+        size_t sk_value_start = debug_string.find('\"', pos) + 1;
+        // End at the next quote
+        size_t sk_value_end = debug_string.find('\"', sk_value_start);
+
+        // Extract the "sk" value
+        std::string sk_value = debug_string.substr(sk_value_start, 
sk_value_end - sk_value_start);
+        // Encrypt the "sk" value with MD5
+        std::string encrypted_sk = "md5: " + md5(sk_value);
+
+        // Replace the original "sk" value with the encrypted MD5 value
+        debug_string.replace(sk_value_start, sk_value_end - sk_value_start, 
encrypted_sk);
+        // Move the position to the end of the current "sk" field and continue 
searching
+        pos = sk_value_end;
+    }
+    return debug_string;
+}
+
 template <class Request>
 void begin_rpc(std::string_view func_name, brpc::Controller* ctrl, const 
Request* req) {
     if constexpr (std::is_same_v<Request, CreateRowsetRequest>) {
@@ -133,29 +173,7 @@ void finish_rpc(std::string_view func_name, 
brpc::Controller* ctrl, Response* re
                   << " status=" << res->status().ShortDebugString();
     } else if constexpr (std::is_same_v<Response, GetObjStoreInfoResponse> ||
                          std::is_same_v<Response, GetStageResponse>) {
-        std::string debug_string = res->DebugString();
-        // Start position for searching "sk" fields
-        size_t pos = 0;
-        // Iterate through the string and find all occurrences of "sk: "
-        while ((pos = debug_string.find("sk: ", pos)) != std::string::npos) {
-            // Find the start and end of the "sk" value (assumed to be within 
quotes)
-            // Start after the quote
-            size_t sk_value_start = debug_string.find('\"', pos) + 1;
-            // End at the next quote
-            size_t sk_value_end = debug_string.find('\"', sk_value_start);
-
-            // Extract the "sk" value
-            std::string sk_value =
-                    debug_string.substr(sk_value_start, sk_value_end - 
sk_value_start);
-            // Encrypt the "sk" value with MD5
-            std::string encrypted_sk = "md5: " + md5(sk_value);
-
-            // Replace the original "sk" value with the encrypted MD5 value
-            debug_string.replace(sk_value_start, sk_value_end - 
sk_value_start, encrypted_sk);
-
-            // Move the position to the end of the current "sk" field and 
continue searching
-            pos = sk_value_end;
-        }
+        std::string debug_string = encryt_sk(res->DebugString());
         TEST_SYNC_POINT_CALLBACK("sk_finish_rpc", &debug_string);
         LOG(INFO) << "finish " << func_name << " from " << ctrl->remote_side()
                   << " response=" << debug_string;
diff --git a/cloud/src/recycler/recycler.cpp b/cloud/src/recycler/recycler.cpp
index 52597fad04b..40c328d8945 100644
--- a/cloud/src/recycler/recycler.cpp
+++ b/cloud/src/recycler/recycler.cpp
@@ -27,11 +27,13 @@
 #include <cstddef>
 #include <cstdint>
 #include <deque>
+#include <numeric>
 #include <string>
 #include <string_view>
 
 #include "common/stopwatch.h"
 #include "meta-service/meta_service.h"
+#include "meta-service/meta_service_helper.h"
 #include "meta-service/meta_service_schema.h"
 #include "meta-service/txn_kv.h"
 #include "meta-service/txn_kv_error.h"
@@ -533,9 +535,27 @@ int InstanceRecycler::init_storage_vault_accessors() {
             LOG(WARNING) << "malformed storage vault, unable to deserialize 
key=" << hex(k);
             return -1;
         }
+        std::string recycler_storage_vault_white_list = accumulate(
+                config::recycler_storage_vault_white_list.begin(),
+                config::recycler_storage_vault_white_list.end(), std::string(),
+                [](std::string a, std::string b) { return a + (a.empty() ? "" 
: ",") + b; });
+        LOG_INFO("config::recycler_storage_vault_white_list")
+                .tag("", recycler_storage_vault_white_list);
+        if (!config::recycler_storage_vault_white_list.empty()) {
+            if (auto it = 
std::find(config::recycler_storage_vault_white_list.begin(),
+                                    
config::recycler_storage_vault_white_list.end(), vault.name());
+                it == config::recycler_storage_vault_white_list.end()) {
+                LOG_WARNING(
+                        "failed to init accessor for vault because this vault 
is not in "
+                        "config::recycler_storage_vault_white_list. ")
+                        .tag(" vault name:", vault.name())
+                        .tag(" config::recycler_storage_vault_white_list:",
+                             recycler_storage_vault_white_list);
+                continue;
+            }
+        }
         
TEST_SYNC_POINT_CALLBACK("InstanceRecycler::init_storage_vault_accessors.mock_vault",
                                  &accessor_map_, &vault);
-
         if (vault.has_hdfs_info()) {
             auto accessor = std::make_shared<HdfsAccessor>(vault.hdfs_info());
             int ret = accessor->init();
@@ -558,17 +578,17 @@ int InstanceRecycler::init_storage_vault_accessors() {
             }
 
             std::shared_ptr<S3Accessor> accessor;
-            int ret = S3Accessor::create(std::move(*s3_conf), &accessor);
+            int ret = S3Accessor::create(*s3_conf, &accessor);
             if (ret != 0) {
                 LOG(WARNING) << "failed to init s3 accessor. instance_id=" << 
instance_id_
                              << " resource_id=" << vault.id() << " name=" << 
vault.name()
                              << " ret=" << ret
-                             << " s3_vault=" << 
vault.obj_info().ShortDebugString();
+                             << " s3_vault=" << 
encryt_sk(vault.obj_info().ShortDebugString());
                 continue;
             }
             LOG(INFO) << "succeed to init s3 accessor. instance_id=" << 
instance_id_
                       << " resource_id=" << vault.id() << " name=" << 
vault.name() << " ret=" << ret
-                      << " s3_vault=" << vault.obj_info().ShortDebugString();
+                      << " s3_vault=" << 
encryt_sk(vault.obj_info().ShortDebugString());
             accessor_map_.emplace(vault.id(), std::move(accessor));
         }
     }
diff --git a/cloud/src/recycler/s3_accessor.cpp b/cloud/src/recycler/s3_accessor.cpp
index 0ef150e20d1..5356ddf38af 100644
--- a/cloud/src/recycler/s3_accessor.cpp
+++ b/cloud/src/recycler/s3_accessor.cpp
@@ -228,10 +228,10 @@ int S3Accessor::create(S3Conf conf, 
std::shared_ptr<S3Accessor>* accessor) {
     TEST_SYNC_POINT_RETURN_WITH_VALUE("S3Accessor::init.s3_init_failed", 
(int)-1);
     switch (conf.provider) {
     case S3Conf::GCS:
-        *accessor = std::make_shared<GcsAccessor>(std::move(conf));
+        *accessor = std::make_shared<GcsAccessor>(conf);
         break;
     default:
-        *accessor = std::make_shared<S3Accessor>(std::move(conf));
+        *accessor = std::make_shared<S3Accessor>(conf);
         break;
     }
 
diff --git a/cloud/test/recycler_test.cpp b/cloud/test/recycler_test.cpp
index f1834ad8003..d2e8c50ff4e 100644
--- a/cloud/test/recycler_test.cpp
+++ b/cloud/test/recycler_test.cpp
@@ -3616,6 +3616,140 @@ TEST(RecyclerTest, 
init_all_vault_accessors_failed_test) {
     EXPECT_EQ(recycler.init(), -2);
 }
 
+TEST(RecyclerTest, recycler_storage_vault_white_list_test) {
+    auto* sp = SyncPoint::get_instance();
+    std::unique_ptr<int, std::function<void(int*)>> defer((int*)0x01, 
[&sp](int*) {
+        sp->clear_all_call_backs();
+        sp->clear_trace();
+        sp->disable_processing();
+    });
+
+    auto txn_kv = std::make_shared<MemTxnKv>();
+    EXPECT_EQ(txn_kv->init(), 0);
+    std::unique_ptr<Transaction> txn;
+    ASSERT_EQ(txn_kv->create_txn(&txn), TxnErrorCode::TXN_OK);
+    std::string key;
+    std::string val;
+
+    InstanceKeyInfo key_info {"test_instance"};
+    instance_key(key_info, &key);
+    InstanceInfoPB instance;
+    instance.set_instance_id("GetObjStoreInfoTestInstance");
+    {
+        ObjectStoreInfoPB obj_info;
+        StorageVaultPB vault;
+        obj_info.set_id("id");
+        obj_info.set_ak("ak");
+        obj_info.set_sk("sk");
+        obj_info.set_provider(ObjectStoreInfoPB_Provider_COS);
+        vault.mutable_obj_info()->MergeFrom(obj_info);
+        vault.set_name("s3_1");
+        vault.set_id("s3_1");
+        instance.add_storage_vault_names(vault.name());
+        instance.add_resource_ids(vault.id());
+        txn->put(storage_vault_key({instance.instance_id(), "1"}), 
vault.SerializeAsString());
+    }
+
+    {
+        ObjectStoreInfoPB obj_info;
+        StorageVaultPB vault;
+        obj_info.set_id("id");
+        obj_info.set_ak("ak");
+        obj_info.set_sk("sk");
+        obj_info.set_provider(ObjectStoreInfoPB_Provider_COS);
+        vault.mutable_obj_info()->MergeFrom(obj_info);
+        vault.set_name("s3_2");
+        vault.set_id("s3_2");
+        instance.add_storage_vault_names(vault.name());
+        instance.add_resource_ids(vault.id());
+        instance.set_instance_id("GetObjStoreInfoTestInstance");
+        txn->put(storage_vault_key({instance.instance_id(), "2"}), 
vault.SerializeAsString());
+    }
+
+    {
+        HdfsBuildConf hdfs_build_conf;
+        StorageVaultPB vault;
+        hdfs_build_conf.set_fs_name("fs_name");
+        hdfs_build_conf.set_user("root");
+        HdfsVaultInfo hdfs_info;
+        hdfs_info.set_prefix("root_path");
+        hdfs_info.mutable_build_conf()->MergeFrom(hdfs_build_conf);
+        vault.mutable_hdfs_info()->MergeFrom(hdfs_info);
+        vault.set_name("hdfs_1");
+        vault.set_id("hdfs_1");
+        instance.add_storage_vault_names(vault.name());
+        instance.add_resource_ids(vault.id());
+        instance.set_instance_id("GetObjStoreInfoTestInstance");
+        txn->put(storage_vault_key({instance.instance_id(), "3"}), 
vault.SerializeAsString());
+    }
+
+    {
+        HdfsBuildConf hdfs_build_conf;
+        StorageVaultPB vault;
+        hdfs_build_conf.set_fs_name("fs_name");
+        hdfs_build_conf.set_user("root");
+        HdfsVaultInfo hdfs_info;
+        hdfs_info.set_prefix("root_path");
+        hdfs_info.mutable_build_conf()->MergeFrom(hdfs_build_conf);
+        vault.mutable_hdfs_info()->MergeFrom(hdfs_info);
+        vault.set_name("hdfs_2");
+        vault.set_id("hdfs_2");
+        instance.add_storage_vault_names(vault.name());
+        instance.add_resource_ids(vault.id());
+        instance.set_instance_id("GetObjStoreInfoTestInstance");
+        txn->put(storage_vault_key({instance.instance_id(), "4"}), 
vault.SerializeAsString());
+    }
+
+    auto accessor = std::make_shared<MockAccessor>();
+    EXPECT_EQ(accessor->put_file("data/0/test.csv", ""), 0);
+    sp->set_call_back("HdfsAccessor::init.hdfs_init_failed", [](auto&& args) {
+        auto* ret = try_any_cast_ret<int>(args);
+        ret->first = 0;
+        ret->second = true;
+    });
+    sp->set_call_back(
+            "InstanceRecycler::init_storage_vault_accessors.mock_vault", 
[&accessor](auto&& args) {
+                auto* map = try_any_cast<
+                        std::unordered_map<std::string, 
std::shared_ptr<StorageVaultAccessor>>*>(
+                        args[0]);
+                auto* vault = try_any_cast<StorageVaultPB*>(args[1]);
+                map->emplace(vault->id(), accessor);
+            });
+    sp->enable_processing();
+
+    val = instance.SerializeAsString();
+    txn->put(key, val);
+    EXPECT_EQ(txn->commit(), TxnErrorCode::TXN_OK);
+
+    EXPECT_EQ(accessor->exists("data/0/test.csv"), 0);
+
+    {
+        config::recycler_storage_vault_white_list = {};
+        InstanceRecycler recycler(txn_kv, instance, thread_group,
+                                  std::make_shared<TxnLazyCommitter>(txn_kv));
+        EXPECT_EQ(recycler.init(), 0);
+        EXPECT_EQ(recycler.accessor_map_.size(), 4);
+    }
+
+    {
+        config::recycler_storage_vault_white_list = {"s3_1", "s3_2", "hdfs_1", 
"hdfs_2"};
+        InstanceRecycler recycler(txn_kv, instance, thread_group,
+                                  std::make_shared<TxnLazyCommitter>(txn_kv));
+        EXPECT_EQ(recycler.init(), 0);
+        EXPECT_EQ(recycler.accessor_map_.size(), 4);
+    }
+
+    {
+        config::recycler_storage_vault_white_list = {"s3_1", "hdfs_1"};
+        InstanceRecycler recycler(txn_kv, instance, thread_group,
+                                  std::make_shared<TxnLazyCommitter>(txn_kv));
+        EXPECT_EQ(recycler.init(), 0);
+        EXPECT_EQ(recycler.accessor_map_.size(), 2);
+        
EXPECT_EQ(recycler.accessor_map_.at("s3_1")->exists("data/0/test.csv"), 0);
+        
EXPECT_EQ(recycler.accessor_map_.at("hdfs_1")->exists("data/0/test.csv"), 0);
+    }
+}
+
 TEST(RecyclerTest, delete_tmp_rowset_data_with_idx_v1) {
     auto* sp = SyncPoint::get_instance();
     std::unique_ptr<int, std::function<void(int*)>> defer((int*)0x01, 
[&sp](int*) {


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to