This is an automated email from the ASF dual-hosted git repository.

dataroaring pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new d4439383e54 [feature](Cloud) Add azure's support to BE (#35670)
d4439383e54 is described below

commit d4439383e54004db01b0012d44c8003b5c547432
Author: AlexYue <yj976240...@gmail.com>
AuthorDate: Tue Jun 11 15:03:38 2024 +0800

    [feature](Cloud) Add azure's support to BE (#35670)
    
    As a follow-up to #35307, this PR links the Azure SDK into BE and
    implements the ObjStorageClient interface for Azure.
---
 be/cmake/thirdparty.cmake                 |   5 +
 be/src/agent/task_worker_pool.cpp         |  28 +---
 be/src/cloud/cloud_meta_mgr.cpp           |  15 +-
 be/src/io/fs/azure_obj_storage_client.cpp | 235 ++++++++++++++++++++++++++++++
 be/src/io/fs/azure_obj_storage_client.h   |  63 ++++++++
 be/src/io/fs/obj_storage_client.h         |   3 +-
 be/src/io/fs/s3_file_system.cpp           |   8 +-
 be/src/util/s3_util.cpp                   | 148 +++++++++++++++++--
 be/src/util/s3_util.h                     |  26 ++--
 be/test/io/fs/azure_test.cpp              |  96 ++++++++++++
 be/test/io/fs/s3_file_writer_test.cpp     |   3 +-
 gensrc/proto/cloud.proto                  |  14 +-
 gensrc/thrift/AgentService.thrift         |  11 ++
 13 files changed, 593 insertions(+), 62 deletions(-)

diff --git a/be/cmake/thirdparty.cmake b/be/cmake/thirdparty.cmake
index 78f28fe72dc..41203a9006a 100644
--- a/be/cmake/thirdparty.cmake
+++ b/be/cmake/thirdparty.cmake
@@ -139,6 +139,11 @@ if (NOT OS_MACOSX)
     add_thirdparty(aws-s2n LIBNAME "lib/libs2n.a")
 endif()
 
+add_thirdparty(azure-core)
+add_thirdparty(azure-identity)
+add_thirdparty(azure-storage-blobs)
+add_thirdparty(azure-storage-common)
+
 add_thirdparty(minizip LIB64)
 add_thirdparty(simdjson LIB64)
 add_thirdparty(idn LIB64)
diff --git a/be/src/agent/task_worker_pool.cpp 
b/be/src/agent/task_worker_pool.cpp
index 31cfe19d4f0..a776be08f79 100644
--- a/be/src/agent/task_worker_pool.cpp
+++ b/be/src/agent/task_worker_pool.cpp
@@ -53,6 +53,7 @@
 #include "io/fs/file_system.h"
 #include "io/fs/hdfs_file_system.h"
 #include "io/fs/local_file_system.h"
+#include "io/fs/obj_storage_client.h"
 #include "io/fs/path.h"
 #include "io/fs/remote_file_system.h"
 #include "io/fs/s3_file_system.h"
