This is an automated email from the ASF dual-hosted git repository.

dataroaring pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new 8ffb1704620 [feature](Cloud) Implement gcs accessor for compatibility (#34081)
8ffb1704620 is described below

commit 8ffb170462028abd40fe166b9df8687f032bb020
Author: AlexYue <yj976240...@gmail.com>
AuthorDate: Thu Apr 25 11:20:41 2024 +0800

    [feature](Cloud) Implement gcs accessor for compatibility (#34081)
---
 cloud/src/recycler/s3_accessor.cpp |  33 +++++++++++-
 cloud/src/recycler/s3_accessor.h   |   9 ++++
 cloud/test/s3_accessor_test.cpp    | 103 ++++++++++++++++++++++++++++++++++++-
 3 files changed, 142 insertions(+), 3 deletions(-)

diff --git a/cloud/src/recycler/s3_accessor.cpp b/cloud/src/recycler/s3_accessor.cpp
index 543f84f87fc..d1ebfe62a1d 100644
--- a/cloud/src/recycler/s3_accessor.cpp
+++ b/cloud/src/recycler/s3_accessor.cpp
@@ -30,6 +30,8 @@
 #include <aws/s3/model/ListObjectsV2Request.h>
 #include <aws/s3/model/PutObjectRequest.h>
 
+#include <algorithm>
+#include <execution>
 #include <utility>
 
 #include "common/logging.h"
@@ -226,7 +228,21 @@ int S3Accessor::delete_objects(const std::vector<std::string>& relative_paths) {
 }
 
 int S3Accessor::delete_object(const std::string& relative_path) {
-    // TODO(cyx)
+    Aws::S3::Model::DeleteObjectRequest request;
+    auto key = get_key(relative_path);
+    request.WithBucket(conf_.bucket).WithKey(key);
+    auto outcome = SYNC_POINT_HOOK_RETURN_VALUE(s3_client_->DeleteObject(request),
+                                                "s3_client::delete_object", request);
+    if (!outcome.IsSuccess()) {
+        LOG_WARNING("failed to delete object")
+                .tag("endpoint", conf_.endpoint)
+                .tag("bucket", conf_.bucket)
+                .tag("key", key)
+                .tag("responseCode", static_cast<int>(outcome.GetError().GetResponseCode()))
+                .tag("error", outcome.GetError().GetMessage())
+                .tag("exception", outcome.GetError().GetExceptionName());
+        return -1;
+    }
     return 0;
 }
 
@@ -422,5 +438,20 @@ int S3Accessor::check_bucket_versioning() {
     return 0;
 }
 
+int GcsAccessor::delete_objects(const std::vector<std::string>& relative_paths) {
+    std::vector<int> delete_rets(relative_paths.size());
+    std::transform(std::execution::par, relative_paths.begin(), relative_paths.end(),
+                   delete_rets.begin(),
+                   [this](const std::string& path) { return delete_object(path); });
+    int ret = 0;
+    for (int delete_ret : delete_rets) {
+        if (delete_ret != 0) {
+            ret = delete_ret;
+            break;
+        }
+    }
+    return ret;
+}
+
 #undef HELP_MACRO
 } // namespace doris::cloud
diff --git a/cloud/src/recycler/s3_accessor.h b/cloud/src/recycler/s3_accessor.h
index 10291cfd4ba..1025ceab52e 100644
--- a/cloud/src/recycler/s3_accessor.h
+++ b/cloud/src/recycler/s3_accessor.h
@@ -90,4 +90,13 @@ private:
     std::string path_;
 };
 
+class GcsAccessor final : public S3Accessor {
+public:
+    explicit GcsAccessor(S3Conf conf) : S3Accessor(std::move(conf)) {}
+    ~GcsAccessor() override = default;
+
+    // returns 0 for success otherwise error
+    int delete_objects(const std::vector<std::string>& relative_paths) override;
+};
+
 } // namespace doris::cloud
