This is an automated email from the ASF dual-hosted git repository.

dataroaring pushed a commit to branch branch-2.0
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/branch-2.0 by this push:
     new 414eb2a8ebd [fix](cooldown) Fix bug in follow cooldowned data (#32801)
414eb2a8ebd is described below

commit 414eb2a8ebdff514af5e5d7335ebccfd756ec90f
Author: plat1ko <platonekos...@gmail.com>
AuthorDate: Tue Mar 26 12:04:55 2024 +0800

    [fix](cooldown) Fix bug in follow cooldowned data (#32801)
---
 be/src/olap/tablet.cpp | 44 +++++++++++++++++++++++++++++++++++++-------
 1 file changed, 37 insertions(+), 7 deletions(-)

diff --git a/be/src/olap/tablet.cpp b/be/src/olap/tablet.cpp
index f9b12cc671d..9b745a92479 100644
--- a/be/src/olap/tablet.cpp
+++ b/be/src/olap/tablet.cpp
@@ -565,7 +565,9 @@ Status Tablet::modify_rowsets(std::vector<RowsetSharedPtr>& 
to_add,
         // delete rowset in "to_delete" directly
         for (auto& rs : to_delete) {
             LOG(INFO) << "add unused rowset " << rs->rowset_id() << " because 
of same version";
-            StorageEngine::instance()->add_unused_rowset(rs);
+            if (rs->is_local()) {
+                StorageEngine::instance()->add_unused_rowset(rs);
+            }
         }
     }
     return Status::OK();
@@ -604,7 +606,9 @@ void Tablet::delete_rowsets(const 
std::vector<RowsetSharedPtr>& to_delete, bool
     } else {
         for (auto& rs : to_delete) {
             _timestamped_version_tracker.delete_version(rs->version());
-            StorageEngine::instance()->add_unused_rowset(rs);
+            if (rs->is_local()) {
+                StorageEngine::instance()->add_unused_rowset(rs);
+            }
         }
     }
 }
@@ -830,7 +834,9 @@ void Tablet::delete_expired_stale_rowset() {
                 auto it = 
_stale_rs_version_map.find(timestampedVersion->version());
                 if (it != _stale_rs_version_map.end()) {
                     // delete rowset
-                    StorageEngine::instance()->add_unused_rowset(it->second);
+                    if (it->second->is_local()) {
+                        
StorageEngine::instance()->add_unused_rowset(it->second);
+                    }
                     _stale_rs_version_map.erase(it);
                     VLOG_NOTICE << "delete stale rowset tablet=" << 
full_name() << " version["
                                 << timestampedVersion->version().first << ","
@@ -2311,6 +2317,12 @@ Status Tablet::_follow_cooldowned_data() {
     std::vector<RowsetSharedPtr> overlap_rowsets;
     bool version_aligned = false;
 
+    // Hold the shared pointers of the rowsets to delete until the meta has been saved. This keeps
+    // the trash sweeping thread from deleting these rowsets' files before their rowset meta has
+    // been removed from disk, which could cause data loss if the BE reboots before the meta is saved.
+    std::vector<RowsetSharedPtr> to_delete;
+    std::vector<RowsetSharedPtr> to_add;
+
     {
         std::lock_guard wlock(_meta_lock);
         SCOPED_SIMPLE_TRACE_IF_TIMEOUT(TRACE_TABLET_LOCK_THRESHOLD);
@@ -2336,17 +2348,18 @@ Status Tablet::_follow_cooldowned_data() {
             }
         }
         std::sort(overlap_rowsets.begin(), overlap_rowsets.end(), 
Rowset::comparator);
+
+        // Find different rowset in `overlap_rowsets` and 
`cooldown_meta_pb.rs_metas`
         auto rs_pb_it = cooldown_meta_pb.rs_metas().begin();
         auto rs_it = overlap_rowsets.begin();
         for (; rs_pb_it != cooldown_meta_pb.rs_metas().end() && rs_it != 
overlap_rowsets.end();
              ++rs_pb_it, ++rs_it) {
-            // skip cooldowned rowset with same version in BE
-            if ((*rs_it)->is_local() || rs_pb_it->end_version() != 
(*rs_it)->end_version()) {
+            if (rs_pb_it->rowset_id_v2() != (*rs_it)->rowset_id().to_string()) 
{
                 break;
             }
         }
-        std::vector<RowsetSharedPtr> to_delete(rs_it, overlap_rowsets.end());
-        std::vector<RowsetSharedPtr> to_add;
+
+        to_delete.assign(rs_it, overlap_rowsets.end());
         to_add.reserve(cooldown_meta_pb.rs_metas().end() - rs_pb_it);
         for (; rs_pb_it != cooldown_meta_pb.rs_metas().end(); ++rs_pb_it) {
             auto rs_meta = std::make_shared<RowsetMeta>();
@@ -2361,12 +2374,29 @@ Status Tablet::_follow_cooldowned_data() {
         // TODO(plat1ko): process primary key
         
_tablet_meta->set_cooldown_meta_id(cooldown_meta_pb.cooldown_meta_id());
     }
+
     {
         std::lock_guard rlock(_meta_lock);
         SCOPED_SIMPLE_TRACE_IF_TIMEOUT(TRACE_TABLET_LOCK_THRESHOLD);
         save_meta();
     }
 
+    if (!to_add.empty()) {
+        LOG(INFO) << "modify rowsets when follow cooldowned data, tablet_id=" 
<< tablet_id()
+                  << [&]() {
+                         std::stringstream ss;
+                         ss << " delete rowsets:\n";
+                         for (auto&& rs : to_delete) {
+                             ss << rs->version() << ' ' << rs->rowset_id() << 
'\n';
+                         }
+                         ss << "add rowsets:\n";
+                         for (auto&& rs : to_add) {
+                             ss << rs->version() << ' ' << rs->rowset_id() << 
'\n';
+                         }
+                         return ss.str();
+                     }();
+    }
+
     return Status::OK();
 }
 


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to