This is an automated email from the ASF dual-hosted git repository. dataroaring pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push: new 8ffb1704620 [feature](Cloud) Implement gcs accessor for compatibility (#34081) 8ffb1704620 is described below commit 8ffb170462028abd40fe166b9df8687f032bb020 Author: AlexYue <yj976240...@gmail.com> AuthorDate: Thu Apr 25 11:20:41 2024 +0800 [feature](Cloud) Implement gcs accessor for compatibility (#34081) --- cloud/src/recycler/s3_accessor.cpp | 33 +++++++++++- cloud/src/recycler/s3_accessor.h | 9 ++++ cloud/test/s3_accessor_test.cpp | 103 ++++++++++++++++++++++++++++++++++++- 3 files changed, 142 insertions(+), 3 deletions(-) diff --git a/cloud/src/recycler/s3_accessor.cpp b/cloud/src/recycler/s3_accessor.cpp index 543f84f87fc..d1ebfe62a1d 100644 --- a/cloud/src/recycler/s3_accessor.cpp +++ b/cloud/src/recycler/s3_accessor.cpp @@ -30,6 +30,8 @@ #include <aws/s3/model/ListObjectsV2Request.h> #include <aws/s3/model/PutObjectRequest.h> +#include <algorithm> +#include <execution> #include <utility> #include "common/logging.h" @@ -226,7 +228,21 @@ int S3Accessor::delete_objects(const std::vector<std::string>& relative_paths) { } int S3Accessor::delete_object(const std::string& relative_path) { - // TODO(cyx) + Aws::S3::Model::DeleteObjectRequest request; + auto key = get_key(relative_path); + request.WithBucket(conf_.bucket).WithKey(key); + auto outcome = SYNC_POINT_HOOK_RETURN_VALUE(s3_client_->DeleteObject(request), + "s3_client::delete_object", request); + if (!outcome.IsSuccess()) { + LOG_WARNING("failed to delete object") + .tag("endpoint", conf_.endpoint) + .tag("bucket", conf_.bucket) + .tag("key", key) + .tag("responseCode", static_cast<int>(outcome.GetError().GetResponseCode())) + .tag("error", outcome.GetError().GetMessage()) + .tag("exception", outcome.GetError().GetExceptionName()); + return -1; + } return 0; } @@ -422,5 +438,20 @@ int S3Accessor::check_bucket_versioning() { return 0; } +int GcsAccessor::delete_objects(const std::vector<std::string>& relative_paths) { + std::vector<int> delete_rets(relative_paths.size()); + std::transform(std::execution::par, relative_paths.begin(), relative_paths.end(), + delete_rets.begin(), + [this](const std::string& path) { return delete_object(path); }); + int ret = 0; + for (int delete_ret : delete_rets) { + if (delete_ret != 0) { + ret = delete_ret; + break; + } + } + return ret; +} + #undef HELP_MACRO } // namespace doris::cloud diff --git a/cloud/src/recycler/s3_accessor.h b/cloud/src/recycler/s3_accessor.h index 10291cfd4ba..1025ceab52e 100644 --- a/cloud/src/recycler/s3_accessor.h +++ b/cloud/src/recycler/s3_accessor.h @@ -90,4 +90,13 @@ private: std::string path_; }; +class GcsAccessor final : public S3Accessor { +public: + explicit GcsAccessor(S3Conf conf) : S3Accessor(std::move(conf)) {} + ~GcsAccessor() override = default; + + // returns 0 for success otherwise error + int delete_objects(const std::vector<std::string>& relative_paths) override; +}; + } // namespace doris::cloud diff --git a/cloud/test/s3_accessor_test.cpp b/cloud/test/s3_accessor_test.cpp index bb8b7c27bd9..972505c3999 100644 --- a/cloud/test/s3_accessor_test.cpp +++ b/cloud/test/s3_accessor_test.cpp @@ -58,6 +58,8 @@ public: const Aws::S3::Model::ListObjectsV2Request& req) = 0; virtual Aws::S3::Model::DeleteObjectsOutcome DeleteObjects( const Aws::S3::Model::DeleteObjectsRequest& req) = 0; + virtual Aws::S3::Model::DeleteObjectOutcome DeleteObject( + const Aws::S3::Model::DeleteObjectRequest& req) = 0; virtual Aws::S3::Model::PutObjectOutcome PutObject( const Aws::S3::Model::PutObjectRequest& req) = 0; virtual Aws::S3::Model::HeadObjectOutcome HeadObject( @@ -122,6 +124,13 @@ public: return Aws::S3::Model::DeleteObjectsOutcome(std::move(result)); } + Aws::S3::Model::DeleteObjectOutcome DeleteObject( + const Aws::S3::Model::DeleteObjectRequest& req) override { + Aws::S3::Model::DeleteObjectResult result; + _mock_fs->delete_object(req.GetKey()); + return Aws::S3::Model::DeleteObjectOutcome(std::move(result)); + } + Aws::S3::Model::PutObjectOutcome PutObject( const Aws::S3::Model::PutObjectRequest& req) override { Aws::S3::Model::PutObjectResult result; @@ -207,6 +216,18 @@ public: return Aws::S3::Model::DeleteObjectsOutcome(std::move(err)); } + Aws::S3::Model::DeleteObjectOutcome DeleteObject( + const Aws::S3::Model::DeleteObjectRequest& req) override { + if (!return_error_for_error_s3_client) { + return _correct_impl->DeleteObject(req); + } + auto err = Aws::Client::AWSError<Aws::S3::S3Errors>(Aws::S3::S3Errors::RESOURCE_NOT_FOUND, + false); + err.SetResponseCode(Aws::Http::HttpResponseCode::NOT_FOUND); + // return -1 + return Aws::S3::Model::DeleteObjectOutcome(std::move(err)); + } + Aws::S3::Model::PutObjectOutcome PutObject( const Aws::S3::Model::PutObjectRequest& req) override { if (!return_error_for_error_s3_client) { @@ -267,6 +288,10 @@ public: return _impl->DeleteObjects(req); } + auto DeleteObject(const Aws::S3::Model::DeleteObjectRequest& req) { + return _impl->DeleteObject(req); + } + auto PutObject(const Aws::S3::Model::PutObjectRequest& req) { return _impl->PutObject(req); } auto HeadObject(const Aws::S3::Model::HeadObjectRequest& req) { return _impl->HeadObject(req); } @@ -304,6 +329,12 @@ static auto callbacks = std::array { Aws::S3::Model::DeleteObjectsRequest*>*)p; *pair.first = (*_mock_client).DeleteObjects(*pair.second); }}, + MockCallable {"s3_client::delete_object", + [](void* p) { + auto pair = *(std::pair<Aws::S3::Model::DeleteObjectOutcome*, + Aws::S3::Model::DeleteObjectRequest*>*)p; + *pair.first = (*_mock_client).DeleteObject(*pair.second); + }}, MockCallable {"s3_client::put_object", [](void* p) { auto pair = *(std::pair<Aws::S3::Model::PutObjectOutcome*, @@ -614,8 +645,7 @@ TEST(S3AccessorTest, exist_error) { ASSERT_EQ(-1, accessor->exist(prefix)); } -// function is not implemented -TEST(S3AccessorTest, DISABLED_delete_object) { +TEST(S3AccessorTest, delete_object) { _mock_fs = std::make_unique<cloud::MockS3Accessor>(cloud::S3Conf {}); _mock_client = std::make_unique<MockS3Client>(); auto accessor = std::make_unique<S3Accessor>(S3Conf {}); @@ -641,6 +671,75 @@ TEST(S3AccessorTest, DISABLED_delete_object) { } } +TEST(S3AccessorTest, gcs_delete_objects) { + _mock_fs = std::make_unique<cloud::MockS3Accessor>(cloud::S3Conf {}); + _mock_client = std::make_unique<MockS3Client>(); + auto accessor = std::make_unique<GcsAccessor>(S3Conf {}); + auto sp = SyncPoint::get_instance(); + std::for_each(callbacks.begin(), callbacks.end(), [&](const MockCallable& mock_callback) { + sp->set_call_back(fmt::format("{}::pred", mock_callback.point_name), + [](void* p) { *((bool*)p) = true; }); + sp->set_call_back(mock_callback.point_name, mock_callback.func); + }); + sp->enable_processing(); + std::unique_ptr<int, std::function<void(int*)>> defer_log_statistics((int*)0x01, [&](int*) { + sp->disable_processing(); + std::for_each(callbacks.begin(), callbacks.end(), [&](const MockCallable& mock_callback) { + sp->clear_call_back(mock_callback.point_name); + }); + }); + std::string prefix = "test_delete_object"; + std::vector<std::string> paths; + size_t num = 300; + for (size_t i = 0; i < num; i++) { + auto path = fmt::format("{}{}", prefix, i); + _mock_fs->put_object(path, ""); + paths.emplace_back(std::move(path)); + } + ASSERT_EQ(0, accessor->delete_objects(paths)); + for (size_t i = 0; i < num; i++) { + auto path = fmt::format("{}{}", prefix, i); + ASSERT_EQ(1, accessor->exist(path)); + } +} + +TEST(S3AccessorTest, gcs_delete_objects_error) { + _mock_fs = std::make_unique<cloud::MockS3Accessor>(cloud::S3Conf {}); + _mock_client = std::make_unique<MockS3Client>(std::make_unique<ErrorS3Client>()); + auto accessor = std::make_unique<GcsAccessor>(S3Conf {}); + auto sp = SyncPoint::get_instance(); + std::for_each(callbacks.begin(), callbacks.end(), [&](const MockCallable& mock_callback) { + sp->set_call_back(fmt::format("{}::pred", mock_callback.point_name), + [](void* p) { *((bool*)p) = true; }); + sp->set_call_back(mock_callback.point_name, mock_callback.func); + }); + sp->enable_processing(); + std::unique_ptr<int, std::function<void(int*)>> defer_log_statistics((int*)0x01, [&](int*) { + sp->disable_processing(); + std::for_each(callbacks.begin(), callbacks.end(), [&](const MockCallable& mock_callback) { + sp->clear_call_back(mock_callback.point_name); + }); + return_error_for_error_s3_client = false; + }); + std::string prefix = "test_delete_objects"; + std::vector<std::string> paths_first_half; + std::vector<std::string> paths_second_half; + size_t num = 300; + for (size_t i = 0; i < num; i++) { + auto path = fmt::format("{}{}", prefix, i); + _mock_fs->put_object(path, ""); + if (i < 150) { + paths_first_half.emplace_back(std::move(path)); + } else { + paths_second_half.emplace_back(std::move(path)); + } + } + std::vector<std::string> empty; + ASSERT_EQ(0, accessor->delete_objects(empty)); + return_error_for_error_s3_client = true; + ASSERT_EQ(-1, accessor->delete_objects(paths_first_half)); +} + TEST(S3AccessorTest, delete_objects) { _mock_fs = std::make_unique<cloud::MockS3Accessor>(cloud::S3Conf {}); _mock_client = std::make_unique<MockS3Client>(); --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org