This is an automated email from the ASF dual-hosted git repository. dataroaring pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push: new 5b31ceae386 [feature-wip](Cloud) Add azure obj client into recycler (#35849) 5b31ceae386 is described below commit 5b31ceae3869ebf086232473fe12620b19297108 Author: AlexYue <yj976240...@gmail.com> AuthorDate: Wed Jun 5 14:16:17 2024 +0800 [feature-wip](Cloud) Add azure obj client into recycler (#35849) As a follow-up to #35307, this PR links Azure into the recycler and implements the corresponding ObjStorageClient interface for Azure. --- cloud/cmake/thirdparty.cmake | 6 + cloud/src/recycler/azure_obj_client.cpp | 202 +++++++++++++++++++++ .../{s3_obj_client.h => azure_obj_client.h} | 19 +- cloud/src/recycler/obj_store_accessor.h | 10 +- cloud/src/recycler/s3_accessor.cpp | 17 +- cloud/src/recycler/s3_accessor.h | 2 +- cloud/src/recycler/s3_obj_client.h | 2 +- 7 files changed, 243 insertions(+), 15 deletions(-) diff --git a/cloud/cmake/thirdparty.cmake b/cloud/cmake/thirdparty.cmake index 0e148896bfc..bacd7d25b3d 100644 --- a/cloud/cmake/thirdparty.cmake +++ b/cloud/cmake/thirdparty.cmake @@ -102,6 +102,12 @@ add_thirdparty(lzma LIB64) add_thirdparty(idn LIB64) add_thirdparty(gsasl) # end krb5 libs +# begin azure libs +add_thirdparty(azure-core) +add_thirdparty(azure-identity) +add_thirdparty(azure-storage-blobs) +add_thirdparty(azure-storage-common) +# end azure libs add_thirdparty(gtest NOTADD) add_thirdparty(gtest_main NOTADD) diff --git a/cloud/src/recycler/azure_obj_client.cpp b/cloud/src/recycler/azure_obj_client.cpp new file mode 100644 index 00000000000..179ebd980fa --- /dev/null +++ b/cloud/src/recycler/azure_obj_client.cpp @@ -0,0 +1,202 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. 
The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#include "recycler/azure_obj_client.h" + +#include <fmt/core.h> +#include <glog/logging.h> + +#include <algorithm> +#include <azure/core/io/body_stream.hpp> +#include <azure/storage/blobs.hpp> +#include <azure/storage/blobs/blob_client.hpp> +#include <azure/storage/blobs/blob_container_client.hpp> +#include <azure/storage/blobs/rest_client.hpp> +#include <azure/storage/common/storage_credential.hpp> +#include <azure/storage/common/storage_exception.hpp> +#include <iterator> + +#include "recycler/obj_store_accessor.h" + +namespace doris::cloud { + +constexpr size_t BlobBatchMaxOperations = 256; + +template <typename Func> +ObjectStorageResponse do_azure_client_call(Func f) { + try { + f(); + } catch (Azure::Storage::StorageException& e) { + return {-1, fmt::format("Azure request failed because {}, http code {}, request id {}", + e.Message, static_cast<int>(e.StatusCode), e.RequestId)}; + } + return {}; +} + +ObjectStorageResponse AzureObjClient::put_object(const ObjectStoragePathOptions& opts, + std::string_view stream) { + auto client = _client->GetBlockBlobClient(opts.key); + return do_azure_client_call([&]() { + client.UploadFrom(reinterpret_cast<const uint8_t*>(stream.data()), stream.size()); + }); +} + +ObjectStorageResponse AzureObjClient::head_object(const ObjectStoragePathOptions& opts) { + try { + Azure::Storage::Blobs::Models::BlobProperties properties = + 
_client->GetBlockBlobClient(opts.key).GetProperties().Value; + return {}; + } catch (Azure::Storage::StorageException& e) { + if (e.StatusCode == Azure::Core::Http::HttpStatusCode::NotFound) { + return {1}; + } + return {-1, fmt::format("Failed to head azure blob due to {}, http code {}, request id {}", + e.Message, static_cast<int>(e.StatusCode), e.RequestId)}; + } +} + +ObjectStorageResponse AzureObjClient::list_objects(const ObjectStoragePathOptions& opts, + std::vector<ObjectMeta>* files) { + auto get_object_meta = [&](auto&& resp) { + std::ranges::transform( + resp.Blobs, std::back_inserter(*files), [](auto&& blob_item) -> ObjectMeta { + return {.path = std::move(blob_item.Name), + .size = blob_item.BlobSize, + .last_modify_second = + blob_item.Details.LastModified.time_since_epoch().count()}; + }); + }; + return do_azure_client_call([&]() { + Azure::Storage::Blobs::ListBlobsOptions list_opts; + list_opts.Prefix = opts.prefix; + auto resp = _client->ListBlobs(list_opts); + get_object_meta(resp); + while (!resp.NextPageToken->empty()) { + list_opts.ContinuationToken = resp.NextPageToken; + resp = _client->ListBlobs(list_opts); + get_object_meta(resp); + } + }); +} + +// As Azure's doc said, the batch size is 256 +// You can find out the num in https://learn.microsoft.com/en-us/rest/api/storageservices/blob-batch?tabs=microsoft-entra-id +// > Each batch request supports a maximum of 256 subrequests. 
+ObjectStorageResponse AzureObjClient::delete_objects(const ObjectStoragePathOptions& opts, + std::vector<std::string> objs) { + // TODO(ByteYue) : use range to adapt this code when compiler is ready + // auto chunkedView = objs | std::views::chunk(BlobBatchMaxOperations); + auto begin = std::begin(objs); + auto end = std::end(objs); + + while (begin != end) { + auto batch = _client->CreateBatch(); + auto chunkEnd = begin; + std::advance(chunkEnd, std::min(BlobBatchMaxOperations, + static_cast<size_t>(std::distance(begin, end)))); + for (auto it = begin; it != chunkEnd; ++it) { + batch.DeleteBlob(*it); + } + begin = chunkEnd; + auto resp = do_azure_client_call([&]() { _client->SubmitBatch(batch); }); + if (resp.ret != 0) { + return resp; + } + } + return {}; +} + +ObjectStorageResponse AzureObjClient::delete_object(const ObjectStoragePathOptions& opts) { + return do_azure_client_call([&]() { _client->DeleteBlob(opts.key); }); +} + +ObjectStorageResponse AzureObjClient::delete_objects_recursively( + const ObjectStoragePathOptions& opts) { + Azure::Storage::Blobs::ListBlobsOptions list_opts; + list_opts.Prefix = opts.prefix; + list_opts.PageSizeHint = BlobBatchMaxOperations; + auto resp = _client->ListBlobs(list_opts); + auto batch = _client->CreateBatch(); + for (auto&& blob_item : resp.Blobs) { + batch.DeleteBlob(blob_item.Name); + } + auto response = do_azure_client_call([&]() { _client->SubmitBatch(batch); }); + if (response.ret != 0) { + return response; + } + while (!resp.NextPageToken->empty()) { + batch = _client->CreateBatch(); + list_opts.ContinuationToken = resp.NextPageToken; + resp = _client->ListBlobs(list_opts); + for (auto&& blob_item : resp.Blobs) { + batch.DeleteBlob(blob_item.Name); + } + auto response = do_azure_client_call([&]() { _client->SubmitBatch(batch); }); + if (response.ret != 0) { + return response; + } + } + return {}; +} + +ObjectStorageResponse AzureObjClient::delete_expired(const ObjectStorageDeleteExpiredOptions& opts, + int64_t 
expired_time) { + Azure::Storage::Blobs::ListBlobsOptions list_opts; + list_opts.Prefix = opts.path_opts.prefix; + list_opts.PageSizeHint = BlobBatchMaxOperations; + auto resp = _client->ListBlobs(list_opts); + auto batch = _client->CreateBatch(); + for (auto&& blob_item : resp.Blobs) { + batch.DeleteBlob(blob_item.Name); + } + auto response = do_azure_client_call([&]() { _client->SubmitBatch(batch); }); + if (response.ret != 0) { + return response; + } + while (!resp.NextPageToken->empty()) { + batch = _client->CreateBatch(); + list_opts.ContinuationToken = resp.NextPageToken; + resp = _client->ListBlobs(list_opts); + for (auto&& blob_item : resp.Blobs) { + if (blob_item.Details.LastModified.time_since_epoch().count() < expired_time) { + batch.DeleteBlob(blob_item.Name); + } + } + auto response = do_azure_client_call([&]() { _client->SubmitBatch(batch); }); + if (response.ret != 0) { + return response; + } + } + return {}; +} + +ObjectStorageResponse AzureObjClient::get_life_cycle(const ObjectStoragePathOptions& opts, + int64_t* expiration_days) { + return {-1}; +} + +ObjectStorageResponse AzureObjClient::check_versioning(const ObjectStoragePathOptions& opts) { + return {-1}; +} + +const std::shared_ptr<Aws::S3::S3Client>& AzureObjClient::s3_client() { + CHECK(true) << "Currently this is unreachable"; + // TODO(ByteYue): use std::unreachable() instead when compiler supports it + __builtin_unreachable(); +} + +} // namespace doris::cloud \ No newline at end of file diff --git a/cloud/src/recycler/s3_obj_client.h b/cloud/src/recycler/azure_obj_client.h similarity index 80% copy from cloud/src/recycler/s3_obj_client.h copy to cloud/src/recycler/azure_obj_client.h index 891474b5289..79b129ed297 100644 --- a/cloud/src/recycler/s3_obj_client.h +++ b/cloud/src/recycler/azure_obj_client.h @@ -21,16 +21,16 @@ #include "recycler/obj_store_accessor.h" -namespace Aws::S3 { -class S3Client; -} // namespace Aws::S3 +namespace Azure::Storage::Blobs { +class BlobContainerClient; 
+} // namespace Azure::Storage::Blobs namespace doris::cloud { - -class S3ObjClient : public ObjStorageClient { +class AzureObjClient : public ObjStorageClient { public: - S3ObjClient(std::shared_ptr<Aws::S3::S3Client> client) : s3_client_(std::move(client)) {} - ~S3ObjClient() override = default; + AzureObjClient(std::shared_ptr<Azure::Storage::Blobs::BlobContainerClient> client) + : _client(std::move(client)) {} + ~AzureObjClient() override = default; ObjectStorageResponse put_object(const ObjectStoragePathOptions& opts, std::string_view stream) override; @@ -48,10 +48,9 @@ public: ObjectStorageResponse check_versioning(const ObjectStoragePathOptions& opts) override; - const std::shared_ptr<Aws::S3::S3Client>& s3_client() { return s3_client_; } + const std::shared_ptr<Aws::S3::S3Client>& s3_client() override; private: - std::shared_ptr<Aws::S3::S3Client> s3_client_; + std::shared_ptr<Azure::Storage::Blobs::BlobContainerClient> _client; }; - } // namespace doris::cloud \ No newline at end of file diff --git a/cloud/src/recycler/obj_store_accessor.h b/cloud/src/recycler/obj_store_accessor.h index a29266133ca..0238a317880 100644 --- a/cloud/src/recycler/obj_store_accessor.h +++ b/cloud/src/recycler/obj_store_accessor.h @@ -18,9 +18,14 @@ #pragma once #include <functional> +#include <memory> #include <string> #include <vector> +namespace Aws::S3 { +class S3Client; +} // namespace Aws::S3 + namespace doris::cloud { struct ObjectMeta { @@ -32,6 +37,7 @@ struct ObjectMeta { enum class AccessorType { S3, HDFS, + AZURE, }; // TODO(plat1ko): Redesign `Accessor` interface to adapt to storage vaults other than S3 style @@ -86,7 +92,7 @@ struct ObjectStorageDeleteExpiredOptions { struct ObjectCompleteMultiParts {}; struct ObjectStorageResponse { - ObjectStorageResponse(int r, std::string msg = "") : ret(r), error_msg(std::move(msg)) {} + ObjectStorageResponse(int r = 0, std::string msg = "") : ret(r), error_msg(std::move(msg)) {} // clang-format off int ret {0}; // To unify 
the error handle logic with BE, we'd better use the same error code as BE // clang-format on @@ -124,6 +130,8 @@ public: int64_t* expiration_days) = 0; // Check if the objects' versioning is on or off virtual ObjectStorageResponse check_versioning(const ObjectStoragePathOptions& opts) = 0; + + virtual const std::shared_ptr<Aws::S3::S3Client>& s3_client() = 0; }; } // namespace doris::cloud diff --git a/cloud/src/recycler/s3_accessor.cpp b/cloud/src/recycler/s3_accessor.cpp index 04f642f3831..43341ab63d3 100644 --- a/cloud/src/recycler/s3_accessor.cpp +++ b/cloud/src/recycler/s3_accessor.cpp @@ -31,14 +31,16 @@ #include <aws/s3/model/PutObjectRequest.h> #include <algorithm> +#include <azure/storage/blobs/blob_container_client.hpp> +#include <azure/storage/common/storage_credential.hpp> #include <execution> -#include <type_traits> +#include <memory> #include <utility> #include "common/config.h" #include "common/logging.h" -#include "common/sync_point.h" #include "rate-limiter/s3_rate_limiter.h" +#include "recycler/azure_obj_client.h" #include "recycler/obj_store_accessor.h" #include "recycler/s3_obj_client.h" @@ -138,6 +140,17 @@ std::string S3Accessor::get_relative_path(const std::string& key) const { int S3Accessor::init() { static S3Environment s3_env; + if (type() == AccessorType::AZURE) { + auto cred = + std::make_shared<Azure::Storage::StorageSharedKeyCredential>(conf_.ak, conf_.sk); + const std::string container_name = conf_.bucket; + const std::string uri = + fmt::format("http://{}.blob.core.windows.net/{}", conf_.ak, container_name); + auto container_client = + std::make_shared<Azure::Storage::Blobs::BlobContainerClient>(uri, cred); + obj_client_ = std::make_shared<AzureObjClient>(std::move(container_client)); + return 0; + } Aws::Auth::AWSCredentials aws_cred(conf_.ak, conf_.sk); Aws::Client::ClientConfiguration aws_config; aws_config.endpointOverride = conf_.endpoint; diff --git a/cloud/src/recycler/s3_accessor.h b/cloud/src/recycler/s3_accessor.h index 
5221f90cafc..3ebaf89cff3 100644 --- a/cloud/src/recycler/s3_accessor.h +++ b/cloud/src/recycler/s3_accessor.h @@ -93,7 +93,7 @@ private: private: S3Conf conf_; std::string path_; - std::shared_ptr<S3ObjClient> obj_client_; + std::shared_ptr<ObjStorageClient> obj_client_; }; class GcsAccessor final : public S3Accessor { diff --git a/cloud/src/recycler/s3_obj_client.h b/cloud/src/recycler/s3_obj_client.h index 891474b5289..4f6ef9eab08 100644 --- a/cloud/src/recycler/s3_obj_client.h +++ b/cloud/src/recycler/s3_obj_client.h @@ -48,7 +48,7 @@ public: ObjectStorageResponse check_versioning(const ObjectStoragePathOptions& opts) override; - const std::shared_ptr<Aws::S3::S3Client>& s3_client() { return s3_client_; } + const std::shared_ptr<Aws::S3::S3Client>& s3_client() override { return s3_client_; } private: std::shared_ptr<Aws::S3::S3Client> s3_client_; --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org