This is an automated email from the ASF dual-hosted git repository.

dataroaring pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new 52f9e03eea [fix](cooldown)  Use `pending_remote_rowsets` to avoid deleting rowset files being uploaded (#16803)
52f9e03eea is described below

commit 52f9e03eea17b3ed3351f6229e3e901d154031ed
Author: plat1ko <platonekos...@gmail.com>
AuthorDate: Tue Feb 21 21:58:20 2023 +0800

    [fix](cooldown)  Use `pending_remote_rowsets` to avoid deleting rowset files being uploaded (#16803)
---
 be/src/olap/cold_data_compaction.cpp |  2 +-
 be/src/olap/compaction.cpp           | 62 ++++++++++++++++--------------------
 be/src/olap/tablet.cpp               | 56 +++++++++++++++++++-------------
 be/src/olap/tablet.h                 |  7 ++--
 4 files changed, 67 insertions(+), 60 deletions(-)

diff --git a/be/src/olap/cold_data_compaction.cpp b/be/src/olap/cold_data_compaction.cpp
index 9f24c9c170..9a92d9eead 100644
--- a/be/src/olap/cold_data_compaction.cpp
+++ b/be/src/olap/cold_data_compaction.cpp
@@ -80,7 +80,7 @@ Status ColdDataCompaction::modify_rowsets() {
         // TODO(plat1ko): process primary key
         _tablet->tablet_meta()->set_cooldown_meta_id(cooldown_meta_id);
     }
-
+    
Tablet::erase_pending_remote_rowset(_output_rowset->rowset_id().to_string());
     {
         std::shared_lock rlock(_tablet->get_header_lock());
         _tablet->save_meta();
diff --git a/be/src/olap/compaction.cpp b/be/src/olap/compaction.cpp
index d585516954..ecfc3d8e56 100644
--- a/be/src/olap/compaction.cpp
+++ b/be/src/olap/compaction.cpp
@@ -272,6 +272,9 @@ Status Compaction::do_compaction_impl(int64_t permits) {
     bool vertical_compaction = should_vertical_compaction();
     RETURN_NOT_OK(construct_input_rowset_readers());
     RETURN_NOT_OK(construct_output_rowset_writer(vertical_compaction));
+    if (compaction_type() == ReaderType::READER_COLD_DATA_COMPACTION) {
+        
Tablet::add_pending_remote_rowset(_output_rs_writer->rowset_id().to_string());
+    }
     TRACE("prepare finished");
 
     // 2. write merged rows to output rowset
@@ -282,45 +285,35 @@ Status Compaction::do_compaction_impl(int64_t permits) {
         stats.rowid_conversion = &_rowid_conversion;
     }
 
-    auto build_output_rowset = [&]() {
-        Status res;
-        if (use_vectorized_compaction) {
-            if (vertical_compaction) {
-                res = Merger::vertical_merge_rowsets(_tablet, 
compaction_type(), _cur_tablet_schema,
-                                                     _input_rs_readers, 
_output_rs_writer.get(),
-                                                     get_avg_segment_rows(), 
&stats);
-            } else {
-                res = Merger::vmerge_rowsets(_tablet, compaction_type(), 
_cur_tablet_schema,
-                                             _input_rs_readers, 
_output_rs_writer.get(), &stats);
-            }
+    Status res;
+    if (use_vectorized_compaction) {
+        if (vertical_compaction) {
+            res = Merger::vertical_merge_rowsets(_tablet, compaction_type(), 
_cur_tablet_schema,
+                                                 _input_rs_readers, 
_output_rs_writer.get(),
+                                                 get_avg_segment_rows(), 
&stats);
         } else {
-            LOG(FATAL) << "Only support vectorized compaction";
+            res = Merger::vmerge_rowsets(_tablet, compaction_type(), 
_cur_tablet_schema,
+                                         _input_rs_readers, 
_output_rs_writer.get(), &stats);
         }
+    } else {
+        LOG(FATAL) << "Only support vectorized compaction";
+    }
 
-        if (!res.ok()) {
-            LOG(WARNING) << "fail to do " << merge_type << compaction_name() 
<< ". res=" << res
-                         << ", tablet=" << _tablet->full_name()
-                         << ", output_version=" << _output_version;
-            return res;
-        }
-        TRACE("merge rowsets finished");
-        TRACE_COUNTER_INCREMENT("merged_rows", stats.merged_rows);
-        TRACE_COUNTER_INCREMENT("filtered_rows", stats.filtered_rows);
-
-        _output_rowset = _output_rs_writer->build();
-        if (_output_rowset == nullptr) {
-            LOG(WARNING) << "rowset writer build failed. writer version:"
-                         << ", output_version=" << _output_version;
-            return Status::Error<ROWSET_BUILDER_INIT>();
-        }
+    if (!res.ok()) {
+        LOG(WARNING) << "fail to do " << merge_type << compaction_name() << ". 
res=" << res
+                     << ", tablet=" << _tablet->full_name()
+                     << ", output_version=" << _output_version;
         return res;
-    };
+    }
+    TRACE("merge rowsets finished");
+    TRACE_COUNTER_INCREMENT("merged_rows", stats.merged_rows);
+    TRACE_COUNTER_INCREMENT("filtered_rows", stats.filtered_rows);
 
-    if (compaction_type() == ReaderType::READER_COLD_DATA_COMPACTION) {
-        std::shared_lock slock(_tablet->get_remote_files_lock());
-        RETURN_IF_ERROR(build_output_rowset());
-    } else {
-        RETURN_IF_ERROR(build_output_rowset());
+    _output_rowset = _output_rs_writer->build();
+    if (_output_rowset == nullptr) {
+        LOG(WARNING) << "rowset writer build failed. writer version:"
+                     << ", output_version=" << _output_version;
+        return Status::Error<ROWSET_BUILDER_INIT>();
     }
 
     TRACE_COUNTER_INCREMENT("output_rowset_data_size", 
_output_rowset->data_disk_size());
@@ -457,6 +450,7 @@ Status Compaction::modify_rowsets() {
 void Compaction::gc_output_rowset() {
     if (_state != CompactionState::SUCCESS && _output_rowset != nullptr) {
         if (!_output_rowset->is_local()) {
+            
Tablet::erase_pending_remote_rowset(_output_rowset->rowset_id().to_string());
             _tablet->record_unused_remote_rowset(_output_rowset->rowset_id(),
                                                  
_output_rowset->rowset_meta()->resource_id(),
                                                  
_output_rowset->num_segments());
diff --git a/be/src/olap/tablet.cpp b/be/src/olap/tablet.cpp
index 8cbafa7f6e..75c1bff53e 100644
--- a/be/src/olap/tablet.cpp
+++ b/be/src/olap/tablet.cpp
@@ -1728,20 +1728,17 @@ Status Tablet::_cooldown_data(const std::shared_ptr<io::RemoteFileSystem>& dest_
         return Status::InternalError("cannot pick cooldown rowset in tablet 
{}", tablet_id());
     }
     RowsetId new_rowset_id = StorageEngine::instance()->next_rowset_id();
-
-    auto start = std::chrono::steady_clock::now();
-
+    add_pending_remote_rowset(new_rowset_id.to_string());
     Status st;
-    {
-        std::shared_lock slock(_remote_files_lock, std::try_to_lock);
-        if (!slock.owns_lock()) {
-            return Status::Status::Error<TRY_LOCK_FAILED>("try 
remote_files_lock failed");
+    Defer defer {[&] {
+        if (!st.ok()) {
+            erase_pending_remote_rowset(new_rowset_id.to_string());
+            // reclaim the incomplete rowset data in remote storage
+            record_unused_remote_rowset(new_rowset_id, dest_fs->id(), 
old_rowset->num_segments());
         }
-        st = old_rowset->upload_to(dest_fs.get(), new_rowset_id);
-    }
-    if (!st.ok()) {
-        // reclaim the incomplete rowset data in remote storage
-        record_unused_remote_rowset(new_rowset_id, dest_fs->id(), 
old_rowset->num_segments());
+    }};
+    auto start = std::chrono::steady_clock::now();
+    if (st = old_rowset->upload_to(dest_fs.get(), new_rowset_id); !st.ok()) {
         return st;
     }
 
@@ -1760,7 +1757,10 @@ Status Tablet::_cooldown_data(const std::shared_ptr<io::RemoteFileSystem>& dest_
     UniqueId cooldown_meta_id = UniqueId::gen_uid();
 
     // upload cooldowned rowset meta to remote fs
-    RETURN_IF_ERROR(write_cooldown_meta(dest_fs, cooldown_meta_id, 
new_rowset_meta, {}));
+    st = write_cooldown_meta(dest_fs, cooldown_meta_id, new_rowset_meta, {});
+    if (!st.ok()) {
+        return st;
+    }
 
     RowsetSharedPtr new_rowset;
     RowsetFactory::create_rowset(_schema, _tablet_path, new_rowset_meta, 
&new_rowset);
@@ -1774,6 +1774,7 @@ Status Tablet::_cooldown_data(const std::shared_ptr<io::RemoteFileSystem>& dest_
             _tablet_meta->set_cooldown_meta_id(cooldown_meta_id);
         }
     }
+    erase_pending_remote_rowset(new_rowset_id.to_string());
     {
         std::unique_lock meta_rlock(_meta_lock);
         save_meta();
@@ -2047,6 +2048,18 @@ Status Tablet::remove_all_remote_rowsets() {
                                       gc_pb.SerializeAsString());
 }
 
+static std::unordered_set<std::string> s_pending_remote_rowsets;
+static std::mutex s_pending_remote_rowsets_mtx;
+
+void Tablet::add_pending_remote_rowset(std::string rowset_id) {
+    std::lock_guard lock(s_pending_remote_rowsets_mtx);
+    s_pending_remote_rowsets.insert(std::move(rowset_id));
+}
+void Tablet::erase_pending_remote_rowset(const std::string& rowset_id) {
+    std::lock_guard lock(s_pending_remote_rowsets_mtx);
+    s_pending_remote_rowsets.erase(rowset_id);
+}
+
 void Tablet::remove_unused_remote_files() {
     auto tablets = 
StorageEngine::instance()->tablet_manager()->get_all_tablet([](Tablet* t) {
         return t->tablet_meta()->cooldown_meta_id().initialized() && 
t->is_used() &&
@@ -2080,16 +2093,9 @@ void Tablet::remove_unused_remote_files() {
 
         Status st;
         std::vector<io::Path> files;
-        {
-            std::unique_lock xlock(t->_remote_files_lock, std::try_to_lock);
-            if (!xlock.owns_lock()) {
-                LOG(WARNING) << "try remote_files_lock failed. tablet_id=" << 
t->tablet_id();
-                return;
-            }
-            // FIXME(plat1ko): What if user reset resource in storage policy 
to another resource?
-            //  Maybe we should also list files in previously uploaded 
resources.
-            st = dest_fs->list(BetaRowset::remote_tablet_path(t->tablet_id()), 
&files);
-        }
+        // FIXME(plat1ko): What if user reset resource in storage policy to 
another resource?
+        //  Maybe we should also list files in previously uploaded resources.
+        st = dest_fs->list(BetaRowset::remote_tablet_path(t->tablet_id()), 
&files);
         if (!st.ok()) {
             LOG(WARNING) << "encounter error when remove unused remote files, 
tablet_id="
                          << t->tablet_id() << " : " << st;
@@ -2099,6 +2105,10 @@ void Tablet::remove_unused_remote_files() {
         }
         // get all cooldowned rowsets
         std::unordered_set<std::string> cooldowned_rowsets;
+        {
+            std::lock_guard lock(s_pending_remote_rowsets_mtx);
+            cooldowned_rowsets = s_pending_remote_rowsets;
+        }
         UniqueId cooldown_meta_id;
         {
             std::shared_lock rlock(t->_meta_lock);
diff --git a/be/src/olap/tablet.h b/be/src/olap/tablet.h
index 82f42dccdd..73a200b383 100644
--- a/be/src/olap/tablet.h
+++ b/be/src/olap/tablet.h
@@ -328,7 +328,11 @@ public:
 
     static void remove_unused_remote_files();
 
-    std::shared_mutex& get_remote_files_lock() { return _remote_files_lock; }
+    // If a rowset is to be written to remote filesystem, MUST add it to `pending_remote_rowsets` before uploading,
+    // and then erase it from `pending_remote_rowsets` after it has been insert to the Tablet.
+    // `remove_unused_remote_files` MUST NOT delete files of these pending rowsets.
+    static void add_pending_remote_rowset(std::string rowset_id);
+    static void erase_pending_remote_rowset(const std::string& rowset_id);
 
     uint32_t calc_cold_data_compaction_score() const;
 
@@ -524,7 +528,6 @@ private:
     // cooldown related
     int64_t _cooldown_replica_id = -1;
     int64_t _cooldown_term = -1;
-    std::shared_mutex _remote_files_lock;
     std::mutex _cold_compaction_lock;
 
     DISALLOW_COPY_AND_ASSIGN(Tablet);


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to