platoneko commented on code in PR #35447: URL: https://github.com/apache/doris/pull/35447#discussion_r1620561820
########## be/src/io/fs/obj_storage_client.h: ########## @@ -46,14 +46,25 @@ struct ObjectStoragePathOptions { struct ObjectCompleteMultiParts {}; +// We only store error code along with err_msg instead of Status to unify BE and recycler's error handle logic struct ObjectStorageResponse { - Status status = Status::OK(); + ObjectStorageResponse(int code = 0, std::string msg = "") + : code(code), err_msg(std::move(msg)) {} + ObjectStorageResponse(std::pair<int, std::string> msg_pair) + : code(msg_pair.first), err_msg(std::move(msg_pair.second)) {} + ObjectStorageResponse(Status st) : ObjectStorageResponse(st.retrieve_error_msg()) {} + int code {0}; + std::string err_msg = std::string(); Review Comment: ```suggestion std::string err_msg; ``` ########## be/src/io/fs/s3_obj_storage_client.cpp: ########## Review Comment: 在 BE 中 FileInfo 中的 file_name 不应该包含 list 使用的 prefix 那段 e.g.: prefix = "doris_s3_vault/data/114115/", key = "doris_s3_vault/data/114115/abc_0.dat", file_name = "abc_0.dat" prefix = "doris_s3_vault/data/100/114115/", key = "doris_s3_vault/data/100/114115/abc/0.dat", file_name = "abc/0.dat" ########## cloud/src/recycler/s3_obj_client.cpp: ########## @@ -0,0 +1,372 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. 
See the License for the +// specific language governing permissions and limitations +// under the License. + +#include "recycler/s3_obj_client.h" + +#include <aws/s3/S3Client.h> +#include <aws/s3/model/DeleteObjectRequest.h> +#include <aws/s3/model/DeleteObjectsRequest.h> +#include <aws/s3/model/GetBucketLifecycleConfigurationRequest.h> +#include <aws/s3/model/GetBucketVersioningRequest.h> +#include <aws/s3/model/HeadObjectRequest.h> +#include <aws/s3/model/ListObjectsV2Request.h> +#include <aws/s3/model/PutObjectRequest.h> + +#include "common/logging.h" +#include "common/sync_point.h" + +namespace doris::cloud { + +#ifndef UNIT_TEST +#define HELPER_MACRO(ret, req, point_name) +#else +#define HELPER_MACRO(ret, req, point_name) \ + do { \ + std::pair p {&ret, &req}; \ + [[maybe_unused]] auto ret_pair = [&p]() mutable { \ + TEST_SYNC_POINT_RETURN_WITH_VALUE(point_name, &p); \ + return p; \ + }(); \ + return ret; \ + } while (false); +#endif +#define SYNC_POINT_HOOK_RETURN_VALUE(expr, request, point_name) \ + [&]() -> decltype(auto) { \ + using T = decltype((expr)); \ + [[maybe_unused]] T t; \ + HELPER_MACRO(t, request, point_name) \ + return (expr); \ + }() + +ObjectStorageResponse S3ObjClient::put_object(const ObjectStoragePathOptions& opts, + std::string_view stream) { + Aws::S3::Model::PutObjectRequest request; + request.WithBucket(opts.bucket).WithKey(opts.key); + auto input = Aws::MakeShared<Aws::StringStream>("S3Accessor"); + *input << stream; + request.SetBody(input); + auto outcome = SYNC_POINT_HOOK_RETURN_VALUE(s3_client_->PutObject(request), + std::ref(request).get(), "s3_client::put_object"); + if (!outcome.IsSuccess()) { + LOG_WARNING("failed to put object") + .tag("endpoint", opts.endpoint) + .tag("bucket", opts.bucket) + .tag("key", opts.key) + .tag("responseCode", static_cast<int>(outcome.GetError().GetResponseCode())) + .tag("error", outcome.GetError().GetMessage()); + return -1; + } + return 0; +} +ObjectStorageResponse S3ObjClient::head_object(const 
ObjectStoragePathOptions& opts) { + Aws::S3::Model::HeadObjectRequest request; + request.WithBucket(opts.bucket).WithKey(opts.key); + auto outcome = SYNC_POINT_HOOK_RETURN_VALUE(s3_client_->HeadObject(request), + std::ref(request).get(), "s3_client::head_object"); + if (outcome.IsSuccess()) { + return 0; + } else if (outcome.GetError().GetResponseCode() == Aws::Http::HttpResponseCode::NOT_FOUND) { + return 1; + } else { + LOG_WARNING("failed to head object") + .tag("endpoint", opts.endpoint) + .tag("bucket", opts.bucket) + .tag("key", opts.key) + .tag("responseCode", static_cast<int>(outcome.GetError().GetResponseCode())) + .tag("error", outcome.GetError().GetMessage()); + return -1; + } +} +ObjectStorageResponse S3ObjClient::list_objects(const ObjectStoragePathOptions& opts, + std::vector<ObjectMeta>* files) { + Aws::S3::Model::ListObjectsV2Request request; + request.WithBucket(opts.bucket).WithPrefix(opts.prefix); + + bool is_truncated = false; + do { + auto outcome = + SYNC_POINT_HOOK_RETURN_VALUE(s3_client_->ListObjectsV2(request), + std::ref(request).get(), "s3_client::list_objects_v2"); + if (!outcome.IsSuccess()) { + LOG_WARNING("failed to list objects") + .tag("endpoint", opts.endpoint) + .tag("bucket", opts.bucket) + .tag("prefix", opts.prefix) + .tag("responseCode", static_cast<int>(outcome.GetError().GetResponseCode())) + .tag("error", outcome.GetError().GetMessage()); + return -1; + } + const auto& result = outcome.GetResult(); + VLOG_DEBUG << "get " << result.GetContents().size() << " objects"; + for (const auto& obj : result.GetContents()) { + files->push_back({obj.GetKey().substr(opts.prefix.size() + 1), obj.GetSize(), Review Comment: 在 `ObjStoreAccessor` 中 `ObjectMeta` 的 `path` 需要存除去 Accessor prefix 的那段,而不是除去传入的 prefix 的段。e.g. 
accessor.prefix = "doris_s3_vault", opts.prefix = "doris_s3_vault/data/114115/" ########## be/src/io/fs/obj_storage_client.h: ########## @@ -46,14 +46,25 @@ struct ObjectStoragePathOptions { struct ObjectCompleteMultiParts {}; +// We only store error code along with err_msg instead of Status to unify BE and recycler's error handle logic struct ObjectStorageResponse { Review Comment: 考虑定义个 `struct ObjectStorageError` 并定义一套 ErrorCode,然后在 BE 和 Recycler 定义从 `ObjectStorageError` 到各自错误状态的转换函数 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org