This is an automated email from the ASF dual-hosted git repository.

dataroaring pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push: new c701ec83a42 [fix](migrate disk) fix migrate disk lost data during publish version (#29887) c701ec83a42 is described below commit c701ec83a42db15e9436bcac132294aba29893f4 Author: yujun <yu.jun.re...@gmail.com> AuthorDate: Sun Jan 14 10:27:05 2024 +0800 [fix](migrate disk) fix migrate disk lost data during publish version (#29887) Co-authored-by: Yongqiang YANG <98214048+dataroar...@users.noreply.github.com> --- be/src/agent/task_worker_pool.cpp | 12 +++- be/src/common/config.cpp | 5 +- be/src/common/config.h | 5 +- be/src/olap/tablet.h | 4 +- be/src/olap/task/engine_publish_version_task.cpp | 17 +++++ be/src/olap/task/engine_storage_migration_task.cpp | 16 ++--- be/src/olap/task/engine_storage_migration_task.h | 3 +- be/src/util/debug_points.h | 14 ++++ .../test_migrate_disk_with_publish_version.out | 11 +++ .../test_migrate_disk_with_publish_version.groovy | 84 ++++++++++++++++++++++ 10 files changed, 151 insertions(+), 20 deletions(-) diff --git a/be/src/agent/task_worker_pool.cpp b/be/src/agent/task_worker_pool.cpp index a845b6253f9..f40fe73758f 100644 --- a/be/src/agent/task_worker_pool.cpp +++ b/be/src/agent/task_worker_pool.cpp @@ -856,7 +856,9 @@ void check_consistency_callback(StorageEngine& engine, const TAgentTaskRequest& void report_task_callback(const TMasterInfo& master_info) { TReportRequest request; - random_sleep(5); + if (config::report_random_wait) { + random_sleep(5); + } request.__isset.tasks = true; { std::lock_guard lock(s_task_signatures_mtx); @@ -880,7 +882,9 @@ void report_disk_callback(StorageEngine& engine, const TMasterInfo& master_info) // Random sleep 1~5 seconds before doing report. // In order to avoid the problem that the FE receives many report requests at the same time // and can not be processed. 
- random_sleep(5); + if (config::report_random_wait) { + random_sleep(5); + } TReportRequest request; request.__set_backend(BackendOptions::get_local_backend()); @@ -914,7 +918,9 @@ void report_disk_callback(StorageEngine& engine, const TMasterInfo& master_info) } void report_tablet_callback(StorageEngine& engine, const TMasterInfo& master_info) { - random_sleep(5); + if (config::report_random_wait) { + random_sleep(5); + } TReportRequest request; request.__set_backend(BackendOptions::get_local_backend()); diff --git a/be/src/common/config.cpp b/be/src/common/config.cpp index 17ac1905dd6..2d418b2bf24 100644 --- a/be/src/common/config.cpp +++ b/be/src/common/config.cpp @@ -178,6 +178,9 @@ DEFINE_Int32(download_worker_count, "1"); DEFINE_Int32(make_snapshot_worker_count, "5"); // the count of thread to release snapshot DEFINE_Int32(release_snapshot_worker_count, "5"); +// report random wait a little time to avoid FE receiving multiple be reports at the same time. +// do not set it to false for production environment +DEFINE_mBool(report_random_wait, "true"); // the interval time(seconds) for agent report tasks signature to FE DEFINE_mInt32(report_task_interval_seconds, "10"); // the interval time(seconds) for refresh storage policy from FE @@ -192,8 +195,6 @@ DEFINE_mInt32(max_download_speed_kbps, "50000"); DEFINE_mInt32(download_low_speed_limit_kbps, "50"); // download low speed time(seconds) DEFINE_mInt32(download_low_speed_time, "300"); -// sleep time for one second -DEFINE_Int32(sleep_one_second, "1"); // log dir DEFINE_String(sys_log_dir, "${DORIS_HOME}/log"); diff --git a/be/src/common/config.h b/be/src/common/config.h index 0f7c47f8ec3..7ae583cedb6 100644 --- a/be/src/common/config.h +++ b/be/src/common/config.h @@ -226,6 +226,9 @@ DECLARE_Int32(download_worker_count); DECLARE_Int32(make_snapshot_worker_count); // the count of thread to release snapshot DECLARE_Int32(release_snapshot_worker_count); +// report random wait a little time to avoid FE receiving 
multiple be reports at the same time. +// do not set it to false for production environment +DECLARE_mBool(report_random_wait); // the interval time(seconds) for agent report tasks signature to FE DECLARE_mInt32(report_task_interval_seconds); // the interval time(seconds) for refresh storage policy from FE @@ -240,8 +243,6 @@ DECLARE_mInt32(max_download_speed_kbps); DECLARE_mInt32(download_low_speed_limit_kbps); // download low speed time(seconds) DECLARE_mInt32(download_low_speed_time); -// sleep time for one second -DECLARE_Int32(sleep_one_second); // log dir DECLARE_String(sys_log_dir); diff --git a/be/src/olap/tablet.h b/be/src/olap/tablet.h index 8492b0b158e..d6ad0285233 100644 --- a/be/src/olap/tablet.h +++ b/be/src/olap/tablet.h @@ -195,7 +195,7 @@ public: std::mutex& get_base_compaction_lock() { return _base_compaction_lock; } std::mutex& get_cumulative_compaction_lock() { return _cumulative_compaction_lock; } - std::shared_mutex& get_migration_lock() { return _migration_lock; } + std::shared_timed_mutex& get_migration_lock() { return _migration_lock; } std::mutex& get_schema_change_lock() { return _schema_change_lock; } @@ -625,7 +625,7 @@ private: std::mutex _base_compaction_lock; std::mutex _cumulative_compaction_lock; std::mutex _schema_change_lock; - std::shared_mutex _migration_lock; + std::shared_timed_mutex _migration_lock; std::mutex _build_inverted_index_lock; // In unique key table with MoW, we should guarantee that only one diff --git a/be/src/olap/task/engine_publish_version_task.cpp b/be/src/olap/task/engine_publish_version_task.cpp index ac3cc82e526..78f5cb9c328 100644 --- a/be/src/olap/task/engine_publish_version_task.cpp +++ b/be/src/olap/task/engine_publish_version_task.cpp @@ -348,6 +348,13 @@ TabletPublishTxnTask::TabletPublishTxnTask(EnginePublishVersionTask* engine_task } void TabletPublishTxnTask::handle() { + std::shared_lock migration_rlock(_tablet->get_migration_lock(), std::chrono::seconds(5)); + if (!migration_rlock.owns_lock()) 
{ + _result = Status::Error<TRY_LOCK_FAILED, false>("got migration_rlock failed"); + LOG(WARNING) << "failed to publish version. tablet_id=" << _tablet_info.tablet_id + << ", txn_id=" << _transaction_id << ", res=" << _result; + return; + } std::unique_lock<std::mutex> rowset_update_lock(_tablet->get_rowset_update_lock(), std::defer_lock); if (_tablet->enable_unique_key_merge_on_write()) { @@ -364,6 +371,8 @@ void TabletPublishTxnTask::handle() { return; } + DBUG_EXECUTE_IF("EnginePublishVersionTask.handle.block_add_rowsets", DBUG_BLOCK); + // add visible rowset to tablet int64_t t1 = MonotonicMicros(); _result = _tablet->add_inc_rowset(_rowset); @@ -389,6 +398,12 @@ void TabletPublishTxnTask::handle() { } void AsyncTabletPublishTask::handle() { + std::shared_lock migration_rlock(_tablet->get_migration_lock(), std::chrono::seconds(5)); + if (!migration_rlock.owns_lock()) { + LOG(WARNING) << "failed to publish version. tablet_id=" << _tablet->tablet_id() + << ", txn_id=" << _transaction_id << ", got migration_rlock failed"; + return; + } std::lock_guard<std::mutex> wrlock(_tablet->get_rowset_update_lock()); _stats.schedule_time_us = MonotonicMicros() - _stats.submit_time_us; std::map<TabletInfo, RowsetSharedPtr> tablet_related_rs; @@ -409,6 +424,8 @@ void AsyncTabletPublishTask::handle() { return; } + DBUG_EXECUTE_IF("EnginePublishVersionTask.handle.block_add_rowsets", DBUG_BLOCK); + // add visible rowset to tablet int64_t t1 = MonotonicMicros(); publish_status = _tablet->add_inc_rowset(rowset); diff --git a/be/src/olap/task/engine_storage_migration_task.cpp b/be/src/olap/task/engine_storage_migration_task.cpp index 8cd5717afb2..efa85406c69 100644 --- a/be/src/olap/task/engine_storage_migration_task.cpp +++ b/be/src/olap/task/engine_storage_migration_task.cpp @@ -49,8 +49,6 @@ namespace doris { using std::stringstream; -const int CHECK_TXNS_MAX_WAIT_TIME_SECS = 60; - EngineStorageMigrationTask::EngineStorageMigrationTask(const TabletSharedPtr& tablet, DataDir* 
dest_store) : _tablet(tablet), _dest_store(dest_store) { @@ -115,16 +113,15 @@ Status EngineStorageMigrationTask::_check_running_txns() { } Status EngineStorageMigrationTask::_check_running_txns_until_timeout( - std::unique_lock<std::shared_mutex>* migration_wlock) { + std::unique_lock<std::shared_timed_mutex>* migration_wlock) { // caller should not hold migration lock, and 'migration_wlock' should not be nullptr // ownership of the migration_wlock is transferred to the caller if check succ DCHECK_NE(migration_wlock, nullptr); Status res = Status::OK(); - int try_times = 1; do { // to avoid invalid loops, the lock is guaranteed to be acquired here { - std::unique_lock<std::shared_mutex> wlock(_tablet->get_migration_lock()); + std::unique_lock<std::shared_timed_mutex> wlock(_tablet->get_migration_lock()); if (_tablet->tablet_state() == TABLET_SHUTDOWN) { return Status::Error<ErrorCode::INTERNAL_ERROR, false>("tablet {} has deleted", _tablet->tablet_id()); @@ -136,8 +133,7 @@ Status EngineStorageMigrationTask::_check_running_txns_until_timeout( return res; } } - sleep(std::min(config::sleep_one_second * try_times, CHECK_TXNS_MAX_WAIT_TIME_SECS)); - ++try_times; + std::this_thread::sleep_for(std::chrono::milliseconds(200)); } while (!_is_timeout()); return res; } @@ -224,8 +220,8 @@ Status EngineStorageMigrationTask::_migrate() { uint64_t shard = 0; std::string full_path; { - std::unique_lock<std::shared_mutex> migration_wlock(_tablet->get_migration_lock(), - std::try_to_lock); + std::unique_lock<std::shared_timed_mutex> migration_wlock(_tablet->get_migration_lock(), + std::chrono::seconds(1)); if (!migration_wlock.owns_lock()) { return Status::InternalError("could not own migration_wlock"); } @@ -260,7 +256,7 @@ Status EngineStorageMigrationTask::_migrate() { if (!res.ok()) { break; } - std::unique_lock<std::shared_mutex> migration_wlock; + std::unique_lock<std::shared_timed_mutex> migration_wlock; res = _check_running_txns_until_timeout(&migration_wlock); if 
(!res.ok()) { break; diff --git a/be/src/olap/task/engine_storage_migration_task.h b/be/src/olap/task/engine_storage_migration_task.h index 2831fc11df4..c15b0576701 100644 --- a/be/src/olap/task/engine_storage_migration_task.h +++ b/be/src/olap/task/engine_storage_migration_task.h @@ -54,7 +54,8 @@ private: Status _check_running_txns(); // caller should not hold migration lock, and 'migration_wlock' should not be nullptr // ownership of the migration lock is transferred to the caller if check succ - Status _check_running_txns_until_timeout(std::unique_lock<std::shared_mutex>* migration_wlock); + Status _check_running_txns_until_timeout( + std::unique_lock<std::shared_timed_mutex>* migration_wlock); // if the size less than threshold, return true bool _is_rowsets_size_less_than_threshold( diff --git a/be/src/util/debug_points.h b/be/src/util/debug_points.h index 1106a548f8d..7af666f1196 100644 --- a/be/src/util/debug_points.h +++ b/be/src/util/debug_points.h @@ -19,9 +19,11 @@ #include <atomic> #include <boost/lexical_cast.hpp> +#include <chrono> #include <functional> #include <map> #include <memory> +#include <thread> #include <type_traits> #include "common/compiler_util.h" @@ -33,10 +35,22 @@ if (UNLIKELY(config::enable_debug_points)) { \ auto dp = DebugPoints::instance()->get_debug_point(debug_point_name); \ if (dp) { \ + [[maybe_unused]] auto DP_NAME = debug_point_name; \ code; \ } \ } +// define some common debug actions +// usage example: DBUG_EXECUTE_IF("xxx", DBUG_BLOCK); +#define DBUG_BLOCK \ + { \ + LOG(INFO) << "start debug block " << DP_NAME; \ + while (DebugPoints::instance()->is_enable(DP_NAME)) { \ + std::this_thread::sleep_for(std::chrono::milliseconds(10)); \ + } \ + LOG(INFO) << "end debug block " << DP_NAME; \ + } + namespace doris { struct DebugPoint { diff --git a/regression-test/data/migrate_p0/test_migrate_disk_with_publish_version.out b/regression-test/data/migrate_p0/test_migrate_disk_with_publish_version.out new file mode 100644 index 
00000000000..004ada24cbc --- /dev/null +++ b/regression-test/data/migrate_p0/test_migrate_disk_with_publish_version.out @@ -0,0 +1,11 @@ +-- This file is automatically generated. You should know what you did if you want to edit this +-- !select_1 -- +1 10 + +-- !select_2 -- +1 10 + +-- !select_3 -- +1 10 +2 20 + diff --git a/regression-test/suites/migrate_p0/test_migrate_disk_with_publish_version.groovy b/regression-test/suites/migrate_p0/test_migrate_disk_with_publish_version.groovy new file mode 100644 index 00000000000..e5b22b791af --- /dev/null +++ b/regression-test/suites/migrate_p0/test_migrate_disk_with_publish_version.groovy @@ -0,0 +1,84 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +import org.apache.doris.regression.suite.ClusterOptions +import org.apache.doris.regression.util.NodeType + +suite('test_migrate_disk_with_publish_version') { + def checkTabletOnHDD = { isOnHdd -> + sleep 5000 + + def targetPathHash = [] as Set + sql_return_maparray("SHOW PROC '/backends'").each { + def paths = sql_return_maparray("SHOW PROC '/backends/${it.BackendId}'") + for (def path : paths) { + if (path.RootPath.endsWith(isOnHdd ? 
'HDD' : 'SSD')) { + targetPathHash.add(path.PathHash) + } + } + } + + def tablets = sql_return_maparray 'SHOW TABLETS FROM tbl' + tablets.each { + assertTrue(it.PathHash in targetPathHash, "tablet path hash ${it.PathHash} not in ${targetPathHash}") + } + } + + def options = new ClusterOptions() + options.enableDebugPoints() + options.beConfigs += [ + 'report_random_wait=false', + 'report_tablet_interval_seconds=1', + 'report_disk_state_interval_seconds=1' + ] + options.beDisks = ['HDD=1', 'SSD=1' ] + docker(options) { + cluster.checkBeIsAlive(1, true) + cluster.checkBeIsAlive(2, true) + cluster.checkBeIsAlive(3, true) + sleep 2000 + + sql 'SET GLOBAL insert_visible_timeout_ms = 2000' + sql "ADMIN SET FRONTEND CONFIG ('agent_task_resend_wait_time_ms' = '1000')" + + sql 'CREATE TABLE tbl (k1 INT, k2 INT) DISTRIBUTED BY HASH(k1) BUCKETS 1' + sql 'INSERT INTO tbl VALUES (1, 10)' + + checkTabletOnHDD true + + // add debug point, txn will block + cluster.injectDebugPoints(NodeType.FE, ['PublishVersionDaemon.stop_publish':null]) + cluster.injectDebugPoints(NodeType.BE, ['EnginePublishVersionTask.handle.block_add_rowsets':null]) + sql 'INSERT INTO tbl VALUES (2, 20)' + + sql "ALTER TABLE tbl MODIFY PARTITION(*) SET ( 'storage_medium' = 'ssd' )" + // tablet has unfinished txn, it couldn't migrate among disks + checkTabletOnHDD true + + order_qt_select_1 'SELECT * FROM tbl' + + cluster.clearFrontendDebugPoints() + // tablet finished all txns, but publish thread hold the migrate lock, migrate will failed + checkTabletOnHDD true + order_qt_select_2 'SELECT * FROM tbl' + + cluster.clearBackendDebugPoints() + // tablet finished all txns, and publish thread not hold migrate lock, migrate should succ + checkTabletOnHDD false + order_qt_select_3 'SELECT * FROM tbl' + } +} --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org