This is an automated email from the ASF dual-hosted git repository. yiguolei pushed a commit to branch branch-2.1 in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-2.1 by this push: new bf1c7a1c15b [fix](clone) fix stale tablet report miss the new cloning replica #38695 (#38839) bf1c7a1c15b is described below commit bf1c7a1c15b81ddbe5ec68f2b01162de6f24b290 Author: yujun <yu.jun.re...@gmail.com> AuthorDate: Mon Aug 5 18:04:24 2024 +0800 [fix](clone) fix stale tablet report miss the new cloning replica #38695 (#38839) cherry pick from #38695 --- be/src/agent/task_worker_pool.cpp | 14 +++++++++++--- be/src/olap/task/engine_clone_task.cpp | 11 ++++++----- be/src/olap/task/engine_clone_task.h | 5 ++++- .../src/main/java/org/apache/doris/master/MasterImpl.java | 5 +++++ 4 files changed, 26 insertions(+), 9 deletions(-) diff --git a/be/src/agent/task_worker_pool.cpp b/be/src/agent/task_worker_pool.cpp index e5bdf4dfb63..e79f45f3d17 100644 --- a/be/src/agent/task_worker_pool.cpp +++ b/be/src/agent/task_worker_pool.cpp @@ -98,6 +98,10 @@ std::unordered_map<TTaskType::type, std::unordered_set<int64_t>> s_task_signatur std::atomic_ulong s_report_version(time(nullptr) * 10000); +void increase_report_version() { + s_report_version.fetch_add(1, std::memory_order_relaxed); +} + // FIXME(plat1ko): Paired register and remove task info bool register_task_info(const TTaskType::type task_type, int64_t signature) { if (task_type == TTaskType::type::PUSH_STORAGE_POLICY || @@ -197,7 +201,7 @@ void alter_tablet(StorageEngine& engine, const TAgentTaskRequest& agent_task_req } if (status.ok()) { - s_report_version.fetch_add(1, std::memory_order_relaxed); + increase_report_version(); } // Return result to fe @@ -1381,7 +1385,7 @@ void create_tablet_callback(StorageEngine& engine, const TAgentTaskRequest& req) .tag("tablet_id", create_tablet_req.tablet_id) .error(status); } else { - s_report_version.fetch_add(1, std::memory_order_relaxed); + increase_report_version(); // get path hash of the created tablet TabletSharedPtr tablet; { @@ -1476,7 +1480,7 @@ void push_callback(const TAgentTaskRequest& req) { .tag("signature", req.signature) .tag("tablet_id", push_req.tablet_id) .tag("push_type", push_req.push_type); - ++s_report_version; + increase_report_version(); finish_task_request.__set_finish_tablet_infos(tablet_infos); } else { LOG_WARNING("failed to execute push task") @@ -1743,6 +1747,10 @@ void clone_callback(StorageEngine& engine, const TMasterInfo& master_info, LOG_INFO("successfully clone tablet") .tag("signature", req.signature) .tag("tablet_id", clone_req.tablet_id); + if (engine_task.is_new_tablet()) { + increase_report_version(); + finish_task_request.__set_report_version(s_report_version); + } finish_task_request.__set_finish_tablet_infos(tablet_infos); } diff --git a/be/src/olap/task/engine_clone_task.cpp b/be/src/olap/task/engine_clone_task.cpp index 62af1fec61a..300b65527c1 100644 --- a/be/src/olap/task/engine_clone_task.cpp +++ b/be/src/olap/task/engine_clone_task.cpp @@ -190,7 +190,7 @@ Status EngineCloneTask::_do_clone() { tablet->tablet_id(), tablet->replica_id(), false)); tablet.reset(); } - bool is_new_tablet = tablet == nullptr; + _is_new_tablet = tablet == nullptr; // try to incremental clone std::vector<Version> missed_versions; // try to repair a tablet with missing version @@ -229,7 +229,7 @@ Status EngineCloneTask::_do_clone() { if (missed_versions.empty()) { LOG(INFO) << "missed version size = 0, skip clone and return success. tablet_id=" << _clone_req.tablet_id << " replica_id=" << _clone_req.replica_id; - RETURN_IF_ERROR(_set_tablet_info(is_new_tablet)); + RETURN_IF_ERROR(_set_tablet_info()); return Status::OK(); } @@ -308,10 +308,11 @@ Status EngineCloneTask::_do_clone() { TabletMeta::construct_header_file_path(tablet_dir, _clone_req.tablet_id); RETURN_IF_ERROR(io::global_local_filesystem()->delete_file(header_path)); } - return _set_tablet_info(is_new_tablet); + + return _set_tablet_info(); } -Status EngineCloneTask::_set_tablet_info(bool is_new_tablet) { +Status EngineCloneTask::_set_tablet_info() { // Get clone tablet info TTabletInfo tablet_info; tablet_info.__set_tablet_id(_clone_req.tablet_id); @@ -321,7 +322,7 @@ Status EngineCloneTask::_set_tablet_info(bool is_new_tablet) { if (_clone_req.__isset.version && tablet_info.version < _clone_req.version) { // if it is a new tablet and clone failed, then remove the tablet // if it is incremental clone, then must not drop the tablet - if (is_new_tablet) { + if (_is_new_tablet) { // we need to check if this cloned table's version is what we expect. // if not, maybe this is a stale remaining table which is waiting for drop. // we drop it. diff --git a/be/src/olap/task/engine_clone_task.h b/be/src/olap/task/engine_clone_task.h index 6924bfc2aa9..80b9fdf4213 100644 --- a/be/src/olap/task/engine_clone_task.h +++ b/be/src/olap/task/engine_clone_task.h @@ -56,6 +56,8 @@ public: vector<TTabletInfo>* tablet_infos); ~EngineCloneTask() {} + bool is_new_tablet() const { return _is_new_tablet; } + private: Status _do_clone(); @@ -72,7 +74,7 @@ private: const vector<Version>& missing_versions, bool* allow_incremental_clone); - Status _set_tablet_info(bool is_new_tablet); + Status _set_tablet_info(); // Download tablet files from Status _download_files(DataDir* data_dir, const std::string& remote_url_prefix, @@ -95,6 +97,7 @@ private: int64_t _copy_size; int64_t _copy_time_ms; std::vector<PendingRowsetGuard> _pending_rs_guards; + bool _is_new_tablet = false; }; // EngineTask } // namespace doris diff --git a/fe/fe-core/src/main/java/org/apache/doris/master/MasterImpl.java b/fe/fe-core/src/main/java/org/apache/doris/master/MasterImpl.java index 507378851a0..92479534241 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/master/MasterImpl.java +++ b/fe/fe-core/src/main/java/org/apache/doris/master/MasterImpl.java @@ -532,6 +532,11 @@ public class MasterImpl { private void finishClone(AgentTask task, TFinishTaskRequest request) { CloneTask cloneTask = (CloneTask) task; if (cloneTask.getTaskVersion() == CloneTask.VERSION_2) { + if (request.isSetReportVersion()) { + long reportVersion = request.getReportVersion(); + Env.getCurrentSystemInfo().updateBackendReportVersion( + task.getBackendId(), reportVersion, task.getDbId(), task.getTableId()); + } Env.getCurrentEnv().getTabletScheduler().finishCloneTask(cloneTask, request); } else { LOG.warn("invalid clone task, ignore it. {}", task); --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org