This is an automated email from the ASF dual-hosted git repository.

yiguolei pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push: new eaabc2f99db [enhancement](memtable) use shared ptr for flush token since it is shared between memtable write thread and flush thread (#38023) eaabc2f99db is described below commit eaabc2f99db8da6919f908ae569580ec72b13037 Author: yiguolei <676222...@qq.com> AuthorDate: Thu Jul 18 08:00:15 2024 +0800 [enhancement](memtable) use shared ptr for flush token since it is shared between memtable write thread and flush thread (#38023) ## Proposed changes Issue Number: close #xxx <!--Describe your changes.--> --------- Co-authored-by: yiguolei <yiguo...@gmail.com> --- be/src/olap/memtable_flush_executor.cpp | 30 +++++++++++++++++++----------- be/src/olap/memtable_flush_executor.h | 20 ++++++++++++-------- be/src/olap/memtable_writer.cpp | 20 ++++++++++---------- be/src/olap/memtable_writer.h | 4 +++- 4 files changed, 44 insertions(+), 30 deletions(-) diff --git a/be/src/olap/memtable_flush_executor.cpp b/be/src/olap/memtable_flush_executor.cpp index dc9545d0b34..6d7143a293c 100644 --- a/be/src/olap/memtable_flush_executor.cpp +++ b/be/src/olap/memtable_flush_executor.cpp @@ -43,8 +43,10 @@ DEFINE_GAUGE_METRIC_PROTOTYPE_2ARG(flush_thread_pool_thread_num, MetricUnit::NOU bvar::Adder<int64_t> g_flush_task_num("memtable_flush_task_num"); class MemtableFlushTask final : public Runnable { + ENABLE_FACTORY_CREATOR(MemtableFlushTask); + public: - MemtableFlushTask(FlushToken* flush_token, std::unique_ptr<MemTable> memtable, + MemtableFlushTask(std::shared_ptr<FlushToken> flush_token, std::unique_ptr<MemTable> memtable, int32_t segment_id, int64_t submit_task_time) : _flush_token(flush_token), _memtable(std::move(memtable)), @@ -56,11 +58,16 @@ public: ~MemtableFlushTask() override { g_flush_task_num << -1; } void run() override { - _flush_token->_flush_memtable(std::move(_memtable), _segment_id, _submit_task_time); + auto token = _flush_token.lock(); + if (token) { + 
token->_flush_memtable(std::move(_memtable), _segment_id, _submit_task_time); + } else { + LOG(WARNING) << "flush token is deconstructed, ignore the flush task"; + } } private: - FlushToken* _flush_token; + std::weak_ptr<FlushToken> _flush_token; std::unique_ptr<MemTable> _memtable; int32_t _segment_id; int64_t _submit_task_time; @@ -91,8 +98,9 @@ Status FlushToken::submit(std::unique_ptr<MemTable> mem_table) { return Status::OK(); } int64_t submit_task_time = MonotonicNanos(); - auto task = std::make_shared<MemtableFlushTask>( - this, std::move(mem_table), _rowset_writer->allocate_segment_id(), submit_task_time); + auto task = MemtableFlushTask::create_shared(shared_from_this(), std::move(mem_table), + _rowset_writer->allocate_segment_id(), + submit_task_time); Status ret = _thread_pool->submit(std::move(task)); if (ret.ok()) { // _wait_running_task_finish was executed after this function, so no need to notify _cond here @@ -224,8 +232,8 @@ void MemTableFlushExecutor::init(int num_disk) { } // NOTE: we use SERIAL mode here to ensure all mem-tables from one tablet are flushed in order. -Status MemTableFlushExecutor::create_flush_token(std::unique_ptr<FlushToken>& flush_token, - RowsetWriter* rowset_writer, +Status MemTableFlushExecutor::create_flush_token(std::shared_ptr<FlushToken>& flush_token, + std::shared_ptr<RowsetWriter> rowset_writer, bool is_high_priority) { switch (rowset_writer->type()) { case ALPHA_ROWSET: @@ -234,7 +242,7 @@ Status MemTableFlushExecutor::create_flush_token(std::unique_ptr<FlushToken>& fl case BETA_ROWSET: { // beta rowset can be flush in CONCURRENT, because each memtable using a new segment writer. ThreadPool* pool = is_high_priority ? 
_high_prio_flush_pool.get() : _flush_pool.get(); - flush_token = std::make_unique<FlushToken>(pool); + flush_token = FlushToken::create_shared(pool); flush_token->set_rowset_writer(rowset_writer); return Status::OK(); } @@ -243,11 +251,11 @@ Status MemTableFlushExecutor::create_flush_token(std::unique_ptr<FlushToken>& fl } } -Status MemTableFlushExecutor::create_flush_token(std::unique_ptr<FlushToken>& flush_token, - RowsetWriter* rowset_writer, +Status MemTableFlushExecutor::create_flush_token(std::shared_ptr<FlushToken>& flush_token, + std::shared_ptr<RowsetWriter> rowset_writer, ThreadPool* wg_flush_pool_ptr) { if (rowset_writer->type() == BETA_ROWSET) { - flush_token = std::make_unique<FlushToken>(wg_flush_pool_ptr); + flush_token = FlushToken::create_shared(wg_flush_pool_ptr); } else { return Status::InternalError<false>("not support alpha rowset load now."); } diff --git a/be/src/olap/memtable_flush_executor.h b/be/src/olap/memtable_flush_executor.h index b647c23deb9..2d20298f800 100644 --- a/be/src/olap/memtable_flush_executor.h +++ b/be/src/olap/memtable_flush_executor.h @@ -55,10 +55,11 @@ std::ostream& operator<<(std::ostream& os, const FlushStatistic& stat); // 1. Immediately disallow submission of any subsequent memtable // 2. 
For the memtables that have already been submitted, there is no need to flush, // because the entire job will definitely fail; -class FlushToken { +class FlushToken : public std::enable_shared_from_this<FlushToken> { + ENABLE_FACTORY_CREATOR(FlushToken); + public: - explicit FlushToken(ThreadPool* thread_pool) - : _flush_status(Status::OK()), _thread_pool(thread_pool) {} + FlushToken(ThreadPool* thread_pool) : _flush_status(Status::OK()), _thread_pool(thread_pool) {} Status submit(std::unique_ptr<MemTable> mem_table); @@ -72,7 +73,9 @@ public: // get flush operations' statistics const FlushStatistic& get_stats() const { return _stats; } - void set_rowset_writer(RowsetWriter* rowset_writer) { _rowset_writer = rowset_writer; } + void set_rowset_writer(std::shared_ptr<RowsetWriter> rowset_writer) { + _rowset_writer = rowset_writer; + } const MemTableStat& memtable_stat() { return _memtable_stat; } @@ -96,7 +99,7 @@ private: FlushStatistic _stats; - RowsetWriter* _rowset_writer = nullptr; + std::shared_ptr<RowsetWriter> _rowset_writer = nullptr; MemTableStat _memtable_stat; @@ -129,10 +132,11 @@ public: // because it needs path hash of each data dir. 
void init(int num_disk); - Status create_flush_token(std::unique_ptr<FlushToken>& flush_token, RowsetWriter* rowset_writer, - bool is_high_priority); + Status create_flush_token(std::shared_ptr<FlushToken>& flush_token, + std::shared_ptr<RowsetWriter> rowset_writer, bool is_high_priority); - Status create_flush_token(std::unique_ptr<FlushToken>& flush_token, RowsetWriter* rowset_writer, + Status create_flush_token(std::shared_ptr<FlushToken>& flush_token, + std::shared_ptr<RowsetWriter> rowset_writer, ThreadPool* wg_flush_pool_ptr); private: diff --git a/be/src/olap/memtable_writer.cpp b/be/src/olap/memtable_writer.cpp index ff564771468..114a7841b92 100644 --- a/be/src/olap/memtable_writer.cpp +++ b/be/src/olap/memtable_writer.cpp @@ -78,17 +78,17 @@ Status MemTableWriter::init(std::shared_ptr<RowsetWriter> rowset_writer, // by assigning segment_id to memtable before submiting to flush executor, // we can make sure same keys sort in the same order in all replicas. if (wg_flush_pool_ptr) { - RETURN_IF_ERROR(ExecEnv::GetInstance() - ->storage_engine() - .memtable_flush_executor() - ->create_flush_token(_flush_token, _rowset_writer.get(), - wg_flush_pool_ptr)); + RETURN_IF_ERROR( + ExecEnv::GetInstance() + ->storage_engine() + .memtable_flush_executor() + ->create_flush_token(_flush_token, _rowset_writer, wg_flush_pool_ptr)); } else { - RETURN_IF_ERROR(ExecEnv::GetInstance() - ->storage_engine() - .memtable_flush_executor() - ->create_flush_token(_flush_token, _rowset_writer.get(), - _req.is_high_priority)); + RETURN_IF_ERROR( + ExecEnv::GetInstance() + ->storage_engine() + .memtable_flush_executor() + ->create_flush_token(_flush_token, _rowset_writer, _req.is_high_priority)); } _is_init = true; diff --git a/be/src/olap/memtable_writer.h b/be/src/olap/memtable_writer.h index b34fe0baee4..ee7c8e1538a 100644 --- a/be/src/olap/memtable_writer.h +++ b/be/src/olap/memtable_writer.h @@ -127,7 +127,9 @@ private: TabletSchemaSPtr _tablet_schema; bool _unique_key_mow = false; 
- std::unique_ptr<FlushToken> _flush_token; + // This variable is accessed from writer thread and token flush thread + // use a shared ptr to avoid use after free problem. + std::shared_ptr<FlushToken> _flush_token; std::vector<std::shared_ptr<MemTracker>> _mem_table_insert_trackers; std::vector<std::shared_ptr<MemTracker>> _mem_table_flush_trackers; SpinLock _mem_table_tracker_lock; --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org