This is an automated email from the ASF dual-hosted git repository.

dataroaring pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new 5b31ceae386 [feature-wip](Cloud) Add azure obj client into recycler (#35849)
5b31ceae386 is described below

commit 5b31ceae3869ebf086232473fe12620b19297108
Author: AlexYue <yj976240...@gmail.com>
AuthorDate: Wed Jun 5 14:16:17 2024 +0800

    [feature-wip](Cloud) Add azure obj client into recycler (#35849)
    
    
    As a follow-up to #35307, this PR links Azure into the recycler and
    implements the corresponding ObjStorageClient interface for Azure.
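    
    A minimal usage sketch of the new client (illustrative only, not part of
    the committed code: the account name, key, container URL, and object key
    are placeholders, and the ObjectStoragePathOptions field name `key` is
    taken from how this patch uses it):
    
        #include <memory>
        #include <utility>
        
        #include <azure/storage/blobs/blob_container_client.hpp>
        #include <azure/storage/common/storage_credential.hpp>
        
        #include "recycler/azure_obj_client.h"
        
        int demo() {
            using namespace Azure::Storage;
            // Shared-key credential and container client, mirroring S3Accessor::init().
            auto cred = std::make_shared<StorageSharedKeyCredential>("account", "account-key");
            auto container = std::make_shared<Blobs::BlobContainerClient>(
                    "http://account.blob.core.windows.net/my-container", cred);
        
            doris::cloud::AzureObjClient client(std::move(container));
        
            // Upload a small blob, then delete it; ret == 0 means success,
            // error_msg carries the Azure error message otherwise.
            doris::cloud::ObjectStoragePathOptions opts;
            opts.key = "recycler/demo_object";
            if (auto resp = client.put_object(opts, "hello azure"); resp.ret != 0) {
                return resp.ret;
            }
            return client.delete_object(opts).ret;
        }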
---
 cloud/cmake/thirdparty.cmake                       |   6 +
 cloud/src/recycler/azure_obj_client.cpp            | 202 +++++++++++++++++++++
 .../{s3_obj_client.h => azure_obj_client.h}        |  19 +-
 cloud/src/recycler/obj_store_accessor.h            |  10 +-
 cloud/src/recycler/s3_accessor.cpp                 |  17 +-
 cloud/src/recycler/s3_accessor.h                   |   2 +-
 cloud/src/recycler/s3_obj_client.h                 |   2 +-
 7 files changed, 243 insertions(+), 15 deletions(-)

diff --git a/cloud/cmake/thirdparty.cmake b/cloud/cmake/thirdparty.cmake
index 0e148896bfc..bacd7d25b3d 100644
--- a/cloud/cmake/thirdparty.cmake
+++ b/cloud/cmake/thirdparty.cmake
@@ -102,6 +102,12 @@ add_thirdparty(lzma LIB64)
 add_thirdparty(idn LIB64)
 add_thirdparty(gsasl)
 # end krb5 libs
+# begin azure libs
+add_thirdparty(azure-core)
+add_thirdparty(azure-identity)
+add_thirdparty(azure-storage-blobs)
+add_thirdparty(azure-storage-common)
+# end azure libs
 
 add_thirdparty(gtest NOTADD)
 add_thirdparty(gtest_main NOTADD)
diff --git a/cloud/src/recycler/azure_obj_client.cpp b/cloud/src/recycler/azure_obj_client.cpp
new file mode 100644
index 00000000000..179ebd980fa
--- /dev/null
+++ b/cloud/src/recycler/azure_obj_client.cpp
@@ -0,0 +1,202 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "recycler/azure_obj_client.h"
+
+#include <fmt/core.h>
+#include <glog/logging.h>
+
+#include <algorithm>
+#include <azure/core/io/body_stream.hpp>
+#include <azure/storage/blobs.hpp>
+#include <azure/storage/blobs/blob_client.hpp>
+#include <azure/storage/blobs/blob_container_client.hpp>
+#include <azure/storage/blobs/rest_client.hpp>
+#include <azure/storage/common/storage_credential.hpp>
+#include <azure/storage/common/storage_exception.hpp>
+#include <iterator>
+
+#include "recycler/obj_store_accessor.h"
+
+namespace doris::cloud {
+
+constexpr size_t BlobBatchMaxOperations = 256;
+
+template <typename Func>
+ObjectStorageResponse do_azure_client_call(Func f) {
+    try {
+        f();
+    } catch (Azure::Storage::StorageException& e) {
+        return {-1, fmt::format("Azure request failed because {}, http code 
{}, request id {}",
+                                e.Message, static_cast<int>(e.StatusCode), 
e.RequestId)};
+    }
+    return {};
+}
+
+ObjectStorageResponse AzureObjClient::put_object(const ObjectStoragePathOptions& opts,
+                                                 std::string_view stream) {
+    auto client = _client->GetBlockBlobClient(opts.key);
+    return do_azure_client_call([&]() {
+        client.UploadFrom(reinterpret_cast<const uint8_t*>(stream.data()), stream.size());
+    });
+}
+
+ObjectStorageResponse AzureObjClient::head_object(const ObjectStoragePathOptions& opts) {
+    try {
+        Azure::Storage::Blobs::Models::BlobProperties properties =
+                _client->GetBlockBlobClient(opts.key).GetProperties().Value;
+        return {};
+    } catch (Azure::Storage::StorageException& e) {
+        if (e.StatusCode == Azure::Core::Http::HttpStatusCode::NotFound) {
+            return {1};
+        }
+        return {-1, fmt::format("Failed to head azure blob due to {}, http 
code {}, request id {}",
+                                e.Message, static_cast<int>(e.StatusCode), 
e.RequestId)};
+    }
+}
+
+ObjectStorageResponse AzureObjClient::list_objects(const ObjectStoragePathOptions& opts,
+                                                   std::vector<ObjectMeta>* files) {
+    auto get_object_meta = [&](auto&& resp) {
+        std::ranges::transform(
+                resp.Blobs, std::back_inserter(*files), [](auto&& blob_item) -> ObjectMeta {
+                    return {.path = std::move(blob_item.Name),
+                            .size = blob_item.BlobSize,
+                            .last_modify_second =
+                                    blob_item.Details.LastModified.time_since_epoch().count()};
+                });
+    };
+    return do_azure_client_call([&]() {
+        Azure::Storage::Blobs::ListBlobsOptions list_opts;
+        list_opts.Prefix = opts.prefix;
+        auto resp = _client->ListBlobs(list_opts);
+        get_object_meta(resp);
+        while (!resp.NextPageToken->empty()) {
+            list_opts.ContinuationToken = resp.NextPageToken;
+            resp = _client->ListBlobs(list_opts);
+            get_object_meta(resp);
+        }
+    });
+}
+
+// As Azure's documentation states, the batch size limit is 256.
+// See https://learn.microsoft.com/en-us/rest/api/storageservices/blob-batch?tabs=microsoft-entra-id
+// > Each batch request supports a maximum of 256 subrequests.
+ObjectStorageResponse AzureObjClient::delete_objects(const ObjectStoragePathOptions& opts,
+                                                     std::vector<std::string> objs) {
+    // TODO(ByteYue): use ranges to adapt this code when the compiler is ready
+    // auto chunkedView = objs | std::views::chunk(BlobBatchMaxOperations);
+    auto begin = std::begin(objs);
+    auto end = std::end(objs);
+
+    while (begin != end) {
+        auto batch = _client->CreateBatch();
+        auto chunkEnd = begin;
+        std::advance(chunkEnd, std::min(BlobBatchMaxOperations,
+                                        static_cast<size_t>(std::distance(begin, end))));
+        for (auto it = begin; it != chunkEnd; ++it) {
+            batch.DeleteBlob(*it);
+        }
+        begin = chunkEnd;
+        auto resp = do_azure_client_call([&]() { _client->SubmitBatch(batch); });
+        if (resp.ret != 0) {
+            return resp;
+        }
+    }
+    return {};
+}
+
+ObjectStorageResponse AzureObjClient::delete_object(const ObjectStoragePathOptions& opts) {
+    return do_azure_client_call([&]() { _client->DeleteBlob(opts.key); });
+}
+
+ObjectStorageResponse AzureObjClient::delete_objects_recursively(
+        const ObjectStoragePathOptions& opts) {
+    Azure::Storage::Blobs::ListBlobsOptions list_opts;
+    list_opts.Prefix = opts.prefix;
+    list_opts.PageSizeHint = BlobBatchMaxOperations;
+    auto resp = _client->ListBlobs(list_opts);
+    auto batch = _client->CreateBatch();
+    for (auto&& blob_item : resp.Blobs) {
+        batch.DeleteBlob(blob_item.Name);
+    }
+    auto response = do_azure_client_call([&]() { _client->SubmitBatch(batch); });
+    if (response.ret != 0) {
+        return response;
+    }
+    while (!resp.NextPageToken->empty()) {
+        batch = _client->CreateBatch();
+        list_opts.ContinuationToken = resp.NextPageToken;
+        resp = _client->ListBlobs(list_opts);
+        for (auto&& blob_item : resp.Blobs) {
+            batch.DeleteBlob(blob_item.Name);
+        }
+        auto response = do_azure_client_call([&]() { _client->SubmitBatch(batch); });
+        if (response.ret != 0) {
+            return response;
+        }
+    }
+    return {};
+}
+
+ObjectStorageResponse AzureObjClient::delete_expired(const ObjectStorageDeleteExpiredOptions& opts,
+                                                     int64_t expired_time) {
+    Azure::Storage::Blobs::ListBlobsOptions list_opts;
+    list_opts.Prefix = opts.path_opts.prefix;
+    list_opts.PageSizeHint = BlobBatchMaxOperations;
+    auto resp = _client->ListBlobs(list_opts);
+    auto batch = _client->CreateBatch();
+    for (auto&& blob_item : resp.Blobs) {
+        batch.DeleteBlob(blob_item.Name);
+    }
+    auto response = do_azure_client_call([&]() { _client->SubmitBatch(batch); });
+    if (response.ret != 0) {
+        return response;
+    }
+    while (!resp.NextPageToken->empty()) {
+        batch = _client->CreateBatch();
+        list_opts.ContinuationToken = resp.NextPageToken;
+        resp = _client->ListBlobs(list_opts);
+        for (auto&& blob_item : resp.Blobs) {
+            if (blob_item.Details.LastModified.time_since_epoch().count() < expired_time) {
+                batch.DeleteBlob(blob_item.Name);
+            }
+        }
+        auto response = do_azure_client_call([&]() { _client->SubmitBatch(batch); });
+        if (response.ret != 0) {
+            return response;
+        }
+    }
+    return {};
+}
+
+ObjectStorageResponse AzureObjClient::get_life_cycle(const ObjectStoragePathOptions& opts,
+                                                     int64_t* expiration_days) {
+    return {-1};
+}
+
+ObjectStorageResponse AzureObjClient::check_versioning(const ObjectStoragePathOptions& opts) {
+    return {-1};
+}
+
+const std::shared_ptr<Aws::S3::S3Client>& AzureObjClient::s3_client() {
+    CHECK(true) << "Currently this is unreachable";
+    // TODO(ByteYue): use std::unreachable() instead when compiler supports it
+    __builtin_unreachable();
+}
+
+} // namespace doris::cloud
\ No newline at end of file
diff --git a/cloud/src/recycler/s3_obj_client.h b/cloud/src/recycler/azure_obj_client.h
similarity index 80%
copy from cloud/src/recycler/s3_obj_client.h
copy to cloud/src/recycler/azure_obj_client.h
index 891474b5289..79b129ed297 100644
--- a/cloud/src/recycler/s3_obj_client.h
+++ b/cloud/src/recycler/azure_obj_client.h
@@ -21,16 +21,16 @@
 
 #include "recycler/obj_store_accessor.h"
 
-namespace Aws::S3 {
-class S3Client;
-} // namespace Aws::S3
+namespace Azure::Storage::Blobs {
+class BlobContainerClient;
+} // namespace Azure::Storage::Blobs
 
 namespace doris::cloud {
-
-class S3ObjClient : public ObjStorageClient {
+class AzureObjClient : public ObjStorageClient {
 public:
-    S3ObjClient(std::shared_ptr<Aws::S3::S3Client> client) : s3_client_(std::move(client)) {}
-    ~S3ObjClient() override = default;
+    AzureObjClient(std::shared_ptr<Azure::Storage::Blobs::BlobContainerClient> client)
+            : _client(std::move(client)) {}
+    ~AzureObjClient() override = default;
 
     ObjectStorageResponse put_object(const ObjectStoragePathOptions& opts,
                                      std::string_view stream) override;
@@ -48,10 +48,9 @@ public:
 
     ObjectStorageResponse check_versioning(const ObjectStoragePathOptions& opts) override;
 
-    const std::shared_ptr<Aws::S3::S3Client>& s3_client() { return s3_client_; }
+    const std::shared_ptr<Aws::S3::S3Client>& s3_client() override;
 
 private:
-    std::shared_ptr<Aws::S3::S3Client> s3_client_;
+    std::shared_ptr<Azure::Storage::Blobs::BlobContainerClient> _client;
 };
-
 } // namespace doris::cloud
\ No newline at end of file
diff --git a/cloud/src/recycler/obj_store_accessor.h b/cloud/src/recycler/obj_store_accessor.h
index a29266133ca..0238a317880 100644
--- a/cloud/src/recycler/obj_store_accessor.h
+++ b/cloud/src/recycler/obj_store_accessor.h
@@ -18,9 +18,14 @@
 #pragma once
 
 #include <functional>
+#include <memory>
 #include <string>
 #include <vector>
 
+namespace Aws::S3 {
+class S3Client;
+} // namespace Aws::S3
+
 namespace doris::cloud {
 
 struct ObjectMeta {
@@ -32,6 +37,7 @@ struct ObjectMeta {
 enum class AccessorType {
     S3,
     HDFS,
+    AZURE,
 };
 
 // TODO(plat1ko): Redesign `Accessor` interface to adapt to storage vaults other than S3 style
@@ -86,7 +92,7 @@ struct ObjectStorageDeleteExpiredOptions {
 struct ObjectCompleteMultiParts {};
 
 struct ObjectStorageResponse {
-    ObjectStorageResponse(int r, std::string msg = "") : ret(r), error_msg(std::move(msg)) {}
+    ObjectStorageResponse(int r = 0, std::string msg = "") : ret(r), error_msg(std::move(msg)) {}
     // clang-format off
     int ret {0}; // To unify the error handle logic with BE, we'd better use the same error code as BE
     // clang-format on
@@ -124,6 +130,8 @@ public:
                                                  int64_t* expiration_days) = 0;
     // Check if the objects' versioning is on or off
     virtual ObjectStorageResponse check_versioning(const ObjectStoragePathOptions& opts) = 0;
+
+    virtual const std::shared_ptr<Aws::S3::S3Client>& s3_client() = 0;
 };
 
 } // namespace doris::cloud
diff --git a/cloud/src/recycler/s3_accessor.cpp b/cloud/src/recycler/s3_accessor.cpp
index 04f642f3831..43341ab63d3 100644
--- a/cloud/src/recycler/s3_accessor.cpp
+++ b/cloud/src/recycler/s3_accessor.cpp
@@ -31,14 +31,16 @@
 #include <aws/s3/model/PutObjectRequest.h>
 
 #include <algorithm>
+#include <azure/storage/blobs/blob_container_client.hpp>
+#include <azure/storage/common/storage_credential.hpp>
 #include <execution>
-#include <type_traits>
+#include <memory>
 #include <utility>
 
 #include "common/config.h"
 #include "common/logging.h"
-#include "common/sync_point.h"
 #include "rate-limiter/s3_rate_limiter.h"
+#include "recycler/azure_obj_client.h"
 #include "recycler/obj_store_accessor.h"
 #include "recycler/s3_obj_client.h"
 
@@ -138,6 +140,17 @@ std::string S3Accessor::get_relative_path(const std::string& key) const {
 
 int S3Accessor::init() {
     static S3Environment s3_env;
+    if (type() == AccessorType::AZURE) {
+        auto cred =
+                std::make_shared<Azure::Storage::StorageSharedKeyCredential>(conf_.ak, conf_.sk);
+        const std::string container_name = conf_.bucket;
+        const std::string uri =
+                fmt::format("http://{}.blob.core.windows.net/{}";, conf_.ak, 
container_name);
+        auto container_client =
+                std::make_shared<Azure::Storage::Blobs::BlobContainerClient>(uri, cred);
+        obj_client_ = std::make_shared<AzureObjClient>(std::move(container_client));
+        return 0;
+    }
     Aws::Auth::AWSCredentials aws_cred(conf_.ak, conf_.sk);
     Aws::Client::ClientConfiguration aws_config;
     aws_config.endpointOverride = conf_.endpoint;
diff --git a/cloud/src/recycler/s3_accessor.h b/cloud/src/recycler/s3_accessor.h
index 5221f90cafc..3ebaf89cff3 100644
--- a/cloud/src/recycler/s3_accessor.h
+++ b/cloud/src/recycler/s3_accessor.h
@@ -93,7 +93,7 @@ private:
 private:
     S3Conf conf_;
     std::string path_;
-    std::shared_ptr<S3ObjClient> obj_client_;
+    std::shared_ptr<ObjStorageClient> obj_client_;
 };
 
 class GcsAccessor final : public S3Accessor {
diff --git a/cloud/src/recycler/s3_obj_client.h b/cloud/src/recycler/s3_obj_client.h
index 891474b5289..4f6ef9eab08 100644
--- a/cloud/src/recycler/s3_obj_client.h
+++ b/cloud/src/recycler/s3_obj_client.h
@@ -48,7 +48,7 @@ public:
 
     ObjectStorageResponse check_versioning(const ObjectStoragePathOptions& opts) override;
 
-    const std::shared_ptr<Aws::S3::S3Client>& s3_client() { return s3_client_; }
+    const std::shared_ptr<Aws::S3::S3Client>& s3_client() override { return s3_client_; }
 
 private:
     std::shared_ptr<Aws::S3::S3Client> s3_client_;
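
A side note on the TODO in AzureObjClient::delete_objects above: once the toolchain
supports C++23 ranges, the manual iterator chunking could collapse to std::views::chunk.
A minimal sketch of that shape (illustrative only, not part of this commit; it reuses the
CreateBatch/SubmitBatch/DeleteBlob calls from the patch, and the helper name batch_delete
is hypothetical):

    #include <ranges>
    #include <string>
    #include <vector>

    #include <azure/storage/blobs/blob_container_client.hpp>

    // Hypothetical helper, not in the patch: delete objects in batches of at
    // most 256 subrequests, mirroring AzureObjClient::delete_objects.
    void batch_delete(Azure::Storage::Blobs::BlobContainerClient& client,
                      const std::vector<std::string>& objs, std::ptrdiff_t max_ops = 256) {
        for (auto&& chunk : objs | std::views::chunk(max_ops)) {
            auto batch = client.CreateBatch();
            for (const auto& key : chunk) {
                batch.DeleteBlob(key);
            }
            // May throw Azure::Storage::StorageException; the patch wraps this
            // call in do_azure_client_call to turn it into an ObjectStorageResponse.
            client.SubmitBatch(batch);
        }
    }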


