This is an automated email from the ASF dual-hosted git repository. dataroaring pushed a commit to branch branch-3.0 in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-3.0 by this push: new 1ab8dc1aede [feat](s3client) Add role-based authorization for s3client (#49541) (#50924) 1ab8dc1aede is described below commit 1ab8dc1aede9be2ea21e3a79b46eb549435542e2 Author: Lei Zhang <zhang...@selectdb.com> AuthorDate: Thu May 15 16:31:55 2025 +0800 [feat](s3client) Add role-based authorization for s3client (#49541) (#50924) --- be/cmake/thirdparty.cmake | 2 + be/src/io/fs/s3_file_system.cpp | 4 + be/src/util/s3_util.cpp | 133 ++++++++--- be/src/util/s3_util.h | 19 +- be/test/io/fs/s3_obj_storage_client_role_test.cpp | 185 +++++++++++++++ cloud/cmake/thirdparty.cmake | 3 + cloud/src/meta-service/meta_service_resource.cpp | 160 ++++++++----- cloud/src/recycler/s3_accessor.cpp | 63 +++-- cloud/src/recycler/s3_accessor.h | 9 + cloud/test/meta_service_http_test.cpp | 89 +++++++ cloud/test/meta_service_test.cpp | 264 +++++++++++++++++++++ cloud/test/s3_accessor_test.cpp | 102 +++++++- common/cpp/aws_common.cpp | 40 ++++ common/cpp/aws_common.h | 27 +++ common/cpp/aws_logger.h | 2 + .../java/org/apache/doris/common/util/S3Util.java | 98 +++++++- .../datasource/property/S3ClientBEProperties.java | 8 + .../property/constants/S3Properties.java | 35 +++ .../java/org/apache/doris/fs/obj/S3ObjStorage.java | 12 +- .../doris/tablefunction/S3TableValuedFunction.java | 8 + .../org/apache/doris/catalog/S3ResourceTest.java | 31 +++ gensrc/proto/cloud.proto | 11 + gensrc/thrift/AgentService.thrift | 11 + .../org/apache/doris/regression/Config.groovy | 34 +++ .../apache/doris/regression/suite/Syncer.groovy | 23 ++ .../test_backup_restore_with_role.groovy | 97 ++++++++ .../aws_iam_role_p0/test_export_with_role.groovy | 117 +++++++++ .../test_external_catalog_with_role.groovy | 86 +++++++ .../aws_iam_role_p0/test_resource_with_role.groovy | 144 +++++++++++ .../aws_iam_role_p0/test_s3_load_with_role.groovy | 146 ++++++++++++ .../aws_iam_role_p0/test_s3_vault_with_role.groovy | 73 ++++++ 
.../test_select_into_outfile_with_role.groovy | 70 ++++++ .../aws_iam_role_p0/test_tvf_with_role.groovy | 49 ++++ 33 files changed, 2025 insertions(+), 130 deletions(-) diff --git a/be/cmake/thirdparty.cmake b/be/cmake/thirdparty.cmake index 764719ae00c..81e59539dbd 100644 --- a/be/cmake/thirdparty.cmake +++ b/be/cmake/thirdparty.cmake @@ -137,6 +137,8 @@ add_thirdparty(aws-c-mqtt LIB64) add_thirdparty(aws-checksums LIB64) add_thirdparty(aws-c-s3 LIB64) add_thirdparty(aws-c-sdkutils LIB64) +add_thirdparty(aws-cpp-sdk-identity-management LIB64) +add_thirdparty(aws-cpp-sdk-sts LIB64) if (NOT OS_MACOSX) add_thirdparty(aws-s2n LIBNAME "lib/libs2n.a") endif() diff --git a/be/src/io/fs/s3_file_system.cpp b/be/src/io/fs/s3_file_system.cpp index bc6aefd92cc..ec1c63c9106 100644 --- a/be/src/io/fs/s3_file_system.cpp +++ b/be/src/io/fs/s3_file_system.cpp @@ -97,6 +97,10 @@ Status ObjClientHolder::reset(const S3ClientConf& conf) { reset_conf.token = conf.token; reset_conf.bucket = conf.bucket; reset_conf.use_virtual_addressing = conf.use_virtual_addressing; + + reset_conf.role_arn = conf.role_arn; + reset_conf.external_id = conf.external_id; + reset_conf.cred_provider_type = conf.cred_provider_type; // Should check endpoint here? 
} diff --git a/be/src/util/s3_util.cpp b/be/src/util/s3_util.cpp index 0edce85e192..39887625942 100644 --- a/be/src/util/s3_util.cpp +++ b/be/src/util/s3_util.cpp @@ -24,7 +24,9 @@ #include <aws/core/utils/logging/LogLevel.h> #include <aws/core/utils/logging/LogSystemInterface.h> #include <aws/core/utils/memory/stl/AWSStringStream.h> +#include <aws/identity-management/auth/STSAssumeRoleCredentialsProvider.h> #include <aws/s3/S3Client.h> +#include <aws/sts/STSClient.h> #include <bvar/reducer.h> #include <util/string_util.h> @@ -77,11 +79,14 @@ doris::Status is_s3_conf_valid(const S3ClientConf& conf) { if (conf.region.empty()) { return Status::InvalidArgument<false>("Invalid s3 conf, empty region"); } - if (conf.ak.empty()) { - return Status::InvalidArgument<false>("Invalid s3 conf, empty ak"); - } - if (conf.sk.empty()) { - return Status::InvalidArgument<false>("Invalid s3 conf, empty sk"); + + if (conf.role_arn.empty()) { + if (conf.ak.empty()) { + return Status::InvalidArgument<false>("Invalid s3 conf, empty ak"); + } + if (conf.sk.empty()) { + return Status::InvalidArgument<false>("Invalid s3 conf, empty sk"); + } } return Status::OK(); } @@ -106,6 +111,8 @@ constexpr char S3_REQUEST_TIMEOUT_MS[] = "AWS_REQUEST_TIMEOUT_MS"; constexpr char S3_CONN_TIMEOUT_MS[] = "AWS_CONNECTION_TIMEOUT_MS"; constexpr char S3_NEED_OVERRIDE_ENDPOINT[] = "AWS_NEED_OVERRIDE_ENDPOINT"; +constexpr char S3_ROLE_ARN[] = "AWS_ROLE_ARN"; +constexpr char S3_EXTERNAL_ID[] = "AWS_EXTERNAL_ID"; } // namespace bvar::Adder<int64_t> get_rate_limit_ns("get_rate_limit_ns"); @@ -217,6 +224,36 @@ std::shared_ptr<io::ObjStorageClient> S3ClientFactory::_create_azure_client( #endif } +std::shared_ptr<Aws::Auth::AWSCredentialsProvider> S3ClientFactory::get_aws_credentials_provider( + const S3ClientConf& s3_conf) { + if (!s3_conf.ak.empty() && !s3_conf.sk.empty()) { + Aws::Auth::AWSCredentials aws_cred(s3_conf.ak, s3_conf.sk); + DCHECK(!aws_cred.IsExpiredOrEmpty()); + if (!s3_conf.token.empty()) { + 
aws_cred.SetSessionToken(s3_conf.token); + } + return std::make_shared<Aws::Auth::SimpleAWSCredentialsProvider>(std::move(aws_cred)); + } + + if (s3_conf.cred_provider_type == CredProviderType::InstanceProfile) { + if (s3_conf.role_arn.empty()) { + return std::make_shared<Aws::Auth::InstanceProfileCredentialsProvider>(); + } + + Aws::Client::ClientConfiguration clientConfiguration = + S3ClientFactory::getClientConfiguration(); + + auto stsClient = std::make_shared<Aws::STS::STSClient>( + std::make_shared<Aws::Auth::InstanceProfileCredentialsProvider>(), + clientConfiguration); + + return std::make_shared<Aws::Auth::STSAssumeRoleCredentialsProvider>( + s3_conf.role_arn, Aws::String(), s3_conf.external_id, + Aws::Auth::DEFAULT_CREDS_LOAD_FREQ_SECONDS, stsClient); + } + return std::make_shared<Aws::Auth::DefaultAWSCredentialsProviderChain>(); +} + std::shared_ptr<io::ObjStorageClient> S3ClientFactory::_create_s3_client( const S3ClientConf& s3_conf) { TEST_SYNC_POINT_RETURN_WITH_VALUE( @@ -265,25 +302,11 @@ std::shared_ptr<io::ObjStorageClient> S3ClientFactory::_create_s3_client( aws_config.retryStrategy = std::make_shared<S3CustomRetryStrategy>( config::max_s3_client_retry /*scaleFactor = 25*/); - std::shared_ptr<Aws::S3::S3Client> new_client; - if (!s3_conf.ak.empty() && !s3_conf.sk.empty()) { - Aws::Auth::AWSCredentials aws_cred(s3_conf.ak, s3_conf.sk); - DCHECK(!aws_cred.IsExpiredOrEmpty()); - if (!s3_conf.token.empty()) { - aws_cred.SetSessionToken(s3_conf.token); - } - new_client = std::make_shared<Aws::S3::S3Client>( - std::move(aws_cred), std::move(aws_config), - Aws::Client::AWSAuthV4Signer::PayloadSigningPolicy::Never, - s3_conf.use_virtual_addressing); - } else { - std::shared_ptr<Aws::Auth::AWSCredentialsProvider> aws_provider_chain = - std::make_shared<Aws::Auth::DefaultAWSCredentialsProviderChain>(); - new_client = std::make_shared<Aws::S3::S3Client>( - std::move(aws_provider_chain), std::move(aws_config), - 
Aws::Client::AWSAuthV4Signer::PayloadSigningPolicy::Never, - s3_conf.use_virtual_addressing); - } + + std::shared_ptr<Aws::S3::S3Client> new_client = std::make_shared<Aws::S3::S3Client>( + get_aws_credentials_provider(s3_conf), std::move(aws_config), + Aws::Client::AWSAuthV4Signer::PayloadSigningPolicy::Never, + s3_conf.use_virtual_addressing); auto obj_client = std::make_shared<io::S3ObjStorageClient>(std::move(new_client)); LOG_INFO("create one s3 client with {}", s3_conf.to_string()); @@ -350,28 +373,62 @@ Status S3ClientFactory::convert_properties_to_s3_conf( s3_conf->client_conf.use_virtual_addressing = it->second != "true"; } + if (auto it = properties.find(S3_ROLE_ARN); it != properties.end()) { + s3_conf->client_conf.cred_provider_type = CredProviderType::InstanceProfile; + s3_conf->client_conf.role_arn = it->second; + } + + if (auto it = properties.find(S3_EXTERNAL_ID); it != properties.end()) { + s3_conf->client_conf.external_id = it->second; + } + if (auto st = is_s3_conf_valid(s3_conf->client_conf); !st.ok()) { return st; } return Status::OK(); } +static CredProviderType cred_provider_type_from_thrift(TCredProviderType::type cred_provider_type) { + switch (cred_provider_type) { + case TCredProviderType::DEFAULT: + return CredProviderType::Default; + case TCredProviderType::SIMPLE: + return CredProviderType::Simple; + case TCredProviderType::INSTANCE_PROFILE: + return CredProviderType::InstanceProfile; + default: + __builtin_unreachable(); + LOG(WARNING) << "Invalid TCredProviderType value: " << cred_provider_type + << ", use default instead."; + return CredProviderType::Default; + } +} + S3Conf S3Conf::get_s3_conf(const cloud::ObjectStoreInfoPB& info) { S3Conf ret { .bucket = info.bucket(), .prefix = info.prefix(), - .client_conf {.endpoint = info.endpoint(), - .region = info.region(), - .ak = info.ak(), - .sk = info.sk(), - .token {}, - .bucket = info.bucket(), - .provider = io::ObjStorageType::AWS, - .use_virtual_addressing = - 
info.has_use_path_style() ? !info.use_path_style() : true}, + .client_conf { + .endpoint = info.endpoint(), + .region = info.region(), + .ak = info.ak(), + .sk = info.sk(), + .token {}, + .bucket = info.bucket(), + .provider = io::ObjStorageType::AWS, + .use_virtual_addressing = + info.has_use_path_style() ? !info.use_path_style() : true, + + .role_arn = info.role_arn(), + .external_id = info.external_id(), + }, .sse_enabled = info.sse_enabled(), }; + if (info.has_cred_provider_type()) { + ret.client_conf.cred_provider_type = cred_provider_type_from_pb(info.cred_provider_type()); + } + io::ObjStorageType type = io::ObjStorageType::AWS; switch (info.provider()) { case cloud::ObjectStoreInfoPB_Provider_OSS: @@ -396,8 +453,8 @@ S3Conf S3Conf::get_s3_conf(const cloud::ObjectStoreInfoPB& info) { type = io::ObjStorageType::AZURE; break; default: - LOG_FATAL("unknown provider type {}, info {}", info.provider(), ret.to_string()); __builtin_unreachable(); + LOG_FATAL("unknown provider type {}, info {}", info.provider(), ret.to_string()); } ret.client_conf.provider = type; return ret; @@ -421,7 +478,15 @@ S3Conf S3Conf::get_s3_conf(const TS3StorageParam& param) { // When using cold heat separation in minio, user might use ip address directly, // which needs enable use_virtual_addressing to true .use_virtual_addressing = !param.use_path_style, + .role_arn = param.role_arn, + .external_id = param.external_id, }}; + + if (param.__isset.cred_provider_type) { + ret.client_conf.cred_provider_type = + cred_provider_type_from_thrift(param.cred_provider_type); + } + io::ObjStorageType type = io::ObjStorageType::AWS; switch (param.provider) { case TObjStorageType::UNKNOWN: diff --git a/be/src/util/s3_util.h b/be/src/util/s3_util.h index 793d0b8a956..c45c6afa6ef 100644 --- a/be/src/util/s3_util.h +++ b/be/src/util/s3_util.h @@ -32,6 +32,7 @@ #include <unordered_map> #include "common/status.h" +#include "cpp/aws_common.h" #include "cpp/s3_rate_limiter.h" #include 
"io/fs/obj_storage_client.h" #include "vec/common/string_ref.h" @@ -61,7 +62,6 @@ extern bvar::LatencyRecorder s3_copy_object_latency; }; // namespace s3_bvar class S3URI; - struct S3ClientConf { std::string endpoint; std::string region; @@ -78,6 +78,10 @@ struct S3ClientConf { // For aws s3, no need to override endpoint bool need_override_endpoint = true; + CredProviderType cred_provider_type = CredProviderType::Default; + std::string role_arn; + std::string external_id; + uint64_t get_hash() const { uint64_t hash_code = 0; hash_code ^= crc32_hash(ak); @@ -91,15 +95,21 @@ struct S3ClientConf { hash_code ^= connect_timeout_ms; hash_code ^= use_virtual_addressing; hash_code ^= static_cast<int>(provider); + + hash_code ^= static_cast<int>(cred_provider_type); + hash_code ^= crc32_hash(role_arn); + hash_code ^= crc32_hash(external_id); return hash_code; } std::string to_string() const { return fmt::format( "(ak={}, token={}, endpoint={}, region={}, bucket={}, max_connections={}, " - "request_timeout_ms={}, connect_timeout_ms={}, use_virtual_addressing={}", + "request_timeout_ms={}, connect_timeout_ms={}, use_virtual_addressing={}, " + "cred_provider_type={},role_arn={}, external_id={}", ak, token, endpoint, region, bucket, max_connections, request_timeout_ms, - connect_timeout_ms, use_virtual_addressing); + connect_timeout_ms, use_virtual_addressing, cred_provider_type, role_arn, + external_id); } }; @@ -144,6 +154,9 @@ public: private: std::shared_ptr<io::ObjStorageClient> _create_s3_client(const S3ClientConf& s3_conf); std::shared_ptr<io::ObjStorageClient> _create_azure_client(const S3ClientConf& s3_conf); + std::shared_ptr<Aws::Auth::AWSCredentialsProvider> get_aws_credentials_provider( + const S3ClientConf& s3_conf); + S3ClientFactory(); static std::string get_valid_ca_cert_path(); diff --git a/be/test/io/fs/s3_obj_storage_client_role_test.cpp b/be/test/io/fs/s3_obj_storage_client_role_test.cpp new file mode 100644 index 00000000000..5760e6e4df3 --- /dev/null +++ 
b/be/test/io/fs/s3_obj_storage_client_role_test.cpp @@ -0,0 +1,185 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#include <gtest/gtest.h> + +#include "io/fs/obj_storage_client.h" +#include "util/s3_util.h" + +namespace doris { + +class S3ObjStorageClientRoleTest : public testing::Test { +protected: + static std::shared_ptr<io::ObjStorageClient> obj_storage_client; + static std::string bucket; + static std::string prefix; + + static void SetUpTestSuite() { + if (!std::getenv("AWS_ROLE_ARN") || !std::getenv("AWS_EXTERNAL_ID") || + !std::getenv("AWS_ENDPOINT") || !std::getenv("AWS_REGION")) { + return; + } + + std::string role_arn = std::getenv("AWS_ROLE_ARN"); + std::string external_id = std::getenv("AWS_EXTERNAL_ID"); + std::string endpoint = std::getenv("AWS_ENDPOINT"); + std::string region = std::getenv("AWS_REGION"); + + S3ObjStorageClientRoleTest::bucket = std::getenv("AWS_BUCKET"); + + S3ObjStorageClientRoleTest::bucket = std::getenv("AWS_BUCKET"); + if (!std::getenv("AWS_PREFIX")) { + S3ObjStorageClientRoleTest::prefix = ""; + } else { + S3ObjStorageClientRoleTest::prefix = std::getenv("AWS_PREFIX"); + } + + S3ObjStorageClientRoleTest::obj_storage_client = S3ClientFactory::instance().create( + {.endpoint 
= endpoint, + .region = region, + .bucket = bucket, + .provider = io::ObjStorageType::AWS, + .use_virtual_addressing = false, + .cred_provider_type = CredProviderType::InstanceProfile, + .role_arn = role_arn, + .external_id = external_id}); + + ASSERT_TRUE(S3ObjStorageClientRoleTest::obj_storage_client != nullptr); + } + + void SetUp() override { + if (S3ObjStorageClientRoleTest::obj_storage_client == nullptr) { + GTEST_SKIP() << "Skipping S3 test, because AWS environment not set"; + } + } +}; + +std::shared_ptr<io::ObjStorageClient> S3ObjStorageClientRoleTest::obj_storage_client = nullptr; +std::string S3ObjStorageClientRoleTest::bucket; +std::string S3ObjStorageClientRoleTest::prefix; + +TEST_F(S3ObjStorageClientRoleTest, put_list_delete_object) { + LOG(INFO) << "S3ObjStorageClientRoleTest::put_list_delete_object, prefix:" << prefix; + + auto response = S3ObjStorageClientRoleTest::obj_storage_client->put_object( + {.bucket = bucket, .key = prefix + "S3ObjStorageClientRoleTest/put_list_delete_object"}, + std::string("aaaa")); + EXPECT_EQ(response.status.code, ErrorCode::OK); + + std::vector<io::FileInfo> files; + // clang-format off + response = S3ObjStorageClientRoleTest::obj_storage_client->list_objects({.bucket = bucket, + .prefix = prefix + "S3ObjStorageClientRoleTest/put_list_delete_object",}, &files); + // clang-format on + EXPECT_EQ(response.status.code, ErrorCode::OK); + EXPECT_EQ(files.size(), 1); + files.clear(); + + response = S3ObjStorageClientRoleTest::obj_storage_client->delete_object( + {.bucket = bucket, + .key = prefix + "S3ObjStorageClientRoleTest/put_list_delete_object"}); + EXPECT_EQ(response.status.code, ErrorCode::OK); + + // clang-format off + response = S3ObjStorageClientRoleTest::obj_storage_client->list_objects({.bucket = bucket, + .prefix = prefix + "S3ObjStorageClientRoleTest/put_list_delete_object",}, &files); + // clang-format on + EXPECT_EQ(response.status.code, ErrorCode::OK); + EXPECT_EQ(files.size(), 0); +} + 
+TEST_F(S3ObjStorageClientRoleTest, delete_objects_recursively) { + LOG(INFO) << "S3ObjStorageClientRoleTest::delete_objects_recursively"; + + for (int i = 0; i < 22; i++) { + std::string key = prefix + "S3ObjStorageClientRoleTest/delete_objects_recursively" + + std::to_string(i); + + auto response = S3ObjStorageClientRoleTest::obj_storage_client->put_object( + {.bucket = bucket, .key = key}, std::string("aaaa")); + EXPECT_EQ(response.status.code, ErrorCode::OK); + LOG(INFO) << "put " << key << " OK"; + } + + std::vector<io::FileInfo> files; + // clang-format off + auto response = S3ObjStorageClientRoleTest::obj_storage_client->list_objects({.bucket = bucket, + .prefix = prefix + "S3ObjStorageClientRoleTest/delete_objects_recursively",}, &files); + // clang-format on + EXPECT_EQ(response.status.code, ErrorCode::OK); + EXPECT_EQ(files.size(), 22); + files.clear(); + + response = S3ObjStorageClientRoleTest::obj_storage_client->delete_objects_recursively( + {.bucket = bucket, + .prefix = prefix + "S3ObjStorageClientRoleTest/delete_objects_recursively"}); + EXPECT_EQ(response.status.code, ErrorCode::OK); + + // clang-format off + response = S3ObjStorageClientRoleTest::obj_storage_client->list_objects({.bucket = bucket, + .prefix = prefix + "S3ObjStorageClientRoleTest/delete_objects_recursively",}, &files); + // clang-format on + EXPECT_EQ(response.status.code, ErrorCode::OK); + EXPECT_EQ(files.size(), 0); +} + +TEST_F(S3ObjStorageClientRoleTest, multipart_upload) { + LOG(INFO) << "S3ObjStorageClientRoleTest::multipart_upload"; + + auto response = S3ObjStorageClientRoleTest::obj_storage_client->create_multipart_upload( + {.bucket = bucket, .key = prefix + "S3ObjStorageClientRoleTest/multipart_upload"}); + EXPECT_EQ(response.resp.status.code, ErrorCode::OK); + auto upload_id = response.upload_id; + + std::string body = "S3ObjStorageClientRoleTest::multipart_upload"; + body.resize(5 * 1024 * 1024); + + std::vector<doris::io::ObjectCompleteMultiPart> completed_parts; + + 
response = S3ObjStorageClientRoleTest::obj_storage_client->upload_part( + {.bucket = bucket, + .key = prefix + "S3ObjStorageClientRoleTest/multipart_upload", + .upload_id = upload_id}, + body, 1); + + EXPECT_EQ(response.resp.status.code, ErrorCode::OK); + doris::io::ObjectCompleteMultiPart completed_part { + 1, response.etag.has_value() ? std::move(response.etag.value()) : ""}; + + completed_parts.emplace_back(std::move(completed_part)); + + response = S3ObjStorageClientRoleTest::obj_storage_client->upload_part( + {.bucket = bucket, + .key = prefix + "S3ObjStorageClientRoleTest/multipart_upload", + .upload_id = upload_id}, + body, 2); + + EXPECT_EQ(response.resp.status.code, ErrorCode::OK); + doris::io::ObjectCompleteMultiPart completed_part2 { + 2, response.etag.has_value() ? std::move(response.etag.value()) : ""}; + completed_parts.emplace_back(std::move(completed_part2)); + + auto response2 = S3ObjStorageClientRoleTest::obj_storage_client->complete_multipart_upload( + {.bucket = bucket, + .key = prefix + "S3ObjStorageClientRoleTest/multipart_upload", + .upload_id = upload_id}, + completed_parts); + + EXPECT_EQ(response2.status.code, ErrorCode::OK); +} + +} // namespace doris \ No newline at end of file diff --git a/cloud/cmake/thirdparty.cmake b/cloud/cmake/thirdparty.cmake index 80e70c87eca..6b1a614b395 100644 --- a/cloud/cmake/thirdparty.cmake +++ b/cloud/cmake/thirdparty.cmake @@ -89,6 +89,9 @@ add_thirdparty(aws-checksums LIB64) add_thirdparty(aws-c-s3 LIB64) add_thirdparty(aws-c-sdkutils LIB64) add_thirdparty(aws-s2n LIBNAME "lib/libs2n.a") +add_thirdparty(aws-cpp-sdk-identity-management LIB64) +add_thirdparty(aws-cpp-sdk-sts LIB64) + # end aws libs add_thirdparty(jsoncpp LIB64) add_thirdparty(uuid LIB64) diff --git a/cloud/src/meta-service/meta_service_resource.cpp b/cloud/src/meta-service/meta_service_resource.cpp index 6b1c1c63c55..96a00a48e97 100644 --- a/cloud/src/meta-service/meta_service_resource.cpp +++ 
b/cloud/src/meta-service/meta_service_resource.cpp @@ -421,25 +421,37 @@ static void create_object_info_with_encrypt(const InstanceInfoPB& instance, Obje std::string external_endpoint = obj->has_external_endpoint() ? obj->external_endpoint() : ""; std::string region = obj->has_region() ? obj->region() : ""; - // ATTN: prefix may be empty - if (plain_ak.empty() || plain_sk.empty() || bucket.empty() || endpoint.empty() || - region.empty() || !obj->has_provider() || external_endpoint.empty()) { - code = MetaServiceCode::INVALID_ARGUMENT; - msg = "s3 conf info err, please check it"; - return; - } - EncryptionInfoPB encryption_info; - AkSkPair cipher_ak_sk_pair; - auto ret = encrypt_ak_sk_helper(plain_ak, plain_sk, &encryption_info, &cipher_ak_sk_pair, code, - msg); - TEST_SYNC_POINT_CALLBACK("create_object_info_with_encrypt", &ret, &code, &msg); - if (ret != 0) { - return; + if (obj->has_role_arn()) { + if (obj->role_arn().empty() || !obj->has_cred_provider_type() || + obj->cred_provider_type() != CredProviderTypePB::INSTANCE_PROFILE || + !obj->has_provider() || obj->provider() != ObjectStoreInfoPB::S3 || bucket.empty() || + endpoint.empty() || region.empty()) { + code = MetaServiceCode::INVALID_ARGUMENT; + msg = "s3 conf info err with role_arn, please check it"; + return; + } + } else { + // ATTN: prefix may be empty + if (plain_ak.empty() || plain_sk.empty() || bucket.empty() || endpoint.empty() || + region.empty() || !obj->has_provider() || external_endpoint.empty()) { + code = MetaServiceCode::INVALID_ARGUMENT; + msg = "s3 conf info err, please check it"; + return; + } + + EncryptionInfoPB encryption_info; + AkSkPair cipher_ak_sk_pair; + auto ret = encrypt_ak_sk_helper(plain_ak, plain_sk, &encryption_info, &cipher_ak_sk_pair, + code, msg); + TEST_SYNC_POINT_CALLBACK("create_object_info_with_encrypt", &ret, &code, &msg); + if (ret != 0) { + return; + } + obj->set_ak(std::move(cipher_ak_sk_pair.first)); + obj->set_sk(std::move(cipher_ak_sk_pair.second)); + 
obj->mutable_encryption_info()->CopyFrom(encryption_info); } - obj->set_ak(std::move(cipher_ak_sk_pair.first)); - obj->set_sk(std::move(cipher_ak_sk_pair.second)); - obj->mutable_encryption_info()->CopyFrom(encryption_info); obj->set_bucket(bucket); obj->set_prefix(prefix); obj->set_endpoint(endpoint); @@ -781,6 +793,9 @@ struct ObjectStorageDesc { std::string& external_endpoint; std::string& region; bool& use_path_style; + + std::string& role_arn; + std::string& external_id; }; static int extract_object_storage_info(const AlterObjStoreInfoRequest* request, @@ -793,39 +808,54 @@ static int extract_object_storage_info(const AlterObjStoreInfoRequest* request, msg = "s3 obj info err " + proto_to_json(*request); return -1; } - auto& [ak, sk, bucket, prefix, endpoint, external_endpoint, region, use_path_style] = obj_desc; + const auto& obj = request->has_obj() ? request->obj() : request->vault().obj_info(); - // Prepare data - if (!obj.has_ak() || !obj.has_sk()) { + + // obj size > 1k, refuse + if (obj.ByteSizeLong() > 1024) { code = MetaServiceCode::INVALID_ARGUMENT; - msg = "s3 obj info err " + proto_to_json(*request); + msg = "s3 obj info greater than 1k " + proto_to_json(*request); return -1; - } + }; - std::string plain_ak = obj.has_ak() ? obj.ak() : ""; - std::string plain_sk = obj.has_sk() ? obj.sk() : ""; + auto& [ak, sk, bucket, prefix, endpoint, external_endpoint, region, use_path_style, role_arn, + external_id] = obj_desc; - auto ret = encrypt_ak_sk_helper(plain_ak, plain_sk, &encryption_info, &cipher_ak_sk_pair, code, - msg); - if (ret != 0) { - return -1; + if (!obj.has_role_arn()) { + if (!obj.has_ak() || !obj.has_sk()) { + code = MetaServiceCode::INVALID_ARGUMENT; + msg = "s3 obj info err " + proto_to_json(*request); + return -1; + } + + std::string plain_ak = obj.has_ak() ? obj.ak() : ""; + std::string plain_sk = obj.has_sk() ? 
obj.sk() : ""; + auto ret = encrypt_ak_sk_helper(plain_ak, plain_sk, &encryption_info, &cipher_ak_sk_pair, + code, msg); + if (ret != 0) { + return -1; + } + + ak = cipher_ak_sk_pair.first; + sk = cipher_ak_sk_pair.second; + } else { + if (!obj.has_cred_provider_type() || + obj.cred_provider_type() != CredProviderTypePB::INSTANCE_PROFILE || + !obj.has_provider() || obj.provider() != ObjectStoreInfoPB::S3) { + code = MetaServiceCode::INVALID_ARGUMENT; + msg = "s3 conf info err with role_arn, please check it"; + return -1; + } + role_arn = obj.has_role_arn() ? obj.role_arn() : ""; + external_id = obj.has_external_id() ? obj.external_id() : ""; } TEST_SYNC_POINT_CALLBACK("extract_object_storage_info:get_aksk_pair", &cipher_ak_sk_pair); - - ak = cipher_ak_sk_pair.first; - sk = cipher_ak_sk_pair.second; bucket = obj.has_bucket() ? obj.bucket() : ""; prefix = obj.has_prefix() ? obj.prefix() : ""; endpoint = obj.has_endpoint() ? obj.endpoint() : ""; external_endpoint = obj.has_external_endpoint() ? obj.external_endpoint() : ""; region = obj.has_region() ? 
obj.region() : ""; use_path_style = obj.use_path_style(); - // obj size > 1k, refuse - if (obj.ByteSizeLong() > 1024) { - code = MetaServiceCode::INVALID_ARGUMENT; - msg = "s3 obj info greater than 1k " + proto_to_json(*request); - return -1; - }; return 0; } @@ -835,7 +865,8 @@ static ObjectStoreInfoPB object_info_pb_factory(ObjectStorageDesc& obj_desc, EncryptionInfoPB& encryption_info, AkSkPair& cipher_ak_sk_pair) { ObjectStoreInfoPB last_item; - auto& [ak, sk, bucket, prefix, endpoint, external_endpoint, region, use_path_style] = obj_desc; + auto& [ak, sk, bucket, prefix, endpoint, external_endpoint, region, use_path_style, role_arn, + external_id] = obj_desc; auto now_time = std::chrono::system_clock::now(); uint64_t time = std::chrono::duration_cast<std::chrono::seconds>(now_time.time_since_epoch()).count(); @@ -845,9 +876,16 @@ static ObjectStoreInfoPB object_info_pb_factory(ObjectStorageDesc& obj_desc, if (obj.has_user_id()) { last_item.set_user_id(obj.user_id()); } - last_item.set_ak(std::move(cipher_ak_sk_pair.first)); - last_item.set_sk(std::move(cipher_ak_sk_pair.second)); - last_item.mutable_encryption_info()->CopyFrom(encryption_info); + + if (!obj.has_role_arn()) { + last_item.set_ak(std::move(cipher_ak_sk_pair.first)); + last_item.set_sk(std::move(cipher_ak_sk_pair.second)); + last_item.mutable_encryption_info()->CopyFrom(encryption_info); + } else { + last_item.set_role_arn(role_arn); + last_item.set_external_id(external_id); + last_item.set_cred_provider_type(CredProviderTypePB::INSTANCE_PROFILE); + } last_item.set_bucket(bucket); // format prefix, such as `/aa/bb/`, `aa/bb//`, `//aa/bb`, ` /aa/bb` -> `aa/bb` trim(prefix); @@ -858,6 +896,7 @@ static ObjectStoreInfoPB object_info_pb_factory(ObjectStorageDesc& obj_desc, last_item.set_provider(obj.provider()); last_item.set_sse_enabled(instance.sse_enabled()); last_item.set_use_path_style(use_path_style); + return last_item; } @@ -865,7 +904,7 @@ void 
MetaServiceImpl::alter_storage_vault(google::protobuf::RpcController* contr const AlterObjStoreInfoRequest* request, AlterObjStoreInfoResponse* response, ::google::protobuf::Closure* done) { - std::string ak, sk, bucket, prefix, endpoint, external_endpoint, region; + std::string ak, sk, bucket, prefix, endpoint, external_endpoint, region, role_arn, external_id; bool use_path_style; EncryptionInfoPB encryption_info; AkSkPair cipher_ak_sk_pair; @@ -873,8 +912,11 @@ void MetaServiceImpl::alter_storage_vault(google::protobuf::RpcController* contr switch (request->op()) { case AlterObjStoreInfoRequest::ADD_S3_VAULT: case AlterObjStoreInfoRequest::DROP_S3_VAULT: { - auto tmp_desc = ObjectStorageDesc { - ak, sk, bucket, prefix, endpoint, external_endpoint, region, use_path_style}; + auto tmp_desc = ObjectStorageDesc {ak, sk, + bucket, prefix, + endpoint, external_endpoint, + region, use_path_style, + role_arn, external_id}; if (0 != extract_object_storage_info(request, code, msg, tmp_desc, encryption_info, cipher_ak_sk_pair)) { return; @@ -995,7 +1037,8 @@ void MetaServiceImpl::alter_storage_vault(google::protobuf::RpcController* contr return; } // ATTN: prefix may be empty - if (ak.empty() || sk.empty() || bucket.empty() || endpoint.empty() || region.empty()) { + if (((ak.empty() || sk.empty()) && role_arn.empty()) || bucket.empty() || + endpoint.empty() || region.empty()) { code = MetaServiceCode::INVALID_ARGUMENT; msg = "s3 conf info err, please check it"; return; @@ -1013,8 +1056,11 @@ void MetaServiceImpl::alter_storage_vault(google::protobuf::RpcController* contr } } // calc id - auto tmp_tuple = ObjectStorageDesc { - ak, sk, bucket, prefix, endpoint, external_endpoint, region, use_path_style}; + auto tmp_tuple = ObjectStorageDesc {ak, sk, + bucket, prefix, + endpoint, external_endpoint, + region, use_path_style, + role_arn, external_id}; ObjectStoreInfoPB last_item = object_info_pb_factory(tmp_tuple, obj, instance, encryption_info, cipher_ak_sk_pair); if 
(instance.storage_vault_names().end() != @@ -1156,7 +1202,7 @@ void MetaServiceImpl::alter_obj_store_info(google::protobuf::RpcController* cont const AlterObjStoreInfoRequest* request, AlterObjStoreInfoResponse* response, ::google::protobuf::Closure* done) { - std::string ak, sk, bucket, prefix, endpoint, external_endpoint, region; + std::string ak, sk, bucket, prefix, endpoint, external_endpoint, region, role_arn, external_id; bool use_path_style; EncryptionInfoPB encryption_info; AkSkPair cipher_ak_sk_pair; @@ -1165,8 +1211,11 @@ void MetaServiceImpl::alter_obj_store_info(google::protobuf::RpcController* cont case AlterObjStoreInfoRequest::ADD_OBJ_INFO: case AlterObjStoreInfoRequest::LEGACY_UPDATE_AK_SK: case AlterObjStoreInfoRequest::UPDATE_AK_SK: { - auto tmp_desc = ObjectStorageDesc { - ak, sk, bucket, prefix, endpoint, external_endpoint, region, use_path_style}; + auto tmp_desc = ObjectStorageDesc {ak, sk, + bucket, prefix, + endpoint, external_endpoint, + region, use_path_style, + role_arn, external_id}; if (0 != extract_object_storage_info(request, code, msg, tmp_desc, encryption_info, cipher_ak_sk_pair)) { return; @@ -1287,8 +1336,8 @@ void MetaServiceImpl::alter_obj_store_info(google::protobuf::RpcController* cont return; } // ATTN: prefix may be empty - if (ak.empty() || sk.empty() || bucket.empty() || endpoint.empty() || region.empty() || - prefix.empty()) { + if (((ak.empty() || sk.empty()) && role_arn.empty()) || bucket.empty() || + endpoint.empty() || region.empty() || prefix.empty()) { code = MetaServiceCode::INVALID_ARGUMENT; msg = "s3 conf info err, please check it"; return; @@ -1306,8 +1355,11 @@ void MetaServiceImpl::alter_obj_store_info(google::protobuf::RpcController* cont } } // calc id - auto tmp_tuple = ObjectStorageDesc { - ak, sk, bucket, prefix, endpoint, external_endpoint, region, use_path_style}; + auto tmp_tuple = ObjectStorageDesc {ak, sk, + bucket, prefix, + endpoint, external_endpoint, + region, use_path_style, + role_arn, 
external_id}; ObjectStoreInfoPB last_item = object_info_pb_factory(tmp_tuple, obj, instance, encryption_info, cipher_ak_sk_pair); instance.add_obj_info()->CopyFrom(last_item); diff --git a/cloud/src/recycler/s3_accessor.cpp b/cloud/src/recycler/s3_accessor.cpp index 9d365cffa04..63844665e3a 100644 --- a/cloud/src/recycler/s3_accessor.cpp +++ b/cloud/src/recycler/s3_accessor.cpp @@ -17,10 +17,13 @@ #include "recycler/s3_accessor.h" -#include <aws/core/Aws.h> +#include <aws/core/auth/AWSAuthSigner.h> #include <aws/core/auth/AWSCredentials.h> +#include <aws/core/auth/AWSCredentialsProviderChain.h> #include <aws/core/client/DefaultRetryStrategy.h> +#include <aws/identity-management/auth/STSAssumeRoleCredentialsProvider.h> #include <aws/s3/S3Client.h> +#include <aws/sts/STSClient.h> #include <bvar/reducer.h> #include <gen_cpp/cloud.pb.h> @@ -181,20 +184,28 @@ std::optional<S3Conf> S3Conf::from_obj_store_info(const ObjectStoreInfoPB& obj_i } if (!skip_aksk) { - if (obj_info.has_encryption_info()) { - AkSkPair plain_ak_sk_pair; - int ret = decrypt_ak_sk_helper(obj_info.ak(), obj_info.sk(), obj_info.encryption_info(), - &plain_ak_sk_pair); - if (ret != 0) { - LOG_WARNING("fail to decrypt ak sk").tag("obj_info", proto_to_json(obj_info)); - return std::nullopt; + if (!obj_info.ak().empty() && !obj_info.sk().empty()) { + if (obj_info.has_encryption_info()) { + AkSkPair plain_ak_sk_pair; + int ret = decrypt_ak_sk_helper(obj_info.ak(), obj_info.sk(), + obj_info.encryption_info(), &plain_ak_sk_pair); + if (ret != 0) { + LOG_WARNING("fail to decrypt ak sk").tag("obj_info", proto_to_json(obj_info)); + return std::nullopt; + } else { + s3_conf.ak = std::move(plain_ak_sk_pair.first); + s3_conf.sk = std::move(plain_ak_sk_pair.second); + } } else { - s3_conf.ak = std::move(plain_ak_sk_pair.first); - s3_conf.sk = std::move(plain_ak_sk_pair.second); + s3_conf.ak = obj_info.ak(); + s3_conf.sk = obj_info.sk(); } - } else { - s3_conf.ak = obj_info.ak(); - s3_conf.sk = obj_info.sk(); + } + 
+ if (obj_info.has_role_arn() && !obj_info.role_arn().empty()) { + s3_conf.role_arn = obj_info.role_arn(); + s3_conf.external_id = obj_info.external_id(); + s3_conf.cred_provider_type = CredProviderType::InstanceProfile; } } @@ -236,6 +247,29 @@ int S3Accessor::create(S3Conf conf, std::shared_ptr<S3Accessor>* accessor) { static std::shared_ptr<SimpleThreadPool> worker_pool; +std::shared_ptr<Aws::Auth::AWSCredentialsProvider> S3Accessor::get_aws_credentials_provider( + const S3Conf& s3_conf) { + if (!s3_conf.ak.empty() && !s3_conf.sk.empty()) { + Aws::Auth::AWSCredentials aws_cred(s3_conf.ak, s3_conf.sk); + DCHECK(!aws_cred.IsExpiredOrEmpty()); + return std::make_shared<Aws::Auth::SimpleAWSCredentialsProvider>(std::move(aws_cred)); + } + + if (s3_conf.cred_provider_type == CredProviderType::InstanceProfile) { + if (s3_conf.role_arn.empty()) { + return std::make_shared<Aws::Auth::InstanceProfileCredentialsProvider>(); + } + + auto stsClient = std::make_shared<Aws::STS::STSClient>( + std::make_shared<Aws::Auth::InstanceProfileCredentialsProvider>()); + + return std::make_shared<Aws::Auth::STSAssumeRoleCredentialsProvider>( + s3_conf.role_arn, Aws::String(), s3_conf.external_id, + Aws::Auth::DEFAULT_CREDS_LOAD_FREQ_SECONDS, stsClient); + } + return std::make_shared<Aws::Auth::DefaultAWSCredentialsProviderChain>(); +} + int S3Accessor::init() { static std::once_flag log_annotated_tags_key_once; std::call_once(log_annotated_tags_key_once, [&]() { @@ -287,7 +321,6 @@ int S3Accessor::init() { static S3Environment s3_env; // S3Conf::S3 - Aws::Auth::AWSCredentials aws_cred(conf_.ak, conf_.sk); Aws::Client::ClientConfiguration aws_config; aws_config.endpointOverride = conf_.endpoint; aws_config.region = conf_.region; @@ -302,7 +335,7 @@ int S3Accessor::init() { aws_config.retryStrategy = std::make_shared<S3CustomRetryStrategy>( config::max_s3_client_retry /*scaleFactor = 25*/); auto s3_client = std::make_shared<Aws::S3::S3Client>( - std::move(aws_cred), std::move(aws_config), 
+ get_aws_credentials_provider(conf_), std::move(aws_config), Aws::Client::AWSAuthV4Signer::PayloadSigningPolicy::Never, conf_.use_virtual_addressing /* useVirtualAddressing */); obj_client_ = std::make_shared<S3ObjClient>(std::move(s3_client), conf_.endpoint); diff --git a/cloud/src/recycler/s3_accessor.h b/cloud/src/recycler/s3_accessor.h index e9640b5693a..faa8392373c 100644 --- a/cloud/src/recycler/s3_accessor.h +++ b/cloud/src/recycler/s3_accessor.h @@ -17,12 +17,14 @@ #pragma once +#include <aws/core/Aws.h> #include <bvar/latency_recorder.h> #include <array> #include <cstdint> #include <memory> +#include "cpp/aws_common.h" #include "recycler/obj_storage_client.h" #include "recycler/storage_vault_accessor.h" @@ -71,6 +73,10 @@ struct S3Conf { std::string prefix; bool use_virtual_addressing {true}; + CredProviderType cred_provider_type = CredProviderType::Default; + std::string role_arn; + std::string external_id; + enum Provider : uint8_t { S3, GCS, @@ -125,6 +131,9 @@ protected: virtual int delete_prefix_impl(const std::string& path_prefix, int64_t expiration_time = 0); + std::shared_ptr<Aws::Auth::AWSCredentialsProvider> get_aws_credentials_provider( + const S3Conf& s3_conf); + std::string get_key(const std::string& relative_path) const; std::string to_uri(const std::string& relative_path) const; diff --git a/cloud/test/meta_service_http_test.cpp b/cloud/test/meta_service_http_test.cpp index e51e6fa819b..e9ff2956307 100644 --- a/cloud/test/meta_service_http_test.cpp +++ b/cloud/test/meta_service_http_test.cpp @@ -1834,4 +1834,93 @@ TEST(HttpEncodeKeyTest, ProcessHttpSetValue) { EXPECT_EQ(updated_rowset_meta.data_disk_size(), 2048); } +TEST(MetaServiceHttpTest, CreateInstanceWithIamRoleTest) { + HttpContext ctx; + + brpc::Controller cntl; + std::string instance_id = "iam_role_test_instance_id"; + + { + ObjectStoreInfoPB obj; + obj.set_endpoint("s3.us-east-1.amazonaws.com"); + obj.set_region("us-east-1"); + obj.set_prefix("/test-prefix"); + 
obj.set_provider(ObjectStoreInfoPB::S3); + + // create instance without ram user + CreateInstanceRequest create_instance_req; + create_instance_req.set_instance_id(instance_id); + create_instance_req.set_user_id("test_user"); + create_instance_req.set_name("test_name"); + create_instance_req.mutable_obj_info()->CopyFrom(obj); + CreateInstanceResponse create_instance_res; + ctx.meta_service_->create_instance( + reinterpret_cast<::google::protobuf::RpcController*>(&cntl), &create_instance_req, + &create_instance_res, nullptr); + LOG(INFO) << create_instance_res.DebugString(); + ASSERT_EQ(create_instance_res.status().code(), MetaServiceCode::INVALID_ARGUMENT); + } + + { + ObjectStoreInfoPB obj; + obj.set_endpoint("s3.us-east-1.amazonaws.com"); + obj.set_region("us-east-1"); + obj.set_prefix("/test-prefix"); + obj.set_provider(ObjectStoreInfoPB::S3); + + // create instance without ram user + CreateInstanceRequest create_instance_req; + create_instance_req.set_instance_id(instance_id); + create_instance_req.set_user_id("test_user"); + create_instance_req.set_name("test_name"); + create_instance_req.mutable_obj_info()->CopyFrom(obj); + CreateInstanceResponse create_instance_res; + ctx.meta_service_->create_instance( + reinterpret_cast<::google::protobuf::RpcController*>(&cntl), &create_instance_req, + &create_instance_res, nullptr); + LOG(INFO) << create_instance_res.DebugString(); + ASSERT_EQ(create_instance_res.status().code(), MetaServiceCode::INVALID_ARGUMENT); + } + + { + ObjectStoreInfoPB obj; + obj.set_endpoint("s3.us-east-1.amazonaws.com"); + obj.set_region("us-east-1"); + obj.set_bucket("test-bucket"); + obj.set_prefix("test-prefix"); + obj.set_provider(ObjectStoreInfoPB::S3); + obj.set_role_arn("arn:aws:iam::123456789012:role/test-role"); + obj.set_external_id("test-external-id"); + obj.set_cred_provider_type(CredProviderTypePB::INSTANCE_PROFILE); + + CreateInstanceRequest create_instance_req; + create_instance_req.set_instance_id(instance_id); + 
create_instance_req.set_user_id("test_user"); + create_instance_req.set_name("test_name"); + create_instance_req.mutable_obj_info()->CopyFrom(obj); + CreateInstanceResponse create_instance_res; + ctx.meta_service_->create_instance( + reinterpret_cast<::google::protobuf::RpcController*>(&cntl), &create_instance_req, + &create_instance_res, nullptr); + LOG(INFO) << create_instance_res.DebugString(); + ASSERT_EQ(create_instance_res.status().code(), MetaServiceCode::OK); + + InstanceInfoPB instance = ctx.get_instance_info(instance_id); + LOG(INFO) << instance.DebugString(); + + ASSERT_EQ(instance.obj_info().Get(0).endpoint(), "s3.us-east-1.amazonaws.com"); + ASSERT_EQ(instance.obj_info().Get(0).region(), "us-east-1"); + ASSERT_EQ(instance.obj_info().Get(0).bucket(), "test-bucket"); + ASSERT_EQ(instance.obj_info().Get(0).prefix(), "test-prefix"); + ASSERT_EQ(instance.obj_info().Get(0).provider(), ObjectStoreInfoPB::S3); + ASSERT_EQ(instance.obj_info().Get(0).role_arn(), + "arn:aws:iam::123456789012:role/test-role"); + ASSERT_EQ(instance.obj_info().Get(0).external_id(), "test-external-id"); + ASSERT_EQ(instance.obj_info().Get(0).cred_provider_type(), + CredProviderTypePB::INSTANCE_PROFILE); + ASSERT_EQ(instance.obj_info().Get(0).has_ak(), false); + ASSERT_EQ(instance.obj_info().Get(0).has_sk(), false); + } +} + } // namespace doris::cloud diff --git a/cloud/test/meta_service_test.cpp b/cloud/test/meta_service_test.cpp index 0a1be69e1eb..0e8e543c494 100644 --- a/cloud/test/meta_service_test.cpp +++ b/cloud/test/meta_service_test.cpp @@ -8666,4 +8666,268 @@ TEST(MetaServiceTest, UpdateTmpRowsetTest) { } } +TEST(MetaServiceTest, CreateS3VaultWithIamRole) { + auto sp = SyncPoint::get_instance(); + sp->enable_processing(); + sp->set_call_back("encrypt_ak_sk:get_encryption_key", [](auto&& args) { + auto* ret = try_any_cast<int*>(args[0]); + *ret = 0; + auto* key = try_any_cast<std::string*>(args[1]); + *key = "selectdbselectdbselectdbselectdb"; + auto* key_id = 
try_any_cast<int64_t*>(args[2]); + *key_id = 1; + }); + std::pair<std::string, std::string> pair; + sp->set_call_back("extract_object_storage_info:get_aksk_pair", [&](auto&& args) { + auto* ret = try_any_cast<std::pair<std::string, std::string>*>(args[0]); + pair = *ret; + }); + + auto meta_service = get_meta_service(); + + std::unique_ptr<Transaction> txn; + ASSERT_EQ(meta_service->txn_kv()->create_txn(&txn), TxnErrorCode::TXN_OK); + std::string key; + std::string val; + InstanceKeyInfo key_info {"test_instance"}; + instance_key(key_info, &key); + + ObjectStoreInfoPB obj_info; + obj_info.set_id("1"); + obj_info.set_ak("ak"); + obj_info.set_sk("sk"); + StorageVaultPB vault; + constexpr char vault_name[] = "test_alter_s3_vault"; + vault.mutable_obj_info()->MergeFrom(obj_info); + vault.set_name(vault_name); + vault.set_id("2"); + InstanceInfoPB instance; + instance.add_storage_vault_names(vault.name()); + instance.add_resource_ids(vault.id()); + instance.set_instance_id("GetObjStoreInfoTestInstance"); + instance.set_enable_storage_vault(true); + val = instance.SerializeAsString(); + txn->put(key, val); + txn->put(storage_vault_key({instance.instance_id(), "2"}), vault.SerializeAsString()); + ASSERT_EQ(txn->commit(), TxnErrorCode::TXN_OK); + txn = nullptr; + + auto get_test_instance = [&](InstanceInfoPB& i) { + std::string key; + std::string val; + std::unique_ptr<Transaction> txn; + ASSERT_EQ(meta_service->txn_kv()->create_txn(&txn), TxnErrorCode::TXN_OK); + InstanceKeyInfo key_info {"test_instance"}; + instance_key(key_info, &key); + ASSERT_EQ(txn->get(key, &val), TxnErrorCode::TXN_OK); + i.ParseFromString(val); + }; + + { + AlterObjStoreInfoRequest req; + req.set_cloud_unique_id("test_cloud_unique_id"); + req.set_op(AlterObjStoreInfoRequest::ADD_S3_VAULT); + StorageVaultPB vault; + vault.mutable_obj_info()->set_endpoint("s3.us-east-1.amazonaws.com"); + vault.mutable_obj_info()->set_region("us-east-1"); + vault.mutable_obj_info()->set_bucket("test_bucket"); + 
vault.mutable_obj_info()->set_prefix("test_prefix"); + vault.mutable_obj_info()->set_ak("new_ak"); + vault.mutable_obj_info()->set_sk("new_sk"); + vault.mutable_obj_info()->set_provider( + ObjectStoreInfoPB::Provider::ObjectStoreInfoPB_Provider_S3); + + vault.set_name("ak_sk_s3_vault"); + req.mutable_vault()->CopyFrom(vault); + + brpc::Controller cntl; + AlterObjStoreInfoResponse res; + meta_service->alter_storage_vault( + reinterpret_cast<::google::protobuf::RpcController*>(&cntl), &req, &res, nullptr); + ASSERT_EQ(res.status().code(), MetaServiceCode::OK); + + { + InstanceInfoPB instance; + get_test_instance(instance); + std::unique_ptr<Transaction> txn; + ASSERT_EQ(meta_service->txn_kv()->create_txn(&txn), TxnErrorCode::TXN_OK); + std::string val; + ASSERT_EQ(txn->get(storage_vault_key({instance.instance_id(), "2"}), &val), + TxnErrorCode::TXN_OK); + StorageVaultPB get_obj; + get_obj.ParseFromString(val); + ASSERT_EQ(get_obj.obj_info().ak(), "ak") << get_obj.obj_info().ak(); + } + + { + InstanceInfoPB instance; + get_test_instance(instance); + std::unique_ptr<Transaction> txn; + ASSERT_EQ(meta_service->txn_kv()->create_txn(&txn), TxnErrorCode::TXN_OK); + std::string val; + ASSERT_EQ(txn->get(storage_vault_key({instance.instance_id(), "3"}), &val), + TxnErrorCode::TXN_OK); + StorageVaultPB get_obj; + get_obj.ParseFromString(val); + ASSERT_EQ(get_obj.obj_info().ak(), "new_ak") << get_obj.obj_info().ak(); + ASSERT_NE(get_obj.obj_info().sk(), "new_sk") << get_obj.obj_info().sk(); + } + } + + { + AlterObjStoreInfoRequest req; + req.set_cloud_unique_id("test_cloud_unique_id"); + req.set_op(AlterObjStoreInfoRequest::ADD_S3_VAULT); + StorageVaultPB vault; + vault.mutable_obj_info()->set_endpoint("s3.us-east-1.amazonaws.com"); + vault.mutable_obj_info()->set_region("us-east-1"); + vault.mutable_obj_info()->set_bucket("test_bucket"); + vault.mutable_obj_info()->set_prefix("test_prefix"); + vault.mutable_obj_info()->set_role_arn("arn:aws:iam::123456789012:role/test-role"); 
+ vault.mutable_obj_info()->set_external_id("external_id"); + vault.mutable_obj_info()->set_provider( + ObjectStoreInfoPB::Provider::ObjectStoreInfoPB_Provider_S3); + vault.mutable_obj_info()->set_cred_provider_type(CredProviderTypePB::INSTANCE_PROFILE); + + vault.set_name("ak_sk_s3_vault_with_role"); + req.mutable_vault()->CopyFrom(vault); + + brpc::Controller cntl; + AlterObjStoreInfoResponse res; + meta_service->alter_storage_vault( + reinterpret_cast<::google::protobuf::RpcController*>(&cntl), &req, &res, nullptr); + ASSERT_EQ(res.status().code(), MetaServiceCode::OK); + + { + InstanceInfoPB instance; + get_test_instance(instance); + std::unique_ptr<Transaction> txn; + ASSERT_EQ(meta_service->txn_kv()->create_txn(&txn), TxnErrorCode::TXN_OK); + std::string val; + ASSERT_EQ(txn->get(storage_vault_key({instance.instance_id(), "4"}), &val), + TxnErrorCode::TXN_OK); + StorageVaultPB get_obj; + get_obj.ParseFromString(val); + ASSERT_EQ(get_obj.obj_info().ak().empty(), true) << get_obj.obj_info().ak(); + ASSERT_EQ(get_obj.obj_info().sk().empty(), true) << get_obj.obj_info().sk(); + + ASSERT_EQ(get_obj.obj_info().role_arn(), "arn:aws:iam::123456789012:role/test-role") + << get_obj.obj_info().role_arn(); + ASSERT_EQ(get_obj.obj_info().external_id(), "external_id") + << get_obj.obj_info().external_id(); + + ASSERT_EQ(get_obj.obj_info().endpoint(), "s3.us-east-1.amazonaws.com") + << get_obj.obj_info().endpoint(); + ASSERT_EQ(get_obj.obj_info().region(), "us-east-1") << get_obj.obj_info().region(); + ASSERT_EQ(get_obj.obj_info().bucket(), "test_bucket") << get_obj.obj_info().bucket(); + ASSERT_EQ(get_obj.obj_info().prefix(), "test_prefix") << get_obj.obj_info().prefix(); + ASSERT_EQ(get_obj.obj_info().provider(), + ObjectStoreInfoPB::Provider::ObjectStoreInfoPB_Provider_S3) + << get_obj.obj_info().provider(); + ASSERT_EQ(get_obj.name(), "ak_sk_s3_vault_with_role") << get_obj.name(); + ASSERT_EQ(get_obj.id(), "4") << get_obj.id(); + 
ASSERT_EQ(get_obj.obj_info().cred_provider_type(), CredProviderTypePB::INSTANCE_PROFILE) + << get_obj.obj_info().cred_provider_type(); + } + } + + LOG(INFO) << "instance:" << instance.ShortDebugString(); + SyncPoint::get_instance()->disable_processing(); + SyncPoint::get_instance()->clear_all_call_backs(); +} + +TEST(MetaServiceTest, AddObjInfoWithRole) { + auto meta_service = get_meta_service(); + + auto sp = SyncPoint::get_instance(); + sp->enable_processing(); + + sp->set_call_back("encrypt_ak_sk:get_encryption_key", [](auto&& args) { + auto* ret = try_any_cast<int*>(args[0]); + *ret = 0; + auto* key = try_any_cast<std::string*>(args[1]); + *key = "selectdbselectdbselectdbselectdb"; + auto* key_id = try_any_cast<int64_t*>(args[2]); + *key_id = 1; + }); + + std::unique_ptr<Transaction> txn; + ASSERT_EQ(meta_service->txn_kv()->create_txn(&txn), TxnErrorCode::TXN_OK); + std::string key; + std::string val; + InstanceKeyInfo key_info {"test_instance"}; + instance_key(key_info, &key); + + InstanceInfoPB instance; + val = instance.SerializeAsString(); + txn->put(key, val); + ASSERT_EQ(txn->commit(), TxnErrorCode::TXN_OK); + + auto get_test_instance = [&](InstanceInfoPB& i) { + std::string key; + std::string val; + std::unique_ptr<Transaction> txn; + ASSERT_EQ(meta_service->txn_kv()->create_txn(&txn), TxnErrorCode::TXN_OK); + InstanceKeyInfo key_info {"test_instance"}; + instance_key(key_info, &key); + ASSERT_EQ(txn->get(key, &val), TxnErrorCode::TXN_OK); + i.ParseFromString(val); + }; + + { + AlterObjStoreInfoRequest req; + req.set_cloud_unique_id("test_cloud_unique_id"); + req.set_op(AlterObjStoreInfoRequest::ADD_OBJ_INFO); + auto sp = SyncPoint::get_instance(); + sp->set_call_back("create_object_info_with_encrypt", [](auto&& args) { + auto* ret = try_any_cast<int*>(args[0]); + *ret = 0; + }); + sp->enable_processing(); + + ObjectStoreInfoPB obj_info; + obj_info.set_endpoint("s3.us-east-1.amazonaws.com"); + obj_info.set_region("us-east-1"); + 
obj_info.set_bucket("test_bucket"); + obj_info.set_prefix("test_prefix"); + obj_info.set_role_arn("arn:aws:iam::123456789012:role/test-role"); + obj_info.set_external_id("external_id"); + obj_info.set_provider(ObjectStoreInfoPB::Provider::ObjectStoreInfoPB_Provider_S3); + obj_info.set_cred_provider_type(CredProviderTypePB::INSTANCE_PROFILE); + + req.mutable_obj()->MergeFrom(obj_info); + + brpc::Controller cntl; + AlterObjStoreInfoResponse res; + meta_service->alter_obj_store_info( + reinterpret_cast<::google::protobuf::RpcController*>(&cntl), &req, &res, nullptr); + ASSERT_EQ(res.status().code(), MetaServiceCode::OK) << res.status().msg(); + + InstanceInfoPB instance; + get_test_instance(instance); + const auto& obj = instance.obj_info().at(0); + ASSERT_EQ(obj.id(), "1"); + ASSERT_EQ(obj.ak().empty(), true) << obj.ak(); + ASSERT_EQ(obj.sk().empty(), true) << obj.sk(); + + ASSERT_EQ(obj.role_arn(), "arn:aws:iam::123456789012:role/test-role") << obj.role_arn(); + ASSERT_EQ(obj.external_id(), "external_id") << obj.external_id(); + + ASSERT_EQ(obj.endpoint(), "s3.us-east-1.amazonaws.com") << obj.endpoint(); + ASSERT_EQ(obj.region(), "us-east-1") << obj.region(); + ASSERT_EQ(obj.bucket(), "test_bucket") << obj.bucket(); + ASSERT_EQ(obj.prefix(), "test_prefix") << obj.prefix(); + ASSERT_EQ(obj.provider(), ObjectStoreInfoPB::Provider::ObjectStoreInfoPB_Provider_S3) + << obj.provider(); + ASSERT_EQ(obj.cred_provider_type(), CredProviderTypePB::INSTANCE_PROFILE) + << obj.cred_provider_type(); + + sp->clear_all_call_backs(); + sp->clear_trace(); + sp->disable_processing(); + } + + SyncPoint::get_instance()->disable_processing(); + SyncPoint::get_instance()->clear_all_call_backs(); +} } // namespace doris::cloud diff --git a/cloud/test/s3_accessor_test.cpp b/cloud/test/s3_accessor_test.cpp index c19f5f6a1df..f95fb2dba18 100644 --- a/cloud/test/s3_accessor_test.cpp +++ b/cloud/test/s3_accessor_test.cpp @@ -30,6 +30,7 @@ #include "common/config.h" #include "common/configbase.h" 
#include "common/logging.h" +#include "cpp/aws_common.h" #include "cpp/sync_point.h" using namespace doris; @@ -41,15 +42,11 @@ int main(int argc, char** argv) { return -1; } - if (cloud::config::test_s3_ak.empty()) { - std::cout << "empty test_s3_ak, skip S3AccessorTest" << std::endl; - return 0; - } - if (!cloud::init_glog("s3_accessor_test")) { std::cerr << "failed to init glog" << std::endl; return -1; } + doris::cloud::config::aws_log_level = 5; ::testing::InitGoogleTest(&argc, argv); return RUN_ALL_TESTS(); } @@ -57,6 +54,14 @@ int main(int argc, char** argv) { namespace doris::cloud { namespace { +class S3AccessorTest : public testing::Test { + void SetUp() override { + if (cloud::config::test_s3_ak.empty()) { + GTEST_SKIP() << "empty test_s3_ak, skip S3AccessorTest"; + } + } +}; + void test_s3_accessor(S3Accessor& accessor) { std::string file1 = "data/10000/1_0.dat"; @@ -196,7 +201,7 @@ void test_s3_accessor(S3Accessor& accessor) { } // namespace -TEST(S3AccessorTest, s3) { +TEST_F(S3AccessorTest, s3) { std::shared_ptr<S3Accessor> accessor; int ret = S3Accessor::create( S3Conf { @@ -238,7 +243,7 @@ TEST(S3AccessorTest, s3) { test_s3_accessor(*accessor); } -TEST(S3AccessorTest, azure) { +TEST_F(S3AccessorTest, azure) { std::shared_ptr<S3Accessor> accessor; int ret = S3Accessor::create( S3Conf { @@ -280,7 +285,7 @@ TEST(S3AccessorTest, azure) { test_s3_accessor(*accessor); } -TEST(S3AccessorTest, gcs) { +TEST_F(S3AccessorTest, gcs) { std::shared_ptr<S3Accessor> accessor; int ret = S3Accessor::create( S3Conf { @@ -322,7 +327,7 @@ TEST(S3AccessorTest, gcs) { test_s3_accessor(*accessor); } -TEST(S3AccessorTest, path_style_test) { +TEST_F(S3AccessorTest, path_style_test) { ObjectStoreInfoPB obj_info; obj_info.set_prefix("doris-debug-instance-prefix"); obj_info.set_provider(ObjectStoreInfoPB_Provider_S3); @@ -388,4 +393,83 @@ TEST(S3AccessorTest, path_style_test) { } } +class S3AccessorRoleTest : public testing::Test { + static void SetUpTestSuite() { + if 
(!std::getenv("AWS_ROLE_ARN") || !std::getenv("AWS_EXTERNAL_ID") || + !std::getenv("AWS_ENDPOINT") || !std::getenv("AWS_REGION") || + !std::getenv("AWS_BUCKET") || !std::getenv("AWS_PREFIX")) { + return; + } + + role_arn = std::getenv("AWS_ROLE_ARN"); + external_id = std::getenv("AWS_EXTERNAL_ID"); + endpoint = std::getenv("AWS_ENDPOINT"); + region = std::getenv("AWS_REGION"); + bucket = std::getenv("AWS_BUCKET"); + prefix = std::getenv("AWS_PREFIX"); + } + + void SetUp() override { + if (role_arn.empty() || external_id.empty() || endpoint.empty() || region.empty() || + bucket.empty() || prefix.empty()) { + GTEST_SKIP() << "Skipping S3 test, because AWS environment not set"; + } + } + +public: + static std::string endpoint; + static std::string region; + static std::string bucket; + static std::string prefix; + static std::string role_arn; + static std::string external_id; +}; + +std::string S3AccessorRoleTest::endpoint; +std::string S3AccessorRoleTest::region; +std::string S3AccessorRoleTest::bucket; +std::string S3AccessorRoleTest::prefix; +std::string S3AccessorRoleTest::role_arn; +std::string S3AccessorRoleTest::external_id; + +TEST_F(S3AccessorRoleTest, s3) { + std::shared_ptr<S3Accessor> accessor; + int ret = S3Accessor::create( + S3Conf {.endpoint = endpoint, + .region = region, + .bucket = bucket, + .prefix = prefix + "/S3AccessorRoleTest/" + butil::GenerateGUID(), + .cred_provider_type = CredProviderType::InstanceProfile, + .role_arn = role_arn, + .external_id = external_id, + .provider = S3Conf::S3}, + &accessor); + ASSERT_EQ(ret, 0); + + auto* sp = SyncPoint::get_instance(); + std::vector<SyncPoint::CallbackGuard> guards; + sp->set_call_back( + "S3ObjListIterator", + [](auto&& args) { + auto* req = try_any_cast<Aws::S3::Model::ListObjectsV2Request*>(args[0]); + req->SetMaxKeys(7); + }, + &guards.emplace_back()); + sp->set_call_back( + "S3ObjClient::delete_objects", + [](auto&& args) { + auto* delete_batch_size = try_any_cast<size_t*>(args[0]); + 
*delete_batch_size = 7; + }, + &guards.emplace_back()); + sp->set_call_back( + "ObjStorageClient::delete_objects_recursively_", + [](auto&& args) { + auto* delete_batch_size = try_any_cast<size_t*>(args); + *delete_batch_size = 7; + }, + &guards.emplace_back()); + + test_s3_accessor(*accessor); +} } // namespace doris::cloud diff --git a/common/cpp/aws_common.cpp b/common/cpp/aws_common.cpp new file mode 100644 index 00000000000..15a34f7c11a --- /dev/null +++ b/common/cpp/aws_common.cpp @@ -0,0 +1,40 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +#include "aws_common.h" + +#include <glog/logging.h> + +namespace doris { + +CredProviderType cred_provider_type_from_pb(cloud::CredProviderTypePB cred_provider_type) { + switch (cred_provider_type) { + case cloud::CredProviderTypePB::DEFAULT: + return CredProviderType::Default; + case cloud::CredProviderTypePB::SIMPLE: + return CredProviderType::Simple; + case cloud::CredProviderTypePB::INSTANCE_PROFILE: + return CredProviderType::InstanceProfile; + default: + __builtin_unreachable(); + LOG(WARNING) << "Invalid CredProviderTypePB value: " << cred_provider_type + << ", use default instead."; + return CredProviderType::Default; + } +} + +} \ No newline at end of file diff --git a/common/cpp/aws_common.h b/common/cpp/aws_common.h new file mode 100644 index 00000000000..895ba7a6736 --- /dev/null +++ b/common/cpp/aws_common.h @@ -0,0 +1,27 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +#pragma once + +#include <gen_cpp/cloud.pb.h> + +namespace doris { + //AWS Credentials Provider Type + enum class CredProviderType { Default = 0, Simple = 1, InstanceProfile = 2 }; + + CredProviderType cred_provider_type_from_pb(cloud::CredProviderTypePB cred_provider_type); +} \ No newline at end of file diff --git a/common/cpp/aws_logger.h b/common/cpp/aws_logger.h index 8fb74c437db..ca607cab056 100644 --- a/common/cpp/aws_logger.h +++ b/common/cpp/aws_logger.h @@ -15,6 +15,8 @@ // specific language governing permissions and limitations // under the License. +#pragma once + #include <aws/core/utils/logging/LogLevel.h> #include <aws/core/utils/logging/LogSystemInterface.h> #include <glog/logging.h> // IWYU pragma: export diff --git a/fe/fe-core/src/main/java/org/apache/doris/common/util/S3Util.java b/fe/fe-core/src/main/java/org/apache/doris/common/util/S3Util.java index 9e5e258a3ea..d0764385201 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/common/util/S3Util.java +++ b/fe/fe-core/src/main/java/org/apache/doris/common/util/S3Util.java @@ -19,6 +19,7 @@ package org.apache.doris.common.util; import org.apache.doris.common.credentials.CloudCredential; +import com.google.common.base.Strings; import software.amazon.awssdk.auth.credentials.AwsBasicCredentials; import software.amazon.awssdk.auth.credentials.AwsCredentials; import software.amazon.awssdk.auth.credentials.AwsCredentialsProvider; @@ -39,32 +40,114 @@ import software.amazon.awssdk.http.urlconnection.UrlConnectionHttpClient; import software.amazon.awssdk.regions.Region; import software.amazon.awssdk.services.s3.S3Client; import software.amazon.awssdk.services.s3.S3Configuration; +import software.amazon.awssdk.services.sts.StsClient; +import software.amazon.awssdk.services.sts.auth.StsAssumeRoleCredentialsProvider; import java.net.URI; import java.time.Duration; public class S3Util { - - public static S3Client buildS3Client(URI endpoint, String region, CloudCredential credential, - boolean 
isUsePathStyle) { - AwsCredentialsProvider scp; + private static AwsCredentialsProvider getAwsCredentialsProvider(CloudCredential credential) { AwsCredentials awsCredential; + AwsCredentialsProvider awsCredentialsProvider; if (!credential.isTemporary()) { awsCredential = AwsBasicCredentials.create(credential.getAccessKey(), credential.getSecretKey()); } else { awsCredential = AwsSessionCredentials.create(credential.getAccessKey(), credential.getSecretKey(), credential.getSessionToken()); } + if (!credential.isWhole()) { - scp = AwsCredentialsProviderChain.of( + awsCredentialsProvider = AwsCredentialsProviderChain.of( SystemPropertyCredentialsProvider.create(), EnvironmentVariableCredentialsProvider.create(), WebIdentityTokenFileCredentialsProvider.create(), ProfileCredentialsProvider.create(), InstanceProfileCredentialsProvider.create()); } else { - scp = StaticCredentialsProvider.create(awsCredential); + awsCredentialsProvider = StaticCredentialsProvider.create(awsCredential); + } + + return awsCredentialsProvider; + } + + @Deprecated + public static S3Client buildS3Client(URI endpoint, String region, CloudCredential credential, + boolean isUsePathStyle) { + EqualJitterBackoffStrategy backoffStrategy = EqualJitterBackoffStrategy + .builder() + .baseDelay(Duration.ofSeconds(1)) + .maxBackoffTime(Duration.ofMinutes(1)) + .build(); + // retry 3 times with Equal backoff + RetryPolicy retryPolicy = RetryPolicy + .builder() + .numRetries(3) + .backoffStrategy(backoffStrategy) + .build(); + ClientOverrideConfiguration clientConf = ClientOverrideConfiguration + .builder() + // set retry policy + .retryPolicy(retryPolicy) + // using AwsS3V4Signer + .putAdvancedOption(SdkAdvancedClientOption.SIGNER, AwsS3V4Signer.create()) + .build(); + return S3Client.builder() + .httpClient(UrlConnectionHttpClient.create()) + .endpointOverride(endpoint) + .credentialsProvider(getAwsCredentialsProvider(credential)) + .region(Region.of(region)) + .overrideConfiguration(clientConf) + // 
disable chunkedEncoding because of bos not supported + .serviceConfiguration(S3Configuration.builder() + .chunkedEncodingEnabled(false) + .pathStyleAccessEnabled(isUsePathStyle) + .build()) + .build(); + } + + /** + * Using (accessKey, secretKey) or roleArn for creating different credentials provider when creating s3client + * @param endpoint AWS endpoint (eg: "https://s3.us-east-1.amazonaws.com"), currently unused here + * @param region AWS region identifier (eg: "us-east-1"), also used for the STS client + * @param accessKey AWS access key ID + * @param secretKey AWS secret access key, paired with accessKey + * @param sessionToken AWS temporary session token for short-term credentials + * @param roleArn AWS iam role arn to assume (format: "arn:aws:iam::123456789012:role/role-name") + * @param externalId AWS External ID for cross-account role assumption security + * @return static provider if accessKey/secretKey set, else STS assume-role provider if roleArn set, else default chain + */ + private static AwsCredentialsProvider getAwsCredentialsProvider(URI endpoint, String region, String accessKey, + String secretKey, String sessionToken, String roleArn, String externalId) { + + if (!Strings.isNullOrEmpty(accessKey) && !Strings.isNullOrEmpty(secretKey)) { + if (Strings.isNullOrEmpty(sessionToken)) { + return StaticCredentialsProvider.create(AwsBasicCredentials.create(accessKey, secretKey)); + } else { + return StaticCredentialsProvider.create(AwsSessionCredentials.create(accessKey, + secretKey, sessionToken)); + } + } + + if (!Strings.isNullOrEmpty(roleArn)) { + StsClient stsClient = StsClient.builder().region(Region.of(region)) + .credentialsProvider(InstanceProfileCredentialsProvider.create()) + .build(); + return StsAssumeRoleCredentialsProvider.builder() + .stsClient(stsClient) + .refreshRequest(r -> r.roleArn(roleArn).externalId(externalId) + .roleSessionName("aws-sdk-java-v2-fe")) + .build(); } + return 
AwsCredentialsProviderChain.of(SystemPropertyCredentialsProvider.create(), + EnvironmentVariableCredentialsProvider.create(), + WebIdentityTokenFileCredentialsProvider.create(), + ProfileCredentialsProvider.create(), + 
InstanceProfileCredentialsProvider.create()); + } + + public static S3Client buildS3Client(URI endpoint, String region, boolean isUsePathStyle, String accessKey, + String secretKey, String sessionToken, String roleArn, String externalId) { EqualJitterBackoffStrategy backoffStrategy = EqualJitterBackoffStrategy .builder() .baseDelay(Duration.ofSeconds(1)) @@ -86,7 +169,8 @@ public class S3Util { return S3Client.builder() .httpClient(UrlConnectionHttpClient.create()) .endpointOverride(endpoint) - .credentialsProvider(scp) + .credentialsProvider(getAwsCredentialsProvider(endpoint, region, accessKey, secretKey, + sessionToken, roleArn, externalId)) .region(Region.of(region)) .overrideConfiguration(clientConf) // disable chunkedEncoding because of bos not supported diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/property/S3ClientBEProperties.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/property/S3ClientBEProperties.java index 5cc19339ce0..c4f4c01b0b0 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/property/S3ClientBEProperties.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/property/S3ClientBEProperties.java @@ -96,6 +96,14 @@ public class S3ClientBEProperties { if (properties.containsKey(S3Properties.PROVIDER)) { beProperties.put(S3Properties.PROVIDER, properties.get(S3Properties.PROVIDER)); } + + if (properties.containsKey(S3Properties.ROLE_ARN)) { + beProperties.put(S3Properties.Env.ROLE_ARN, properties.get(S3Properties.ROLE_ARN)); + } + + if (properties.containsKey(S3Properties.EXTERNAL_ID)) { + beProperties.put(S3Properties.Env.EXTERNAL_ID, properties.get(S3Properties.EXTERNAL_ID)); + } return beProperties; } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/property/constants/S3Properties.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/property/constants/S3Properties.java index 3f8e875ae68..41f4de716e9 100644 --- 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/property/constants/S3Properties.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/property/constants/S3Properties.java @@ -18,6 +18,7 @@ package org.apache.doris.datasource.property.constants; import org.apache.doris.cloud.proto.Cloud; +import org.apache.doris.cloud.proto.Cloud.CredProviderTypePB; import org.apache.doris.cloud.proto.Cloud.ObjectStoreInfoPB.Provider; import org.apache.doris.common.AnalysisException; import org.apache.doris.common.DdlException; @@ -25,6 +26,7 @@ import org.apache.doris.common.credentials.CloudCredential; import org.apache.doris.common.credentials.CloudCredentialWithEndpoint; import org.apache.doris.common.credentials.DataLakeAWSCredentialsProvider; import org.apache.doris.datasource.property.PropertyConverter; +import org.apache.doris.thrift.TCredProviderType; import org.apache.doris.thrift.TS3StorageParam; import com.amazonaws.auth.EnvironmentVariableCredentialsProvider; @@ -57,6 +59,10 @@ public class S3Properties extends BaseProperties { public static final String ACCESS_KEY = "s3.access_key"; public static final String SECRET_KEY = "s3.secret_key"; public static final String SESSION_TOKEN = "s3.session_token"; + // role-based auth: IAM role arn to assume and its optional external id + public static final String ROLE_ARN = "s3.role_arn"; + public static final String EXTERNAL_ID = "s3.external_id"; + public static final String MAX_CONNECTIONS = "s3.connection.maximum"; public static final String REQUEST_TIMEOUT_MS = "s3.connection.request.timeout"; public static final String CONNECTION_TIMEOUT_MS = "s3.connection.timeout"; @@ -121,6 +127,9 @@ public class S3Properties extends BaseProperties { public static final String DEFAULT_CONNECTION_TIMEOUT_MS = "1000"; public static final String NEED_OVERRIDE_ENDPOINT = "AWS_NEED_OVERRIDE_ENDPOINT"; + public static final String ROLE_ARN = "AWS_ROLE_ARN"; + public static final String EXTERNAL_ID = "AWS_EXTERNAL_ID"; + public 
static final List<String> REQUIRED_FIELDS = Arrays.asList(ENDPOINT); 
public static final List<String> FS_KEYS = Arrays.asList(ENDPOINT, REGION, ACCESS_KEY, SECRET_KEY, TOKEN, ROOT_PATH, BUCKET, MAX_CONNECTIONS, REQUEST_TIMEOUT_MS, CONNECTION_TIMEOUT_MS); @@ -284,10 +293,27 @@ public class S3Properties extends BaseProperties { if (properties.containsKey(PropertyConverter.USE_PATH_STYLE)) { properties.putIfAbsent(PropertyConverter.USE_PATH_STYLE, properties.get(PropertyConverter.USE_PATH_STYLE)); } + // AWS_ROLE_ARN / AWS_EXTERNAL_ID act as aliases; putIfAbsent keeps an explicit s3.role_arn / s3.external_id + if (properties.containsKey(S3Properties.Env.ROLE_ARN)) { + properties.putIfAbsent(S3Properties.ROLE_ARN, properties.get(S3Properties.Env.ROLE_ARN)); + } + + if (properties.containsKey(S3Properties.Env.EXTERNAL_ID)) { + properties.putIfAbsent(S3Properties.EXTERNAL_ID, properties.get(S3Properties.Env.EXTERNAL_ID)); + } } public static TS3StorageParam getS3TStorageParam(Map<String, String> properties) { TS3StorageParam s3Info = new TS3StorageParam(); + // a configured role arn (plus optional external id) selects the INSTANCE_PROFILE provider type + if (properties.containsKey(S3Properties.ROLE_ARN)) { + s3Info.setRoleArn(properties.get(S3Properties.ROLE_ARN)); + if (properties.containsKey(S3Properties.EXTERNAL_ID)) { + s3Info.setExternalId(properties.get(S3Properties.EXTERNAL_ID)); + } + s3Info.setCredProviderType(TCredProviderType.INSTANCE_PROFILE); + } + s3Info.setEndpoint(properties.get(S3Properties.ENDPOINT)); s3Info.setRegion(properties.get(S3Properties.REGION)); s3Info.setAk(properties.get(S3Properties.ACCESS_KEY)); @@ -348,6 +374,15 @@ public class S3Properties extends BaseProperties { "Invalid use_path_style value: %s only 'true' or 'false' is acceptable", value); builder.setUsePathStyle(value.equalsIgnoreCase("true")); } + // a configured role arn (plus optional external id) selects the INSTANCE_PROFILE provider type + if (properties.containsKey(S3Properties.ROLE_ARN)) { + builder.setRoleArn(properties.get(S3Properties.ROLE_ARN)); + if 
(properties.containsKey(S3Properties.EXTERNAL_ID)) { + builder.setExternalId(properties.get(S3Properties.EXTERNAL_ID)); + } + builder.setCredProviderType(CredProviderTypePB.INSTANCE_PROFILE); + } + return builder; } } diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/fs/obj/S3ObjStorage.java b/fe/fe-core/src/main/java/org/apache/doris/fs/obj/S3ObjStorage.java index 5249c9f49d8..6adb98f1d7f 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/fs/obj/S3ObjStorage.java +++ b/fe/fe-core/src/main/java/org/apache/doris/fs/obj/S3ObjStorage.java @@ -20,7 +20,6 @@ package org.apache.doris.fs.obj; import org.apache.doris.backup.Status; import org.apache.doris.common.DdlException; import org.apache.doris.common.UserException; -import org.apache.doris.common.credentials.CloudCredential; import org.apache.doris.common.util.S3URI; import org.apache.doris.common.util.S3Util; import org.apache.doris.datasource.property.PropertyConverter; @@ -127,13 +126,10 @@ public class S3ObjStorage implements ObjStorage<S3Client> { endpointStr = "http://" + endpointStr; } URI endpoint = URI.create(endpointStr); - CloudCredential credential = new CloudCredential(); - credential.setAccessKey(properties.get(S3Properties.ACCESS_KEY)); - credential.setSecretKey(properties.get(S3Properties.SECRET_KEY)); - if (properties.containsKey(S3Properties.SESSION_TOKEN)) { - credential.setSessionToken(properties.get(S3Properties.SESSION_TOKEN)); - } - client = S3Util.buildS3Client(endpoint, properties.get(S3Properties.REGION), credential, isUsePathStyle); + client = S3Util.buildS3Client(endpoint, properties.get(S3Properties.REGION), isUsePathStyle, + properties.get(S3Properties.ACCESS_KEY), properties.get(S3Properties.SECRET_KEY), + properties.get(S3Properties.SESSION_TOKEN), properties.get(S3Properties.ROLE_ARN), + properties.get(S3Properties.EXTERNAL_ID)); } return client; } diff --git a/fe/fe-core/src/main/java/org/apache/doris/tablefunction/S3TableValuedFunction.java b/fe/fe-core/src/main/java/org/apache/doris/tablefunction/S3TableValuedFunction.java index 56c438c303e..93dc28d84a3 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/tablefunction/S3TableValuedFunction.java +++ 
b/fe/fe-core/src/main/java/org/apache/doris/tablefunction/S3TableValuedFunction.java @@ -104,6 +104,14 @@ public class S3TableValuedFunction extends ExternalFileTableValuedFunction { // For Azure's compatibility, we need bucket to connect to the blob storage's container locationProperties.put(S3Properties.BUCKET, s3uri.getBucket()); } + + if (properties.containsKey(S3Properties.ROLE_ARN)) { + locationProperties.put(S3Properties.ROLE_ARN, properties.get(S3Properties.ROLE_ARN)); + if (properties.containsKey(S3Properties.EXTERNAL_ID)) { + locationProperties.put(S3Properties.EXTERNAL_ID, properties.get(S3Properties.EXTERNAL_ID)); + } + } + locationProperties.putAll(S3ClientBEProperties.getBeFSProperties(locationProperties)); locationProperties.putAll(otherProps); diff --git a/fe/fe-core/src/test/java/org/apache/doris/catalog/S3ResourceTest.java b/fe/fe-core/src/test/java/org/apache/doris/catalog/S3ResourceTest.java index b7d14ab7017..b39e2fbfef8 100644 --- a/fe/fe-core/src/test/java/org/apache/doris/catalog/S3ResourceTest.java +++ b/fe/fe-core/src/test/java/org/apache/doris/catalog/S3ResourceTest.java @@ -277,4 +277,35 @@ public class S3ResourceTest { Assert.assertTrue(e.getMessage(), false); } } + + @Test + public void testPingS3WithRoleArn() { + try { + String endpoint = System.getenv("ENDPOINT"); + String region = System.getenv("REGION"); + String provider = System.getenv("PROVIDER"); + + String roleArn = System.getenv("ROLE_ARN"); + String externalId = System.getenv("EXTERNAL_ID"); + String bucket = System.getenv("BUCKET"); + + Assume.assumeTrue("ENDPOINT isNullOrEmpty.", !Strings.isNullOrEmpty(endpoint)); + Assume.assumeTrue("REGION isNullOrEmpty.", !Strings.isNullOrEmpty(region)); + Assume.assumeTrue("PROVIDER isNullOrEmpty.", !Strings.isNullOrEmpty(provider)); + Assume.assumeTrue("ROLE_ARN isNullOrEmpty.", !Strings.isNullOrEmpty(roleArn)); + Assume.assumeTrue("EXTERNAL_ID isNullOrEmpty.", !Strings.isNullOrEmpty(externalId)); + Assume.assumeTrue("BUCKET 
isNullOrEmpty.", !Strings.isNullOrEmpty(bucket)); + // only the role arn / external id are supplied (no s3.access_key / s3.secret_key) + Map<String, String> properties = new HashMap<>(); + properties.put("s3.endpoint", endpoint); + properties.put("s3.region", region); + properties.put("s3.role_arn", roleArn); + properties.put("s3.external_id", externalId); + properties.put("provider", provider); + S3Resource.pingS3(bucket, "fe_ut_role_prefix", properties); + } catch (DdlException e) { + LOG.info("testPingS3WithRoleArn exception:", e); + Assert.fail(e.getMessage()); + } + } } diff --git a/gensrc/proto/cloud.proto b/gensrc/proto/cloud.proto index c18b35ce15f..1b6cdd9bd5a 100644 --- a/gensrc/proto/cloud.proto +++ b/gensrc/proto/cloud.proto @@ -170,6 +170,13 @@ enum NodeStatusPB { NODE_STATUS_DECOMMISSIONED = 4; } +enum CredProviderTypePB { + // used for creating different credentials provider when creating s3client + DEFAULT = 1; // DefaultAWSCredentialsProviderChain + SIMPLE = 2; // SimpleAWSCredentialsProvider, corresponding to (ak, sk) + INSTANCE_PROFILE = 3; // InstanceProfileCredentialsProvider +} + message ObjectStoreInfoPB { // presigned url use // oss,aws,cos,obs,bos @@ -198,6 +205,10 @@ message ObjectStoreInfoPB { optional EncryptionInfoPB encryption_info = 14; optional bool sse_enabled = 15; optional bool use_path_style = 16; + + optional CredProviderTypePB cred_provider_type = 17; + optional string role_arn = 18; // aws assumed role's arn + optional string external_id = 19; // aws assumed role's external_id if configured } // The legacy ObjectStoreInfoPB is stored in InstanceInfoPB diff --git a/gensrc/thrift/AgentService.thrift b/gensrc/thrift/AgentService.thrift index aa1ee2f5b9f..cf206f5c564 100644 --- a/gensrc/thrift/AgentService.thrift +++ b/gensrc/thrift/AgentService.thrift @@ -77,6 +77,13 @@ enum TObjStorageType { GCP = 7 } +enum TCredProviderType { + // used for creating different credentials provider when creating s3client + DEFAULT = 0, // DefaultAWSCredentialsProviderChain + 
SIMPLE = 1, // 
SimpleAWSCredentialsProvider, corresponding to (ak, sk) + INSTANCE_PROFILE = 2 // InstanceProfileCredentialsProvider +} + struct TS3StorageParam { 1: optional string endpoint 2: optional string region @@ -90,6 +97,10 @@ struct TS3StorageParam { 10: optional bool use_path_style = false 11: optional string token 12: optional TObjStorageType provider + + 13: optional TCredProviderType cred_provider_type + 14: optional string role_arn // aws assumed role's arn + 15: optional string external_id // aws assumed role's external_id if configured } struct TStoragePolicy { diff --git a/regression-test/framework/src/main/groovy/org/apache/doris/regression/Config.groovy b/regression-test/framework/src/main/groovy/org/apache/doris/regression/Config.groovy index b57976c0578..da76c682c57 100644 --- a/regression-test/framework/src/main/groovy/org/apache/doris/regression/Config.groovy +++ b/regression-test/framework/src/main/groovy/org/apache/doris/regression/Config.groovy @@ -150,6 +150,23 @@ class Config { public String s3Source + // for aws role arn regression test + public String awsRoleArn + public String awsExternalId + public String awsEndpoint + public String awsRegion + public String awsBucket + public String awsPrefix + public String awsAccessKey + public String awsSecretKey + + public String regressionAwsRoleArn + public String regressionAwsExternalId + public String regressionAwsEndpoint + public String regressionAwsRegion + public String regressionAwsBucket + public String regressionAwsPrefix + Config() {} Config( @@ -599,6 +616,22 @@ class Config { config.dockerEndNoKill = configToBoolean(obj.dockerEndNoKill) config.excludeDockerTest = configToBoolean(obj.excludeDockerTest) + config.awsRoleArn = configToString(obj.awsRoleArn) + config.awsExternalId = configToString(obj.awsExternalId) + config.awsPrefix = configToString(obj.awsPrefix) + config.awsEndpoint = configToString(obj.awsEndpoint) + config.awsRegion = configToString(obj.awsRegion) + config.awsBucket = 
configToString(obj.awsBucket) + config.awsAccessKey = configToString(obj.awsAccessKey) + config.awsSecretKey = configToString(obj.awsSecretKey) + // read by test_s3_load_with_role (regression data bucket) + config.regressionAwsRoleArn = configToString(obj.regressionAwsRoleArn) + config.regressionAwsExternalId = configToString(obj.regressionAwsExternalId) + config.regressionAwsEndpoint = configToString(obj.regressionAwsEndpoint) + config.regressionAwsRegion = configToString(obj.regressionAwsRegion) + config.regressionAwsBucket = configToString(obj.regressionAwsBucket) + config.regressionAwsPrefix = configToString(obj.regressionAwsPrefix) + def declareFileNames = config.getClass() .getDeclaredFields() .collect({f -> f.name}) diff --git a/regression-test/framework/src/main/groovy/org/apache/doris/regression/suite/Syncer.groovy b/regression-test/framework/src/main/groovy/org/apache/doris/regression/suite/Syncer.groovy index 8f6a66d50eb..49046a0807f 100644 --- a/regression-test/framework/src/main/groovy/org/apache/doris/regression/suite/Syncer.groovy +++ b/regression-test/framework/src/main/groovy/org/apache/doris/regression/suite/Syncer.groovy @@ -977,4 +977,27 @@ class Syncer { ) """ } + + void createS3RepositoryWithRole(String name, boolean readOnly = false) { + String roleArn = suite.context.config.awsRoleArn + String externalId = suite.context.config.awsExternalId + String endpoint = suite.context.config.awsEndpoint + String region = suite.context.config.awsRegion + String bucket = suite.context.config.awsBucket + String prefix = suite.context.config.awsPrefix + + suite.try_sql "DROP REPOSITORY `${name}`" + suite.sql """ + CREATE ${readOnly ? 
"READ ONLY" : ""} REPOSITORY `${name}` + WITH S3 + ON LOCATION "s3://${bucket}/${prefix}/aws_iam_role_p0/${name}" + PROPERTIES + ( + "s3.endpoint" = "${endpoint}", + "s3.region" = "${region}", + "s3.role_arn" = "${roleArn}", + "s3.external_id" = "${externalId}" + ) + """ + } } diff --git a/regression-test/suites/aws_iam_role_p0/test_backup_restore_with_role.groovy b/regression-test/suites/aws_iam_role_p0/test_backup_restore_with_role.groovy new file mode 100644 index 00000000000..33349bdfbbf --- /dev/null +++ b/regression-test/suites/aws_iam_role_p0/test_backup_restore_with_role.groovy @@ -0,0 +1,97 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +import com.google.common.base.Strings; + +suite("test_backup_restore_with_role") { + if (Strings.isNullOrEmpty(context.config.awsRoleArn)) { + logger.info("skip ${name} case, because awsRoleArn is null or empty") + return + } + + if (isCloudMode()) { + logger.info("skip ${name} case, because cloud mode not support") + return + } + + logger.info("role info: ${context.config.awsRoleArn}|${context.config.awsExternalId}|${context.config.awsPrefix}") + + String suiteName = "test_backup_restore_with_role" + + String dbName = "${suiteName}_db" + String tableName = "${suiteName}_table" + String repoName = "${suiteName}_repo_" + UUID.randomUUID().toString().replace("-", "") + String snapshotName = "${suiteName}_snapshot" + + def syncer = getSyncer() + syncer.createS3RepositoryWithRole(repoName) + + sql "CREATE DATABASE IF NOT EXISTS ${dbName}" + sql "DROP TABLE IF EXISTS ${dbName}.${tableName} FORCE;" + sql """ + CREATE TABLE ${dbName}.${tableName} ( + `id` LARGEINT NOT NULL, + `count` LARGEINT SUM DEFAULT "0") + AGGREGATE KEY(`id`) + DISTRIBUTED BY HASH(`id`) BUCKETS 2 + PROPERTIES + ( + "replication_num" = "1" + ) + """ + + List<String> values = [] + for (int i = 1; i <= 10; ++i) { + values.add("(${i}, ${i})") + } + sql "INSERT INTO ${dbName}.${tableName} VALUES ${values.join(",")}" + def result = sql "SELECT * FROM ${dbName}.${tableName}" + assertEquals(values.size(), result.size()); + + sql """ + BACKUP SNAPSHOT ${dbName}.${snapshotName} + TO `${repoName}` + ON (${tableName}) + """ + + syncer.waitSnapshotFinish(dbName) + + def snapshot = syncer.getSnapshotTimestamp(repoName, snapshotName) + assertTrue(snapshot != null) + // empty the table so the row count after RESTORE must come from the backup + sql "TRUNCATE TABLE ${dbName}.${tableName}" + + sql """ + RESTORE SNAPSHOT ${dbName}.${snapshotName} + FROM `${repoName}` + ON ( `${tableName}`) + PROPERTIES + ( + "backup_timestamp" = "${snapshot}", + "reserve_replica" = "true" + ) + """ + + syncer.waitAllRestoreFinish(dbName) + + result = sql "SELECT * FROM 
${dbName}.${tableName}" + 
assertEquals(values.size(), result.size()); + + sql "DROP TABLE ${dbName}.${tableName} FORCE" + sql "DROP DATABASE ${dbName} FORCE" + sql "DROP REPOSITORY `${repoName}`" +} \ No newline at end of file diff --git a/regression-test/suites/aws_iam_role_p0/test_export_with_role.groovy b/regression-test/suites/aws_iam_role_p0/test_export_with_role.groovy new file mode 100644 index 00000000000..1fbf4f3a8f0 --- /dev/null +++ b/regression-test/suites/aws_iam_role_p0/test_export_with_role.groovy @@ -0,0 +1,117 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +import com.google.common.base.Strings; + +suite("test_export_with_role") { + if (Strings.isNullOrEmpty(context.config.awsRoleArn)) { + logger.info("skip ${name} case, because awsRoleArn is null or empty") + return + } + + + def randomStr = UUID.randomUUID().toString().replace("-", "") + def label = "label_" + randomStr + def tableName = "test_export_with_role" + + def endpoint = context.config.awsEndpoint + def region = context.config.awsRegion + def bucket = context.config.awsBucket + def roleArn = context.config.awsRoleArn + def externalId = context.config.awsExternalId + + def prefix = context.config.awsPrefix + + sql """ + DROP TABLE IF EXISTS ${tableName} FORCE; + """ + + sql """ + CREATE TABLE ${tableName} + ( + siteid INT DEFAULT '10', + citycode SMALLINT NOT NULL, + username VARCHAR(32) DEFAULT '', + pv BIGINT SUM DEFAULT '0' + ) + AGGREGATE KEY(siteid, citycode, username) + DISTRIBUTED BY HASH(siteid) BUCKETS 1 + PROPERTIES ( + "replication_num" = "1" + ) + """ + + sql """insert into ${tableName}(siteid, citycode, username, pv) values (1, 1, "xxx", 1), + (2, 2, "yyy", 2), + (3, 3, "zzz", 3) + """ + + sql """ + EXPORT TABLE ${tableName} TO "s3://${bucket}/${prefix}/aws_iam_role_p0/test_export_with_role" + PROPERTIES( + "label" = "${label}", + "format" = "csv", + "column_separator"=",", + "delete_existing_files"= "true" + ) + WITH s3 ( + "s3.endpoint" = "${endpoint}", + "s3.region" = "${region}", + "s3.role_arn" = "${roleArn}", + "s3.external_id"="${externalId}", + "provider" = "AWS" + ); + """ + // poll SHOW EXPORT every 5s for up to 10 minutes; fail on CANCELLED or timeout + def maxTryMs = 600000 + def outfileUrl = "" + while (maxTryMs > 0) { + String[][] result = sql """ show export where label = "${label}" """ + logger.info("result: ${result}") + if (result[0][2].equals("FINISHED")) { + def json = parseJson(result[0][11]) + logger.info("json: ${json}") + assert json instanceof List + assertEquals("1", json.fileNumber[0][0]) + log.info("outfileUrl: ${json.url[0][0]}") + outfileUrl = json.url[0][0] + break; + } + if 
(result[0][2].equals("CANCELLED")) { + assertTrue(false, "Export ${label} cancelled: ${result}") + break + } + Thread.sleep(5000) + maxTryMs -= 5000 + if (maxTryMs <= 0) { + assertTrue(false, "Export ${label} timeout") + } + } + // read the exported csv back via the s3() TVF with the same role; the substring assumes outfileUrl starts with "s3://<bucket>" + def result = sql """ + select count(*) from s3("uri" = "s3://${bucket}/${outfileUrl.substring(5 + bucket.length())}0.csv", + "s3.endpoint" = "${endpoint}", + "s3.region" = "${region}", + "s3.role_arn"= "${roleArn}", + "s3.external_id" = "${externalId}", + "format" = "csv", + "column_separator" = ",", + "use_path_style" = "false"); + """ + log.info("result: ${result}") + assertEquals(3, result[0][0]) +} \ No newline at end of file diff --git a/regression-test/suites/aws_iam_role_p0/test_external_catalog_with_role.groovy b/regression-test/suites/aws_iam_role_p0/test_external_catalog_with_role.groovy new file mode 100644 index 00000000000..0734c34b4a1 --- /dev/null +++ b/regression-test/suites/aws_iam_role_p0/test_external_catalog_with_role.groovy @@ -0,0 +1,86 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +import com.google.common.base.Strings; + +suite("test_external_catalog_with_role") { + if (Strings.isNullOrEmpty(context.config.awsRoleArn)) { + logger.info("skip ${name} case, because awsRoleArn is null or empty") + return + } + + def endpoint = context.config.awsEndpoint + def region = context.config.awsRegion + def bucket = context.config.awsBucket + def roleArn = context.config.awsRoleArn + def externalId = context.config.awsExternalId + def prefix = context.config.awsPrefix + + def randomStr = UUID.randomUUID().toString().replace("-", "") + + def tableName = "test_external_catalog_with_role" + + sql """ drop table if exists ${tableName} force;""" + + sql """ + CREATE TABLE ${tableName} + ( + siteid INT DEFAULT '10', + citycode INT NOT NULL, + username VARCHAR(32) DEFAULT '', + pv BIGINT SUM DEFAULT '0' + ) + AGGREGATE KEY(siteid, citycode, username) + DISTRIBUTED BY HASH(siteid) BUCKETS 1 + PROPERTIES ( + "replication_num" = "1" + ) + """ + + sql """insert into ${tableName}(siteid, citycode, username, pv) values (1, 1, "xxx", 1), + (2, 2, "yyy", 2), + (3, 3, "zzz", 3) + """ + sql """sync;""" + // hadoop-type iceberg catalog whose s3 warehouse is accessed through the role + sql """ drop catalog if exists aws_iam_role_p0_iceberg;""" + + sql """ + CREATE CATALOG aws_iam_role_p0_iceberg PROPERTIES ( + "type" = "iceberg", + "iceberg.catalog.type" = "hadoop", + "warehouse" = "s3://${bucket}/${prefix}/aws_iam_role_p0/test_external_catalog_with_role/${randomStr}", + "s3.endpoint" = "${endpoint}", + "s3.region" = "${region}", + "s3.role_arn" = "${roleArn}", + "s3.external_id" = "${externalId}" + ); + """ + + sql """ create database if not exists aws_iam_role_p0_iceberg.test_role_db; """ + + sql """ drop table if exists aws_iam_role_p0_iceberg.test_role_db.${tableName}_2;""" + + sql """ CREATE TABLE aws_iam_role_p0_iceberg.test_role_db.${tableName}_2 + PROPERTIES("file_format" = "parquet") AS SELECT * FROM ${tableName};""" + + sql """ sync; """ + + def result = sql "SELECT count(*) FROM 
aws_iam_role_p0_iceberg.test_role_db.${tableName}_2" + logger.info("result: 
${result}") + assertEquals(3, result[0][0]); +} \ No newline at end of file diff --git a/regression-test/suites/aws_iam_role_p0/test_resource_with_role.groovy b/regression-test/suites/aws_iam_role_p0/test_resource_with_role.groovy new file mode 100644 index 00000000000..66410ff48d4 --- /dev/null +++ b/regression-test/suites/aws_iam_role_p0/test_resource_with_role.groovy @@ -0,0 +1,144 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +import com.google.common.base.Strings; +import groovy.json.JsonSlurper + +suite("test_resource_with_role") { + if (Strings.isNullOrEmpty(context.config.awsRoleArn)) { + logger.info("skip ${name} case, because awsRoleArn is null or empty") + return + } + + + if (isCloudMode()) { + logger.info("skip ${name} case, because cloud mode not support") + return + } + + def tableName = "test_resource_with_role" + def randomStr = UUID.randomUUID().toString().replace("-", "") + def resourceName = "resource_${randomStr}" + def policyName = "policy_${randomStr}" + + def awsEndpoint = context.config.awsEndpoint + def region = context.config.awsRegion + def bucket = context.config.awsBucket + def roleArn = context.config.awsRoleArn + def externalId = context.config.awsExternalId + def prefix = context.config.awsPrefix + + sql """ + CREATE RESOURCE IF NOT EXISTS "${resourceName}" + PROPERTIES( + "type"="s3", + "AWS_ENDPOINT" = "${awsEndpoint}", + "AWS_REGION" = "${region}", + "AWS_BUCKET" = "${bucket}", + "AWS_ROOT_PATH" = "${prefix}/aws_iam_role_p0/test_resource_with_role/${randomStr}", + "AWS_ROLE_ARN" = "${roleArn}", + "AWS_EXTERNAL_ID" = "${externalId}", + "AWS_MAX_CONNECTIONS" = "50", + "AWS_REQUEST_TIMEOUT_MS" = "3000", + "AWS_CONNECTION_TIMEOUT_MS" = "1000", + "s3_validity_check" = "true" + ); + """ + + sql """ + CREATE STORAGE POLICY IF NOT EXISTS ${policyName} + PROPERTIES( + "storage_resource" = "${resourceName}", + "cooldown_ttl" = "1" + ) + """ + + sql """ + DROP TABLE IF EXISTS ${tableName} FORCE; + """ + + sql """ + CREATE TABLE ${tableName} + ( + siteid INT DEFAULT '10', + citycode SMALLINT NOT NULL, + username VARCHAR(32) DEFAULT '', + pv BIGINT SUM DEFAULT '0' + ) + AGGREGATE KEY(siteid, citycode, username) + DISTRIBUTED BY HASH(siteid) BUCKETS 1 + PROPERTIES ( + "replication_num" = "1", + "storage_policy" = "${policyName}" + ) + """ + + sql """insert into ${tableName}(siteid, citycode, username, pv) values (1, 1, "xxx", 1), + (2, 2, "yyy", 2), + (3, 3, "zzz", 
3) + """ + + // data_sizes is one arrayList<Long>, t is tablet + def fetchDataSize = {List<Long> data_sizes, Map<String, Object> t -> + def tabletId = t.TabletId + def meta_url = t.MetaUrl + def clos = { respCode, body -> + logger.info("test ttl expired resp Code {}", "${respCode}".toString()) + assertEquals("200", "${respCode}".toString()) + String out = "${body}".toString() + def obj = new JsonSlurper().parseText(out) + data_sizes[0] = obj.local_data_size + data_sizes[1] = obj.remote_data_size + } + meta_url = meta_url.replace("header", "data_size") + + def i = meta_url.indexOf("/api") + def endPoint = meta_url.substring(0, i) + def metaUri = meta_url.substring(i) + logger.info("test fetchBeHttp, endpoint:${endPoint}, metaUri:${metaUri}") + i = endPoint.lastIndexOf('/') + endPoint = endPoint.substring(i + 1) + + httpTest { + endpoint {endPoint} + uri metaUri + op "get" + check clos + } + } + // storage policy uses cooldown_ttl = "1"; wait before polling remote_data_size + sleep(60000) + + List<Long> sizes = [-1, -1] + def tablets = sql_return_maparray """ + SHOW TABLETS FROM ${tableName} + """ + log.info( "test tablets not empty:${tablets}") + assertTrue(tablets.size() > 0) + fetchDataSize(sizes, tablets[0]) + def retry = 100 + while (sizes[1] == 0 && retry-- > 0) { + log.info( "test remote size is zero, sleep 10s") + sleep(10000) + tablets = sql_return_maparray """ + SHOW TABLETS FROM ${tableName} + """ + fetchDataSize(sizes, tablets[0]) + } + assertTrue(sizes[1] != 0, "remote size is still zero, maybe some error occurred") + log.info( "test remote size not zero") +} \ No newline at end of file diff --git a/regression-test/suites/aws_iam_role_p0/test_s3_load_with_role.groovy b/regression-test/suites/aws_iam_role_p0/test_s3_load_with_role.groovy new file mode 100644 index 00000000000..25c759802cf --- /dev/null +++ b/regression-test/suites/aws_iam_role_p0/test_s3_load_with_role.groovy @@ -0,0 +1,146 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor 
license agreements. 
See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +import com.google.common.base.Strings; + +suite("test_s3_load_with_role") { + if (Strings.isNullOrEmpty(context.config.regressionAwsRoleArn)) { + logger.info("skip ${name} case, because regressionAwsRoleArn is null or empty") + return + } + + def randomStr = UUID.randomUUID().toString().replace("-", "") + def loadLabel = "label_" + randomStr + def tableName = "test_s3_load_with_role" + + sql """ + DROP TABLE IF EXISTS ${tableName} FORCE; + """ + + sql """ + CREATE TABLE IF NOT EXISTS ${tableName} ( + C_CUSTKEY INTEGER NOT NULL, + C_NAME VARCHAR(25) NOT NULL, + C_ADDRESS VARCHAR(40) NOT NULL, + C_NATIONKEY INTEGER NOT NULL, + C_PHONE CHAR(15) NOT NULL, + C_ACCTBAL DECIMAL(15,2) NOT NULL, + C_MKTSEGMENT CHAR(10) NOT NULL, + C_COMMENT VARCHAR(117) NOT NULL + ) + DUPLICATE KEY(C_CUSTKEY, C_NAME) + DISTRIBUTED BY HASH(C_CUSTKEY) BUCKETS 1 + PROPERTIES ( + "replication_num" = "1" + ) + """ + + def endpoint = context.config.regressionAwsEndpoint + def region = context.config.regressionAwsRegion + def bucket = context.config.regressionAwsBucket + def roleArn = context.config.regressionAwsRoleArn + def externalId = context.config.regressionAwsExternalId + + sql """ + LOAD LABEL ${loadLabel} ( + DATA INFILE("s3://${bucket}/regression/tpch/sf0.01/customer.csv.gz") + INTO TABLE ${tableName} + 
COLUMNS TERMINATED BY "|" + (c_custkey, c_name, c_address, c_nationkey, c_phone, c_acctbal, c_mktsegment, c_comment) + ) + WITH S3 ( + "AWS_ENDPOINT" = "${endpoint}", + "AWS_REGION" = "${region}", + "AWS_ROLE_ARN" = "${roleArn}", + "AWS_EXTERNAL_ID" = "${externalId}", + "compress_type" = "GZ" + ) + properties( + "timeout" = "28800", + "exec_mem_limit" = "8589934592" + ) + """ + // poll SHOW LOAD every 5s for up to 10 minutes; fail on CANCELLED or timeout + def maxTryMs = 600000 + while (maxTryMs > 0) { + String[][] result = sql """ show load where label="${loadLabel}" order by createtime desc limit 1; """ + if (result[0][2].equals("FINISHED")) { + logger.info("Load ${loadLabel} finished: $result") + break + } + if (result[0][2].equals("CANCELLED")) { + assertTrue(false, "Load ${loadLabel} cancelled: ${result}") + break + } + Thread.sleep(5000) + maxTryMs -= 5000 + if (maxTryMs <= 0) { + assertTrue(false, "Load ${loadLabel} timeout") + } + } + + def result = sql """ select count(*) from ${tableName}; """ + logger.info("result:${result}"); + assertTrue(result[0][0] == 1500) + + // load the same file again using s3.* property names; the DUPLICATE KEY table should then hold 3000 rows + def randomStr2 = UUID.randomUUID().toString().replace("-", "") + def loadLabel2 = "label_" + randomStr2 + + sql """ + LOAD LABEL ${loadLabel2} ( + DATA INFILE("s3://${bucket}/regression/tpch/sf0.01/customer.csv.gz") + INTO TABLE ${tableName} + COLUMNS TERMINATED BY "|" + (c_custkey, c_name, c_address, c_nationkey, c_phone, c_acctbal, c_mktsegment, c_comment) + ) + WITH S3 ( + "s3.endpoint" = "${endpoint}", + "s3.region" = "${region}", + "s3.role_arn" = "${roleArn}", + "s3.external_id" = "${externalId}", + "compress_type" = "GZ" + ) + properties( + "timeout" = "28800", + "exec_mem_limit" = "8589934592" + ) + """ + + maxTryMs = 600000 + while (maxTryMs > 0) { + String[][] result2 = sql """ show load where label="${loadLabel2}" order by createtime desc limit 1; """ + if (result2[0][2].equals("FINISHED")) { 
+ logger.info("Load ${loadLabel2} finished: $result2") + break + } + if (result2[0][2].equals("CANCELLED")) { + assertTrue(false, "Load ${loadLabel2} cancelled: 
${result2}") + break + } + Thread.sleep(5000) + maxTryMs -= 5000 + if (maxTryMs <= 0) { + assertTrue(false, "Load ${loadLabel2} timeout") + } + } + + result = sql """ select count(*) from ${tableName}; """ + logger.info("result:${result}"); + assertTrue(result[0][0] == 3000) + +} \ No newline at end of file diff --git a/regression-test/suites/aws_iam_role_p0/test_s3_vault_with_role.groovy b/regression-test/suites/aws_iam_role_p0/test_s3_vault_with_role.groovy new file mode 100644 index 00000000000..f73a3319c85 --- /dev/null +++ b/regression-test/suites/aws_iam_role_p0/test_s3_vault_with_role.groovy @@ -0,0 +1,73 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +import com.google.common.base.Strings; + +suite("test_s3_vault_with_role") { // cloud mode only: create an S3 storage vault that authenticates with a role ARN and external id, then write and read one row through it + if (!isCloudMode()) { + logger.info("skip ${name} case, because not cloud mode") + return + } + + if (!enableStoragevault()) { + logger.info("skip ${name} case, because storage vault not enabled") + return + } + if (Strings.isNullOrEmpty(context.config.awsRoleArn)) { logger.info("skip ${name} case, because awsRoleArn is null or empty"); return } // the vault below is configured only with s3.role_arn / s3.external_id credentials, so skip when no role ARN is configured + def randomStr = UUID.randomUUID().toString().replace("-", "") + def s3VaultName = "s3_" + randomStr + + def endpoint = context.config.awsEndpoint + def region = context.config.awsRegion + def bucket = context.config.awsBucket + def roleArn = context.config.awsRoleArn + def externalId = context.config.awsExternalId + def prefix = context.config.awsPrefix + + sql """ + CREATE STORAGE VAULT IF NOT EXISTS ${s3VaultName} + PROPERTIES ( + "type"="S3", + "s3.endpoint"="${endpoint}", + "s3.region" = "${region}", + "s3.role_arn" = "${roleArn}", + "s3.external_id" = "${externalId}", + "s3.root.path" = "${prefix}/aws_iam_role_p0/${s3VaultName}", + "s3.bucket" = "${bucket}", + "s3.external_endpoint" = "", + "provider" = "S3", + "use_path_style" = "false" + ); + """ + + sql """ + CREATE TABLE ${s3VaultName} ( + C_CUSTKEY INTEGER NOT NULL, + C_NAME INTEGER NOT NULL + ) + DUPLICATE KEY(C_CUSTKEY, C_NAME) + DISTRIBUTED BY HASH(C_CUSTKEY) BUCKETS 1 + PROPERTIES ( + "replication_num" = "1", + "storage_vault_name" = "${s3VaultName}" + ) + """ + sql """ insert into ${s3VaultName} values(1, 1); """ + sql """ sync;""" + def result = sql """ select * from ${s3VaultName}; """ + assertEquals(result.size(), 1); +} \ No newline at end of file diff --git a/regression-test/suites/aws_iam_role_p0/test_select_into_outfile_with_role.groovy b/regression-test/suites/aws_iam_role_p0/test_select_into_outfile_with_role.groovy new file mode 100644 index 00000000000..613d4ee8be9 --- /dev/null +++ b/regression-test/suites/aws_iam_role_p0/test_select_into_outfile_with_role.groovy @@ -0,0 +1,70 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements.
See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +import com.google.common.base.Strings; + +suite("test_select_into_outfile_with_role") { // exports a three-row aggregate table to S3 as CSV using role-based credentials + if (Strings.isNullOrEmpty(context.config.awsRoleArn)) { + logger.info("skip ${name} case, because awsRoleArn is null or empty") + return + } + + def endpoint = context.config.awsEndpoint + def region = context.config.awsRegion + def bucket = context.config.awsBucket + def roleArn = context.config.awsRoleArn + def externalId = context.config.awsExternalId + def prefix = context.config.awsPrefix + + def randomStr = UUID.randomUUID().toString().replace("-", "") // per-run suffix so each run exports under its own OUTFILE prefix + def tableName = "test_select_into_outfile_with_role" + + sql """ drop table if exists ${tableName} force;""" + sql """ + CREATE TABLE ${tableName} + ( + siteid INT DEFAULT '10', + citycode SMALLINT NOT NULL, + username VARCHAR(32) DEFAULT '', + pv BIGINT SUM DEFAULT '0' + ) + AGGREGATE KEY(siteid, citycode, username) + DISTRIBUTED BY HASH(siteid) BUCKETS 1 + PROPERTIES ( + "replication_num" = "1" + ) + """ + + sql """insert into ${tableName}(siteid, citycode, username, pv) values (1, 1, "xxx", 1), + (2, 2, "yyy", 2), + (3, 3, "zzz", 3) + """ + sql """sync;""" + + + sql """ + SELECT * FROM ${tableName} + INTO OUTFILE "s3://${bucket}/${prefix}/aws_iam_role_p0/test_select_into_outfile_with_role_${randomStr}_" + FORMAT AS CSV + PROPERTIES( +
"s3.endpoint" = "${endpoint}", + "s3.region" = "${region}", + "s3.role_arn" = "${roleArn}", + "s3.external_id"="${externalId}" + ); + """ +} \ No newline at end of file diff --git a/regression-test/suites/aws_iam_role_p0/test_tvf_with_role.groovy b/regression-test/suites/aws_iam_role_p0/test_tvf_with_role.groovy new file mode 100644 index 00000000000..ebecc6348ed --- /dev/null +++ b/regression-test/suites/aws_iam_role_p0/test_tvf_with_role.groovy @@ -0,0 +1,44 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements.
See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +import com.google.common.base.Strings; + +suite("test_tvf_with_role") { // reads the gzipped TPC-H customer CSV through the s3() TVF; every setting comes from the regressionAws* config, so only regressionAwsRoleArn gates the case + if (Strings.isNullOrEmpty(context.config.regressionAwsRoleArn)) { + logger.info("skip ${name} case, because awsRoleArn is null or empty") + return + } + + def endpoint = context.config.regressionAwsEndpoint + def region = context.config.regressionAwsRegion + def bucket = context.config.regressionAwsBucket + def roleArn = context.config.regressionAwsRoleArn + def externalId = context.config.regressionAwsExternalId + + def result = sql """ + select count(*) from s3("uri" = "s3://${bucket}/regression/tpch/sf0.01/customer.csv.gz", + "s3.endpoint" = "${endpoint}", + "s3.region" = "${region}", + "s3.role_arn"= "${roleArn}", + "s3.external_id" = "${externalId}", + "format" = "csv", + "compress_type" = "gz", + "column_separator" = "|", + "use_path_style" = "false"); + """ + assertTrue(result[0][0] == 1500) // same file test_s3_load_with_role loads, which also expects 1500 rows +} \ No newline at end of file --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org