@@ -1378,23 +1379,8 @@ void update_s3_resource(const TStorageResource& param, 
io::RemoteFileSystemSPtr
 
     if (!existed_fs) {
         // No such FS instance on BE
-        S3Conf s3_conf {
-                .bucket = param.s3_storage_param.bucket,
-                .prefix = param.s3_storage_param.root_path,
-                .client_conf = {
-                        .endpoint = param.s3_storage_param.endpoint,
-                        .region = param.s3_storage_param.region,
-                        .ak = param.s3_storage_param.ak,
-                        .sk = param.s3_storage_param.sk,
-                        .token = param.s3_storage_param.token,
-                        .max_connections = param.s3_storage_param.max_conn,
-                        .request_timeout_ms = 
param.s3_storage_param.request_timeout_ms,
-                        .connect_timeout_ms = 
param.s3_storage_param.conn_timeout_ms,
-                        // When using cold heat separation in minio, user 
might use ip address directly,
-                        // which needs enable use_virtual_addressing to true
-                        .use_virtual_addressing = 
!param.s3_storage_param.use_path_style,
-                }};
-        auto res = io::S3FileSystem::create(std::move(s3_conf), 
std::to_string(param.id));
+        auto res = 
io::S3FileSystem::create(S3Conf::get_s3_conf(param.s3_storage_param),
+                                            std::to_string(param.id));
         if (!res.has_value()) {
             st = std::move(res).error();
         } else {
@@ -1403,10 +1389,12 @@ void update_s3_resource(const TStorageResource& param, 
io::RemoteFileSystemSPtr
     } else {
         DCHECK_EQ(existed_fs->type(), io::FileSystemType::S3) << param.id << ' 
' << param.name;
         auto client = 
static_cast<io::S3FileSystem*>(existed_fs.get())->client_holder();
+        auto new_s3_conf = S3Conf::get_s3_conf(param.s3_storage_param);
         S3ClientConf conf {
-                .ak = param.s3_storage_param.ak,
-                .sk = param.s3_storage_param.sk,
-                .token = param.s3_storage_param.token,
+                .ak = std::move(new_s3_conf.client_conf.ak),
+                .sk = std::move(new_s3_conf.client_conf.sk),
+                .token = std::move(new_s3_conf.client_conf.token),
+                .provider = new_s3_conf.client_conf.provider,
         };
         st = client->reset(conf);
         fs = std::move(existed_fs);
diff --git a/be/src/cloud/cloud_meta_mgr.cpp b/be/src/cloud/cloud_meta_mgr.cpp
index a14ec2b0497..d55c884a6c2 100644
--- a/be/src/cloud/cloud_meta_mgr.cpp
+++ b/be/src/cloud/cloud_meta_mgr.cpp
@@ -46,6 +46,7 @@
 #include "gen_cpp/Types_types.h"
 #include "gen_cpp/cloud.pb.h"
 #include "gen_cpp/olap_file.pb.h"
+#include "io/fs/obj_storage_client.h"
 #include "olap/olap_common.h"
 #include "olap/rowset/rowset.h"
 #include "olap/rowset/rowset_factory.h"
@@ -825,17 +826,7 @@ Status 
CloudMetaMgr::get_storage_vault_info(StorageVaultInfos* vault_infos) {
     }
 
     auto add_obj_store = [&vault_infos](const auto& obj_store) {
-        vault_infos->emplace_back(obj_store.id(),
-                                  S3Conf {
-                                          .bucket = obj_store.bucket(),
-                                          .prefix = obj_store.prefix(),
-                                          .client_conf {.endpoint = 
obj_store.endpoint(),
-                                                        .region = 
obj_store.region(),
-                                                        .ak = obj_store.ak(),
-                                                        .sk = obj_store.sk()},
-                                          .sse_enabled = 
obj_store.sse_enabled(),
-                                          .provider = obj_store.provider(),
-                                  },
+        vault_infos->emplace_back(obj_store.id(), 
S3Conf::get_s3_conf(obj_store),
                                   StorageVaultPB_PathFormat {});
     };
 
@@ -853,7 +844,7 @@ Status 
CloudMetaMgr::get_storage_vault_info(StorageVaultInfos* vault_infos) {
         resp.mutable_obj_info(i)->set_sk(resp.obj_info(i).sk().substr(0, 2) + 
"xxx");
     }
     for (int i = 0; i < resp.storage_vault_size(); ++i) {
-        auto j = resp.mutable_storage_vault(i);
+        auto* j = resp.mutable_storage_vault(i);
         if (!j->has_obj_info()) continue;
         j->mutable_obj_info()->set_sk(j->obj_info().sk().substr(0, 2) + "xxx");
     }
diff --git a/be/src/io/fs/azure_obj_storage_client.cpp 
b/be/src/io/fs/azure_obj_storage_client.cpp
new file mode 100644
index 00000000000..9569bf9a8e8
--- /dev/null
+++ b/be/src/io/fs/azure_obj_storage_client.cpp
@@ -0,0 +1,235 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "io/fs/azure_obj_storage_client.h"
+
+#include <algorithm>
+#include <azure/core/io/body_stream.hpp>
+#include <azure/storage/blobs.hpp>
+#include <azure/storage/blobs/blob_client.hpp>
+#include <azure/storage/blobs/blob_container_client.hpp>
+#include <azure/storage/blobs/rest_client.hpp>
+#include <azure/storage/common/storage_credential.hpp>
+#include <azure/storage/common/storage_exception.hpp>
+#include <iterator>
+
+#include "common/logging.h"
+#include "common/status.h"
+#include "io/fs/obj_storage_client.h"
+
+namespace {
+// Renders the bucket, key, prefix and path of `opts` as a single string that is embedded in
+// log and error messages.
+std::string wrap_object_storage_path_msg(const doris::io::ObjectStoragePathOptions& opts) {
+    const auto& native_path = opts.path.native();
+    return fmt::format("bucket {}, key {}, prefix {}, path {}", opts.bucket, opts.key, opts.prefix,
+                       native_path);
+}
+} // namespace
+
+namespace doris::io {
+
+// Maximum number of sub-requests placed into one Azure blob batch request.
+// Azure's documentation states the limit as 256:
+// https://learn.microsoft.com/en-us/rest/api/storageservices/blob-batch?tabs=microsoft-entra-id
+// > Each batch request supports a maximum of 256 subrequests.
+constexpr size_t BlobBatchMaxOperations = 256;
+
+// Invokes `f` (a callable that performs one Azure SDK request) and converts any exception it
+// throws into an ObjectStorageResponse, so that no exception escapes to the caller.
+//   - Azure::Storage::StorageException: the response carries an InternalError status plus the
+//     HTTP status code and request id taken from the exception.
+//   - any other std::exception: the response carries only an InternalError status built from
+//     what().
+// `opts` is used solely to describe the target object in the log / error message.
+// Returns a default-constructed response when `f` completes without throwing.
+template <typename Func>
+ObjectStorageResponse do_azure_client_call(Func f, const ObjectStoragePathOptions& opts) {
+    try {
+        f();
+    } catch (Azure::Storage::StorageException& e) {
+        // Caught by non-const reference on purpose: RequestId is moved out of the exception.
+        auto msg = fmt::format("Azure request failed because {}, path msg {}", e.Message,
+                               wrap_object_storage_path_msg(opts));
+        LOG_WARNING(msg);
+        return {.status = convert_to_obj_response(Status::InternalError<false>(std::move(msg))),
+                .http_code = static_cast<int>(e.StatusCode),
+                .request_id = std::move(e.RequestId)};
+    } catch (const std::exception& e) {
+        // Fallback for errors that are not StorageException; without it they would propagate
+        // out of an API whose callers only inspect the returned status.
+        auto msg = fmt::format("Azure request failed because {}, path msg {}", e.what(),
+                               wrap_object_storage_path_msg(opts));
+        LOG_WARNING(msg);
+        return {.status = convert_to_obj_response(Status::InternalError<false>(std::move(msg)))};
+    }
+    return {};
+}
+
+// Azure needs no explicit request to start a multipart upload, so this performs no call and
+// simply hands back a default-constructed response.
+ObjectStorageUploadResponse AzureObjStorageClient::create_multipart_upload(
+        const ObjectStoragePathOptions& /*opts*/) {
+    return ObjectStorageUploadResponse {};
+}
+
+// Uploads the bytes viewed by `stream` as the block blob named `opts.key`.
+// The SDK call runs inside do_azure_client_call, which turns a StorageException into an
+// error response.
+ObjectStorageResponse AzureObjStorageClient::put_object(const ObjectStoragePathOptions& opts,
+                                                        std::string_view stream) {
+    auto blob = _client->GetBlockBlobClient(opts.key);
+    const auto* payload = reinterpret_cast<const uint8_t*>(stream.data());
+    const auto upload = [&]() { blob.UploadFrom(payload, stream.size()); };
+    return do_azure_client_call(upload, opts);
+}
+
+// Builds the Azure block id for part number `part_num`.
+// Azure requires block ids to be valid Base64 strings and, within one blob, to all have the
+// same length. A zero-padded 8-digit decimal string satisfies both: decimal digits belong to
+// the Base64 alphabet and 8 is a multiple of 4, so no padding is needed.
+// upload_part and complete_multipart_upload must both derive ids through this function so
+// that the committed ids match the staged ones.
+// NOTE(review): assumes 0 <= part_num <= 99'999'999 - confirm against callers.
+static std::string azure_block_id(int part_num) {
+    return fmt::format("{:08d}", part_num);
+}
+
+// Stages the bytes viewed by `stream` as one uncommitted block of blob `opts.key`; the block
+// id is derived from `part_num`.
+// Returns a default-constructed response on success. On StorageException the nested `resp`
+// carries an InternalError status, the HTTP status code and the request id.
+ObjectStorageUploadResponse AzureObjStorageClient::upload_part(const ObjectStoragePathOptions& opts,
+                                                               std::string_view stream,
+                                                               int part_num) {
+    auto client = _client->GetBlockBlobClient(opts.key);
+    try {
+        Azure::Core::IO::MemoryBodyStream memory_body(
+                reinterpret_cast<const uint8_t*>(stream.data()), stream.size());
+        client.StageBlock(azure_block_id(part_num), memory_body);
+    } catch (Azure::Storage::StorageException& e) {
+        auto msg = fmt::format("Azure request failed because {}, path msg {}", e.Message,
+                               wrap_object_storage_path_msg(opts));
+        LOG_WARNING(msg);
+        // clang-format off
+        return {
+            .resp = {
+                .status = convert_to_obj_response(
+                        Status::InternalError<false>(std::move(msg))),
+                .http_code = static_cast<int>(e.StatusCode),
+                .request_id = std::move(e.RequestId),
+            },
+        };
+        // clang-format on
+    }
+    return {};
+}
+
+// Commits the previously staged blocks of blob `opts.key`.
+// `completed_parts` is downcast (unchecked) to AzureCompleteMultiParts; its `block_ids` are
+// converted to Azure block ids and committed in vector order.
+ObjectStorageResponse AzureObjStorageClient::complete_multipart_upload(
+        const ObjectStoragePathOptions& opts, const ObjectCompleteMultiParts& completed_parts) {
+    auto client = _client->GetBlockBlobClient(opts.key);
+    const auto& block_ids = static_cast<const AzureCompleteMultiParts&>(completed_parts).block_ids;
+    std::vector<std::string> string_block_ids;
+    string_block_ids.reserve(block_ids.size());
+    std::ranges::transform(block_ids, std::back_inserter(string_block_ids),
+                           [](int part_num) { return azure_block_id(part_num); });
+    return do_azure_client_call([&]() { client.CommitBlockList(string_block_ids); }, opts);
+}
+
+// Fetches the properties of blob `opts.key` and reports its size in `file_size`.
+// On StorageException the returned `resp` carries the HTTP status code and request id; its
+// status is NotFound (empty message) for HTTP 404 and InternalError with a descriptive message
+// for every other failure.
+ObjectStorageHeadResponse AzureObjStorageClient::head_object(const ObjectStoragePathOptions& opts) {
+    try {
+        auto blob = _client->GetBlockBlobClient(opts.key);
+        Azure::Storage::Blobs::Models::BlobProperties props = blob.GetProperties().Value;
+        return {.file_size = props.BlobSize};
+    } catch (Azure::Storage::StorageException& e) {
+        const bool missing = e.StatusCode == Azure::Core::Http::HttpStatusCode::NotFound;
+        Status st = missing ? Status::NotFound<false>("")
+                            : Status::InternalError<false>(fmt::format(
+                                      "Failed to head azure blob due to {}, path msg {}", e.Message,
+                                      wrap_object_storage_path_msg(opts)));
+        return ObjectStorageHeadResponse {
+                .resp = {.status = convert_to_obj_response(std::move(st)),
+                         .http_code = static_cast<int>(e.StatusCode),
+                         .request_id = std::move(e.RequestId)},
+        };
+    }
+}
+
+// Downloads up to `bytes_read` bytes of blob `opts.key`, starting at byte `offset`, into
+// `buffer` (which must be able to hold `bytes_read` bytes).
+// When `size_return` is non-null it receives the number of bytes covered by the downloaded
+// range as reported by the SDK.
+// The SDK call runs inside do_azure_client_call, which turns a StorageException into an
+// error response.
+ObjectStorageResponse AzureObjStorageClient::get_object(const ObjectStoragePathOptions& opts,
+                                                        void* buffer, size_t offset,
+                                                        size_t bytes_read, size_t* size_return) {
+    auto client = _client->GetBlockBlobClient(opts.key);
+    return do_azure_client_call(
+            [&]() {
+                Azure::Storage::Blobs::DownloadBlobToOptions download_opts;
+                // `Range` is an Azure::Nullable that starts out empty: assign a whole HttpRange
+                // so it becomes engaged. Writing through `Range->` on the empty Nullable does
+                // not mark it as set.
+                Azure::Core::Http::HttpRange range;
+                range.Offset = static_cast<int64_t>(offset);
+                range.Length = static_cast<int64_t>(bytes_read);
+                download_opts.Range = range;
+                auto resp = client.DownloadTo(static_cast<uint8_t*>(buffer), bytes_read,
+                                              download_opts);
+                if (size_return != nullptr) {
+                    const auto& got = resp.Value.ContentRange.Length;
+                    // NOTE(review): falls back to the requested length if the SDK reports no
+                    // range length - confirm this cannot happen for ranged downloads.
+                    *size_return = got.HasValue() ? static_cast<size_t>(got.Value()) : bytes_read;
+                }
+            },
+            opts);
+}
+
+// Appends one FileInfo (name, size, is_file = true) to `*files` for every blob in the
+// container whose name starts with `opts.prefix`, following continuation tokens until the
+// listing is exhausted.
+// The SDK calls run inside do_azure_client_call, which turns a StorageException into an
+// error response; entries appended before the failure remain in `*files`.
+ObjectStorageResponse AzureObjStorageClient::list_objects(const ObjectStoragePathOptions& opts,
+                                                          std::vector<FileInfo>* files) {
+    auto append_page = [&](const Azure::Storage::Blobs::ListBlobsPagedResponse& page) {
+        std::ranges::transform(page.Blobs, std::back_inserter(*files), [](const auto& blob_item) {
+            return FileInfo {
+                    .file_name = blob_item.Name, .file_size = blob_item.BlobSize, .is_file = true};
+        });
+    };
+    return do_azure_client_call(
+            [&]() {
+                Azure::Storage::Blobs::ListBlobsOptions list_opts;
+                list_opts.Prefix = opts.prefix;
+                auto resp = _client->ListBlobs(list_opts);
+                append_page(resp);
+                // NextPageToken is an Azure::Nullable: it must be checked with HasValue()
+                // before being dereferenced, because it is empty on the last page.
+                while (resp.NextPageToken.HasValue() && !resp.NextPageToken->empty()) {
+                    list_opts.ContinuationToken = resp.NextPageToken;
+                    resp = _client->ListBlobs(list_opts);
+                    append_page(resp);
+                }
+            },
+            opts);
+}
+
+// Deletes the blobs named in `objs`, submitting them in batches of at most
+// BlobBatchMaxOperations entries (Azure allows at most 256 subrequests per batch request, see
+// https://learn.microsoft.com/en-us/rest/api/storageservices/blob-batch?tabs=microsoft-entra-id).
+// Stops at, and returns, the first batch submission that does not report ErrorCode::OK.
+ObjectStorageResponse AzureObjStorageClient::delete_objects(const ObjectStoragePathOptions& opts,
+                                                            std::vector<std::string> objs) {
+    // TODO(ByteYue) : use ranges to adapt this code when the compiler is ready
+    // auto chunkedView = objs | std::views::chunk(BlobBatchMaxOperations);
+    const size_t total = objs.size();
+    for (size_t first = 0; first < total; first += BlobBatchMaxOperations) {
+        const size_t last = std::min(first + BlobBatchMaxOperations, total);
+        auto batch = _client->CreateBatch();
+        for (size_t i = first; i < last; ++i) {
+            batch.DeleteBlob(objs[i]);
+        }
+        auto resp = do_azure_client_call([&]() { _client->SubmitBatch(batch); }, opts);
+        if (resp.status.code != ErrorCode::OK) {
+            return resp;
+        }
+    }
+    return {};
+}
+
+// Deletes the single blob named `opts.key`; the SDK call runs inside do_azure_client_call.
+ObjectStorageResponse AzureObjStorageClient::delete_object(const ObjectStoragePathOptions& opts) {
+    const auto remove_blob = [&]() { _client->DeleteBlob(opts.key); };
+    return do_azure_client_call(remove_blob, opts);
+}
+
+// Deletes every blob whose name starts with `opts.prefix`.
+// Blobs are listed page by page (page size hint = BlobBatchMaxOperations) and each non-empty
+// page is deleted with one batch request. Listing and batch submission both run inside
+// do_azure_client_call. Stops at, and returns, the first response that does not report
+// ErrorCode::OK.
+ObjectStorageResponse AzureObjStorageClient::delete_objects_recursively(
+        const ObjectStoragePathOptions& opts) {
+    Azure::Storage::Blobs::ListBlobsOptions list_opts;
+    list_opts.Prefix = opts.prefix;
+    list_opts.PageSizeHint = BlobBatchMaxOperations;
+    bool has_more = true;
+    while (has_more) {
+        auto response = do_azure_client_call(
+                [&]() {
+                    auto page = _client->ListBlobs(list_opts);
+                    // Skip the batch request entirely for an empty page: there is nothing to
+                    // delete and an empty batch would be a pointless request.
+                    if (!page.Blobs.empty()) {
+                        auto batch = _client->CreateBatch();
+                        for (const auto& blob_item : page.Blobs) {
+                            batch.DeleteBlob(blob_item.Name);
+                        }
+                        _client->SubmitBatch(batch);
+                    }
+                    // NextPageToken is an Azure::Nullable: check HasValue() before
+                    // dereferencing, because it is empty on the last page.
+                    has_more = page.NextPageToken.HasValue() && !page.NextPageToken->empty();
+                    if (has_more) {
+                        list_opts.ContinuationToken = page.NextPageToken;
+                    }
+                },
+                opts);
+        if (response.status.code != ErrorCode::OK) {
+            return response;
+        }
+    }
+    return {};
+}
+} // namespace doris::io
\ No newline at end of file
diff --git a/be/src/io/fs/azure_obj_storage_client.h 
b/be/src/io/fs/azure_obj_storage_client.h
new file mode 100644
index 00000000000..a8c2db9d4db
--- /dev/null
+++ b/be/src/io/fs/azure_obj_storage_client.h
@@ -0,0 +1,63 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#pragma once
+
+#include "io/fs/obj_storage_client.h"
+
+namespace Azure::Storage::Blobs {
+class BlobContainerClient;
+} // namespace Azure::Storage::Blobs
+
+namespace doris::io {
+
+// Azure-specific completion info for a multipart upload.
+// `block_ids` holds the part numbers of the staged blocks; complete_multipart_upload converts
+// each entry to a string block id and commits them in vector order.
+struct AzureCompleteMultiParts : public ObjectCompleteMultiParts {
+    std::vector<int> block_ids;
+};
+
+class ObjClientHolder;
+
+// ObjStorageClient implementation that forwards every operation to an Azure
+// BlobContainerClient. In all methods `opts.key` names the blob and `opts.prefix` the listing
+// prefix.
+class AzureObjStorageClient final : public ObjStorageClient {
+public:
+    // Takes shared ownership of `client`. `explicit` keeps a container-client pointer from
+    // converting to a storage client implicitly.
+    explicit AzureObjStorageClient(
+            std::shared_ptr<Azure::Storage::Blobs::BlobContainerClient> client)
+            : _client(std::move(client)) {}
+    ~AzureObjStorageClient() override = default;
+    // Performs no request; returns a default-constructed response.
+    ObjectStorageUploadResponse create_multipart_upload(
+            const ObjectStoragePathOptions& opts) override;
+    // Uploads `stream` as the whole content of the blob.
+    ObjectStorageResponse put_object(const ObjectStoragePathOptions& opts,
+                                     std::string_view stream) override;
+    // Stages the given bytes as one block of the blob, identified by `partNum`.
+    ObjectStorageUploadResponse upload_part(const ObjectStoragePathOptions& opts, std::string_view,
+                                            int partNum) override;
+    // Commits the staged blocks; `completed_parts` must be an AzureCompleteMultiParts.
+    ObjectStorageResponse complete_multipart_upload(
+            const ObjectStoragePathOptions& opts,
+            const ObjectCompleteMultiParts& completed_parts) override;
+    // Reports the blob size, or a NotFound status when the service answers HTTP 404.
+    ObjectStorageHeadResponse head_object(const ObjectStoragePathOptions& opts) override;
+    // Downloads blob data into `buffer`.
+    ObjectStorageResponse get_object(const ObjectStoragePathOptions& opts, void* buffer,
+                                     size_t offset, size_t bytes_read,
+                                     size_t* size_return) override;
+    // Appends one FileInfo per blob under `opts.prefix` to `*files`.
+    ObjectStorageResponse list_objects(const ObjectStoragePathOptions& opts,
+                                       std::vector<FileInfo>* files) override;
+    // Deletes the named blobs using batch requests.
+    ObjectStorageResponse delete_objects(const ObjectStoragePathOptions& opts,
+                                         std::vector<std::string> objs) override;
+    // Deletes the single blob `opts.key`.
+    ObjectStorageResponse delete_object(const ObjectStoragePathOptions& opts) override;
+    // Deletes every blob under `opts.prefix`.
+    ObjectStorageResponse delete_objects_recursively(const ObjectStoragePathOptions& opts) override;
+
+private:
+    std::shared_ptr<Azure::Storage::Blobs::BlobContainerClient> _client;
+};
+
+} // namespace doris::io
\ No newline at end of file
diff --git a/be/src/io/fs/obj_storage_client.h 
b/be/src/io/fs/obj_storage_client.h
index 3ab0a8e2dea..2a99bde80f1 100644
--- a/be/src/io/fs/obj_storage_client.h
+++ b/be/src/io/fs/obj_storage_client.h
@@ -27,7 +27,8 @@ namespace io {
 
 // Names are in lexico order.
 enum class ObjStorageType : uint8_t {
-    AWS = 0,
+    UNKNOWN = 0,
+    AWS = 1,
     AZURE,
     BOS,
     COS,
diff --git a/be/src/io/fs/s3_file_system.cpp b/be/src/io/fs/s3_file_system.cpp
index 27aff992f4c..648a80cda8c 100644
--- a/be/src/io/fs/s3_file_system.cpp
+++ b/be/src/io/fs/s3_file_system.cpp
@@ -79,12 +79,11 @@ ObjClientHolder::ObjClientHolder(S3ClientConf conf) : 
_conf(std::move(conf)) {}
 ObjClientHolder::~ObjClientHolder() = default;
 
 Status ObjClientHolder::init() {
-    auto client = S3ClientFactory::instance().create(_conf);
-    if (!client) {
+    _client = S3ClientFactory::instance().create(_conf);
+    if (!_client) {
         return Status::InternalError("failed to init s3 client with conf {}", 
_conf.to_string());
     }
 
-    _client = std::make_shared<S3ObjStorageClient>(std::move(client));
     return Status::OK();
 }
 
@@ -100,6 +99,7 @@ Status ObjClientHolder::reset(const S3ClientConf& conf) {
         reset_conf.ak = conf.ak;
         reset_conf.sk = conf.sk;
         reset_conf.token = conf.token;
+        reset_conf.bucket = conf.bucket;
         // Should check endpoint here?
     }
 
@@ -112,7 +112,7 @@ Status ObjClientHolder::reset(const S3ClientConf& conf) {
 
     {
         std::lock_guard lock(_mtx);
-        _client = std::make_shared<S3ObjStorageClient>(std::move(client));
+        _client = std::move(client);
         _conf = std::move(reset_conf);
     }
 
diff --git a/be/src/util/s3_util.cpp b/be/src/util/s3_util.cpp
index 6cf8e97962e..e89366f3ab8 100644
--- a/be/src/util/s3_util.cpp
+++ b/be/src/util/s3_util.cpp
@@ -29,6 +29,7 @@
 #include <util/string_util.h>
 
 #include <atomic>
+#include <azure/storage/blobs/blob_container_client.hpp>
 #include <cstdlib>
 #include <filesystem>
 #include <functional>
@@ -40,6 +41,9 @@
 #include "common/logging.h"
 #include "common/status.h"
 #include "common/sync_point.h"
+#include "io/fs/azure_obj_storage_client.h"
+#include "io/fs/obj_storage_client.h"
+#include "io/fs/s3_obj_storage_client.h"
 #include "runtime/exec_env.h"
 #include "s3_uri.h"
 #include "vec/exec/scan/scanner_scheduler.h"
@@ -138,7 +142,7 @@ S3ClientFactory::S3ClientFactory() {
 
 string S3ClientFactory::get_valid_ca_cert_path() {
     vector<std::string> vec_ca_file_path = 
doris::split(config::ca_cert_file_paths, ";");
-    vector<std::string>::iterator it = vec_ca_file_path.begin();
+    auto it = vec_ca_file_path.begin();
     for (; it != vec_ca_file_path.end(); ++it) {
         if (std::filesystem::exists(*it)) {
             return *it;
@@ -156,9 +160,7 @@ S3ClientFactory& S3ClientFactory::instance() {
     return ret;
 }
 
-std::shared_ptr<Aws::S3::S3Client> S3ClientFactory::create(const S3ClientConf& 
s3_conf) {
-    TEST_SYNC_POINT_RETURN_WITH_VALUE("s3_client_factory::create",
-                                      std::make_shared<Aws::S3::S3Client>());
+std::shared_ptr<io::ObjStorageClient> S3ClientFactory::create(const 
S3ClientConf& s3_conf) {
     if (!is_s3_conf_valid(s3_conf)) {
         return nullptr;
     }
@@ -172,6 +174,36 @@ std::shared_ptr<Aws::S3::S3Client> 
S3ClientFactory::create(const S3ClientConf& s
         }
     }
 
+    auto obj_client = (s3_conf.provider == io::ObjStorageType::AZURE)
+                              ? _create_azure_client(s3_conf)
+                              : _create_s3_client(s3_conf);
+
+    {
+        uint64_t hash = s3_conf.get_hash();
+        std::lock_guard l(_lock);
+        _cache[hash] = obj_client;
+    }
+    return obj_client;
+}
+
+// Builds an AzureObjStorageClient for the container named by `s3_conf.bucket`.
+// `s3_conf.ak` is used both as the host prefix of the container URL and, together with
+// `s3_conf.sk`, as the shared-key credential.
+std::shared_ptr<io::ObjStorageClient> S3ClientFactory::_create_azure_client(
+        const S3ClientConf& s3_conf) {
+    const std::string container_url =
+            fmt::format("{}://{}.blob.core.windows.net/{}", config::s3_client_http_scheme,
+                        s3_conf.ak, s3_conf.bucket);
+    auto shared_key =
+            std::make_shared<Azure::Storage::StorageSharedKeyCredential>(s3_conf.ak, s3_conf.sk);
+    auto container = std::make_shared<Azure::Storage::Blobs::BlobContainerClient>(container_url,
+                                                                                  shared_key);
+    return std::make_shared<io::AzureObjStorageClient>(std::move(container));
+}
+
+std::shared_ptr<io::ObjStorageClient> S3ClientFactory::_create_s3_client(
+        const S3ClientConf& s3_conf) {
+    TEST_SYNC_POINT_RETURN_WITH_VALUE(
+            "s3_client_factory::create",
+            
std::make_shared<io::S3ObjStorageClient>(std::make_shared<Aws::S3::S3Client>()));
     Aws::Client::ClientConfiguration aws_config = 
S3ClientFactory::getClientConfiguration();
     aws_config.endpointOverride = s3_conf.endpoint;
     aws_config.region = s3_conf.region;
@@ -231,11 +263,8 @@ std::shared_ptr<Aws::S3::S3Client> 
S3ClientFactory::create(const S3ClientConf& s
                 s3_conf.use_virtual_addressing);
     }
 
-    {
-        std::lock_guard l(_lock);
-        _cache[hash] = new_client;
-    }
-    return new_client;
+    auto obj_client = 
std::make_shared<io::S3ObjStorageClient>(std::move(new_client));
+    return obj_client;
 }
 
 Status S3ClientFactory::convert_properties_to_s3_conf(
@@ -293,4 +322,105 @@ Status S3ClientFactory::convert_properties_to_s3_conf(
     return Status::OK();
 }
 
+// Builds an S3Conf from a cloud ObjectStoreInfoPB: copies bucket, prefix, endpoint, region,
+// ak, sk and sse_enabled, and maps the protobuf provider enum onto io::ObjStorageType.
+// An unrecognised provider value is reported through LOG_FATAL.
+S3Conf S3Conf::get_s3_conf(const cloud::ObjectStoreInfoPB& info) {
+    S3Conf conf {};
+    conf.bucket = info.bucket();
+    conf.prefix = info.prefix();
+    conf.sse_enabled = info.sse_enabled();
+
+    S3ClientConf& client = conf.client_conf;
+    client.endpoint = info.endpoint();
+    client.region = info.region();
+    client.ak = info.ak();
+    client.sk = info.sk();
+    client.bucket = info.bucket();
+    // AWS is the value in place until the provider has been mapped below.
+    client.provider = io::ObjStorageType::AWS;
+
+    switch (info.provider()) {
+    case cloud::ObjectStoreInfoPB_Provider_S3:
+        client.provider = io::ObjStorageType::AWS;
+        break;
+    case cloud::ObjectStoreInfoPB_Provider_AZURE:
+        client.provider = io::ObjStorageType::AZURE;
+        break;
+    case cloud::ObjectStoreInfoPB_Provider_BOS:
+        client.provider = io::ObjStorageType::BOS;
+        break;
+    case cloud::ObjectStoreInfoPB_Provider_COS:
+        client.provider = io::ObjStorageType::COS;
+        break;
+    case cloud::ObjectStoreInfoPB_Provider_GCP:
+        client.provider = io::ObjStorageType::GCP;
+        break;
+    case cloud::ObjectStoreInfoPB_Provider_OBS:
+        client.provider = io::ObjStorageType::OBS;
+        break;
+    case cloud::ObjectStoreInfoPB_Provider_OSS:
+        client.provider = io::ObjStorageType::OSS;
+        break;
+    default:
+        LOG_FATAL("unknown provider type {}, info {}", info.provider(), conf.to_string());
+        __builtin_unreachable();
+    }
+    return conf;
+}
+
+// Builds an S3Conf from a thrift TS3StorageParam: copies bucket, root path (as prefix),
+// endpoint, region, credentials, token and connection settings, and maps the thrift provider
+// enum onto io::ObjStorageType. TObjStorageType::UNKNOWN is logged and treated as AWS; any
+// other unrecognised value is reported through LOG_FATAL.
+S3Conf S3Conf::get_s3_conf(const TS3StorageParam& param) {
+    S3Conf conf {};
+    conf.bucket = param.bucket;
+    conf.prefix = param.root_path;
+
+    S3ClientConf& client = conf.client_conf;
+    client.endpoint = param.endpoint;
+    client.region = param.region;
+    client.ak = param.ak;
+    client.sk = param.sk;
+    client.token = param.token;
+    client.bucket = param.bucket;
+    // AWS is the value in place until the provider has been mapped below.
+    client.provider = io::ObjStorageType::AWS;
+    client.max_connections = param.max_conn;
+    client.request_timeout_ms = param.request_timeout_ms;
+    client.connect_timeout_ms = param.conn_timeout_ms;
+    // With cold/hot separation on MinIO, users may address the service by IP directly, so
+    // virtual addressing is enabled only when path style is not requested.
+    client.use_virtual_addressing = !param.use_path_style;
+
+    switch (param.provider) {
+    case TObjStorageType::UNKNOWN:
+        LOG_INFO("Receive one legal storage resource, set provider type to aws, param detail {}",
+                 conf.to_string());
+        client.provider = io::ObjStorageType::AWS;
+        break;
+    case TObjStorageType::AWS:
+        client.provider = io::ObjStorageType::AWS;
+        break;
+    case TObjStorageType::AZURE:
+        client.provider = io::ObjStorageType::AZURE;
+        break;
+    case TObjStorageType::BOS:
+        client.provider = io::ObjStorageType::BOS;
+        break;
+    case TObjStorageType::COS:
+        client.provider = io::ObjStorageType::COS;
+        break;
+    case TObjStorageType::GCP:
+        client.provider = io::ObjStorageType::GCP;
+        break;
+    case TObjStorageType::OBS:
+        client.provider = io::ObjStorageType::OBS;
+        break;
+    case TObjStorageType::OSS:
+        client.provider = io::ObjStorageType::OSS;
+        break;
+    default:
+        LOG_FATAL("unknown provider type {}, info {}", param.provider, conf.to_string());
+        __builtin_unreachable();
+    }
+    return conf;
+}
+
 } // end namespace doris
diff --git a/be/src/util/s3_util.h b/be/src/util/s3_util.h
index 5dd68069759..1764b1b8b86 100644
--- a/be/src/util/s3_util.h
+++ b/be/src/util/s3_util.h
@@ -22,6 +22,7 @@
 #include <aws/s3/S3Errors.h>
 #include <bvar/bvar.h>
 #include <fmt/format.h>
+#include <gen_cpp/AgentService_types.h>
 #include <gen_cpp/cloud.pb.h>
 #include <stdint.h>
 
@@ -32,14 +33,14 @@
 #include <unordered_map>
 
 #include "common/status.h"
+#include "io/fs/obj_storage_client.h"
 #include "util/s3_rate_limiter.h"
 #include "vec/common/string_ref.h"
 
-namespace Aws {
-namespace S3 {
+namespace Aws::S3 {
 class S3Client;
-} // namespace S3
-} // namespace Aws
+} // namespace Aws::S3
+
 namespace bvar {
 template <typename T>
 class Adder;
@@ -95,6 +96,9 @@ struct S3ClientConf {
     std::string ak;
     std::string sk;
     std::string token;
+    // For Azure, the bucket (container) name is needed when the blob container client is
+    // first created, so it is carried in the client conf.
+    std::string bucket;
+    io::ObjStorageType provider = io::ObjStorageType::AWS;
     int max_connections = -1;
     int request_timeout_ms = -1;
     int connect_timeout_ms = -1;
@@ -107,6 +111,7 @@ struct S3ClientConf {
         hash_code ^= crc32_hash(token);
         hash_code ^= crc32_hash(endpoint);
         hash_code ^= crc32_hash(region);
+        hash_code ^= crc32_hash(bucket);
         hash_code ^= max_connections;
         hash_code ^= request_timeout_ms;
         hash_code ^= connect_timeout_ms;
@@ -116,9 +121,9 @@ struct S3ClientConf {
 
     std::string to_string() const {
         return fmt::format(
-                "(ak={}, token={}, endpoint={}, region={}, max_connections={}, 
"
+                "(ak={}, token={}, endpoint={}, region={}, bucket={}, 
max_connections={}, "
                 "request_timeout_ms={}, connect_timeout_ms={}, 
use_virtual_addressing={}",
-                ak, token, endpoint, region, max_connections, 
request_timeout_ms,
+                ak, token, endpoint, region, bucket, max_connections, 
request_timeout_ms,
                 connect_timeout_ms, use_virtual_addressing);
     }
 };
@@ -129,7 +134,8 @@ struct S3Conf {
     S3ClientConf client_conf;
 
     bool sse_enabled = false;
-    cloud::ObjectStoreInfoPB::Provider provider;
+    static S3Conf get_s3_conf(const cloud::ObjectStoreInfoPB&);
+    static S3Conf get_s3_conf(const TS3StorageParam&);
 
     std::string to_string() const {
         return fmt::format("(bucket={}, prefix={}, client_conf={}, 
sse_enabled={})", bucket, prefix,
@@ -143,7 +149,7 @@ public:
 
     static S3ClientFactory& instance();
 
-    std::shared_ptr<Aws::S3::S3Client> create(const S3ClientConf& s3_conf);
+    std::shared_ptr<io::ObjStorageClient> create(const S3ClientConf& s3_conf);
 
     static Status convert_properties_to_s3_conf(const std::map<std::string, 
std::string>& prop,
                                                 const S3URI& s3_uri, S3Conf* 
s3_conf);
@@ -161,12 +167,14 @@ public:
     S3RateLimiterHolder* rate_limiter(S3RateLimitType type);
 
 private:
+    std::shared_ptr<io::ObjStorageClient> _create_s3_client(const 
S3ClientConf& s3_conf);
+    std::shared_ptr<io::ObjStorageClient> _create_azure_client(const 
S3ClientConf& s3_conf);
     S3ClientFactory();
     static std::string get_valid_ca_cert_path();
 
     Aws::SDKOptions _aws_options;
     std::mutex _lock;
-    std::unordered_map<uint64_t, std::shared_ptr<Aws::S3::S3Client>> _cache;
+    std::unordered_map<uint64_t, std::shared_ptr<io::ObjStorageClient>> _cache;
     std::string _ca_cert_file_path;
     std::array<std::unique_ptr<S3RateLimiterHolder>, 2> _rate_limiters;
 };
diff --git a/be/test/io/fs/azure_test.cpp b/be/test/io/fs/azure_test.cpp
new file mode 100644
index 00000000000..f158cf0a7b4
--- /dev/null
+++ b/be/test/io/fs/azure_test.cpp
@@ -0,0 +1,96 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include <fmt/core.h>
+#include <gtest/gtest.h>
+
+#include <azure/storage/blobs.hpp>
+#include <azure/storage/blobs/blob_client.hpp>
+#include <azure/storage/blobs/blob_container_client.hpp>
+#include <azure/storage/common/storage_credential.hpp>
+#include <cstdio>
+#include <iostream>
+#include <stdexcept>
+#include <utility>
+
+#include "common/config.h"
+
+namespace doris {
+
+// Returns the Azure Storage connection string for the tests in this file.
+//
+// The hard-coded `ConnectionString` below wins when it is non-empty; otherwise
+// the AZURE_STORAGE_CONNECTION_STRING environment variable is consulted. The
+// environment value is read once and cached in a function-local static.
+//
+// Throws std::runtime_error when neither source yields a non-empty string.
+std::string GetConnectionString() {
+    const static std::string ConnectionString = "";
+
+    if (!ConnectionString.empty()) {
+        return ConnectionString;
+    }
+    // std::getenv returns a null pointer when the variable is not set, and
+    // constructing a std::string from a null pointer is undefined behavior.
+    // Map "unset" to an empty string so the error below is reported instead.
+    const static std::string envConnectionString = [] {
+        const char* rawValue = std::getenv("AZURE_STORAGE_CONNECTION_STRING");
+        return std::string(rawValue != nullptr ? rawValue : "");
+    }();
+    if (!envConnectionString.empty()) {
+        return envConnectionString;
+    }
+    throw std::runtime_error("Cannot find connection string.");
+}
+
+// Smoke test for the Azure Blob SDK: uploads a small blob using shared-key
+// credentials, attaches metadata, then reads the properties and content back.
+// GTEST_SKIP() is invoked first; per its message the purpose of this test is
+// to exercise compilation and linkage against the Azure libraries.
+TEST(AzureTest, Write) {
+    GTEST_SKIP() << "Skipping Azure test, because this test it to test the compile and linkage";
+    using namespace Azure::Storage::Blobs;
+
+    // Account name, account key and container are taken from the test_s3_* config entries.
+    const std::string accountName = config::test_s3_ak;
+    const std::string accountKey = config::test_s3_sk;
+
+    auto cred =
+            std::make_shared<Azure::Storage::StorageSharedKeyCredential>(accountName, accountKey);
+
+    const std::string containerName = config::test_s3_bucket;
+    const std::string blobName = "sample-blob";
+    const std::string blobContent = "Fuck Azure!";
+    const std::string uri =
+            fmt::format("https://{}.blob.core.windows.net/{}", accountName, containerName);
+
+    // Alternative way to build the container client, from a connection string:
+    // auto containerClient =
+    //         BlobContainerClient::CreateFromConnectionString(GetConnectionString(), containerName);
+
+    auto containerClient = BlobContainerClient(uri, cred);
+    containerClient.CreateIfNotExists();
+
+    BlockBlobClient blobClient = containerClient.GetBlockBlobClient(blobName);
+
+    // Upload the blob content from an in-memory byte buffer.
+    std::vector<uint8_t> buffer(blobContent.begin(), blobContent.end());
+    blobClient.UploadFrom(buffer.data(), buffer.size());
+
+    Azure::Storage::Metadata blobMetadata = {{"key1", "value1"}, {"key2", "value2"}};
+    blobClient.SetMetadata(blobMetadata);
+
+    auto properties = blobClient.GetProperties().Value;
+    // Iterate by const reference so each key/value pair is not copied.
+    for (const auto& metadata : properties.Metadata) {
+        std::cout << metadata.first << ":" << metadata.second << std::endl;
+    }
+    // We know blob size is small, so it's safe to cast here.
+    buffer.resize(static_cast<size_t>(properties.BlobSize));
+
+    blobClient.DownloadTo(buffer.data(), buffer.size());
+
+    std::cout << std::string(buffer.begin(), buffer.end()) << std::endl;
+}
+
+} // namespace doris
diff --git a/be/test/io/fs/s3_file_writer_test.cpp 
b/be/test/io/fs/s3_file_writer_test.cpp
index 75a49d813a4..4f3594dc5f0 100644
--- a/be/test/io/fs/s3_file_writer_test.cpp
+++ b/be/test/io/fs/s3_file_writer_test.cpp
@@ -263,7 +263,8 @@ static auto test_mock_callbacks = std::array {
                           pair->first = mock_client->head_object(req);
                       }},
         MockCallback {"s3_client_factory::create", [](auto&& outcome) {
-                          auto pair = 
try_any_cast_ret<std::shared_ptr<Aws::S3::S3Client>>(outcome);
+                          auto pair = 
try_any_cast_ret<std::shared_ptr<io::S3ObjStorageClient>>(
+                                  outcome);
                           pair->second = true;
                       }}};
 
diff --git a/gensrc/proto/cloud.proto b/gensrc/proto/cloud.proto
index 99c8c2647f2..e1c3c9be5ab 100644
--- a/gensrc/proto/cloud.proto
+++ b/gensrc/proto/cloud.proto
@@ -172,12 +172,14 @@ message ObjectStoreInfoPB {
     // presigned url use
     // oss,aws,cos,obs,bos
     enum Provider {
-        OSS = 0;
-        S3 =  1;
-        COS = 2;
-        OBS = 3;
-        BOS = 4;
-        GCP = 5;
+        UNKONWN = -1;
+        OSS     = 0;
+        S3      = 1;
+        COS     = 2;
+        OBS     = 3;
+        BOS     = 4;
+        GCP     = 5;
+        AZURE   = 6;
     }
     optional int64 ctime = 1;
     optional int64 mtime = 2;
diff --git a/gensrc/thrift/AgentService.thrift 
b/gensrc/thrift/AgentService.thrift
index 104adca70fa..cc5dc367915 100644
--- a/gensrc/thrift/AgentService.thrift
+++ b/gensrc/thrift/AgentService.thrift
@@ -61,6 +61,16 @@ enum TTabletType {
     TABLET_TYPE_MEMORY = 1
 }
 
+// Object storage provider kinds. Used as the type of the optional
+// `provider` field (id 12) of TS3StorageParam.
+// NOTE(review): UNKNOWN = 0 looks like the "not specified" value — confirm
+// how the BE conversion to io::ObjStorageType treats it.
+enum TObjStorageType {
+    UNKNOWN = 0,
+    AWS = 1,
+    AZURE = 2,
+    BOS = 3,
+    COS = 4,
+    OBS = 5,
+    OSS = 6,
+    GCP = 7
+}
 
 struct TS3StorageParam {
     1: optional string endpoint
@@ -74,6 +84,7 @@ struct TS3StorageParam {
     9: optional string bucket
     10: optional bool use_path_style = false
     11: optional string token
+    12: optional TObjStorageType provider
 }
 
 struct TStoragePolicy {


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org


Reply via email to