This is an automated email from the ASF dual-hosted git repository. gavinchou pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push: new fc890e590d0 [Enhancement](recycler) Add valid s3 vault config for recycler (#47723) fc890e590d0 is described below commit fc890e590d0609a7d9f64a3edc5a363c85b36721 Author: abmdocrt <lianyuk...@selectdb.com> AuthorDate: Wed Feb 26 17:53:43 2025 +0800 [Enhancement](recycler) Add valid s3 vault config for recycler (#47723) In the current multi-vault mode, deleting vaults is not allowed. This causes the recycler to still attempt accessing invalid vaults when deleting data, slowing down the deletion process. The HDFS accessor automatically checks the connectivity of vaults during initialization and only initializes connected HDFS vaults. However, S3 does not check if vaults are connected and initializes all vaults, including invalid ones. This PR adds a configuration that allows users to specify valid S3 [...] --- cloud/src/common/config.h | 3 + cloud/src/meta-service/meta_service_helper.h | 64 ++++++++----- cloud/src/recycler/recycler.cpp | 28 +++++- cloud/src/recycler/s3_accessor.cpp | 4 +- cloud/test/recycler_test.cpp | 134 +++++++++++++++++++++++++++ 5 files changed, 204 insertions(+), 29 deletions(-) diff --git a/cloud/src/common/config.h b/cloud/src/common/config.h index c8a0eba3968..4db2ee37836 100644 --- a/cloud/src/common/config.h +++ b/cloud/src/common/config.h @@ -257,4 +257,7 @@ CONF_Bool(enable_check_instance_id, "true"); // Check if ip eq 127.0.0.1, ms/recycler exit CONF_Bool(enable_loopback_address_for_ms, "false"); +// Which vaults should be recycled. If empty, recycle all vaults. 
+// Comma-separated list: recycler_storage_vault_white_list="aaa,bbb,ccc" +CONF_Strings(recycler_storage_vault_white_list, ""); } // namespace doris::cloud::config diff --git a/cloud/src/meta-service/meta_service_helper.h b/cloud/src/meta-service/meta_service_helper.h index 7d626dd1186..cd9ed2f7f1d 100644 --- a/cloud/src/meta-service/meta_service_helper.h +++ b/cloud/src/meta-service/meta_service_helper.h @@ -52,6 +52,46 @@ inline std::string md5(const std::string& str) { return ss.str(); } +/** + * Encrypts all "sk" values in the given debug string with MD5 hashes. + * + * Assumptions: + * - Input string contains one or more occurrences of "sk: " followed by a value in double quotes. + * - An md5() function exists that takes a std::string and returns its MD5 hash as a string. + * + * @param debug_string Input string containing "sk: " fields to be encrypted. + * @return A new string with all "sk" values replaced by their MD5 hashes. + * + * Behavior: + * 1. Searches for all occurrences of "sk: " in the input string. + * 2. For each occurrence, extracts the value between double quotes. + * 3. Replaces the original value with "md5: " followed by its MD5 hash. + * 4. Returns the modified string with all "sk" values encrypted.
+ */ +inline std::string encryt_sk(std::string debug_string) { + // Start position for searching "sk" fields + size_t pos = 0; + // Iterate through the string and find all occurrences of "sk: " + while ((pos = debug_string.find("sk: ", pos)) != std::string::npos) { + // Find the start and end of the "sk" value (assumed to be within quotes) + // Start after the quote + size_t sk_value_start = debug_string.find('\"', pos) + 1; + // End at the next quote + size_t sk_value_end = debug_string.find('\"', sk_value_start); + + // Extract the "sk" value + std::string sk_value = debug_string.substr(sk_value_start, sk_value_end - sk_value_start); + // Encrypt the "sk" value with MD5 + std::string encrypted_sk = "md5: " + md5(sk_value); + + // Replace the original "sk" value with the encrypted MD5 value + debug_string.replace(sk_value_start, sk_value_end - sk_value_start, encrypted_sk); + // Move the position to the end of the current "sk" field and continue searching + pos = sk_value_end; + } + return debug_string; +} + template <class Request> void begin_rpc(std::string_view func_name, brpc::Controller* ctrl, const Request* req) { if constexpr (std::is_same_v<Request, CreateRowsetRequest>) { @@ -133,29 +173,7 @@ void finish_rpc(std::string_view func_name, brpc::Controller* ctrl, Response* re << " status=" << res->status().ShortDebugString(); } else if constexpr (std::is_same_v<Response, GetObjStoreInfoResponse> || std::is_same_v<Response, GetStageResponse>) { - std::string debug_string = res->DebugString(); - // Start position for searching "sk" fields - size_t pos = 0; - // Iterate through the string and find all occurrences of "sk: " - while ((pos = debug_string.find("sk: ", pos)) != std::string::npos) { - // Find the start and end of the "sk" value (assumed to be within quotes) - // Start after the quote - size_t sk_value_start = debug_string.find('\"', pos) + 1; - // End at the next quote - size_t sk_value_end = debug_string.find('\"', sk_value_start); - - // Extract the 
"sk" value - std::string sk_value = - debug_string.substr(sk_value_start, sk_value_end - sk_value_start); - // Encrypt the "sk" value with MD5 - std::string encrypted_sk = "md5: " + md5(sk_value); - - // Replace the original "sk" value with the encrypted MD5 value - debug_string.replace(sk_value_start, sk_value_end - sk_value_start, encrypted_sk); - - // Move the position to the end of the current "sk" field and continue searching - pos = sk_value_end; - } + std::string debug_string = encryt_sk(res->DebugString()); TEST_SYNC_POINT_CALLBACK("sk_finish_rpc", &debug_string); LOG(INFO) << "finish " << func_name << " from " << ctrl->remote_side() << " response=" << debug_string; diff --git a/cloud/src/recycler/recycler.cpp b/cloud/src/recycler/recycler.cpp index 52597fad04b..40c328d8945 100644 --- a/cloud/src/recycler/recycler.cpp +++ b/cloud/src/recycler/recycler.cpp @@ -27,11 +27,13 @@ #include <cstddef> #include <cstdint> #include <deque> +#include <numeric> #include <string> #include <string_view> #include "common/stopwatch.h" #include "meta-service/meta_service.h" +#include "meta-service/meta_service_helper.h" #include "meta-service/meta_service_schema.h" #include "meta-service/txn_kv.h" #include "meta-service/txn_kv_error.h" @@ -533,9 +535,27 @@ int InstanceRecycler::init_storage_vault_accessors() { LOG(WARNING) << "malformed storage vault, unable to deserialize key=" << hex(k); return -1; } + std::string recycler_storage_vault_white_list = accumulate( + config::recycler_storage_vault_white_list.begin(), + config::recycler_storage_vault_white_list.end(), std::string(), + [](std::string a, std::string b) { return a + (a.empty() ? 
"" : ",") + b; }); + LOG_INFO("config::recycler_storage_vault_white_list") + .tag("", recycler_storage_vault_white_list); + if (!config::recycler_storage_vault_white_list.empty()) { + if (auto it = std::find(config::recycler_storage_vault_white_list.begin(), + config::recycler_storage_vault_white_list.end(), vault.name()); + it == config::recycler_storage_vault_white_list.end()) { + LOG_WARNING( + "failed to init accessor for vault because this vault is not in " + "config::recycler_storage_vault_white_list. ") + .tag(" vault name:", vault.name()) + .tag(" config::recycler_storage_vault_white_list:", + recycler_storage_vault_white_list); + continue; + } + } TEST_SYNC_POINT_CALLBACK("InstanceRecycler::init_storage_vault_accessors.mock_vault", &accessor_map_, &vault); - if (vault.has_hdfs_info()) { auto accessor = std::make_shared<HdfsAccessor>(vault.hdfs_info()); int ret = accessor->init(); @@ -558,17 +578,17 @@ int InstanceRecycler::init_storage_vault_accessors() { } std::shared_ptr<S3Accessor> accessor; - int ret = S3Accessor::create(std::move(*s3_conf), &accessor); + int ret = S3Accessor::create(*s3_conf, &accessor); if (ret != 0) { LOG(WARNING) << "failed to init s3 accessor. instance_id=" << instance_id_ << " resource_id=" << vault.id() << " name=" << vault.name() << " ret=" << ret - << " s3_vault=" << vault.obj_info().ShortDebugString(); + << " s3_vault=" << encryt_sk(vault.obj_info().ShortDebugString()); continue; } LOG(INFO) << "succeed to init s3 accessor. 
instance_id=" << instance_id_ << " resource_id=" << vault.id() << " name=" << vault.name() << " ret=" << ret - << " s3_vault=" << vault.obj_info().ShortDebugString(); + << " s3_vault=" << encryt_sk(vault.obj_info().ShortDebugString()); accessor_map_.emplace(vault.id(), std::move(accessor)); } } diff --git a/cloud/src/recycler/s3_accessor.cpp b/cloud/src/recycler/s3_accessor.cpp index 0ef150e20d1..5356ddf38af 100644 --- a/cloud/src/recycler/s3_accessor.cpp +++ b/cloud/src/recycler/s3_accessor.cpp @@ -228,10 +228,10 @@ int S3Accessor::create(S3Conf conf, std::shared_ptr<S3Accessor>* accessor) { TEST_SYNC_POINT_RETURN_WITH_VALUE("S3Accessor::init.s3_init_failed", (int)-1); switch (conf.provider) { case S3Conf::GCS: - *accessor = std::make_shared<GcsAccessor>(std::move(conf)); + *accessor = std::make_shared<GcsAccessor>(conf); break; default: - *accessor = std::make_shared<S3Accessor>(std::move(conf)); + *accessor = std::make_shared<S3Accessor>(conf); break; } diff --git a/cloud/test/recycler_test.cpp b/cloud/test/recycler_test.cpp index b6143c9545e..8efa3378461 100644 --- a/cloud/test/recycler_test.cpp +++ b/cloud/test/recycler_test.cpp @@ -3616,6 +3616,140 @@ TEST(RecyclerTest, init_all_vault_accessors_failed_test) { EXPECT_EQ(recycler.init(), -2); } +TEST(RecyclerTest, recycler_storage_vault_white_list_test) { + auto* sp = SyncPoint::get_instance(); + std::unique_ptr<int, std::function<void(int*)>> defer((int*)0x01, [&sp](int*) { + sp->clear_all_call_backs(); + sp->clear_trace(); + sp->disable_processing(); + }); + + auto txn_kv = std::make_shared<MemTxnKv>(); + EXPECT_EQ(txn_kv->init(), 0); + std::unique_ptr<Transaction> txn; + ASSERT_EQ(txn_kv->create_txn(&txn), TxnErrorCode::TXN_OK); + std::string key; + std::string val; + + InstanceKeyInfo key_info {"test_instance"}; + instance_key(key_info, &key); + InstanceInfoPB instance; + instance.set_instance_id("GetObjStoreInfoTestInstance"); + { + ObjectStoreInfoPB obj_info; + StorageVaultPB vault; + 
obj_info.set_id("id"); + obj_info.set_ak("ak"); + obj_info.set_sk("sk"); + obj_info.set_provider(ObjectStoreInfoPB_Provider_COS); + vault.mutable_obj_info()->MergeFrom(obj_info); + vault.set_name("s3_1"); + vault.set_id("s3_1"); + instance.add_storage_vault_names(vault.name()); + instance.add_resource_ids(vault.id()); + txn->put(storage_vault_key({instance.instance_id(), "1"}), vault.SerializeAsString()); + } + + { + ObjectStoreInfoPB obj_info; + StorageVaultPB vault; + obj_info.set_id("id"); + obj_info.set_ak("ak"); + obj_info.set_sk("sk"); + obj_info.set_provider(ObjectStoreInfoPB_Provider_COS); + vault.mutable_obj_info()->MergeFrom(obj_info); + vault.set_name("s3_2"); + vault.set_id("s3_2"); + instance.add_storage_vault_names(vault.name()); + instance.add_resource_ids(vault.id()); + instance.set_instance_id("GetObjStoreInfoTestInstance"); + txn->put(storage_vault_key({instance.instance_id(), "2"}), vault.SerializeAsString()); + } + + { + HdfsBuildConf hdfs_build_conf; + StorageVaultPB vault; + hdfs_build_conf.set_fs_name("fs_name"); + hdfs_build_conf.set_user("root"); + HdfsVaultInfo hdfs_info; + hdfs_info.set_prefix("root_path"); + hdfs_info.mutable_build_conf()->MergeFrom(hdfs_build_conf); + vault.mutable_hdfs_info()->MergeFrom(hdfs_info); + vault.set_name("hdfs_1"); + vault.set_id("hdfs_1"); + instance.add_storage_vault_names(vault.name()); + instance.add_resource_ids(vault.id()); + instance.set_instance_id("GetObjStoreInfoTestInstance"); + txn->put(storage_vault_key({instance.instance_id(), "3"}), vault.SerializeAsString()); + } + + { + HdfsBuildConf hdfs_build_conf; + StorageVaultPB vault; + hdfs_build_conf.set_fs_name("fs_name"); + hdfs_build_conf.set_user("root"); + HdfsVaultInfo hdfs_info; + hdfs_info.set_prefix("root_path"); + hdfs_info.mutable_build_conf()->MergeFrom(hdfs_build_conf); + vault.mutable_hdfs_info()->MergeFrom(hdfs_info); + vault.set_name("hdfs_2"); + vault.set_id("hdfs_2"); + instance.add_storage_vault_names(vault.name()); + 
instance.add_resource_ids(vault.id()); + instance.set_instance_id("GetObjStoreInfoTestInstance"); + txn->put(storage_vault_key({instance.instance_id(), "4"}), vault.SerializeAsString()); + } + + auto accessor = std::make_shared<MockAccessor>(); + EXPECT_EQ(accessor->put_file("data/0/test.csv", ""), 0); + sp->set_call_back("HdfsAccessor::init.hdfs_init_failed", [](auto&& args) { + auto* ret = try_any_cast_ret<int>(args); + ret->first = 0; + ret->second = true; + }); + sp->set_call_back( + "InstanceRecycler::init_storage_vault_accessors.mock_vault", [&accessor](auto&& args) { + auto* map = try_any_cast< + std::unordered_map<std::string, std::shared_ptr<StorageVaultAccessor>>*>( + args[0]); + auto* vault = try_any_cast<StorageVaultPB*>(args[1]); + map->emplace(vault->id(), accessor); + }); + sp->enable_processing(); + + val = instance.SerializeAsString(); + txn->put(key, val); + EXPECT_EQ(txn->commit(), TxnErrorCode::TXN_OK); + + EXPECT_EQ(accessor->exists("data/0/test.csv"), 0); + + { + config::recycler_storage_vault_white_list = {}; + InstanceRecycler recycler(txn_kv, instance, thread_group, + std::make_shared<TxnLazyCommitter>(txn_kv)); + EXPECT_EQ(recycler.init(), 0); + EXPECT_EQ(recycler.accessor_map_.size(), 4); + } + + { + config::recycler_storage_vault_white_list = {"s3_1", "s3_2", "hdfs_1", "hdfs_2"}; + InstanceRecycler recycler(txn_kv, instance, thread_group, + std::make_shared<TxnLazyCommitter>(txn_kv)); + EXPECT_EQ(recycler.init(), 0); + EXPECT_EQ(recycler.accessor_map_.size(), 4); + } + + { + config::recycler_storage_vault_white_list = {"s3_1", "hdfs_1"}; + InstanceRecycler recycler(txn_kv, instance, thread_group, + std::make_shared<TxnLazyCommitter>(txn_kv)); + EXPECT_EQ(recycler.init(), 0); + EXPECT_EQ(recycler.accessor_map_.size(), 2); + EXPECT_EQ(recycler.accessor_map_.at("s3_1")->exists("data/0/test.csv"), 0); + EXPECT_EQ(recycler.accessor_map_.at("hdfs_1")->exists("data/0/test.csv"), 0); + } +} + TEST(RecyclerTest, 
delete_tmp_rowset_data_with_idx_v1) { auto* sp = SyncPoint::get_instance(); std::unique_ptr<int, std::function<void(int*)>> defer((int*)0x01, [&sp](int*) { --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org