This is an automated email from the ASF dual-hosted git repository.

dataroaring pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new c701ec83a42 [fix](migrate disk) fix migrate disk lost data during publish version (#29887)
c701ec83a42 is described below

commit c701ec83a42db15e9436bcac132294aba29893f4
Author: yujun <yu.jun.re...@gmail.com>
AuthorDate: Sun Jan 14 10:27:05 2024 +0800

    [fix](migrate disk) fix migrate disk lost data during publish version (#29887)
    
    Co-authored-by: Yongqiang YANG <98214048+dataroar...@users.noreply.github.com>
---
 be/src/agent/task_worker_pool.cpp                  | 12 +++-
 be/src/common/config.cpp                           |  5 +-
 be/src/common/config.h                             |  5 +-
 be/src/olap/tablet.h                               |  4 +-
 be/src/olap/task/engine_publish_version_task.cpp   | 17 +++++
 be/src/olap/task/engine_storage_migration_task.cpp | 16 ++---
 be/src/olap/task/engine_storage_migration_task.h   |  3 +-
 be/src/util/debug_points.h                         | 14 ++++
 .../test_migrate_disk_with_publish_version.out     | 11 +++
 .../test_migrate_disk_with_publish_version.groovy  | 84 ++++++++++++++++++++++
 10 files changed, 151 insertions(+), 20 deletions(-)

diff --git a/be/src/agent/task_worker_pool.cpp b/be/src/agent/task_worker_pool.cpp
index a845b6253f9..f40fe73758f 100644
--- a/be/src/agent/task_worker_pool.cpp
+++ b/be/src/agent/task_worker_pool.cpp
@@ -856,7 +856,9 @@ void check_consistency_callback(StorageEngine& engine, const TAgentTaskRequest&
 
 void report_task_callback(const TMasterInfo& master_info) {
     TReportRequest request;
-    random_sleep(5);
+    if (config::report_random_wait) {
+        random_sleep(5);
+    }
     request.__isset.tasks = true;
     {
         std::lock_guard lock(s_task_signatures_mtx);
@@ -880,7 +882,9 @@ void report_disk_callback(StorageEngine& engine, const TMasterInfo& master_info)
     // Random sleep 1~5 seconds before doing report.
     // In order to avoid the problem that the FE receives many report requests at the same time
     // and can not be processed.
-    random_sleep(5);
+    if (config::report_random_wait) {
+        random_sleep(5);
+    }
 
     TReportRequest request;
     request.__set_backend(BackendOptions::get_local_backend());
@@ -914,7 +918,9 @@ void report_disk_callback(StorageEngine& engine, const TMasterInfo& master_info)
 }
 
 void report_tablet_callback(StorageEngine& engine, const TMasterInfo& master_info) {
-    random_sleep(5);
+    if (config::report_random_wait) {
+        random_sleep(5);
+    }
 
     TReportRequest request;
     request.__set_backend(BackendOptions::get_local_backend());
diff --git a/be/src/common/config.cpp b/be/src/common/config.cpp
index 17ac1905dd6..2d418b2bf24 100644
--- a/be/src/common/config.cpp
+++ b/be/src/common/config.cpp
@@ -178,6 +178,9 @@ DEFINE_Int32(download_worker_count, "1");
 DEFINE_Int32(make_snapshot_worker_count, "5");
 // the count of thread to release snapshot
 DEFINE_Int32(release_snapshot_worker_count, "5");
+// report random wait a little time to avoid FE receiving multiple be reports at the same time.
+// do not set it to false for production environment
+DEFINE_mBool(report_random_wait, "true");
 // the interval time(seconds) for agent report tasks signature to FE
 DEFINE_mInt32(report_task_interval_seconds, "10");
 // the interval time(seconds) for refresh storage policy from FE
@@ -192,8 +195,6 @@ DEFINE_mInt32(max_download_speed_kbps, "50000");
 DEFINE_mInt32(download_low_speed_limit_kbps, "50");
 // download low speed time(seconds)
 DEFINE_mInt32(download_low_speed_time, "300");
-// sleep time for one second
-DEFINE_Int32(sleep_one_second, "1");
 
 // log dir
 DEFINE_String(sys_log_dir, "${DORIS_HOME}/log");
diff --git a/be/src/common/config.h b/be/src/common/config.h
index 0f7c47f8ec3..7ae583cedb6 100644
--- a/be/src/common/config.h
+++ b/be/src/common/config.h
@@ -226,6 +226,9 @@ DECLARE_Int32(download_worker_count);
 DECLARE_Int32(make_snapshot_worker_count);
 // the count of thread to release snapshot
 DECLARE_Int32(release_snapshot_worker_count);
+// report random wait a little time to avoid FE receiving multiple be reports at the same time.
+// do not set it to false for production environment
+DECLARE_mBool(report_random_wait);
 // the interval time(seconds) for agent report tasks signature to FE
 DECLARE_mInt32(report_task_interval_seconds);
 // the interval time(seconds) for refresh storage policy from FE
@@ -240,8 +243,6 @@ DECLARE_mInt32(max_download_speed_kbps);
 DECLARE_mInt32(download_low_speed_limit_kbps);
 // download low speed time(seconds)
 DECLARE_mInt32(download_low_speed_time);
-// sleep time for one second
-DECLARE_Int32(sleep_one_second);
 
 // log dir
 DECLARE_String(sys_log_dir);
diff --git a/be/src/olap/tablet.h b/be/src/olap/tablet.h
index 8492b0b158e..d6ad0285233 100644
--- a/be/src/olap/tablet.h
+++ b/be/src/olap/tablet.h
@@ -195,7 +195,7 @@ public:
     std::mutex& get_base_compaction_lock() { return _base_compaction_lock; }
     std::mutex& get_cumulative_compaction_lock() { return _cumulative_compaction_lock; }
 
-    std::shared_mutex& get_migration_lock() { return _migration_lock; }
+    std::shared_timed_mutex& get_migration_lock() { return _migration_lock; }
 
     std::mutex& get_schema_change_lock() { return _schema_change_lock; }
 
@@ -625,7 +625,7 @@ private:
     std::mutex _base_compaction_lock;
     std::mutex _cumulative_compaction_lock;
     std::mutex _schema_change_lock;
-    std::shared_mutex _migration_lock;
+    std::shared_timed_mutex _migration_lock;
     std::mutex _build_inverted_index_lock;
 
     // In unique key table with MoW, we should guarantee that only one
diff --git a/be/src/olap/task/engine_publish_version_task.cpp b/be/src/olap/task/engine_publish_version_task.cpp
index ac3cc82e526..78f5cb9c328 100644
--- a/be/src/olap/task/engine_publish_version_task.cpp
+++ b/be/src/olap/task/engine_publish_version_task.cpp
@@ -348,6 +348,13 @@ TabletPublishTxnTask::TabletPublishTxnTask(EnginePublishVersionTask* engine_task
 }
 
 void TabletPublishTxnTask::handle() {
+    std::shared_lock migration_rlock(_tablet->get_migration_lock(), std::chrono::seconds(5));
+    if (!migration_rlock.owns_lock()) {
+        _result = Status::Error<TRY_LOCK_FAILED, false>("got migration_rlock failed");
+        LOG(WARNING) << "failed to publish version. tablet_id=" << _tablet_info.tablet_id
+                     << ", txn_id=" << _transaction_id << ", res=" << _result;
+        return;
+    }
     std::unique_lock<std::mutex> rowset_update_lock(_tablet->get_rowset_update_lock(),
                                                     std::defer_lock);
     if (_tablet->enable_unique_key_merge_on_write()) {
@@ -364,6 +371,8 @@ void TabletPublishTxnTask::handle() {
         return;
     }
 
+    DBUG_EXECUTE_IF("EnginePublishVersionTask.handle.block_add_rowsets", DBUG_BLOCK);
+
     // add visible rowset to tablet
     int64_t t1 = MonotonicMicros();
     _result = _tablet->add_inc_rowset(_rowset);
@@ -389,6 +398,12 @@ void TabletPublishTxnTask::handle() {
 }
 
 void AsyncTabletPublishTask::handle() {
+    std::shared_lock migration_rlock(_tablet->get_migration_lock(), std::chrono::seconds(5));
+    if (!migration_rlock.owns_lock()) {
+        LOG(WARNING) << "failed to publish version. tablet_id=" << _tablet->tablet_id()
+                     << ", txn_id=" << _transaction_id << ", got migration_rlock failed";
+        return;
+    }
     std::lock_guard<std::mutex> wrlock(_tablet->get_rowset_update_lock());
     _stats.schedule_time_us = MonotonicMicros() - _stats.submit_time_us;
     std::map<TabletInfo, RowsetSharedPtr> tablet_related_rs;
@@ -409,6 +424,8 @@ void AsyncTabletPublishTask::handle() {
         return;
     }
 
+    DBUG_EXECUTE_IF("EnginePublishVersionTask.handle.block_add_rowsets", DBUG_BLOCK);
+
     // add visible rowset to tablet
     int64_t t1 = MonotonicMicros();
     publish_status = _tablet->add_inc_rowset(rowset);
diff --git a/be/src/olap/task/engine_storage_migration_task.cpp b/be/src/olap/task/engine_storage_migration_task.cpp
index 8cd5717afb2..efa85406c69 100644
--- a/be/src/olap/task/engine_storage_migration_task.cpp
+++ b/be/src/olap/task/engine_storage_migration_task.cpp
@@ -49,8 +49,6 @@ namespace doris {
 
 using std::stringstream;
 
-const int CHECK_TXNS_MAX_WAIT_TIME_SECS = 60;
-
 EngineStorageMigrationTask::EngineStorageMigrationTask(const TabletSharedPtr& tablet,
                                                        DataDir* dest_store)
         : _tablet(tablet), _dest_store(dest_store) {
@@ -115,16 +113,15 @@ Status EngineStorageMigrationTask::_check_running_txns() {
 }
 
 Status EngineStorageMigrationTask::_check_running_txns_until_timeout(
-        std::unique_lock<std::shared_mutex>* migration_wlock) {
+        std::unique_lock<std::shared_timed_mutex>* migration_wlock) {
     // caller should not hold migration lock, and 'migration_wlock' should not be nullptr
     // ownership of the migration_wlock is transferred to the caller if check succ
     DCHECK_NE(migration_wlock, nullptr);
     Status res = Status::OK();
-    int try_times = 1;
     do {
         // to avoid invalid loops, the lock is guaranteed to be acquired here
         {
-            std::unique_lock<std::shared_mutex> wlock(_tablet->get_migration_lock());
+            std::unique_lock<std::shared_timed_mutex> wlock(_tablet->get_migration_lock());
             if (_tablet->tablet_state() == TABLET_SHUTDOWN) {
                 return Status::Error<ErrorCode::INTERNAL_ERROR, false>("tablet {} has deleted",
                                                                        _tablet->tablet_id());
@@ -136,8 +133,7 @@ Status EngineStorageMigrationTask::_check_running_txns_until_timeout(
                 return res;
             }
         }
-        sleep(std::min(config::sleep_one_second * try_times, CHECK_TXNS_MAX_WAIT_TIME_SECS));
-        ++try_times;
+        std::this_thread::sleep_for(std::chrono::milliseconds(200));
     } while (!_is_timeout());
     return res;
 }
@@ -224,8 +220,8 @@ Status EngineStorageMigrationTask::_migrate() {
     uint64_t shard = 0;
     std::string full_path;
     {
-        std::unique_lock<std::shared_mutex> migration_wlock(_tablet->get_migration_lock(),
-                                                            std::try_to_lock);
+        std::unique_lock<std::shared_timed_mutex> migration_wlock(_tablet->get_migration_lock(),
+                                                                  std::chrono::seconds(1));
         if (!migration_wlock.owns_lock()) {
             return Status::InternalError("could not own migration_wlock");
         }
@@ -260,7 +256,7 @@ Status EngineStorageMigrationTask::_migrate() {
         if (!res.ok()) {
             break;
         }
-        std::unique_lock<std::shared_mutex> migration_wlock;
+        std::unique_lock<std::shared_timed_mutex> migration_wlock;
         res = _check_running_txns_until_timeout(&migration_wlock);
         if (!res.ok()) {
             break;
diff --git a/be/src/olap/task/engine_storage_migration_task.h b/be/src/olap/task/engine_storage_migration_task.h
index 2831fc11df4..c15b0576701 100644
--- a/be/src/olap/task/engine_storage_migration_task.h
+++ b/be/src/olap/task/engine_storage_migration_task.h
@@ -54,7 +54,8 @@ private:
     Status _check_running_txns();
     // caller should not hold migration lock, and 'migration_wlock' should not be nullptr
     // ownership of the migration lock is transferred to the caller if check succ
-    Status _check_running_txns_until_timeout(std::unique_lock<std::shared_mutex>* migration_wlock);
+    Status _check_running_txns_until_timeout(
+            std::unique_lock<std::shared_timed_mutex>* migration_wlock);
 
     // if the size less than threshold, return true
     bool _is_rowsets_size_less_than_threshold(
diff --git a/be/src/util/debug_points.h b/be/src/util/debug_points.h
index 1106a548f8d..7af666f1196 100644
--- a/be/src/util/debug_points.h
+++ b/be/src/util/debug_points.h
@@ -19,9 +19,11 @@
 
 #include <atomic>
 #include <boost/lexical_cast.hpp>
+#include <chrono>
 #include <functional>
 #include <map>
 #include <memory>
+#include <thread>
 #include <type_traits>
 
 #include "common/compiler_util.h"
@@ -33,10 +35,22 @@
     if (UNLIKELY(config::enable_debug_points)) {                              \
         auto dp = DebugPoints::instance()->get_debug_point(debug_point_name); \
         if (dp) {                                                             \
+            [[maybe_unused]] auto DP_NAME = debug_point_name;                 \
             code;                                                             \
         }                                                                     \
     }
 
+// define some common debug actions
+// usage example: DBUG_EXECUTE_IF("xxx", DBUG_BLOCK);
+#define DBUG_BLOCK                                                      \
+    {                                                                   \
+        LOG(INFO) << "start debug block " << DP_NAME;                   \
+        while (DebugPoints::instance()->is_enable(DP_NAME)) {           \
+            std::this_thread::sleep_for(std::chrono::milliseconds(10)); \
+        }                                                               \
+        LOG(INFO) << "end debug block " << DP_NAME;                     \
+    }
+
 namespace doris {
 
 struct DebugPoint {
diff --git a/regression-test/data/migrate_p0/test_migrate_disk_with_publish_version.out b/regression-test/data/migrate_p0/test_migrate_disk_with_publish_version.out
new file mode 100644
index 00000000000..004ada24cbc
--- /dev/null
+++ b/regression-test/data/migrate_p0/test_migrate_disk_with_publish_version.out
@@ -0,0 +1,11 @@
+-- This file is automatically generated. You should know what you did if you want to edit this
+-- !select_1 --
+1      10
+
+-- !select_2 --
+1      10
+
+-- !select_3 --
+1      10
+2      20
+
diff --git a/regression-test/suites/migrate_p0/test_migrate_disk_with_publish_version.groovy b/regression-test/suites/migrate_p0/test_migrate_disk_with_publish_version.groovy
new file mode 100644
index 00000000000..e5b22b791af
--- /dev/null
+++ b/regression-test/suites/migrate_p0/test_migrate_disk_with_publish_version.groovy
@@ -0,0 +1,84 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+import org.apache.doris.regression.suite.ClusterOptions
+import org.apache.doris.regression.util.NodeType
+
+suite('test_migrate_disk_with_publish_version') {
+    def checkTabletOnHDD = { isOnHdd ->
+        sleep 5000
+
+        def targetPathHash = [] as Set
+        sql_return_maparray("SHOW PROC '/backends'").each {
+            def paths = sql_return_maparray("SHOW PROC '/backends/${it.BackendId}'")
+            for (def path : paths) {
+                if (path.RootPath.endsWith(isOnHdd ? 'HDD' : 'SSD')) {
+                    targetPathHash.add(path.PathHash)
+                }
+            }
+        }
+
+        def tablets = sql_return_maparray 'SHOW TABLETS FROM tbl'
+        tablets.each {
+            assertTrue(it.PathHash in targetPathHash, "tablet path hash ${it.PathHash} not in ${targetPathHash}")
+        }
+    }
+
+    def options = new ClusterOptions()
+    options.enableDebugPoints()
+    options.beConfigs += [
+        'report_random_wait=false',
+        'report_tablet_interval_seconds=1',
+        'report_disk_state_interval_seconds=1'
+    ]
+    options.beDisks = ['HDD=1', 'SSD=1' ]
+    docker(options) {
+        cluster.checkBeIsAlive(1, true)
+        cluster.checkBeIsAlive(2, true)
+        cluster.checkBeIsAlive(3, true)
+        sleep 2000
+
+        sql 'SET GLOBAL insert_visible_timeout_ms = 2000'
+        sql "ADMIN SET FRONTEND CONFIG ('agent_task_resend_wait_time_ms' = '1000')"
+
+        sql 'CREATE TABLE tbl (k1 INT, k2 INT) DISTRIBUTED BY HASH(k1) BUCKETS 1'
+        sql 'INSERT INTO tbl VALUES (1, 10)'
+
+        checkTabletOnHDD true
+
+        // add debug point, txn will block
+        cluster.injectDebugPoints(NodeType.FE, ['PublishVersionDaemon.stop_publish':null])
+        cluster.injectDebugPoints(NodeType.BE, ['EnginePublishVersionTask.handle.block_add_rowsets':null])
+        sql 'INSERT INTO tbl VALUES (2, 20)'
+
+        sql "ALTER TABLE tbl MODIFY PARTITION(*) SET ( 'storage_medium' = 'ssd' )"
+        // tablet has unfinished txn, it couldn't migrate among disks
+        checkTabletOnHDD true
+
+        order_qt_select_1 'SELECT * FROM tbl'
+
+        cluster.clearFrontendDebugPoints()
+        // tablet finished all txns, but publish thread hold the migrate lock, migrate will failed
+        checkTabletOnHDD true
+        order_qt_select_2 'SELECT * FROM tbl'
+
+        cluster.clearBackendDebugPoints()
+        // tablet finished all txns, and publish thread not hold migrate lock, migrate should succ
+        checkTabletOnHDD false
+        order_qt_select_3 'SELECT * FROM tbl'
+    }
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to