This is an automated email from the ASF dual-hosted git repository.

dataroaring pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new 44b2e4f5826 [improvement](cloud) Accelerate creating table by batching RPC in cloud (#36786)
44b2e4f5826 is described below

commit 44b2e4f5826d265ce1ea3ea6d31ecb93283d78e4
Author: deardeng <565620...@qq.com>
AuthorDate: Sun Jul 14 23:07:37 2024 +0800

    [improvement](cloud) Accelerate creating table by batching RPC in cloud (#36786)
    
    When creating dynamic partitions, batch the meta-service RPCs: instead of
    calling the prepare RPC and the commit RPC once per partition, call prepare
    once and commit once for the whole batch.
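
    A minimal sketch of the batching idea (the names below are illustrative,
    not the actual FE/meta-service API):

    ```
    import java.util.List;

    // Illustrative sketch only; the real flow lives in DynamicPartitionScheduler
    // and CloudInternalCatalog (see the diff below).
    interface MetaServiceRpc {
        void prepare(List<Long> partitionIds); // one prepare RPC for the whole batch
        void commit(List<Long> partitionIds);  // one commit RPC for the whole batch
    }

    class BatchedPartitionCreator {
        // Before: prepare/commit were issued once per partition, i.e. 2 * N RPCs
        // for N dynamic partitions. After: 2 RPCs in total, regardless of N.
        void createPartitions(MetaServiceRpc ms, List<Long> partitionIds) {
            ms.prepare(partitionIds);  // batched prepare
            // ... per-partition FE catalog work happens here ...
            ms.commit(partitionIds);   // batched commit
        }
    }
    ```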
    
    ```
    on one server (fe/ms)

    before optimization:
    Query OK, 0 rows affected (16.96 sec)
    Query OK, 0 rows affected (16.65 sec)
    Query OK, 0 rows affected (16.26 sec)
    Query OK, 0 rows affected (16.39 sec)
    Query OK, 0 rows affected (17.15 sec)
    
    after optimization:
    Query OK, 0 rows affected (13.31 sec)
    Query OK, 0 rows affected (11.71 sec)
    Query OK, 0 rows affected (11.81 sec)
    Query OK, 0 rows affected (11.55 sec)
    Query OK, 0 rows affected (12.07 sec)
    
    CREATE TABLE income_statistics_service (
        statistics_date date ,
        lyy_distributor_id bigint ,
        lyy_equipment_group_id bigint ,
        lyy_equipment_id bigint ,
        service_type_id bigint ,
        equipment_value varchar(40),
        wechat_pay_amount decimal,
        wechat_refund_amount decimal,
        alipay_pay_amount decimal,
        alipay_refund_amount decimal,
        union_pay_amount decimal,
        union_refund_amount decimal,
        app_pay_amount decimal,
        app_refund_amount decimal,
        ad_amount decimal,
        ad_distributor_count bigint,
        offline_coins bigint,
        online_coins bigint,
        online_startup_count bigint,
        gift_consumption_amount decimal,
        gift_consumption_count bigint,
        wechat_pay_coins bigint,
        alipay_pay_coins bigint,
        online_red_coins bigint,
        game_online_amount decimal,
        game_gift_consumption_amount decimal,
        game_gift_consumption_count bigint,
        rd_gift_consumption_count bigint,
        rd_gift_consumption_amount decimal,
        online_amount decimal,
        offline_amount decimal,
        offline_count bigint,
        wechat_pay_count bigint,
        alipay_pay_count bigint,
        union_pay_count bigint,
        wechat_refund_count bigint,
        alipay_refund_count bigint,
        union_refund_count bigint,
        app_pay_count bigint,
        app_refund_count bigint,
        created datetime,
        updated datetime,
        service_type_value varchar(32),
        service_type_name varchar(32),
        jd_pay_amount decimal ,
        jd_refund_amount decimal ,
        jd_pay_count bigint ,
        jd_refund_count bigint ,
        lyy_factory_id bigint,
        purse_amount decimal ,
        purse_refund_amount decimal ,
        purse_count bigint ,
        purse_refund_count bigint ,
        boost_pay_amount decimal ,
        boost_refund_amount decimal ,
        boost_pay_count bigint ,
        boost_refund_count bigint ,
        grabpay_pay_amount decimal ,
        grabpay_refund_amount decimal ,
        grabpay_pay_count bigint ,
        grabpay_refund_count bigint ,
        maybank_pay_amount decimal ,
        maybank_refund_amount decimal ,
        maybank_pay_count bigint ,
        maybank_refund_count bigint ,
        pay_start_amount decimal ,
        exchange_amount decimal ,
        auto_refund_count bigint ,
        auto_refund_amount decimal ,
        manual_refund_count bigint ,
        manual_refund_amount decimal ,
        custom_service_fee decimal,
        value_added_service_fee decimal,
        INDEX idx_statistics_date (`statistics_date`) USING BITMAP COMMENT '',
        INDEX idx_lyy_distributor_id (`lyy_distributor_id`) USING BITMAP COMMENT '',
        INDEX idx_lyy_equipment_group_id (`lyy_equipment_group_id`) USING BITMAP COMMENT '',
        INDEX idx_lyy_equipment_id (`lyy_equipment_id`) USING BITMAP COMMENT '',
        INDEX idx_service_type_id (`service_type_id`) USING BITMAP COMMENT ''
      ) ENGINE = OLAP UNIQUE KEY(
        `statistics_date`,
        `lyy_distributor_id`,
        `lyy_equipment_group_id`,
        `lyy_equipment_id`,
        `service_type_id`
      ) COMMENT 'OLAP' PARTITION BY RANGE(`statistics_date`)()
    DISTRIBUTED BY HASH(`lyy_distributor_id`) BUCKETS 6 PROPERTIES (
      "file_cache_ttl_seconds" = "0",
      "bloom_filter_columns" = "statistics_date, lyy_distributor_id, 
lyy_equipment_group_id, lyy_equipment_id, service_type_id",
      "dynamic_partition.enable" = "true",
      "dynamic_partition.time_unit" = "DAY",
      "dynamic_partition.time_zone" = "Asia/Shanghai",
      "dynamic_partition.start" = "-730",
      "dynamic_partition.end" = "3",
      "dynamic_partition.prefix" = "p",
      "dynamic_partition.buckets" = "2",
      "dynamic_partition.create_history_partition" = "true",
      "dynamic_partition.history_partition_num" = "-1",
      "dynamic_partition.hot_partition_num" = "0",
      "dynamic_partition.reserved_history_periods" = "NULL",
      "enable_unique_key_merge_on_write" = "true",
      "light_schema_change" = "true"
    );
    
    show partitions from income_statistics_service;
    ......
    734 rows in set (1.06 sec)
    ```
    
    ---------
    
    Co-authored-by: Gavin Chou <gavineaglec...@gmail.com>
---
 cloud/src/common/bvars.cpp                         |   2 +
 cloud/src/common/bvars.h                           |   1 +
 cloud/src/meta-service/meta_service.h              |   8 ++
 cloud/src/meta-service/meta_service_partition.cpp  | 114 +++++++++++++++++-
 cloud/test/meta_service_test.cpp                   |  64 ++++++++++
 .../main/java/org/apache/doris/common/Config.java  |  12 +-
 .../main/java/org/apache/doris/alter/Alter.java    |   2 +-
 .../main/java/org/apache/doris/catalog/Env.java    |  19 ++-
 .../doris/clone/DynamicPartitionScheduler.java     | 130 ++++++++++++++++++++-
 .../cloud/datasource/CloudInternalCatalog.java     |  88 +++++++++++++-
 .../apache/doris/cloud/rpc/MetaServiceClient.java  |   4 +
 .../apache/doris/cloud/rpc/MetaServiceProxy.java   |   9 ++
 .../apache/doris/datasource/InternalCatalog.java   |  92 +++++++++------
 .../org/apache/doris/mtmv/MTMVPartitionUtil.java   |   3 +-
 .../apache/doris/service/FrontendServiceImpl.java  |   2 +-
 .../doris/alter/InternalSchemaAlterTest.java       |   4 +-
 .../apache/doris/catalog/CreateTableLikeTest.java  |   2 +
 .../org/apache/doris/catalog/CreateTableTest.java  |   2 +
 .../apache/doris/catalog/ModifyBackendTest.java    |   2 +
 .../java/org/apache/doris/catalog/RecoverTest.java |   2 +
 .../org/apache/doris/planner/QueryPlanTest.java    |   1 +
 gensrc/proto/cloud.proto                           |  25 ++++
 22 files changed, 537 insertions(+), 51 deletions(-)
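
The patch also adds a post-commit safety check (guarded by the new FE config
check_create_table_recycle_key_remained): after committing, the FE calls the new
check_kv RPC so the meta-service can verify that no recycle keys remain for the
created partitions/indexes, which would indicate a prepare without a matching
commit. A condensed sketch of the FE side, mirroring
CloudInternalCatalog.checkPartition in the diff below (retry loop omitted):

```
Cloud.CheckKeyInfos.Builder keys = Cloud.CheckKeyInfos.newBuilder();
keys.addAllPartitionIds(partitionIds);
Cloud.CheckKVRequest.Builder req = Cloud.CheckKVRequest.newBuilder();
req.setCloudUniqueId(Config.cloud_unique_id);
req.setCheckKeys(keys.build());
req.setOp(Cloud.CheckKVRequest.Operation.CREATE_PARTITION_AFTER_FE_COMMIT);
Cloud.CheckKVResponse resp = MetaServiceProxy.getInstance().checkKv(req.build());
if (resp.getStatus().getCode() != Cloud.MetaServiceCode.OK) {
    throw new DdlException(resp.getStatus().getMsg()); // recycle key remained
}
```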

diff --git a/cloud/src/common/bvars.cpp b/cloud/src/common/bvars.cpp
index 43acb47e365..dc401398f68 100644
--- a/cloud/src/common/bvars.cpp
+++ b/cloud/src/common/bvars.cpp
@@ -79,6 +79,8 @@ BvarLatencyRecorderWithTag g_bvar_ms_finish_tablet_job("ms", "finish_tablet_job"
 BvarLatencyRecorderWithTag g_bvar_ms_get_cluster_status("ms", "get_cluster_status");
 BvarLatencyRecorderWithTag g_bvar_ms_set_cluster_status("ms", "set_cluster_status");
 
+BvarLatencyRecorderWithTag g_bvar_ms_check_kv("ms", "check_kv");
+
 // txn_kv's bvars
 bvar::LatencyRecorder g_bvar_txn_kv_get("txn_kv", "get");
 bvar::LatencyRecorder g_bvar_txn_kv_range_get("txn_kv", "range_get");
diff --git a/cloud/src/common/bvars.h b/cloud/src/common/bvars.h
index e5b50262104..f2957e35940 100644
--- a/cloud/src/common/bvars.h
+++ b/cloud/src/common/bvars.h
@@ -174,6 +174,7 @@ extern BvarLatencyRecorderWithTag g_bvar_ms_set_cluster_status;
 extern BvarLatencyRecorderWithTag g_bvar_ms_get_instance;
 extern BvarLatencyRecorderWithTag g_bvar_ms_get_rl_task_commit_attach;
 extern BvarLatencyRecorderWithTag g_bvar_ms_get_txn_id;
+extern BvarLatencyRecorderWithTag g_bvar_ms_check_kv;
 
 // txn_kv's bvars
 extern bvar::LatencyRecorder g_bvar_txn_kv_get;
diff --git a/cloud/src/meta-service/meta_service.h b/cloud/src/meta-service/meta_service.h
index 0346afc36b3..e2360e9e6ba 100644
--- a/cloud/src/meta-service/meta_service.h
+++ b/cloud/src/meta-service/meta_service.h
@@ -141,6 +141,9 @@ public:
     void drop_index(::google::protobuf::RpcController* controller, const IndexRequest* request,
                     IndexResponse* response, ::google::protobuf::Closure* done) override;
 
+    void check_kv(::google::protobuf::RpcController* controller, const CheckKVRequest* request,
+                  CheckKVResponse* response, ::google::protobuf::Closure* done) override;
+
     void prepare_partition(::google::protobuf::RpcController* controller,
                            const PartitionRequest* request, PartitionResponse* response,
                            ::google::protobuf::Closure* done) override;
@@ -422,6 +425,11 @@ public:
         call_impl(&cloud::MetaService::drop_index, controller, request, response, done);
     }
 
+    void check_kv(::google::protobuf::RpcController* controller, const CheckKVRequest* request,
+                  CheckKVResponse* response, ::google::protobuf::Closure* done) override {
+        call_impl(&cloud::MetaService::check_kv, controller, request, response, done);
+    }
+
     void prepare_partition(::google::protobuf::RpcController* controller,
                            const PartitionRequest* request, PartitionResponse* response,
                            ::google::protobuf::Closure* done) override {
diff --git a/cloud/src/meta-service/meta_service_partition.cpp b/cloud/src/meta-service/meta_service_partition.cpp
index aae11be5edd..d165b8b5e06 100644
--- a/cloud/src/meta-service/meta_service_partition.cpp
+++ b/cloud/src/meta-service/meta_service_partition.cpp
@@ -25,7 +25,9 @@
 #include "meta_service.h"
 
 namespace doris::cloud {
-
+using check_create_table_type = std::function<const std::tuple<
+        const ::google::protobuf::RepeatedField<int64_t>, std::string,
+        std::function<std::string(std::string, int64_t)>>(const CheckKVRequest* request)>;
 // ATTN: xxx_id MUST NOT be reused
 //
 //              UNKNOWN
@@ -67,6 +69,11 @@ static TxnErrorCode index_exists(Transaction* txn, const std::string& instance_i
     return it->has_next() ? TxnErrorCode::TXN_OK : TxnErrorCode::TXN_KEY_NOT_FOUND;
 }
 
+static TxnErrorCode check_recycle_key_exist(Transaction* txn, const std::string& key) {
+    std::string val;
+    return txn->get(key, &val);
+}
+
 void MetaServiceImpl::prepare_index(::google::protobuf::RpcController* controller,
                                     const IndexRequest* request, IndexResponse* response,
                                     ::google::protobuf::Closure* done) {
@@ -614,4 +621,109 @@ void MetaServiceImpl::drop_partition(::google::protobuf::RpcController* controll
     }
 }
 
+void check_create_table(std::string instance_id, std::shared_ptr<TxnKv> txn_kv,
+                        const CheckKVRequest* request, CheckKVResponse* response,
+                        MetaServiceCode* code, std::string* msg,
+                        check_create_table_type get_check_info) {
+    StopWatch watch;
+    std::unique_ptr<Transaction> txn;
+    TxnErrorCode err = txn_kv->create_txn(&txn);
+    if (err != TxnErrorCode::TXN_OK) {
+        *code = cast_as<ErrCategory::READ>(err);
+        *msg = "failed to create txn";
+        return;
+    }
+    auto& [keys, hint, key_func] = get_check_info(request);
+    if (keys.empty()) {
+        *code = MetaServiceCode::INVALID_ARGUMENT;
+        *msg = "empty keys";
+        return;
+    }
+
+    for (int i = 0; i < keys.size();) {
+        auto key = key_func(instance_id, keys.Get(i));
+        err = check_recycle_key_exist(txn.get(), key);
+        if (err == TxnErrorCode::TXN_KEY_NOT_FOUND) {
+            i++;
+            continue;
+        } else if (err == TxnErrorCode::TXN_OK) {
+            // recycle key still present: the prepare rpc was not matched by a commit
+            *code = MetaServiceCode::UNDEFINED_ERR;
+            *msg = "prepare and commit rpc not match, recycle key remained";
+            return;
+        } else if (err == TxnErrorCode::TXN_TOO_OLD) {
+            // split the scan into several txns for robustness
+            txn.reset();
+            err = txn_kv->create_txn(&txn);
+            if (err != TxnErrorCode::TXN_OK) {
+                *code = cast_as<ErrCategory::READ>(err);
+                *msg = "failed to create txn in cycle";
+                return;
+            }
+            LOG_INFO("txn too old, create a new txn and retry, size={} idx={}", keys.size(), i);
+            bthread_usleep(50);
+            continue;
+        } else {
+            // err != TXN_OK, fdb read err
+            *code = cast_as<ErrCategory::READ>(err);
+            *msg = fmt::format("ms read key error: {}", err);
+            return;
+        }
+    }
+    LOG_INFO("check {} success key.size={}, cost(us)={}", hint, keys.size(), watch.elapsed_us());
+}
+
+void MetaServiceImpl::check_kv(::google::protobuf::RpcController* controller,
+                               const CheckKVRequest* request, CheckKVResponse* response,
+                               ::google::protobuf::Closure* done) {
+    RPC_PREPROCESS(check_kv);
+    instance_id = get_instance_id(resource_mgr_, request->cloud_unique_id());
+    if (instance_id.empty()) {
+        code = MetaServiceCode::INVALID_ARGUMENT;
+        msg = "empty instance_id";
+        return;
+    }
+    if (!request->has_op()) {
+        code = MetaServiceCode::INVALID_ARGUMENT;
+        msg = "op not given";
+        return;
+    }
+    if (!request->has_check_keys()) {
+        code = MetaServiceCode::INVALID_ARGUMENT;
+        msg = "empty check keys";
+        return;
+    }
+    RPC_RATE_LIMIT(check_kv);
+    switch (request->op()) {
+    case CheckKVRequest::CREATE_INDEX_AFTER_FE_COMMIT: {
+        check_create_table(instance_id, txn_kv_, request, response, &code, &msg,
+                           [](const CheckKVRequest* request) {
+                               return std::make_tuple(
+                                       request->check_keys().index_ids(), "index",
+                                       [](std::string instance_id, int64_t id) {
+                                           return recycle_index_key({std::move(instance_id), id});
+                                       });
+                           });
+        break;
+    }
+    case CheckKVRequest::CREATE_PARTITION_AFTER_FE_COMMIT: {
+        check_create_table(
+                instance_id, txn_kv_, request, response, &code, &msg,
+                [](const CheckKVRequest* request) {
+                    return std::make_tuple(
+                            request->check_keys().partition_ids(), "partition",
+                            [](std::string instance_id, int64_t id) {
+                                return recycle_partition_key({std::move(instance_id), id});
+                            });
+                });
+        break;
+    }
+    default:
+        code = MetaServiceCode::INVALID_ARGUMENT;
+        msg = "unsupported op";
+        return;
+    }
+}
+
 } // namespace doris::cloud
diff --git a/cloud/test/meta_service_test.cpp b/cloud/test/meta_service_test.cpp
index a2bf25187aa..850f8cfbabd 100644
--- a/cloud/test/meta_service_test.cpp
+++ b/cloud/test/meta_service_test.cpp
@@ -5295,6 +5295,70 @@ TEST(MetaServiceTest, PartitionRequest) {
     ASSERT_EQ(meta_service->txn_kv()->create_txn(&txn), TxnErrorCode::TXN_OK);
     ASSERT_EQ(txn->get(partition_key, &val), TxnErrorCode::TXN_KEY_NOT_FOUND);
     req.add_index_ids(index_id);
+    // ------------Test check partition-----------
+    // Normal
+    req.set_db_id(1);
+    req.set_table_id(table_id + 1);
+    req.add_index_ids(index_id + 1);
+    req.add_partition_ids(partition_id + 1);
+    meta_service->prepare_partition(&ctrl, &req, &res, nullptr);
+    ASSERT_EQ(res.status().code(), MetaServiceCode::OK);
+    meta_service->commit_partition(&ctrl, &req, &res, nullptr);
+    ASSERT_EQ(res.status().code(), MetaServiceCode::OK);
+    CheckKVRequest req_check;
+    CheckKVResponse res_check;
+    meta_service->check_kv(&ctrl, &req_check, &res_check, nullptr);
+    ASSERT_EQ(res_check.status().code(), MetaServiceCode::INVALID_ARGUMENT);
+    res_check.Clear();
+    req_check.set_op(CheckKVRequest::CREATE_PARTITION_AFTER_FE_COMMIT);
+    CheckKeyInfos check_keys_pb;
+    check_keys_pb.add_table_ids(table_id + 1);
+    check_keys_pb.add_index_ids(index_id + 1);
+    check_keys_pb.add_partition_ids(partition_id + 1);
+    req_check.mutable_check_keys()->CopyFrom(check_keys_pb);
+    meta_service->check_kv(&ctrl, &req_check, &res_check, nullptr);
+    ASSERT_EQ(res_check.status().code(), MetaServiceCode::OK);
+    res_check.Clear();
+    // Abnormal: prepared but not committed
+    req.Clear();
+    req.set_db_id(1);
+    req.set_table_id(table_id + 2);
+    req.add_index_ids(index_id + 2);
+    req.add_partition_ids(partition_id + 2);
+    meta_service->prepare_partition(&ctrl, &req, &res, nullptr);
+    ASSERT_EQ(res.status().code(), MetaServiceCode::OK);
+    req_check.Clear();
+    req_check.set_op(CheckKVRequest::CREATE_PARTITION_AFTER_FE_COMMIT);
+    check_keys_pb.Clear();
+    check_keys_pb.add_table_ids(table_id + 2);
+    check_keys_pb.add_index_ids(index_id + 2);
+    check_keys_pb.add_partition_ids(partition_id + 2);
+    req_check.mutable_check_keys()->CopyFrom(check_keys_pb);
+    meta_service->check_kv(&ctrl, &req_check, &res_check, nullptr);
+    ASSERT_EQ(res_check.status().code(), MetaServiceCode::UNDEFINED_ERR);
+
+    // ------------Test check index-----------
+    // Normal
+    IndexRequest req_index;
+    IndexResponse res_index;
+    req_index.set_db_id(1);
+    req_index.set_table_id(table_id + 3);
+    req_index.add_index_ids(index_id + 3);
+    meta_service->prepare_index(&ctrl, &req_index, &res_index, nullptr);
+    ASSERT_EQ(res_index.status().code(), MetaServiceCode::OK);
+    meta_service->commit_index(&ctrl, &req_index, &res_index, nullptr);
+    ASSERT_EQ(res_index.status().code(), MetaServiceCode::OK);
+    req_check.Clear();
+    res_check.Clear();
+    req_check.set_op(CheckKVRequest::CREATE_INDEX_AFTER_FE_COMMIT);
+    check_keys_pb.Clear();
+    check_keys_pb.add_table_ids(table_id + 3);
+    check_keys_pb.add_index_ids(index_id + 3);
+    req_check.mutable_check_keys()->CopyFrom(check_keys_pb);
+    meta_service->check_kv(&ctrl, &req_check, &res_check, nullptr);
+    ASSERT_EQ(res_check.status().code(), MetaServiceCode::OK);
+    res_check.Clear();
+
     // ------------Test drop partition------------
     reset_meta_service();
     req.Clear();
diff --git a/fe/fe-common/src/main/java/org/apache/doris/common/Config.java b/fe/fe-common/src/main/java/org/apache/doris/common/Config.java
index 0467a4931d2..5756d8fbfc2 100644
--- a/fe/fe-common/src/main/java/org/apache/doris/common/Config.java
+++ b/fe/fe-common/src/main/java/org/apache/doris/common/Config.java
@@ -2672,6 +2672,9 @@ public class Config extends ConfigBase {
             "Should the request content be logged before each request starts, 
specifically the query statements"})
     public static boolean enable_print_request_before_execution = false;
 
+    @ConfField(mutable = true)
+    public static boolean enable_cooldown_replica_affinity = true;
+
     //==========================================================================
     //                    begin of cloud config
     //==========================================================================
@@ -2900,9 +2903,12 @@ public class Config extends ConfigBase {
             "streamload route policy in cloud mode, availale options are 
public-private and empty string"})
     public static String streamload_redirect_policy = "";
 
-    @ConfField(mutable = true)
-    public static boolean enable_cooldown_replica_affinity = true;
-
+    @ConfField(description = {"存算分离模式下建表是否检查残留recycler key, 默认true",
+        "create table in cloud mode, check recycler key remained, default 
true"})
+    public static boolean check_create_table_recycle_key_remained = true;
+    // ATTN: DONOT add any config not related to cloud mode here
+    // ATTN: DONOT add any config not related to cloud mode here
+    // ATTN: DONOT add any config not related to cloud mode here
     //==========================================================================
     //                      end of cloud config
     //==========================================================================
diff --git a/fe/fe-core/src/main/java/org/apache/doris/alter/Alter.java b/fe/fe-core/src/main/java/org/apache/doris/alter/Alter.java
index 6c2f38ef101..8e8be7c567e 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/alter/Alter.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/alter/Alter.java
@@ -483,7 +483,7 @@ public class Alter {
                     DynamicPartitionUtil.checkAlterAllowed(
                             (OlapTable) db.getTableOrMetaException(tableName, TableType.OLAP));
                 }
-                Env.getCurrentEnv().addPartition(db, tableName, (AddPartitionClause) alterClause);
+                Env.getCurrentEnv().addPartition(db, tableName, (AddPartitionClause) alterClause, false, 0, true);
             } else if (alterClause instanceof AddPartitionLikeClause) {
                 if (!((AddPartitionLikeClause) alterClause).getIsTempPartition()) {
                     DynamicPartitionUtil.checkAlterAllowed(
diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java
index 559094de442..f01d8e79af2 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java
@@ -3247,8 +3247,23 @@ public class Env {
         getInternalCatalog().createTableAsSelect(stmt);
     }
 
-    public void addPartition(Database db, String tableName, AddPartitionClause addPartitionClause) throws DdlException {
-        getInternalCatalog().addPartition(db, tableName, addPartitionClause);
+    /**
+     * Adds a partition to a table.
+     *
+     * @param db the target database
+     * @param tableName the target table name
+     * @param addPartitionClause clause in the CreateTableStmt
+     * @param isCreateTable whether this call is part of creating the table
+     * @param generatedPartitionId the preset partition id for the partition to add
+     * @param writeEditLog whether to write an edit log for this addition
+     * @return PartitionPersistInfo to be written to the edit log; may be null if no partition was added
+     * @throws DdlException
+     */
+    public PartitionPersistInfo addPartition(Database db, String tableName, AddPartitionClause addPartitionClause,
+                                             boolean isCreateTable, long generatedPartitionId,
+                                             boolean writeEditLog) throws DdlException {
+        return getInternalCatalog().addPartition(db, tableName, addPartitionClause,
+            isCreateTable, generatedPartitionId, writeEditLog);
     }
 
     public void addPartitionLike(Database db, String tableName, AddPartitionLikeClause addPartitionLikeClause)
diff --git a/fe/fe-core/src/main/java/org/apache/doris/clone/DynamicPartitionScheduler.java b/fe/fe-core/src/main/java/org/apache/doris/clone/DynamicPartitionScheduler.java
index 17a4d34aa38..0da07c7c833 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/clone/DynamicPartitionScheduler.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/clone/DynamicPartitionScheduler.java
@@ -33,6 +33,7 @@ import org.apache.doris.catalog.DynamicPartitionProperty;
 import org.apache.doris.catalog.Env;
 import org.apache.doris.catalog.HashDistributionInfo;
 import org.apache.doris.catalog.MTMV;
+import org.apache.doris.catalog.MetaIdGenerator;
 import org.apache.doris.catalog.OlapTable;
 import org.apache.doris.catalog.Partition;
 import org.apache.doris.catalog.PartitionItem;
@@ -45,11 +46,14 @@ import org.apache.doris.common.DdlException;
 import org.apache.doris.common.FeConstants;
 import org.apache.doris.common.Pair;
 import org.apache.doris.common.util.AutoBucketUtils;
+import org.apache.doris.common.util.DebugPointUtil;
 import org.apache.doris.common.util.DynamicPartitionUtil;
 import org.apache.doris.common.util.MasterDaemon;
 import org.apache.doris.common.util.PropertyAnalyzer;
 import org.apache.doris.common.util.RangeUtils;
 import org.apache.doris.common.util.TimeUtils;
+import org.apache.doris.datasource.InternalCatalog;
+import org.apache.doris.persist.PartitionPersistInfo;
 import org.apache.doris.rpc.RpcException;
 import org.apache.doris.thrift.TStorageMedium;
 
@@ -622,9 +626,30 @@ public class DynamicPartitionScheduler extends MasterDaemon {
             }
 
             if (!skipAddPartition) {
-                for (AddPartitionClause addPartitionClause : addPartitionClauses) {
+                // get partitionIds and indexIds
+                List<Long> indexIds = new ArrayList<>(olapTable.getCopiedIndexIdToMeta().keySet());
+                List<Long> generatedPartitionIds = new ArrayList<>();
+                cloudBatchBeforeCreatePartitions(executeFirstTime, addPartitionClauses, olapTable, indexIds,
+                        db, tableName, generatedPartitionIds);
+
+                List<PartitionPersistInfo> partsInfo = new ArrayList<>();
+                for (int i = 0; i < addPartitionClauses.size(); i++) {
                     try {
-                        Env.getCurrentEnv().addPartition(db, tableName, addPartitionClause);
+                        boolean needWriteEditLog = true;
+                        // ATTN: when executeFirstTime (i.e. create table) in cloud mode,
+                        // writing the edit log is postponed until after the batched commit
+                        if (Config.isCloudMode()) {
+                            needWriteEditLog = !executeFirstTime;
+                        }
+                        PartitionPersistInfo info =
+                                Env.getCurrentEnv().addPartition(db, tableName, addPartitionClauses.get(i),
+                                    executeFirstTime,
+                                    executeFirstTime && Config.isCloudMode() ? generatedPartitionIds.get(i) : 0,
+                                    needWriteEditLog);
+                        if (info == null) {
+                            throw new Exception("null persisted partition returned");
+                        }
+                        partsInfo.add(info);
                         clearCreatePartitionFailedMsg(olapTable.getId());
                     } catch (Exception e) {
                         recordCreatePartitionFailedMsg(db.getFullName(), tableName, e.getMessage(), olapTable.getId());
@@ -634,7 +659,108 @@ public class DynamicPartitionScheduler extends MasterDaemon {
                         }
                     }
                 }
+                cloudBatchAfterCreatePartitions(executeFirstTime, partsInfo,
+                        addPartitionClauses, db, olapTable, indexIds, tableName);
             }
         }
     }
+
+    private void cloudBatchAfterCreatePartitions(boolean executeFirstTime, List<PartitionPersistInfo> partsInfo,
+                                                 ArrayList<AddPartitionClause> addPartitionClauses, Database db,
+                                                 OlapTable olapTable, List<Long> indexIds,
+                                                 String tableName) throws DdlException {
+        if (Config.isNotCloudMode()) {
+            return;
+        }
+        List<Long> succeedPartitionIds = partsInfo.stream().map(partitionPersistInfo
+                -> partitionPersistInfo.getPartition().getId()).collect(Collectors.toList());
+        if (!executeFirstTime || addPartitionClauses.isEmpty()) {
+            LOG.info("cloud commit rpc in batch, {}-{}", !executeFirstTime, addPartitionClauses.size());
+            return;
+        }
+        try {
+            // ATTN: failedPids = generatedPartitionIds - succeedPartitionIds are the
+            // partitions that failed in addPartition; they will be reclaimed by the recycler
+            if (DebugPointUtil.isEnable("FE.DynamicPartitionScheduler.before.commitCloudPartition")) {
+                LOG.info("debug point FE.DynamicPartitionScheduler.before.commitCloudPartition, throw e");
+                // not committed, no edit log written
+                throw new Exception("debug point FE.DynamicPartitionScheduler.before.commitCloudPartition");
+            }
+            Env.getCurrentInternalCatalog().afterCreatePartitions(db.getId(), olapTable.getId(),
+                    succeedPartitionIds, indexIds, true);
+            LOG.info("begin write edit log to add partitions in batch, "
+                    + "numPartitions: {}, db: {}, table: {}, tableId: {}",
+                    partsInfo.size(), db.getFullName(), tableName, olapTable.getId());
+            // ATTN: the edit log must be written after the cloud partition commit,
+            // to prevent a commit RPC failure from causing data loss
+            if (DebugPointUtil.isEnable("FE.DynamicPartitionScheduler.before.logEditPartitions")) {
+                LOG.info("debug point FE.DynamicPartitionScheduler.before.logEditPartitions, throw e");
+                // committed, but no edit log written
+                throw new Exception("debug point FE.DynamicPartitionScheduler.before.logEditPartitions");
+            }
+            for (int i = 0; i < partsInfo.size(); i++) {
+                Env.getCurrentEnv().getEditLog().logAddPartition(partsInfo.get(i));
+                if (DebugPointUtil.isEnable("FE.DynamicPartitionScheduler.in.logEditPartitions")) {
+                    if (i == partsInfo.size() / 2) {
+                        LOG.info("debug point FE.DynamicPartitionScheduler.in.logEditPartitions, throw e");
+                        // committed, but only some edit logs written
+                        throw new Exception("debug point FE.DynamicPartitionScheduler"
+                            + ".in.logEditPartitions");
+                    }
+                }
+            }
+            LOG.info("finish write edit log to add partitions in batch, "
+                    + "numPartitions: {}, db: {}, table: {}, tableId: {}",
+                    partsInfo.size(), db.getFullName(), tableName, olapTable.getId());
+        } catch (Exception e) {
+            LOG.warn("cloud in commit step, dbName {}, tableName {}, tableId {} exception {}",
+                    db.getFullName(), tableName, olapTable.getId(), e.getMessage());
+            recordCreatePartitionFailedMsg(db.getFullName(), tableName, e.getMessage(), olapTable.getId());
+            throw new DdlException("cloud in commit step err");
+        }
+    }
+
+    private void cloudBatchBeforeCreatePartitions(boolean executeFirstTime,
+                                                  ArrayList<AddPartitionClause> addPartitionClauses,
+                                                  OlapTable olapTable, List<Long> indexIds, Database db,
+                                                  String tableName, List<Long> generatedPartitionIds)
+            throws DdlException {
+        if (Config.isNotCloudMode()) {
+            return;
+        }
+        if (!executeFirstTime || addPartitionClauses.isEmpty()) {
+            LOG.info("cloud prepare rpc in batch, {}-{}", !executeFirstTime, addPartitionClauses.size());
+            return;
+        }
+        AddPartitionClause addPartitionClause = addPartitionClauses.get(0);
+        DistributionDesc distributionDesc = addPartitionClause.getDistributionDesc();
+        try {
+            // check for a null distributionDesc before dereferencing it
+            DistributionInfo distributionInfo;
+            if (distributionDesc == null) {
+                distributionInfo = olapTable.getDefaultDistributionInfo()
+                        .toDistributionDesc().toDistributionInfo(olapTable.getBaseSchema());
+            } else {
+                distributionInfo = distributionDesc.toDistributionInfo(olapTable.getBaseSchema());
+            }
+            long allPartitionBufferSize = 0;
+            for (int i = 0; i < addPartitionClauses.size(); i++) {
+                long bufferSize = InternalCatalog.checkAndGetBufferSize(indexIds.size(),
+                        distributionInfo.getBucketNum(),
+                        addPartitionClause.getSingeRangePartitionDesc()
+                        .getReplicaAlloc().getTotalReplicaNum(),
+                        db, tableName);
+                allPartitionBufferSize += bufferSize;
+            }
+            MetaIdGenerator.IdGeneratorBuffer idGeneratorBuffer = Env.getCurrentEnv()
+                    .getIdGeneratorBuffer(allPartitionBufferSize);
+            addPartitionClauses.forEach(p -> generatedPartitionIds.add(idGeneratorBuffer.getNextId()));
+            // executeFirstTime is true here
+            Env.getCurrentInternalCatalog().beforeCreatePartitions(db.getId(), olapTable.getId(),
+                    generatedPartitionIds, indexIds, true);
+        } catch (Exception e) {
+            LOG.warn("cloud in prepare step, dbName {}, tableName {}, tableId {} indexId {} exception {}",
+                    db.getFullName(), tableName, olapTable.getId(), indexIds, e.getMessage());
+            recordCreatePartitionFailedMsg(db.getFullName(), tableName, e.getMessage(), olapTable.getId());
+            throw new DdlException("cloud in prepare step err");
         }
     }
 
diff --git a/fe/fe-core/src/main/java/org/apache/doris/cloud/datasource/CloudInternalCatalog.java b/fe/fe-core/src/main/java/org/apache/doris/cloud/datasource/CloudInternalCatalog.java
index b6adfb1789b..e72690af1c7 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/cloud/datasource/CloudInternalCatalog.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/cloud/datasource/CloudInternalCatalog.java
@@ -403,7 +403,8 @@ public class CloudInternalCatalog extends InternalCatalog {
     }
 
     @Override
-    protected void beforeCreatePartitions(long dbId, long tableId, List<Long> partitionIds, List<Long> indexIds)
+    public void beforeCreatePartitions(long dbId, long tableId, List<Long> partitionIds, List<Long> indexIds,
+                                          boolean isCreateTable)
             throws DdlException {
         if (partitionIds == null) {
             prepareMaterializedIndex(tableId, indexIds, 0);
@@ -413,7 +414,7 @@ public class CloudInternalCatalog extends InternalCatalog {
     }
 
     @Override
-    protected void afterCreatePartitions(long dbId, long tableId, List<Long> partitionIds, List<Long> indexIds,
+    public void afterCreatePartitions(long dbId, long tableId, List<Long> partitionIds, List<Long> indexIds,
                                          boolean isCreateTable)
             throws DdlException {
         if (partitionIds == null) {
@@ -421,6 +422,19 @@ public class CloudInternalCatalog extends InternalCatalog {
         } else {
             commitPartition(dbId, tableId, partitionIds, indexIds);
         }
+        if (!Config.check_create_table_recycle_key_remained) {
+            return;
+        }
+        checkCreatePartitions(dbId, tableId, partitionIds, indexIds);
+    }
+
+    private void checkCreatePartitions(long dbId, long tableId, List<Long> partitionIds, List<Long> indexIds)
+            throws DdlException {
+        if (partitionIds == null) {
+            checkMaterializedIndex(dbId, tableId, indexIds);
+        } else {
+            checkPartition(dbId, tableId, partitionIds);
+        }
     }
 
     private void preparePartition(long dbId, long tableId, List<Long> partitionIds, List<Long> indexIds)
@@ -557,6 +571,76 @@ public class CloudInternalCatalog extends InternalCatalog {
         }
     }
 
+    private void checkPartition(long dbId, long tableId, List<Long> partitionIds)
+            throws DdlException {
+        Cloud.CheckKeyInfos.Builder checkKeyInfosBuilder = Cloud.CheckKeyInfos.newBuilder();
+        checkKeyInfosBuilder.addAllPartitionIds(partitionIds);
+        // for ms log
+        checkKeyInfosBuilder.addDbIds(dbId);
+        checkKeyInfosBuilder.addTableIds(tableId);
+
+        Cloud.CheckKVRequest.Builder checkKvRequestBuilder = Cloud.CheckKVRequest.newBuilder();
+        checkKvRequestBuilder.setCloudUniqueId(Config.cloud_unique_id);
+        checkKvRequestBuilder.setCheckKeys(checkKeyInfosBuilder.build());
+        checkKvRequestBuilder.setOp(Cloud.CheckKVRequest.Operation.CREATE_PARTITION_AFTER_FE_COMMIT);
+        final Cloud.CheckKVRequest checkKVRequest = checkKvRequestBuilder.build();
+
+        Cloud.CheckKVResponse response = null;
+        int tryTimes = 0;
+        while (tryTimes++ < Config.metaServiceRpcRetryTimes()) {
+            try {
+                response = MetaServiceProxy.getInstance().checkKv(checkKVRequest);
+                break;
+            } catch (RpcException e) {
+                LOG.warn("tryTimes:{}, checkPartition RpcException", tryTimes, e);
+                if (tryTimes + 1 >= Config.metaServiceRpcRetryTimes()) {
+                    throw new DdlException(e.getMessage());
+                }
+            }
+            sleepSeveralMs();
+        }
+
+        if (response.getStatus().getCode() != Cloud.MetaServiceCode.OK) {
+            LOG.warn("checkPartition response: {} ", response);
+            throw new DdlException(response.getStatus().getMsg());
+        }
+    }
+
+    public void checkMaterializedIndex(long dbId, long tableId, List<Long> indexIds)
+            throws DdlException {
+        Cloud.CheckKeyInfos.Builder checkKeyInfosBuilder = Cloud.CheckKeyInfos.newBuilder();
+        checkKeyInfosBuilder.addAllIndexIds(indexIds);
+        // for ms log
+        checkKeyInfosBuilder.addDbIds(dbId);
+        checkKeyInfosBuilder.addTableIds(tableId);
+
+        Cloud.CheckKVRequest.Builder checkKvRequestBuilder = Cloud.CheckKVRequest.newBuilder();
+        checkKvRequestBuilder.setCloudUniqueId(Config.cloud_unique_id);
+        checkKvRequestBuilder.setCheckKeys(checkKeyInfosBuilder.build());
+        checkKvRequestBuilder.setOp(Cloud.CheckKVRequest.Operation.CREATE_INDEX_AFTER_FE_COMMIT);
+        final Cloud.CheckKVRequest checkKVRequest = checkKvRequestBuilder.build();
+
+        Cloud.CheckKVResponse response = null;
+        int tryTimes = 0;
+        while (tryTimes++ < Config.metaServiceRpcRetryTimes()) {
+            try {
+                response = MetaServiceProxy.getInstance().checkKv(checkKVRequest);
+                break;
+            } catch (RpcException e) {
+                LOG.warn("tryTimes:{}, checkIndex RpcException", tryTimes, e);
+                if (tryTimes + 1 >= Config.metaServiceRpcRetryTimes()) {
+                    throw new DdlException(e.getMessage());
+                }
+            }
+            sleepSeveralMs();
+        }
+
+        if (response.getStatus().getCode() != Cloud.MetaServiceCode.OK) {
+            LOG.warn("checkIndex response: {} ", response);
+            throw new DdlException(response.getStatus().getMsg());
+        }
+    }
+
     public Cloud.CreateTabletsResponse
             sendCreateTabletsRpc(Cloud.CreateTabletsRequest.Builder requestBuilder) throws DdlException {
         requestBuilder.setCloudUniqueId(Config.cloud_unique_id);
diff --git a/fe/fe-core/src/main/java/org/apache/doris/cloud/rpc/MetaServiceClient.java b/fe/fe-core/src/main/java/org/apache/doris/cloud/rpc/MetaServiceClient.java
index f7a178deb01..d5cdc79eb7f 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/cloud/rpc/MetaServiceClient.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/cloud/rpc/MetaServiceClient.java
@@ -255,6 +255,10 @@ public class MetaServiceClient {
         return blockingStub.commitPartition(request);
     }
 
+    public Cloud.CheckKVResponse checkKv(Cloud.CheckKVRequest request) {
+        return blockingStub.checkKv(request);
+    }
+
     public Cloud.PartitionResponse dropPartition(Cloud.PartitionRequest request) {
         return blockingStub.dropPartition(request);
     }
diff --git a/fe/fe-core/src/main/java/org/apache/doris/cloud/rpc/MetaServiceProxy.java b/fe/fe-core/src/main/java/org/apache/doris/cloud/rpc/MetaServiceProxy.java
index 00f271099e4..d7ec3289067 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/cloud/rpc/MetaServiceProxy.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/cloud/rpc/MetaServiceProxy.java
@@ -318,6 +318,15 @@ public class MetaServiceProxy {
         }
     }
 
+    public Cloud.CheckKVResponse checkKv(Cloud.CheckKVRequest request) throws RpcException {
+        try {
+            final MetaServiceClient client = getProxy();
+            return client.checkKv(request);
+        } catch (Exception e) {
+            throw new RpcException("", e.getMessage(), e);
+        }
+    }
+
     public Cloud.IndexResponse dropIndex(Cloud.IndexRequest request) throws RpcException {
         try {
             final MetaServiceClient client = getProxy();
diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/InternalCatalog.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/InternalCatalog.java
index e13316c3e76..910df9e1735 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/datasource/InternalCatalog.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/InternalCatalog.java
@@ -1479,7 +1479,7 @@ public class InternalCatalog implements CatalogIf<Database> {
             } finally {
                 table.readUnlock();
             }
-            addPartition(db, tableName, clause);
+            addPartition(db, tableName, clause, false, 0, true);
 
         } catch (UserException e) {
             throw new DdlException("Failed to ADD PARTITION " + 
addPartitionLikeClause.getPartitionName()
@@ -1487,7 +1487,25 @@ public class InternalCatalog implements 
CatalogIf<Database> {
         }
     }
 
-    public void addPartition(Database db, String tableName, AddPartitionClause addPartitionClause) throws DdlException {
+    public static long checkAndGetBufferSize(long indexNum, long bucketNum,
+                                             long replicaNum, Database db, String tableName) throws DdlException {
+        long totalReplicaNum = indexNum * bucketNum * replicaNum;
+        if (Config.isNotCloudMode() && totalReplicaNum >= db.getReplicaQuotaLeftWithLock()) {
+            throw new DdlException("Database " + db.getFullName() + " table " + tableName + " add partition increasing "
+                + totalReplicaNum + " of replica exceeds quota[" + db.getReplicaQuota() + "]");
+        }
+        return 1 + totalReplicaNum + indexNum * bucketNum;
+    }
+
+    public PartitionPersistInfo addPartition(Database db, String tableName, AddPartitionClause addPartitionClause,
+                                             boolean isCreateTable, long generatedPartitionId,
+                                             boolean writeEditLog) throws DdlException {
+        // in cloud mode, when isCreateTable == true (creating dynamic partitions for a new table),
+        // the partition id must already have been generated; in every other case it is
+        // generated below and generatedPartitionId must be 0
+        if (!FeConstants.runningUnitTest && Config.isCloudMode()
+                && ((isCreateTable && generatedPartitionId == 0) || (!isCreateTable && generatedPartitionId != 0))) {
+            throw new DdlException("impossible: generatedPartitionId does not match isCreateTable");
+        }
         SinglePartitionDesc singlePartitionDesc = addPartitionClause.getSingeRangePartitionDesc();
         DistributionDesc distributionDesc = addPartitionClause.getDistributionDesc();
         boolean isTempPartition = addPartitionClause.isTempPartition();
@@ -1510,7 +1528,7 @@ public class InternalCatalog implements CatalogIf<Database> {
                 if (singlePartitionDesc.isSetIfNotExists()) {
                     LOG.info("table[{}] add partition[{}] which already exists", olapTable.getName(), partitionName);
                     if (!DebugPointUtil.isEnable("InternalCatalog.addPartition.noCheckExists")) {
-                        return;
+                        return null;
                     }
                 } else {
                     ErrorReport.reportDdlException(ErrorCode.ERR_SAME_NAME_PARTITION, partitionName);
@@ -1660,17 +1678,10 @@ public class InternalCatalog implements CatalogIf<Database> {
         DataProperty dataProperty = singlePartitionDesc.getPartitionDataProperty();
         Preconditions.checkNotNull(dataProperty);
         // check replica quota if this operation done
-        long indexNum = indexIdToMeta.size();
-        long bucketNum = distributionInfo.getBucketNum();
-        long replicaNum = singlePartitionDesc.getReplicaAlloc().getTotalReplicaNum();
-        long totalReplicaNum = indexNum * bucketNum * replicaNum;
-        if (Config.isNotCloudMode() && totalReplicaNum >= db.getReplicaQuotaLeftWithLock()) {
-            throw new DdlException("Database " + db.getFullName() + " table " + tableName + " add partition increasing "
-                    + totalReplicaNum + " of replica exceeds quota[" + db.getReplicaQuota() + "]");
-        }
-        Set<Long> tabletIdSet = new HashSet<>();
-        long bufferSize = 1 + totalReplicaNum + indexNum * bucketNum;
+        long bufferSize = checkAndGetBufferSize(indexIdToMeta.size(), distributionInfo.getBucketNum(),
+                singlePartitionDesc.getReplicaAlloc().getTotalReplicaNum(), db, tableName);
         IdGeneratorBuffer idGeneratorBuffer = Env.getCurrentEnv().getIdGeneratorBuffer(bufferSize);
+        Set<Long> tabletIdSet = new HashSet<>();
         String storagePolicy = olapTable.getStoragePolicy();
         if (!Strings.isNullOrEmpty(dataProperty.getStoragePolicy())) {
             storagePolicy = dataProperty.getStoragePolicy();
@@ -1681,10 +1692,13 @@ public class InternalCatalog implements CatalogIf<Database> {
             }
         };
         try {
-            long partitionId = idGeneratorBuffer.getNextId();
+            long partitionId = Config.isCloudMode() && !FeConstants.runningUnitTest && isCreateTable
+                    ? generatedPartitionId : idGeneratorBuffer.getNextId();
             List<Long> partitionIds = Lists.newArrayList(partitionId);
-            List<Long> indexIds = indexIdToMeta.keySet().stream().collect(Collectors.toList());
-            beforeCreatePartitions(db.getId(), olapTable.getId(), partitionIds, indexIds);
+            List<Long> indexIds = new ArrayList<>(indexIdToMeta.keySet());
+            if (!isCreateTable) {
+                beforeCreatePartitions(db.getId(), olapTable.getId(), partitionIds, indexIds, isCreateTable);
+            }
             Partition partition = createPartitionWithIndices(db.getId(), olapTable,
                     partitionId, partitionName, indexIdToMeta,
                     distributionInfo, dataProperty, singlePartitionDesc.getReplicaAlloc(),
@@ -1693,7 +1707,6 @@ public class InternalCatalog implements CatalogIf<Database> {
                     singlePartitionDesc.getTabletType(),
                     storagePolicy, idGeneratorBuffer,
                     binlogConfig, dataProperty.isStorageMediumSpecified(), null);
-            // TODO cluster key ids
 
             // check again
             olapTable = db.getOlapTableOrDdlException(tableName);
@@ -1705,7 +1718,7 @@ public class InternalCatalog implements CatalogIf<Database> {
                     LOG.info("table[{}] add partition[{}] which already exists", olapTable.getName(), partitionName);
                     if (singlePartitionDesc.isSetIfNotExists()) {
                         failedCleanCallback.run();
-                        return;
+                        return null;
                     } else {
                         ErrorReport.reportDdlException(ErrorCode.ERR_SAME_NAME_PARTITION, partitionName);
                     }
@@ -1773,25 +1786,31 @@ public class InternalCatalog implements CatalogIf<Database> {
                 PartitionPersistInfo info = null;
                 if (partitionInfo.getType() == PartitionType.RANGE) {
                     info = new PartitionPersistInfo(db.getId(), olapTable.getId(), partition,
-                            partitionInfo.getItem(partitionId).getItems(), ListPartitionItem.DUMMY_ITEM, dataProperty,
-                            partitionInfo.getReplicaAllocation(partitionId), partitionInfo.getIsInMemory(partitionId),
-                            isTempPartition, partitionInfo.getIsMutable(partitionId));
+                        partitionInfo.getItem(partitionId).getItems(), ListPartitionItem.DUMMY_ITEM, dataProperty,
+                        partitionInfo.getReplicaAllocation(partitionId), partitionInfo.getIsInMemory(partitionId),
+                        isTempPartition, partitionInfo.getIsMutable(partitionId));
                 } else if (partitionInfo.getType() == PartitionType.LIST) {
                     info = new PartitionPersistInfo(db.getId(), olapTable.getId(), partition,
-                            RangePartitionItem.DUMMY_RANGE, partitionInfo.getItem(partitionId), dataProperty,
-                            partitionInfo.getReplicaAllocation(partitionId), partitionInfo.getIsInMemory(partitionId),
-                            isTempPartition, partitionInfo.getIsMutable(partitionId));
+                        RangePartitionItem.DUMMY_RANGE, partitionInfo.getItem(partitionId), dataProperty,
+                        partitionInfo.getReplicaAllocation(partitionId), partitionInfo.getIsInMemory(partitionId),
+                        isTempPartition, partitionInfo.getIsMutable(partitionId));
                 } else {
                     info = new PartitionPersistInfo(db.getId(), olapTable.getId(), partition,
-                            RangePartitionItem.DUMMY_RANGE, ListPartitionItem.DUMMY_ITEM, dataProperty,
-                            partitionInfo.getReplicaAllocation(partitionId), partitionInfo.getIsInMemory(partitionId),
-                            isTempPartition, partitionInfo.getIsMutable(partitionId));
+                        RangePartitionItem.DUMMY_RANGE, ListPartitionItem.DUMMY_ITEM, dataProperty,
+                        partitionInfo.getReplicaAllocation(partitionId), partitionInfo.getIsInMemory(partitionId),
+                        isTempPartition, partitionInfo.getIsMutable(partitionId));
                 }
 
-                afterCreatePartitions(db.getId(), olapTable.getId(), partitionIds, indexIds, false);
-                Env.getCurrentEnv().getEditLog().logAddPartition(info);
-
-                LOG.info("succeed in creating partition[{}], temp: {}", partitionId, isTempPartition);
+                if (!isCreateTable) {
+                    afterCreatePartitions(db.getId(), olapTable.getId(), partitionIds, indexIds, isCreateTable);
+                }
+                if (writeEditLog) {
+                    Env.getCurrentEnv().getEditLog().logAddPartition(info);
+                    LOG.info("succeed in creating partition[{}], temp: {}", partitionId, isTempPartition);
+                } else {
+                    LOG.info("postpone creating partition[{}], temp: {}", partitionId, isTempPartition);
+                }
+                return info;
             } finally {
                 olapTable.writeUnlock();
             }
@@ -2144,11 +2163,12 @@ public class InternalCatalog implements CatalogIf<Database> {
         return partition;
     }
 
-    protected void beforeCreatePartitions(long dbId, long tableId, List<Long> partitionIds, List<Long> indexIds)
+    public void beforeCreatePartitions(long dbId, long tableId, List<Long> partitionIds, List<Long> indexIds,
+                                          boolean isCreateTable)
             throws DdlException {
     }
 
-    protected void afterCreatePartitions(long dbId, long tableId, List<Long> partitionIds, List<Long> indexIds,
+    public void afterCreatePartitions(long dbId, long tableId, List<Long> partitionIds, List<Long> indexIds,
                                          boolean isCreateTable)
             throws DdlException {
     }
@@ -2827,7 +2847,7 @@ public class InternalCatalog implements CatalogIf<Database> {
                             "Database " + db.getFullName() + " create unpartitioned table " + tableName + " increasing "
                                     + totalReplicaNum + " of replica exceeds quota[" + db.getReplicaQuota() + "]");
                 }
-                beforeCreatePartitions(db.getId(), olapTable.getId(), null, olapTable.getIndexIdList());
+                beforeCreatePartitions(db.getId(), olapTable.getId(), null, olapTable.getIndexIdList(), true);
                 Partition partition = createPartitionWithIndices(db.getId(), olapTable,
                         partitionId, partitionName,
                         olapTable.getIndexIdToMeta(), partitionDistributionInfo,
@@ -2891,7 +2911,7 @@ public class InternalCatalog implements CatalogIf<Database> {
                                     + totalReplicaNum + " of replica exceeds quota[" + db.getReplicaQuota() + "]");
                 }
 
-                beforeCreatePartitions(db.getId(), olapTable.getId(), null, olapTable.getIndexIdList());
+                beforeCreatePartitions(db.getId(), olapTable.getId(), null, olapTable.getIndexIdList(), true);
 
                 // this is a 2-level partitioned table
                 for (Map.Entry<String, Long> entry : partitionNameToId.entrySet()) {
@@ -3367,7 +3387,7 @@ public class InternalCatalog implements CatalogIf<Database> {
             }
 
             List<Long> indexIds = copiedTbl.getIndexIdToMeta().keySet().stream().collect(Collectors.toList());
-            beforeCreatePartitions(db.getId(), copiedTbl.getId(), newPartitionIds, indexIds);
+            beforeCreatePartitions(db.getId(), copiedTbl.getId(), newPartitionIds, indexIds, true);
 
             for (Map.Entry<String, Long> entry : origPartitions.entrySet()) {
                 // the new partition must use new id
diff --git a/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVPartitionUtil.java b/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVPartitionUtil.java
index 6595afb70f7..3d77f42a1cc 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVPartitionUtil.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVPartitionUtil.java
@@ -394,7 +394,8 @@ public class MTMVPartitionUtil {
 
         AddPartitionClause addPartitionClause = new AddPartitionClause(singlePartitionDesc,
                 mtmv.getDefaultDistributionInfo().toDistributionDesc(), partitionProperties, false);
-        Env.getCurrentEnv().addPartition((Database) mtmv.getDatabase(), mtmv.getName(), addPartitionClause);
+        Env.getCurrentEnv().addPartition((Database) mtmv.getDatabase(), mtmv.getName(), addPartitionClause,
+                false, 0, true);
     }
 
     /**
diff --git a/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java b/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java
index ab92cbd0d63..17bc984b2cc 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java
@@ -3357,7 +3357,7 @@ public class FrontendServiceImpl implements FrontendService.Iface {
         for (AddPartitionClause addPartitionClause : addPartitionClauseMap.values()) {
             try {
                 // here maybe check and limit created partitions num
-                Env.getCurrentEnv().addPartition(db, olapTable.getName(), addPartitionClause);
+                Env.getCurrentEnv().addPartition(db, olapTable.getName(), addPartitionClause, false, 0, true);
             } catch (DdlException e) {
                 LOG.warn(e);
                 errorStatus.setErrorMsgs(
diff --git a/fe/fe-core/src/test/java/org/apache/doris/alter/InternalSchemaAlterTest.java b/fe/fe-core/src/test/java/org/apache/doris/alter/InternalSchemaAlterTest.java
index 27c00162d5f..5e7f5387f57 100644
--- a/fe/fe-core/src/test/java/org/apache/doris/alter/InternalSchemaAlterTest.java
+++ b/fe/fe-core/src/test/java/org/apache/doris/alter/InternalSchemaAlterTest.java
@@ -42,10 +42,10 @@ public class InternalSchemaAlterTest extends TestWithFeService {
 
     @Override
     protected void runBeforeAll() throws Exception {
-        InternalSchemaInitializer.createDb();
-        InternalSchemaInitializer.createTbl();
         Config.allow_replica_on_same_host = true;
         FeConstants.runningUnitTest = true;
+        InternalSchemaInitializer.createDb();
+        InternalSchemaInitializer.createTbl();
     }
 
     @Test
diff --git a/fe/fe-core/src/test/java/org/apache/doris/catalog/CreateTableLikeTest.java b/fe/fe-core/src/test/java/org/apache/doris/catalog/CreateTableLikeTest.java
index 24ab4d99cae..b4a63e5d344 100644
--- a/fe/fe-core/src/test/java/org/apache/doris/catalog/CreateTableLikeTest.java
+++ b/fe/fe-core/src/test/java/org/apache/doris/catalog/CreateTableLikeTest.java
@@ -23,6 +23,7 @@ import org.apache.doris.analysis.CreateTableStmt;
 import org.apache.doris.common.Config;
 import org.apache.doris.common.DdlException;
 import org.apache.doris.common.ExceptionChecker;
+import org.apache.doris.common.FeConstants;
 import org.apache.doris.qe.ConnectContext;
 import org.apache.doris.utframe.UtFrameUtils;
 
@@ -48,6 +49,7 @@ public class CreateTableLikeTest {
 
     @BeforeClass
     public static void beforeClass() throws Exception {
+        FeConstants.runningUnitTest = true;
         UtFrameUtils.createDorisCluster(runningDir);
 
         // create connect context
diff --git a/fe/fe-core/src/test/java/org/apache/doris/catalog/CreateTableTest.java b/fe/fe-core/src/test/java/org/apache/doris/catalog/CreateTableTest.java
index c4901ced10d..bdd5a66902c 100644
--- a/fe/fe-core/src/test/java/org/apache/doris/catalog/CreateTableTest.java
+++ b/fe/fe-core/src/test/java/org/apache/doris/catalog/CreateTableTest.java
@@ -23,6 +23,7 @@ import org.apache.doris.common.ConfigBase;
 import org.apache.doris.common.ConfigException;
 import org.apache.doris.common.DdlException;
 import org.apache.doris.common.ExceptionChecker;
+import org.apache.doris.common.FeConstants;
 import org.apache.doris.common.UserException;
 import org.apache.doris.utframe.TestWithFeService;
 
@@ -43,6 +44,7 @@ public class CreateTableTest extends TestWithFeService {
 
     @Override
     protected void runBeforeAll() throws Exception {
+        FeConstants.runningUnitTest = true;
         Config.allow_replica_on_same_host = true;
         createDatabase("test");
     }
diff --git a/fe/fe-core/src/test/java/org/apache/doris/catalog/ModifyBackendTest.java b/fe/fe-core/src/test/java/org/apache/doris/catalog/ModifyBackendTest.java
index a0575eff548..e98e6f74545 100644
--- a/fe/fe-core/src/test/java/org/apache/doris/catalog/ModifyBackendTest.java
+++ b/fe/fe-core/src/test/java/org/apache/doris/catalog/ModifyBackendTest.java
@@ -24,6 +24,7 @@ import org.apache.doris.analysis.CreateTableStmt;
 import org.apache.doris.common.AnalysisException;
 import org.apache.doris.common.DdlException;
 import org.apache.doris.common.ExceptionChecker;
+import org.apache.doris.common.FeConstants;
 import org.apache.doris.qe.ConnectContext;
 import org.apache.doris.qe.DdlExecutor;
 import org.apache.doris.resource.Tag;
@@ -48,6 +49,7 @@ public class ModifyBackendTest {
 
     @BeforeClass
     public static void beforeClass() throws Exception {
+        FeConstants.runningUnitTest = true;
         UtFrameUtils.createDorisCluster(runningDir);
         // create connect context
         connectContext = UtFrameUtils.createDefaultCtx();
diff --git a/fe/fe-core/src/test/java/org/apache/doris/catalog/RecoverTest.java b/fe/fe-core/src/test/java/org/apache/doris/catalog/RecoverTest.java
index a48652678e7..d3ec208a341 100644
--- a/fe/fe-core/src/test/java/org/apache/doris/catalog/RecoverTest.java
+++ b/fe/fe-core/src/test/java/org/apache/doris/catalog/RecoverTest.java
@@ -26,6 +26,7 @@ import org.apache.doris.analysis.RecoverDbStmt;
 import org.apache.doris.analysis.RecoverPartitionStmt;
 import org.apache.doris.analysis.RecoverTableStmt;
 import org.apache.doris.common.DdlException;
+import org.apache.doris.common.FeConstants;
 import org.apache.doris.qe.ConnectContext;
 import org.apache.doris.utframe.UtFrameUtils;
 
@@ -45,6 +46,7 @@ public class RecoverTest {
 
     @BeforeClass
     public static void beforeClass() throws Exception {
+        FeConstants.runningUnitTest = true;
         UtFrameUtils.createDorisCluster(runningDir);
 
         // create connect context
diff --git a/fe/fe-core/src/test/java/org/apache/doris/planner/QueryPlanTest.java b/fe/fe-core/src/test/java/org/apache/doris/planner/QueryPlanTest.java
index 22560492b40..632a062ed4a 100644
--- a/fe/fe-core/src/test/java/org/apache/doris/planner/QueryPlanTest.java
+++ b/fe/fe-core/src/test/java/org/apache/doris/planner/QueryPlanTest.java
@@ -61,6 +61,7 @@ public class QueryPlanTest extends TestWithFeService {
 
     @Override
     protected void runBeforeAll() throws Exception {
+        FeConstants.runningUnitTest = true;
         // disable bucket shuffle join
         Deencapsulation.setField(connectContext.getSessionVariable(), "enableBucketShuffleJoin", false);
         connectContext.getSessionVariable().setEnableRuntimeFilterPrune(false);
diff --git a/gensrc/proto/cloud.proto b/gensrc/proto/cloud.proto
index d056c457a5b..f8797ea54e7 100644
--- a/gensrc/proto/cloud.proto
+++ b/gensrc/proto/cloud.proto
@@ -1423,6 +1423,28 @@ message GetRLTaskCommitAttachResponse {
     optional RLTaskTxnCommitAttachmentPB commit_attach = 2;
 }
 
+message CheckKeyInfos {
+    repeated int64 db_ids = 1;
+    repeated int64 table_ids = 2;
+    repeated int64 index_ids = 3;
+    repeated int64 partition_ids = 4;
+}
+
+message CheckKVRequest {
+    enum Operation {
+        CREATE_INDEX_AFTER_FE_COMMIT        = 1;
+        CREATE_PARTITION_AFTER_FE_COMMIT    = 2;
+    }
+    optional string cloud_unique_id = 1; // For auth
+    optional CheckKeyInfos check_keys = 2;
+    optional Operation op = 3;
+}
+
+message CheckKVResponse {
+    optional MetaServiceResponseStatus status = 1;
+    optional CheckKeyInfos bad_keys = 2;
+}
+
 service MetaService {
     rpc begin_txn(BeginTxnRequest) returns (BeginTxnResponse);
     rpc precommit_txn(PrecommitTxnRequest) returns (PrecommitTxnResponse);
@@ -1491,6 +1513,9 @@ service MetaService {
 
     // routine load progress
     rpc get_rl_task_commit_attach(GetRLTaskCommitAttachRequest) returns (GetRLTaskCommitAttachResponse);
+
+    // check KV
+    rpc check_kv(CheckKVRequest) returns (CheckKVResponse);
 };
 
 service RecyclerService {


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org