diff --git a/cloud/test/s3_accessor_test.cpp b/cloud/test/s3_accessor_test.cpp
index bb8b7c27bd9..972505c3999 100644
--- a/cloud/test/s3_accessor_test.cpp
+++ b/cloud/test/s3_accessor_test.cpp
@@ -58,6 +58,8 @@ public:
             const Aws::S3::Model::ListObjectsV2Request& req) = 0;
     virtual Aws::S3::Model::DeleteObjectsOutcome DeleteObjects(
             const Aws::S3::Model::DeleteObjectsRequest& req) = 0;
+    virtual Aws::S3::Model::DeleteObjectOutcome DeleteObject(
+            const Aws::S3::Model::DeleteObjectRequest& req) = 0;
     virtual Aws::S3::Model::PutObjectOutcome PutObject(
             const Aws::S3::Model::PutObjectRequest& req) = 0;
     virtual Aws::S3::Model::HeadObjectOutcome HeadObject(
@@ -122,6 +124,13 @@ public:
         return Aws::S3::Model::DeleteObjectsOutcome(std::move(result));
     }
 
+    Aws::S3::Model::DeleteObjectOutcome DeleteObject(
+            const Aws::S3::Model::DeleteObjectRequest& req) override {
+        Aws::S3::Model::DeleteObjectResult result;
+        _mock_fs->delete_object(req.GetKey());
+        return Aws::S3::Model::DeleteObjectOutcome(std::move(result));
+    }
+
     Aws::S3::Model::PutObjectOutcome PutObject(
             const Aws::S3::Model::PutObjectRequest& req) override {
         Aws::S3::Model::PutObjectResult result;
@@ -207,6 +216,18 @@ public:
         return Aws::S3::Model::DeleteObjectsOutcome(std::move(err));
     }
 
+    Aws::S3::Model::DeleteObjectOutcome DeleteObject(
+            const Aws::S3::Model::DeleteObjectRequest& req) override {
+        if (!return_error_for_error_s3_client) {
+            return _correct_impl->DeleteObject(req);
+        }
+        auto err = Aws::Client::AWSError<Aws::S3::S3Errors>(Aws::S3::S3Errors::RESOURCE_NOT_FOUND,
+                                                            false);
+        err.SetResponseCode(Aws::Http::HttpResponseCode::NOT_FOUND);
+        // return -1
+        return Aws::S3::Model::DeleteObjectOutcome(std::move(err));
+    }
+
     Aws::S3::Model::PutObjectOutcome PutObject(
             const Aws::S3::Model::PutObjectRequest& req) override {
         if (!return_error_for_error_s3_client) {
@@ -267,6 +288,10 @@ public:
         return _impl->DeleteObjects(req);
     }
 
+    auto DeleteObject(const Aws::S3::Model::DeleteObjectRequest& req) {
+        return _impl->DeleteObject(req);
+    }
+
     auto PutObject(const Aws::S3::Model::PutObjectRequest& req) { return _impl->PutObject(req); }
 
     auto HeadObject(const Aws::S3::Model::HeadObjectRequest& req) { return _impl->HeadObject(req); }
@@ -304,6 +329,12 @@ static auto callbacks = std::array {
                                                   Aws::S3::Model::DeleteObjectsRequest*>*)p;
                           *pair.first = (*_mock_client).DeleteObjects(*pair.second);
                       }},
