This is an automated email from the ASF dual-hosted git repository.

dataroaring pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
     new 44b2e4f5826 [improvement](cloud) Accelerate creating table by batching RPC In cloud (#36786)
44b2e4f5826 is described below

commit 44b2e4f5826d265ce1ea3ea6d31ecb93283d78e4
Author: deardeng <565620...@qq.com>
AuthorDate: Sun Jul 14 23:07:37 2024 +0800

    [improvement](cloud) Accelerate creating table by batching RPC In cloud (#36786)

Creating dynamic partitions used to call the prepare RPC and the commit RPC once per
partition; now the whole batch is prepared once and committed once.

```
in one server (fe/ms)

before optimize:
Query OK, 0 rows affected (16.96 sec)
Query OK, 0 rows affected (16.65 sec)
Query OK, 0 rows affected (16.26 sec)
Query OK, 0 rows affected (16.39 sec)
Query OK, 0 rows affected (17.15 sec)

after optimize:
Query OK, 0 rows affected (13.31 sec)
Query OK, 0 rows affected (11.71 sec)
Query OK, 0 rows affected (11.81 sec)
Query OK, 0 rows affected (11.55 sec)
Query OK, 0 rows affected (12.07 sec)

CREATE TABLE income_statistics_service (
    statistics_date date, lyy_distributor_id bigint, lyy_equipment_group_id bigint,
    lyy_equipment_id bigint, service_type_id bigint, equipment_value varchar(40),
    wechat_pay_amount decimal, wechat_refund_amount decimal, alipay_pay_amount decimal,
    alipay_refund_amount decimal, union_pay_amount decimal, union_refund_amount decimal,
    app_pay_amount decimal, app_refund_amount decimal, ad_amount decimal,
    ad_distributor_count bigint, offline_coins bigint, online_coins bigint,
    online_startup_count bigint, gift_consumption_amount decimal, gift_consumption_count bigint,
    wechat_pay_coins bigint, alipay_pay_coins bigint, online_red_coins bigint,
    game_online_amount decimal, game_gift_consumption_amount decimal,
    game_gift_consumption_count bigint, rd_gift_consumption_count bigint,
    rd_gift_consumption_amount decimal, online_amount decimal,
    offline_amount decimal, offline_count bigint, wechat_pay_count bigint,
    alipay_pay_count bigint, union_pay_count bigint, wechat_refund_count bigint,
    alipay_refund_count bigint, union_refund_count bigint, app_pay_count bigint,
    app_refund_count bigint, created datetime, updated datetime,
    service_type_value varchar(32), service_type_name varchar(32), jd_pay_amount decimal,
    jd_refund_amount decimal, jd_pay_count bigint, jd_refund_count bigint,
    lyy_factory_id bigint, purse_amount decimal, purse_refund_amount decimal,
    purse_count bigint, purse_refund_count bigint, boost_pay_amount decimal,
    boost_refund_amount decimal, boost_pay_count bigint, boost_refund_count bigint,
    grabpay_pay_amount decimal, grabpay_refund_amount decimal, grabpay_pay_count bigint,
    grabpay_refund_count bigint, maybank_pay_amount decimal, maybank_refund_amount decimal,
    maybank_pay_count bigint, maybank_refund_count bigint, pay_start_amount decimal,
    exchange_amount decimal, auto_refund_count bigint, auto_refund_amount decimal,
    manual_refund_count bigint, manual_refund_amount decimal, custom_service_fee decimal,
    value_added_service_fee decimal,
    INDEX idx_statistics_date (`statistics_date`) USING BITMAP COMMENT '',
    INDEX idx_lyy_distributor_id (`lyy_distributor_id`) USING BITMAP COMMENT '',
    INDEX idx_lyy_equipment_group_id (`lyy_equipment_group_id`) USING BITMAP COMMENT '',
    INDEX idx_lyy_equipment_id (`lyy_equipment_id`) USING BITMAP COMMENT '',
    INDEX idx_service_type_id (`service_type_id`) USING BITMAP COMMENT ''
) ENGINE = OLAP
UNIQUE KEY(`statistics_date`, `lyy_distributor_id`, `lyy_equipment_group_id`,
           `lyy_equipment_id`, `service_type_id`)
COMMENT 'OLAP'
PARTITION BY RANGE(`statistics_date`)()
DISTRIBUTED BY HASH(`lyy_distributor_id`) BUCKETS 6
PROPERTIES (
    "file_cache_ttl_seconds" = "0",
    "bloom_filter_columns" = "statistics_date, lyy_distributor_id, lyy_equipment_group_id, lyy_equipment_id, service_type_id",
    "dynamic_partition.enable" = "true",
    "dynamic_partition.time_unit" = "DAY",
    "dynamic_partition.time_zone" = "Asia/Shanghai",
    "dynamic_partition.start" = "-730",
    "dynamic_partition.end" = "3",
    "dynamic_partition.prefix" = "p",
    "dynamic_partition.buckets" = "2",
    "dynamic_partition.create_history_partition" = "true",
    "dynamic_partition.history_partition_num" = "-1",
    "dynamic_partition.hot_partition_num" = "0",
    "dynamic_partition.reserved_history_periods" = "NULL",
    "enable_unique_key_merge_on_write" = "true",
    "light_schema_change" = "true"
);

show partitions from income_statistics_service;
......
734 rows in set (1.06 sec)
```

---------

Co-authored-by: Gavin Chou <gavineaglec...@gmail.com>
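The heart of the change: the meta-service prepare/commit bracket moves out of the per-partition loop, so a table with N partitions costs two partition RPCs plus one verification instead of 2N. A minimal sketch under hypothetical names (the real entry points are cloudBatchBeforeCreatePartitions/cloudBatchAfterCreatePartitions in DynamicPartitionScheduler below):

```
import java.util.ArrayList;
import java.util.List;

// Sketch only: MetaService is a stand-in for the real prepare_partition /
// commit_partition / check_kv RPC stubs added in this commit.
class BatchedCreateSketch {
    interface MetaService {
        void preparePartition(List<Long> partitionIds);
        void commitPartition(List<Long> partitionIds);
        void checkKv(List<Long> partitionIds);
    }

    static void createPartitions(MetaService ms, List<Long> preGeneratedIds) {
        // Before this patch, prepare/commit were issued inside the loop, once per partition.
        ms.preparePartition(preGeneratedIds);  // 1 RPC for the whole batch
        List<Long> succeeded = new ArrayList<>();
        for (Long id : preGeneratedIds) {
            succeeded.add(id);                 // FE-side partition creation happens here
        }
        ms.commitPartition(succeeded);         // 1 RPC for the whole batch
        ms.checkKv(succeeded);                 // verify no recycle key remained
    }
}
```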
---
 cloud/src/common/bvars.cpp                         |   2 +
 cloud/src/common/bvars.h                           |   1 +
 cloud/src/meta-service/meta_service.h              |   8 ++
 cloud/src/meta-service/meta_service_partition.cpp  | 114 +++++++++++++++++-
 cloud/test/meta_service_test.cpp                   |  64 ++++++++++
 .../main/java/org/apache/doris/common/Config.java  |  12 +-
 .../main/java/org/apache/doris/alter/Alter.java    |   2 +-
 .../main/java/org/apache/doris/catalog/Env.java    |  19 ++-
 .../doris/clone/DynamicPartitionScheduler.java     | 130 ++++++++++++++++++++-
 .../cloud/datasource/CloudInternalCatalog.java     |  88 +++++++++++++-
 .../apache/doris/cloud/rpc/MetaServiceClient.java  |   4 +
 .../apache/doris/cloud/rpc/MetaServiceProxy.java   |   9 ++
 .../apache/doris/datasource/InternalCatalog.java   |  92 +++++++++------
 .../org/apache/doris/mtmv/MTMVPartitionUtil.java   |   3 +-
 .../apache/doris/service/FrontendServiceImpl.java  |   2 +-
 .../doris/alter/InternalSchemaAlterTest.java       |   4 +-
 .../apache/doris/catalog/CreateTableLikeTest.java  |   2 +
 .../org/apache/doris/catalog/CreateTableTest.java  |   2 +
 .../apache/doris/catalog/ModifyBackendTest.java    |   2 +
 .../java/org/apache/doris/catalog/RecoverTest.java |   2 +
 .../org/apache/doris/planner/QueryPlanTest.java    |   1 +
 gensrc/proto/cloud.proto                           |  25 ++
 22 files changed, 537 insertions(+), 51 deletions(-)

diff --git a/cloud/src/common/bvars.cpp b/cloud/src/common/bvars.cpp
index 43acb47e365..dc401398f68 100644
--- a/cloud/src/common/bvars.cpp
+++ b/cloud/src/common/bvars.cpp
@@ -79,6 +79,8 @@ BvarLatencyRecorderWithTag g_bvar_ms_finish_tablet_job("ms", "finish_tablet_job"
 BvarLatencyRecorderWithTag g_bvar_ms_get_cluster_status("ms", "get_cluster_status");
 BvarLatencyRecorderWithTag g_bvar_ms_set_cluster_status("ms", "set_cluster_status");
 
+BvarLatencyRecorderWithTag g_bvar_ms_check_kv("ms", "check_kv");
+
 // txn_kv's bvars
 bvar::LatencyRecorder g_bvar_txn_kv_get("txn_kv", "get");
 bvar::LatencyRecorder g_bvar_txn_kv_range_get("txn_kv", "range_get");
diff --git a/cloud/src/common/bvars.h b/cloud/src/common/bvars.h
index e5b50262104..f2957e35940 100644
--- a/cloud/src/common/bvars.h
+++ b/cloud/src/common/bvars.h
@@ -174,6 +174,7 @@ extern BvarLatencyRecorderWithTag g_bvar_ms_set_cluster_status;
 extern BvarLatencyRecorderWithTag g_bvar_ms_get_instance;
 extern BvarLatencyRecorderWithTag g_bvar_ms_get_rl_task_commit_attach;
 extern BvarLatencyRecorderWithTag g_bvar_ms_get_txn_id;
+extern BvarLatencyRecorderWithTag g_bvar_ms_check_kv;
 
 // txn_kv's bvars
 extern bvar::LatencyRecorder g_bvar_txn_kv_get;
diff --git a/cloud/src/meta-service/meta_service.h b/cloud/src/meta-service/meta_service.h
index 0346afc36b3..e2360e9e6ba 100644
--- a/cloud/src/meta-service/meta_service.h
+++ b/cloud/src/meta-service/meta_service.h
@@ -141,6 +141,9 @@ public:
     void drop_index(::google::protobuf::RpcController* controller, const IndexRequest* request,
                     IndexResponse* response, ::google::protobuf::Closure* done) override;
 
+    void check_kv(::google::protobuf::RpcController* controller, const CheckKVRequest* request,
+                  CheckKVResponse* response, ::google::protobuf::Closure* done) override;
+
     void prepare_partition(::google::protobuf::RpcController* controller,
                            const PartitionRequest* request, PartitionResponse* response,
                            ::google::protobuf::Closure* done) override;
@@ -422,6 +425,11 @@ public:
         call_impl(&cloud::MetaService::drop_index, controller, request, response, done);
     }
 
+    void check_kv(::google::protobuf::RpcController* controller, const CheckKVRequest* request,
+                  CheckKVResponse* response, ::google::protobuf::Closure* done) override {
+        call_impl(&cloud::MetaService::check_kv, controller, request, response, done);
+    }
+
     void prepare_partition(::google::protobuf::RpcController* controller,
                            const PartitionRequest* request, PartitionResponse* response,
                            ::google::protobuf::Closure* done) override {
diff --git a/cloud/src/meta-service/meta_service_partition.cpp b/cloud/src/meta-service/meta_service_partition.cpp
index aae11be5edd..d165b8b5e06 100644
--- a/cloud/src/meta-service/meta_service_partition.cpp
+++ b/cloud/src/meta-service/meta_service_partition.cpp
@@ -25,7 +25,9 @@
 #include "meta_service.h"
 
 namespace doris::cloud {
-
+using check_create_table_type = std::function<const std::tuple<
+        const ::google::protobuf::RepeatedField<int64_t>, std::string,
+        std::function<std::string(std::string, int64_t)>>(const CheckKVRequest* request)>;
 // ATTN: xxx_id MUST NOT be reused
 //
 // UNKNOWN
@@ -67,6 +69,11 @@ static TxnErrorCode index_exists(Transaction* txn, const std::string& instance_i
     return it->has_next() ? TxnErrorCode::TXN_OK : TxnErrorCode::TXN_KEY_NOT_FOUND;
 }
 
+static TxnErrorCode check_recycle_key_exist(Transaction* txn, const std::string& key) {
+    std::string val;
+    return txn->get(key, &val);
+}
+
 void MetaServiceImpl::prepare_index(::google::protobuf::RpcController* controller,
                                     const IndexRequest* request, IndexResponse* response,
                                     ::google::protobuf::Closure* done) {
@@ -614,4 +621,109 @@ void MetaServiceImpl::drop_partition(::google::protobuf::RpcController* controll
     }
 }
 
+void check_create_table(std::string instance_id, std::shared_ptr<TxnKv> txn_kv,
+                        const CheckKVRequest* request, CheckKVResponse* response,
+                        MetaServiceCode* code, std::string* msg,
+                        check_create_table_type get_check_info) {
+    StopWatch watch;
+    std::unique_ptr<Transaction> txn;
+    TxnErrorCode err = txn_kv->create_txn(&txn);
+    if (err != TxnErrorCode::TXN_OK) {
+        *code = cast_as<ErrCategory::READ>(err);
+        *msg = "failed to create txn";
+        return;
+    }
+    auto& [keys, hint, key_func] = get_check_info(request);
+    if (keys.empty()) {
+        *code = MetaServiceCode::INVALID_ARGUMENT;
+        *msg = "empty keys";
+        return;
+    }
+
+    for (int i = 0; i < keys.size();) {
+        auto key = key_func(instance_id, keys.Get(i));
+        err = check_recycle_key_exist(txn.get(), key);
+        if (err == TxnErrorCode::TXN_KEY_NOT_FOUND) {
+            i++;
+            continue;
+        } else if (err == TxnErrorCode::TXN_OK) {
+            // the recycle key still exists, so prepare and commit did not match up
+            *code = MetaServiceCode::UNDEFINED_ERR;
+            *msg = "prepare and commit rpc not match, recycle key remained";
+            return;
+        } else if (err == TxnErrorCode::TXN_TOO_OLD) {
+            // split the scan into several txns for robustness
+            txn.reset();
+            TxnErrorCode err = txn_kv->create_txn(&txn);
+            if (err != TxnErrorCode::TXN_OK) {
+                *code = cast_as<ErrCategory::READ>(err);
+                *msg = "failed to create txn in cycle";
+                return;
+            }
+            LOG_INFO("meet txn too old err, gen a new txn and retry, size={} idx={}", keys.size(),
+                     i);
+            bthread_usleep(50);
+            continue;
+        } else {
+            // err != TXN_OK, fdb read err
+            *code = cast_as<ErrCategory::READ>(err);
+            *msg = fmt::format("ms read key error: {}", err);
+            return;
+        }
+    }
+    LOG_INFO("check {} success key.size={}, cost(us)={}", hint, keys.size(), watch.elapsed_us());
+}
+
+void MetaServiceImpl::check_kv(::google::protobuf::RpcController* controller,
+                               const CheckKVRequest* request, CheckKVResponse* response,
+                               ::google::protobuf::Closure* done) {
+    RPC_PREPROCESS(check_kv);
+    instance_id = get_instance_id(resource_mgr_, request->cloud_unique_id());
+    if (instance_id.empty()) {
+        code = MetaServiceCode::INVALID_ARGUMENT;
+        msg = "empty instance_id";
+        return;
+    }
+    if (!request->has_op()) {
+        code = MetaServiceCode::INVALID_ARGUMENT;
+        msg = "op not given";
+        return;
+    }
+    if (!request->has_check_keys()) {
+        code = MetaServiceCode::INVALID_ARGUMENT;
+        msg = "empty check keys";
+        return;
+    }
+    RPC_RATE_LIMIT(check_kv);
+    switch (request->op()) {
+    case CheckKVRequest::CREATE_INDEX_AFTER_FE_COMMIT: {
+        check_create_table(instance_id, txn_kv_, request, response, &code, &msg,
+                           [](const CheckKVRequest* request) {
+                               return std::make_tuple(
+                                       request->check_keys().index_ids(), "index",
+                                       [](std::string instance_id, int64_t id) {
+                                           return recycle_index_key({std::move(instance_id), id});
+                                       });
+                           });
+        break;
+    }
+    case CheckKVRequest::CREATE_PARTITION_AFTER_FE_COMMIT: {
+        check_create_table(
+                instance_id, txn_kv_, request, response, &code, &msg,
+                [](const CheckKVRequest* request) {
+                    return std::make_tuple(
+                            request->check_keys().partition_ids(), "partition",
+                            [](std::string instance_id, int64_t id) {
+                                return recycle_partition_key({std::move(instance_id), id});
+                            });
+                });
+        break;
+    }
+    default:
+        code = MetaServiceCode::INVALID_ARGUMENT;
+        msg = "unsupported op";
+        return;
+    };
+}
+
 } // namespace doris::cloud
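prepare_partition/prepare_index write recycle keys that the matching commit removes, so the verification above only needs a point read per id: any surviving recycle key means the prepare/commit pair did not complete, and TXN_TOO_OLD simply restarts the scan in a fresh transaction. The same contract as a minimal Java sketch (KvReader and recycleKey are hypothetical stand-ins for the meta-service transaction and the recycle_index_key/recycle_partition_key helpers; the txn-restart handling is omitted):

```
import java.util.List;
import java.util.Optional;
import java.util.function.LongFunction;

// Hypothetical rendering of check_create_table's contract, not the real meta-service API.
class RecycleKeyCheckSketch {
    interface KvReader {
        Optional<byte[]> get(String key);
    }

    /** Returns true iff every prepare was matched by a commit for the given ids. */
    static boolean noRecycleKeyRemains(KvReader kv, List<Long> ids, LongFunction<String> recycleKey) {
        for (long id : ids) {
            if (kv.get(recycleKey.apply(id)).isPresent()) {
                // prepare without matching commit; the recycler will reclaim this id
                return false;
            }
        }
        return true;
    }
}
```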
diff --git a/cloud/test/meta_service_test.cpp b/cloud/test/meta_service_test.cpp
index a2bf25187aa..850f8cfbabd 100644
--- a/cloud/test/meta_service_test.cpp
+++ b/cloud/test/meta_service_test.cpp
@@ -5295,6 +5295,70 @@ TEST(MetaServiceTest, PartitionRequest) {
     ASSERT_EQ(meta_service->txn_kv()->create_txn(&txn), TxnErrorCode::TXN_OK);
     ASSERT_EQ(txn->get(partition_key, &val), TxnErrorCode::TXN_KEY_NOT_FOUND);
     req.add_index_ids(index_id);
+    // ------------Test check partition-----------
+    // Normal
+    req.set_db_id(1);
+    req.set_table_id(table_id + 1);
+    req.add_index_ids(index_id + 1);
+    req.add_partition_ids(partition_id + 1);
+    meta_service->prepare_partition(&ctrl, &req, &res, nullptr);
+    ASSERT_EQ(res.status().code(), MetaServiceCode::OK);
+    meta_service->commit_partition(&ctrl, &req, &res, nullptr);
+    ASSERT_EQ(res.status().code(), MetaServiceCode::OK);
+    CheckKVRequest req_check;
+    CheckKVResponse res_check;
+    meta_service->check_kv(&ctrl, &req_check, &res_check, nullptr);
+    ASSERT_EQ(res_check.status().code(), MetaServiceCode::INVALID_ARGUMENT);
+    res_check.Clear();
+    req_check.set_op(CheckKVRequest::CREATE_PARTITION_AFTER_FE_COMMIT);
+    CheckKeyInfos check_keys_pb;
+    check_keys_pb.add_table_ids(table_id + 1);
+    check_keys_pb.add_index_ids(index_id + 1);
+    check_keys_pb.add_partition_ids(partition_id + 1);
+    req_check.mutable_check_keys()->CopyFrom(check_keys_pb);
+    meta_service->check_kv(&ctrl, &req_check, &res_check, nullptr);
+    ASSERT_EQ(res_check.status().code(), MetaServiceCode::OK);
+    res_check.Clear();
+    // Abnormal: prepared but not committed
+    req.Clear();
+    req.set_db_id(1);
+    req.set_table_id(table_id + 2);
+    req.add_index_ids(index_id + 2);
+    req.add_partition_ids(partition_id + 2);
+    meta_service->prepare_partition(&ctrl, &req, &res, nullptr);
+    ASSERT_EQ(res.status().code(), MetaServiceCode::OK);
+    req_check.Clear();
+    req_check.set_op(CheckKVRequest::CREATE_PARTITION_AFTER_FE_COMMIT);
+    check_keys_pb.Clear();
+    check_keys_pb.add_table_ids(table_id + 2);
+    check_keys_pb.add_index_ids(index_id + 2);
+    check_keys_pb.add_partition_ids(partition_id + 2);
+    req_check.mutable_check_keys()->CopyFrom(check_keys_pb);
+    meta_service->check_kv(&ctrl, &req_check, &res_check, nullptr);
+    ASSERT_EQ(res_check.status().code(), MetaServiceCode::UNDEFINED_ERR);
+
+    // ------------Test check index-----------
+    // Normal
+    IndexRequest req_index;
+    IndexResponse res_index;
+    req_index.set_db_id(1);
+    req_index.set_table_id(table_id + 3);
+    req_index.add_index_ids(index_id + 3);
+    meta_service->prepare_index(&ctrl, &req_index, &res_index, nullptr);
+    ASSERT_EQ(res_index.status().code(), MetaServiceCode::OK);
+    meta_service->commit_index(&ctrl, &req_index, &res_index, nullptr);
+    ASSERT_EQ(res_index.status().code(), MetaServiceCode::OK);
+    req_check.Clear();
+    res_check.Clear();
+    req_check.set_op(CheckKVRequest::CREATE_INDEX_AFTER_FE_COMMIT);
+    check_keys_pb.Clear();
+    check_keys_pb.add_table_ids(table_id + 3);
+    check_keys_pb.add_index_ids(index_id + 3);
+    req_check.mutable_check_keys()->CopyFrom(check_keys_pb);
+    meta_service->check_kv(&ctrl, &req_check, &res_check, nullptr);
+    ASSERT_EQ(res_check.status().code(), MetaServiceCode::OK);
+    res_check.Clear();
+
     // ------------Test drop partition------------
     reset_meta_service();
     req.Clear();
diff --git a/fe/fe-common/src/main/java/org/apache/doris/common/Config.java b/fe/fe-common/src/main/java/org/apache/doris/common/Config.java
index 0467a4931d2..5756d8fbfc2 100644
--- a/fe/fe-common/src/main/java/org/apache/doris/common/Config.java
+++ b/fe/fe-common/src/main/java/org/apache/doris/common/Config.java
@@ -2672,6 +2672,9 @@ public class Config extends ConfigBase {
             "Should the request content be logged before each request starts, specifically the query statements"})
     public static boolean enable_print_request_before_execution = false;
 
+    @ConfField(mutable = true)
+    public static boolean enable_cooldown_replica_affinity = true;
+
     //==========================================================================
     // begin of cloud config
     //==========================================================================
@@ -2900,9 +2903,12 @@ public class Config extends ConfigBase {
             "streamload route policy in cloud mode, availale options are public-private and empty string"})
     public static String streamload_redirect_policy = "";
 
-    @ConfField(mutable = true)
-    public static boolean enable_cooldown_replica_affinity = true;
-
+    @ConfField(description = {"存算分离模式下建表是否检查残留recycler key, 默认true",
+            "create table in cloud mode, check recycler key remained, default true"})
+    public static boolean check_create_table_recycle_key_remained = true;
+    // ATTN: DONOT add any config not related to cloud mode here
+    // ATTN: DONOT add any config not related to cloud mode here
+    // ATTN: DONOT add any config not related to cloud mode here
     //==========================================================================
     // end of cloud config
     //==========================================================================
diff --git a/fe/fe-core/src/main/java/org/apache/doris/alter/Alter.java b/fe/fe-core/src/main/java/org/apache/doris/alter/Alter.java
index 6c2f38ef101..8e8be7c567e 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/alter/Alter.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/alter/Alter.java
@@ -483,7 +483,7 @@ public class Alter {
                 DynamicPartitionUtil.checkAlterAllowed(
                         (OlapTable) db.getTableOrMetaException(tableName, TableType.OLAP));
             }
-            Env.getCurrentEnv().addPartition(db, tableName, (AddPartitionClause) alterClause);
+            Env.getCurrentEnv().addPartition(db, tableName, (AddPartitionClause) alterClause, false, 0, true);
         } else if (alterClause instanceof AddPartitionLikeClause) {
             if (!((AddPartitionLikeClause) alterClause).getIsTempPartition()) {
                 DynamicPartitionUtil.checkAlterAllowed(
diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java
index 559094de442..f01d8e79af2 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java
@@ -3247,8 +3247,23 @@ public class Env {
         getInternalCatalog().createTableAsSelect(stmt);
     }
 
-    public void addPartition(Database db, String tableName, AddPartitionClause addPartitionClause) throws DdlException {
-        getInternalCatalog().addPartition(db, tableName, addPartitionClause);
+    /**
+     * Adds a partition to a table.
+     *
+     * @param db the database the target table belongs to
+     * @param tableName the target table
+     * @param addPartitionClause clause in the CreateTableStmt
+     * @param isCreateTable whether this call is part of creating the table
+     * @param generatedPartitionId the preset partition id for the partition to add
+     * @param writeEditLog whether to write an edit log for this addition
+     * @return PartitionPersistInfo to be written to the edit log; may be null if no partition was added
+     * @throws DdlException
+     */
+    public PartitionPersistInfo addPartition(Database db, String tableName, AddPartitionClause addPartitionClause,
+                                             boolean isCreateTable, long generatedPartitionId,
+                                             boolean writeEditLog) throws DdlException {
+        return getInternalCatalog().addPartition(db, tableName, addPartitionClause,
+                isCreateTable, generatedPartitionId, writeEditLog);
     }
 
     public void addPartitionLike(Database db, String tableName, AddPartitionLikeClause addPartitionLikeClause)
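Env.addPartition now returns the PartitionPersistInfo instead of logging it unconditionally, so the caller decides when the edit-log entry is written. A sketch of the two call shapes implied by the new signature (the wrapper class is hypothetical; the calls themselves are the ones this commit uses):

```
import java.util.List;

import org.apache.doris.analysis.AddPartitionClause;
import org.apache.doris.catalog.Database;
import org.apache.doris.catalog.Env;
import org.apache.doris.common.DdlException;
import org.apache.doris.persist.PartitionPersistInfo;

class AddPartitionCallShapes {
    static void steadyState(Env env, Database db, String tableName, AddPartitionClause clause)
            throws DdlException {
        // ALTER TABLE / MTMV / FrontendService: no preset id, edit log written inside the call.
        env.addPartition(db, tableName, clause, false, 0, true);
    }

    static void createTableCloud(Env env, Database db, String tableName,
                                 List<AddPartitionClause> clauses, List<Long> generatedIds,
                                 List<PartitionPersistInfo> partsInfo) throws DdlException {
        // Create-table path in cloud mode: ids are pre-generated and the edit log is postponed;
        // the caller logs each returned PartitionPersistInfo only after the batched commit succeeds.
        for (int i = 0; i < clauses.size(); i++) {
            partsInfo.add(env.addPartition(db, tableName, clauses.get(i), true,
                    generatedIds.get(i), false));
        }
    }
}
```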
diff --git a/fe/fe-core/src/main/java/org/apache/doris/clone/DynamicPartitionScheduler.java b/fe/fe-core/src/main/java/org/apache/doris/clone/DynamicPartitionScheduler.java
index 17a4d34aa38..0da07c7c833 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/clone/DynamicPartitionScheduler.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/clone/DynamicPartitionScheduler.java
@@ -33,6 +33,7 @@ import org.apache.doris.catalog.DynamicPartitionProperty;
 import org.apache.doris.catalog.Env;
 import org.apache.doris.catalog.HashDistributionInfo;
 import org.apache.doris.catalog.MTMV;
+import org.apache.doris.catalog.MetaIdGenerator;
 import org.apache.doris.catalog.OlapTable;
 import org.apache.doris.catalog.Partition;
 import org.apache.doris.catalog.PartitionItem;
@@ -45,11 +46,14 @@ import org.apache.doris.common.DdlException;
 import org.apache.doris.common.FeConstants;
 import org.apache.doris.common.Pair;
 import org.apache.doris.common.util.AutoBucketUtils;
+import org.apache.doris.common.util.DebugPointUtil;
 import org.apache.doris.common.util.DynamicPartitionUtil;
 import org.apache.doris.common.util.MasterDaemon;
 import org.apache.doris.common.util.PropertyAnalyzer;
 import org.apache.doris.common.util.RangeUtils;
 import org.apache.doris.common.util.TimeUtils;
+import org.apache.doris.datasource.InternalCatalog;
+import org.apache.doris.persist.PartitionPersistInfo;
 import org.apache.doris.rpc.RpcException;
 import org.apache.doris.thrift.TStorageMedium;
@@ -622,9 +626,30 @@ public class DynamicPartitionScheduler extends MasterDaemon {
         }
 
         if (!skipAddPartition) {
-            for (AddPartitionClause addPartitionClause : addPartitionClauses) {
+            // get partitionIds and indexIds
+            List<Long> indexIds = new ArrayList<>(olapTable.getCopiedIndexIdToMeta().keySet());
+            List<Long> generatedPartitionIds = new ArrayList<>();
+            cloudBatchBeforeCreatePartitions(executeFirstTime, addPartitionClauses, olapTable, indexIds,
+                    db, tableName, generatedPartitionIds);
+
+            List<PartitionPersistInfo> partsInfo = new ArrayList<>();
+            for (int i = 0; i < addPartitionClauses.size(); i++) {
                 try {
-                    Env.getCurrentEnv().addPartition(db, tableName, addPartitionClause);
+                    boolean needWriteEditLog = true;
+                    // ATTN: when !executeFirstTime, write the edit log immediately;
+                    // executeFirstTime means this is create table, so in cloud mode the edit
+                    // log is postponed until the batched commit succeeds
+                    if (Config.isCloudMode()) {
+                        needWriteEditLog = !executeFirstTime;
+                    }
+                    PartitionPersistInfo info =
+                            Env.getCurrentEnv().addPartition(db, tableName, addPartitionClauses.get(i),
+                                    executeFirstTime,
+                                    executeFirstTime && Config.isCloudMode() ? generatedPartitionIds.get(i) : 0,
+                                    needWriteEditLog);
+                    if (info == null) {
+                        throw new Exception("null persisted partition returned");
+                    }
+                    partsInfo.add(info);
                     clearCreatePartitionFailedMsg(olapTable.getId());
                 } catch (Exception e) {
                     recordCreatePartitionFailedMsg(db.getFullName(), tableName, e.getMessage(), olapTable.getId());
@@ -634,7 +659,108 @@ public class DynamicPartitionScheduler extends MasterDaemon {
                     }
                 }
             }
+            cloudBatchAfterCreatePartitions(executeFirstTime, partsInfo,
+                    addPartitionClauses, db, olapTable, indexIds, tableName);
+        }
+    }
+
+    private void cloudBatchAfterCreatePartitions(boolean executeFirstTime, List<PartitionPersistInfo> partsInfo,
+                                                 ArrayList<AddPartitionClause> addPartitionClauses, Database db,
+                                                 OlapTable olapTable, List<Long> indexIds,
+                                                 String tableName) throws DdlException {
+        if (Config.isNotCloudMode()) {
+            return;
+        }
+        List<Long> succeedPartitionIds = partsInfo.stream().map(partitionPersistInfo
+                -> partitionPersistInfo.getPartition().getId()).collect(Collectors.toList());
+        if (!executeFirstTime || addPartitionClauses.isEmpty()) {
+            LOG.info("cloud commit rpc in batch, {}-{}", !executeFirstTime, addPartitionClauses.size());
+            return;
+        }
+        try {
+            // ATTN: failedPids = generatedPartitionIds - succeedPartitionIds, i.e. partitions
+            // that failed in addPartition; failedPids will be reclaimed by the recycler
+            if (DebugPointUtil.isEnable("FE.DynamicPartitionScheduler.before.commitCloudPartition")) {
+                LOG.info("debug point FE.DynamicPartitionScheduler.before.commitCloudPartition, throw e");
+                // not committed, no edit log written
+                throw new Exception("debug point FE.DynamicPartitionScheduler.before.commitCloudPartition");
+            }
+            Env.getCurrentInternalCatalog().afterCreatePartitions(db.getId(), olapTable.getId(),
+                    succeedPartitionIds, indexIds, true);
+            LOG.info("begin write edit log to add partitions in batch, "
+                    + "numPartitions: {}, db: {}, table: {}, tableId: {}",
+                    partsInfo.size(), db.getFullName(), tableName, olapTable.getId());
+            // ATTN: the edit log must be written after the cloud partition commit,
+            // to prevent a commit RPC failure from causing data loss
+            if (DebugPointUtil.isEnable("FE.DynamicPartitionScheduler.before.logEditPartitions")) {
+                LOG.info("debug point FE.DynamicPartitionScheduler.before.logEditPartitions, throw e");
+                // committed, but no edit log written yet
+                throw new Exception("debug point FE.DynamicPartitionScheduler.before.logEditPartitions");
+            }
+            for (int i = 0; i < partsInfo.size(); i++) {
+                Env.getCurrentEnv().getEditLog().logAddPartition(partsInfo.get(i));
+                if (DebugPointUtil.isEnable("FE.DynamicPartitionScheduler.in.logEditPartitions")) {
+                    if (i == partsInfo.size() / 2) {
+                        LOG.info("debug point FE.DynamicPartitionScheduler.in.logEditPartitions, throw e");
+                        // committed, but only some of the edit logs were written
+                        throw new Exception("debug point FE.DynamicPartitionScheduler"
+                                + ".in.logEditPartitions");
+                    }
+                }
+            }
+            LOG.info("finish write edit log to add partitions in batch, "
+                    + "numPartitions: {}, db: {}, table: {}, tableId: {}",
+                    partsInfo.size(), db.getFullName(), tableName, olapTable.getId());
+        } catch (Exception e) {
+            LOG.warn("cloud in commit step, dbName {}, tableName {}, tableId {} exception {}",
+                    db.getFullName(), tableName, olapTable.getId(), e.getMessage());
+            recordCreatePartitionFailedMsg(db.getFullName(), tableName, e.getMessage(), olapTable.getId());
+            throw new DdlException("cloud in commit step err");
+        }
+    }
+
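cloudBatchBeforeCreatePartitions, next, reserves every id for the batch up front through one IdGeneratorBuffer: per partition that is one partition id, one tablet id per (index, bucket) pair, and one replica id per (index, bucket, replica), i.e. the 1 + totalReplicaNum + indexNum * bucketNum of checkAndGetBufferSize. A worked example with assumed values (1 index, the DDL's 2 buckets, 1 replica):

```
// Worked example of the id budget; the inputs are assumptions for illustration only.
class IdBudgetExample {
    static long perPartition(long indexNum, long bucketNum, long replicaNum) {
        long totalReplicaNum = indexNum * bucketNum * replicaNum; // replica ids
        return 1 + totalReplicaNum + indexNum * bucketNum;        // + partition id + tablet ids
    }

    public static void main(String[] args) {
        long per = perPartition(1, 2, 1);  // 1 + 2 + 2 = 5 ids per partition
        System.out.println(per + " ids/partition, " + 734 * per
                + " ids for the 734-partition example table");  // 3670 ids in one buffer
    }
}
```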
+    private void cloudBatchBeforeCreatePartitions(boolean executeFirstTime,
+                                                  ArrayList<AddPartitionClause> addPartitionClauses,
+                                                  OlapTable olapTable, List<Long> indexIds, Database db,
+                                                  String tableName, List<Long> generatedPartitionIds)
+            throws DdlException {
+        if (Config.isNotCloudMode()) {
+            return;
+        }
+        if (!executeFirstTime || addPartitionClauses.isEmpty()) {
+            LOG.info("cloud prepare rpc in batch, {}-{}", !executeFirstTime, addPartitionClauses.size());
+            return;
+        }
+        AddPartitionClause addPartitionClause = addPartitionClauses.get(0);
+        DistributionDesc distributionDesc = addPartitionClause.getDistributionDesc();
+        try {
+            DistributionInfo distributionInfo;
+            if (distributionDesc == null) {
+                // no distribution in the clause, fall back to the table's default
+                distributionInfo = olapTable.getDefaultDistributionInfo()
+                        .toDistributionDesc().toDistributionInfo(olapTable.getBaseSchema());
+            } else {
+                distributionInfo = distributionDesc.toDistributionInfo(olapTable.getBaseSchema());
+            }
+            long allPartitionBufferSize = 0;
+            for (int i = 0; i < addPartitionClauses.size(); i++) {
+                long bufferSize = InternalCatalog.checkAndGetBufferSize(indexIds.size(),
+                        distributionInfo.getBucketNum(),
+                        addPartitionClause.getSingeRangePartitionDesc()
+                                .getReplicaAlloc().getTotalReplicaNum(),
+                        db, tableName);
+                allPartitionBufferSize += bufferSize;
+            }
+            MetaIdGenerator.IdGeneratorBuffer idGeneratorBuffer = Env.getCurrentEnv()
+                    .getIdGeneratorBuffer(allPartitionBufferSize);
+            addPartitionClauses.forEach(p -> generatedPartitionIds.add(idGeneratorBuffer.getNextId()));
+            // executeFirstTime is true here
+            Env.getCurrentInternalCatalog().beforeCreatePartitions(db.getId(), olapTable.getId(),
+                    generatedPartitionIds, indexIds, true);
+        } catch (Exception e) {
+            LOG.warn("cloud in prepare step, dbName {}, tableName {}, tableId {} indexId {} exception {}",
+                    db.getFullName(), tableName, olapTable.getId(), indexIds, e.getMessage());
+            recordCreatePartitionFailedMsg(db.getFullName(), tableName, e.getMessage(), olapTable.getId());
+            throw new DdlException("cloud in prepare step err");
+        }
+    }
 }
diff --git a/fe/fe-core/src/main/java/org/apache/doris/cloud/datasource/CloudInternalCatalog.java b/fe/fe-core/src/main/java/org/apache/doris/cloud/datasource/CloudInternalCatalog.java
index b6adfb1789b..e72690af1c7 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/cloud/datasource/CloudInternalCatalog.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/cloud/datasource/CloudInternalCatalog.java
@@ -403,7 +403,8 @@ public class CloudInternalCatalog extends InternalCatalog {
     }
 
     @Override
-    protected void beforeCreatePartitions(long dbId, long tableId, List<Long> partitionIds, List<Long> indexIds)
+    public void beforeCreatePartitions(long dbId, long tableId, List<Long> partitionIds, List<Long> indexIds,
+                                       boolean isCreateTable)
             throws DdlException {
         if (partitionIds == null) {
             prepareMaterializedIndex(tableId, indexIds, 0);
@@ -413,7 +414,7 @@ public class CloudInternalCatalog extends InternalCatalog {
     }
 
     @Override
-    protected void afterCreatePartitions(long dbId, long tableId, List<Long> partitionIds, List<Long> indexIds,
+    public void afterCreatePartitions(long dbId, long tableId, List<Long> partitionIds, List<Long> indexIds,
                                       boolean isCreateTable)
             throws DdlException {
         if (partitionIds == null) {
@@ -421,6 +422,19 @@ public class CloudInternalCatalog extends InternalCatalog {
         } else {
             commitPartition(dbId, tableId, partitionIds, indexIds);
         }
+        if (!Config.check_create_table_recycle_key_remained) {
+            return;
+        }
+        checkCreatePartitions(dbId, tableId, partitionIds, indexIds);
+    }
+
+    private void checkCreatePartitions(long dbId, long tableId, List<Long> partitionIds, List<Long> indexIds)
+            throws DdlException {
+        if
(partitionIds == null) { + checkMaterializedIndex(dbId, tableId, indexIds); + } else { + checkPartition(dbId, tableId, partitionIds); + } } private void preparePartition(long dbId, long tableId, List<Long> partitionIds, List<Long> indexIds) @@ -557,6 +571,76 @@ public class CloudInternalCatalog extends InternalCatalog { } } + private void checkPartition(long dbId, long tableId, List<Long> partitionIds) + throws DdlException { + Cloud.CheckKeyInfos.Builder checkKeyInfosBuilder = Cloud.CheckKeyInfos.newBuilder(); + checkKeyInfosBuilder.addAllPartitionIds(partitionIds); + // for ms log + checkKeyInfosBuilder.addDbIds(dbId); + checkKeyInfosBuilder.addTableIds(tableId); + + Cloud.CheckKVRequest.Builder checkKvRequestBuilder = Cloud.CheckKVRequest.newBuilder(); + checkKvRequestBuilder.setCloudUniqueId(Config.cloud_unique_id); + checkKvRequestBuilder.setCheckKeys(checkKeyInfosBuilder.build()); + checkKvRequestBuilder.setOp(Cloud.CheckKVRequest.Operation.CREATE_PARTITION_AFTER_FE_COMMIT); + final Cloud.CheckKVRequest checkKVRequest = checkKvRequestBuilder.build(); + + Cloud.CheckKVResponse response = null; + int tryTimes = 0; + while (tryTimes++ < Config.metaServiceRpcRetryTimes()) { + try { + response = MetaServiceProxy.getInstance().checkKv(checkKVRequest); + break; + } catch (RpcException e) { + LOG.warn("tryTimes:{}, checkPartition RpcException", tryTimes, e); + if (tryTimes + 1 >= Config.metaServiceRpcRetryTimes()) { + throw new DdlException(e.getMessage()); + } + } + sleepSeveralMs(); + } + + if (response.getStatus().getCode() != Cloud.MetaServiceCode.OK) { + LOG.warn("checkPartition response: {} ", response); + throw new DdlException(response.getStatus().getMsg()); + } + } + + public void checkMaterializedIndex(long dbId, long tableId, List<Long> indexIds) + throws DdlException { + Cloud.CheckKeyInfos.Builder checkKeyInfosBuilder = Cloud.CheckKeyInfos.newBuilder(); + checkKeyInfosBuilder.addAllIndexIds(indexIds); + // for ms log + checkKeyInfosBuilder.addDbIds(dbId); + checkKeyInfosBuilder.addTableIds(tableId); + + Cloud.CheckKVRequest.Builder checkKvRequestBuilder = Cloud.CheckKVRequest.newBuilder(); + checkKvRequestBuilder.setCloudUniqueId(Config.cloud_unique_id); + checkKvRequestBuilder.setCheckKeys(checkKeyInfosBuilder.build()); + checkKvRequestBuilder.setOp(Cloud.CheckKVRequest.Operation.CREATE_INDEX_AFTER_FE_COMMIT); + final Cloud.CheckKVRequest checkKVRequest = checkKvRequestBuilder.build(); + + Cloud.CheckKVResponse response = null; + int tryTimes = 0; + while (tryTimes++ < Config.metaServiceRpcRetryTimes()) { + try { + response = MetaServiceProxy.getInstance().checkKv(checkKVRequest); + break; + } catch (RpcException e) { + LOG.warn("tryTimes:{}, checkIndex RpcException", tryTimes, e); + if (tryTimes + 1 >= Config.metaServiceRpcRetryTimes()) { + throw new DdlException(e.getMessage()); + } + } + sleepSeveralMs(); + } + + if (response.getStatus().getCode() != Cloud.MetaServiceCode.OK) { + LOG.warn("checkIndex response: {} ", response); + throw new DdlException(response.getStatus().getMsg()); + } + } + public Cloud.CreateTabletsResponse sendCreateTabletsRpc(Cloud.CreateTabletsRequest.Builder requestBuilder) throws DdlException { requestBuilder.setCloudUniqueId(Config.cloud_unique_id); diff --git a/fe/fe-core/src/main/java/org/apache/doris/cloud/rpc/MetaServiceClient.java b/fe/fe-core/src/main/java/org/apache/doris/cloud/rpc/MetaServiceClient.java index f7a178deb01..d5cdc79eb7f 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/cloud/rpc/MetaServiceClient.java +++ 
b/fe/fe-core/src/main/java/org/apache/doris/cloud/rpc/MetaServiceClient.java
@@ -255,6 +255,10 @@ public class MetaServiceClient {
         return blockingStub.commitPartition(request);
     }
 
+    public Cloud.CheckKVResponse checkKv(Cloud.CheckKVRequest request) {
+        return blockingStub.checkKv(request);
+    }
+
     public Cloud.PartitionResponse dropPartition(Cloud.PartitionRequest request) {
         return blockingStub.dropPartition(request);
     }
diff --git a/fe/fe-core/src/main/java/org/apache/doris/cloud/rpc/MetaServiceProxy.java b/fe/fe-core/src/main/java/org/apache/doris/cloud/rpc/MetaServiceProxy.java
index 00f271099e4..d7ec3289067 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/cloud/rpc/MetaServiceProxy.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/cloud/rpc/MetaServiceProxy.java
@@ -318,6 +318,15 @@ public class MetaServiceProxy {
         }
     }
 
+    public Cloud.CheckKVResponse checkKv(Cloud.CheckKVRequest request) throws RpcException {
+        try {
+            final MetaServiceClient client = getProxy();
+            return client.checkKv(request);
+        } catch (Exception e) {
+            throw new RpcException("", e.getMessage(), e);
+        }
+    }
+
     public Cloud.IndexResponse dropIndex(Cloud.IndexRequest request) throws RpcException {
         try {
             final MetaServiceClient client = getProxy();
diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/InternalCatalog.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/InternalCatalog.java
index e13316c3e76..910df9e1735 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/datasource/InternalCatalog.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/InternalCatalog.java
@@ -1479,7 +1479,7 @@ public class InternalCatalog implements CatalogIf<Database> {
             } finally {
                 table.readUnlock();
             }
-            addPartition(db, tableName, clause);
+            addPartition(db, tableName, clause, false, 0, true);
         } catch (UserException e) {
             throw new DdlException("Failed to ADD PARTITION " + addPartitionLikeClause.getPartitionName()
@@ -1487,7 +1487,25 @@ public class InternalCatalog implements CatalogIf<Database> {
         }
     }
 
-    public void addPartition(Database db, String tableName, AddPartitionClause addPartitionClause) throws DdlException {
+    public static long checkAndGetBufferSize(long indexNum, long bucketNum,
+                                             long replicaNum, Database db, String tableName) throws DdlException {
+        long totalReplicaNum = indexNum * bucketNum * replicaNum;
+        if (Config.isNotCloudMode() && totalReplicaNum >= db.getReplicaQuotaLeftWithLock()) {
+            throw new DdlException("Database " + db.getFullName() + " table " + tableName + " add partition increasing "
+                    + totalReplicaNum + " of replica exceeds quota[" + db.getReplicaQuota() + "]");
+        }
+        return 1 + totalReplicaNum + indexNum * bucketNum;
+    }
+
+    public PartitionPersistInfo addPartition(Database db, String tableName, AddPartitionClause addPartitionClause,
+                                             boolean isCreateTable, long generatedPartitionId,
+                                             boolean writeEditLog) throws DdlException {
+        // In cloud mode, isCreateTable == true means dynamic partitions are created as part of
+        // create table, so the partition id must already have been generated; when
+        // isCreateTable == false the id is generated below, so generatedPartitionId must be 0.
+        if (!FeConstants.runningUnitTest && Config.isCloudMode()
+                && ((isCreateTable && generatedPartitionId == 0) || (!isCreateTable && generatedPartitionId != 0))) {
+            throw new DdlException("invalid generatedPartitionId: isCreateTable=" + isCreateTable
+                    + ", generatedPartitionId=" + generatedPartitionId);
+        }
         SinglePartitionDesc singlePartitionDesc = addPartitionClause.getSingeRangePartitionDesc();
         DistributionDesc distributionDesc = addPartitionClause.getDistributionDesc();
         boolean isTempPartition = addPartitionClause.isTempPartition();
@@ -1510,7 +1528,7 @@ public class InternalCatalog implements CatalogIf<Database> {
                 if (singlePartitionDesc.isSetIfNotExists()) {
                     LOG.info("table[{}] add partition[{}] which already exists", olapTable.getName(), partitionName);
                     if (!DebugPointUtil.isEnable("InternalCatalog.addPartition.noCheckExists")) {
-                        return;
+                        return null;
                     }
                 } else {
                     ErrorReport.reportDdlException(ErrorCode.ERR_SAME_NAME_PARTITION, partitionName);
@@ -1660,17 +1678,10 @@ public class InternalCatalog implements CatalogIf<Database> {
         DataProperty dataProperty = singlePartitionDesc.getPartitionDataProperty();
         Preconditions.checkNotNull(dataProperty);
         // check replica quota if this operation done
-        long indexNum = indexIdToMeta.size();
-        long bucketNum = distributionInfo.getBucketNum();
-        long replicaNum = singlePartitionDesc.getReplicaAlloc().getTotalReplicaNum();
-        long totalReplicaNum = indexNum * bucketNum * replicaNum;
-        if (Config.isNotCloudMode() && totalReplicaNum >= db.getReplicaQuotaLeftWithLock()) {
-            throw new DdlException("Database " + db.getFullName() + " table " + tableName + " add partition increasing "
-                    + totalReplicaNum + " of replica exceeds quota[" + db.getReplicaQuota() + "]");
-        }
-        Set<Long> tabletIdSet = new HashSet<>();
-        long bufferSize = 1 + totalReplicaNum + indexNum * bucketNum;
+        long bufferSize = checkAndGetBufferSize(indexIdToMeta.size(), distributionInfo.getBucketNum(),
+                singlePartitionDesc.getReplicaAlloc().getTotalReplicaNum(), db, tableName);
         IdGeneratorBuffer idGeneratorBuffer = Env.getCurrentEnv().getIdGeneratorBuffer(bufferSize);
+        Set<Long> tabletIdSet = new HashSet<>();
         String storagePolicy = olapTable.getStoragePolicy();
         if (!Strings.isNullOrEmpty(dataProperty.getStoragePolicy())) {
             storagePolicy = dataProperty.getStoragePolicy();
@@ -1681,10 +1692,13 @@ public class InternalCatalog implements CatalogIf<Database> {
             }
         };
         try {
-            long partitionId = idGeneratorBuffer.getNextId();
+            long partitionId = Config.isCloudMode() && !FeConstants.runningUnitTest && isCreateTable
+                    ?
generatedPartitionId : idGeneratorBuffer.getNextId(); List<Long> partitionIds = Lists.newArrayList(partitionId); - List<Long> indexIds = indexIdToMeta.keySet().stream().collect(Collectors.toList()); - beforeCreatePartitions(db.getId(), olapTable.getId(), partitionIds, indexIds); + List<Long> indexIds = new ArrayList<>(indexIdToMeta.keySet()); + if (!isCreateTable) { + beforeCreatePartitions(db.getId(), olapTable.getId(), partitionIds, indexIds, isCreateTable); + } Partition partition = createPartitionWithIndices(db.getId(), olapTable, partitionId, partitionName, indexIdToMeta, distributionInfo, dataProperty, singlePartitionDesc.getReplicaAlloc(), @@ -1693,7 +1707,6 @@ public class InternalCatalog implements CatalogIf<Database> { singlePartitionDesc.getTabletType(), storagePolicy, idGeneratorBuffer, binlogConfig, dataProperty.isStorageMediumSpecified(), null); - // TODO cluster key ids // check again olapTable = db.getOlapTableOrDdlException(tableName); @@ -1705,7 +1718,7 @@ public class InternalCatalog implements CatalogIf<Database> { LOG.info("table[{}] add partition[{}] which already exists", olapTable.getName(), partitionName); if (singlePartitionDesc.isSetIfNotExists()) { failedCleanCallback.run(); - return; + return null; } else { ErrorReport.reportDdlException(ErrorCode.ERR_SAME_NAME_PARTITION, partitionName); } @@ -1773,25 +1786,31 @@ public class InternalCatalog implements CatalogIf<Database> { PartitionPersistInfo info = null; if (partitionInfo.getType() == PartitionType.RANGE) { info = new PartitionPersistInfo(db.getId(), olapTable.getId(), partition, - partitionInfo.getItem(partitionId).getItems(), ListPartitionItem.DUMMY_ITEM, dataProperty, - partitionInfo.getReplicaAllocation(partitionId), partitionInfo.getIsInMemory(partitionId), - isTempPartition, partitionInfo.getIsMutable(partitionId)); + partitionInfo.getItem(partitionId).getItems(), ListPartitionItem.DUMMY_ITEM, dataProperty, + partitionInfo.getReplicaAllocation(partitionId), partitionInfo.getIsInMemory(partitionId), + isTempPartition, partitionInfo.getIsMutable(partitionId)); } else if (partitionInfo.getType() == PartitionType.LIST) { info = new PartitionPersistInfo(db.getId(), olapTable.getId(), partition, - RangePartitionItem.DUMMY_RANGE, partitionInfo.getItem(partitionId), dataProperty, - partitionInfo.getReplicaAllocation(partitionId), partitionInfo.getIsInMemory(partitionId), - isTempPartition, partitionInfo.getIsMutable(partitionId)); + RangePartitionItem.DUMMY_RANGE, partitionInfo.getItem(partitionId), dataProperty, + partitionInfo.getReplicaAllocation(partitionId), partitionInfo.getIsInMemory(partitionId), + isTempPartition, partitionInfo.getIsMutable(partitionId)); } else { info = new PartitionPersistInfo(db.getId(), olapTable.getId(), partition, - RangePartitionItem.DUMMY_RANGE, ListPartitionItem.DUMMY_ITEM, dataProperty, - partitionInfo.getReplicaAllocation(partitionId), partitionInfo.getIsInMemory(partitionId), - isTempPartition, partitionInfo.getIsMutable(partitionId)); + RangePartitionItem.DUMMY_RANGE, ListPartitionItem.DUMMY_ITEM, dataProperty, + partitionInfo.getReplicaAllocation(partitionId), partitionInfo.getIsInMemory(partitionId), + isTempPartition, partitionInfo.getIsMutable(partitionId)); } - afterCreatePartitions(db.getId(), olapTable.getId(), partitionIds, indexIds, false); - Env.getCurrentEnv().getEditLog().logAddPartition(info); - - LOG.info("succeed in creating partition[{}], temp: {}", partitionId, isTempPartition); + if (!isCreateTable) { + afterCreatePartitions(db.getId(), 
olapTable.getId(), partitionIds, indexIds, isCreateTable); + } + if (writeEditLog) { + Env.getCurrentEnv().getEditLog().logAddPartition(info); + LOG.info("succeed in creating partition[{}], temp: {}", partitionId, isTempPartition); + } else { + LOG.info("postpone creating partition[{}], temp: {}", partitionId, isTempPartition); + } + return info; } finally { olapTable.writeUnlock(); } @@ -2144,11 +2163,12 @@ public class InternalCatalog implements CatalogIf<Database> { return partition; } - protected void beforeCreatePartitions(long dbId, long tableId, List<Long> partitionIds, List<Long> indexIds) + public void beforeCreatePartitions(long dbId, long tableId, List<Long> partitionIds, List<Long> indexIds, + boolean isCreateTable) throws DdlException { } - protected void afterCreatePartitions(long dbId, long tableId, List<Long> partitionIds, List<Long> indexIds, + public void afterCreatePartitions(long dbId, long tableId, List<Long> partitionIds, List<Long> indexIds, boolean isCreateTable) throws DdlException { } @@ -2827,7 +2847,7 @@ public class InternalCatalog implements CatalogIf<Database> { "Database " + db.getFullName() + " create unpartitioned table " + tableName + " increasing " + totalReplicaNum + " of replica exceeds quota[" + db.getReplicaQuota() + "]"); } - beforeCreatePartitions(db.getId(), olapTable.getId(), null, olapTable.getIndexIdList()); + beforeCreatePartitions(db.getId(), olapTable.getId(), null, olapTable.getIndexIdList(), true); Partition partition = createPartitionWithIndices(db.getId(), olapTable, partitionId, partitionName, olapTable.getIndexIdToMeta(), partitionDistributionInfo, @@ -2891,7 +2911,7 @@ public class InternalCatalog implements CatalogIf<Database> { + totalReplicaNum + " of replica exceeds quota[" + db.getReplicaQuota() + "]"); } - beforeCreatePartitions(db.getId(), olapTable.getId(), null, olapTable.getIndexIdList()); + beforeCreatePartitions(db.getId(), olapTable.getId(), null, olapTable.getIndexIdList(), true); // this is a 2-level partitioned tables for (Map.Entry<String, Long> entry : partitionNameToId.entrySet()) { @@ -3367,7 +3387,7 @@ public class InternalCatalog implements CatalogIf<Database> { } List<Long> indexIds = copiedTbl.getIndexIdToMeta().keySet().stream().collect(Collectors.toList()); - beforeCreatePartitions(db.getId(), copiedTbl.getId(), newPartitionIds, indexIds); + beforeCreatePartitions(db.getId(), copiedTbl.getId(), newPartitionIds, indexIds, true); for (Map.Entry<String, Long> entry : origPartitions.entrySet()) { // the new partition must use new id diff --git a/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVPartitionUtil.java b/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVPartitionUtil.java index 6595afb70f7..3d77f42a1cc 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVPartitionUtil.java +++ b/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVPartitionUtil.java @@ -394,7 +394,8 @@ public class MTMVPartitionUtil { AddPartitionClause addPartitionClause = new AddPartitionClause(singlePartitionDesc, mtmv.getDefaultDistributionInfo().toDistributionDesc(), partitionProperties, false); - Env.getCurrentEnv().addPartition((Database) mtmv.getDatabase(), mtmv.getName(), addPartitionClause); + Env.getCurrentEnv().addPartition((Database) mtmv.getDatabase(), mtmv.getName(), addPartitionClause, + false, 0, true); } /** diff --git a/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java b/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java index ab92cbd0d63..17bc984b2cc 100644 
--- a/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java +++ b/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java @@ -3357,7 +3357,7 @@ public class FrontendServiceImpl implements FrontendService.Iface { for (AddPartitionClause addPartitionClause : addPartitionClauseMap.values()) { try { // here maybe check and limit created partitions num - Env.getCurrentEnv().addPartition(db, olapTable.getName(), addPartitionClause); + Env.getCurrentEnv().addPartition(db, olapTable.getName(), addPartitionClause, false, 0, true); } catch (DdlException e) { LOG.warn(e); errorStatus.setErrorMsgs( diff --git a/fe/fe-core/src/test/java/org/apache/doris/alter/InternalSchemaAlterTest.java b/fe/fe-core/src/test/java/org/apache/doris/alter/InternalSchemaAlterTest.java index 27c00162d5f..5e7f5387f57 100644 --- a/fe/fe-core/src/test/java/org/apache/doris/alter/InternalSchemaAlterTest.java +++ b/fe/fe-core/src/test/java/org/apache/doris/alter/InternalSchemaAlterTest.java @@ -42,10 +42,10 @@ public class InternalSchemaAlterTest extends TestWithFeService { @Override protected void runBeforeAll() throws Exception { - InternalSchemaInitializer.createDb(); - InternalSchemaInitializer.createTbl(); Config.allow_replica_on_same_host = true; FeConstants.runningUnitTest = true; + InternalSchemaInitializer.createDb(); + InternalSchemaInitializer.createTbl(); } @Test diff --git a/fe/fe-core/src/test/java/org/apache/doris/catalog/CreateTableLikeTest.java b/fe/fe-core/src/test/java/org/apache/doris/catalog/CreateTableLikeTest.java index 24ab4d99cae..b4a63e5d344 100644 --- a/fe/fe-core/src/test/java/org/apache/doris/catalog/CreateTableLikeTest.java +++ b/fe/fe-core/src/test/java/org/apache/doris/catalog/CreateTableLikeTest.java @@ -23,6 +23,7 @@ import org.apache.doris.analysis.CreateTableStmt; import org.apache.doris.common.Config; import org.apache.doris.common.DdlException; import org.apache.doris.common.ExceptionChecker; +import org.apache.doris.common.FeConstants; import org.apache.doris.qe.ConnectContext; import org.apache.doris.utframe.UtFrameUtils; @@ -48,6 +49,7 @@ public class CreateTableLikeTest { @BeforeClass public static void beforeClass() throws Exception { + FeConstants.runningUnitTest = true; UtFrameUtils.createDorisCluster(runningDir); // create connect context diff --git a/fe/fe-core/src/test/java/org/apache/doris/catalog/CreateTableTest.java b/fe/fe-core/src/test/java/org/apache/doris/catalog/CreateTableTest.java index c4901ced10d..bdd5a66902c 100644 --- a/fe/fe-core/src/test/java/org/apache/doris/catalog/CreateTableTest.java +++ b/fe/fe-core/src/test/java/org/apache/doris/catalog/CreateTableTest.java @@ -23,6 +23,7 @@ import org.apache.doris.common.ConfigBase; import org.apache.doris.common.ConfigException; import org.apache.doris.common.DdlException; import org.apache.doris.common.ExceptionChecker; +import org.apache.doris.common.FeConstants; import org.apache.doris.common.UserException; import org.apache.doris.utframe.TestWithFeService; @@ -43,6 +44,7 @@ public class CreateTableTest extends TestWithFeService { @Override protected void runBeforeAll() throws Exception { + FeConstants.runningUnitTest = true; Config.allow_replica_on_same_host = true; createDatabase("test"); } diff --git a/fe/fe-core/src/test/java/org/apache/doris/catalog/ModifyBackendTest.java b/fe/fe-core/src/test/java/org/apache/doris/catalog/ModifyBackendTest.java index a0575eff548..e98e6f74545 100644 --- a/fe/fe-core/src/test/java/org/apache/doris/catalog/ModifyBackendTest.java +++ 
b/fe/fe-core/src/test/java/org/apache/doris/catalog/ModifyBackendTest.java
@@ -24,6 +24,7 @@ import org.apache.doris.analysis.CreateTableStmt;
 import org.apache.doris.common.AnalysisException;
 import org.apache.doris.common.DdlException;
 import org.apache.doris.common.ExceptionChecker;
+import org.apache.doris.common.FeConstants;
 import org.apache.doris.qe.ConnectContext;
 import org.apache.doris.qe.DdlExecutor;
 import org.apache.doris.resource.Tag;
@@ -48,6 +49,7 @@ public class ModifyBackendTest {
     @BeforeClass
     public static void beforeClass() throws Exception {
+        FeConstants.runningUnitTest = true;
         UtFrameUtils.createDorisCluster(runningDir);
         // create connect context
         connectContext = UtFrameUtils.createDefaultCtx();
diff --git a/fe/fe-core/src/test/java/org/apache/doris/catalog/RecoverTest.java b/fe/fe-core/src/test/java/org/apache/doris/catalog/RecoverTest.java
index a48652678e7..d3ec208a341 100644
--- a/fe/fe-core/src/test/java/org/apache/doris/catalog/RecoverTest.java
+++ b/fe/fe-core/src/test/java/org/apache/doris/catalog/RecoverTest.java
@@ -26,6 +26,7 @@ import org.apache.doris.analysis.RecoverDbStmt;
 import org.apache.doris.analysis.RecoverPartitionStmt;
 import org.apache.doris.analysis.RecoverTableStmt;
 import org.apache.doris.common.DdlException;
+import org.apache.doris.common.FeConstants;
 import org.apache.doris.qe.ConnectContext;
 import org.apache.doris.utframe.UtFrameUtils;
@@ -45,6 +46,7 @@ public class RecoverTest {
     @BeforeClass
     public static void beforeClass() throws Exception {
+        FeConstants.runningUnitTest = true;
         UtFrameUtils.createDorisCluster(runningDir);
         // create connect context
diff --git a/fe/fe-core/src/test/java/org/apache/doris/planner/QueryPlanTest.java b/fe/fe-core/src/test/java/org/apache/doris/planner/QueryPlanTest.java
index 22560492b40..632a062ed4a 100644
--- a/fe/fe-core/src/test/java/org/apache/doris/planner/QueryPlanTest.java
+++ b/fe/fe-core/src/test/java/org/apache/doris/planner/QueryPlanTest.java
@@ -61,6 +61,7 @@ public class QueryPlanTest extends TestWithFeService {
     @Override
     protected void runBeforeAll() throws Exception {
+        FeConstants.runningUnitTest = true;
         // disable bucket shuffle join
         Deencapsulation.setField(connectContext.getSessionVariable(), "enableBucketShuffleJoin", false);
         connectContext.getSessionVariable().setEnableRuntimeFilterPrune(false);
diff --git a/gensrc/proto/cloud.proto b/gensrc/proto/cloud.proto
index d056c457a5b..f8797ea54e7 100644
--- a/gensrc/proto/cloud.proto
+++ b/gensrc/proto/cloud.proto
@@ -1423,6 +1423,28 @@ message GetRLTaskCommitAttachResponse {
     optional RLTaskTxnCommitAttachmentPB commit_attach = 2;
 }
 
+message CheckKeyInfos {
+    repeated int64 db_ids = 1;
+    repeated int64 table_ids = 2;
+    repeated int64 index_ids = 3;
+    repeated int64 partition_ids = 4;
+}
+
+message CheckKVRequest {
+    enum Operation {
+        CREATE_INDEX_AFTER_FE_COMMIT = 1;
+        CREATE_PARTITION_AFTER_FE_COMMIT = 2;
+    }
+    optional string cloud_unique_id = 1; // For auth
+    optional CheckKeyInfos check_keys = 2;
+    optional Operation op = 3;
+}
+
+message CheckKVResponse {
+    optional MetaServiceResponseStatus status = 1;
+    optional CheckKeyInfos bad_keys = 2;
+}
+
 service MetaService {
     rpc begin_txn(BeginTxnRequest) returns (BeginTxnResponse);
     rpc precommit_txn(PrecommitTxnRequest) returns (PrecommitTxnResponse);
@@ -1491,6 +1513,9 @@ service MetaService {
 
     // routine load progress
     rpc get_rl_task_commit_attach(GetRLTaskCommitAttachRequest) returns (GetRLTaskCommitAttachResponse);
+
+    // check KV
+    rpc check_kv(CheckKVRequest) returns (CheckKVResponse);
 };
 
 service RecyclerService {
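Putting the new proto surface together: a condensed rendering of how the FE drives check_kv for partitions, mirroring CloudInternalCatalog.checkPartition above (the retry loop is omitted for brevity; the generated proto classes are assumed to live under org.apache.doris.cloud.proto):

```
import java.util.List;

import org.apache.doris.cloud.proto.Cloud;
import org.apache.doris.cloud.rpc.MetaServiceProxy;
import org.apache.doris.common.Config;
import org.apache.doris.common.DdlException;
import org.apache.doris.rpc.RpcException;

class CheckKvUsage {
    static void checkPartitionKeys(long dbId, long tableId, List<Long> partitionIds)
            throws RpcException, DdlException {
        Cloud.CheckKeyInfos checkKeys = Cloud.CheckKeyInfos.newBuilder()
                .addDbIds(dbId)                   // db/table ids ride along for meta-service logging
                .addTableIds(tableId)
                .addAllPartitionIds(partitionIds)
                .build();
        Cloud.CheckKVRequest request = Cloud.CheckKVRequest.newBuilder()
                .setCloudUniqueId(Config.cloud_unique_id)
                .setCheckKeys(checkKeys)
                .setOp(Cloud.CheckKVRequest.Operation.CREATE_PARTITION_AFTER_FE_COMMIT)
                .build();
        Cloud.CheckKVResponse response = MetaServiceProxy.getInstance().checkKv(request);
        if (response.getStatus().getCode() != Cloud.MetaServiceCode.OK) {
            // a recycle key survived: prepare and commit did not match, fail the DDL
            throw new DdlException(response.getStatus().getMsg());
        }
    }
}
```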
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org