This is an automated email from the ASF dual-hosted git repository.

dataroaring pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new cc5fa509ad [fix](cooldown) Fix bug in concurrent `update_cooldown_conf` and operations that update cooldowned data (#17086)
cc5fa509ad is described below

commit cc5fa509ad0059bc337dba84858e907782baa9d0
Author: plat1ko <platonekos...@gmail.com>
AuthorDate: Fri Mar 3 14:36:58 2023 +0800

    [fix](cooldown) Fix bug in concurrent `update_cooldown_conf` and operations that update cooldowned data (#17086)
---
 be/src/agent/task_worker_pool.cpp                  |   1 +
 be/src/olap/cold_data_compaction.cpp               |  17 +--
 be/src/olap/olap_define.h                          |  11 ++
 be/src/olap/olap_server.cpp                        |   2 +-
 be/src/olap/rowset/beta_rowset.cpp                 |  10 --
 be/src/olap/rowset/beta_rowset.h                   |   4 -
 be/src/olap/tablet.cpp                             | 155 ++++++++++++---------
 be/src/olap/tablet.h                               |  44 ++++--
 .../java/org/apache/doris/catalog/Replica.java     |   9 ++
 .../apache/doris/catalog/TabletInvertedIndex.java  |   5 +-
 .../org/apache/doris/master/ReportHandler.java     |  38 +++--
 .../apache/doris/service/FrontendServiceImpl.java  |  12 +-
 gensrc/thrift/MasterService.thrift                 |   2 +-
 13 files changed, 192 insertions(+), 118 deletions(-)

diff --git a/be/src/agent/task_worker_pool.cpp 
b/be/src/agent/task_worker_pool.cpp
index 3ed931ddaf..4056e0d176 100644
--- a/be/src/agent/task_worker_pool.cpp
+++ b/be/src/agent/task_worker_pool.cpp
@@ -1848,6 +1848,7 @@ void 
TaskWorkerPool::_push_cooldown_conf_worker_thread_callback() {
             }
             tablet->update_cooldown_conf(cooldown_conf.cooldown_term,
                                          cooldown_conf.cooldown_replica_id);
+            // TODO(AlexYue): if `update_cooldown_conf` success, async call 
`write_cooldown_meta`
         }
     }
 }
