This is an automated email from the ASF dual-hosted git repository.

yiguolei pushed a commit to branch branch-2.1
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/branch-2.1 by this push:
     new 13e9842f17b [enhancement](memtable) use shared ptr for flush token since it is shared between memtable write thread and flush thread (#38023) (#38068)
13e9842f17b is described below

commit 13e9842f17b3e8e45b7bf8bf3a31f01d40096622
Author: yiguolei <676222...@qq.com>
AuthorDate: Thu Jul 18 19:09:40 2024 +0800

    [enhancement](memtable) use shared ptr for flush token since it is shared between memtable write thread and flush thread (#38023) (#38068)
    
    pick https://github.com/apache/doris/pull/38023
    ---------
    
    ## Proposed changes
    
    Issue Number: close #xxx
    
    <!--Describe your changes.-->
    
    Co-authored-by: yiguolei <yiguo...@gmail.com>
---
 be/src/olap/memtable_flush_executor.cpp | 30 +++++++++++++++++++-----------
 be/src/olap/memtable_flush_executor.h   | 20 ++++++++++++--------
 be/src/olap/memtable_writer.cpp         |  4 ++--
 be/src/olap/memtable_writer.h           |  4 +++-
 4 files changed, 36 insertions(+), 22 deletions(-)

diff --git a/be/src/olap/memtable_flush_executor.cpp 
b/be/src/olap/memtable_flush_executor.cpp
index 247bf0ca81b..4fc48f18edf 100644
--- a/be/src/olap/memtable_flush_executor.cpp
+++ b/be/src/olap/memtable_flush_executor.cpp
@@ -43,8 +43,10 @@ 
DEFINE_GAUGE_METRIC_PROTOTYPE_2ARG(flush_thread_pool_thread_num, MetricUnit::NOU
 bvar::Adder<int64_t> g_flush_task_num("memtable_flush_task_num");
 
 class MemtableFlushTask final : public Runnable {
+    ENABLE_FACTORY_CREATOR(MemtableFlushTask);
+
 public:
-    MemtableFlushTask(FlushToken* flush_token, std::unique_ptr<MemTable> 
memtable,
+    MemtableFlushTask(std::shared_ptr<FlushToken> flush_token, 
std::unique_ptr<MemTable> memtable,
                       int32_t segment_id, int64_t submit_task_time)
             : _flush_token(flush_token),
               _memtable(std::move(memtable)),
@@ -56,11 +58,16 @@ public:
     ~MemtableFlushTask() override { g_flush_task_num << -1; }
 
     void run() override {
-        _flush_token->_flush_memtable(std::move(_memtable), _segment_id, 
_submit_task_time);
+        auto token = _flush_token.lock();
+        if (token) {
+            token->_flush_memtable(std::move(_memtable), _segment_id, 
_submit_task_time);
+        } else {
+            LOG(WARNING) << "flush token is deconstructed, ignore the flush 
task";
+        }
     }
 
 private:
-    FlushToken* _flush_token;
+    std::weak_ptr<FlushToken> _flush_token;
     std::unique_ptr<MemTable> _memtable;
     int32_t _segment_id;
     int64_t _submit_task_time;
@@ -91,8 +98,9 @@ Status FlushToken::submit(std::unique_ptr<MemTable> 
mem_table) {
         return Status::OK();
     }
     int64_t submit_task_time = MonotonicNanos();
-    auto task = std::make_shared<MemtableFlushTask>(
-            this, std::move(mem_table), _rowset_writer->allocate_segment_id(), 
submit_task_time);
+    auto task = MemtableFlushTask::create_shared(shared_from_this(), 
std::move(mem_table),
+                                                 
_rowset_writer->allocate_segment_id(),
+                                                 submit_task_time);
     Status ret = _thread_pool->submit(std::move(task));
     if (ret.ok()) {
         // _wait_running_task_finish was executed after this function, so no 
need to notify _cond here
@@ -219,8 +227,8 @@ void MemTableFlushExecutor::init(const 
std::vector<DataDir*>& data_dirs) {
 }
 
 // NOTE: we use SERIAL mode here to ensure all mem-tables from one tablet are 
flushed in order.
-Status MemTableFlushExecutor::create_flush_token(std::unique_ptr<FlushToken>& 
flush_token,
-                                                 RowsetWriter* rowset_writer,
+Status MemTableFlushExecutor::create_flush_token(std::shared_ptr<FlushToken>& 
flush_token,
+                                                 std::shared_ptr<RowsetWriter> 
rowset_writer,
                                                  bool is_high_priority) {
     switch (rowset_writer->type()) {
     case ALPHA_ROWSET:
@@ -229,7 +237,7 @@ Status 
MemTableFlushExecutor::create_flush_token(std::unique_ptr<FlushToken>& fl
     case BETA_ROWSET: {
         // beta rowset can be flush in CONCURRENT, because each memtable using 
a new segment writer.
         ThreadPool* pool = is_high_priority ? _high_prio_flush_pool.get() : 
_flush_pool.get();
-        flush_token = std::make_unique<FlushToken>(pool);
+        flush_token = FlushToken::create_shared(pool);
         flush_token->set_rowset_writer(rowset_writer);
         return Status::OK();
     }
@@ -238,11 +246,11 @@ Status 
MemTableFlushExecutor::create_flush_token(std::unique_ptr<FlushToken>& fl
     }
 }
 
-Status MemTableFlushExecutor::create_flush_token(std::unique_ptr<FlushToken>& 
flush_token,
-                                                 RowsetWriter* rowset_writer,
+Status MemTableFlushExecutor::create_flush_token(std::shared_ptr<FlushToken>& 
flush_token,
+                                                 std::shared_ptr<RowsetWriter> 
rowset_writer,
                                                  ThreadPool* 
wg_flush_pool_ptr) {
     if (rowset_writer->type() == BETA_ROWSET) {
-        flush_token = std::make_unique<FlushToken>(wg_flush_pool_ptr);
+        flush_token = FlushToken::create_shared(wg_flush_pool_ptr);
     } else {
         return Status::InternalError<false>("not support alpha rowset load 
now.");
     }
diff --git a/be/src/olap/memtable_flush_executor.h 
b/be/src/olap/memtable_flush_executor.h
index 1576e68fc72..44ced2a27a9 100644
--- a/be/src/olap/memtable_flush_executor.h
+++ b/be/src/olap/memtable_flush_executor.h
@@ -55,10 +55,11 @@ std::ostream& operator<<(std::ostream& os, const 
FlushStatistic& stat);
 // 1. Immediately disallow submission of any subsequent memtable
 // 2. For the memtables that have already been submitted, there is no need to 
flush,
 //    because the entire job will definitely fail;
-class FlushToken {
+class FlushToken : public std::enable_shared_from_this<FlushToken> {
+    ENABLE_FACTORY_CREATOR(FlushToken);
+
 public:
-    explicit FlushToken(ThreadPool* thread_pool)
-            : _flush_status(Status::OK()), _thread_pool(thread_pool) {}
+    FlushToken(ThreadPool* thread_pool) : _flush_status(Status::OK()), 
_thread_pool(thread_pool) {}
 
     Status submit(std::unique_ptr<MemTable> mem_table);
 
@@ -72,7 +73,9 @@ public:
     // get flush operations' statistics
     const FlushStatistic& get_stats() const { return _stats; }
 
-    void set_rowset_writer(RowsetWriter* rowset_writer) { _rowset_writer = 
rowset_writer; }
+    void set_rowset_writer(std::shared_ptr<RowsetWriter> rowset_writer) {
+        _rowset_writer = rowset_writer;
+    }
 
     const MemTableStat& memtable_stat() { return _memtable_stat; }
 
@@ -96,7 +99,7 @@ private:
 
     FlushStatistic _stats;
 
-    RowsetWriter* _rowset_writer = nullptr;
+    std::shared_ptr<RowsetWriter> _rowset_writer = nullptr;
 
     MemTableStat _memtable_stat;
 
@@ -129,10 +132,11 @@ public:
     // because it needs path hash of each data dir.
     void init(const std::vector<DataDir*>& data_dirs);
 
-    Status create_flush_token(std::unique_ptr<FlushToken>& flush_token, 
RowsetWriter* rowset_writer,
-                              bool is_high_priority);
+    Status create_flush_token(std::shared_ptr<FlushToken>& flush_token,
+                              std::shared_ptr<RowsetWriter> rowset_writer, 
bool is_high_priority);
 
-    Status create_flush_token(std::unique_ptr<FlushToken>& flush_token, 
RowsetWriter* rowset_writer,
+    Status create_flush_token(std::shared_ptr<FlushToken>& flush_token,
+                              std::shared_ptr<RowsetWriter> rowset_writer,
                               ThreadPool* wg_flush_pool_ptr);
 
 private:
diff --git a/be/src/olap/memtable_writer.cpp b/be/src/olap/memtable_writer.cpp
index 93499a51cb8..29206a292cd 100644
--- a/be/src/olap/memtable_writer.cpp
+++ b/be/src/olap/memtable_writer.cpp
@@ -79,10 +79,10 @@ Status MemTableWriter::init(std::shared_ptr<RowsetWriter> 
rowset_writer,
     // we can make sure same keys sort in the same order in all replicas.
     if (wg_flush_pool_ptr) {
         
RETURN_IF_ERROR(StorageEngine::instance()->memtable_flush_executor()->create_flush_token(
-                _flush_token, _rowset_writer.get(), wg_flush_pool_ptr));
+                _flush_token, _rowset_writer, wg_flush_pool_ptr));
     } else {
         
RETURN_IF_ERROR(StorageEngine::instance()->memtable_flush_executor()->create_flush_token(
-                _flush_token, _rowset_writer.get(), _req.is_high_priority));
+                _flush_token, _rowset_writer, _req.is_high_priority));
     }
 
     _is_init = true;
diff --git a/be/src/olap/memtable_writer.h b/be/src/olap/memtable_writer.h
index b34fe0baee4..ee7c8e1538a 100644
--- a/be/src/olap/memtable_writer.h
+++ b/be/src/olap/memtable_writer.h
@@ -127,7 +127,9 @@ private:
     TabletSchemaSPtr _tablet_schema;
     bool _unique_key_mow = false;
 
-    std::unique_ptr<FlushToken> _flush_token;
+    // This variable is accessed from writer thread and token flush thread
+    // use a shared ptr to avoid use after free problem.
+    std::shared_ptr<FlushToken> _flush_token;
     std::vector<std::shared_ptr<MemTracker>> _mem_table_insert_trackers;
     std::vector<std::shared_ptr<MemTracker>> _mem_table_flush_trackers;
     SpinLock _mem_table_tracker_lock;


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to