This is an automated email from the ASF dual-hosted git repository.

morningman pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-doris.git


The following commit(s) were added to refs/heads/master by this push:
     new f4e2f78a1a [fix] Fix the bug that data balance causes tablet loss (#9971)
f4e2f78a1a is described below

commit f4e2f78a1a911b907ba8e73156a67dadb9193acc
Author: plat1ko <36853835+platon...@users.noreply.github.com>
AuthorDate: Wed Jun 15 09:52:56 2022 +0800

    [fix] Fix the bug that data balance causes tablet loss (#9971)
    
    1. Add an FE config to test reliability in the single-replica case when
       tablet scheduling is frequent.
    2. Apply most of the fix from #6063 to the current code.
---
 be/src/agent/task_worker_pool.cpp                  |  3 +-
 be/src/olap/base_tablet.h                          |  9 +-
 be/src/olap/snapshot_manager.cpp                   |  3 +-
 be/src/olap/snapshot_manager.h                     |  2 +-
 be/src/olap/tablet.cpp                             |  1 +
 be/src/olap/tablet_manager.cpp                     | 97 +++++++++++-----------
 be/src/olap/tablet_manager.h                       |  6 +-
 be/src/olap/tablet_meta.cpp                        | 10 ++-
 be/src/olap/tablet_meta.h                          | 11 ++-
 be/src/olap/task/engine_clone_task.cpp             | 10 ++-
 be/src/olap/task/engine_storage_migration_task.cpp |  3 +-
 be/src/runtime/snapshot_loader.cpp                 |  4 +-
 be/test/olap/cumulative_compaction_policy_test.cpp |  4 +-
 be/test/olap/delete_handler_test.cpp               | 22 ++---
 be/test/olap/delta_writer_test.cpp                 | 35 +++-----
 .../olap/engine_storage_migration_task_test.cpp    | 11 +--
 be/test/olap/tablet_meta_test.cpp                  |  2 +-
 be/test/olap/tablet_mgr_test.cpp                   |  8 +-
 be/test/olap/tablet_test.cpp                       |  2 +-
 be/test/olap/test_data/header_without_inc_rs.txt   |  3 +-
 .../java/org/apache/doris/alter/RollupJobV2.java   |  3 +-
 .../org/apache/doris/alter/SchemaChangeJobV2.java  |  3 +-
 .../java/org/apache/doris/backup/RestoreJob.java   |  2 +-
 .../java/org/apache/doris/catalog/Catalog.java     |  3 +-
 .../org/apache/doris/catalog/PartitionInfo.java    |  2 +-
 .../org/apache/doris/clone/BeLoadRebalancer.java   |  3 +-
 .../apache/doris/clone/ClusterLoadStatistic.java   | 17 +++-
 .../org/apache/doris/clone/TabletSchedCtx.java     | 19 +++--
 .../org/apache/doris/clone/TabletScheduler.java    |  7 +-
 .../main/java/org/apache/doris/common/Config.java  | 10 +++
 .../doris/datasource/InternalDataSource.java       |  3 +-
 .../org/apache/doris/master/ReportHandler.java     |  7 +-
 .../main/java/org/apache/doris/task/CloneTask.java | 15 ++--
 .../org/apache/doris/task/CreateReplicaTask.java   |  8 +-
 .../org/apache/doris/task/DropReplicaTask.java     |  9 +-
 .../java/org/apache/doris/task/AgentTaskTest.java  |  8 +-
 gensrc/proto/olap_file.proto                       |  1 +
 gensrc/thrift/AgentService.thrift                  |  3 +
 gensrc/thrift/MasterService.thrift                 |  1 +
 gensrc/thrift/Types.thrift                         |  1 +
 40 files changed, 216 insertions(+), 155 deletions(-)

diff --git a/be/src/agent/task_worker_pool.cpp 
b/be/src/agent/task_worker_pool.cpp
index 0a30aad20a..c29a65cdc6 100644
--- a/be/src/agent/task_worker_pool.cpp
+++ b/be/src/agent/task_worker_pool.cpp
@@ -383,6 +383,7 @@ void 
TaskWorkerPool::_create_tablet_worker_thread_callback() {
             tablet_info.row_count = 0;
             tablet_info.data_size = 0;
             tablet_info.__set_path_hash(tablet->data_dir()->path_hash());
+            tablet_info.__set_replica_id(tablet->replica_id());
             finish_tablet_infos.push_back(tablet_info);
         }
 
@@ -428,7 +429,7 @@ void TaskWorkerPool::_drop_tablet_worker_thread_callback() {
                 drop_tablet_req.tablet_id, false, &err);
         if (dropped_tablet != nullptr) {
             Status drop_status = 
StorageEngine::instance()->tablet_manager()->drop_tablet(
-                    drop_tablet_req.tablet_id);
+                    drop_tablet_req.tablet_id, drop_tablet_req.replica_id);
             if (!drop_status.ok()) {
                 LOG(WARNING) << "drop table failed! signature: " << 
agent_task_req.signature;
                 error_msgs.push_back("drop table failed!");
diff --git a/be/src/olap/base_tablet.h b/be/src/olap/base_tablet.h
index 7c97247958..4bf83324af 100644
--- a/be/src/olap/base_tablet.h
+++ b/be/src/olap/base_tablet.h
@@ -54,8 +54,9 @@ public:
     const std::string full_name() const;
     int64_t partition_id() const;
     int64_t tablet_id() const;
+    int64_t replica_id() const;
     int32_t schema_hash() const;
-    int16_t shard_id();
+    int16_t shard_id() const;
     bool equal(int64_t tablet_id, int32_t schema_hash);
 
     // properties encapsulated in TabletSchema
@@ -123,11 +124,15 @@ inline int64_t BaseTablet::tablet_id() const {
     return _tablet_meta->tablet_id();
 }
 
+inline int64_t BaseTablet::replica_id() const {
+    return _tablet_meta->replica_id();
+}
+
 inline int32_t BaseTablet::schema_hash() const {
     return _tablet_meta->schema_hash();
 }
 
-inline int16_t BaseTablet::shard_id() {
+inline int16_t BaseTablet::shard_id() const {
     return _tablet_meta->shard_id();
 }
 
diff --git a/be/src/olap/snapshot_manager.cpp b/be/src/olap/snapshot_manager.cpp
index b2d8a33d75..87ccef34ad 100644
--- a/be/src/olap/snapshot_manager.cpp
+++ b/be/src/olap/snapshot_manager.cpp
@@ -119,7 +119,7 @@ Status SnapshotManager::release_snapshot(const string& 
snapshot_path) {
 // For now, alpha and beta rowset meta have same fields, so we can just use
 // AlphaRowsetMeta here.
 Status SnapshotManager::convert_rowset_ids(const FilePathDesc& clone_dir_desc, 
int64_t tablet_id,
-                                           const int32_t& schema_hash) {
+                                           int64_t replica_id, const int32_t& 
schema_hash) {
     SCOPED_SWITCH_THREAD_LOCAL_MEM_TRACKER(_mem_tracker);
     Status res = Status::OK();
     // check clone dir existed
@@ -151,6 +151,7 @@ Status SnapshotManager::convert_rowset_ids(const 
FilePathDesc& clone_dir_desc, i
     // should modify tablet id and schema hash because in restore process the 
tablet id is not
     // equal to tablet id in meta
     new_tablet_meta_pb.set_tablet_id(tablet_id);
+    new_tablet_meta_pb.set_replica_id(replica_id);
     new_tablet_meta_pb.set_schema_hash(schema_hash);
     TabletSchema tablet_schema;
     tablet_schema.init_from_pb(new_tablet_meta_pb.schema());
diff --git a/be/src/olap/snapshot_manager.h b/be/src/olap/snapshot_manager.h
index 75c00180f5..aab134297f 100644
--- a/be/src/olap/snapshot_manager.h
+++ b/be/src/olap/snapshot_manager.h
@@ -61,7 +61,7 @@ public:
     static SnapshotManager* instance();
 
     Status convert_rowset_ids(const FilePathDesc& clone_dir_desc, int64_t 
tablet_id,
-                              const int32_t& schema_hash);
+                              int64_t replica_id, const int32_t& schema_hash);
 
 private:
     SnapshotManager() : _snapshot_base_id(0) {
diff --git a/be/src/olap/tablet.cpp b/be/src/olap/tablet.cpp
index 275511786f..405e6acdc5 100644
--- a/be/src/olap/tablet.cpp
+++ b/be/src/olap/tablet.cpp
@@ -1350,6 +1350,7 @@ void Tablet::build_tablet_report_info(TTabletInfo* 
tablet_info,
     tablet_info->__set_version_count(_tablet_meta->version_count());
     tablet_info->__set_path_hash(_data_dir->path_hash());
     
tablet_info->__set_is_in_memory(_tablet_meta->tablet_schema().is_in_memory());
+    tablet_info->__set_replica_id(replica_id());
 }
 
 // should use this method to get a copy of current tablet meta
diff --git a/be/src/olap/tablet_manager.cpp b/be/src/olap/tablet_manager.cpp
index 0675754d29..e002b07cde 100644
--- a/be/src/olap/tablet_manager.cpp
+++ b/be/src/olap/tablet_manager.cpp
@@ -17,6 +17,7 @@
 
 #include "olap/tablet_manager.h"
 
+#include <gen_cpp/Types_types.h>
 #include <rapidjson/document.h>
 #include <re2/re2.h>
 #include <thrift/protocol/TDebugProtocol.h>
@@ -184,7 +185,8 @@ Status TabletManager::_add_tablet_to_map_unlocked(TTabletId 
tablet_id,
     if (drop_old) {
         // If the new tablet is fresher than the existing one, then replace
         // the existing tablet with the new one.
-        RETURN_NOT_OK_LOG(_drop_tablet_unlocked(tablet_id, keep_files),
+        // Use default replica_id to ignore whether replica_id is match when 
drop tablet.
+        RETURN_NOT_OK_LOG(_drop_tablet_unlocked(tablet_id, /* replica_id */ 0, 
keep_files),
                           strings::Substitute("failed to drop old tablet when 
add new tablet. "
                                               "tablet_id=$0",
                                               tablet_id));
@@ -368,7 +370,7 @@ TabletSharedPtr 
TabletManager::_internal_create_tablet_unlocked(
     }
     // something is wrong, we need clear environment
     if (is_tablet_added) {
-        Status status = _drop_tablet_unlocked(new_tablet_id, false);
+        Status status = _drop_tablet_unlocked(new_tablet_id, 
request.replica_id, false);
         if (!status.ok()) {
             LOG(WARNING) << "fail to drop tablet when create tablet failed. 
res=" << res;
         }
@@ -446,22 +448,21 @@ TabletSharedPtr 
TabletManager::_create_tablet_meta_and_dir_unlocked(
     return nullptr;
 }
 
-Status TabletManager::drop_tablet(TTabletId tablet_id, bool keep_files) {
-    std::lock_guard<std::shared_mutex> 
wrlock(_get_tablets_shard_lock(tablet_id));
+Status TabletManager::drop_tablet(TTabletId tablet_id, TReplicaId replica_id, 
bool keep_files) {
+    auto& shard = _get_tablets_shard(tablet_id);
+    std::lock_guard wrlock(shard.lock);
+    if (shard.tablets_under_clone.count(tablet_id) > 0) {
+        LOG(INFO) << "tablet " << tablet_id << " is under clone, skip drop 
task";
+        return Status::OK();
+    }
     SCOPED_SWITCH_THREAD_LOCAL_MEM_TRACKER(_mem_tracker);
-    return _drop_tablet_unlocked(tablet_id, keep_files);
+    return _drop_tablet_unlocked(tablet_id, replica_id, keep_files);
 }
 
-// Drop specified tablet, the main logical is as follows:
-// 1. tablet not in schema change:
-//      drop specified tablet directly;
-// 2. tablet in schema change:
-//      a. schema change not finished && the dropping tablet is a base-tablet:
-//          base-tablet cannot be dropped;
-//      b. other cases:
-//          drop specified tablet directly and clear schema change info.
-Status TabletManager::_drop_tablet_unlocked(TTabletId tablet_id, bool 
keep_files) {
-    LOG(INFO) << "begin drop tablet. tablet_id=" << tablet_id;
+// Drop specified tablet.
+Status TabletManager::_drop_tablet_unlocked(TTabletId tablet_id, TReplicaId 
replica_id,
+                                            bool keep_files) {
+    LOG(INFO) << "begin drop tablet. tablet_id=" << tablet_id << ", 
replica_id=" << replica_id;
     DorisMetrics::instance()->drop_tablet_requests_total->increment(1);
 
     // Fetch tablet which need to be dropped
@@ -471,8 +472,39 @@ Status TabletManager::_drop_tablet_unlocked(TTabletId 
tablet_id, bool keep_files
                      << "tablet_id=" << tablet_id;
         return Status::OK();
     }
+    // We should compare replica id to avoid dropping new cloned tablet.
+    // Iff request replica id is 0, FE may be an older release, then we drop 
this tablet as before.
+    if (to_drop_tablet->replica_id() != replica_id && replica_id != 0) {
+        LOG(WARNING) << "fail to drop tablet because replica id not match. "
+                     << "tablet_id=" << tablet_id << ", replica_id=" << 
to_drop_tablet->replica_id()
+                     << ", request replica_id=" << replica_id;
+        return Status::OK();
+    }
 
-    return _drop_tablet_directly_unlocked(tablet_id, keep_files);
+    _remove_tablet_from_partition(to_drop_tablet);
+    tablet_map_t& tablet_map = _get_tablet_map(tablet_id);
+    tablet_map.erase(tablet_id);
+    if (!keep_files) {
+        // drop tablet will update tablet meta, should lock
+        std::lock_guard<std::shared_mutex> 
wrlock(to_drop_tablet->get_header_lock());
+        LOG(INFO) << "set tablet to shutdown state and remove it from memory. "
+                  << "tablet_id=" << tablet_id
+                  << ", tablet_path=" << 
to_drop_tablet->tablet_path_desc().filepath;
+        // NOTE: has to update tablet here, but must not update tablet meta 
directly.
+        // because other thread may hold the tablet object, they may save meta 
too.
+        // If update meta directly here, other thread may override the meta
+        // and the tablet will be loaded at restart time.
+        // To avoid this exception, we first set the state of the tablet to 
`SHUTDOWN`.
+        to_drop_tablet->set_tablet_state(TABLET_SHUTDOWN);
+        to_drop_tablet->save_meta();
+        {
+            std::lock_guard<std::shared_mutex> wrdlock(_shutdown_tablets_lock);
+            _shutdown_tablets.push_back(to_drop_tablet);
+        }
+    }
+
+    to_drop_tablet->deregister_tablet_from_dir();
+    return Status::OK();
 }
 
 Status TabletManager::drop_tablets_on_error_root_path(
@@ -1220,39 +1252,6 @@ Status TabletManager::_create_tablet_meta_unlocked(const 
TCreateTabletReq& reque
     return res;
 }
 
-Status TabletManager::_drop_tablet_directly_unlocked(TTabletId tablet_id, bool 
keep_files) {
-    TabletSharedPtr dropped_tablet = _get_tablet_unlocked(tablet_id);
-    if (dropped_tablet == nullptr) {
-        LOG(WARNING) << "fail to drop tablet because it does not exist. "
-                     << " tablet_id=" << tablet_id;
-        return Status::OLAPInternalError(OLAP_ERR_TABLE_NOT_FOUND);
-    }
-    _remove_tablet_from_partition(dropped_tablet);
-    tablet_map_t& tablet_map = _get_tablet_map(tablet_id);
-    tablet_map.erase(tablet_id);
-    if (!keep_files) {
-        // drop tablet will update tablet meta, should lock
-        std::lock_guard<std::shared_mutex> 
wrlock(dropped_tablet->get_header_lock());
-        LOG(INFO) << "set tablet to shutdown state and remove it from memory. "
-                  << "tablet_id=" << tablet_id
-                  << ", tablet_path=" << 
dropped_tablet->tablet_path_desc().filepath;
-        // NOTE: has to update tablet here, but must not update tablet meta 
directly.
-        // because other thread may hold the tablet object, they may save meta 
too.
-        // If update meta directly here, other thread may override the meta
-        // and the tablet will be loaded at restart time.
-        // To avoid this exception, we first set the state of the tablet to 
`SHUTDOWN`.
-        dropped_tablet->set_tablet_state(TABLET_SHUTDOWN);
-        dropped_tablet->save_meta();
-        {
-            std::lock_guard<std::shared_mutex> wrdlock(_shutdown_tablets_lock);
-            _shutdown_tablets.push_back(dropped_tablet);
-        }
-    }
-
-    dropped_tablet->deregister_tablet_from_dir();
-    return Status::OK();
-}
-
 TabletSharedPtr TabletManager::_get_tablet_unlocked(TTabletId tablet_id) {
     VLOG_NOTICE << "begin to get tablet. tablet_id=" << tablet_id;
     tablet_map_t& tablet_map = _get_tablet_map(tablet_id);
diff --git a/be/src/olap/tablet_manager.h b/be/src/olap/tablet_manager.h
index 1627ab741e..1e2af26d99 100644
--- a/be/src/olap/tablet_manager.h
+++ b/be/src/olap/tablet_manager.h
@@ -66,7 +66,7 @@ public:
     // Return OLAP_SUCCESS, if run ok
     //        OLAP_ERR_TABLE_DELETE_NOEXIST_ERROR, if tablet not exist
     //        Status::OLAPInternalError(OLAP_ERR_NOT_INITED), if not inited
-    Status drop_tablet(TTabletId tablet_id, bool keep_files = false);
+    Status drop_tablet(TTabletId tablet_id, TReplicaId replica_id, bool 
keep_files = false);
 
     Status drop_tablets_on_error_root_path(const std::vector<TabletInfo>& 
tablet_info_vec);
 
@@ -157,9 +157,7 @@ private:
 
     bool _check_tablet_id_exist_unlocked(TTabletId tablet_id);
 
-    Status _drop_tablet_directly_unlocked(TTabletId tablet_id, bool keep_files 
= false);
-
-    Status _drop_tablet_unlocked(TTabletId tablet_id, bool keep_files);
+    Status _drop_tablet_unlocked(TTabletId tablet_id, TReplicaId replica_id, 
bool keep_files);
 
     TabletSharedPtr _get_tablet_unlocked(TTabletId tablet_id);
     TabletSharedPtr _get_tablet_unlocked(TTabletId tablet_id, bool 
include_deleted,
diff --git a/be/src/olap/tablet_meta.cpp b/be/src/olap/tablet_meta.cpp
index 299d3051a2..6dfaa2c54d 100644
--- a/be/src/olap/tablet_meta.cpp
+++ b/be/src/olap/tablet_meta.cpp
@@ -39,7 +39,7 @@ Status TabletMeta::create(const TCreateTabletReq& request, 
const TabletUid& tabl
                           const unordered_map<uint32_t, uint32_t>& 
col_ordinal_to_unique_id,
                           TabletMetaSharedPtr* tablet_meta) {
     tablet_meta->reset(new TabletMeta(
-            request.table_id, request.partition_id, request.tablet_id,
+            request.table_id, request.partition_id, request.tablet_id, 
request.replica_id,
             request.tablet_schema.schema_hash, shard_id, 
request.tablet_schema, next_unique_id,
             col_ordinal_to_unique_id, tablet_uid,
             request.__isset.tablet_type ? request.tablet_type : 
TTabletType::TABLET_TYPE_DISK,
@@ -50,8 +50,8 @@ Status TabletMeta::create(const TCreateTabletReq& request, 
const TabletUid& tabl
 TabletMeta::TabletMeta() : _tablet_uid(0, 0), _schema(new TabletSchema) {}
 
 TabletMeta::TabletMeta(int64_t table_id, int64_t partition_id, int64_t 
tablet_id,
-                       int32_t schema_hash, uint64_t shard_id, const 
TTabletSchema& tablet_schema,
-                       uint32_t next_unique_id,
+                       int64_t replica_id, int32_t schema_hash, uint64_t 
shard_id,
+                       const TTabletSchema& tablet_schema, uint32_t 
next_unique_id,
                        const std::unordered_map<uint32_t, uint32_t>& 
col_ordinal_to_unique_id,
                        TabletUid tablet_uid, TTabletType::type tabletType,
                        TStorageMedium::type t_storage_medium, const 
std::string& storage_name,
@@ -61,6 +61,7 @@ TabletMeta::TabletMeta(int64_t table_id, int64_t 
partition_id, int64_t tablet_id
     tablet_meta_pb.set_table_id(table_id);
     tablet_meta_pb.set_partition_id(partition_id);
     tablet_meta_pb.set_tablet_id(tablet_id);
+    tablet_meta_pb.set_replica_id(replica_id);
     tablet_meta_pb.set_schema_hash(schema_hash);
     tablet_meta_pb.set_shard_id(shard_id);
     // Persist the creation time, but it is not used
@@ -375,6 +376,7 @@ void TabletMeta::init_from_pb(const TabletMetaPB& 
tablet_meta_pb) {
     _table_id = tablet_meta_pb.table_id();
     _partition_id = tablet_meta_pb.partition_id();
     _tablet_id = tablet_meta_pb.tablet_id();
+    _replica_id = tablet_meta_pb.replica_id();
     _schema_hash = tablet_meta_pb.schema_hash();
     _shard_id = tablet_meta_pb.shard_id();
     _creation_time = tablet_meta_pb.creation_time();
@@ -443,6 +445,7 @@ void TabletMeta::to_meta_pb(TabletMetaPB* tablet_meta_pb) {
     tablet_meta_pb->set_table_id(table_id());
     tablet_meta_pb->set_partition_id(partition_id());
     tablet_meta_pb->set_tablet_id(tablet_id());
+    tablet_meta_pb->set_replica_id(replica_id());
     tablet_meta_pb->set_schema_hash(schema_hash());
     tablet_meta_pb->set_shard_id(shard_id());
     tablet_meta_pb->set_creation_time(creation_time());
@@ -683,6 +686,7 @@ bool operator==(const TabletMeta& a, const TabletMeta& b) {
     if (a._table_id != b._table_id) return false;
     if (a._partition_id != b._partition_id) return false;
     if (a._tablet_id != b._tablet_id) return false;
+    if (a._replica_id != b._replica_id) return false;
     if (a._schema_hash != b._schema_hash) return false;
     if (a._shard_id != b._shard_id) return false;
     if (a._creation_time != b._creation_time) return false;
diff --git a/be/src/olap/tablet_meta.h b/be/src/olap/tablet_meta.h
index c4166213d0..8624d04333 100644
--- a/be/src/olap/tablet_meta.h
+++ b/be/src/olap/tablet_meta.h
@@ -79,8 +79,9 @@ public:
     TabletMeta();
     // Only remote_storage_name is needed in meta, it is a key used to get 
remote params from fe.
     // The config of storage is saved in fe.
-    TabletMeta(int64_t table_id, int64_t partition_id, int64_t tablet_id, 
int32_t schema_hash,
-               uint64_t shard_id, const TTabletSchema& tablet_schema, uint32_t 
next_unique_id,
+    TabletMeta(int64_t table_id, int64_t partition_id, int64_t tablet_id, 
int64_t replica_id,
+               int32_t schema_hash, uint64_t shard_id, const TTabletSchema& 
tablet_schema,
+               uint32_t next_unique_id,
                const std::unordered_map<uint32_t, uint32_t>& 
col_ordinal_to_unique_id,
                TabletUid tablet_uid, TTabletType::type tabletType,
                TStorageMedium::type t_storage_medium, const std::string& 
remote_storage_name,
@@ -112,6 +113,7 @@ public:
     int64_t table_id() const;
     int64_t partition_id() const;
     int64_t tablet_id() const;
+    int64_t replica_id() const;
     int32_t schema_hash() const;
     int16_t shard_id() const;
     void set_shard_id(int32_t shard_id);
@@ -188,6 +190,7 @@ private:
     int64_t _table_id = 0;
     int64_t _partition_id = 0;
     int64_t _tablet_id = 0;
+    int64_t _replica_id = 0;
     int32_t _schema_hash = 0;
     int32_t _shard_id = 0;
     int64_t _creation_time = 0;
@@ -232,6 +235,10 @@ inline int64_t TabletMeta::tablet_id() const {
     return _tablet_id;
 }
 
+inline int64_t TabletMeta::replica_id() const {
+    return _replica_id;
+}
+
 inline int32_t TabletMeta::schema_hash() const {
     return _schema_hash;
 }
diff --git a/be/src/olap/task/engine_clone_task.cpp 
b/be/src/olap/task/engine_clone_task.cpp
index 04a7df7199..84dc1e4068 100644
--- a/be/src/olap/task/engine_clone_task.cpp
+++ b/be/src/olap/task/engine_clone_task.cpp
@@ -218,12 +218,14 @@ void EngineCloneTask::_set_tablet_info(Status status, 
bool is_new_tablet) {
     if (status.ok()) {
         TTabletInfo tablet_info;
         tablet_info.__set_tablet_id(_clone_req.tablet_id);
+        tablet_info.__set_replica_id(_clone_req.replica_id);
         tablet_info.__set_schema_hash(_clone_req.schema_hash);
         Status get_tablet_info_status =
                 
StorageEngine::instance()->tablet_manager()->report_tablet_info(&tablet_info);
         if (get_tablet_info_status != Status::OK()) {
             LOG(WARNING) << "clone success, but get tablet info failed."
                          << " tablet id: " << _clone_req.tablet_id
+                         << ", replica_id:" << _clone_req.replica_id
                          << " schema hash: " << _clone_req.schema_hash
                          << " signature: " << _signature;
             _error_msgs->push_back("clone success, but get tablet info 
failed.");
@@ -231,6 +233,7 @@ void EngineCloneTask::_set_tablet_info(Status status, bool 
is_new_tablet) {
         } else if (_clone_req.__isset.committed_version &&
                    tablet_info.version < _clone_req.committed_version) {
             LOG(WARNING) << "failed to clone tablet. tablet_id:" << 
_clone_req.tablet_id
+                         << ", replica_id:" << _clone_req.replica_id
                          << ", schema_hash:" << _clone_req.schema_hash
                          << ", signature:" << _signature << ", version:" << 
tablet_info.version
                          << ", expected_version: " << 
_clone_req.committed_version;
@@ -241,11 +244,12 @@ void EngineCloneTask::_set_tablet_info(Status status, 
bool is_new_tablet) {
                 // if not, maybe this is a stale remaining table which is 
waiting for drop.
                 // we drop it.
                 LOG(WARNING) << "begin to drop the stale tablet. tablet_id:" 
<< _clone_req.tablet_id
+                             << ", replica_id:" << _clone_req.replica_id
                              << ", schema_hash:" << _clone_req.schema_hash
                              << ", signature:" << _signature << ", version:" 
<< tablet_info.version
                              << ", expected_version: " << 
_clone_req.committed_version;
                 Status drop_status = 
StorageEngine::instance()->tablet_manager()->drop_tablet(
-                        _clone_req.tablet_id, _clone_req.schema_hash);
+                        _clone_req.tablet_id, _clone_req.replica_id);
                 if (drop_status != Status::OK() &&
                     drop_status.precise_code() != OLAP_ERR_TABLE_NOT_FOUND) {
                     // just log
@@ -332,10 +336,12 @@ Status 
EngineCloneTask::_make_and_download_snapshots(DataDir& data_dir,
         if (status.ok()) {
             // change all rowset ids because they maybe its id same with local 
rowset
             auto olap_st = SnapshotManager::instance()->convert_rowset_ids(
-                    local_path, _clone_req.tablet_id, _clone_req.schema_hash);
+                    local_path, _clone_req.tablet_id, _clone_req.replica_id,
+                    _clone_req.schema_hash);
             if (olap_st != Status::OK()) {
                 LOG(WARNING) << "fail to convert rowset ids, path=" << 
local_path
                              << ", tablet_id=" << _clone_req.tablet_id
+                             << ", replica_id=" << _clone_req.replica_id
                              << ", schema_hash=" << _clone_req.schema_hash << 
", error=" << olap_st;
                 status = Status::InternalError("Failed to convert rowset ids");
             }
diff --git a/be/src/olap/task/engine_storage_migration_task.cpp 
b/be/src/olap/task/engine_storage_migration_task.cpp
index 68c629ebb2..ff9acb8ddd 100644
--- a/be/src/olap/task/engine_storage_migration_task.cpp
+++ b/be/src/olap/task/engine_storage_migration_task.cpp
@@ -141,7 +141,8 @@ Status 
EngineStorageMigrationTask::_gen_and_write_header_to_hdr_file(
     }
     // it will change rowset id and its create time
     // rowset create time is useful when load tablet from meta to check which 
tablet is the tablet to load
-    res = SnapshotManager::instance()->convert_rowset_ids(full_path, 
tablet_id, schema_hash);
+    res = SnapshotManager::instance()->convert_rowset_ids(full_path, tablet_id,
+                                                          
_tablet->replica_id(), schema_hash);
     if (!res.ok()) {
         LOG(WARNING) << "failed to convert rowset id when do storage migration"
                      << " path = " << full_path;
diff --git a/be/src/runtime/snapshot_loader.cpp 
b/be/src/runtime/snapshot_loader.cpp
index e8448a9c88..f86cdefd9b 100644
--- a/be/src/runtime/snapshot_loader.cpp
+++ b/be/src/runtime/snapshot_loader.cpp
@@ -413,8 +413,8 @@ Status SnapshotLoader::move(const std::string& 
snapshot_path, TabletSharedPtr ta
     }
 
     // rename the rowset ids and tabletid info in rowset meta
-    Status convert_status =
-            SnapshotManager::instance()->convert_rowset_ids(snapshot_path, 
tablet_id, schema_hash);
+    Status convert_status = SnapshotManager::instance()->convert_rowset_ids(
+            snapshot_path, tablet_id, tablet->replica_id(), schema_hash);
     if (convert_status != Status::OK()) {
         std::stringstream ss;
         ss << "failed to convert rowsetids in snapshot: " << snapshot_path
diff --git a/be/test/olap/cumulative_compaction_policy_test.cpp 
b/be/test/olap/cumulative_compaction_policy_test.cpp
index e30df7e343..76a5acaf36 100644
--- a/be/test/olap/cumulative_compaction_policy_test.cpp
+++ b/be/test/olap/cumulative_compaction_policy_test.cpp
@@ -32,7 +32,7 @@ public:
     TestNumBasedCumulativeCompactionPolicy() {}
     void SetUp() {
         _tablet_meta = static_cast<TabletMetaSharedPtr>(new TabletMeta(
-                1, 2, 15673, 4, 5, TTabletSchema(), 6, {{7, 8}}, UniqueId(9, 
10),
+                1, 2, 15673, 15674, 4, 5, TTabletSchema(), 6, {{7, 8}}, 
UniqueId(9, 10),
                 TTabletType::TABLET_TYPE_DISK, TStorageMedium::HDD, "", 
TCompressionType::LZ4F));
 
         _json_rowset_meta = R"({
@@ -336,7 +336,7 @@ public:
         config::cumulative_size_based_compaction_lower_size_mbytes = 64;
 
         _tablet_meta = static_cast<TabletMetaSharedPtr>(new TabletMeta(
-                1, 2, 15673, 4, 5, TTabletSchema(), 6, {{7, 8}}, UniqueId(9, 
10),
+                1, 2, 15673, 15674, 4, 5, TTabletSchema(), 6, {{7, 8}}, 
UniqueId(9, 10),
                 TTabletType::TABLET_TYPE_DISK, TStorageMedium::HDD, "", 
TCompressionType::LZ4F));
 
         _json_rowset_meta = R"({
diff --git a/be/test/olap/delete_handler_test.cpp 
b/be/test/olap/delete_handler_test.cpp
index d34fe7253d..57a0ebb8f7 100644
--- a/be/test/olap/delete_handler_test.cpp
+++ b/be/test/olap/delete_handler_test.cpp
@@ -261,16 +261,14 @@ protected:
         set_default_create_tablet_request(&_create_tablet);
         res = k_engine->create_tablet(_create_tablet);
         EXPECT_EQ(Status::OK(), res);
-        tablet = 
k_engine->tablet_manager()->get_tablet(_create_tablet.tablet_id,
-                                                        
_create_tablet.tablet_schema.schema_hash);
+        tablet = 
k_engine->tablet_manager()->get_tablet(_create_tablet.tablet_id);
         EXPECT_NE(tablet.get(), nullptr);
         _tablet_path = tablet->tablet_path_desc().filepath;
 
         set_create_duplicate_tablet_request(&_create_dup_tablet);
         res = k_engine->create_tablet(_create_dup_tablet);
         EXPECT_EQ(Status::OK(), res);
-        dup_tablet = k_engine->tablet_manager()->get_tablet(
-                _create_dup_tablet.tablet_id, 
_create_dup_tablet.tablet_schema.schema_hash);
+        dup_tablet = 
k_engine->tablet_manager()->get_tablet(_create_dup_tablet.tablet_id);
         EXPECT_TRUE(dup_tablet.get() != NULL);
         _dup_tablet_path = tablet->tablet_path_desc().filepath;
     }
@@ -279,8 +277,8 @@ protected:
         // Remove all dir.
         tablet.reset();
         dup_tablet.reset();
-        StorageEngine::instance()->tablet_manager()->drop_tablet(
-                _create_tablet.tablet_id, 
_create_tablet.tablet_schema.schema_hash);
+        
StorageEngine::instance()->tablet_manager()->drop_tablet(_create_tablet.tablet_id,
+                                                                 
_create_tablet.replica_id);
         EXPECT_TRUE(FileUtils::remove_all(config::storage_root_path).ok());
     }
 
@@ -438,8 +436,7 @@ protected:
         set_default_create_tablet_request(&_create_tablet);
         res = k_engine->create_tablet(_create_tablet);
         EXPECT_EQ(Status::OK(), res);
-        tablet = 
k_engine->tablet_manager()->get_tablet(_create_tablet.tablet_id,
-                                                        
_create_tablet.tablet_schema.schema_hash);
+        tablet = 
k_engine->tablet_manager()->get_tablet(_create_tablet.tablet_id);
         EXPECT_TRUE(tablet.get() != nullptr);
         _tablet_path = tablet->tablet_path_desc().filepath;
     }
@@ -448,7 +445,7 @@ protected:
         // Remove all dir.
         tablet.reset();
         k_engine->tablet_manager()->drop_tablet(_create_tablet.tablet_id,
-                                                
_create_tablet.tablet_schema.schema_hash);
+                                                _create_tablet.replica_id);
         EXPECT_TRUE(FileUtils::remove_all(config::storage_root_path).ok());
     }
 
@@ -813,8 +810,7 @@ protected:
         set_default_create_tablet_request(&_create_tablet);
         res = k_engine->create_tablet(_create_tablet);
         EXPECT_EQ(Status::OK(), res);
-        tablet = 
k_engine->tablet_manager()->get_tablet(_create_tablet.tablet_id,
-                                                        
_create_tablet.tablet_schema.schema_hash);
+        tablet = 
k_engine->tablet_manager()->get_tablet(_create_tablet.tablet_id);
         EXPECT_TRUE(tablet != nullptr);
         _tablet_path = tablet->tablet_path_desc().filepath;
 
@@ -826,8 +822,8 @@ protected:
         // Remove all dir.
         tablet.reset();
         _delete_handler.finalize();
-        StorageEngine::instance()->tablet_manager()->drop_tablet(
-                _create_tablet.tablet_id, 
_create_tablet.tablet_schema.schema_hash);
+        
StorageEngine::instance()->tablet_manager()->drop_tablet(_create_tablet.tablet_id,
+                                                                 
_create_tablet.replica_id);
         EXPECT_TRUE(FileUtils::remove_all(config::storage_root_path).ok());
     }
 
diff --git a/be/test/olap/delta_writer_test.cpp 
b/be/test/olap/delta_writer_test.cpp
index db9429893a..632086f891 100644
--- a/be/test/olap/delta_writer_test.cpp
+++ b/be/test/olap/delta_writer_test.cpp
@@ -381,10 +381,7 @@ TEST_F(TestDeltaWriter, open) {
     EXPECT_EQ(Status::OK(), res);
     SAFE_DELETE(delta_writer);
 
-    TDropTabletReq drop_request;
-    auto tablet_id = 10003;
-    auto schema_hash = 270068375;
-    res = k_engine->tablet_manager()->drop_tablet(tablet_id, schema_hash);
+    res = k_engine->tablet_manager()->drop_tablet(request.tablet_id, 
request.replica_id);
     EXPECT_EQ(Status::OK(), res);
 }
 
@@ -480,8 +477,7 @@ TEST_F(TestDeltaWriter, write) {
     EXPECT_EQ(Status::OK(), res);
 
     // publish version success
-    TabletSharedPtr tablet =
-            k_engine->tablet_manager()->get_tablet(write_req.tablet_id, 
write_req.schema_hash);
+    TabletSharedPtr tablet = 
k_engine->tablet_manager()->get_tablet(write_req.tablet_id);
     OlapMeta* meta = tablet->data_dir()->get_meta();
     Version version;
     version.first = tablet->rowset_with_max_version()->end_version() + 1;
@@ -500,9 +496,7 @@ TEST_F(TestDeltaWriter, write) {
     }
     EXPECT_EQ(1, tablet->num_rows());
 
-    auto tablet_id = 10003;
-    auto schema_hash = 270068375;
-    res = k_engine->tablet_manager()->drop_tablet(tablet_id, schema_hash);
+    res = k_engine->tablet_manager()->drop_tablet(request.tablet_id, 
request.replica_id);
     EXPECT_EQ(Status::OK(), res);
     delete delta_writer;
 }
@@ -614,8 +608,7 @@ TEST_F(TestDeltaWriter, vec_write) {
     ASSERT_TRUE(res.ok());
 
     // publish version success
-    TabletSharedPtr tablet =
-            k_engine->tablet_manager()->get_tablet(write_req.tablet_id, 
write_req.schema_hash);
+    TabletSharedPtr tablet = 
k_engine->tablet_manager()->get_tablet(write_req.tablet_id);
     std::cout << "before publish, tablet row nums:" << tablet->num_rows() << 
std::endl;
     OlapMeta* meta = tablet->data_dir()->get_meta();
     Version version;
@@ -639,11 +632,9 @@ TEST_F(TestDeltaWriter, vec_write) {
         res = tablet->add_inc_rowset(rowset);
         ASSERT_TRUE(res.ok());
     }
-    ASSERT_EQ(2, tablet->num_rows());
+    ASSERT_EQ(1, tablet->num_rows());
 
-    auto tablet_id = 10003;
-    auto schema_hash = 270068375;
-    res = k_engine->tablet_manager()->drop_tablet(tablet_id, schema_hash);
+    res = k_engine->tablet_manager()->drop_tablet(request.tablet_id, 
request.replica_id);
     ASSERT_TRUE(res.ok());
     delete delta_writer;
 }
@@ -692,8 +683,7 @@ TEST_F(TestDeltaWriter, sequence_col) {
     EXPECT_EQ(Status::OK(), res);
 
     // publish version success
-    TabletSharedPtr tablet =
-            k_engine->tablet_manager()->get_tablet(write_req.tablet_id, 
write_req.schema_hash);
+    TabletSharedPtr tablet = 
k_engine->tablet_manager()->get_tablet(write_req.tablet_id);
     OlapMeta* meta = tablet->data_dir()->get_meta();
     Version version;
     version.first = tablet->rowset_with_max_version()->end_version() + 1;
@@ -712,9 +702,7 @@ TEST_F(TestDeltaWriter, sequence_col) {
     }
     EXPECT_EQ(1, tablet->num_rows());
 
-    auto tablet_id = 10005;
-    auto schema_hash = 270068377;
-    res = k_engine->tablet_manager()->drop_tablet(tablet_id, schema_hash);
+    res = k_engine->tablet_manager()->drop_tablet(request.tablet_id, 
request.replica_id);
     EXPECT_EQ(Status::OK(), res);
     delete delta_writer;
 }
@@ -777,8 +765,7 @@ TEST_F(TestDeltaWriter, vec_sequence_col) {
     ASSERT_TRUE(res.ok());
 
     // publish version success
-    TabletSharedPtr tablet =
-            k_engine->tablet_manager()->get_tablet(write_req.tablet_id, 
write_req.schema_hash);
+    TabletSharedPtr tablet = 
k_engine->tablet_manager()->get_tablet(write_req.tablet_id);
     std::cout << "before publish, tablet row nums:" << tablet->num_rows() << 
std::endl;
     OlapMeta* meta = tablet->data_dir()->get_meta();
     Version version;
@@ -804,9 +791,7 @@ TEST_F(TestDeltaWriter, vec_sequence_col) {
     }
     ASSERT_EQ(1, tablet->num_rows());
 
-    auto tablet_id = 10005;
-    auto schema_hash = 270068377;
-    res = k_engine->tablet_manager()->drop_tablet(tablet_id, schema_hash);
+    res = k_engine->tablet_manager()->drop_tablet(request.tablet_id, 
request.replica_id);
     ASSERT_TRUE(res.ok());
     delete delta_writer;
 }
diff --git a/be/test/olap/engine_storage_migration_task_test.cpp 
b/be/test/olap/engine_storage_migration_task_test.cpp
index 608d4b6021..36b95a8aa3 100644
--- a/be/test/olap/engine_storage_migration_task_test.cpp
+++ b/be/test/olap/engine_storage_migration_task_test.cpp
@@ -198,8 +198,7 @@ TEST_F(TestEngineStorageMigrationTask, write_and_migration) 
{
     EXPECT_EQ(Status::OK(), res);
 
     // publish version success
-    TabletSharedPtr tablet =
-            k_engine->tablet_manager()->get_tablet(write_req.tablet_id, 
write_req.schema_hash);
+    TabletSharedPtr tablet = 
k_engine->tablet_manager()->get_tablet(write_req.tablet_id);
     OlapMeta* meta = tablet->data_dir()->get_meta();
     Version version;
     version.first = tablet->rowset_with_max_version()->end_version() + 1;
@@ -234,9 +233,7 @@ TEST_F(TestEngineStorageMigrationTask, write_and_migration) 
{
     res = engine_task.execute();
     EXPECT_EQ(Status::OK(), res);
     // reget the tablet from manager after migration
-    auto tablet_id = 10005;
-    auto schema_hash = 270068377;
-    TabletSharedPtr tablet2 = 
k_engine->tablet_manager()->get_tablet(tablet_id, schema_hash);
+    TabletSharedPtr tablet2 = 
k_engine->tablet_manager()->get_tablet(request.tablet_id);
     // check path
     EXPECT_EQ(tablet2->data_dir()->path(), dest_store->path());
     // check rows
@@ -254,7 +251,7 @@ TEST_F(TestEngineStorageMigrationTask, write_and_migration) 
{
     EngineStorageMigrationTask engine_task2(tablet2, dest_store);
     res = engine_task2.execute();
     EXPECT_EQ(Status::OK(), res);
-    TabletSharedPtr tablet3 = 
k_engine->tablet_manager()->get_tablet(tablet_id, schema_hash);
+    TabletSharedPtr tablet3 = 
k_engine->tablet_manager()->get_tablet(request.tablet_id);
     // check path
     EXPECT_EQ(tablet3->data_dir()->path(), tablet->data_dir()->path());
     // check rows
@@ -264,7 +261,7 @@ TEST_F(TestEngineStorageMigrationTask, write_and_migration) 
{
     EXPECT_NE(tablet3, tablet);
     // test case 2 end
 
-    res = k_engine->tablet_manager()->drop_tablet(tablet_id, schema_hash);
+    res = k_engine->tablet_manager()->drop_tablet(request.tablet_id, 
request.replica_id);
     EXPECT_EQ(Status::OK(), res);
     delete delta_writer;
 }
diff --git a/be/test/olap/tablet_meta_test.cpp 
b/be/test/olap/tablet_meta_test.cpp
index f728616922..67f9f95fa4 100644
--- a/be/test/olap/tablet_meta_test.cpp
+++ b/be/test/olap/tablet_meta_test.cpp
@@ -26,7 +26,7 @@ namespace doris {
 TEST(TabletMetaTest, SaveAndParse) {
     std::string meta_path = "./be/test/olap/test_data/tablet_meta_test.hdr";
 
-    TabletMeta old_tablet_meta(1, 2, 3, 4, 5, TTabletSchema(), 6, {{7, 8}}, 
UniqueId(9, 10),
+    TabletMeta old_tablet_meta(1, 2, 3, 3, 4, 5, TTabletSchema(), 6, {{7, 8}}, 
UniqueId(9, 10),
                                TTabletType::TABLET_TYPE_DISK, 
TStorageMedium::HDD, "",
                                TCompressionType::LZ4F);
     EXPECT_EQ(Status::OK(), old_tablet_meta.save(meta_path));
diff --git a/be/test/olap/tablet_mgr_test.cpp b/be/test/olap/tablet_mgr_test.cpp
index 840e5968ce..6ac994e7cd 100644
--- a/be/test/olap/tablet_mgr_test.cpp
+++ b/be/test/olap/tablet_mgr_test.cpp
@@ -119,7 +119,7 @@ TEST_F(TabletMgrTest, CreateTablet) {
     create_st = _tablet_mgr->create_tablet(create_tablet_req, data_dirs);
     EXPECT_TRUE(create_st == Status::OK());
 
-    Status drop_st = _tablet_mgr->drop_tablet(111, false);
+    Status drop_st = _tablet_mgr->drop_tablet(111, 
create_tablet_req.replica_id);
     EXPECT_TRUE(drop_st == Status::OK());
     tablet.reset();
     Status trash_st = _tablet_mgr->start_trash_sweep();
@@ -174,7 +174,7 @@ TEST_F(TabletMgrTest, CreateTabletWithSequence) {
     Status check_meta_st = TabletMetaManager::get_meta(_data_dir, 111, 3333, 
new_tablet_meta);
     EXPECT_TRUE(check_meta_st == Status::OK());
 
-    Status drop_st = _tablet_mgr->drop_tablet(111, false);
+    Status drop_st = _tablet_mgr->drop_tablet(111, 
create_tablet_req.replica_id);
     EXPECT_TRUE(drop_st == Status::OK());
     tablet.reset();
     Status trash_st = _tablet_mgr->start_trash_sweep();
@@ -208,13 +208,13 @@ TEST_F(TabletMgrTest, DropTablet) {
     EXPECT_TRUE(tablet != nullptr);
 
     // drop unexist tablet will be success
-    Status drop_st = _tablet_mgr->drop_tablet(1121, false);
+    Status drop_st = _tablet_mgr->drop_tablet(1121, 
create_tablet_req.replica_id);
     EXPECT_TRUE(drop_st == Status::OK());
     tablet = _tablet_mgr->get_tablet(111);
     EXPECT_TRUE(tablet != nullptr);
 
     // drop exist tablet will be success
-    drop_st = _tablet_mgr->drop_tablet(111, false);
+    drop_st = _tablet_mgr->drop_tablet(111, create_tablet_req.replica_id);
     EXPECT_TRUE(drop_st == Status::OK());
     tablet = _tablet_mgr->get_tablet(111);
     EXPECT_TRUE(tablet == nullptr);
diff --git a/be/test/olap/tablet_test.cpp b/be/test/olap/tablet_test.cpp
index d5883ad708..9d6d36c42e 100644
--- a/be/test/olap/tablet_test.cpp
+++ b/be/test/olap/tablet_test.cpp
@@ -36,7 +36,7 @@ public:
 
     virtual void SetUp() {
         _tablet_meta = static_cast<TabletMetaSharedPtr>(new TabletMeta(
-                1, 2, 15673, 4, 5, TTabletSchema(), 6, {{7, 8}}, UniqueId(9, 
10),
+                1, 2, 15673, 15674, 4, 5, TTabletSchema(), 6, {{7, 8}}, 
UniqueId(9, 10),
                 TTabletType::TABLET_TYPE_DISK, TStorageMedium::HDD, "", 
TCompressionType::LZ4F));
         _json_rowset_meta = R"({
             "rowset_id": 540081,
diff --git a/be/test/olap/test_data/header_without_inc_rs.txt 
b/be/test/olap/test_data/header_without_inc_rs.txt
index 03cef06a3d..12c1f81a0e 100644
--- a/be/test/olap/test_data/header_without_inc_rs.txt
+++ b/be/test/olap/test_data/header_without_inc_rs.txt
@@ -144,5 +144,6 @@
     "preferred_rowset_type": "BETA_ROWSET",
     "tablet_type": "TABLET_TYPE_DISK",
     "storage_medium": "HDD",
-    "remote_storage_name": ""
+    "remote_storage_name": "",
+    "replica_id": 0
 }
diff --git a/fe/fe-core/src/main/java/org/apache/doris/alter/RollupJobV2.java 
b/fe/fe-core/src/main/java/org/apache/doris/alter/RollupJobV2.java
index 45acf8345c..9a66f10772 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/alter/RollupJobV2.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/alter/RollupJobV2.java
@@ -219,13 +219,14 @@ public class RollupJobV2 extends AlterJobV2 implements 
GsonPostProcessable {
                     List<Replica> rollupReplicas = rollupTablet.getReplicas();
                     for (Replica rollupReplica : rollupReplicas) {
                         long backendId = rollupReplica.getBackendId();
+                        long rollupReplicaId = rollupReplica.getId();
                         
Preconditions.checkNotNull(tabletIdMap.get(rollupTabletId)); // baseTabletId
                         countDownLatch.addMark(backendId, rollupTabletId);
                         // create replica with version 1.
                         // version will be updated by following load process, 
or when rollup task finished.
                         CreateReplicaTask createReplicaTask = new 
CreateReplicaTask(
                                 backendId, dbId, tableId, partitionId, 
rollupIndexId, rollupTabletId,
-                                rollupShortKeyColumnCount, rollupSchemaHash,
+                                rollupReplicaId, rollupShortKeyColumnCount, 
rollupSchemaHash,
                                 Partition.PARTITION_INIT_VERSION,
                                 rollupKeysType, TStorageType.COLUMN, 
storageMedium,
                                 rollupSchema, tbl.getCopiedBfColumns(), 
tbl.getBfFpp(), countDownLatch,
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/alter/SchemaChangeJobV2.java 
b/fe/fe-core/src/main/java/org/apache/doris/alter/SchemaChangeJobV2.java
index d73f22617d..411ab275a5 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/alter/SchemaChangeJobV2.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/alter/SchemaChangeJobV2.java
@@ -245,10 +245,11 @@ public class SchemaChangeJobV2 extends AlterJobV2 {
                         List<Replica> shadowReplicas = 
shadowTablet.getReplicas();
                         for (Replica shadowReplica : shadowReplicas) {
                             long backendId = shadowReplica.getBackendId();
+                            long shadowReplicaId = shadowReplica.getId();
                             countDownLatch.addMark(backendId, shadowTabletId);
                             CreateReplicaTask createReplicaTask = new 
CreateReplicaTask(
                                     backendId, dbId, tableId, partitionId, 
shadowIdxId, shadowTabletId,
-                                    shadowShortKeyColumnCount, 
shadowSchemaHash,
+                                    shadowReplicaId, 
shadowShortKeyColumnCount, shadowSchemaHash,
                                     Partition.PARTITION_INIT_VERSION,
                                     originKeysType, TStorageType.COLUMN, 
storageMedium,
                                     shadowSchema, bfColumns, bfFpp, 
countDownLatch, indexes,
diff --git a/fe/fe-core/src/main/java/org/apache/doris/backup/RestoreJob.java 
b/fe/fe-core/src/main/java/org/apache/doris/backup/RestoreJob.java
index 922b314a21..df90c9d9dd 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/backup/RestoreJob.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/backup/RestoreJob.java
@@ -957,7 +957,7 @@ public class RestoreJob extends AbstractJob {
                     
Catalog.getCurrentInvertedIndex().addReplica(restoreTablet.getId(), 
restoreReplica);
                     CreateReplicaTask task = new 
CreateReplicaTask(restoreReplica.getBackendId(), dbId,
                             localTbl.getId(), restorePart.getId(), 
restoredIdx.getId(),
-                            restoreTablet.getId(), 
indexMeta.getShortKeyColumnCount(),
+                            restoreTablet.getId(), restoreReplica.getId(), 
indexMeta.getShortKeyColumnCount(),
                             indexMeta.getSchemaHash(), 
restoreReplica.getVersion(),
                             indexMeta.getKeysType(), TStorageType.COLUMN,
                             TStorageMedium.HDD /* all restored replicas will 
be saved to HDD */,
diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/Catalog.java 
b/fe/fe-core/src/main/java/org/apache/doris/catalog/Catalog.java
index 7050344beb..9cf25f2af2 100755
--- a/fe/fe-core/src/main/java/org/apache/doris/catalog/Catalog.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/Catalog.java
@@ -4807,7 +4807,8 @@ public class Catalog {
                         List<Replica> replicas = tablet.getReplicas();
                         for (Replica replica : replicas) {
                             long backendId = replica.getBackendId();
-                            DropReplicaTask dropTask = new 
DropReplicaTask(backendId, tabletId, schemaHash);
+                            long replicaId = replica.getId();
+                            DropReplicaTask dropTask = new 
DropReplicaTask(backendId, tabletId, replicaId, schemaHash);
                             batchTask.addTask(dropTask);
                         } // end for replicas
                     } // end for tablets
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/catalog/PartitionInfo.java 
b/fe/fe-core/src/main/java/org/apache/doris/catalog/PartitionInfo.java
index 41a37240f3..bee4c382a0 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/catalog/PartitionInfo.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/PartitionInfo.java
@@ -125,7 +125,7 @@ public class PartitionInfo implements Writable {
     }
 
     public PartitionItem handleNewSinglePartitionDesc(SinglePartitionDesc desc,
-                                              long partitionId, boolean 
isTemp) throws DdlException {
+                                                      long partitionId, 
boolean isTemp) throws DdlException {
         Preconditions.checkArgument(desc.isAnalyzed());
         PartitionItem partitionItem = createAndCheckPartitionItem(desc, 
isTemp);
         setItemInternal(partitionId, isTemp, partitionItem);
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/clone/BeLoadRebalancer.java 
b/fe/fe-core/src/main/java/org/apache/doris/clone/BeLoadRebalancer.java
index 89a827cd87..fb55091b35 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/clone/BeLoadRebalancer.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/clone/BeLoadRebalancer.java
@@ -25,6 +25,7 @@ import org.apache.doris.catalog.TabletMeta;
 import org.apache.doris.clone.SchedException.Status;
 import org.apache.doris.clone.TabletSchedCtx.Priority;
 import org.apache.doris.clone.TabletScheduler.PathSlot;
+import org.apache.doris.common.Config;
 import org.apache.doris.system.Backend;
 import org.apache.doris.system.SystemInfoService;
 import org.apache.doris.thrift.TStorageMedium;
@@ -282,7 +283,7 @@ public class BeLoadRebalancer extends Rebalancer {
                     continue;
                 }
 
-                if (!clusterStat.isMoreBalanced(tabletCtx.getSrcBackendId(), 
beStat.getBeId(),
+                if (!Config.be_rebalancer_fuzzy_test && 
!clusterStat.isMoreBalanced(tabletCtx.getSrcBackendId(), beStat.getBeId(),
                         tabletCtx.getTabletId(), tabletCtx.getTabletSize(), 
tabletCtx.getStorageMedium())) {
                     continue;
                 }
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/clone/ClusterLoadStatistic.java 
b/fe/fe-core/src/main/java/org/apache/doris/clone/ClusterLoadStatistic.java
index 34d6536c24..a407347de3 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/clone/ClusterLoadStatistic.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/clone/ClusterLoadStatistic.java
@@ -168,7 +168,7 @@ public class ClusterLoadStatistic {
                 continue;
             }
 
-            if (Math.abs(beStat.getLoadScore(medium) - avgLoadScore) / 
avgLoadScore > Config.balance_load_score_threshold) {
+            if (Config.be_rebalancer_fuzzy_test) {
                 if (beStat.getLoadScore(medium) > avgLoadScore) {
                     beStat.setClazz(medium, Classification.HIGH);
                     highCounter++;
@@ -177,8 +177,19 @@ public class ClusterLoadStatistic {
                     lowCounter++;
                 }
             } else {
-                beStat.setClazz(medium, Classification.MID);
-                midCounter++;
+                if (Math.abs(beStat.getLoadScore(medium) - avgLoadScore) / 
avgLoadScore
+                        > Config.balance_load_score_threshold) {
+                    if (beStat.getLoadScore(medium) > avgLoadScore) {
+                        beStat.setClazz(medium, Classification.HIGH);
+                        highCounter++;
+                    } else if (beStat.getLoadScore(medium) < avgLoadScore) {
+                        beStat.setClazz(medium, Classification.LOW);
+                        lowCounter++;
+                    }
+                } else {
+                    beStat.setClazz(medium, Classification.MID);
+                    midCounter++;
+                }
             }
         }
 
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/clone/TabletSchedCtx.java 
b/fe/fe-core/src/main/java/org/apache/doris/clone/TabletSchedCtx.java
index 6bc3947cc8..d6e8eeeed9 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/clone/TabletSchedCtx.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/clone/TabletSchedCtx.java
@@ -813,7 +813,7 @@ public class TabletSchedCtx implements 
Comparable<TabletSchedCtx> {
         Backend destBe = infoService.getBackend(destBackendId);
         if (destBe == null) {
             throw new SchedException(Status.SCHEDULE_FAILED,
-                "dest backend " + srcReplica.getBackendId() + " does not 
exist");
+                "dest backend " + destBackendId + " does not exist");
         }
 
         taskTimeoutMs = getApproximateTimeoutMs();
@@ -828,11 +828,6 @@ public class TabletSchedCtx implements 
Comparable<TabletSchedCtx> {
         // another clone task.
         // That is, we may need to use 2 clone tasks to create a new replica. 
It is inefficient,
         // but there is no other way now.
-        TBackend tSrcBe = new TBackend(srcBe.getHost(), srcBe.getBePort(), 
srcBe.getHttpPort());
-        cloneTask = new CloneTask(destBackendId, dbId, tblId, partitionId, 
indexId,
-                tabletId, schemaHash, Lists.newArrayList(tSrcBe), 
storageMedium,
-                visibleVersion, (int) (taskTimeoutMs / 1000));
-        cloneTask.setPathHash(srcPathHash, destPathHash);
 
         // if this is a balance task, or this is a repair task with 
REPLICA_MISSING/REPLICA_RELOCATING or REPLICA_MISSING_IN_CLUSTER,
         // we create a new replica with state CLONE
@@ -847,6 +842,12 @@ public class TabletSchedCtx implements 
Comparable<TabletSchedCtx> {
                     committedVersion, /* use committed version as last failed 
version */
                     -1 /* last success version */);
 
+            TBackend tSrcBe = new TBackend(srcBe.getHost(), srcBe.getBePort(), 
srcBe.getHttpPort());
+            cloneTask = new CloneTask(destBackendId, dbId, tblId, partitionId, 
indexId,
+                tabletId, cloneReplica.getId(), schemaHash, 
Lists.newArrayList(tSrcBe), storageMedium,
+                visibleVersion, (int) (taskTimeoutMs / 1000));
+            cloneTask.setPathHash(srcPathHash, destPathHash);
+
             // addReplica() method will add this replica to tablet inverted 
index too.
             tablet.addReplica(cloneReplica);
         } else if (tabletStatus == TabletStatus.VERSION_INCOMPLETE) {
@@ -861,6 +862,12 @@ public class TabletSchedCtx implements 
Comparable<TabletSchedCtx> {
                 throw new SchedException(Status.SCHEDULE_FAILED, "dest 
replica's path hash is changed. "
                         + "current: " + replica.getPathHash() + ", scheduled: 
" + destPathHash);
             }
+
+            TBackend tSrcBe = new TBackend(srcBe.getHost(), srcBe.getBePort(), 
srcBe.getHttpPort());
+            cloneTask = new CloneTask(destBackendId, dbId, tblId, partitionId, 
indexId,
+                tabletId, replica.getId(), schemaHash, 
Lists.newArrayList(tSrcBe), storageMedium,
+                visibleVersion, (int) (taskTimeoutMs / 1000));
+            cloneTask.setPathHash(srcPathHash, destPathHash);
         }
 
         this.state = State.RUNNING;
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/clone/TabletScheduler.java 
b/fe/fe-core/src/main/java/org/apache/doris/clone/TabletScheduler.java
index a6c4501c6f..166fdc17a4 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/clone/TabletScheduler.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/clone/TabletScheduler.java
@@ -1135,7 +1135,8 @@ public class TabletScheduler extends MasterDaemon {
             // NOTICE: only delete the replica from meta may not work. 
sometimes we can depend on tablet report
             // deleting these replicas, but in FORCE_REDUNDANT case, replica 
may be added to meta again in report
             // process.
-            sendDeleteReplicaTask(replica.getBackendId(), 
tabletCtx.getTabletId(), tabletCtx.getSchemaHash());
+            sendDeleteReplicaTask(replica.getBackendId(), 
tabletCtx.getTabletId(), replica.getId(),
+                    tabletCtx.getSchemaHash());
         }
 
         // write edit log
@@ -1152,8 +1153,8 @@ public class TabletScheduler extends MasterDaemon {
                 tabletCtx.getTabletId(), replica.getBackendId(), reason, 
force);
     }
 
-    private void sendDeleteReplicaTask(long backendId, long tabletId, int 
schemaHash) {
-        DropReplicaTask task = new DropReplicaTask(backendId, tabletId, 
schemaHash);
+    private void sendDeleteReplicaTask(long backendId, long tabletId, long 
replicaId, int schemaHash) {
+        DropReplicaTask task = new DropReplicaTask(backendId, tabletId, 
replicaId, schemaHash);
         AgentBatchTask batchTask = new AgentBatchTask();
         batchTask.addTask(task);
         AgentTaskExecutor.submit(batchTask);
diff --git a/fe/fe-core/src/main/java/org/apache/doris/common/Config.java 
b/fe/fe-core/src/main/java/org/apache/doris/common/Config.java
index 65e6598d15..e5f109686d 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/common/Config.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/common/Config.java
@@ -1648,4 +1648,14 @@ public class Config extends ConfigBase {
     @ConfField(mutable = false, masterOnly = true)
     public static boolean enable_multi_catalog = false; // 1 min
 
+    /**
+     * If set to TRUE, FE will:
+     * 1. divide BE into high load and low load (no mid load) to force triggering tablet scheduling;
+     * 2. ignore whether the cluster can be more balanced during tablet scheduling.
+     *
+     * It's used to test the reliability in the single replica case when tablet scheduling is frequent.
+     * Default is false.
+     */
+    @ConfField(mutable = false, masterOnly = true)
+    public static boolean be_rebalancer_fuzzy_test = false;
 }
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/InternalDataSource.java 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/InternalDataSource.java
index dcacc806a1..579715e46c 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/InternalDataSource.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/InternalDataSource.java
@@ -1638,9 +1638,10 @@ public class InternalDataSource implements DataSourceIf {
                 long tabletId = tablet.getId();
                 for (Replica replica : tablet.getReplicas()) {
                     long backendId = replica.getBackendId();
+                    long replicaId = replica.getId();
                     countDownLatch.addMark(backendId, tabletId);
                     CreateReplicaTask task =
-                            new CreateReplicaTask(backendId, dbId, tableId, 
partitionId, indexId, tabletId,
+                            new CreateReplicaTask(backendId, dbId, tableId, 
partitionId, indexId, tabletId, replicaId,
                                     shortKeyColumnCount, schemaHash, version, 
keysType, storageType, storageMedium,
                                     schema, bfColumns, bfFpp, countDownLatch, 
indexes, isInMemory, tabletType,
                                     dataSortInfo, compressionType);
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/master/ReportHandler.java 
b/fe/fe-core/src/main/java/org/apache/doris/master/ReportHandler.java
index 3649b2bae8..7fb54271d1 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/master/ReportHandler.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/master/ReportHandler.java
@@ -594,7 +594,8 @@ public class ReportHandler extends Daemon {
                                     Set<String> bfColumns = 
olapTable.getCopiedBfColumns();
                                     double bfFpp = olapTable.getBfFpp();
                                     CreateReplicaTask createReplicaTask = new 
CreateReplicaTask(backendId, dbId,
-                                            tableId, partitionId, indexId, 
tabletId, indexMeta.getShortKeyColumnCount(),
+                                            tableId, partitionId, indexId, 
tabletId, replica.getId(),
+                                            indexMeta.getShortKeyColumnCount(),
                                             indexMeta.getSchemaHash(), 
partition.getVisibleVersion(),
                                             indexMeta.getKeysType(),
                                             TStorageType.COLUMN,
@@ -692,7 +693,9 @@ public class ReportHandler extends Daemon {
 
             if (needDelete) {
                 // drop replica
-                DropReplicaTask task = new DropReplicaTask(backendId, 
tabletId, backendTabletInfo.getSchemaHash());
+                long replicaId = backendTabletInfo.getReplicaId();
+                DropReplicaTask task = new DropReplicaTask(backendId, 
tabletId, replicaId,
+                        backendTabletInfo.getSchemaHash());
                 batchTask.addTask(task);
                 LOG.warn("delete tablet[" + tabletId + "] from backend[" + 
backendId + "] because not found in meta");
                 ++deleteFromBackendCounter;
diff --git a/fe/fe-core/src/main/java/org/apache/doris/task/CloneTask.java 
b/fe/fe-core/src/main/java/org/apache/doris/task/CloneTask.java
index 8ef9d8eca0..d17affa8c0 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/task/CloneTask.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/task/CloneTask.java
@@ -30,6 +30,7 @@ public class CloneTask extends AgentTask {
     public static final int VERSION_2 = 2;
 
     private int schemaHash;
+    private long replicaId;
     private List<TBackend> srcBackends;
     private TStorageMedium storageMedium;
 
@@ -42,10 +43,11 @@ public class CloneTask extends AgentTask {
 
     private int taskVersion = VERSION_1;
 
-    public CloneTask(long backendId, long dbId, long tableId, long 
partitionId, long indexId,
-                     long tabletId, int schemaHash, List<TBackend> 
srcBackends, TStorageMedium storageMedium,
-                     long visibleVersion, int timeoutS) {
+    public CloneTask(long backendId, long dbId, long tableId, long 
partitionId, long indexId, long tabletId,
+            long replicaId, int schemaHash, List<TBackend> srcBackends, 
TStorageMedium storageMedium,
+            long visibleVersion, int timeoutS) {
         super(null, backendId, TTaskType.CLONE, dbId, tableId, partitionId, 
indexId, tabletId);
+        this.replicaId = replicaId;
         this.schemaHash = schemaHash;
         this.srcBackends = srcBackends;
         this.storageMedium = storageMedium;
@@ -77,6 +79,7 @@ public class CloneTask extends AgentTask {
 
     public TCloneReq toThrift() {
         TCloneReq request = new TCloneReq(tabletId, schemaHash, srcBackends);
+        request.setReplicaId(replicaId);
         request.setStorageMedium(storageMedium);
         request.setCommittedVersion(visibleVersion);
         request.setTaskVersion(taskVersion);
@@ -92,10 +95,12 @@ public class CloneTask extends AgentTask {
     @Override
     public String toString() {
         StringBuilder sb = new StringBuilder();
-        sb.append("tablet id: ").append(tabletId).append(", schema hash: 
").append(schemaHash);
+        sb.append("tablet id: ").append(tabletId).append(", replica id: 
").append(replicaId).append(", schema hash: ")
+                .append(schemaHash);
         sb.append(", storageMedium: ").append(storageMedium.name());
         sb.append(", visible version(hash): ").append(visibleVersion);
-        sb.append(", src backend: 
").append(srcBackends.get(0).getHost()).append(", src path hash: 
").append(srcPathHash);
+        sb.append(", src backend: 
").append(srcBackends.get(0).getHost()).append(", src path hash: ")
+                .append(srcPathHash);
         sb.append(", dest backend: ").append(backendId).append(", dest path 
hash: ").append(destPathHash);
         return sb.toString();
     }
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/task/CreateReplicaTask.java 
b/fe/fe-core/src/main/java/org/apache/doris/task/CreateReplicaTask.java
index 8718dd7663..0e45b86cf2 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/task/CreateReplicaTask.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/task/CreateReplicaTask.java
@@ -47,6 +47,7 @@ import java.util.Set;
 public class CreateReplicaTask extends AgentTask {
     private static final Logger LOG = LogManager.getLogger(CreateReplicaTask.class);
 
+    private long replicaId;
     private short shortKeyColumnCount;
     private int schemaHash;
 
@@ -89,7 +90,7 @@ public class CreateReplicaTask extends AgentTask {
     private DataSortInfo dataSortInfo;
 
     public CreateReplicaTask(long backendId, long dbId, long tableId, long partitionId, long indexId, long tabletId,
-                             short shortKeyColumnCount, int schemaHash, long version,
+                             long replicaId, short shortKeyColumnCount, int schemaHash, long version,
                              KeysType keysType, TStorageType storageType,
                              TStorageMedium storageMedium, List<Column> columns,
                              Set<String> bfColumns, double bfFpp, MarkedCountDownLatch<Long, Long> latch,
@@ -98,6 +99,7 @@ public class CreateReplicaTask extends AgentTask {
                              TTabletType tabletType, TCompressionType compressionType) {
         super(null, backendId, TTaskType.CREATE, dbId, tableId, partitionId, indexId, tabletId);
 
+        this.replicaId = replicaId;
         this.shortKeyColumnCount = shortKeyColumnCount;
         this.schemaHash = schemaHash;
 
@@ -121,7 +123,7 @@ public class CreateReplicaTask extends AgentTask {
     }
 
     public CreateReplicaTask(long backendId, long dbId, long tableId, long partitionId, long indexId, long tabletId,
-                             short shortKeyColumnCount, int schemaHash, long version,
+                             long replicaId, short shortKeyColumnCount, int schemaHash, long version,
                              KeysType keysType, TStorageType storageType,
                              TStorageMedium storageMedium, List<Column> columns,
                              Set<String> bfColumns, double bfFpp, MarkedCountDownLatch<Long, Long> latch,
@@ -132,6 +134,7 @@ public class CreateReplicaTask extends AgentTask {
                              TCompressionType compressionType) {
         super(null, backendId, TTaskType.CREATE, dbId, tableId, partitionId, indexId, tabletId);
 
+        this.replicaId = replicaId;
         this.shortKeyColumnCount = shortKeyColumnCount;
         this.schemaHash = schemaHash;
 
@@ -261,6 +264,7 @@ public class CreateReplicaTask extends AgentTask {
         }
         createTabletReq.setTableId(tableId);
         createTabletReq.setPartitionId(partitionId);
+        createTabletReq.setReplicaId(replicaId);
 
         if (baseTabletId != -1) {
             createTabletReq.setBaseTabletId(baseTabletId);
diff --git a/fe/fe-core/src/main/java/org/apache/doris/task/DropReplicaTask.java b/fe/fe-core/src/main/java/org/apache/doris/task/DropReplicaTask.java
index 41c3291dca..18643acd6c 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/task/DropReplicaTask.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/task/DropReplicaTask.java
@@ -22,10 +22,12 @@ import org.apache.doris.thrift.TTaskType;
 
 public class DropReplicaTask extends AgentTask {
     private int schemaHash; // set -1L as unknown
+    private long replicaId;
 
-    public DropReplicaTask(long backendId, long tabletId, int schemaHash) {
+    public DropReplicaTask(long backendId, long tabletId, long replicaId, int schemaHash) {
         super(null, backendId, TTaskType.DROP, -1L, -1L, -1L, -1L, tabletId);
         this.schemaHash = schemaHash;
+        this.replicaId = replicaId;
     }
 
     public TDropTabletReq toThrift() {
@@ -33,10 +35,15 @@ public class DropReplicaTask extends AgentTask {
         if (this.schemaHash != -1) {
             request.setSchemaHash(schemaHash);
         }
+        request.setReplicaId(replicaId);
         return request;
     }
 
     public int getSchemaHash() {
         return schemaHash;
     }
+
+    public long getReplicaId() {
+        return replicaId;
+    }
 }
diff --git a/fe/fe-core/src/test/java/org/apache/doris/task/AgentTaskTest.java b/fe/fe-core/src/test/java/org/apache/doris/task/AgentTaskTest.java
index 31b84fa4af..3fe5d476b3 100644
--- a/fe/fe-core/src/test/java/org/apache/doris/task/AgentTaskTest.java
+++ b/fe/fe-core/src/test/java/org/apache/doris/task/AgentTaskTest.java
@@ -107,14 +107,14 @@ public class AgentTaskTest {
 
         // create
         createReplicaTask = new CreateReplicaTask(backendId1, dbId, tableId, partitionId,
-                                                  indexId1, tabletId1, shortKeyNum, schemaHash1,
+                                                  indexId1, tabletId1, replicaId1, shortKeyNum, schemaHash1,
                                                   version, KeysType.AGG_KEYS,
                                                   storageType, TStorageMedium.SSD,
                                                   columns, null, 0, latch, null,
                                                   false, TTabletType.TABLET_TYPE_DISK, TCompressionType.LZ4F);
 
         // drop
-        dropTask = new DropReplicaTask(backendId1, tabletId1, schemaHash1);
+        dropTask = new DropReplicaTask(backendId1, tabletId1, replicaId1, schemaHash1);
 
         // push
         pushTask =
@@ -124,7 +124,7 @@ public class AgentTaskTest {
 
         // clone
         cloneTask =
-                new CloneTask(backendId1, dbId, tableId, partitionId, indexId1, tabletId1, schemaHash1,
+                new CloneTask(backendId1, dbId, tableId, partitionId, indexId1, tabletId1, replicaId1, schemaHash1,
                         Arrays.asList(new TBackend("host1", 8290, 8390)), TStorageMedium.HDD, -1, 3600);
 
         // storageMediaMigrationTask
@@ -240,7 +240,7 @@ public class AgentTaskTest {
         Assert.assertEquals(1, AgentTaskQueue.getTaskNum(backendId1, TTaskType.DROP, true));
 
         dropTask.failed();
-        DropReplicaTask dropTask2 = new DropReplicaTask(backendId2, tabletId1, schemaHash1);
+        DropReplicaTask dropTask2 = new DropReplicaTask(backendId2, tabletId1, replicaId1, schemaHash1);
         AgentTaskQueue.addTask(dropTask2);
         dropTask2.failed();
         Assert.assertEquals(1, AgentTaskQueue.getTaskNum(backendId1, TTaskType.DROP, true));
diff --git a/gensrc/proto/olap_file.proto b/gensrc/proto/olap_file.proto
index 3d5ee5e83a..8c1bd00aa3 100644
--- a/gensrc/proto/olap_file.proto
+++ b/gensrc/proto/olap_file.proto
@@ -262,6 +262,7 @@ message TabletMetaPB {
     repeated RowsetMetaPB stale_rs_metas = 18;
     optional StorageMediumPB storage_medium = 19 [default = HDD];
     optional string remote_storage_name = 20;
+    optional int64 replica_id = 21 [default = 0];
 }
 
 message OLAPIndexHeaderMessage {
diff --git a/gensrc/thrift/AgentService.thrift b/gensrc/thrift/AgentService.thrift
index 2233fae3fe..e695d7a737 100644
--- a/gensrc/thrift/AgentService.thrift
+++ b/gensrc/thrift/AgentService.thrift
@@ -118,11 +118,13 @@ struct TCreateTabletReq {
     14: optional TTabletType tablet_type
     15: optional TStorageParam storage_param
     16: optional TCompressionType compression_type = TCompressionType.LZ4F
+    17: optional Types.TReplicaId replica_id = 0
 }
 
 struct TDropTabletReq {
     1: required Types.TTabletId tablet_id
     2: optional Types.TSchemaHash schema_hash
+    3: optional Types.TReplicaId replica_id = 0
 }
 
 struct TAlterTabletReq {
@@ -204,6 +206,7 @@ struct TCloneReq {
     8: optional i64 src_path_hash;
     9: optional i64 dest_path_hash;
     10: optional i32 timeout_s;
+    11: optional Types.TReplicaId replica_id = 0
 }
 
 struct TCompactionReq {
diff --git a/gensrc/thrift/MasterService.thrift b/gensrc/thrift/MasterService.thrift
index ded383b1b6..e49d34aacf 100644
--- a/gensrc/thrift/MasterService.thrift
+++ b/gensrc/thrift/MasterService.thrift
@@ -38,6 +38,7 @@ struct TTabletInfo {
     12: optional bool used
     13: optional Types.TPartitionId partition_id
     14: optional bool is_in_memory
+    15: optional Types.TReplicaId replica_id
 }
 
 struct TFinishTaskRequest {
diff --git a/gensrc/thrift/Types.thrift b/gensrc/thrift/Types.thrift
index 098307c98e..13af910597 100644
--- a/gensrc/thrift/Types.thrift
+++ b/gensrc/thrift/Types.thrift
@@ -25,6 +25,7 @@ typedef i32 TTupleId
 typedef i32 TSlotId
 typedef i64 TTableId
 typedef i64 TTabletId
+typedef i64 TReplicaId
 typedef i64 TVersion
 typedef i64 TVersionHash
 typedef i32 TSchemaHash


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to