This is an automated email from the ASF dual-hosted git repository.

yiguolei pushed a commit to branch branch-2.1 in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-2.1 by this push: new 13e9842f17b [enhancement](memtable) use shared ptr for flush token since it is shared between memtable write thread and flush thread (#38023) (#38068) 13e9842f17b is described below commit 13e9842f17b3e8e45b7bf8bf3a31f01d40096622 Author: yiguolei <676222...@qq.com> AuthorDate: Thu Jul 18 19:09:40 2024 +0800 [enhancement](memtable) use shared ptr for flush token since it is shared between memtable write thread and flush thread (#38023) (#38068) pick https://github.com/apache/doris/pull/38023 --------- ## Proposed changes Issue Number: close #xxx <!--Describe your changes.--> Co-authored-by: yiguolei <yiguo...@gmail.com> --- be/src/olap/memtable_flush_executor.cpp | 30 +++++++++++++++++++----------- be/src/olap/memtable_flush_executor.h | 20 ++++++++++++-------- be/src/olap/memtable_writer.cpp | 4 ++-- be/src/olap/memtable_writer.h | 4 +++- 4 files changed, 36 insertions(+), 22 deletions(-) diff --git a/be/src/olap/memtable_flush_executor.cpp b/be/src/olap/memtable_flush_executor.cpp index 247bf0ca81b..4fc48f18edf 100644 --- a/be/src/olap/memtable_flush_executor.cpp +++ b/be/src/olap/memtable_flush_executor.cpp @@ -43,8 +43,10 @@ DEFINE_GAUGE_METRIC_PROTOTYPE_2ARG(flush_thread_pool_thread_num, MetricUnit::NOU bvar::Adder<int64_t> g_flush_task_num("memtable_flush_task_num"); class MemtableFlushTask final : public Runnable { + ENABLE_FACTORY_CREATOR(MemtableFlushTask); + public: - MemtableFlushTask(FlushToken* flush_token, std::unique_ptr<MemTable> memtable, + MemtableFlushTask(std::shared_ptr<FlushToken> flush_token, std::unique_ptr<MemTable> memtable, int32_t segment_id, int64_t submit_task_time) : _flush_token(flush_token), _memtable(std::move(memtable)), @@ -56,11 +58,16 @@ public: ~MemtableFlushTask() override { g_flush_task_num << -1; } void run() override { - _flush_token->_flush_memtable(std::move(_memtable), _segment_id, _submit_task_time); + auto token = _flush_token.lock(); + if 
(token) { + token->_flush_memtable(std::move(_memtable), _segment_id, _submit_task_time); + } else { + LOG(WARNING) << "flush token is deconstructed, ignore the flush task"; + } } private: - FlushToken* _flush_token; + std::weak_ptr<FlushToken> _flush_token; std::unique_ptr<MemTable> _memtable; int32_t _segment_id; int64_t _submit_task_time; @@ -91,8 +98,9 @@ Status FlushToken::submit(std::unique_ptr<MemTable> mem_table) { return Status::OK(); } int64_t submit_task_time = MonotonicNanos(); - auto task = std::make_shared<MemtableFlushTask>( - this, std::move(mem_table), _rowset_writer->allocate_segment_id(), submit_task_time); + auto task = MemtableFlushTask::create_shared(shared_from_this(), std::move(mem_table), + _rowset_writer->allocate_segment_id(), + submit_task_time); Status ret = _thread_pool->submit(std::move(task)); if (ret.ok()) { // _wait_running_task_finish was executed after this function, so no need to notify _cond here @@ -219,8 +227,8 @@ void MemTableFlushExecutor::init(const std::vector<DataDir*>& data_dirs) { } // NOTE: we use SERIAL mode here to ensure all mem-tables from one tablet are flushed in order. -Status MemTableFlushExecutor::create_flush_token(std::unique_ptr<FlushToken>& flush_token, - RowsetWriter* rowset_writer, +Status MemTableFlushExecutor::create_flush_token(std::shared_ptr<FlushToken>& flush_token, + std::shared_ptr<RowsetWriter> rowset_writer, bool is_high_priority) { switch (rowset_writer->type()) { case ALPHA_ROWSET: @@ -229,7 +237,7 @@ Status MemTableFlushExecutor::create_flush_token(std::unique_ptr<FlushToken>& fl case BETA_ROWSET: { // beta rowset can be flush in CONCURRENT, because each memtable using a new segment writer. ThreadPool* pool = is_high_priority ? 
_high_prio_flush_pool.get() : _flush_pool.get(); - flush_token = std::make_unique<FlushToken>(pool); + flush_token = FlushToken::create_shared(pool); flush_token->set_rowset_writer(rowset_writer); return Status::OK(); } @@ -238,11 +246,11 @@ Status MemTableFlushExecutor::create_flush_token(std::unique_ptr<FlushToken>& fl } } -Status MemTableFlushExecutor::create_flush_token(std::unique_ptr<FlushToken>& flush_token, - RowsetWriter* rowset_writer, +Status MemTableFlushExecutor::create_flush_token(std::shared_ptr<FlushToken>& flush_token, + std::shared_ptr<RowsetWriter> rowset_writer, ThreadPool* wg_flush_pool_ptr) { if (rowset_writer->type() == BETA_ROWSET) { - flush_token = std::make_unique<FlushToken>(wg_flush_pool_ptr); + flush_token = FlushToken::create_shared(wg_flush_pool_ptr); } else { return Status::InternalError<false>("not support alpha rowset load now."); } diff --git a/be/src/olap/memtable_flush_executor.h b/be/src/olap/memtable_flush_executor.h index 1576e68fc72..44ced2a27a9 100644 --- a/be/src/olap/memtable_flush_executor.h +++ b/be/src/olap/memtable_flush_executor.h @@ -55,10 +55,11 @@ std::ostream& operator<<(std::ostream& os, const FlushStatistic& stat); // 1. Immediately disallow submission of any subsequent memtable // 2. 
For the memtables that have already been submitted, there is no need to flush, // because the entire job will definitely fail; -class FlushToken { +class FlushToken : public std::enable_shared_from_this<FlushToken> { + ENABLE_FACTORY_CREATOR(FlushToken); + public: - explicit FlushToken(ThreadPool* thread_pool) - : _flush_status(Status::OK()), _thread_pool(thread_pool) {} + FlushToken(ThreadPool* thread_pool) : _flush_status(Status::OK()), _thread_pool(thread_pool) {} Status submit(std::unique_ptr<MemTable> mem_table); @@ -72,7 +73,9 @@ public: // get flush operations' statistics const FlushStatistic& get_stats() const { return _stats; } - void set_rowset_writer(RowsetWriter* rowset_writer) { _rowset_writer = rowset_writer; } + void set_rowset_writer(std::shared_ptr<RowsetWriter> rowset_writer) { + _rowset_writer = rowset_writer; + } const MemTableStat& memtable_stat() { return _memtable_stat; } @@ -96,7 +99,7 @@ private: FlushStatistic _stats; - RowsetWriter* _rowset_writer = nullptr; + std::shared_ptr<RowsetWriter> _rowset_writer = nullptr; MemTableStat _memtable_stat; @@ -129,10 +132,11 @@ public: // because it needs path hash of each data dir. 
void init(const std::vector<DataDir*>& data_dirs); - Status create_flush_token(std::unique_ptr<FlushToken>& flush_token, RowsetWriter* rowset_writer, - bool is_high_priority); + Status create_flush_token(std::shared_ptr<FlushToken>& flush_token, + std::shared_ptr<RowsetWriter> rowset_writer, bool is_high_priority); - Status create_flush_token(std::unique_ptr<FlushToken>& flush_token, RowsetWriter* rowset_writer, + Status create_flush_token(std::shared_ptr<FlushToken>& flush_token, + std::shared_ptr<RowsetWriter> rowset_writer, ThreadPool* wg_flush_pool_ptr); private: diff --git a/be/src/olap/memtable_writer.cpp b/be/src/olap/memtable_writer.cpp index 93499a51cb8..29206a292cd 100644 --- a/be/src/olap/memtable_writer.cpp +++ b/be/src/olap/memtable_writer.cpp @@ -79,10 +79,10 @@ Status MemTableWriter::init(std::shared_ptr<RowsetWriter> rowset_writer, // we can make sure same keys sort in the same order in all replicas. if (wg_flush_pool_ptr) { RETURN_IF_ERROR(StorageEngine::instance()->memtable_flush_executor()->create_flush_token( - _flush_token, _rowset_writer.get(), wg_flush_pool_ptr)); + _flush_token, _rowset_writer, wg_flush_pool_ptr)); } else { RETURN_IF_ERROR(StorageEngine::instance()->memtable_flush_executor()->create_flush_token( - _flush_token, _rowset_writer.get(), _req.is_high_priority)); + _flush_token, _rowset_writer, _req.is_high_priority)); } _is_init = true; diff --git a/be/src/olap/memtable_writer.h b/be/src/olap/memtable_writer.h index b34fe0baee4..ee7c8e1538a 100644 --- a/be/src/olap/memtable_writer.h +++ b/be/src/olap/memtable_writer.h @@ -127,7 +127,9 @@ private: TabletSchemaSPtr _tablet_schema; bool _unique_key_mow = false; - std::unique_ptr<FlushToken> _flush_token; + // This variable is accessed from writer thread and token flush thread + // use a shared ptr to avoid use after free problem. 
+    std::shared_ptr<FlushToken> _flush_token;
     std::vector<std::shared_ptr<MemTracker>> _mem_table_insert_trackers;
     std::vector<std::shared_ptr<MemTracker>> _mem_table_flush_trackers;
     SpinLock _mem_table_tracker_lock;

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org