This is an automated email from the ASF dual-hosted git repository. yiguolei pushed a commit to branch branch-2.1 in repository https://gitbox.apache.org/repos/asf/doris.git
commit 08897b7e03135d2e390f3fbba40d531e387e91ae Author: wangbo <wan...@apache.org> AuthorDate: Tue Jan 30 10:12:31 2024 +0800 [Improvement](executor)Remove ThreadPoolToken from MemTableFlushExecutor #30529 --- be/src/olap/memtable_flush_executor.cpp | 63 +++++++++++++++++++++------------ be/src/olap/memtable_flush_executor.h | 21 +++++++---- be/src/olap/memtable_writer.cpp | 6 ++-- 3 files changed, 58 insertions(+), 32 deletions(-) diff --git a/be/src/olap/memtable_flush_executor.cpp b/be/src/olap/memtable_flush_executor.cpp index 7190597b54c..60ef194aae5 100644 --- a/be/src/olap/memtable_flush_executor.cpp +++ b/be/src/olap/memtable_flush_executor.cpp @@ -56,8 +56,7 @@ public: ~MemtableFlushTask() override { g_flush_task_num << -1; } void run() override { - _flush_token->_flush_memtable(_memtable.get(), _segment_id, _submit_task_time); - _memtable.reset(); + _flush_token->_flush_memtable(std::move(_memtable), _segment_id, _submit_task_time); } private: @@ -94,16 +93,35 @@ Status FlushToken::submit(std::unique_ptr<MemTable> mem_table) { int64_t submit_task_time = MonotonicNanos(); auto task = std::make_shared<MemtableFlushTask>( this, std::move(mem_table), _rowset_writer->allocate_segment_id(), submit_task_time); - _stats.flush_running_count++; - return _flush_token->submit(std::move(task)); + Status ret = _thread_pool->submit(std::move(task)); + if (ret.ok()) { + _stats.flush_running_count++; + } + return ret; +} + +// NOTE: FlushToken's submit/cancel/wait run in one thread, +// so we don't need to make them mutually exclusive, std::atomic is enough. +void FlushToken::_wait_running_task_finish() { + while (true) { + int64_t flush_running_count = _stats.flush_running_count.load(); + if (flush_running_count < 0) { + LOG(ERROR) << "flush_running_count < 0, this is not expected!"; + } + if (flush_running_count == 0) { + break; + } + std::this_thread::sleep_for(std::chrono::milliseconds(50)); + } } void FlushToken::cancel() { - _flush_token->shutdown(); + _shutdown_flush_token(); + _wait_running_task_finish(); } Status FlushToken::wait() { - _flush_token->wait(); + _wait_running_task_finish(); { std::shared_lock rdlk(_flush_status_lock); if (!_flush_status.ok()) { @@ -134,8 +152,12 @@ Status FlushToken::_do_flush_memtable(MemTable* memtable, int32_t segment_id, in return Status::OK(); } -void FlushToken::_flush_memtable(MemTable* mem_table, int32_t segment_id, +void FlushToken::_flush_memtable(std::unique_ptr<MemTable> memtable_ptr, int32_t segment_id, int64_t submit_task_time) { + Defer defer {[&]() { _stats.flush_running_count--; }}; + if (_is_shutdown()) { + return; + } uint64_t flush_wait_time_ns = MonotonicNanos() - submit_task_time; _stats.flush_wait_time_ns += flush_wait_time_ns; // If previous flush has failed, return directly @@ -148,10 +170,10 @@ void FlushToken::_flush_memtable(MemTable* mem_table, int32_t segment_id, MonotonicStopWatch timer; timer.start(); - size_t memory_usage = mem_table->memory_usage(); + size_t memory_usage = memtable_ptr->memory_usage(); int64_t flush_size; - Status s = _do_flush_memtable(mem_table, segment_id, &flush_size); + Status s = _do_flush_memtable(memtable_ptr.get(), segment_id, &flush_size); { std::shared_lock rdlk(_flush_status_lock); @@ -174,8 +196,7 @@ void FlushToken::_flush_memtable(MemTable* mem_table, int32_t segment_id, << ", mem size: " << memory_usage << ", disk size: " << flush_size; _stats.flush_time_ns += timer.elapsed_time(); _stats.flush_finish_count++; - _stats.flush_running_count--; - _stats.flush_size_bytes += mem_table->memory_usage(); + _stats.flush_size_bytes += memtable_ptr->memory_usage(); _stats.flush_disk_size_bytes += flush_size; } @@ -199,27 +220,25 @@ void MemTableFlushExecutor::init(const std::vector<DataDir*>& data_dirs) { // NOTE: we use SERIAL mode here to ensure all mem-tables from one tablet are flushed in order. Status MemTableFlushExecutor::create_flush_token(std::unique_ptr<FlushToken>& flush_token, - RowsetWriter* rowset_writer, bool should_serial, + RowsetWriter* rowset_writer, bool is_high_priority) { if (!is_high_priority) { - if (rowset_writer->type() == BETA_ROWSET && !should_serial) { + if (rowset_writer->type() == BETA_ROWSET) { // beta rowset can be flush in CONCURRENT, because each memtable using a new segment writer. - flush_token = std::make_unique<FlushToken>( - _flush_pool->new_token(ThreadPool::ExecutionMode::CONCURRENT)); + flush_token = std::make_unique<FlushToken>(_flush_pool.get()); } else { // alpha rowset do not support flush in CONCURRENT. - flush_token = std::make_unique<FlushToken>( - _flush_pool->new_token(ThreadPool::ExecutionMode::SERIAL)); + // and not support alpha rowset now. + return Status::InternalError<false>("not support alpha rowset load now."); } } else { - if (rowset_writer->type() == BETA_ROWSET && !should_serial) { + if (rowset_writer->type() == BETA_ROWSET) { // beta rowset can be flush in CONCURRENT, because each memtable using a new segment writer. - flush_token = std::make_unique<FlushToken>( - _high_prio_flush_pool->new_token(ThreadPool::ExecutionMode::CONCURRENT)); + flush_token = std::make_unique<FlushToken>(_high_prio_flush_pool.get()); } else { // alpha rowset do not support flush in CONCURRENT. - flush_token = std::make_unique<FlushToken>( - _high_prio_flush_pool->new_token(ThreadPool::ExecutionMode::SERIAL)); + // and not support alpha rowset now. + return Status::InternalError<false>("not support alpha rowset load now."); } } flush_token->set_rowset_writer(rowset_writer); diff --git a/be/src/olap/memtable_flush_executor.h b/be/src/olap/memtable_flush_executor.h index a025bf43cf8..80983baa66f 100644 --- a/be/src/olap/memtable_flush_executor.h +++ b/be/src/olap/memtable_flush_executor.h @@ -38,7 +38,7 @@ class RowsetWriter; // use atomic because it may be updated by multi threads struct FlushStatistic { std::atomic_uint64_t flush_time_ns = 0; - std::atomic_uint64_t flush_running_count = 0; + std::atomic_int64_t flush_running_count = 0; std::atomic_uint64_t flush_finish_count = 0; std::atomic_uint64_t flush_size_bytes = 0; std::atomic_uint64_t flush_disk_size_bytes = 0; @@ -56,8 +56,8 @@ std::ostream& operator<<(std::ostream& os, const FlushStatistic& stat); // because the entire job will definitely fail; class FlushToken { public: - explicit FlushToken(std::unique_ptr<ThreadPoolToken> flush_pool_token) - : _flush_token(std::move(flush_pool_token)), _flush_status(Status::OK()) {} + explicit FlushToken(ThreadPool* thread_pool) + : _flush_status(Status::OK()), _thread_pool(thread_pool) {} Status submit(std::unique_ptr<MemTable> mem_table); @@ -75,15 +75,19 @@ public: const MemTableStat& memtable_stat() { return _memtable_stat; } +private: + void _shutdown_flush_token() { _shutdown.store(true); } + bool _is_shutdown() { return _shutdown.load(); } + void _wait_running_task_finish(); + private: friend class MemtableFlushTask; - void _flush_memtable(MemTable* mem_table, int32_t segment_id, int64_t submit_task_time); + void _flush_memtable(std::unique_ptr<MemTable> memtable_ptr, int32_t segment_id, + int64_t submit_task_time); Status _do_flush_memtable(MemTable* memtable, int32_t segment_id, int64_t* flush_size); - std::unique_ptr<ThreadPoolToken> _flush_token; - // Records the current flush status of the tablet. // Note: Once its value is set to Failed, it cannot return to SUCCESS. std::shared_mutex _flush_status_lock; @@ -94,6 +98,9 @@ private: RowsetWriter* _rowset_writer = nullptr; MemTableStat _memtable_stat; + + std::atomic<bool> _shutdown = false; + ThreadPool* _thread_pool = nullptr; }; // MemTableFlushExecutor is responsible for flushing memtables to disk. @@ -119,7 +126,7 @@ public: void init(const std::vector<DataDir*>& data_dirs); Status create_flush_token(std::unique_ptr<FlushToken>& flush_token, RowsetWriter* rowset_writer, - bool should_serial, bool is_high_priority); + bool is_high_priority); private: void _register_metrics(); diff --git a/be/src/olap/memtable_writer.cpp b/be/src/olap/memtable_writer.cpp index c967cdbd483..9eb44af903f 100644 --- a/be/src/olap/memtable_writer.cpp +++ b/be/src/olap/memtable_writer.cpp @@ -76,9 +76,9 @@ Status MemTableWriter::init(std::shared_ptr<RowsetWriter> rowset_writer, // create flush handler // by assigning segment_id to memtable before submiting to flush executor, // we can make sure same keys sort in the same order in all replicas. - bool should_serial = false; - RETURN_IF_ERROR(StorageEngine::instance()->memtable_flush_executor()->create_flush_token( - _flush_token, _rowset_writer.get(), should_serial, _req.is_high_priority)); + RETURN_IF_ERROR( + ExecEnv::GetInstance()->storage_engine().memtable_flush_executor()->create_flush_token( + _flush_token, _rowset_writer.get(), _req.is_high_priority)); _is_init = true; return Status::OK(); --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org