This is an automated email from the ASF dual-hosted git repository. dataroaring pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push: new 39b1bd40f07 [test](Cloud) Add more storage vault ut (#33269) 39b1bd40f07 is described below commit 39b1bd40f073f0174994bc76e2410deaf05e763d Author: AlexYue <yj976240...@gmail.com> AuthorDate: Sat Apr 6 13:30:52 2024 +0800 [test](Cloud) Add more storage vault ut (#33269) --- cloud/src/meta-service/meta_service.cpp | 30 ++- cloud/src/meta-service/meta_service_resource.cpp | 13 +- cloud/test/meta_service_test.cpp | 275 ++++++++++++++++++++- .../org/apache/doris/catalog/StorageVault.java | 4 +- gensrc/proto/cloud.proto | 4 +- 5 files changed, 308 insertions(+), 18 deletions(-) diff --git a/cloud/src/meta-service/meta_service.cpp b/cloud/src/meta-service/meta_service.cpp index 4dffef76a02..5cf6e67a937 100644 --- a/cloud/src/meta-service/meta_service.cpp +++ b/cloud/src/meta-service/meta_service.cpp @@ -539,11 +539,24 @@ void MetaServiceImpl::create_tablets(::google::protobuf::RpcController* controll return; } - std::shared_ptr<Transaction> txn(txn0.release()); - auto [c0, m0] = resource_mgr_->get_instance(txn, instance_id, &instance); - if (c0 != TxnErrorCode::TXN_OK) { + InstanceKeyInfo key_info {instance_id}; + std::string key; + std::string val; + instance_key(key_info, &key); + + err = txn0->get(key, &val); + LOG(INFO) << "get instance_key=" << hex(key); + + if (err != TxnErrorCode::TXN_OK) { code = cast_as<ErrCategory::READ>(err); - msg = fmt::format("failed to get instance, info={}", m0); + ss << "failed to get instance, instance_id=" << instance_id << " err=" << err; + msg = ss.str(); + return; + } + + if (!instance.ParseFromString(val)) { + code = MetaServiceCode::PROTOBUF_PARSE_ERR; + msg = "failed to parse InstanceInfoPB"; return; } @@ -573,15 +586,15 @@ void MetaServiceImpl::create_tablets(::google::protobuf::RpcController* controll // The S3 vault would be stored inside the instance.obj_info auto s3_obj = std::find_if(instance.obj_info().begin(), instance.obj_info().end(), [&](const ObjectStoreInfoPB& obj) { - if (!obj.has_vault_name()) { + if (!obj.has_name()) { return false; } - return obj.vault_name() == name; + return obj.name() == name; }); if (s3_obj != instance.obj_info().end()) { response->set_storage_vault_id(s3_obj->id()); - response->set_storage_vault_name(s3_obj->vault_name()); + response->set_storage_vault_name(s3_obj->name()); break; } @@ -591,6 +604,7 @@ void MetaServiceImpl::create_tablets(::google::protobuf::RpcController* controll } // [index_id, schema_version] std::set<std::pair<int64_t, int32_t>> saved_schema; + TEST_SYNC_POINT_RETURN_WITH_VOID("create_tablets"); for (auto& tablet_meta : request->tablet_metas()) { internal_create_tablet(code, msg, tablet_meta, txn_kv_, instance_id, saved_schema); if (code != MetaServiceCode::OK) { @@ -1942,7 +1956,7 @@ std::pair<MetaServiceCode, std::string> MetaServiceImpl::get_instance_info( std::shared_ptr<Transaction> txn(txn0.release()); auto [c0, m0] = resource_mgr_->get_instance(txn, cloned_instance_id, instance); if (c0 != TxnErrorCode::TXN_OK) { - return {cast_as<ErrCategory::READ>(err), "failed to get instance, info=" + m0}; + return {cast_as<ErrCategory::READ>(c0), "failed to get instance, info=" + m0}; } // maybe do not decrypt ak/sk? diff --git a/cloud/src/meta-service/meta_service_resource.cpp b/cloud/src/meta-service/meta_service_resource.cpp index 3465c765595..9b2028bab5e 100644 --- a/cloud/src/meta-service/meta_service_resource.cpp +++ b/cloud/src/meta-service/meta_service_resource.cpp @@ -460,8 +460,13 @@ void MetaServiceImpl::alter_obj_store_info(google::protobuf::RpcController* cont std::string plain_ak = obj.has_ak() ? obj.ak() : ""; std::string plain_sk = obj.has_sk() ? obj.sk() : ""; - if (encrypt_ak_sk_helper(plain_ak, plain_sk, &encryption_info, &cipher_ak_sk_pair, code, - msg) != 0) { + auto ret = encrypt_ak_sk_helper(plain_ak, plain_sk, &encryption_info, &cipher_ak_sk_pair, + code, msg); + { + [[maybe_unused]] std::tuple ak_sk_ret {&ret, &code, &msg}; + TEST_SYNC_POINT_CALLBACK("alter_obj_store_info_encrypt_ak_sk_helper", &ak_sk_ret); + } + if (ret != 0) { return; } ak = cipher_ak_sk_pair.first; @@ -649,7 +654,7 @@ void MetaServiceImpl::alter_obj_store_info(google::protobuf::RpcController* cont last_item.set_provider(request->obj().provider()); last_item.set_sse_enabled(instance.sse_enabled()); if (last_item.id() == BUILT_IN_STORAGE_VAULT_ID) { - last_item.set_vault_name(BUILT_IN_STORAGE_VAULT_NAME); + last_item.set_name(BUILT_IN_STORAGE_VAULT_NAME); instance.set_default_storage_vault_name(BUILT_IN_STORAGE_VAULT_NAME); instance.set_default_storage_vault_id(BUILT_IN_STORAGE_VAULT_ID); } @@ -982,7 +987,7 @@ static int create_instance_with_object_info(InstanceInfoPB& instance, const Obje obj_info.set_mtime(time); obj_info.set_sse_enabled(sse_enabled); if (obj_info.id() == BUILT_IN_STORAGE_VAULT_ID) { - obj_info.set_vault_name(BUILT_IN_STORAGE_VAULT_NAME); + obj_info.set_name(BUILT_IN_STORAGE_VAULT_NAME); instance.set_default_storage_vault_name(BUILT_IN_STORAGE_VAULT_NAME); instance.set_default_storage_vault_id(BUILT_IN_STORAGE_VAULT_ID); } diff --git a/cloud/test/meta_service_test.cpp b/cloud/test/meta_service_test.cpp index 93d4ce830f8..a9b40ad1543 100644 --- a/cloud/test/meta_service_test.cpp +++ b/cloud/test/meta_service_test.cpp @@ -336,7 +336,7 @@ TEST(MetaServiceTest, CreateInstanceTest) { req.set_name("test_name"); HdfsVaultInfo hdfs; HdfsBuildConf conf; - conf.set_fs_name("test_name_node"); + conf.set_fs_name("hdfs://127.0.0.1:8020"); conf.set_user("test_user"); hdfs.mutable_build_conf()->CopyFrom(conf); req.mutable_hdfs_info()->CopyFrom(hdfs); @@ -5109,6 +5109,99 @@ TEST(MetaServiceTest, NormalizeHdfsConfTest) { EXPECT_EQ(fs_name, "hdfs://127.0.0.1:8020"); } +TEST(MetaServiceTest, AddObjInfoTest) { + auto meta_service = get_meta_service(); + + auto sp = cloud::SyncPoint::get_instance(); + sp->enable_processing(); + sp->set_call_back("encrypt_ak_sk:get_encryption_key_ret", + [](void* p) { *reinterpret_cast<int*>(p) = 0; }); + sp->set_call_back("encrypt_ak_sk:get_encryption_key", [](void* p) { + *reinterpret_cast<std::string*>(p) = "selectdbselectdbselectdbselectdb"; + }); + sp->set_call_back("encrypt_ak_sk:get_encryption_key_id", + [](void* p) { *reinterpret_cast<int*>(p) = 1; }); + + std::unique_ptr<Transaction> txn; + ASSERT_EQ(meta_service->txn_kv()->create_txn(&txn), TxnErrorCode::TXN_OK); + std::string key; + std::string val; + InstanceKeyInfo key_info {"test_instance"}; + instance_key(key_info, &key); + + InstanceInfoPB instance; + val = instance.SerializeAsString(); + txn->put(key, val); + ASSERT_EQ(txn->commit(), TxnErrorCode::TXN_OK); + + auto get_test_instance = [&](InstanceInfoPB& i) { + std::string key; + std::string val; + std::unique_ptr<Transaction> txn; + ASSERT_EQ(meta_service->txn_kv()->create_txn(&txn), TxnErrorCode::TXN_OK); + InstanceKeyInfo key_info {"test_instance"}; + instance_key(key_info, &key); + ASSERT_EQ(txn->get(key, &val), TxnErrorCode::TXN_OK); + i.ParseFromString(val); + }; + + // update failed + { + AlterObjStoreInfoRequest req; + req.set_cloud_unique_id("test_cloud_unique_id"); + req.set_op(AlterObjStoreInfoRequest::ADD_OBJ_INFO); + + brpc::Controller cntl; + AlterObjStoreInfoResponse res; + meta_service->alter_obj_store_info( + reinterpret_cast<::google::protobuf::RpcController*>(&cntl), &req, &res, nullptr); + ASSERT_EQ(res.status().code(), MetaServiceCode::INVALID_ARGUMENT) << res.status().msg(); + } + + // update successful + { + AlterObjStoreInfoRequest req; + req.set_cloud_unique_id("test_cloud_unique_id"); + req.set_op(AlterObjStoreInfoRequest::ADD_OBJ_INFO); + auto sp = SyncPoint::get_instance(); + sp->set_call_back("create_instance_with_object_info", + [](void* p) { *reinterpret_cast<int*>(p) = 0; }); + sp->set_call_back("create_instance_with_object_info::pred", + [](void* p) { *((bool*)p) = true; }); + sp->enable_processing(); + + ObjectStoreInfoPB obj_info; + obj_info.set_ak("ak"); + obj_info.set_sk("sk"); + obj_info.set_bucket("bucket"); + obj_info.set_prefix("prefix"); + obj_info.set_endpoint("endpoint"); + obj_info.set_region("region"); + obj_info.set_provider(ObjectStoreInfoPB::Provider::ObjectStoreInfoPB_Provider_COS); + obj_info.set_external_endpoint("external"); + req.mutable_obj()->MergeFrom(obj_info); + + brpc::Controller cntl; + AlterObjStoreInfoResponse res; + meta_service->alter_obj_store_info( + reinterpret_cast<::google::protobuf::RpcController*>(&cntl), &req, &res, nullptr); + ASSERT_EQ(res.status().code(), MetaServiceCode::OK) << res.status().msg(); + + InstanceInfoPB instance; + get_test_instance(instance); + const auto& obj = instance.obj_info().at(0); + ASSERT_EQ(obj.id(), "1"); + ASSERT_EQ(obj.name(), "built_in_storage_vault"); + + sp->clear_all_call_backs(); + sp->clear_trace(); + sp->disable_processing(); + } + + SyncPoint::get_instance()->disable_processing(); + SyncPoint::get_instance()->clear_all_call_backs(); +} + TEST(MetaServiceTest, AddHdfsInfoTest) { auto meta_service = get_meta_service(); @@ -5193,6 +5286,26 @@ TEST(MetaServiceTest, AddHdfsInfoTest) { ASSERT_EQ(*(instance.storage_vault_names().begin()), "test_alter_add_hdfs_info"); } + // update failed because duplicate name + { + AlterObjStoreInfoRequest req; + req.set_cloud_unique_id("test_cloud_unique_id"); + req.set_op(AlterObjStoreInfoRequest::ADD_HDFS_INFO); + StorageVaultPB hdfs; + hdfs.set_name("test_alter_add_hdfs_info"); + HdfsVaultInfo params; + params.mutable_build_conf()->set_fs_name("hdfs://ip:port"); + + hdfs.mutable_hdfs_info()->CopyFrom(params); + req.mutable_hdfs()->CopyFrom(hdfs); + + brpc::Controller cntl; + AlterObjStoreInfoResponse res; + meta_service->alter_obj_store_info( + reinterpret_cast<::google::protobuf::RpcController*>(&cntl), &req, &res, nullptr); + ASSERT_EQ(res.status().code(), MetaServiceCode::ALREADY_EXISTED) << res.status().msg(); + } + // to test if the vault id is expected { AlterObjStoreInfoRequest req; @@ -5339,6 +5452,9 @@ TEST(MetaServiceTest, DropHdfsInfoTest) { StorageVaultPB hdfs; hdfs.set_name("test_alter_add_hdfs_info"); HdfsVaultInfo params; + HdfsBuildConf conf; + conf.set_fs_name("hdfs://127.0.0.1:8020"); + params.mutable_build_conf()->MergeFrom(conf); hdfs.mutable_hdfs_info()->CopyFrom(params); req.mutable_hdfs()->CopyFrom(hdfs); @@ -5363,6 +5479,9 @@ TEST(MetaServiceTest, DropHdfsInfoTest) { StorageVaultPB hdfs; hdfs.set_name("test_alter_add_hdfs_info_1"); HdfsVaultInfo params; + HdfsBuildConf conf; + conf.set_fs_name("hdfs://127.0.0.1:8020"); + params.mutable_build_conf()->MergeFrom(conf); hdfs.mutable_hdfs_info()->CopyFrom(params); req.mutable_hdfs()->CopyFrom(hdfs); @@ -5386,6 +5505,9 @@ TEST(MetaServiceTest, DropHdfsInfoTest) { StorageVaultPB hdfs; hdfs.set_name("test_alter_add_hdfs_info_2"); HdfsVaultInfo params; + HdfsBuildConf conf; + conf.set_fs_name("hdfs://127.0.0.1:8020"); + params.mutable_build_conf()->MergeFrom(conf); hdfs.mutable_hdfs_info()->CopyFrom(params); req.mutable_hdfs()->CopyFrom(hdfs); @@ -5444,6 +5566,9 @@ TEST(MetaServiceTest, DropHdfsInfoTest) { StorageVaultPB hdfs; hdfs.set_name("test_alter_add_hdfs_info_3"); HdfsVaultInfo params; + HdfsBuildConf conf; + conf.set_fs_name("hdfs://127.0.0.1:8020"); + params.mutable_build_conf()->MergeFrom(conf); hdfs.mutable_hdfs_info()->CopyFrom(params); req.mutable_hdfs()->CopyFrom(hdfs); @@ -5487,7 +5612,7 @@ TEST(MetaServiceTest, GetDefaultVaultTest) { req.set_name("test_name"); HdfsVaultInfo hdfs; HdfsBuildConf conf; - conf.set_fs_name("test_name_node"); + conf.set_fs_name("hdfs://127.0.0.1:8020"); conf.set_user("test_user"); hdfs.mutable_build_conf()->CopyFrom(conf); req.mutable_hdfs_info()->CopyFrom(hdfs); @@ -5558,6 +5683,7 @@ TEST(MetaServiceTest, GetDefaultVaultTest) { get_test_instance(i, instance_id); ASSERT_EQ(i.default_storage_vault_id(), "1"); ASSERT_EQ(i.default_storage_vault_name(), "built_in_storage_vault"); + ASSERT_EQ(i.obj_info().at(0).name(), "built_in_storage_vault"); sp->clear_all_call_backs(); sp->clear_trace(); sp->disable_processing(); @@ -5586,7 +5712,7 @@ TEST(MetaServiceTest, SetDefaultVaultTest) { req.set_name("test_name"); HdfsVaultInfo hdfs; HdfsBuildConf conf; - conf.set_fs_name("test_name_node"); + conf.set_fs_name("hdfs://127.0.0.1:8020"); conf.set_user("test_user"); hdfs.mutable_build_conf()->CopyFrom(conf); req.mutable_hdfs_info()->CopyFrom(hdfs); @@ -5616,6 +5742,9 @@ TEST(MetaServiceTest, SetDefaultVaultTest) { auto name = fmt::format("test_alter_add_hdfs_info_{}", i); hdfs.set_name(name); HdfsVaultInfo params; + HdfsBuildConf conf; + conf.set_fs_name("hdfs://127.0.0.1:8020"); + params.mutable_build_conf()->MergeFrom(conf); hdfs.mutable_hdfs_info()->CopyFrom(params); req.mutable_hdfs()->CopyFrom(hdfs); @@ -5724,6 +5853,9 @@ TEST(MetaServiceTest, GetObjStoreInfoTest) { auto name = fmt::format("test_alter_add_hdfs_info_{}", i); hdfs.set_name(name); HdfsVaultInfo params; + HdfsBuildConf conf; + conf.set_fs_name("hdfs://127.0.0.1:8020"); + params.mutable_build_conf()->MergeFrom(conf); hdfs.mutable_hdfs_info()->CopyFrom(params); req.mutable_hdfs()->CopyFrom(hdfs); @@ -5770,6 +5902,143 @@ TEST(MetaServiceTest, GetObjStoreInfoTest) { SyncPoint::get_instance()->clear_all_call_backs(); } +TEST(MetaServiceTest, CreateTabletsVaultsTest) { + auto meta_service = get_meta_service(); + + auto sp = cloud::SyncPoint::get_instance(); + sp->enable_processing(); + sp->set_call_back("encrypt_ak_sk:get_encryption_key_ret", + [](void* p) { *reinterpret_cast<int*>(p) = 0; }); + sp->set_call_back("encrypt_ak_sk:get_encryption_key", [](void* p) { + *reinterpret_cast<std::string*>(p) = "selectdbselectdbselectdbselectdb"; + }); + sp->set_call_back("encrypt_ak_sk:get_encryption_key_id", + [](void* p) { *reinterpret_cast<int*>(p) = 1; }); + + std::unique_ptr<Transaction> txn; + ASSERT_EQ(meta_service->txn_kv()->create_txn(&txn), TxnErrorCode::TXN_OK); + std::string key; + std::string val; + InstanceKeyInfo key_info {"test_instance"}; + instance_key(key_info, &key); + + InstanceInfoPB instance; + val = instance.SerializeAsString(); + txn->put(key, val); + ASSERT_EQ(txn->commit(), TxnErrorCode::TXN_OK); + + auto get_test_instance = [&](InstanceInfoPB& i) { + std::string key; + std::string val; + std::unique_ptr<Transaction> txn; + ASSERT_EQ(meta_service->txn_kv()->create_txn(&txn), TxnErrorCode::TXN_OK); + InstanceKeyInfo key_info {"test_instance"}; + instance_key(key_info, &key); + ASSERT_EQ(txn->get(key, &val), TxnErrorCode::TXN_OK); + i.ParseFromString(val); + }; + + // tablet_metas_size is 0 + { + CreateTabletsRequest request; + request.set_cloud_unique_id("test_cloud_unique_id"); + + brpc::Controller cntl; + CreateTabletsResponse response; + meta_service->create_tablets(reinterpret_cast<::google::protobuf::RpcController*>(&cntl), + &request, &response, nullptr); + ASSERT_EQ(response.status().code(), MetaServiceCode::INVALID_ARGUMENT) + << response.status().msg(); + } + + // try to use default + { + CreateTabletsRequest req; + req.set_cloud_unique_id("test_cloud_unique_id"); + req.set_storage_vault_name(""); + req.add_tablet_metas(); + + brpc::Controller cntl; + CreateTabletsResponse res; + meta_service->create_tablets(reinterpret_cast<::google::protobuf::RpcController*>(&cntl), + &req, &res, nullptr); + // failed because no default + ASSERT_EQ(res.status().code(), MetaServiceCode::INVALID_ARGUMENT) << res.status().msg(); + } + + // Create One Hdfs info as built_in vault + { + AlterObjStoreInfoRequest req; + req.set_cloud_unique_id("test_cloud_unique_id"); + req.set_op(AlterObjStoreInfoRequest::ADD_HDFS_INFO); + StorageVaultPB hdfs; + hdfs.set_name("test_alter_add_hdfs_info"); + HdfsVaultInfo params; + params.mutable_build_conf()->set_fs_name("hdfs://ip:port"); + + hdfs.mutable_hdfs_info()->CopyFrom(params); + req.mutable_hdfs()->CopyFrom(hdfs); + + brpc::Controller cntl; + AlterObjStoreInfoResponse res; + meta_service->alter_obj_store_info( + reinterpret_cast<::google::protobuf::RpcController*>(&cntl), &req, &res, nullptr); + ASSERT_EQ(res.status().code(), MetaServiceCode::OK) << res.status().msg(); + + InstanceInfoPB i; + get_test_instance(i); + ASSERT_EQ(i.default_storage_vault_id(), "1"); + ASSERT_EQ(i.default_storage_vault_name(), "built_in_storage_vault"); + } + + // try to use default vault + { + CreateTabletsRequest req; + req.set_cloud_unique_id("test_cloud_unique_id"); + req.set_storage_vault_name(""); + req.add_tablet_metas(); + + auto sp = SyncPoint::get_instance(); + sp->set_call_back("create_tablets::pred", + [](void* pred) { *reinterpret_cast<bool*>(pred) = true; }); + sp->enable_processing(); + + brpc::Controller cntl; + CreateTabletsResponse res; + meta_service->create_tablets(reinterpret_cast<::google::protobuf::RpcController*>(&cntl), + &req, &res, nullptr); + ASSERT_EQ(res.status().code(), MetaServiceCode::OK) << res.status().msg(); + ASSERT_EQ(res.storage_vault_id(), "1"); + ASSERT_EQ(res.storage_vault_name(), "built_in_storage_vault"); + + sp->clear_call_back("create_tablets::pred"); + } + + // try to use one non-existent vault + { + CreateTabletsRequest req; + req.set_cloud_unique_id("test_cloud_unique_id"); + req.set_storage_vault_name("non-existent"); + req.add_tablet_metas(); + + auto sp = SyncPoint::get_instance(); + sp->set_call_back("create_tablets::pred", + [](void* pred) { *reinterpret_cast<bool*>(pred) = true; }); + sp->enable_processing(); + + brpc::Controller cntl; + CreateTabletsResponse res; + meta_service->create_tablets(reinterpret_cast<::google::protobuf::RpcController*>(&cntl), + &req, &res, nullptr); + ASSERT_EQ(res.status().code(), MetaServiceCode::INVALID_ARGUMENT) << res.status().msg(); + + sp->clear_call_back("create_tablets::pred"); + } + + SyncPoint::get_instance()->disable_processing(); + SyncPoint::get_instance()->clear_all_call_backs(); +} + TEST(MetaServiceTest, UpdateAkSkTest) { auto meta_service = get_meta_service(); diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/StorageVault.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/StorageVault.java index 34668f3cd06..481ca8d2a84 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/catalog/StorageVault.java +++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/StorageVault.java @@ -170,11 +170,11 @@ public abstract class StorageVault { Cloud.ObjectStoreInfoPB.Builder builder = Cloud.ObjectStoreInfoPB.newBuilder(); builder.mergeFrom(info); List<String> row = new ArrayList<>(); - row.add(info.getVaultName()); + row.add(info.getName()); row.add(info.getId()); TextFormat.Printer printer = TextFormat.printer(); builder.clearId(); - builder.clearVaultName(); + builder.clearName(); builder.setSk("xxxxxxx"); row.add(printer.shortDebugString(builder)); return row; diff --git a/gensrc/proto/cloud.proto b/gensrc/proto/cloud.proto index 95d79b18e9f..f9080a978b1 100644 --- a/gensrc/proto/cloud.proto +++ b/gensrc/proto/cloud.proto @@ -190,13 +190,15 @@ message ObjectStoreInfoPB { optional string user_id = 13; optional EncryptionInfoPB encryption_info = 14; optional bool sse_enabled = 15; - optional string vault_name = 16; + optional string name = 16; } +// The legacy OObjectStoreInfoPBbjectStoreInfoPB is stored in instnaceinfopb message StorageVaultPB { optional string id = 1; optional string name = 2; optional HdfsVaultInfo hdfs_info = 3; // HdfsResource + reserved 4; // reserved for S3. } message DefaultStorageVaultInfo { --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org