This is an automated email from the ASF dual-hosted git repository. dataroaring pushed a commit to branch branch-3.0 in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-3.0 by this push: new fbcd8ec2c98 branch-3.0: [Enhancement](recycler) Add valid s3 vault config for recycler #47723 (#48375) fbcd8ec2c98 is described below commit fbcd8ec2c98f0085458c5f105f6eaed570026a12 Author: github-actions[bot] <41898282+github-actions[bot]@users.noreply.github.com> AuthorDate: Thu Feb 27 17:58:48 2025 +0800 branch-3.0: [Enhancement](recycler) Add valid s3 vault config for recycler #47723 (#48375) Cherry-picked from #47723 Co-authored-by: abmdocrt <lianyuk...@selectdb.com> --- cloud/src/common/config.h | 3 + cloud/src/meta-service/meta_service_helper.h | 64 ++++++++----- cloud/src/recycler/recycler.cpp | 28 +++++- cloud/src/recycler/s3_accessor.cpp | 4 +- cloud/test/recycler_test.cpp | 134 +++++++++++++++++++++++++++ 5 files changed, 204 insertions(+), 29 deletions(-) diff --git a/cloud/src/common/config.h b/cloud/src/common/config.h index 1823ad90bf2..a0c469e820d 100644 --- a/cloud/src/common/config.h +++ b/cloud/src/common/config.h @@ -252,4 +252,7 @@ CONF_Bool(enable_check_instance_id, "true"); // Check if ip eq 127.0.0.1, ms/recycler exit CONF_Bool(enable_loopback_address_for_ms, "false"); +// Which vaults should be recycled. If empty, recycle all vaults. +// Comma-separated list: recycler_storage_vault_white_list="aaa,bbb,ccc" +CONF_Strings(recycler_storage_vault_white_list, ""); } // namespace doris::cloud::config diff --git a/cloud/src/meta-service/meta_service_helper.h b/cloud/src/meta-service/meta_service_helper.h index 7d626dd1186..cd9ed2f7f1d 100644 --- a/cloud/src/meta-service/meta_service_helper.h +++ b/cloud/src/meta-service/meta_service_helper.h @@ -52,6 +52,46 @@ inline std::string md5(const std::string& str) { return ss.str(); } +/** + * Encrypts all "sk" values in the given debug string with MD5 hashes. + * + * Assumptions: + * - Input string contains one or more occurrences of "sk: " followed by a value in double quotes. 
+ * - An md5() function exists that takes a std::string and returns its MD5 hash as a string. + * + * @param debug_string Input string containing "sk: " fields to be encrypted. + * @return A new string with all "sk" values replaced by their MD5 hashes. + * + * Behavior: + * 1. Searches for all occurrences of "sk: " in the input string. + * 2. For each occurrence, extracts the value between double quotes. + * 3. Replaces the original value with "md5: " followed by its MD5 hash. + * 4. Returns the modified string with all "sk" values encrypted. + */ +inline std::string encryt_sk(std::string debug_string) { + // Start position for searching "sk" fields + size_t pos = 0; + // Iterate through the string and find all occurrences of "sk: " + while ((pos = debug_string.find("sk: ", pos)) != std::string::npos) { + // Find the start and end of the "sk" value (assumed to be within quotes) + // Start after the quote + size_t sk_value_start = debug_string.find('\"', pos) + 1; + // End at the next quote + size_t sk_value_end = debug_string.find('\"', sk_value_start); + + // Extract the "sk" value + std::string sk_value = debug_string.substr(sk_value_start, sk_value_end - sk_value_start); + // Encrypt the "sk" value with MD5 + std::string encrypted_sk = "md5: " + md5(sk_value); + + // Replace the original "sk" value with the encrypted MD5 value + debug_string.replace(sk_value_start, sk_value_end - sk_value_start, encrypted_sk); + // Move the position to the end of the current "sk" field and continue searching + pos = sk_value_end; + } + return debug_string; +} + template <class Request> void begin_rpc(std::string_view func_name, brpc::Controller* ctrl, const Request* req) { if constexpr (std::is_same_v<Request, CreateRowsetRequest>) { @@ -133,29 +173,7 @@ void finish_rpc(std::string_view func_name, brpc::Controller* ctrl, Response* re << " status=" << res->status().ShortDebugString(); } else if constexpr (std::is_same_v<Response, GetObjStoreInfoResponse> || 
std::is_same_v<Response, GetStageResponse>) { - std::string debug_string = res->DebugString(); - // Start position for searching "sk" fields - size_t pos = 0; - // Iterate through the string and find all occurrences of "sk: " - while ((pos = debug_string.find("sk: ", pos)) != std::string::npos) { - // Find the start and end of the "sk" value (assumed to be within quotes) - // Start after the quote - size_t sk_value_start = debug_string.find('\"', pos) + 1; - // End at the next quote - size_t sk_value_end = debug_string.find('\"', sk_value_start); - - // Extract the "sk" value - std::string sk_value = - debug_string.substr(sk_value_start, sk_value_end - sk_value_start); - // Encrypt the "sk" value with MD5 - std::string encrypted_sk = "md5: " + md5(sk_value); - - // Replace the original "sk" value with the encrypted MD5 value - debug_string.replace(sk_value_start, sk_value_end - sk_value_start, encrypted_sk); - - // Move the position to the end of the current "sk" field and continue searching - pos = sk_value_end; - } + std::string debug_string = encryt_sk(res->DebugString()); TEST_SYNC_POINT_CALLBACK("sk_finish_rpc", &debug_string); LOG(INFO) << "finish " << func_name << " from " << ctrl->remote_side() << " response=" << debug_string; diff --git a/cloud/src/recycler/recycler.cpp b/cloud/src/recycler/recycler.cpp index 52597fad04b..40c328d8945 100644 --- a/cloud/src/recycler/recycler.cpp +++ b/cloud/src/recycler/recycler.cpp @@ -27,11 +27,13 @@ #include <cstddef> #include <cstdint> #include <deque> +#include <numeric> #include <string> #include <string_view> #include "common/stopwatch.h" #include "meta-service/meta_service.h" +#include "meta-service/meta_service_helper.h" #include "meta-service/meta_service_schema.h" #include "meta-service/txn_kv.h" #include "meta-service/txn_kv_error.h" @@ -533,9 +535,27 @@ int InstanceRecycler::init_storage_vault_accessors() { LOG(WARNING) << "malformed storage vault, unable to deserialize key=" << hex(k); return -1; } + 
std::string recycler_storage_vault_white_list = accumulate( + config::recycler_storage_vault_white_list.begin(), + config::recycler_storage_vault_white_list.end(), std::string(), + [](std::string a, std::string b) { return a + (a.empty() ? "" : ",") + b; }); + LOG_INFO("config::recycler_storage_vault_white_list") + .tag("", recycler_storage_vault_white_list); + if (!config::recycler_storage_vault_white_list.empty()) { + if (auto it = std::find(config::recycler_storage_vault_white_list.begin(), + config::recycler_storage_vault_white_list.end(), vault.name()); + it == config::recycler_storage_vault_white_list.end()) { + LOG_WARNING( + "failed to init accessor for vault because this vault is not in " + "config::recycler_storage_vault_white_list. ") + .tag(" vault name:", vault.name()) + .tag(" config::recycler_storage_vault_white_list:", + recycler_storage_vault_white_list); + continue; + } + } TEST_SYNC_POINT_CALLBACK("InstanceRecycler::init_storage_vault_accessors.mock_vault", &accessor_map_, &vault); - if (vault.has_hdfs_info()) { auto accessor = std::make_shared<HdfsAccessor>(vault.hdfs_info()); int ret = accessor->init(); @@ -558,17 +578,17 @@ int InstanceRecycler::init_storage_vault_accessors() { } std::shared_ptr<S3Accessor> accessor; - int ret = S3Accessor::create(std::move(*s3_conf), &accessor); + int ret = S3Accessor::create(*s3_conf, &accessor); if (ret != 0) { LOG(WARNING) << "failed to init s3 accessor. instance_id=" << instance_id_ << " resource_id=" << vault.id() << " name=" << vault.name() << " ret=" << ret - << " s3_vault=" << vault.obj_info().ShortDebugString(); + << " s3_vault=" << encryt_sk(vault.obj_info().ShortDebugString()); continue; } LOG(INFO) << "succeed to init s3 accessor. 
instance_id=" << instance_id_ << " resource_id=" << vault.id() << " name=" << vault.name() << " ret=" << ret - << " s3_vault=" << vault.obj_info().ShortDebugString(); + << " s3_vault=" << encryt_sk(vault.obj_info().ShortDebugString()); accessor_map_.emplace(vault.id(), std::move(accessor)); } } diff --git a/cloud/src/recycler/s3_accessor.cpp b/cloud/src/recycler/s3_accessor.cpp index 0ef150e20d1..5356ddf38af 100644 --- a/cloud/src/recycler/s3_accessor.cpp +++ b/cloud/src/recycler/s3_accessor.cpp @@ -228,10 +228,10 @@ int S3Accessor::create(S3Conf conf, std::shared_ptr<S3Accessor>* accessor) { TEST_SYNC_POINT_RETURN_WITH_VALUE("S3Accessor::init.s3_init_failed", (int)-1); switch (conf.provider) { case S3Conf::GCS: - *accessor = std::make_shared<GcsAccessor>(std::move(conf)); + *accessor = std::make_shared<GcsAccessor>(conf); break; default: - *accessor = std::make_shared<S3Accessor>(std::move(conf)); + *accessor = std::make_shared<S3Accessor>(conf); break; } diff --git a/cloud/test/recycler_test.cpp b/cloud/test/recycler_test.cpp index f1834ad8003..d2e8c50ff4e 100644 --- a/cloud/test/recycler_test.cpp +++ b/cloud/test/recycler_test.cpp @@ -3616,6 +3616,140 @@ TEST(RecyclerTest, init_all_vault_accessors_failed_test) { EXPECT_EQ(recycler.init(), -2); } +TEST(RecyclerTest, recycler_storage_vault_white_list_test) { + auto* sp = SyncPoint::get_instance(); + std::unique_ptr<int, std::function<void(int*)>> defer((int*)0x01, [&sp](int*) { + sp->clear_all_call_backs(); + sp->clear_trace(); + sp->disable_processing(); + }); + + auto txn_kv = std::make_shared<MemTxnKv>(); + EXPECT_EQ(txn_kv->init(), 0); + std::unique_ptr<Transaction> txn; + ASSERT_EQ(txn_kv->create_txn(&txn), TxnErrorCode::TXN_OK); + std::string key; + std::string val; + + InstanceKeyInfo key_info {"test_instance"}; + instance_key(key_info, &key); + InstanceInfoPB instance; + instance.set_instance_id("GetObjStoreInfoTestInstance"); + { + ObjectStoreInfoPB obj_info; + StorageVaultPB vault; + 
obj_info.set_id("id"); + obj_info.set_ak("ak"); + obj_info.set_sk("sk"); + obj_info.set_provider(ObjectStoreInfoPB_Provider_COS); + vault.mutable_obj_info()->MergeFrom(obj_info); + vault.set_name("s3_1"); + vault.set_id("s3_1"); + instance.add_storage_vault_names(vault.name()); + instance.add_resource_ids(vault.id()); + txn->put(storage_vault_key({instance.instance_id(), "1"}), vault.SerializeAsString()); + } + + { + ObjectStoreInfoPB obj_info; + StorageVaultPB vault; + obj_info.set_id("id"); + obj_info.set_ak("ak"); + obj_info.set_sk("sk"); + obj_info.set_provider(ObjectStoreInfoPB_Provider_COS); + vault.mutable_obj_info()->MergeFrom(obj_info); + vault.set_name("s3_2"); + vault.set_id("s3_2"); + instance.add_storage_vault_names(vault.name()); + instance.add_resource_ids(vault.id()); + instance.set_instance_id("GetObjStoreInfoTestInstance"); + txn->put(storage_vault_key({instance.instance_id(), "2"}), vault.SerializeAsString()); + } + + { + HdfsBuildConf hdfs_build_conf; + StorageVaultPB vault; + hdfs_build_conf.set_fs_name("fs_name"); + hdfs_build_conf.set_user("root"); + HdfsVaultInfo hdfs_info; + hdfs_info.set_prefix("root_path"); + hdfs_info.mutable_build_conf()->MergeFrom(hdfs_build_conf); + vault.mutable_hdfs_info()->MergeFrom(hdfs_info); + vault.set_name("hdfs_1"); + vault.set_id("hdfs_1"); + instance.add_storage_vault_names(vault.name()); + instance.add_resource_ids(vault.id()); + instance.set_instance_id("GetObjStoreInfoTestInstance"); + txn->put(storage_vault_key({instance.instance_id(), "3"}), vault.SerializeAsString()); + } + + { + HdfsBuildConf hdfs_build_conf; + StorageVaultPB vault; + hdfs_build_conf.set_fs_name("fs_name"); + hdfs_build_conf.set_user("root"); + HdfsVaultInfo hdfs_info; + hdfs_info.set_prefix("root_path"); + hdfs_info.mutable_build_conf()->MergeFrom(hdfs_build_conf); + vault.mutable_hdfs_info()->MergeFrom(hdfs_info); + vault.set_name("hdfs_2"); + vault.set_id("hdfs_2"); + instance.add_storage_vault_names(vault.name()); + 
instance.add_resource_ids(vault.id()); + instance.set_instance_id("GetObjStoreInfoTestInstance"); + txn->put(storage_vault_key({instance.instance_id(), "4"}), vault.SerializeAsString()); + } + + auto accessor = std::make_shared<MockAccessor>(); + EXPECT_EQ(accessor->put_file("data/0/test.csv", ""), 0); + sp->set_call_back("HdfsAccessor::init.hdfs_init_failed", [](auto&& args) { + auto* ret = try_any_cast_ret<int>(args); + ret->first = 0; + ret->second = true; + }); + sp->set_call_back( + "InstanceRecycler::init_storage_vault_accessors.mock_vault", [&accessor](auto&& args) { + auto* map = try_any_cast< + std::unordered_map<std::string, std::shared_ptr<StorageVaultAccessor>>*>( + args[0]); + auto* vault = try_any_cast<StorageVaultPB*>(args[1]); + map->emplace(vault->id(), accessor); + }); + sp->enable_processing(); + + val = instance.SerializeAsString(); + txn->put(key, val); + EXPECT_EQ(txn->commit(), TxnErrorCode::TXN_OK); + + EXPECT_EQ(accessor->exists("data/0/test.csv"), 0); + + { + config::recycler_storage_vault_white_list = {}; + InstanceRecycler recycler(txn_kv, instance, thread_group, + std::make_shared<TxnLazyCommitter>(txn_kv)); + EXPECT_EQ(recycler.init(), 0); + EXPECT_EQ(recycler.accessor_map_.size(), 4); + } + + { + config::recycler_storage_vault_white_list = {"s3_1", "s3_2", "hdfs_1", "hdfs_2"}; + InstanceRecycler recycler(txn_kv, instance, thread_group, + std::make_shared<TxnLazyCommitter>(txn_kv)); + EXPECT_EQ(recycler.init(), 0); + EXPECT_EQ(recycler.accessor_map_.size(), 4); + } + + { + config::recycler_storage_vault_white_list = {"s3_1", "hdfs_1"}; + InstanceRecycler recycler(txn_kv, instance, thread_group, + std::make_shared<TxnLazyCommitter>(txn_kv)); + EXPECT_EQ(recycler.init(), 0); + EXPECT_EQ(recycler.accessor_map_.size(), 2); + EXPECT_EQ(recycler.accessor_map_.at("s3_1")->exists("data/0/test.csv"), 0); + EXPECT_EQ(recycler.accessor_map_.at("hdfs_1")->exists("data/0/test.csv"), 0); + } +} + TEST(RecyclerTest, 
delete_tmp_rowset_data_with_idx_v1) { auto* sp = SyncPoint::get_instance(); std::unique_ptr<int, std::function<void(int*)>> defer((int*)0x01, [&sp](int*) { --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org