This is an automated email from the ASF dual-hosted git repository. morningman pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/incubator-doris.git
The following commit(s) were added to refs/heads/master by this push: new f4e2f78a1a [fix] Fix the bug that data balance causes tablet loss (#9971) f4e2f78a1a is described below commit f4e2f78a1a911b907ba8e73156a67dadb9193acc Author: plat1ko <36853835+platon...@users.noreply.github.com> AuthorDate: Wed Jun 15 09:52:56 2022 +0800 [fix] Fix the bug that data balance causes tablet loss (#9971) 1. Provide an FE conf to test the reliability in the single-replica case when tablet scheduling is frequent. 2. Following #6063, apply most of that fix to the current code. --- be/src/agent/task_worker_pool.cpp | 3 +- be/src/olap/base_tablet.h | 9 +- be/src/olap/snapshot_manager.cpp | 3 +- be/src/olap/snapshot_manager.h | 2 +- be/src/olap/tablet.cpp | 1 + be/src/olap/tablet_manager.cpp | 97 +++++++++++----------- be/src/olap/tablet_manager.h | 6 +- be/src/olap/tablet_meta.cpp | 10 ++- be/src/olap/tablet_meta.h | 11 ++- be/src/olap/task/engine_clone_task.cpp | 10 ++- be/src/olap/task/engine_storage_migration_task.cpp | 3 +- be/src/runtime/snapshot_loader.cpp | 4 +- be/test/olap/cumulative_compaction_policy_test.cpp | 4 +- be/test/olap/delete_handler_test.cpp | 22 ++--- be/test/olap/delta_writer_test.cpp | 35 +++----- .../olap/engine_storage_migration_task_test.cpp | 11 +-- be/test/olap/tablet_meta_test.cpp | 2 +- be/test/olap/tablet_mgr_test.cpp | 8 +- be/test/olap/tablet_test.cpp | 2 +- be/test/olap/test_data/header_without_inc_rs.txt | 3 +- .../java/org/apache/doris/alter/RollupJobV2.java | 3 +- .../org/apache/doris/alter/SchemaChangeJobV2.java | 3 +- .../java/org/apache/doris/backup/RestoreJob.java | 2 +- .../java/org/apache/doris/catalog/Catalog.java | 3 +- .../org/apache/doris/catalog/PartitionInfo.java | 2 +- .../org/apache/doris/clone/BeLoadRebalancer.java | 3 +- .../apache/doris/clone/ClusterLoadStatistic.java | 17 +++- .../org/apache/doris/clone/TabletSchedCtx.java | 19 +++-- .../org/apache/doris/clone/TabletScheduler.java | 7 +- 
.../main/java/org/apache/doris/common/Config.java | 10 +++ .../doris/datasource/InternalDataSource.java | 3 +- .../org/apache/doris/master/ReportHandler.java | 7 +- .../main/java/org/apache/doris/task/CloneTask.java | 15 ++-- .../org/apache/doris/task/CreateReplicaTask.java | 8 +- .../org/apache/doris/task/DropReplicaTask.java | 9 +- .../java/org/apache/doris/task/AgentTaskTest.java | 8 +- gensrc/proto/olap_file.proto | 1 + gensrc/thrift/AgentService.thrift | 3 + gensrc/thrift/MasterService.thrift | 1 + gensrc/thrift/Types.thrift | 1 + 40 files changed, 216 insertions(+), 155 deletions(-) diff --git a/be/src/agent/task_worker_pool.cpp b/be/src/agent/task_worker_pool.cpp index 0a30aad20a..c29a65cdc6 100644 --- a/be/src/agent/task_worker_pool.cpp +++ b/be/src/agent/task_worker_pool.cpp @@ -383,6 +383,7 @@ void TaskWorkerPool::_create_tablet_worker_thread_callback() { tablet_info.row_count = 0; tablet_info.data_size = 0; tablet_info.__set_path_hash(tablet->data_dir()->path_hash()); + tablet_info.__set_replica_id(tablet->replica_id()); finish_tablet_infos.push_back(tablet_info); } @@ -428,7 +429,7 @@ void TaskWorkerPool::_drop_tablet_worker_thread_callback() { drop_tablet_req.tablet_id, false, &err); if (dropped_tablet != nullptr) { Status drop_status = StorageEngine::instance()->tablet_manager()->drop_tablet( - drop_tablet_req.tablet_id); + drop_tablet_req.tablet_id, drop_tablet_req.replica_id); if (!drop_status.ok()) { LOG(WARNING) << "drop table failed! 
signature: " << agent_task_req.signature; error_msgs.push_back("drop table failed!"); diff --git a/be/src/olap/base_tablet.h b/be/src/olap/base_tablet.h index 7c97247958..4bf83324af 100644 --- a/be/src/olap/base_tablet.h +++ b/be/src/olap/base_tablet.h @@ -54,8 +54,9 @@ public: const std::string full_name() const; int64_t partition_id() const; int64_t tablet_id() const; + int64_t replica_id() const; int32_t schema_hash() const; - int16_t shard_id(); + int16_t shard_id() const; bool equal(int64_t tablet_id, int32_t schema_hash); // properties encapsulated in TabletSchema @@ -123,11 +124,15 @@ inline int64_t BaseTablet::tablet_id() const { return _tablet_meta->tablet_id(); } +inline int64_t BaseTablet::replica_id() const { + return _tablet_meta->replica_id(); +} + inline int32_t BaseTablet::schema_hash() const { return _tablet_meta->schema_hash(); } -inline int16_t BaseTablet::shard_id() { +inline int16_t BaseTablet::shard_id() const { return _tablet_meta->shard_id(); } diff --git a/be/src/olap/snapshot_manager.cpp b/be/src/olap/snapshot_manager.cpp index b2d8a33d75..87ccef34ad 100644 --- a/be/src/olap/snapshot_manager.cpp +++ b/be/src/olap/snapshot_manager.cpp @@ -119,7 +119,7 @@ Status SnapshotManager::release_snapshot(const string& snapshot_path) { // For now, alpha and beta rowset meta have same fields, so we can just use // AlphaRowsetMeta here. 
Status SnapshotManager::convert_rowset_ids(const FilePathDesc& clone_dir_desc, int64_t tablet_id, - const int32_t& schema_hash) { + int64_t replica_id, const int32_t& schema_hash) { SCOPED_SWITCH_THREAD_LOCAL_MEM_TRACKER(_mem_tracker); Status res = Status::OK(); // check clone dir existed @@ -151,6 +151,7 @@ Status SnapshotManager::convert_rowset_ids(const FilePathDesc& clone_dir_desc, i // should modify tablet id and schema hash because in restore process the tablet id is not // equal to tablet id in meta new_tablet_meta_pb.set_tablet_id(tablet_id); + new_tablet_meta_pb.set_replica_id(replica_id); new_tablet_meta_pb.set_schema_hash(schema_hash); TabletSchema tablet_schema; tablet_schema.init_from_pb(new_tablet_meta_pb.schema()); diff --git a/be/src/olap/snapshot_manager.h b/be/src/olap/snapshot_manager.h index 75c00180f5..aab134297f 100644 --- a/be/src/olap/snapshot_manager.h +++ b/be/src/olap/snapshot_manager.h @@ -61,7 +61,7 @@ public: static SnapshotManager* instance(); Status convert_rowset_ids(const FilePathDesc& clone_dir_desc, int64_t tablet_id, - const int32_t& schema_hash); + int64_t replica_id, const int32_t& schema_hash); private: SnapshotManager() : _snapshot_base_id(0) { diff --git a/be/src/olap/tablet.cpp b/be/src/olap/tablet.cpp index 275511786f..405e6acdc5 100644 --- a/be/src/olap/tablet.cpp +++ b/be/src/olap/tablet.cpp @@ -1350,6 +1350,7 @@ void Tablet::build_tablet_report_info(TTabletInfo* tablet_info, tablet_info->__set_version_count(_tablet_meta->version_count()); tablet_info->__set_path_hash(_data_dir->path_hash()); tablet_info->__set_is_in_memory(_tablet_meta->tablet_schema().is_in_memory()); + tablet_info->__set_replica_id(replica_id()); } // should use this method to get a copy of current tablet meta diff --git a/be/src/olap/tablet_manager.cpp b/be/src/olap/tablet_manager.cpp index 0675754d29..e002b07cde 100644 --- a/be/src/olap/tablet_manager.cpp +++ b/be/src/olap/tablet_manager.cpp @@ -17,6 +17,7 @@ #include "olap/tablet_manager.h" 
+#include <gen_cpp/Types_types.h> #include <rapidjson/document.h> #include <re2/re2.h> #include <thrift/protocol/TDebugProtocol.h> @@ -184,7 +185,8 @@ Status TabletManager::_add_tablet_to_map_unlocked(TTabletId tablet_id, if (drop_old) { // If the new tablet is fresher than the existing one, then replace // the existing tablet with the new one. - RETURN_NOT_OK_LOG(_drop_tablet_unlocked(tablet_id, keep_files), + // Use default replica_id to ignore whether replica_id is match when drop tablet. + RETURN_NOT_OK_LOG(_drop_tablet_unlocked(tablet_id, /* replica_id */ 0, keep_files), strings::Substitute("failed to drop old tablet when add new tablet. " "tablet_id=$0", tablet_id)); @@ -368,7 +370,7 @@ TabletSharedPtr TabletManager::_internal_create_tablet_unlocked( } // something is wrong, we need clear environment if (is_tablet_added) { - Status status = _drop_tablet_unlocked(new_tablet_id, false); + Status status = _drop_tablet_unlocked(new_tablet_id, request.replica_id, false); if (!status.ok()) { LOG(WARNING) << "fail to drop tablet when create tablet failed. res=" << res; } @@ -446,22 +448,21 @@ TabletSharedPtr TabletManager::_create_tablet_meta_and_dir_unlocked( return nullptr; } -Status TabletManager::drop_tablet(TTabletId tablet_id, bool keep_files) { - std::lock_guard<std::shared_mutex> wrlock(_get_tablets_shard_lock(tablet_id)); +Status TabletManager::drop_tablet(TTabletId tablet_id, TReplicaId replica_id, bool keep_files) { + auto& shard = _get_tablets_shard(tablet_id); + std::lock_guard wrlock(shard.lock); + if (shard.tablets_under_clone.count(tablet_id) > 0) { + LOG(INFO) << "tablet " << tablet_id << " is under clone, skip drop task"; + return Status::OK(); + } SCOPED_SWITCH_THREAD_LOCAL_MEM_TRACKER(_mem_tracker); - return _drop_tablet_unlocked(tablet_id, keep_files); + return _drop_tablet_unlocked(tablet_id, replica_id, keep_files); } -// Drop specified tablet, the main logical is as follows: -// 1. 
tablet not in schema change: -// drop specified tablet directly; -// 2. tablet in schema change: -// a. schema change not finished && the dropping tablet is a base-tablet: -// base-tablet cannot be dropped; -// b. other cases: -// drop specified tablet directly and clear schema change info. -Status TabletManager::_drop_tablet_unlocked(TTabletId tablet_id, bool keep_files) { - LOG(INFO) << "begin drop tablet. tablet_id=" << tablet_id; +// Drop specified tablet. +Status TabletManager::_drop_tablet_unlocked(TTabletId tablet_id, TReplicaId replica_id, + bool keep_files) { + LOG(INFO) << "begin drop tablet. tablet_id=" << tablet_id << ", replica_id=" << replica_id; DorisMetrics::instance()->drop_tablet_requests_total->increment(1); // Fetch tablet which need to be dropped @@ -471,8 +472,39 @@ Status TabletManager::_drop_tablet_unlocked(TTabletId tablet_id, bool keep_files << "tablet_id=" << tablet_id; return Status::OK(); } + // We should compare replica id to avoid dropping new cloned tablet. + // Iff request replica id is 0, FE may be an older release, then we drop this tablet as before. + if (to_drop_tablet->replica_id() != replica_id && replica_id != 0) { + LOG(WARNING) << "fail to drop tablet because replica id not match. " + << "tablet_id=" << tablet_id << ", replica_id=" << to_drop_tablet->replica_id() + << ", request replica_id=" << replica_id; + return Status::OK(); + } - return _drop_tablet_directly_unlocked(tablet_id, keep_files); + _remove_tablet_from_partition(to_drop_tablet); + tablet_map_t& tablet_map = _get_tablet_map(tablet_id); + tablet_map.erase(tablet_id); + if (!keep_files) { + // drop tablet will update tablet meta, should lock + std::lock_guard<std::shared_mutex> wrlock(to_drop_tablet->get_header_lock()); + LOG(INFO) << "set tablet to shutdown state and remove it from memory. 
" + << "tablet_id=" << tablet_id + << ", tablet_path=" << to_drop_tablet->tablet_path_desc().filepath; + // NOTE: has to update tablet here, but must not update tablet meta directly. + // because other thread may hold the tablet object, they may save meta too. + // If update meta directly here, other thread may override the meta + // and the tablet will be loaded at restart time. + // To avoid this exception, we first set the state of the tablet to `SHUTDOWN`. + to_drop_tablet->set_tablet_state(TABLET_SHUTDOWN); + to_drop_tablet->save_meta(); + { + std::lock_guard<std::shared_mutex> wrdlock(_shutdown_tablets_lock); + _shutdown_tablets.push_back(to_drop_tablet); + } + } + + to_drop_tablet->deregister_tablet_from_dir(); + return Status::OK(); } Status TabletManager::drop_tablets_on_error_root_path( @@ -1220,39 +1252,6 @@ Status TabletManager::_create_tablet_meta_unlocked(const TCreateTabletReq& reque return res; } -Status TabletManager::_drop_tablet_directly_unlocked(TTabletId tablet_id, bool keep_files) { - TabletSharedPtr dropped_tablet = _get_tablet_unlocked(tablet_id); - if (dropped_tablet == nullptr) { - LOG(WARNING) << "fail to drop tablet because it does not exist. " - << " tablet_id=" << tablet_id; - return Status::OLAPInternalError(OLAP_ERR_TABLE_NOT_FOUND); - } - _remove_tablet_from_partition(dropped_tablet); - tablet_map_t& tablet_map = _get_tablet_map(tablet_id); - tablet_map.erase(tablet_id); - if (!keep_files) { - // drop tablet will update tablet meta, should lock - std::lock_guard<std::shared_mutex> wrlock(dropped_tablet->get_header_lock()); - LOG(INFO) << "set tablet to shutdown state and remove it from memory. " - << "tablet_id=" << tablet_id - << ", tablet_path=" << dropped_tablet->tablet_path_desc().filepath; - // NOTE: has to update tablet here, but must not update tablet meta directly. - // because other thread may hold the tablet object, they may save meta too. 
- // If update meta directly here, other thread may override the meta - // and the tablet will be loaded at restart time. - // To avoid this exception, we first set the state of the tablet to `SHUTDOWN`. - dropped_tablet->set_tablet_state(TABLET_SHUTDOWN); - dropped_tablet->save_meta(); - { - std::lock_guard<std::shared_mutex> wrdlock(_shutdown_tablets_lock); - _shutdown_tablets.push_back(dropped_tablet); - } - } - - dropped_tablet->deregister_tablet_from_dir(); - return Status::OK(); -} - TabletSharedPtr TabletManager::_get_tablet_unlocked(TTabletId tablet_id) { VLOG_NOTICE << "begin to get tablet. tablet_id=" << tablet_id; tablet_map_t& tablet_map = _get_tablet_map(tablet_id); diff --git a/be/src/olap/tablet_manager.h b/be/src/olap/tablet_manager.h index 1627ab741e..1e2af26d99 100644 --- a/be/src/olap/tablet_manager.h +++ b/be/src/olap/tablet_manager.h @@ -66,7 +66,7 @@ public: // Return OLAP_SUCCESS, if run ok // OLAP_ERR_TABLE_DELETE_NOEXIST_ERROR, if tablet not exist // Status::OLAPInternalError(OLAP_ERR_NOT_INITED), if not inited - Status drop_tablet(TTabletId tablet_id, bool keep_files = false); + Status drop_tablet(TTabletId tablet_id, TReplicaId replica_id, bool keep_files = false); Status drop_tablets_on_error_root_path(const std::vector<TabletInfo>& tablet_info_vec); @@ -157,9 +157,7 @@ private: bool _check_tablet_id_exist_unlocked(TTabletId tablet_id); - Status _drop_tablet_directly_unlocked(TTabletId tablet_id, bool keep_files = false); - - Status _drop_tablet_unlocked(TTabletId tablet_id, bool keep_files); + Status _drop_tablet_unlocked(TTabletId tablet_id, TReplicaId replica_id, bool keep_files); TabletSharedPtr _get_tablet_unlocked(TTabletId tablet_id); TabletSharedPtr _get_tablet_unlocked(TTabletId tablet_id, bool include_deleted, diff --git a/be/src/olap/tablet_meta.cpp b/be/src/olap/tablet_meta.cpp index 299d3051a2..6dfaa2c54d 100644 --- a/be/src/olap/tablet_meta.cpp +++ b/be/src/olap/tablet_meta.cpp @@ -39,7 +39,7 @@ Status 
TabletMeta::create(const TCreateTabletReq& request, const TabletUid& tabl const unordered_map<uint32_t, uint32_t>& col_ordinal_to_unique_id, TabletMetaSharedPtr* tablet_meta) { tablet_meta->reset(new TabletMeta( - request.table_id, request.partition_id, request.tablet_id, + request.table_id, request.partition_id, request.tablet_id, request.replica_id, request.tablet_schema.schema_hash, shard_id, request.tablet_schema, next_unique_id, col_ordinal_to_unique_id, tablet_uid, request.__isset.tablet_type ? request.tablet_type : TTabletType::TABLET_TYPE_DISK, @@ -50,8 +50,8 @@ Status TabletMeta::create(const TCreateTabletReq& request, const TabletUid& tabl TabletMeta::TabletMeta() : _tablet_uid(0, 0), _schema(new TabletSchema) {} TabletMeta::TabletMeta(int64_t table_id, int64_t partition_id, int64_t tablet_id, - int32_t schema_hash, uint64_t shard_id, const TTabletSchema& tablet_schema, - uint32_t next_unique_id, + int64_t replica_id, int32_t schema_hash, uint64_t shard_id, + const TTabletSchema& tablet_schema, uint32_t next_unique_id, const std::unordered_map<uint32_t, uint32_t>& col_ordinal_to_unique_id, TabletUid tablet_uid, TTabletType::type tabletType, TStorageMedium::type t_storage_medium, const std::string& storage_name, @@ -61,6 +61,7 @@ TabletMeta::TabletMeta(int64_t table_id, int64_t partition_id, int64_t tablet_id tablet_meta_pb.set_table_id(table_id); tablet_meta_pb.set_partition_id(partition_id); tablet_meta_pb.set_tablet_id(tablet_id); + tablet_meta_pb.set_replica_id(replica_id); tablet_meta_pb.set_schema_hash(schema_hash); tablet_meta_pb.set_shard_id(shard_id); // Persist the creation time, but it is not used @@ -375,6 +376,7 @@ void TabletMeta::init_from_pb(const TabletMetaPB& tablet_meta_pb) { _table_id = tablet_meta_pb.table_id(); _partition_id = tablet_meta_pb.partition_id(); _tablet_id = tablet_meta_pb.tablet_id(); + _replica_id = tablet_meta_pb.replica_id(); _schema_hash = tablet_meta_pb.schema_hash(); _shard_id = tablet_meta_pb.shard_id(); 
_creation_time = tablet_meta_pb.creation_time(); @@ -443,6 +445,7 @@ void TabletMeta::to_meta_pb(TabletMetaPB* tablet_meta_pb) { tablet_meta_pb->set_table_id(table_id()); tablet_meta_pb->set_partition_id(partition_id()); tablet_meta_pb->set_tablet_id(tablet_id()); + tablet_meta_pb->set_replica_id(replica_id()); tablet_meta_pb->set_schema_hash(schema_hash()); tablet_meta_pb->set_shard_id(shard_id()); tablet_meta_pb->set_creation_time(creation_time()); @@ -683,6 +686,7 @@ bool operator==(const TabletMeta& a, const TabletMeta& b) { if (a._table_id != b._table_id) return false; if (a._partition_id != b._partition_id) return false; if (a._tablet_id != b._tablet_id) return false; + if (a._replica_id != b._replica_id) return false; if (a._schema_hash != b._schema_hash) return false; if (a._shard_id != b._shard_id) return false; if (a._creation_time != b._creation_time) return false; diff --git a/be/src/olap/tablet_meta.h b/be/src/olap/tablet_meta.h index c4166213d0..8624d04333 100644 --- a/be/src/olap/tablet_meta.h +++ b/be/src/olap/tablet_meta.h @@ -79,8 +79,9 @@ public: TabletMeta(); // Only remote_storage_name is needed in meta, it is a key used to get remote params from fe. // The config of storage is saved in fe. 
- TabletMeta(int64_t table_id, int64_t partition_id, int64_t tablet_id, int32_t schema_hash, - uint64_t shard_id, const TTabletSchema& tablet_schema, uint32_t next_unique_id, + TabletMeta(int64_t table_id, int64_t partition_id, int64_t tablet_id, int64_t replica_id, + int32_t schema_hash, uint64_t shard_id, const TTabletSchema& tablet_schema, + uint32_t next_unique_id, const std::unordered_map<uint32_t, uint32_t>& col_ordinal_to_unique_id, TabletUid tablet_uid, TTabletType::type tabletType, TStorageMedium::type t_storage_medium, const std::string& remote_storage_name, @@ -112,6 +113,7 @@ public: int64_t table_id() const; int64_t partition_id() const; int64_t tablet_id() const; + int64_t replica_id() const; int32_t schema_hash() const; int16_t shard_id() const; void set_shard_id(int32_t shard_id); @@ -188,6 +190,7 @@ private: int64_t _table_id = 0; int64_t _partition_id = 0; int64_t _tablet_id = 0; + int64_t _replica_id = 0; int32_t _schema_hash = 0; int32_t _shard_id = 0; int64_t _creation_time = 0; @@ -232,6 +235,10 @@ inline int64_t TabletMeta::tablet_id() const { return _tablet_id; } +inline int64_t TabletMeta::replica_id() const { + return _replica_id; +} + inline int32_t TabletMeta::schema_hash() const { return _schema_hash; } diff --git a/be/src/olap/task/engine_clone_task.cpp b/be/src/olap/task/engine_clone_task.cpp index 04a7df7199..84dc1e4068 100644 --- a/be/src/olap/task/engine_clone_task.cpp +++ b/be/src/olap/task/engine_clone_task.cpp @@ -218,12 +218,14 @@ void EngineCloneTask::_set_tablet_info(Status status, bool is_new_tablet) { if (status.ok()) { TTabletInfo tablet_info; tablet_info.__set_tablet_id(_clone_req.tablet_id); + tablet_info.__set_replica_id(_clone_req.replica_id); tablet_info.__set_schema_hash(_clone_req.schema_hash); Status get_tablet_info_status = StorageEngine::instance()->tablet_manager()->report_tablet_info(&tablet_info); if (get_tablet_info_status != Status::OK()) { LOG(WARNING) << "clone success, but get tablet info failed." 
<< " tablet id: " << _clone_req.tablet_id + << ", replica_id:" << _clone_req.replica_id << " schema hash: " << _clone_req.schema_hash << " signature: " << _signature; _error_msgs->push_back("clone success, but get tablet info failed."); @@ -231,6 +233,7 @@ void EngineCloneTask::_set_tablet_info(Status status, bool is_new_tablet) { } else if (_clone_req.__isset.committed_version && tablet_info.version < _clone_req.committed_version) { LOG(WARNING) << "failed to clone tablet. tablet_id:" << _clone_req.tablet_id + << ", replica_id:" << _clone_req.replica_id << ", schema_hash:" << _clone_req.schema_hash << ", signature:" << _signature << ", version:" << tablet_info.version << ", expected_version: " << _clone_req.committed_version; @@ -241,11 +244,12 @@ void EngineCloneTask::_set_tablet_info(Status status, bool is_new_tablet) { // if not, maybe this is a stale remaining table which is waiting for drop. // we drop it. LOG(WARNING) << "begin to drop the stale tablet. tablet_id:" << _clone_req.tablet_id + << ", replica_id:" << _clone_req.replica_id << ", schema_hash:" << _clone_req.schema_hash << ", signature:" << _signature << ", version:" << tablet_info.version << ", expected_version: " << _clone_req.committed_version; Status drop_status = StorageEngine::instance()->tablet_manager()->drop_tablet( - _clone_req.tablet_id, _clone_req.schema_hash); + _clone_req.tablet_id, _clone_req.replica_id); if (drop_status != Status::OK() && drop_status.precise_code() != OLAP_ERR_TABLE_NOT_FOUND) { // just log @@ -332,10 +336,12 @@ Status EngineCloneTask::_make_and_download_snapshots(DataDir& data_dir, if (status.ok()) { // change all rowset ids because they maybe its id same with local rowset auto olap_st = SnapshotManager::instance()->convert_rowset_ids( - local_path, _clone_req.tablet_id, _clone_req.schema_hash); + local_path, _clone_req.tablet_id, _clone_req.replica_id, + _clone_req.schema_hash); if (olap_st != Status::OK()) { LOG(WARNING) << "fail to convert rowset ids, path=" << 
local_path << ", tablet_id=" << _clone_req.tablet_id + << ", replica_id=" << _clone_req.replica_id << ", schema_hash=" << _clone_req.schema_hash << ", error=" << olap_st; status = Status::InternalError("Failed to convert rowset ids"); } diff --git a/be/src/olap/task/engine_storage_migration_task.cpp b/be/src/olap/task/engine_storage_migration_task.cpp index 68c629ebb2..ff9acb8ddd 100644 --- a/be/src/olap/task/engine_storage_migration_task.cpp +++ b/be/src/olap/task/engine_storage_migration_task.cpp @@ -141,7 +141,8 @@ Status EngineStorageMigrationTask::_gen_and_write_header_to_hdr_file( } // it will change rowset id and its create time // rowset create time is useful when load tablet from meta to check which tablet is the tablet to load - res = SnapshotManager::instance()->convert_rowset_ids(full_path, tablet_id, schema_hash); + res = SnapshotManager::instance()->convert_rowset_ids(full_path, tablet_id, + _tablet->replica_id(), schema_hash); if (!res.ok()) { LOG(WARNING) << "failed to convert rowset id when do storage migration" << " path = " << full_path; diff --git a/be/src/runtime/snapshot_loader.cpp b/be/src/runtime/snapshot_loader.cpp index e8448a9c88..f86cdefd9b 100644 --- a/be/src/runtime/snapshot_loader.cpp +++ b/be/src/runtime/snapshot_loader.cpp @@ -413,8 +413,8 @@ Status SnapshotLoader::move(const std::string& snapshot_path, TabletSharedPtr ta } // rename the rowset ids and tabletid info in rowset meta - Status convert_status = - SnapshotManager::instance()->convert_rowset_ids(snapshot_path, tablet_id, schema_hash); + Status convert_status = SnapshotManager::instance()->convert_rowset_ids( + snapshot_path, tablet_id, tablet->replica_id(), schema_hash); if (convert_status != Status::OK()) { std::stringstream ss; ss << "failed to convert rowsetids in snapshot: " << snapshot_path diff --git a/be/test/olap/cumulative_compaction_policy_test.cpp b/be/test/olap/cumulative_compaction_policy_test.cpp index e30df7e343..76a5acaf36 100644 --- 
a/be/test/olap/cumulative_compaction_policy_test.cpp +++ b/be/test/olap/cumulative_compaction_policy_test.cpp @@ -32,7 +32,7 @@ public: TestNumBasedCumulativeCompactionPolicy() {} void SetUp() { _tablet_meta = static_cast<TabletMetaSharedPtr>(new TabletMeta( - 1, 2, 15673, 4, 5, TTabletSchema(), 6, {{7, 8}}, UniqueId(9, 10), + 1, 2, 15673, 15674, 4, 5, TTabletSchema(), 6, {{7, 8}}, UniqueId(9, 10), TTabletType::TABLET_TYPE_DISK, TStorageMedium::HDD, "", TCompressionType::LZ4F)); _json_rowset_meta = R"({ @@ -336,7 +336,7 @@ public: config::cumulative_size_based_compaction_lower_size_mbytes = 64; _tablet_meta = static_cast<TabletMetaSharedPtr>(new TabletMeta( - 1, 2, 15673, 4, 5, TTabletSchema(), 6, {{7, 8}}, UniqueId(9, 10), + 1, 2, 15673, 15674, 4, 5, TTabletSchema(), 6, {{7, 8}}, UniqueId(9, 10), TTabletType::TABLET_TYPE_DISK, TStorageMedium::HDD, "", TCompressionType::LZ4F)); _json_rowset_meta = R"({ diff --git a/be/test/olap/delete_handler_test.cpp b/be/test/olap/delete_handler_test.cpp index d34fe7253d..57a0ebb8f7 100644 --- a/be/test/olap/delete_handler_test.cpp +++ b/be/test/olap/delete_handler_test.cpp @@ -261,16 +261,14 @@ protected: set_default_create_tablet_request(&_create_tablet); res = k_engine->create_tablet(_create_tablet); EXPECT_EQ(Status::OK(), res); - tablet = k_engine->tablet_manager()->get_tablet(_create_tablet.tablet_id, - _create_tablet.tablet_schema.schema_hash); + tablet = k_engine->tablet_manager()->get_tablet(_create_tablet.tablet_id); EXPECT_NE(tablet.get(), nullptr); _tablet_path = tablet->tablet_path_desc().filepath; set_create_duplicate_tablet_request(&_create_dup_tablet); res = k_engine->create_tablet(_create_dup_tablet); EXPECT_EQ(Status::OK(), res); - dup_tablet = k_engine->tablet_manager()->get_tablet( - _create_dup_tablet.tablet_id, _create_dup_tablet.tablet_schema.schema_hash); + dup_tablet = k_engine->tablet_manager()->get_tablet(_create_dup_tablet.tablet_id); EXPECT_TRUE(dup_tablet.get() != NULL); _dup_tablet_path = 
tablet->tablet_path_desc().filepath; } @@ -279,8 +277,8 @@ protected: // Remove all dir. tablet.reset(); dup_tablet.reset(); - StorageEngine::instance()->tablet_manager()->drop_tablet( - _create_tablet.tablet_id, _create_tablet.tablet_schema.schema_hash); + StorageEngine::instance()->tablet_manager()->drop_tablet(_create_tablet.tablet_id, + _create_tablet.replica_id); EXPECT_TRUE(FileUtils::remove_all(config::storage_root_path).ok()); } @@ -438,8 +436,7 @@ protected: set_default_create_tablet_request(&_create_tablet); res = k_engine->create_tablet(_create_tablet); EXPECT_EQ(Status::OK(), res); - tablet = k_engine->tablet_manager()->get_tablet(_create_tablet.tablet_id, - _create_tablet.tablet_schema.schema_hash); + tablet = k_engine->tablet_manager()->get_tablet(_create_tablet.tablet_id); EXPECT_TRUE(tablet.get() != nullptr); _tablet_path = tablet->tablet_path_desc().filepath; } @@ -448,7 +445,7 @@ protected: // Remove all dir. tablet.reset(); k_engine->tablet_manager()->drop_tablet(_create_tablet.tablet_id, - _create_tablet.tablet_schema.schema_hash); + _create_tablet.replica_id); EXPECT_TRUE(FileUtils::remove_all(config::storage_root_path).ok()); } @@ -813,8 +810,7 @@ protected: set_default_create_tablet_request(&_create_tablet); res = k_engine->create_tablet(_create_tablet); EXPECT_EQ(Status::OK(), res); - tablet = k_engine->tablet_manager()->get_tablet(_create_tablet.tablet_id, - _create_tablet.tablet_schema.schema_hash); + tablet = k_engine->tablet_manager()->get_tablet(_create_tablet.tablet_id); EXPECT_TRUE(tablet != nullptr); _tablet_path = tablet->tablet_path_desc().filepath; @@ -826,8 +822,8 @@ protected: // Remove all dir. 
tablet.reset(); _delete_handler.finalize(); - StorageEngine::instance()->tablet_manager()->drop_tablet( - _create_tablet.tablet_id, _create_tablet.tablet_schema.schema_hash); + StorageEngine::instance()->tablet_manager()->drop_tablet(_create_tablet.tablet_id, + _create_tablet.replica_id); EXPECT_TRUE(FileUtils::remove_all(config::storage_root_path).ok()); } diff --git a/be/test/olap/delta_writer_test.cpp b/be/test/olap/delta_writer_test.cpp index db9429893a..632086f891 100644 --- a/be/test/olap/delta_writer_test.cpp +++ b/be/test/olap/delta_writer_test.cpp @@ -381,10 +381,7 @@ TEST_F(TestDeltaWriter, open) { EXPECT_EQ(Status::OK(), res); SAFE_DELETE(delta_writer); - TDropTabletReq drop_request; - auto tablet_id = 10003; - auto schema_hash = 270068375; - res = k_engine->tablet_manager()->drop_tablet(tablet_id, schema_hash); + res = k_engine->tablet_manager()->drop_tablet(request.tablet_id, request.replica_id); EXPECT_EQ(Status::OK(), res); } @@ -480,8 +477,7 @@ TEST_F(TestDeltaWriter, write) { EXPECT_EQ(Status::OK(), res); // publish version success - TabletSharedPtr tablet = - k_engine->tablet_manager()->get_tablet(write_req.tablet_id, write_req.schema_hash); + TabletSharedPtr tablet = k_engine->tablet_manager()->get_tablet(write_req.tablet_id); OlapMeta* meta = tablet->data_dir()->get_meta(); Version version; version.first = tablet->rowset_with_max_version()->end_version() + 1; @@ -500,9 +496,7 @@ TEST_F(TestDeltaWriter, write) { } EXPECT_EQ(1, tablet->num_rows()); - auto tablet_id = 10003; - auto schema_hash = 270068375; - res = k_engine->tablet_manager()->drop_tablet(tablet_id, schema_hash); + res = k_engine->tablet_manager()->drop_tablet(request.tablet_id, request.replica_id); EXPECT_EQ(Status::OK(), res); delete delta_writer; } @@ -614,8 +608,7 @@ TEST_F(TestDeltaWriter, vec_write) { ASSERT_TRUE(res.ok()); // publish version success - TabletSharedPtr tablet = - k_engine->tablet_manager()->get_tablet(write_req.tablet_id, write_req.schema_hash); + 
TabletSharedPtr tablet = k_engine->tablet_manager()->get_tablet(write_req.tablet_id); std::cout << "before publish, tablet row nums:" << tablet->num_rows() << std::endl; OlapMeta* meta = tablet->data_dir()->get_meta(); Version version; @@ -639,11 +632,9 @@ TEST_F(TestDeltaWriter, vec_write) { res = tablet->add_inc_rowset(rowset); ASSERT_TRUE(res.ok()); } - ASSERT_EQ(2, tablet->num_rows()); + ASSERT_EQ(1, tablet->num_rows()); - auto tablet_id = 10003; - auto schema_hash = 270068375; - res = k_engine->tablet_manager()->drop_tablet(tablet_id, schema_hash); + res = k_engine->tablet_manager()->drop_tablet(request.tablet_id, request.replica_id); ASSERT_TRUE(res.ok()); delete delta_writer; } @@ -692,8 +683,7 @@ TEST_F(TestDeltaWriter, sequence_col) { EXPECT_EQ(Status::OK(), res); // publish version success - TabletSharedPtr tablet = - k_engine->tablet_manager()->get_tablet(write_req.tablet_id, write_req.schema_hash); + TabletSharedPtr tablet = k_engine->tablet_manager()->get_tablet(write_req.tablet_id); OlapMeta* meta = tablet->data_dir()->get_meta(); Version version; version.first = tablet->rowset_with_max_version()->end_version() + 1; @@ -712,9 +702,7 @@ TEST_F(TestDeltaWriter, sequence_col) { } EXPECT_EQ(1, tablet->num_rows()); - auto tablet_id = 10005; - auto schema_hash = 270068377; - res = k_engine->tablet_manager()->drop_tablet(tablet_id, schema_hash); + res = k_engine->tablet_manager()->drop_tablet(request.tablet_id, request.replica_id); EXPECT_EQ(Status::OK(), res); delete delta_writer; } @@ -777,8 +765,7 @@ TEST_F(TestDeltaWriter, vec_sequence_col) { ASSERT_TRUE(res.ok()); // publish version success - TabletSharedPtr tablet = - k_engine->tablet_manager()->get_tablet(write_req.tablet_id, write_req.schema_hash); + TabletSharedPtr tablet = k_engine->tablet_manager()->get_tablet(write_req.tablet_id); std::cout << "before publish, tablet row nums:" << tablet->num_rows() << std::endl; OlapMeta* meta = tablet->data_dir()->get_meta(); Version version; @@ -804,9 +791,7 
@@ TEST_F(TestDeltaWriter, vec_sequence_col) { } ASSERT_EQ(1, tablet->num_rows()); - auto tablet_id = 10005; - auto schema_hash = 270068377; - res = k_engine->tablet_manager()->drop_tablet(tablet_id, schema_hash); + res = k_engine->tablet_manager()->drop_tablet(request.tablet_id, request.replica_id); ASSERT_TRUE(res.ok()); delete delta_writer; } diff --git a/be/test/olap/engine_storage_migration_task_test.cpp b/be/test/olap/engine_storage_migration_task_test.cpp index 608d4b6021..36b95a8aa3 100644 --- a/be/test/olap/engine_storage_migration_task_test.cpp +++ b/be/test/olap/engine_storage_migration_task_test.cpp @@ -198,8 +198,7 @@ TEST_F(TestEngineStorageMigrationTask, write_and_migration) { EXPECT_EQ(Status::OK(), res); // publish version success - TabletSharedPtr tablet = - k_engine->tablet_manager()->get_tablet(write_req.tablet_id, write_req.schema_hash); + TabletSharedPtr tablet = k_engine->tablet_manager()->get_tablet(write_req.tablet_id); OlapMeta* meta = tablet->data_dir()->get_meta(); Version version; version.first = tablet->rowset_with_max_version()->end_version() + 1; @@ -234,9 +233,7 @@ TEST_F(TestEngineStorageMigrationTask, write_and_migration) { res = engine_task.execute(); EXPECT_EQ(Status::OK(), res); // reget the tablet from manager after migration - auto tablet_id = 10005; - auto schema_hash = 270068377; - TabletSharedPtr tablet2 = k_engine->tablet_manager()->get_tablet(tablet_id, schema_hash); + TabletSharedPtr tablet2 = k_engine->tablet_manager()->get_tablet(request.tablet_id); // check path EXPECT_EQ(tablet2->data_dir()->path(), dest_store->path()); // check rows @@ -254,7 +251,7 @@ TEST_F(TestEngineStorageMigrationTask, write_and_migration) { EngineStorageMigrationTask engine_task2(tablet2, dest_store); res = engine_task2.execute(); EXPECT_EQ(Status::OK(), res); - TabletSharedPtr tablet3 = k_engine->tablet_manager()->get_tablet(tablet_id, schema_hash); + TabletSharedPtr tablet3 = k_engine->tablet_manager()->get_tablet(request.tablet_id); // 
check path EXPECT_EQ(tablet3->data_dir()->path(), tablet->data_dir()->path()); // check rows @@ -264,7 +261,7 @@ TEST_F(TestEngineStorageMigrationTask, write_and_migration) { EXPECT_NE(tablet3, tablet); // test case 2 end - res = k_engine->tablet_manager()->drop_tablet(tablet_id, schema_hash); + res = k_engine->tablet_manager()->drop_tablet(request.tablet_id, request.replica_id); EXPECT_EQ(Status::OK(), res); delete delta_writer; } diff --git a/be/test/olap/tablet_meta_test.cpp b/be/test/olap/tablet_meta_test.cpp index f728616922..67f9f95fa4 100644 --- a/be/test/olap/tablet_meta_test.cpp +++ b/be/test/olap/tablet_meta_test.cpp @@ -26,7 +26,7 @@ namespace doris { TEST(TabletMetaTest, SaveAndParse) { std::string meta_path = "./be/test/olap/test_data/tablet_meta_test.hdr"; - TabletMeta old_tablet_meta(1, 2, 3, 4, 5, TTabletSchema(), 6, {{7, 8}}, UniqueId(9, 10), + TabletMeta old_tablet_meta(1, 2, 3, 3, 4, 5, TTabletSchema(), 6, {{7, 8}}, UniqueId(9, 10), TTabletType::TABLET_TYPE_DISK, TStorageMedium::HDD, "", TCompressionType::LZ4F); EXPECT_EQ(Status::OK(), old_tablet_meta.save(meta_path)); diff --git a/be/test/olap/tablet_mgr_test.cpp b/be/test/olap/tablet_mgr_test.cpp index 840e5968ce..6ac994e7cd 100644 --- a/be/test/olap/tablet_mgr_test.cpp +++ b/be/test/olap/tablet_mgr_test.cpp @@ -119,7 +119,7 @@ TEST_F(TabletMgrTest, CreateTablet) { create_st = _tablet_mgr->create_tablet(create_tablet_req, data_dirs); EXPECT_TRUE(create_st == Status::OK()); - Status drop_st = _tablet_mgr->drop_tablet(111, false); + Status drop_st = _tablet_mgr->drop_tablet(111, create_tablet_req.replica_id); EXPECT_TRUE(drop_st == Status::OK()); tablet.reset(); Status trash_st = _tablet_mgr->start_trash_sweep(); @@ -174,7 +174,7 @@ TEST_F(TabletMgrTest, CreateTabletWithSequence) { Status check_meta_st = TabletMetaManager::get_meta(_data_dir, 111, 3333, new_tablet_meta); EXPECT_TRUE(check_meta_st == Status::OK()); - Status drop_st = _tablet_mgr->drop_tablet(111, false); + Status drop_st = 
_tablet_mgr->drop_tablet(111, create_tablet_req.replica_id); EXPECT_TRUE(drop_st == Status::OK()); tablet.reset(); Status trash_st = _tablet_mgr->start_trash_sweep(); @@ -208,13 +208,13 @@ TEST_F(TabletMgrTest, DropTablet) { EXPECT_TRUE(tablet != nullptr); // drop unexist tablet will be success - Status drop_st = _tablet_mgr->drop_tablet(1121, false); + Status drop_st = _tablet_mgr->drop_tablet(1121, create_tablet_req.replica_id); EXPECT_TRUE(drop_st == Status::OK()); tablet = _tablet_mgr->get_tablet(111); EXPECT_TRUE(tablet != nullptr); // drop exist tablet will be success - drop_st = _tablet_mgr->drop_tablet(111, false); + drop_st = _tablet_mgr->drop_tablet(111, create_tablet_req.replica_id); EXPECT_TRUE(drop_st == Status::OK()); tablet = _tablet_mgr->get_tablet(111); EXPECT_TRUE(tablet == nullptr); diff --git a/be/test/olap/tablet_test.cpp b/be/test/olap/tablet_test.cpp index d5883ad708..9d6d36c42e 100644 --- a/be/test/olap/tablet_test.cpp +++ b/be/test/olap/tablet_test.cpp @@ -36,7 +36,7 @@ public: virtual void SetUp() { _tablet_meta = static_cast<TabletMetaSharedPtr>(new TabletMeta( - 1, 2, 15673, 4, 5, TTabletSchema(), 6, {{7, 8}}, UniqueId(9, 10), + 1, 2, 15673, 15674, 4, 5, TTabletSchema(), 6, {{7, 8}}, UniqueId(9, 10), TTabletType::TABLET_TYPE_DISK, TStorageMedium::HDD, "", TCompressionType::LZ4F)); _json_rowset_meta = R"({ "rowset_id": 540081, diff --git a/be/test/olap/test_data/header_without_inc_rs.txt b/be/test/olap/test_data/header_without_inc_rs.txt index 03cef06a3d..12c1f81a0e 100644 --- a/be/test/olap/test_data/header_without_inc_rs.txt +++ b/be/test/olap/test_data/header_without_inc_rs.txt @@ -144,5 +144,6 @@ "preferred_rowset_type": "BETA_ROWSET", "tablet_type": "TABLET_TYPE_DISK", "storage_medium": "HDD", - "remote_storage_name": "" + "remote_storage_name": "", + "replica_id": 0 } diff --git a/fe/fe-core/src/main/java/org/apache/doris/alter/RollupJobV2.java b/fe/fe-core/src/main/java/org/apache/doris/alter/RollupJobV2.java index 
45acf8345c..9a66f10772 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/alter/RollupJobV2.java +++ b/fe/fe-core/src/main/java/org/apache/doris/alter/RollupJobV2.java @@ -219,13 +219,14 @@ public class RollupJobV2 extends AlterJobV2 implements GsonPostProcessable { List<Replica> rollupReplicas = rollupTablet.getReplicas(); for (Replica rollupReplica : rollupReplicas) { long backendId = rollupReplica.getBackendId(); + long rollupReplicaId = rollupReplica.getId(); Preconditions.checkNotNull(tabletIdMap.get(rollupTabletId)); // baseTabletId countDownLatch.addMark(backendId, rollupTabletId); // create replica with version 1. // version will be updated by following load process, or when rollup task finished. CreateReplicaTask createReplicaTask = new CreateReplicaTask( backendId, dbId, tableId, partitionId, rollupIndexId, rollupTabletId, - rollupShortKeyColumnCount, rollupSchemaHash, + rollupReplicaId, rollupShortKeyColumnCount, rollupSchemaHash, Partition.PARTITION_INIT_VERSION, rollupKeysType, TStorageType.COLUMN, storageMedium, rollupSchema, tbl.getCopiedBfColumns(), tbl.getBfFpp(), countDownLatch, diff --git a/fe/fe-core/src/main/java/org/apache/doris/alter/SchemaChangeJobV2.java b/fe/fe-core/src/main/java/org/apache/doris/alter/SchemaChangeJobV2.java index d73f22617d..411ab275a5 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/alter/SchemaChangeJobV2.java +++ b/fe/fe-core/src/main/java/org/apache/doris/alter/SchemaChangeJobV2.java @@ -245,10 +245,11 @@ public class SchemaChangeJobV2 extends AlterJobV2 { List<Replica> shadowReplicas = shadowTablet.getReplicas(); for (Replica shadowReplica : shadowReplicas) { long backendId = shadowReplica.getBackendId(); + long shadowReplicaId = shadowReplica.getId(); countDownLatch.addMark(backendId, shadowTabletId); CreateReplicaTask createReplicaTask = new CreateReplicaTask( backendId, dbId, tableId, partitionId, shadowIdxId, shadowTabletId, - shadowShortKeyColumnCount, shadowSchemaHash, + shadowReplicaId, 
shadowShortKeyColumnCount, shadowSchemaHash, Partition.PARTITION_INIT_VERSION, originKeysType, TStorageType.COLUMN, storageMedium, shadowSchema, bfColumns, bfFpp, countDownLatch, indexes, diff --git a/fe/fe-core/src/main/java/org/apache/doris/backup/RestoreJob.java b/fe/fe-core/src/main/java/org/apache/doris/backup/RestoreJob.java index 922b314a21..df90c9d9dd 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/backup/RestoreJob.java +++ b/fe/fe-core/src/main/java/org/apache/doris/backup/RestoreJob.java @@ -957,7 +957,7 @@ public class RestoreJob extends AbstractJob { Catalog.getCurrentInvertedIndex().addReplica(restoreTablet.getId(), restoreReplica); CreateReplicaTask task = new CreateReplicaTask(restoreReplica.getBackendId(), dbId, localTbl.getId(), restorePart.getId(), restoredIdx.getId(), - restoreTablet.getId(), indexMeta.getShortKeyColumnCount(), + restoreTablet.getId(), restoreReplica.getId(), indexMeta.getShortKeyColumnCount(), indexMeta.getSchemaHash(), restoreReplica.getVersion(), indexMeta.getKeysType(), TStorageType.COLUMN, TStorageMedium.HDD /* all restored replicas will be saved to HDD */, diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/Catalog.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/Catalog.java index 7050344beb..9cf25f2af2 100755 --- a/fe/fe-core/src/main/java/org/apache/doris/catalog/Catalog.java +++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/Catalog.java @@ -4807,7 +4807,8 @@ public class Catalog { List<Replica> replicas = tablet.getReplicas(); for (Replica replica : replicas) { long backendId = replica.getBackendId(); - DropReplicaTask dropTask = new DropReplicaTask(backendId, tabletId, schemaHash); + long replicaId = replica.getId(); + DropReplicaTask dropTask = new DropReplicaTask(backendId, tabletId, replicaId, schemaHash); batchTask.addTask(dropTask); } // end for replicas } // end for tablets diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/PartitionInfo.java 
b/fe/fe-core/src/main/java/org/apache/doris/catalog/PartitionInfo.java index 41a37240f3..bee4c382a0 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/catalog/PartitionInfo.java +++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/PartitionInfo.java @@ -125,7 +125,7 @@ public class PartitionInfo implements Writable { } public PartitionItem handleNewSinglePartitionDesc(SinglePartitionDesc desc, - long partitionId, boolean isTemp) throws DdlException { + long partitionId, boolean isTemp) throws DdlException { Preconditions.checkArgument(desc.isAnalyzed()); PartitionItem partitionItem = createAndCheckPartitionItem(desc, isTemp); setItemInternal(partitionId, isTemp, partitionItem); diff --git a/fe/fe-core/src/main/java/org/apache/doris/clone/BeLoadRebalancer.java b/fe/fe-core/src/main/java/org/apache/doris/clone/BeLoadRebalancer.java index 89a827cd87..fb55091b35 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/clone/BeLoadRebalancer.java +++ b/fe/fe-core/src/main/java/org/apache/doris/clone/BeLoadRebalancer.java @@ -25,6 +25,7 @@ import org.apache.doris.catalog.TabletMeta; import org.apache.doris.clone.SchedException.Status; import org.apache.doris.clone.TabletSchedCtx.Priority; import org.apache.doris.clone.TabletScheduler.PathSlot; +import org.apache.doris.common.Config; import org.apache.doris.system.Backend; import org.apache.doris.system.SystemInfoService; import org.apache.doris.thrift.TStorageMedium; @@ -282,7 +283,7 @@ public class BeLoadRebalancer extends Rebalancer { continue; } - if (!clusterStat.isMoreBalanced(tabletCtx.getSrcBackendId(), beStat.getBeId(), + if (!Config.be_rebalancer_fuzzy_test && !clusterStat.isMoreBalanced(tabletCtx.getSrcBackendId(), beStat.getBeId(), tabletCtx.getTabletId(), tabletCtx.getTabletSize(), tabletCtx.getStorageMedium())) { continue; } diff --git a/fe/fe-core/src/main/java/org/apache/doris/clone/ClusterLoadStatistic.java b/fe/fe-core/src/main/java/org/apache/doris/clone/ClusterLoadStatistic.java index 
34d6536c24..a407347de3 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/clone/ClusterLoadStatistic.java +++ b/fe/fe-core/src/main/java/org/apache/doris/clone/ClusterLoadStatistic.java @@ -168,7 +168,7 @@ public class ClusterLoadStatistic { continue; } - if (Math.abs(beStat.getLoadScore(medium) - avgLoadScore) / avgLoadScore > Config.balance_load_score_threshold) { + if (Config.be_rebalancer_fuzzy_test) { if (beStat.getLoadScore(medium) > avgLoadScore) { beStat.setClazz(medium, Classification.HIGH); highCounter++; @@ -177,8 +177,19 @@ public class ClusterLoadStatistic { lowCounter++; } } else { - beStat.setClazz(medium, Classification.MID); - midCounter++; + if (Math.abs(beStat.getLoadScore(medium) - avgLoadScore) / avgLoadScore + > Config.balance_load_score_threshold) { + if (beStat.getLoadScore(medium) > avgLoadScore) { + beStat.setClazz(medium, Classification.HIGH); + highCounter++; + } else if (beStat.getLoadScore(medium) < avgLoadScore) { + beStat.setClazz(medium, Classification.LOW); + lowCounter++; + } + } else { + beStat.setClazz(medium, Classification.MID); + midCounter++; + } } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/clone/TabletSchedCtx.java b/fe/fe-core/src/main/java/org/apache/doris/clone/TabletSchedCtx.java index 6bc3947cc8..d6e8eeeed9 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/clone/TabletSchedCtx.java +++ b/fe/fe-core/src/main/java/org/apache/doris/clone/TabletSchedCtx.java @@ -813,7 +813,7 @@ public class TabletSchedCtx implements Comparable<TabletSchedCtx> { Backend destBe = infoService.getBackend(destBackendId); if (destBe == null) { throw new SchedException(Status.SCHEDULE_FAILED, - "dest backend " + srcReplica.getBackendId() + " does not exist"); + "dest backend " + destBackendId + " does not exist"); } taskTimeoutMs = getApproximateTimeoutMs(); @@ -828,11 +828,6 @@ public class TabletSchedCtx implements Comparable<TabletSchedCtx> { // another clone task. 
// That is, we may need to use 2 clone tasks to create a new replica. It is inefficient, // but there is no other way now. - TBackend tSrcBe = new TBackend(srcBe.getHost(), srcBe.getBePort(), srcBe.getHttpPort()); - cloneTask = new CloneTask(destBackendId, dbId, tblId, partitionId, indexId, - tabletId, schemaHash, Lists.newArrayList(tSrcBe), storageMedium, - visibleVersion, (int) (taskTimeoutMs / 1000)); - cloneTask.setPathHash(srcPathHash, destPathHash); // if this is a balance task, or this is a repair task with REPLICA_MISSING/REPLICA_RELOCATING or REPLICA_MISSING_IN_CLUSTER, // we create a new replica with state CLONE @@ -847,6 +842,12 @@ public class TabletSchedCtx implements Comparable<TabletSchedCtx> { committedVersion, /* use committed version as last failed version */ -1 /* last success version */); + TBackend tSrcBe = new TBackend(srcBe.getHost(), srcBe.getBePort(), srcBe.getHttpPort()); + cloneTask = new CloneTask(destBackendId, dbId, tblId, partitionId, indexId, + tabletId, cloneReplica.getId(), schemaHash, Lists.newArrayList(tSrcBe), storageMedium, + visibleVersion, (int) (taskTimeoutMs / 1000)); + cloneTask.setPathHash(srcPathHash, destPathHash); + // addReplica() method will add this replica to tablet inverted index too. tablet.addReplica(cloneReplica); } else if (tabletStatus == TabletStatus.VERSION_INCOMPLETE) { @@ -861,6 +862,12 @@ public class TabletSchedCtx implements Comparable<TabletSchedCtx> { throw new SchedException(Status.SCHEDULE_FAILED, "dest replica's path hash is changed. 
" + "current: " + replica.getPathHash() + ", scheduled: " + destPathHash); } + + TBackend tSrcBe = new TBackend(srcBe.getHost(), srcBe.getBePort(), srcBe.getHttpPort()); + cloneTask = new CloneTask(destBackendId, dbId, tblId, partitionId, indexId, + tabletId, replica.getId(), schemaHash, Lists.newArrayList(tSrcBe), storageMedium, + visibleVersion, (int) (taskTimeoutMs / 1000)); + cloneTask.setPathHash(srcPathHash, destPathHash); } this.state = State.RUNNING; diff --git a/fe/fe-core/src/main/java/org/apache/doris/clone/TabletScheduler.java b/fe/fe-core/src/main/java/org/apache/doris/clone/TabletScheduler.java index a6c4501c6f..166fdc17a4 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/clone/TabletScheduler.java +++ b/fe/fe-core/src/main/java/org/apache/doris/clone/TabletScheduler.java @@ -1135,7 +1135,8 @@ public class TabletScheduler extends MasterDaemon { // NOTICE: only delete the replica from meta may not work. sometimes we can depend on tablet report // deleting these replicas, but in FORCE_REDUNDANT case, replica may be added to meta again in report // process. 
- sendDeleteReplicaTask(replica.getBackendId(), tabletCtx.getTabletId(), tabletCtx.getSchemaHash()); + sendDeleteReplicaTask(replica.getBackendId(), tabletCtx.getTabletId(), replica.getId(), + tabletCtx.getSchemaHash()); } // write edit log @@ -1152,8 +1153,8 @@ public class TabletScheduler extends MasterDaemon { tabletCtx.getTabletId(), replica.getBackendId(), reason, force); } - private void sendDeleteReplicaTask(long backendId, long tabletId, int schemaHash) { - DropReplicaTask task = new DropReplicaTask(backendId, tabletId, schemaHash); + private void sendDeleteReplicaTask(long backendId, long tabletId, long replicaId, int schemaHash) { + DropReplicaTask task = new DropReplicaTask(backendId, tabletId, replicaId, schemaHash); AgentBatchTask batchTask = new AgentBatchTask(); batchTask.addTask(task); AgentTaskExecutor.submit(batchTask); diff --git a/fe/fe-core/src/main/java/org/apache/doris/common/Config.java b/fe/fe-core/src/main/java/org/apache/doris/common/Config.java index 65e6598d15..e5f109686d 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/common/Config.java +++ b/fe/fe-core/src/main/java/org/apache/doris/common/Config.java @@ -1648,4 +1648,14 @@ public class Config extends ConfigBase { @ConfField(mutable = false, masterOnly = true) public static boolean enable_multi_catalog = false; // 1 min + /** + * If set to TRUE, FE will: + * 1. divide BE into high load and low load(no mid load) to force triggering tablet scheduling; + * 2. ignore whether the cluster can be more balanced during tablet scheduling; + * + * It's used to test the reliability in single replica case when tablet scheduling are frequent. + * Default is false. 
+ */ + @ConfField(mutable = false, masterOnly = true) + public static boolean be_rebalancer_fuzzy_test = false; } diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/InternalDataSource.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/InternalDataSource.java index dcacc806a1..579715e46c 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/InternalDataSource.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/InternalDataSource.java @@ -1638,9 +1638,10 @@ public class InternalDataSource implements DataSourceIf { long tabletId = tablet.getId(); for (Replica replica : tablet.getReplicas()) { long backendId = replica.getBackendId(); + long replicaId = replica.getId(); countDownLatch.addMark(backendId, tabletId); CreateReplicaTask task = - new CreateReplicaTask(backendId, dbId, tableId, partitionId, indexId, tabletId, + new CreateReplicaTask(backendId, dbId, tableId, partitionId, indexId, tabletId, replicaId, shortKeyColumnCount, schemaHash, version, keysType, storageType, storageMedium, schema, bfColumns, bfFpp, countDownLatch, indexes, isInMemory, tabletType, dataSortInfo, compressionType); diff --git a/fe/fe-core/src/main/java/org/apache/doris/master/ReportHandler.java b/fe/fe-core/src/main/java/org/apache/doris/master/ReportHandler.java index 3649b2bae8..7fb54271d1 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/master/ReportHandler.java +++ b/fe/fe-core/src/main/java/org/apache/doris/master/ReportHandler.java @@ -594,7 +594,8 @@ public class ReportHandler extends Daemon { Set<String> bfColumns = olapTable.getCopiedBfColumns(); double bfFpp = olapTable.getBfFpp(); CreateReplicaTask createReplicaTask = new CreateReplicaTask(backendId, dbId, - tableId, partitionId, indexId, tabletId, indexMeta.getShortKeyColumnCount(), + tableId, partitionId, indexId, tabletId, replica.getId(), + indexMeta.getShortKeyColumnCount(), indexMeta.getSchemaHash(), partition.getVisibleVersion(), indexMeta.getKeysType(), 
TStorageType.COLUMN, @@ -692,7 +693,9 @@ public class ReportHandler extends Daemon { if (needDelete) { // drop replica - DropReplicaTask task = new DropReplicaTask(backendId, tabletId, backendTabletInfo.getSchemaHash()); + long replicaId = backendTabletInfo.getReplicaId(); + DropReplicaTask task = new DropReplicaTask(backendId, tabletId, replicaId, + backendTabletInfo.getSchemaHash()); batchTask.addTask(task); LOG.warn("delete tablet[" + tabletId + "] from backend[" + backendId + "] because not found in meta"); ++deleteFromBackendCounter; diff --git a/fe/fe-core/src/main/java/org/apache/doris/task/CloneTask.java b/fe/fe-core/src/main/java/org/apache/doris/task/CloneTask.java index 8ef9d8eca0..d17affa8c0 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/task/CloneTask.java +++ b/fe/fe-core/src/main/java/org/apache/doris/task/CloneTask.java @@ -30,6 +30,7 @@ public class CloneTask extends AgentTask { public static final int VERSION_2 = 2; private int schemaHash; + private long replicaId; private List<TBackend> srcBackends; private TStorageMedium storageMedium; @@ -42,10 +43,11 @@ public class CloneTask extends AgentTask { private int taskVersion = VERSION_1; - public CloneTask(long backendId, long dbId, long tableId, long partitionId, long indexId, - long tabletId, int schemaHash, List<TBackend> srcBackends, TStorageMedium storageMedium, - long visibleVersion, int timeoutS) { + public CloneTask(long backendId, long dbId, long tableId, long partitionId, long indexId, long tabletId, + long replicaId, int schemaHash, List<TBackend> srcBackends, TStorageMedium storageMedium, + long visibleVersion, int timeoutS) { super(null, backendId, TTaskType.CLONE, dbId, tableId, partitionId, indexId, tabletId); + this.replicaId = replicaId; this.schemaHash = schemaHash; this.srcBackends = srcBackends; this.storageMedium = storageMedium; @@ -77,6 +79,7 @@ public class CloneTask extends AgentTask { public TCloneReq toThrift() { TCloneReq request = new TCloneReq(tabletId, 
schemaHash, srcBackends); + request.setReplicaId(replicaId); request.setStorageMedium(storageMedium); request.setCommittedVersion(visibleVersion); request.setTaskVersion(taskVersion); @@ -92,10 +95,12 @@ public class CloneTask extends AgentTask { @Override public String toString() { StringBuilder sb = new StringBuilder(); - sb.append("tablet id: ").append(tabletId).append(", schema hash: ").append(schemaHash); + sb.append("tablet id: ").append(tabletId).append(", replica id: ").append(replicaId).append(", schema hash: ") + .append(schemaHash); sb.append(", storageMedium: ").append(storageMedium.name()); sb.append(", visible version(hash): ").append(visibleVersion); - sb.append(", src backend: ").append(srcBackends.get(0).getHost()).append(", src path hash: ").append(srcPathHash); + sb.append(", src backend: ").append(srcBackends.get(0).getHost()).append(", src path hash: ") + .append(srcPathHash); sb.append(", dest backend: ").append(backendId).append(", dest path hash: ").append(destPathHash); return sb.toString(); } diff --git a/fe/fe-core/src/main/java/org/apache/doris/task/CreateReplicaTask.java b/fe/fe-core/src/main/java/org/apache/doris/task/CreateReplicaTask.java index 8718dd7663..0e45b86cf2 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/task/CreateReplicaTask.java +++ b/fe/fe-core/src/main/java/org/apache/doris/task/CreateReplicaTask.java @@ -47,6 +47,7 @@ import java.util.Set; public class CreateReplicaTask extends AgentTask { private static final Logger LOG = LogManager.getLogger(CreateReplicaTask.class); + private long replicaId; private short shortKeyColumnCount; private int schemaHash; @@ -89,7 +90,7 @@ public class CreateReplicaTask extends AgentTask { private DataSortInfo dataSortInfo; public CreateReplicaTask(long backendId, long dbId, long tableId, long partitionId, long indexId, long tabletId, - short shortKeyColumnCount, int schemaHash, long version, + long replicaId, short shortKeyColumnCount, int schemaHash, long version, KeysType 
keysType, TStorageType storageType, TStorageMedium storageMedium, List<Column> columns, Set<String> bfColumns, double bfFpp, MarkedCountDownLatch<Long, Long> latch, @@ -98,6 +99,7 @@ public class CreateReplicaTask extends AgentTask { TTabletType tabletType, TCompressionType compressionType) { super(null, backendId, TTaskType.CREATE, dbId, tableId, partitionId, indexId, tabletId); + this.replicaId = replicaId; this.shortKeyColumnCount = shortKeyColumnCount; this.schemaHash = schemaHash; @@ -121,7 +123,7 @@ public class CreateReplicaTask extends AgentTask { } public CreateReplicaTask(long backendId, long dbId, long tableId, long partitionId, long indexId, long tabletId, - short shortKeyColumnCount, int schemaHash, long version, + long replicaId, short shortKeyColumnCount, int schemaHash, long version, KeysType keysType, TStorageType storageType, TStorageMedium storageMedium, List<Column> columns, Set<String> bfColumns, double bfFpp, MarkedCountDownLatch<Long, Long> latch, @@ -132,6 +134,7 @@ public class CreateReplicaTask extends AgentTask { TCompressionType compressionType) { super(null, backendId, TTaskType.CREATE, dbId, tableId, partitionId, indexId, tabletId); + this.replicaId = replicaId; this.shortKeyColumnCount = shortKeyColumnCount; this.schemaHash = schemaHash; @@ -261,6 +264,7 @@ public class CreateReplicaTask extends AgentTask { } createTabletReq.setTableId(tableId); createTabletReq.setPartitionId(partitionId); + createTabletReq.setReplicaId(replicaId); if (baseTabletId != -1) { createTabletReq.setBaseTabletId(baseTabletId); diff --git a/fe/fe-core/src/main/java/org/apache/doris/task/DropReplicaTask.java b/fe/fe-core/src/main/java/org/apache/doris/task/DropReplicaTask.java index 41c3291dca..18643acd6c 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/task/DropReplicaTask.java +++ b/fe/fe-core/src/main/java/org/apache/doris/task/DropReplicaTask.java @@ -22,10 +22,12 @@ import org.apache.doris.thrift.TTaskType; public class DropReplicaTask extends 
AgentTask { private int schemaHash; // set -1L as unknown + private long replicaId; - public DropReplicaTask(long backendId, long tabletId, int schemaHash) { + public DropReplicaTask(long backendId, long tabletId, long replicaId, int schemaHash) { super(null, backendId, TTaskType.DROP, -1L, -1L, -1L, -1L, tabletId); this.schemaHash = schemaHash; + this.replicaId = replicaId; } public TDropTabletReq toThrift() { @@ -33,10 +35,15 @@ public class DropReplicaTask extends AgentTask { if (this.schemaHash != -1) { request.setSchemaHash(schemaHash); } + request.setReplicaId(replicaId); return request; } public int getSchemaHash() { return schemaHash; } + + public long getReplicaId() { + return replicaId; + } } diff --git a/fe/fe-core/src/test/java/org/apache/doris/task/AgentTaskTest.java b/fe/fe-core/src/test/java/org/apache/doris/task/AgentTaskTest.java index 31b84fa4af..3fe5d476b3 100644 --- a/fe/fe-core/src/test/java/org/apache/doris/task/AgentTaskTest.java +++ b/fe/fe-core/src/test/java/org/apache/doris/task/AgentTaskTest.java @@ -107,14 +107,14 @@ public class AgentTaskTest { // create createReplicaTask = new CreateReplicaTask(backendId1, dbId, tableId, partitionId, - indexId1, tabletId1, shortKeyNum, schemaHash1, + indexId1, tabletId1, replicaId1, shortKeyNum, schemaHash1, version, KeysType.AGG_KEYS, storageType, TStorageMedium.SSD, columns, null, 0, latch, null, false, TTabletType.TABLET_TYPE_DISK, TCompressionType.LZ4F); // drop - dropTask = new DropReplicaTask(backendId1, tabletId1, schemaHash1); + dropTask = new DropReplicaTask(backendId1, tabletId1, replicaId1, schemaHash1); // push pushTask = @@ -124,7 +124,7 @@ public class AgentTaskTest { // clone cloneTask = - new CloneTask(backendId1, dbId, tableId, partitionId, indexId1, tabletId1, schemaHash1, + new CloneTask(backendId1, dbId, tableId, partitionId, indexId1, tabletId1, replicaId1, schemaHash1, Arrays.asList(new TBackend("host1", 8290, 8390)), TStorageMedium.HDD, -1, 3600); // storageMediaMigrationTask @@ 
-240,7 +240,7 @@ public class AgentTaskTest { Assert.assertEquals(1, AgentTaskQueue.getTaskNum(backendId1, TTaskType.DROP, true)); dropTask.failed(); - DropReplicaTask dropTask2 = new DropReplicaTask(backendId2, tabletId1, schemaHash1); + DropReplicaTask dropTask2 = new DropReplicaTask(backendId2, tabletId1, replicaId1, schemaHash1); AgentTaskQueue.addTask(dropTask2); dropTask2.failed(); Assert.assertEquals(1, AgentTaskQueue.getTaskNum(backendId1, TTaskType.DROP, true)); diff --git a/gensrc/proto/olap_file.proto b/gensrc/proto/olap_file.proto index 3d5ee5e83a..8c1bd00aa3 100644 --- a/gensrc/proto/olap_file.proto +++ b/gensrc/proto/olap_file.proto @@ -262,6 +262,7 @@ message TabletMetaPB { repeated RowsetMetaPB stale_rs_metas = 18; optional StorageMediumPB storage_medium = 19 [default = HDD]; optional string remote_storage_name = 20; + optional int64 replica_id = 21 [default = 0]; } message OLAPIndexHeaderMessage { diff --git a/gensrc/thrift/AgentService.thrift b/gensrc/thrift/AgentService.thrift index 2233fae3fe..e695d7a737 100644 --- a/gensrc/thrift/AgentService.thrift +++ b/gensrc/thrift/AgentService.thrift @@ -118,11 +118,13 @@ struct TCreateTabletReq { 14: optional TTabletType tablet_type 15: optional TStorageParam storage_param 16: optional TCompressionType compression_type = TCompressionType.LZ4F + 17: optional Types.TReplicaId replica_id = 0 } struct TDropTabletReq { 1: required Types.TTabletId tablet_id 2: optional Types.TSchemaHash schema_hash + 3: optional Types.TReplicaId replica_id = 0 } struct TAlterTabletReq { @@ -204,6 +206,7 @@ struct TCloneReq { 8: optional i64 src_path_hash; 9: optional i64 dest_path_hash; 10: optional i32 timeout_s; + 11: optional Types.TReplicaId replica_id = 0 } struct TCompactionReq { diff --git a/gensrc/thrift/MasterService.thrift b/gensrc/thrift/MasterService.thrift index ded383b1b6..e49d34aacf 100644 --- a/gensrc/thrift/MasterService.thrift +++ b/gensrc/thrift/MasterService.thrift @@ -38,6 +38,7 @@ struct TTabletInfo { 12: 
optional bool used 13: optional Types.TPartitionId partition_id 14: optional bool is_in_memory + 15: optional Types.TReplicaId replica_id } struct TFinishTaskRequest { diff --git a/gensrc/thrift/Types.thrift b/gensrc/thrift/Types.thrift index 098307c98e..13af910597 100644 --- a/gensrc/thrift/Types.thrift +++ b/gensrc/thrift/Types.thrift @@ -25,6 +25,7 @@ typedef i32 TTupleId typedef i32 TSlotId typedef i64 TTableId typedef i64 TTabletId +typedef i64 TReplicaId typedef i64 TVersion typedef i64 TVersionHash typedef i32 TSchemaHash --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org