This is an automated email from the ASF dual-hosted git repository.

gavinchou pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new c8cadba38d8 [fix](recycler) Fix missing provider in S3Conf (#37468)
c8cadba38d8 is described below

commit c8cadba38d897b2778495ca59b290e34406512c8
Author: plat1ko <platonekos...@gmail.com>
AuthorDate: Tue Jul 9 00:31:18 2024 +0800

    [fix](recycler) Fix missing provider in S3Conf (#37468)
    
    Fix S3Conf being built without its provider field set. The provider
    is used to determine which object client to use.
---
 cloud/src/recycler/recycler.cpp    | 110 +++++++++++++------------------------
 cloud/src/recycler/s3_accessor.cpp |  31 ++++++-----
 cloud/src/recycler/s3_accessor.h   |   3 +-
 3 files changed, 59 insertions(+), 85 deletions(-)

diff --git a/cloud/src/recycler/recycler.cpp b/cloud/src/recycler/recycler.cpp
index 7d3aa5ad794..5887b3759a0 100644
--- a/cloud/src/recycler/recycler.cpp
+++ b/cloud/src/recycler/recycler.cpp
@@ -2233,49 +2233,35 @@ int InstanceRecycler::init_copy_job_accessor(const 
std::string& stage_id,
     }
 #else
     // init s3 accessor and add to accessor map
-    bool found = false;
-    ObjectStoreInfoPB object_store_info;
-    StagePB::StageAccessType stage_access_type = StagePB::AKSK;
-    for (auto& s : instance_info_.stages()) {
-        if (s.stage_id() == stage_id) {
-            object_store_info = s.obj_info();
-            if (s.has_access_type()) {
-                stage_access_type = s.access_type();
-            }
-            found = true;
-            break;
-        }
-    }
-    if (!found) {
+    auto stage_it =
+            std::find_if(instance_info_.stages().begin(), 
instance_info_.stages().end(),
+                         [&stage_id](auto&& stage) { return stage.stage_id() 
== stage_id; });
+
+    if (stage_it == instance_info_.stages().end()) {
         LOG(INFO) << "Recycle nonexisted stage copy jobs. instance_id=" << 
instance_id_
                   << ", stage_id=" << stage_id << ", stage_type=" << 
stage_type;
         return 1;
     }
+
+    const auto& object_store_info = stage_it->obj_info();
+    auto stage_access_type = stage_it->has_access_type() ? 
stage_it->access_type() : StagePB::AKSK;
+
     S3Conf s3_conf;
     if (stage_type == StagePB::EXTERNAL) {
-        s3_conf.endpoint = object_store_info.endpoint();
-        s3_conf.region = object_store_info.region();
-        s3_conf.bucket = object_store_info.bucket();
-        s3_conf.prefix = object_store_info.prefix();
         if (stage_access_type == StagePB::AKSK) {
-            s3_conf.ak = object_store_info.ak();
-            s3_conf.sk = object_store_info.sk();
-            if (object_store_info.has_encryption_info()) {
-                AkSkPair plain_ak_sk_pair;
-                int ret = decrypt_ak_sk_helper(object_store_info.ak(), 
object_store_info.sk(),
-                                               
object_store_info.encryption_info(),
-                                               &plain_ak_sk_pair);
-                if (ret != 0) {
-                    LOG(WARNING) << "fail to decrypt ak sk. instance_id: " << 
instance_id_
-                                 << " obj_info: " << 
proto_to_json(object_store_info);
-                    return -1;
-                }
-                s3_conf.ak = std::move(plain_ak_sk_pair.first);
-                s3_conf.sk = std::move(plain_ak_sk_pair.second);
+            auto conf = S3Conf::from_obj_store_info(object_store_info);
+            if (!conf) {
+                return -1;
             }
+
+            s3_conf = std::move(*conf);
         } else if (stage_access_type == StagePB::BUCKET_ACL) {
-            s3_conf.ak = instance_info_.ram_user().ak();
-            s3_conf.sk = instance_info_.ram_user().sk();
+            auto conf = S3Conf::from_obj_store_info(object_store_info, true /* 
skip_aksk */);
+            if (!conf) {
+                return -1;
+            }
+
+            s3_conf = std::move(*conf);
             if (instance_info_.ram_user().has_encryption_info()) {
                 AkSkPair plain_ak_sk_pair;
                 int ret = decrypt_ak_sk_helper(
@@ -2288,6 +2274,9 @@ int InstanceRecycler::init_copy_job_accessor(const 
std::string& stage_id,
                 }
                 s3_conf.ak = std::move(plain_ak_sk_pair.first);
                 s3_conf.sk = std::move(plain_ak_sk_pair.second);
+            } else {
+                s3_conf.ak = instance_info_.ram_user().ak();
+                s3_conf.sk = instance_info_.ram_user().sk();
             }
         } else {
             LOG(INFO) << "Unsupported stage access type=" << stage_access_type
@@ -2300,24 +2289,14 @@ int InstanceRecycler::init_copy_job_accessor(const 
std::string& stage_id,
             LOG(WARNING) << "invalid idx: " << idx;
             return -1;
         }
-        auto& old_obj = instance_info_.obj_info()[idx - 1];
-        s3_conf.ak = old_obj.ak();
-        s3_conf.sk = old_obj.sk();
-        if (old_obj.has_encryption_info()) {
-            AkSkPair plain_ak_sk_pair;
-            int ret = decrypt_ak_sk_helper(old_obj.ak(), old_obj.sk(), 
old_obj.encryption_info(),
-                                           &plain_ak_sk_pair);
-            if (ret != 0) {
-                LOG(WARNING) << "fail to decrypt ak sk. instance_id: " << 
instance_id_
-                             << " obj_info: " << proto_to_json(old_obj);
-                return -1;
-            }
-            s3_conf.ak = std::move(plain_ak_sk_pair.first);
-            s3_conf.sk = std::move(plain_ak_sk_pair.second);
+
+        const auto& old_obj = instance_info_.obj_info()[idx - 1];
+        auto conf = S3Conf::from_obj_store_info(old_obj);
+        if (!conf) {
+            return -1;
         }
-        s3_conf.endpoint = old_obj.endpoint();
-        s3_conf.region = old_obj.region();
-        s3_conf.bucket = old_obj.bucket();
+
+        s3_conf = std::move(*conf);
         s3_conf.prefix = object_store_info.prefix();
     } else {
         LOG(WARNING) << "unknown stage type " << stage_type;
@@ -2468,28 +2447,17 @@ int InstanceRecycler::recycle_expired_stage_objects() {
             LOG(WARNING) << "invalid idx: " << idx << ", id: " << 
stage.obj_info().id();
             continue;
         }
-        auto& old_obj = instance_info_.obj_info()[idx - 1];
-        S3Conf s3_conf;
-        s3_conf.ak = old_obj.ak();
-        s3_conf.sk = old_obj.sk();
-        if (old_obj.has_encryption_info()) {
-            AkSkPair plain_ak_sk_pair;
-            int ret1 = decrypt_ak_sk_helper(old_obj.ak(), old_obj.sk(), 
old_obj.encryption_info(),
-                                            &plain_ak_sk_pair);
-            if (ret1 != 0) {
-                LOG(WARNING) << "fail to decrypt ak sk "
-                             << "obj_info:" << proto_to_json(old_obj);
-            } else {
-                s3_conf.ak = std::move(plain_ak_sk_pair.first);
-                s3_conf.sk = std::move(plain_ak_sk_pair.second);
-            }
+
+        const auto& old_obj = instance_info_.obj_info()[idx - 1];
+        auto s3_conf = S3Conf::from_obj_store_info(old_obj);
+        if (!s3_conf) {
+            LOG(WARNING) << "failed to init accessor";
+            continue;
         }
-        s3_conf.endpoint = old_obj.endpoint();
-        s3_conf.region = old_obj.region();
-        s3_conf.bucket = old_obj.bucket();
-        s3_conf.prefix = stage.obj_info().prefix();
+
+        s3_conf->prefix = stage.obj_info().prefix();
         std::shared_ptr<S3Accessor> accessor;
-        int ret1 = S3Accessor::create(std::move(s3_conf), &accessor);
+        int ret1 = S3Accessor::create(std::move(*s3_conf), &accessor);
         if (ret1 != 0) {
             LOG(WARNING) << "failed to init s3 accessor ret=" << ret1;
             ret = -1;
diff --git a/cloud/src/recycler/s3_accessor.cpp 
b/cloud/src/recycler/s3_accessor.cpp
index 5bf37a52199..1f43f6c6b0e 100644
--- a/cloud/src/recycler/s3_accessor.cpp
+++ b/cloud/src/recycler/s3_accessor.cpp
@@ -153,7 +153,8 @@ private:
     size_t prefix_length_;
 };
 
-std::optional<S3Conf> S3Conf::from_obj_store_info(const ObjectStoreInfoPB& 
obj_info) {
+std::optional<S3Conf> S3Conf::from_obj_store_info(const ObjectStoreInfoPB& 
obj_info,
+                                                  bool skip_aksk) {
     S3Conf s3_conf;
 
     switch (obj_info.provider()) {
@@ -175,20 +176,22 @@ std::optional<S3Conf> S3Conf::from_obj_store_info(const 
ObjectStoreInfoPB& obj_i
         return std::nullopt;
     }
 
-    if (obj_info.has_encryption_info()) {
-        AkSkPair plain_ak_sk_pair;
-        int ret = decrypt_ak_sk_helper(obj_info.ak(), obj_info.sk(), 
obj_info.encryption_info(),
-                                       &plain_ak_sk_pair);
-        if (ret != 0) {
-            LOG_WARNING("fail to decrypt ak sk").tag("obj_info", 
proto_to_json(obj_info));
-            return std::nullopt;
+    if (!skip_aksk) {
+        if (obj_info.has_encryption_info()) {
+            AkSkPair plain_ak_sk_pair;
+            int ret = decrypt_ak_sk_helper(obj_info.ak(), obj_info.sk(), 
obj_info.encryption_info(),
+                                           &plain_ak_sk_pair);
+            if (ret != 0) {
+                LOG_WARNING("fail to decrypt ak sk").tag("obj_info", 
proto_to_json(obj_info));
+                return std::nullopt;
+            } else {
+                s3_conf.ak = std::move(plain_ak_sk_pair.first);
+                s3_conf.sk = std::move(plain_ak_sk_pair.second);
+            }
         } else {
-            s3_conf.ak = std::move(plain_ak_sk_pair.first);
-            s3_conf.sk = std::move(plain_ak_sk_pair.second);
+            s3_conf.ak = obj_info.ak();
+            s3_conf.sk = obj_info.sk();
         }
-    } else {
-        s3_conf.ak = obj_info.ak();
-        s3_conf.sk = obj_info.sk();
     }
 
     s3_conf.endpoint = obj_info.endpoint();
@@ -234,6 +237,8 @@ int S3Accessor::init() {
                            conf_.ak, conf_.bucket);
         auto container_client =
                 
std::make_shared<Azure::Storage::Blobs::BlobContainerClient>(uri_, cred);
+        // uri format for debug: 
${scheme}://${ak}.blob.core.windows.net/${bucket}/${prefix}
+        uri_ = uri_ + '/' + conf_.prefix;
         obj_client_ = 
std::make_shared<AzureObjClient>(std::move(container_client));
         return 0;
     }
diff --git a/cloud/src/recycler/s3_accessor.h b/cloud/src/recycler/s3_accessor.h
index 5d50e4abab9..8e9b53b4392 100644
--- a/cloud/src/recycler/s3_accessor.h
+++ b/cloud/src/recycler/s3_accessor.h
@@ -50,7 +50,8 @@ struct S3Conf {
 
     Provider provider;
 
-    static std::optional<S3Conf> from_obj_store_info(const ObjectStoreInfoPB& 
obj_info);
+    static std::optional<S3Conf> from_obj_store_info(const ObjectStoreInfoPB& 
obj_info,
+                                                     bool skip_aksk = false);
 };
 
 class S3Accessor : public StorageVaultAccessor {


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to