+        MockCallable {"s3_client::delete_object",
+                      [](void* p) {
+                          auto pair = *(std::pair<Aws::S3::Model::DeleteObjectOutcome*,
+                                                  Aws::S3::Model::DeleteObjectRequest*>*)p;
+                          *pair.first = (*_mock_client).DeleteObject(*pair.second);
+                      }},
         MockCallable {"s3_client::put_object",
                       [](void* p) {
                           auto pair = *(std::pair<Aws::S3::Model::PutObjectOutcome*,
@@ -614,8 +645,7 @@ TEST(S3AccessorTest, exist_error) {
     ASSERT_EQ(-1, accessor->exist(prefix));
 }
 
-// function is not implemented
-TEST(S3AccessorTest, DISABLED_delete_object) {
+TEST(S3AccessorTest, delete_object) {
     _mock_fs = std::make_unique<cloud::MockS3Accessor>(cloud::S3Conf {});
     _mock_client = std::make_unique<MockS3Client>();
     auto accessor = std::make_unique<S3Accessor>(S3Conf {});
@@ -641,6 +671,75 @@ TEST(S3AccessorTest, DISABLED_delete_object) {
     }
 }
 
+TEST(S3AccessorTest, gcs_delete_objects) {
+    _mock_fs = std::make_unique<cloud::MockS3Accessor>(cloud::S3Conf {});
+    _mock_client = std::make_unique<MockS3Client>();
+    auto accessor = std::make_unique<GcsAccessor>(S3Conf {});
+    auto sp = SyncPoint::get_instance();
+    std::for_each(callbacks.begin(), callbacks.end(), [&](const MockCallable& mock_callback) {
+        sp->set_call_back(fmt::format("{}::pred", mock_callback.point_name),
+                          [](void* p) { *((bool*)p) = true; });
+        sp->set_call_back(mock_callback.point_name, mock_callback.func);
+    });
+    sp->enable_processing();
+    std::unique_ptr<int, std::function<void(int*)>> defer_log_statistics((int*)0x01, [&](int*) {
+        sp->disable_processing();
+        std::for_each(callbacks.begin(), callbacks.end(), [&](const MockCallable& mock_callback) {
+            sp->clear_call_back(mock_callback.point_name);
+        });
+    });
+    std::string prefix = "test_delete_object";
+    std::vector<std::string> paths;
+    size_t num = 300;
+    for (size_t i = 0; i < num; i++) {
+        auto path = fmt::format("{}{}", prefix, i);
+        _mock_fs->put_object(path, "");
+        paths.emplace_back(std::move(path));
+    }
+    ASSERT_EQ(0, accessor->delete_objects(paths));
+    for (size_t i = 0; i < num; i++) {
+        auto path = fmt::format("{}{}", prefix, i);
+        ASSERT_EQ(1, accessor->exist(path));
+    }
+}
+
+TEST(S3AccessorTest, gcs_delete_objects_error) {
+    _mock_fs = std::make_unique<cloud::MockS3Accessor>(cloud::S3Conf {});
+    _mock_client = std::make_unique<MockS3Client>(std::make_unique<ErrorS3Client>());
+    auto accessor = std::make_unique<GcsAccessor>(S3Conf {});
+    auto sp = SyncPoint::get_instance();
+    std::for_each(callbacks.begin(), callbacks.end(), [&](const MockCallable& mock_callback) {
+        sp->set_call_back(fmt::format("{}::pred", mock_callback.point_name),
+                          [](void* p) { *((bool*)p) = true; });
+        sp->set_call_back(mock_callback.point_name, mock_callback.func);
+    });
+    sp->enable_processing();
+    std::unique_ptr<int, std::function<void(int*)>> defer_log_statistics((int*)0x01, [&](int*) {
+        sp->disable_processing();
+        std::for_each(callbacks.begin(), callbacks.end(), [&](const MockCallable& mock_callback) {
+            sp->clear_call_back(mock_callback.point_name);
+        });
+        return_error_for_error_s3_client = false;
+    });
+    std::string prefix = "test_delete_objects";
+    std::vector<std::string> paths_first_half;
+    std::vector<std::string> paths_second_half;
+    size_t num = 300;
+    for (size_t i = 0; i < num; i++) {
+        auto path = fmt::format("{}{}", prefix, i);
+        _mock_fs->put_object(path, "");
+        if (i < 150) {
+            paths_first_half.emplace_back(std::move(path));
+        } else {
+            paths_second_half.emplace_back(std::move(path));
+        }
+    }
+    std::vector<std::string> empty;
+    ASSERT_EQ(0, accessor->delete_objects(empty));
+    return_error_for_error_s3_client = true;
+    ASSERT_EQ(-1, accessor->delete_objects(paths_first_half));
+}
+
 TEST(S3AccessorTest, delete_objects) {
     _mock_fs = std::make_unique<cloud::MockS3Accessor>(cloud::S3Conf {});
     _mock_client = std::make_unique<MockS3Client>();


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to