diff --git a/be/src/olap/cold_data_compaction.cpp 
b/be/src/olap/cold_data_compaction.cpp
index 65b2330dc7..9fbeef8245 100644
--- a/be/src/olap/cold_data_compaction.cpp
+++ b/be/src/olap/cold_data_compaction.cpp
@@ -45,6 +45,10 @@ Status ColdDataCompaction::execute_compact_impl() {
 #endif
     SCOPED_ATTACH_TASK(_mem_tracker);
     int64_t permits = get_compaction_permits();
+    std::shared_lock cooldown_conf_rlock(_tablet->get_cooldown_conf_lock());
+    if (_tablet->cooldown_conf_unlocked().first != _tablet->replica_id()) {
+        return Status::Aborted("this replica is not cooldown replica");
+    }
     RETURN_IF_ERROR(do_compaction(permits));
     _state = CompactionState::SUCCESS;
     return Status::OK();
@@ -62,16 +66,6 @@ Status ColdDataCompaction::pick_rowsets_to_compact() {
 
 Status ColdDataCompaction::modify_rowsets(const Merger::Statistics* stats) {
     UniqueId cooldown_meta_id = UniqueId::gen_uid();
-
-    // write remote tablet meta
-    std::shared_ptr<io::RemoteFileSystem> fs;
-    RETURN_IF_ERROR(get_remote_file_system(_tablet->storage_policy_id(), &fs));
-    std::vector<RowsetMetaSharedPtr> to_deletes;
-    for (auto& rs : _input_rowsets) {
-        to_deletes.emplace_back(rs->rowset_meta());
-    }
-    RETURN_IF_ERROR(_tablet->write_cooldown_meta(fs, cooldown_meta_id,
-                                                 
_output_rowset->rowset_meta(), to_deletes));
     {
         std::lock_guard wlock(_tablet->get_header_lock());
         // Merged cooldowned rowsets MUST NOT be managed by version graph, 
they will be reclaimed by `remove_unused_remote_files`.
@@ -85,6 +79,9 @@ Status ColdDataCompaction::modify_rowsets(const 
Merger::Statistics* stats) {
         std::shared_lock rlock(_tablet->get_header_lock());
         _tablet->save_meta();
     }
+    // write remote tablet meta
+    // TODO(AlexYue): async call `write_cooldown_meta`
+    RETURN_IF_ERROR(_tablet->write_cooldown_meta());
     return Status::OK();
 }
 
diff --git a/be/src/olap/olap_define.h b/be/src/olap/olap_define.h
index 367d0d8ba7..fbc4f24557 100644
--- a/be/src/olap/olap_define.h
+++ b/be/src/olap/olap_define.h
@@ -98,6 +98,17 @@ static const std::string PENDING_DELTA_PREFIX = 
"pending_delta";
 static const std::string INCREMENTAL_DELTA_PREFIX = "incremental_delta";
 static const std::string CLONE_PREFIX = "clone";
 
+// define paths
+static inline std::string remote_tablet_path(int64_t tablet_id) {
+    // data/{tablet_id}
+    return fmt::format("{}/{}", DATA_PREFIX, tablet_id);
+}
+static inline std::string remote_tablet_meta_path(int64_t tablet_id, int64_t 
replica_id,
+                                                  int64_t cooldown_term) {
+    // data/{tablet_id}/{replica_id}.{cooldown_term}.meta
+    return fmt::format("{}/{}.{}.meta", remote_tablet_path(tablet_id), 
replica_id, cooldown_term);
+}
+
 static const std::string TABLET_UID = "tablet_uid";
 static const std::string STORAGE_NAME = "storage_name";
 
diff --git a/be/src/olap/olap_server.cpp b/be/src/olap/olap_server.cpp
index 36e67bc1f9..e621e893f8 100644
--- a/be/src/olap/olap_server.cpp
+++ b/be/src/olap/olap_server.cpp
@@ -797,7 +797,7 @@ void 
StorageEngine::_cold_data_compaction_producer_callback() {
         tablet_to_follow.reserve(n + 1);
 
         for (auto& t : tablets) {
-            if (t->replica_id() == t->cooldown_replica_id()) {
+            if (t->replica_id() == t->cooldown_conf_unlocked().first) {
                 auto score = t->calc_cold_data_compaction_score();
                 if (score < 4) {
                     continue;
diff --git a/be/src/olap/rowset/beta_rowset.cpp 
b/be/src/olap/rowset/beta_rowset.cpp
index 26d4b0b7ce..4f459d8365 100644
--- a/be/src/olap/rowset/beta_rowset.cpp
+++ b/be/src/olap/rowset/beta_rowset.cpp
@@ -69,16 +69,6 @@ std::string BetaRowset::segment_file_path(const std::string& 
rowset_dir, const R
     return fmt::format("{}/{}_{}.dat", rowset_dir, rowset_id.to_string(), 
segment_id);
 }
 
-std::string BetaRowset::remote_tablet_path(int64_t tablet_id) {
-    // data/{tablet_id}
-    return fmt::format("{}/{}", DATA_PREFIX, tablet_id);
-}
-
-std::string BetaRowset::remote_tablet_meta_path(int64_t tablet_id, int64_t 
replica_id) {
-    // data/{tablet_id}/{replica_id}.meta
-    return fmt::format("{}/{}.meta", remote_tablet_path(tablet_id), 
replica_id);
-}
-
 std::string BetaRowset::remote_segment_path(int64_t tablet_id, const RowsetId& 
rowset_id,
                                             int segment_id) {
     // data/{tablet_id}/{rowset_id}_{seg_num}.dat
diff --git a/be/src/olap/rowset/beta_rowset.h b/be/src/olap/rowset/beta_rowset.h
index c2a86f13c8..5401c60860 100644
--- a/be/src/olap/rowset/beta_rowset.h
+++ b/be/src/olap/rowset/beta_rowset.h
@@ -61,10 +61,6 @@ public:
     static std::string remote_segment_path(int64_t tablet_id, const 
std::string& rowset_id,
                                            int segment_id);
 
-    static std::string remote_tablet_path(int64_t tablet_id);
-
-    static std::string remote_tablet_meta_path(int64_t tablet_id, int64_t 
replica_id);
-
     Status remove() override;
 
     Status link_files_to(const std::string& dir, RowsetId new_rowset_id,
diff --git a/be/src/olap/tablet.cpp b/be/src/olap/tablet.cpp
index 62d8ceae60..0563e3dd76 100644
--- a/be/src/olap/tablet.cpp
+++ b/be/src/olap/tablet.cpp
@@ -1409,13 +1409,14 @@ void Tablet::build_tablet_report_info(TTabletInfo* 
tablet_info,
     
tablet_info->__set_is_in_memory(_tablet_meta->tablet_schema()->is_in_memory());
     tablet_info->__set_replica_id(replica_id());
     tablet_info->__set_remote_data_size(_tablet_meta->tablet_remote_size());
-    if (tablet_state() == TABLET_RUNNING && _tablet_meta->storage_policy_id() 
> 0) {
-        tablet_info->__set_cooldown_replica_id(_cooldown_replica_id);
+    if (_tablet_meta->cooldown_meta_id().initialized()) { // has cooldowned 
data
         tablet_info->__set_cooldown_term(_cooldown_term);
-    }
-    if (_tablet_meta->cooldown_meta_id().initialized()) {
         
tablet_info->__set_cooldown_meta_id(_tablet_meta->cooldown_meta_id().to_thrift());
     }
+    if (tablet_state() == TABLET_RUNNING && _tablet_meta->storage_policy_id() 
> 0) {
+        // tablet may not have cooldowned data, but the storage policy is set
+        tablet_info->__set_cooldown_term(_cooldown_term);
+    }
 }
 
 // should use this method to get a copy of current tablet meta
@@ -1633,7 +1634,7 @@ void 
Tablet::_init_context_common_fields(RowsetWriterContext& context) {
         context.rowset_type = StorageEngine::instance()->default_rowset_type();
     }
     if (context.fs != nullptr && context.fs->type() != 
io::FileSystemType::LOCAL) {
-        context.rowset_dir = BetaRowset::remote_tablet_path(tablet_id());
+        context.rowset_dir = remote_tablet_path(tablet_id());
     } else {
         context.rowset_dir = tablet_path();
     }
@@ -1648,37 +1649,38 @@ Status Tablet::create_rowset(const RowsetMetaSharedPtr& 
rowset_meta, RowsetShare
 Status Tablet::cooldown() {
     std::unique_lock schema_change_lock(_schema_change_lock, std::try_to_lock);
     if (!schema_change_lock.owns_lock()) {
-        LOG(WARNING) << "Failed to own schema_change_lock. tablet=" << 
tablet_id();
-        return Status::Error<TRY_LOCK_FAILED>();
+        return Status::Error<TRY_LOCK_FAILED>("try schema_change_lock failed");
     }
     // Check executing serially with compaction task.
     std::unique_lock base_compaction_lock(_base_compaction_lock, 
std::try_to_lock);
     if (!base_compaction_lock.owns_lock()) {
-        LOG(WARNING) << "Failed to own base_compaction_lock. tablet=" << 
tablet_id();
-        return Status::Error<TRY_LOCK_FAILED>();
+        return Status::Error<TRY_LOCK_FAILED>("try base_compaction_lock 
failed");
     }
     std::unique_lock cumu_compaction_lock(_cumulative_compaction_lock, 
std::try_to_lock);
     if (!cumu_compaction_lock.owns_lock()) {
-        LOG(WARNING) << "Failed to own cumu_compaction_lock. tablet=" << 
tablet_id();
-        return Status::Error<TRY_LOCK_FAILED>();
+        return Status::Error<TRY_LOCK_FAILED>("try cumu_compaction_lock 
failed");
     }
-    int64_t cooldown_replica_id = _cooldown_replica_id;
-    if (cooldown_replica_id <= 0) { // wait for FE to push cooldown conf
+    std::shared_lock cooldown_conf_rlock(_cooldown_conf_lock);
+    if (_cooldown_replica_id <= 0) { // wait for FE to push cooldown conf
         return Status::InternalError("invalid cooldown_replica_id");
     }
 
-    std::shared_ptr<io::RemoteFileSystem> fs;
-    RETURN_IF_ERROR(get_remote_file_system(storage_policy_id(), &fs));
-
-    if (cooldown_replica_id == replica_id()) {
-        RETURN_IF_ERROR(_cooldown_data(fs));
+    if (_cooldown_replica_id == replica_id()) {
+        // this replica is cooldown replica
+        RETURN_IF_ERROR(_cooldown_data());
     } else {
-        RETURN_IF_ERROR(_follow_cooldowned_data(fs, cooldown_replica_id));
+        // try to follow cooldowned data from cooldown replica
+        RETURN_IF_ERROR(_follow_cooldowned_data());
     }
     return Status::OK();
 }
 
-Status Tablet::_cooldown_data(const std::shared_ptr<io::RemoteFileSystem>& 
dest_fs) {
+// hold SHARED `cooldown_conf_lock`
+Status Tablet::_cooldown_data() {
+    DCHECK(_cooldown_replica_id == replica_id());
+
+    std::shared_ptr<io::RemoteFileSystem> dest_fs;
+    RETURN_IF_ERROR(get_remote_file_system(storage_policy_id(), &dest_fs));
     auto old_rowset = pick_cooldown_rowset();
     if (!old_rowset) {
         return Status::InternalError("cannot pick cooldown rowset in tablet 
{}", tablet_id());
@@ -1711,13 +1713,6 @@ Status Tablet::_cooldown_data(const 
std::shared_ptr<io::RemoteFileSystem>& dest_
     new_rowset_meta->set_fs(dest_fs);
     new_rowset_meta->set_creation_time(time(nullptr));
     UniqueId cooldown_meta_id = UniqueId::gen_uid();
-
-    // upload cooldowned rowset meta to remote fs
-    st = write_cooldown_meta(dest_fs, cooldown_meta_id, new_rowset_meta, {});
-    if (!st.ok()) {
-        return st;
-    }
-
     RowsetSharedPtr new_rowset;
     RowsetFactory::create_rowset(_schema, _tablet_path, new_rowset_meta, 
&new_rowset);
 
@@ -1735,13 +1730,17 @@ Status Tablet::_cooldown_data(const 
std::shared_ptr<io::RemoteFileSystem>& dest_
         std::unique_lock meta_rlock(_meta_lock);
         save_meta();
     }
+    // upload cooldowned rowset meta to remote fs
+    // TODO(AlexYue): async call `write_cooldown_meta`
+    RETURN_IF_ERROR(write_cooldown_meta());
     return Status::OK();
 }
 
+// hold SHARED `cooldown_conf_lock`
 Status Tablet::_read_cooldown_meta(const 
std::shared_ptr<io::RemoteFileSystem>& fs,
-                                   int64_t cooldown_replica_id, TabletMetaPB* 
tablet_meta_pb) {
+                                   TabletMetaPB* tablet_meta_pb) {
     std::string remote_meta_path =
-            BetaRowset::remote_tablet_meta_path(tablet_id(), 
cooldown_replica_id);
+            remote_tablet_meta_path(tablet_id(), _cooldown_replica_id, 
_cooldown_term);
     IOContext io_ctx;
     io::FileReaderSPtr tablet_meta_reader;
     RETURN_IF_ERROR(fs->open_file(remote_meta_path, &tablet_meta_reader, 
&io_ctx));
@@ -1756,46 +1755,50 @@ Status Tablet::_read_cooldown_meta(const 
std::shared_ptr<io::RemoteFileSystem>&
     return Status::OK();
 }
 
-Status Tablet::write_cooldown_meta(const 
std::shared_ptr<io::RemoteFileSystem>& fs,
-                                   UniqueId cooldown_meta_id,
-                                   const RowsetMetaSharedPtr& new_rs_meta,
-                                   const std::vector<RowsetMetaSharedPtr>& 
to_deletes) {
-    std::unordered_set<Version, HashOfVersion> to_delete_set;
-    for (auto& rs_meta : to_deletes) {
-        to_delete_set.emplace(rs_meta->version());
+// `rs_metas` MUST already be sorted by `RowsetMeta::comparator`
+Status check_version_continuity(const std::vector<RowsetMetaSharedPtr>& 
rs_metas) {
+    if (rs_metas.size() < 2) {
+        return Status::OK();
+    }
+    auto prev = rs_metas.begin();
+    for (auto it = rs_metas.begin() + 1; it != rs_metas.end(); ++it) {
+        if ((*prev)->end_version() + 1 != (*it)->start_version()) {
+            return Status::InternalError("versions are not continuity: prev={} 
cur={}",
+                                         (*prev)->version().to_string(),
+                                         (*it)->version().to_string());
+        }
+        prev = it;
+    }
+    return Status::OK();
+}
+
+Status Tablet::write_cooldown_meta() {
+    auto [cooldown_replica_id, cooldown_term] = cooldown_conf();
+    if (cooldown_replica_id != replica_id()) {
+        return Status::Aborted("this replica is not cooldown replica");
     }
 
+    std::shared_ptr<io::RemoteFileSystem> fs;
+    RETURN_IF_ERROR(get_remote_file_system(storage_policy_id(), &fs));
+
     std::vector<RowsetMetaSharedPtr> cooldowned_rs_metas;
+    UniqueId cooldown_meta_id;
     {
         std::shared_lock meta_rlock(_meta_lock);
         for (auto& rs_meta : _tablet_meta->all_rs_metas()) {
             if (!rs_meta->is_local()) {
-                if (to_delete_set.find(rs_meta->version()) != 
to_delete_set.end()) {
-                    continue;
-                }
-                cooldowned_rs_metas.emplace_back(rs_meta);
+                cooldowned_rs_metas.push_back(rs_meta);
             }
         }
+        cooldown_meta_id = _tablet_meta->cooldown_meta_id();
     }
-    cooldowned_rs_metas.emplace_back(new_rs_meta);
-    std::sort(cooldowned_rs_metas.begin(), cooldowned_rs_metas.end(), 
RowsetMeta::comparator);
-
-    // check_version_continuity
-    if (!cooldowned_rs_metas.empty()) {
-        RowsetMetaSharedPtr prev_rowset_meta = cooldowned_rs_metas.front();
-        for (size_t i = 1; i < cooldowned_rs_metas.size(); ++i) {
-            RowsetMetaSharedPtr rowset_meta = cooldowned_rs_metas[i];
-            if (rowset_meta->start_version() != 
prev_rowset_meta->end_version() + 1) {
-                LOG(WARNING) << "There are missed versions among rowsets. "
-                             << "prev_rowset_meta version=" << 
prev_rowset_meta->start_version()
-                             << "-" << prev_rowset_meta->end_version()
-                             << ", rowset_meta version=" << 
rowset_meta->start_version() << "-"
-                             << rowset_meta->end_version();
-                return Status::Error<CUMULATIVE_MISS_VERSION>();
-            }
-            prev_rowset_meta = rowset_meta;
-        }
+    if (cooldowned_rs_metas.empty()) {
+        LOG(INFO) << "no cooldown meta to write, tablet_id=" << tablet_id();
+        return Status::OK();
     }
+    std::sort(cooldowned_rs_metas.begin(), cooldowned_rs_metas.end(), 
RowsetMeta::comparator);
+    DCHECK(cooldowned_rs_metas.front()->start_version() == 0);
+    RETURN_IF_ERROR(check_version_continuity(cooldowned_rs_metas));
 
     TabletMetaPB tablet_meta_pb;
     auto rs_metas = tablet_meta_pb.mutable_rs_metas();
@@ -1807,7 +1810,7 @@ Status Tablet::write_cooldown_meta(const 
std::shared_ptr<io::RemoteFileSystem>&
     tablet_meta_pb.mutable_cooldown_meta_id()->set_lo(cooldown_meta_id.lo);
 
     std::string remote_meta_path =
-            BetaRowset::remote_tablet_meta_path(tablet_id(), 
_tablet_meta->replica_id());
+            remote_tablet_meta_path(tablet_id(), cooldown_replica_id, 
cooldown_term);
     io::FileWriterPtr tablet_meta_writer;
     RETURN_IF_ERROR(fs->create_file(remote_meta_path, &tablet_meta_writer));
     auto val = tablet_meta_pb.SerializeAsString();
@@ -1815,11 +1818,15 @@ Status Tablet::write_cooldown_meta(const 
std::shared_ptr<io::RemoteFileSystem>&
     return tablet_meta_writer->close();
 }
 
-Status Tablet::_follow_cooldowned_data(const 
std::shared_ptr<io::RemoteFileSystem>& fs,
-                                       int64_t cooldown_replica_id) {
+// hold SHARED `cooldown_conf_lock`
+Status Tablet::_follow_cooldowned_data() {
+    DCHECK(_cooldown_replica_id != replica_id());
     LOG(INFO) << "try to follow cooldowned data. tablet_id=" << tablet_id()
-              << " cooldown_replica_id=" << cooldown_replica_id
+              << " cooldown_replica_id=" << _cooldown_replica_id
               << " local replica=" << replica_id();
+
+    std::shared_ptr<io::RemoteFileSystem> fs;
+    RETURN_IF_ERROR(get_remote_file_system(storage_policy_id(), &fs));
     // MUST executing serially with cold data compaction, because compaction 
input rowsets may be deleted by this function
     std::unique_lock cold_compaction_lock(_cold_compaction_lock, 
std::try_to_lock);
     if (!cold_compaction_lock.owns_lock()) {
@@ -1827,7 +1834,7 @@ Status Tablet::_follow_cooldowned_data(const 
std::shared_ptr<io::RemoteFileSyste
     }
 
     TabletMetaPB cooldown_meta_pb;
-    RETURN_IF_ERROR(_read_cooldown_meta(fs, cooldown_replica_id, 
&cooldown_meta_pb));
+    RETURN_IF_ERROR(_read_cooldown_meta(fs, &cooldown_meta_pb));
     DCHECK(cooldown_meta_pb.rs_metas_size() > 0);
     if (_tablet_meta->cooldown_meta_id() == 
cooldown_meta_pb.cooldown_meta_id()) {
         // cooldowned rowsets are same, no need to follow
@@ -2047,11 +2054,18 @@ void Tablet::remove_unused_remote_files() {
         DCHECK(atol(dest_fs->id().c_str()) == storage_policy->resource_id);
         DCHECK(dest_fs->type() != io::FileSystemType::LOCAL);
 
-        Status st;
+        std::shared_ptr<io::RemoteFileSystem> fs;
+        auto st = get_remote_file_system(t->storage_policy_id(), &fs);
+        if (!st.ok()) {
+            LOG(WARNING) << "encounter error when remove unused remote files, 
tablet_id="
+                         << t->tablet_id() << " : " << st;
+            return;
+        }
+
         std::vector<io::Path> files;
         // FIXME(plat1ko): What if user reset resource in storage policy to 
another resource?
         //  Maybe we should also list files in previously uploaded resources.
-        st = dest_fs->list(BetaRowset::remote_tablet_path(t->tablet_id()), 
&files);
+        st = dest_fs->list(remote_tablet_path(t->tablet_id()), &files);
         if (!st.ok()) {
             LOG(WARNING) << "encounter error when remove unused remote files, 
tablet_id="
                          << t->tablet_id() << " : " << st;
@@ -2075,8 +2089,13 @@ void Tablet::remove_unused_remote_files() {
             }
             cooldown_meta_id = t->_tablet_meta->cooldown_meta_id();
         }
-        // {replica_id}.meta
-        std::string remote_meta_path = std::to_string(t->replica_id()) + 
".meta";
+        auto [cooldown_replica_id, cooldown_term] = t->cooldown_conf();
+        if (cooldown_replica_id != t->replica_id()) {
+            return;
+        }
+        // {cooldown_replica_id}.{cooldown_term}.meta
+        std::string remote_meta_path =
+                fmt::format("{}.{}.meta", cooldown_replica_id, cooldown_term);
         // filter out the paths that should be reserved
         // clang-format off
         files.erase(std::remove_if(files.begin(), files.end(), [&](io::Path& 
path) {
@@ -2111,7 +2130,7 @@ void Tablet::remove_unused_remote_files() {
         buffer.insert({t->tablet_id(), {std::move(dest_fs), 
std::move(files)}});
         auto& info = req.confirm_list.emplace_back();
         info.__set_tablet_id(t->tablet_id());
-        info.__set_cooldown_replica_id(t->replica_id());
+        info.__set_cooldown_replica_id(cooldown_replica_id);
         info.__set_cooldown_meta_id(cooldown_meta_id.to_thrift());
     };
 
diff --git a/be/src/olap/tablet.h b/be/src/olap/tablet.h
index a4114d2ca5..c14f27e748 100644
--- a/be/src/olap/tablet.h
+++ b/be/src/olap/tablet.h
@@ -17,6 +17,7 @@
 
 #pragma once
 
+#include <atomic>
 #include <functional>
 #include <memory>
 #include <mutex>
@@ -308,8 +309,6 @@ public:
     
////////////////////////////////////////////////////////////////////////////
     // begin cooldown functions
     
////////////////////////////////////////////////////////////////////////////
-    int64_t cooldown_replica_id() const { return _cooldown_replica_id; }
-
     // Cooldown to remote fs.
     Status cooldown();
 
@@ -317,7 +316,22 @@ public:
 
     bool need_cooldown(int64_t* cooldown_timestamp, size_t* file_size);
 
-    void update_cooldown_conf(int64_t cooldown_term, int64_t 
cooldown_replica_id) {
+    std::pair<int64_t, int64_t> cooldown_conf() const {
+        std::shared_lock rlock(_cooldown_conf_lock);
+        return {_cooldown_replica_id, _cooldown_term};
+    }
+
+    std::pair<int64_t, int64_t> cooldown_conf_unlocked() const {
+        return {_cooldown_replica_id, _cooldown_term};
+    }
+
+    // return true if update success
+    bool update_cooldown_conf(int64_t cooldown_term, int64_t 
cooldown_replica_id) {
+        std::unique_lock wlock(_cooldown_conf_lock, std::try_to_lock);
+        if (!wlock.owns_lock()) {
+            LOG(INFO) << "try cooldown_conf_lock failed, tablet_id=" << 
tablet_id();
+            return false;
+        }
         if (cooldown_term > _cooldown_term) {
             LOG(INFO) << "update cooldown conf. tablet_id=" << tablet_id()
                       << " cooldown_replica_id: " << _cooldown_replica_id << " 
-> "
@@ -325,7 +339,9 @@ public:
                       << cooldown_term;
             _cooldown_replica_id = cooldown_replica_id;
             _cooldown_term = cooldown_term;
+            return true;
         }
+        return false;
     }
 
     Status remove_all_remote_rowsets();
@@ -344,6 +360,10 @@ public:
     uint32_t calc_cold_data_compaction_score() const;
 
     std::mutex& get_cold_compaction_lock() { return _cold_compaction_lock; }
+
+    std::shared_mutex& get_cooldown_conf_lock() { return _cooldown_conf_lock; }
+
+    Status write_cooldown_meta();
     
////////////////////////////////////////////////////////////////////////////
     // end cooldown functions
     
////////////////////////////////////////////////////////////////////////////
@@ -412,10 +432,6 @@ public:
         return config::max_tablet_io_errors > 0 && _io_error_times >= 
config::max_tablet_io_errors;
     }
 
-    Status write_cooldown_meta(const std::shared_ptr<io::RemoteFileSystem>& fs,
-                               UniqueId cooldown_meta_id, const 
RowsetMetaSharedPtr& new_rs_meta,
-                               const std::vector<RowsetMetaSharedPtr>& 
to_deletes);
-
 private:
     Status _init_once_action();
     void _print_missed_versions(const std::vector<Version>& missed_versions) 
const;
@@ -455,11 +471,10 @@ private:
     
////////////////////////////////////////////////////////////////////////////
     // begin cooldown functions
     
////////////////////////////////////////////////////////////////////////////
-    Status _cooldown_data(const std::shared_ptr<io::RemoteFileSystem>& 
dest_fs);
-    Status _follow_cooldowned_data(const 
std::shared_ptr<io::RemoteFileSystem>& fs,
-                                   int64_t cooldown_replica_id);
+    Status _cooldown_data();
+    Status _follow_cooldowned_data();
     Status _read_cooldown_meta(const std::shared_ptr<io::RemoteFileSystem>& fs,
-                               int64_t cooldown_replica_id, TabletMetaPB* 
tablet_meta_pb);
+                               TabletMetaPB* tablet_meta_pb);
     
////////////////////////////////////////////////////////////////////////////
     // end cooldown functions
     
////////////////////////////////////////////////////////////////////////////
@@ -539,6 +554,13 @@ private:
     // cooldown related
     int64_t _cooldown_replica_id = -1;
     int64_t _cooldown_term = -1;
+    // `_cooldown_conf_lock` is used to serialize update cooldown conf and all 
operations that:
+    // 1. read cooldown conf
+    // 2. upload rowsets to remote storage
+    // 3. update cooldown meta id
+    mutable std::shared_mutex _cooldown_conf_lock;
+    // `_cold_compaction_lock` is used to serialize cold data compaction and 
all operations that
+    // may delete compaction input rowsets.
     std::mutex _cold_compaction_lock;
 
     DISALLOW_COPY_AND_ASSIGN(Tablet);
diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/Replica.java 
b/fe/fe-core/src/main/java/org/apache/doris/catalog/Replica.java
index fb8b834787..0f694348ae 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/catalog/Replica.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/Replica.java
@@ -111,6 +111,7 @@ public class Replica implements Writable {
     private boolean bad = false;
 
     private TUniqueId cooldownMetaId;
+    private long cooldownTerm = -1;
 
     /*
      * If set to true, with means this replica need to be repaired. explicitly.
@@ -246,6 +247,14 @@ public class Replica implements Writable {
         this.cooldownMetaId = cooldownMetaId;
     }
 
+    public long getCooldownTerm() {
+        return cooldownTerm;
+    }
+
+    public void setCooldownTerm(long cooldownTerm) {
+        this.cooldownTerm = cooldownTerm;
+    }
+
     public boolean needFurtherRepair() {
         if (needFurtherRepair && System.currentTimeMillis() - 
this.furtherRepairSetTime < FURTHER_REPAIR_TIMEOUT_MS) {
             return true;
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/catalog/TabletInvertedIndex.java 
b/fe/fe-core/src/main/java/org/apache/doris/catalog/TabletInvertedIndex.java
index 726e86544a..2068b93755 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/catalog/TabletInvertedIndex.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/TabletInvertedIndex.java
@@ -190,10 +190,11 @@ public class TabletInvertedIndex {
                                 }
                             }
 
-                            if (Config.enable_storage_policy && 
backendTabletInfo.isSetCooldownReplicaId()) {
+                            if (Config.enable_storage_policy && 
backendTabletInfo.isSetCooldownTerm()) {
                                 handleCooldownConf(tabletMeta, 
backendTabletInfo, cooldownConfToPush,
                                         cooldownConfToUpdate);
                                 
replica.setCooldownMetaId(backendTabletInfo.getCooldownMetaId());
+                                
replica.setCooldownTerm(backendTabletInfo.getCooldownTerm());
                             }
 
                             long partitionId = tabletMeta.getPartitionId();
@@ -395,7 +396,7 @@ public class TabletInvertedIndex {
             return;
         }
 
-        if (cooldownConf.first != beTabletInfo.getCooldownReplicaId()) {
+        if (beTabletInfo.getCooldownTerm() < cooldownConf.second) {
             CooldownConf conf = new CooldownConf(beTabletInfo.tablet_id, 
cooldownConf.first, cooldownConf.second);
             synchronized (cooldownConfToPush) {
                 cooldownConfToPush.add(conf);
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/master/ReportHandler.java 
b/fe/fe-core/src/main/java/org/apache/doris/master/ReportHandler.java
index 1f4a8715e9..ec22328c55 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/master/ReportHandler.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/master/ReportHandler.java
@@ -1147,18 +1147,40 @@ public class ReportHandler extends Daemon {
                 if (backendTabletInfo.isSetCooldownMetaId()) {
                     // replica has cooldowned data
                     do {
-                        if (backendTabletInfo.getReplicaId() == 
tablet.getCooldownConf().first) {
+                        Pair<Long, Long> cooldownConf = 
tablet.getCooldownConf();
+                        if (backendTabletInfo.getCooldownTerm() > 
cooldownConf.second) {
+                            // should not be here
+                            LOG.warn("report cooldownTerm({}) > cooldownTerm 
in TabletMeta({}), tabletId={}",
+                                    backendTabletInfo.getCooldownTerm(), 
cooldownConf.second, tabletId);
+                            return false;
+                        }
+                        if (backendTabletInfo.getReplicaId() == 
cooldownConf.first) {
                             // this replica is true cooldown replica, so 
replica's cooldowned data must not be deleted
                             break;
                         }
-                        if (backendTabletInfo.getReplicaId() != 
backendTabletInfo.getCooldownReplicaId()
-                                && 
Env.getCurrentInvertedIndex().getReplicas(tabletId).stream()
-                                .anyMatch(r -> 
backendTabletInfo.getCooldownMetaId().equals(r.getCooldownMetaId()))) {
-                            // this replica can not cooldown data, and shares 
same cooldowned data with others replica,
-                            // so replica's cooldowned data must not be deleted
-                            break;
+                        List<Replica> replicas = 
Env.getCurrentInvertedIndex().getReplicas(tabletId);
+                        if (backendTabletInfo.getCooldownTerm() <= 0) {
+                            if (replicas.stream().anyMatch(
+                                    r -> 
backendTabletInfo.getCooldownMetaId().equals(r.getCooldownMetaId()))) {
+                                // this backend is just restarted, and shares 
same cooldowned data with others replica,
+                                // so replica's cooldowned data must not be 
deleted
+                                break;
+                            }
+                        }
+                        long minCooldownTerm = Long.MAX_VALUE;
+                        for (Replica r : replicas) {
+                            minCooldownTerm = Math.min(r.getCooldownTerm(), 
minCooldownTerm);
+                        }
+                        if (backendTabletInfo.getCooldownTerm() >= 
minCooldownTerm) {
+                            if (replicas.stream().anyMatch(
+                                    r -> 
backendTabletInfo.getCooldownMetaId().equals(r.getCooldownMetaId()))) {
+                                // this replica shares same cooldowned data 
with others replica, and won't follow data
+                                // of lower cooldown term, so replica's 
cooldowned data must not be deleted
+                                break;
+                            }
                         }
-                        LOG.warn("replica's cooldowned data may have been 
deleted");
+                        LOG.warn("replica's cooldowned data may have been 
deleted. tabletId={}, replicaId={}", tabletId,
+                                replicaId);
                         return false;
                     } while (false);
                 }
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java 
b/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java
index 52341ebf45..773bae3f7f 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java
@@ -48,6 +48,7 @@ import org.apache.doris.common.Config;
 import org.apache.doris.common.DuplicatedRequestException;
 import org.apache.doris.common.LabelAlreadyUsedException;
 import org.apache.doris.common.MetaNotFoundException;
+import org.apache.doris.common.Pair;
 import org.apache.doris.common.PatternMatcher;
 import org.apache.doris.common.PatternMatcherException;
 import org.apache.doris.common.ThriftServerContext;
@@ -220,9 +221,9 @@ public class FrontendServiceImpl implements 
FrontendService.Iface {
                 return;
             }
             // check cooldownReplicaId
-            long cooldownReplicaId = tablet.getCooldownConf().first;
-            if (cooldownReplicaId != info.cooldown_replica_id) {
-                LOG.info("cooldown replica id not match({} vs {}), tablet={}", 
cooldownReplicaId,
+            Pair<Long, Long> cooldownConf = tablet.getCooldownConf();
+            if (cooldownConf.first != info.cooldown_replica_id) {
+                LOG.info("cooldown replica id not match({} vs {}), tablet={}", 
cooldownConf.first,
                         info.cooldown_replica_id, info.tablet_id);
                 return;
             }
@@ -239,6 +240,11 @@ public class FrontendServiceImpl implements 
FrontendService.Iface {
                     LOG.info("replica is not alive, tablet={}, replica={}", 
info.tablet_id, replica.getId());
                     return;
                 }
+                if (replica.getCooldownTerm() != cooldownConf.second) {
+                    LOG.info("replica's cooldown term not match({} vs {}), 
tablet={}", cooldownConf.second,
+                            replica.getCooldownTerm(), info.tablet_id);
+                    return;
+                }
                 if 
(!info.cooldown_meta_id.equals(replica.getCooldownMetaId())) {
                     LOG.info("cooldown meta id are not same, tablet={}", 
info.tablet_id);
                     return;
diff --git a/gensrc/thrift/MasterService.thrift 
b/gensrc/thrift/MasterService.thrift
index 3c1d4ced03..99ca74a22b 100644
--- a/gensrc/thrift/MasterService.thrift
+++ b/gensrc/thrift/MasterService.thrift
@@ -42,7 +42,7 @@ struct TTabletInfo {
     15: optional Types.TReplicaId replica_id
     // data size on remote storage
     16: optional Types.TSize remote_data_size
-    17: optional Types.TReplicaId cooldown_replica_id
+    // 17: optional Types.TReplicaId cooldown_replica_id
     // 18: optional bool is_cooldown
     19: optional i64 cooldown_term
     20: optional Types.TUniqueId cooldown_meta_id


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org


Reply via email to