This is an automated email from the ASF dual-hosted git repository.

kxiao pushed a commit to branch branch-2.0
in repository https://gitbox.apache.org/repos/asf/doris.git

commit c95c572dce215e92732dd1d68792af6c3112f658
Author: yujun <yu.jun.re...@gmail.com>
AuthorDate: Sun Sep 10 11:41:00 2023 +0800

    [fix](create tablet) fix backend create tablet timeout (#23879)
---
 be/src/olap/tablet.cpp         |   1 +
 be/src/olap/tablet_manager.cpp | 275 +++++++++++++++++++++--------------------
 be/src/olap/tablet_manager.h   |  22 ++--
 3 files changed, 148 insertions(+), 150 deletions(-)

diff --git a/be/src/olap/tablet.cpp b/be/src/olap/tablet.cpp
index 09a41f79d2..c732b200f7 100644
--- a/be/src/olap/tablet.cpp
+++ b/be/src/olap/tablet.cpp
@@ -3512,6 +3512,7 @@ RowsetIdUnorderedSet Tablet::all_rs_id(int64_t 
max_version) const {
 }
 
 bool Tablet::check_all_rowset_segment() {
+    std::shared_lock rdlock(_meta_lock);
     for (auto& version_rowset : _rs_version_map) {
         RowsetSharedPtr rowset = version_rowset.second;
         if (!rowset->check_rowset_segment()) {
diff --git a/be/src/olap/tablet_manager.cpp b/be/src/olap/tablet_manager.cpp
index 6bc5746d7b..aa9ef37041 100644
--- a/be/src/olap/tablet_manager.cpp
+++ b/be/src/olap/tablet_manager.cpp
@@ -630,6 +630,32 @@ TabletSharedPtr TabletManager::get_tablet(TTabletId 
tablet_id, bool include_dele
     return _get_tablet_unlocked(tablet_id, include_deleted, err);
 }
 
+std::vector<TabletSharedPtr> 
TabletManager::get_all_tablet(std::function<bool(Tablet*)>&& filter) {
+    std::vector<TabletSharedPtr> res;
+    for_each_tablet([&](const TabletSharedPtr& tablet) { 
res.emplace_back(tablet); },
+                    std::move(filter));
+    return res;
+}
+
+void TabletManager::for_each_tablet(std::function<void(const 
TabletSharedPtr&)>&& handler,
+                                    std::function<bool(Tablet*)>&& filter) {
+    std::vector<TabletSharedPtr> tablets;
+    for (const auto& tablets_shard : _tablets_shards) {
+        tablets.clear();
+        {
+            std::shared_lock rdlock(tablets_shard.lock);
+            for (const auto& [id, tablet] : tablets_shard.tablet_map) {
+                if (filter(tablet.get())) {
+                    tablets.emplace_back(tablet);
+                }
+            }
+        }
+        for (const auto& tablet : tablets) {
+            handler(tablet);
+        }
+    }
+}
+
 std::pair<TabletSharedPtr, Status> 
TabletManager::get_tablet_and_status(TTabletId tablet_id,
                                                                         bool 
include_deleted) {
     std::string err;
@@ -687,23 +713,15 @@ TabletSharedPtr TabletManager::get_tablet(TTabletId 
tablet_id, TabletUid tablet_
 
 uint64_t TabletManager::get_rowset_nums() {
     uint64_t rowset_nums = 0;
-    for (const auto& tablets_shard : _tablets_shards) {
-        std::shared_lock rdlock(tablets_shard.lock);
-        for (const auto& tablet_map : tablets_shard.tablet_map) {
-            rowset_nums += tablet_map.second->version_count();
-        }
-    }
+    for_each_tablet([&](const TabletSharedPtr& tablet) { rowset_nums += 
tablet->version_count(); },
+                    filter_all_tablets);
     return rowset_nums;
 }
 
 uint64_t TabletManager::get_segment_nums() {
     uint64_t segment_nums = 0;
-    for (const auto& tablets_shard : _tablets_shards) {
-        std::shared_lock rdlock(tablets_shard.lock);
-        for (const auto& tablet_map : tablets_shard.tablet_map) {
-            segment_nums += tablet_map.second->segment_count();
-        }
-    }
+    for_each_tablet([&](const TabletSharedPtr& tablet) { segment_nums += 
tablet->segment_count(); },
+                    filter_all_tablets);
     return segment_nums;
 }
 
@@ -761,65 +779,62 @@ TabletSharedPtr 
TabletManager::find_best_tablet_to_compaction(
     uint32_t highest_score = 0;
     uint32_t compaction_score = 0;
     TabletSharedPtr best_tablet;
-    for (const auto& tablets_shard : _tablets_shards) {
-        std::shared_lock rdlock(tablets_shard.lock);
-        for (const auto& tablet_map : tablets_shard.tablet_map) {
-            const TabletSharedPtr& tablet_ptr = tablet_map.second;
-            if (config::enable_skip_tablet_compaction &&
-                tablet_ptr->should_skip_compaction(compaction_type, 
UnixSeconds())) {
-                continue;
-            }
-            if (!tablet_ptr->can_do_compaction(data_dir->path_hash(), 
compaction_type)) {
-                continue;
-            }
+    auto handler = [&](const TabletSharedPtr& tablet_ptr) {
+        if (config::enable_skip_tablet_compaction &&
+            tablet_ptr->should_skip_compaction(compaction_type, 
UnixSeconds())) {
+            return;
+        }
+        if (!tablet_ptr->can_do_compaction(data_dir->path_hash(), 
compaction_type)) {
+            return;
+        }
 
-            auto search = 
tablet_submitted_compaction.find(tablet_ptr->tablet_id());
-            if (search != tablet_submitted_compaction.end()) {
-                continue;
-            }
+        auto search = 
tablet_submitted_compaction.find(tablet_ptr->tablet_id());
+        if (search != tablet_submitted_compaction.end()) {
+            return;
+        }
 
-            int64_t last_failure_ms = 
tablet_ptr->last_cumu_compaction_failure_time();
-            if (compaction_type == CompactionType::BASE_COMPACTION) {
-                last_failure_ms = 
tablet_ptr->last_base_compaction_failure_time();
-            }
-            if (now_ms - last_failure_ms <= 5000) {
-                VLOG_DEBUG << "Too often to check compaction, skip it. "
-                           << "compaction_type=" << compaction_type_str
-                           << ", last_failure_time_ms=" << last_failure_ms
-                           << ", tablet_id=" << tablet_ptr->tablet_id();
-                continue;
-            }
+        int64_t last_failure_ms = 
tablet_ptr->last_cumu_compaction_failure_time();
+        if (compaction_type == CompactionType::BASE_COMPACTION) {
+            last_failure_ms = tablet_ptr->last_base_compaction_failure_time();
+        }
+        if (now_ms - last_failure_ms <= 5000) {
+            VLOG_DEBUG << "Too often to check compaction, skip it. "
+                       << "compaction_type=" << compaction_type_str
+                       << ", last_failure_time_ms=" << last_failure_ms
+                       << ", tablet_id=" << tablet_ptr->tablet_id();
+            return;
+        }
 
-            if (compaction_type == CompactionType::BASE_COMPACTION) {
-                std::unique_lock<std::mutex> 
lock(tablet_ptr->get_base_compaction_lock(),
-                                                  std::try_to_lock);
-                if (!lock.owns_lock()) {
-                    LOG(INFO) << "can not get base lock: " << 
tablet_ptr->tablet_id();
-                    continue;
-                }
-            } else {
-                std::unique_lock<std::mutex> 
lock(tablet_ptr->get_cumulative_compaction_lock(),
-                                                  std::try_to_lock);
-                if (!lock.owns_lock()) {
-                    LOG(INFO) << "can not get cumu lock: " << 
tablet_ptr->tablet_id();
-                    continue;
-                }
-            }
-            auto cumulative_compaction_policy = 
all_cumulative_compaction_policies.at(
-                    tablet_ptr->tablet_meta()->compaction_policy());
-            uint32_t current_compaction_score = 
tablet_ptr->calc_compaction_score(
-                    compaction_type, cumulative_compaction_policy);
-            if (current_compaction_score < 5) {
-                tablet_ptr->set_skip_compaction(true, compaction_type, 
UnixSeconds());
+        if (compaction_type == CompactionType::BASE_COMPACTION) {
+            std::unique_lock<std::mutex> 
lock(tablet_ptr->get_base_compaction_lock(),
+                                              std::try_to_lock);
+            if (!lock.owns_lock()) {
+                LOG(INFO) << "can not get base lock: " << 
tablet_ptr->tablet_id();
+                return;
             }
-            if (current_compaction_score > highest_score) {
-                highest_score = current_compaction_score;
-                compaction_score = current_compaction_score;
-                best_tablet = tablet_ptr;
+        } else {
+            std::unique_lock<std::mutex> 
lock(tablet_ptr->get_cumulative_compaction_lock(),
+                                              std::try_to_lock);
+            if (!lock.owns_lock()) {
+                LOG(INFO) << "can not get cumu lock: " << 
tablet_ptr->tablet_id();
+                return;
             }
         }
-    }
+        auto cumulative_compaction_policy = 
all_cumulative_compaction_policies.at(
+                tablet_ptr->tablet_meta()->compaction_policy());
+        uint32_t current_compaction_score =
+                tablet_ptr->calc_compaction_score(compaction_type, 
cumulative_compaction_policy);
+        if (current_compaction_score < 5) {
+            tablet_ptr->set_skip_compaction(true, compaction_type, 
UnixSeconds());
+        }
+        if (current_compaction_score > highest_score) {
+            highest_score = current_compaction_score;
+            compaction_score = current_compaction_score;
+            best_tablet = tablet_ptr;
+        }
+    };
 
+    for_each_tablet(handler, filter_all_tablets);
     if (best_tablet != nullptr) {
         VLOG_CRITICAL << "Found the best tablet for compaction. "
                       << "compaction_type=" << compaction_type_str
@@ -1037,10 +1052,8 @@ Status 
TabletManager::build_all_report_tablets_info(std::map<TTabletId, TTablet>
 
     DorisMetrics::instance()->report_all_tablets_requests_total->increment(1);
     HistogramStat tablet_version_num_hist;
-    auto tablets = get_all_tablet([](Tablet*) { return true; });
     auto local_cache = std::make_shared<std::vector<TTabletStat>>();
-    local_cache->reserve(tablets.size());
-    for (auto& tablet : tablets) {
+    auto handler = [&](const TabletSharedPtr& tablet) {
         auto& t_tablet = (*tablets_info)[tablet->tablet_id()];
         TTabletInfo& tablet_info = t_tablet.tablet_infos.emplace_back();
         tablet->build_tablet_report_info(&tablet_info, true, true);
@@ -1058,7 +1071,9 @@ Status 
TabletManager::build_all_report_tablets_info(std::map<TTabletId, TTablet>
         t_tablet_stat.__set_remote_data_size(tablet_info.remote_data_size);
         t_tablet_stat.__set_row_num(tablet_info.row_count);
         t_tablet_stat.__set_version_count(tablet_info.version_count);
-    }
+    };
+    for_each_tablet(handler, filter_all_tablets);
+
     {
         std::lock_guard<std::mutex> guard(_tablet_stat_cache_mutex);
         _tablet_stat_list_cache.swap(local_cache);
@@ -1072,23 +1087,9 @@ Status 
TabletManager::build_all_report_tablets_info(std::map<TTabletId, TTablet>
 Status TabletManager::start_trash_sweep() {
     SCOPED_CONSUME_MEM_TRACKER(_mem_tracker);
     {
-        std::vector<TabletSharedPtr>
-                all_tablets; // we use this vector to save all tablet ptr for 
saving lock time.
-        for (auto& tablets_shard : _tablets_shards) {
-            tablet_map_t& tablet_map = tablets_shard.tablet_map;
-            {
-                std::shared_lock rdlock(tablets_shard.lock);
-                for (auto& item : tablet_map) {
-                    // try to clean empty item
-                    all_tablets.push_back(item.second);
-                }
-            }
-            // Avoid hold the shard lock too long, so we get tablet to a 
vector and clean here
-            for (const auto& tablet : all_tablets) {
-                tablet->delete_expired_stale_rowset();
-            }
-            all_tablets.clear();
-        }
+        for_each_tablet(
+                [](const TabletSharedPtr& tablet) { 
tablet->delete_expired_stale_rowset(); },
+                filter_all_tablets);
     }
 
     int32_t clean_num = 0;
@@ -1242,12 +1243,13 @@ void 
TabletManager::update_root_path_info(std::map<string, DataDirInfo>* path_ma
         return iter != path_map->end() && iter->second.is_used;
     };
 
-    auto tablets = get_all_tablet(filter);
-    for (const auto& tablet : tablets) {
+    auto handler = [&](const TabletSharedPtr& tablet) {
         auto& data_dir_info = (*path_map)[tablet->data_dir()->path()];
         data_dir_info.local_used_capacity += tablet->tablet_local_size();
         data_dir_info.remote_used_capacity += tablet->tablet_remote_size();
-    }
+    };
+
+    for_each_tablet(handler, filter);
 }
 
 void TabletManager::get_partition_related_tablets(int64_t partition_id,
@@ -1260,24 +1262,13 @@ void 
TabletManager::get_partition_related_tablets(int64_t partition_id,
 
 void TabletManager::do_tablet_meta_checkpoint(DataDir* data_dir) {
     SCOPED_CONSUME_MEM_TRACKER(_mem_tracker);
-    std::vector<TabletSharedPtr> related_tablets;
-    {
-        for (auto& tablets_shard : _tablets_shards) {
-            std::shared_lock rdlock(tablets_shard.lock);
-            for (auto& item : tablets_shard.tablet_map) {
-                TabletSharedPtr& tablet_ptr = item.second;
-                if (tablet_ptr->tablet_state() != TABLET_RUNNING) {
-                    continue;
-                }
+    auto filter = [data_dir](Tablet* tablet) -> bool {
+        return tablet->tablet_state() == TABLET_RUNNING &&
+               tablet->data_dir()->path_hash() == data_dir->path_hash() && 
tablet->is_used() &&
+               tablet->init_succeeded();
+    };
 
-                if (tablet_ptr->data_dir()->path_hash() != 
data_dir->path_hash() ||
-                    !tablet_ptr->is_used() || !tablet_ptr->init_succeeded()) {
-                    continue;
-                }
-                related_tablets.push_back(tablet_ptr);
-            }
-        }
-    }
+    std::vector<TabletSharedPtr> related_tablets = get_all_tablet(filter);
     int counter = 0;
     MonotonicStopWatch watch;
     watch.start();
@@ -1457,12 +1448,8 @@ void 
TabletManager::get_cooldown_tablets(std::vector<TabletSharedPtr>* tablets,
                                          std::function<bool(const 
TabletSharedPtr&)> skip_tablet) {
     std::vector<SortCtx> sort_ctx_vec;
     std::vector<std::weak_ptr<Tablet>> candidates;
-    for (const auto& tablets_shard : _tablets_shards) {
-        std::shared_lock rdlock(tablets_shard.lock);
-        std::for_each(
-                tablets_shard.tablet_map.begin(), 
tablets_shard.tablet_map.end(),
-                [&candidates](auto& tablet_pair) { 
candidates.emplace_back(tablet_pair.second); });
-    }
+    for_each_tablet([&](const TabletSharedPtr& tablet) { 
candidates.emplace_back(tablet); },
+                    filter_all_tablets);
     auto get_cooldown_tablet = [&sort_ctx_vec, 
&skip_tablet](std::weak_ptr<Tablet>& t) {
         const TabletSharedPtr& tablet = t.lock();
         if (UNLIKELY(nullptr == tablet)) {
@@ -1487,44 +1474,58 @@ void 
TabletManager::get_cooldown_tablets(std::vector<TabletSharedPtr>* tablets,
 
 void TabletManager::get_all_tablets_storage_format(TCheckStorageFormatResult* 
result) {
     DCHECK(result != nullptr);
-    for (const auto& tablets_shard : _tablets_shards) {
-        std::shared_lock rdlock(tablets_shard.lock);
-        for (const auto& item : tablets_shard.tablet_map) {
-            uint64_t tablet_id = item.first;
-            if (item.second->all_beta()) {
-                result->v2_tablets.push_back(tablet_id);
-            } else {
-                result->v1_tablets.push_back(tablet_id);
-            }
+    auto handler = [result](const TabletSharedPtr& tablet) {
+        if (tablet->all_beta()) {
+            result->v2_tablets.push_back(tablet->tablet_id());
+        } else {
+            result->v1_tablets.push_back(tablet->tablet_id());
         }
-    }
+    };
+
+    for_each_tablet(handler, filter_all_tablets);
     result->__isset.v1_tablets = true;
     result->__isset.v2_tablets = true;
 }
 
 std::set<int64_t> TabletManager::check_all_tablet_segment(bool repair) {
     std::set<int64_t> bad_tablets;
-    for (const auto& tablets_shard : _tablets_shards) {
+    std::map<int64_t, std::vector<TabletSharedPtr>> repair_shard_bad_tablets;
+    auto handler = [&](const TabletSharedPtr& tablet) {
+        if (!tablet->check_all_rowset_segment()) {
+            int64_t tablet_id = tablet->tablet_id();
+            bad_tablets.insert(tablet_id);
+            if (repair) {
+                repair_shard_bad_tablets[tablet_id & _tablets_shards_mask].push_back(tablet);
+            }
+        }
+    };
+    for_each_tablet(handler, filter_all_tablets);
+    // Checked without the shard lock, so only repair tablets still mapped as the same instance.
+    for (const auto& [shard_index, shard_tablets] : repair_shard_bad_tablets) {
+        auto& tablets_shard = _tablets_shards[shard_index];
+        auto& tablet_map = tablets_shard.tablet_map;
         std::lock_guard<std::shared_mutex> wrlock(tablets_shard.lock);
-        for (const auto& item : tablets_shard.tablet_map) {
-            TabletSharedPtr tablet = item.second;
-            if (!tablet->check_all_rowset_segment()) {
-                bad_tablets.insert(tablet->tablet_id());
-                if (repair) {
-                    tablet->set_tablet_state(TABLET_SHUTDOWN);
-                    tablet->save_meta();
-                    {
-                        std::lock_guard<std::shared_mutex> shutdown_tablets_wrlock(
-                                _shutdown_tablets_lock);
-                        _shutdown_tablets.push_back(tablet);
-                    }
-                    LOG(WARNING) << "There are some segments lost, set tablet to shutdown state."
-                                 << "tablet_id=" << tablet->tablet_id()
-                                 << ", tablet_path=" << tablet->tablet_path();
+        for (const auto& bad : shard_tablets) {
+            auto it = tablet_map.find(bad->tablet_id());
+            if (it == tablet_map.end() || it->second != bad) {
+                bad_tablets.erase(bad->tablet_id());
+                LOG(WARNING) << "Bad tablet has been removed. tablet_id=" << bad->tablet_id();
+            } else {
+                const auto& tablet = it->second;
+                tablet->set_tablet_state(TABLET_SHUTDOWN);
+                tablet->save_meta();
+                {
+                    std::lock_guard<std::shared_mutex> shutdown_tablets_wrlock(
+                            _shutdown_tablets_lock);
+                    _shutdown_tablets.push_back(tablet);
                 }
+                LOG(WARNING) << "There are some segments lost, set tablet to shutdown state. "
+                             << "tablet_id=" << tablet->tablet_id()
+                             << ", tablet_path=" << tablet->tablet_path();
             }
         }
     }
+
     return bad_tablets;
 }
 
diff --git a/be/src/olap/tablet_manager.h b/be/src/olap/tablet_manager.h
index c88a66c63c..ae7d27bded 100644
--- a/be/src/olap/tablet_manager.h
+++ b/be/src/olap/tablet_manager.h
@@ -90,19 +90,15 @@ public:
     TabletSharedPtr get_tablet(TTabletId tablet_id, TabletUid tablet_uid,
                                bool include_deleted = false, std::string* err 
= nullptr);
 
-    std::vector<TabletSharedPtr> get_all_tablet(std::function<bool(Tablet*)>&& 
filter =
-                                                        [](Tablet* t) { return 
t->is_used(); }) {
-        std::vector<TabletSharedPtr> res;
-        for (const auto& tablets_shard : _tablets_shards) {
-            std::shared_lock rdlock(tablets_shard.lock);
-            for (auto& [id, tablet] : tablets_shard.tablet_map) {
-                if (filter(tablet.get())) {
-                    res.emplace_back(tablet);
-                }
-            }
-        }
-        return res;
-    }
+    std::vector<TabletSharedPtr> get_all_tablet(
+            std::function<bool(Tablet*)>&& filter = filter_used_tablets);
+    // Calls `handler` on each tablet accepted by `filter`. `filter` is evaluated while the
+    // shard read lock is held; `handler` is invoked only after that lock has been released.
+    void for_each_tablet(std::function<void(const TabletSharedPtr&)>&& handler,
+                         std::function<bool(Tablet*)>&& filter = 
filter_used_tablets);
+    // Predefined filters: accept every tablet, or only tablets whose is_used() is true.
+    static bool filter_all_tablets(Tablet* tablet) { return true; }
+    static bool filter_used_tablets(Tablet* tablet) { return 
tablet->is_used(); }
 
     uint64_t get_rowset_nums();
     uint64_t get_segment_nums();


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to