This is an automated email from the ASF dual-hosted git repository. gavinchou pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push: new c8cadba38d8 [fix](recycler) Fix missing provider in S3Conf (#37468) c8cadba38d8 is described below commit c8cadba38d897b2778495ca59b290e34406512c8 Author: plat1ko <platonekos...@gmail.com> AuthorDate: Tue Jul 9 00:31:18 2024 +0800 [fix](recycler) Fix missing provider in S3Conf (#37468) Fix missing provider in S3Conf, which is used to determine which obj client to use. --- cloud/src/recycler/recycler.cpp | 110 +++++++++++++------------------------ cloud/src/recycler/s3_accessor.cpp | 31 ++++++----- cloud/src/recycler/s3_accessor.h | 3 +- 3 files changed, 59 insertions(+), 85 deletions(-) diff --git a/cloud/src/recycler/recycler.cpp b/cloud/src/recycler/recycler.cpp index 7d3aa5ad794..5887b3759a0 100644 --- a/cloud/src/recycler/recycler.cpp +++ b/cloud/src/recycler/recycler.cpp @@ -2233,49 +2233,35 @@ int InstanceRecycler::init_copy_job_accessor(const std::string& stage_id, } #else // init s3 accessor and add to accessor map - bool found = false; - ObjectStoreInfoPB object_store_info; - StagePB::StageAccessType stage_access_type = StagePB::AKSK; - for (auto& s : instance_info_.stages()) { - if (s.stage_id() == stage_id) { - object_store_info = s.obj_info(); - if (s.has_access_type()) { - stage_access_type = s.access_type(); - } - found = true; - break; - } - } - if (!found) { + auto stage_it = + std::find_if(instance_info_.stages().begin(), instance_info_.stages().end(), + [&stage_id](auto&& stage) { return stage.stage_id() == stage_id; }); + + if (stage_it == instance_info_.stages().end()) { LOG(INFO) << "Recycle nonexisted stage copy jobs. instance_id=" << instance_id_ << ", stage_id=" << stage_id << ", stage_type=" << stage_type; return 1; } + + const auto& object_store_info = stage_it->obj_info(); + auto stage_access_type = stage_it->has_access_type() ? stage_it->access_type() : StagePB::AKSK; + S3Conf s3_conf; if (stage_type == StagePB::EXTERNAL) { - s3_conf.endpoint = object_store_info.endpoint(); - s3_conf.region = object_store_info.region(); - s3_conf.bucket = object_store_info.bucket(); - s3_conf.prefix = object_store_info.prefix(); if (stage_access_type == StagePB::AKSK) { - s3_conf.ak = object_store_info.ak(); - s3_conf.sk = object_store_info.sk(); - if (object_store_info.has_encryption_info()) { - AkSkPair plain_ak_sk_pair; - int ret = decrypt_ak_sk_helper(object_store_info.ak(), object_store_info.sk(), - object_store_info.encryption_info(), - &plain_ak_sk_pair); - if (ret != 0) { - LOG(WARNING) << "fail to decrypt ak sk. instance_id: " << instance_id_ - << " obj_info: " << proto_to_json(object_store_info); - return -1; - } - s3_conf.ak = std::move(plain_ak_sk_pair.first); - s3_conf.sk = std::move(plain_ak_sk_pair.second); + auto conf = S3Conf::from_obj_store_info(object_store_info); + if (!conf) { + return -1; } + + s3_conf = std::move(*conf); } else if (stage_access_type == StagePB::BUCKET_ACL) { - s3_conf.ak = instance_info_.ram_user().ak(); - s3_conf.sk = instance_info_.ram_user().sk(); + auto conf = S3Conf::from_obj_store_info(object_store_info, true /* skip_aksk */); + if (!conf) { + return -1; + } + + s3_conf = std::move(*conf); if (instance_info_.ram_user().has_encryption_info()) { AkSkPair plain_ak_sk_pair; int ret = decrypt_ak_sk_helper( @@ -2288,6 +2274,9 @@ int InstanceRecycler::init_copy_job_accessor(const std::string& stage_id, } s3_conf.ak = std::move(plain_ak_sk_pair.first); s3_conf.sk = std::move(plain_ak_sk_pair.second); + } else { + s3_conf.ak = instance_info_.ram_user().ak(); + s3_conf.sk = instance_info_.ram_user().sk(); } } else { LOG(INFO) << "Unsupported stage access type=" << stage_access_type @@ -2300,24 +2289,14 @@ int InstanceRecycler::init_copy_job_accessor(const std::string& stage_id, LOG(WARNING) << "invalid idx: " << idx; return -1; } - auto& old_obj = instance_info_.obj_info()[idx - 1]; - s3_conf.ak = old_obj.ak(); - s3_conf.sk = old_obj.sk(); - if (old_obj.has_encryption_info()) { - AkSkPair plain_ak_sk_pair; - int ret = decrypt_ak_sk_helper(old_obj.ak(), old_obj.sk(), old_obj.encryption_info(), - &plain_ak_sk_pair); - if (ret != 0) { - LOG(WARNING) << "fail to decrypt ak sk. instance_id: " << instance_id_ - << " obj_info: " << proto_to_json(old_obj); - return -1; - } - s3_conf.ak = std::move(plain_ak_sk_pair.first); - s3_conf.sk = std::move(plain_ak_sk_pair.second); + + const auto& old_obj = instance_info_.obj_info()[idx - 1]; + auto conf = S3Conf::from_obj_store_info(old_obj); + if (!conf) { + return -1; } - s3_conf.endpoint = old_obj.endpoint(); - s3_conf.region = old_obj.region(); - s3_conf.bucket = old_obj.bucket(); + + s3_conf = std::move(*conf); s3_conf.prefix = object_store_info.prefix(); } else { LOG(WARNING) << "unknown stage type " << stage_type; @@ -2468,28 +2447,17 @@ int InstanceRecycler::recycle_expired_stage_objects() { LOG(WARNING) << "invalid idx: " << idx << ", id: " << stage.obj_info().id(); continue; } - auto& old_obj = instance_info_.obj_info()[idx - 1]; - S3Conf s3_conf; - s3_conf.ak = old_obj.ak(); - s3_conf.sk = old_obj.sk(); - if (old_obj.has_encryption_info()) { - AkSkPair plain_ak_sk_pair; - int ret1 = decrypt_ak_sk_helper(old_obj.ak(), old_obj.sk(), old_obj.encryption_info(), - &plain_ak_sk_pair); - if (ret1 != 0) { - LOG(WARNING) << "fail to decrypt ak sk " - << "obj_info:" << proto_to_json(old_obj); - } else { - s3_conf.ak = std::move(plain_ak_sk_pair.first); - s3_conf.sk = std::move(plain_ak_sk_pair.second); - } + + const auto& old_obj = instance_info_.obj_info()[idx - 1]; + auto s3_conf = S3Conf::from_obj_store_info(old_obj); + if (!s3_conf) { + LOG(WARNING) << "failed to init accessor"; + continue; } - s3_conf.endpoint = old_obj.endpoint(); - s3_conf.region = old_obj.region(); - s3_conf.bucket = old_obj.bucket(); - s3_conf.prefix = stage.obj_info().prefix(); + + s3_conf->prefix = stage.obj_info().prefix(); std::shared_ptr<S3Accessor> accessor; - int ret1 = S3Accessor::create(std::move(s3_conf), &accessor); + int ret1 = S3Accessor::create(std::move(*s3_conf), &accessor); if (ret1 != 0) { LOG(WARNING) << "failed to init s3 accessor ret=" << ret1; ret = -1; diff --git a/cloud/src/recycler/s3_accessor.cpp b/cloud/src/recycler/s3_accessor.cpp index 5bf37a52199..1f43f6c6b0e 100644 --- a/cloud/src/recycler/s3_accessor.cpp +++ b/cloud/src/recycler/s3_accessor.cpp @@ -153,7 +153,8 @@ private: size_t prefix_length_; }; -std::optional<S3Conf> S3Conf::from_obj_store_info(const ObjectStoreInfoPB& obj_info) { +std::optional<S3Conf> S3Conf::from_obj_store_info(const ObjectStoreInfoPB& obj_info, + bool skip_aksk) { S3Conf s3_conf; switch (obj_info.provider()) { @@ -175,20 +176,22 @@ std::optional<S3Conf> S3Conf::from_obj_store_info(const ObjectStoreInfoPB& obj_i return std::nullopt; } - if (obj_info.has_encryption_info()) { - AkSkPair plain_ak_sk_pair; - int ret = decrypt_ak_sk_helper(obj_info.ak(), obj_info.sk(), obj_info.encryption_info(), - &plain_ak_sk_pair); - if (ret != 0) { - LOG_WARNING("fail to decrypt ak sk").tag("obj_info", proto_to_json(obj_info)); - return std::nullopt; + if (!skip_aksk) { + if (obj_info.has_encryption_info()) { + AkSkPair plain_ak_sk_pair; + int ret = decrypt_ak_sk_helper(obj_info.ak(), obj_info.sk(), obj_info.encryption_info(), + &plain_ak_sk_pair); + if (ret != 0) { + LOG_WARNING("fail to decrypt ak sk").tag("obj_info", proto_to_json(obj_info)); + return std::nullopt; + } else { + s3_conf.ak = std::move(plain_ak_sk_pair.first); + s3_conf.sk = std::move(plain_ak_sk_pair.second); + } } else { - s3_conf.ak = std::move(plain_ak_sk_pair.first); - s3_conf.sk = std::move(plain_ak_sk_pair.second); + s3_conf.ak = obj_info.ak(); + s3_conf.sk = obj_info.sk(); } - } else { - s3_conf.ak = obj_info.ak(); - s3_conf.sk = obj_info.sk(); } s3_conf.endpoint = obj_info.endpoint(); @@ -234,6 +237,8 @@ int S3Accessor::init() { conf_.ak, conf_.bucket); auto container_client = std::make_shared<Azure::Storage::Blobs::BlobContainerClient>(uri_, cred); + // uri format for debug: ${scheme}://${ak}.blob.core.windows.net/${bucket}/${prefix} + uri_ = uri_ + '/' + conf_.prefix; obj_client_ = std::make_shared<AzureObjClient>(std::move(container_client)); return 0; } diff --git a/cloud/src/recycler/s3_accessor.h b/cloud/src/recycler/s3_accessor.h index 5d50e4abab9..8e9b53b4392 100644 --- a/cloud/src/recycler/s3_accessor.h +++ b/cloud/src/recycler/s3_accessor.h @@ -50,7 +50,8 @@ struct S3Conf { Provider provider; - static std::optional<S3Conf> from_obj_store_info(const ObjectStoreInfoPB& obj_info); + static std::optional<S3Conf> from_obj_store_info(const ObjectStoreInfoPB& obj_info, + bool skip_aksk = false); }; class S3Accessor : public StorageVaultAccessor { --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org