This is an automated email from the ASF dual-hosted git repository.

yiguolei pushed a commit to branch branch-2.1
in repository https://gitbox.apache.org/repos/asf/doris.git

commit 08897b7e03135d2e390f3fbba40d531e387e91ae
Author: wangbo <wan...@apache.org>
AuthorDate: Tue Jan 30 10:12:31 2024 +0800

    [Improvement](executor)Remove ThreadPoolToken from MemTableFlushExecutor #30529
---
 be/src/olap/memtable_flush_executor.cpp | 63 +++++++++++++++++++++------------
 be/src/olap/memtable_flush_executor.h   | 21 +++++++----
 be/src/olap/memtable_writer.cpp         |  6 ++--
 3 files changed, 58 insertions(+), 32 deletions(-)

diff --git a/be/src/olap/memtable_flush_executor.cpp b/be/src/olap/memtable_flush_executor.cpp
index 7190597b54c..60ef194aae5 100644
--- a/be/src/olap/memtable_flush_executor.cpp
+++ b/be/src/olap/memtable_flush_executor.cpp
@@ -56,8 +56,7 @@ public:
     ~MemtableFlushTask() override { g_flush_task_num << -1; }
 
     void run() override {
-        _flush_token->_flush_memtable(_memtable.get(), _segment_id, _submit_task_time);
-        _memtable.reset();
+        _flush_token->_flush_memtable(std::move(_memtable), _segment_id, _submit_task_time);
     }
 
 private:
@@ -94,16 +93,35 @@ Status FlushToken::submit(std::unique_ptr<MemTable> mem_table) {
     int64_t submit_task_time = MonotonicNanos();
     auto task = std::make_shared<MemtableFlushTask>(
             this, std::move(mem_table), _rowset_writer->allocate_segment_id(), submit_task_time);
-    _stats.flush_running_count++;
-    return _flush_token->submit(std::move(task));
+    Status ret = _thread_pool->submit(std::move(task));
+    if (ret.ok()) {
+        _stats.flush_running_count++;
+    }
+    return ret;
+}
+
+// NOTE: FlushToken's submit/cancel/wait run in one thread,
+// so we don't need to make them mutually exclusive, std::atomic is enough.
+void FlushToken::_wait_running_task_finish() {
+    while (true) {
+        int64_t flush_running_count = _stats.flush_running_count.load();
+        if (flush_running_count < 0) {
+            LOG(ERROR) << "flush_running_count < 0, this is not expected!";
+        }
+        if (flush_running_count == 0) {
+            break;
+        }
+        std::this_thread::sleep_for(std::chrono::milliseconds(50));
+    }
 }
 
 void FlushToken::cancel() {
-    _flush_token->shutdown();
+    _shutdown_flush_token();
+    _wait_running_task_finish();
 }
 
 Status FlushToken::wait() {
-    _flush_token->wait();
+    _wait_running_task_finish();
     {
         std::shared_lock rdlk(_flush_status_lock);
         if (!_flush_status.ok()) {
@@ -134,8 +152,12 @@ Status FlushToken::_do_flush_memtable(MemTable* memtable, int32_t segment_id, in
     return Status::OK();
 }
 
-void FlushToken::_flush_memtable(MemTable* mem_table, int32_t segment_id,
+void FlushToken::_flush_memtable(std::unique_ptr<MemTable> memtable_ptr, int32_t segment_id,
                                  int64_t submit_task_time) {
+    Defer defer {[&]() { _stats.flush_running_count--; }};
+    if (_is_shutdown()) {
+        return;
+    }
     uint64_t flush_wait_time_ns = MonotonicNanos() - submit_task_time;
     _stats.flush_wait_time_ns += flush_wait_time_ns;
     // If previous flush has failed, return directly
@@ -148,10 +170,10 @@ void FlushToken::_flush_memtable(MemTable* mem_table, int32_t segment_id,
 
     MonotonicStopWatch timer;
     timer.start();
-    size_t memory_usage = mem_table->memory_usage();
+    size_t memory_usage = memtable_ptr->memory_usage();
 
     int64_t flush_size;
-    Status s = _do_flush_memtable(mem_table, segment_id, &flush_size);
+    Status s = _do_flush_memtable(memtable_ptr.get(), segment_id, &flush_size);
 
     {
         std::shared_lock rdlk(_flush_status_lock);
@@ -174,8 +196,7 @@ void FlushToken::_flush_memtable(MemTable* mem_table, int32_t segment_id,
                   << ", mem size: " << memory_usage << ", disk size: " << flush_size;
     _stats.flush_time_ns += timer.elapsed_time();
     _stats.flush_finish_count++;
-    _stats.flush_running_count--;
-    _stats.flush_size_bytes += mem_table->memory_usage();
+    _stats.flush_size_bytes += memtable_ptr->memory_usage();
     _stats.flush_disk_size_bytes += flush_size;
 }
 
@@ -199,27 +220,25 @@ void MemTableFlushExecutor::init(const std::vector<DataDir*>& data_dirs) {
 
 // NOTE: we use SERIAL mode here to ensure all mem-tables from one tablet are flushed in order.
 Status MemTableFlushExecutor::create_flush_token(std::unique_ptr<FlushToken>& flush_token,
-                                                 RowsetWriter* rowset_writer, bool should_serial,
+                                                 RowsetWriter* rowset_writer,
                                                  bool is_high_priority) {
     if (!is_high_priority) {
-        if (rowset_writer->type() == BETA_ROWSET && !should_serial) {
+        if (rowset_writer->type() == BETA_ROWSET) {
             // beta rowset can be flush in CONCURRENT, because each memtable using a new segment writer.
-            flush_token = std::make_unique<FlushToken>(
-                    _flush_pool->new_token(ThreadPool::ExecutionMode::CONCURRENT));
+            flush_token = std::make_unique<FlushToken>(_flush_pool.get());
         } else {
             // alpha rowset do not support flush in CONCURRENT.
-            flush_token = std::make_unique<FlushToken>(
-                    _flush_pool->new_token(ThreadPool::ExecutionMode::SERIAL));
+            // and not support alpha rowset now.
+            return Status::InternalError<false>("not support alpha rowset load now.");
         }
     } else {
-        if (rowset_writer->type() == BETA_ROWSET && !should_serial) {
+        if (rowset_writer->type() == BETA_ROWSET) {
             // beta rowset can be flush in CONCURRENT, because each memtable using a new segment writer.
-            flush_token = std::make_unique<FlushToken>(
-                    _high_prio_flush_pool->new_token(ThreadPool::ExecutionMode::CONCURRENT));
+            flush_token = std::make_unique<FlushToken>(_high_prio_flush_pool.get());
         } else {
             // alpha rowset do not support flush in CONCURRENT.
-            flush_token = std::make_unique<FlushToken>(
-                    _high_prio_flush_pool->new_token(ThreadPool::ExecutionMode::SERIAL));
+            // and not support alpha rowset now.
+            return Status::InternalError<false>("not support alpha rowset load now.");
         }
     }
     flush_token->set_rowset_writer(rowset_writer);
diff --git a/be/src/olap/memtable_flush_executor.h b/be/src/olap/memtable_flush_executor.h
index a025bf43cf8..80983baa66f 100644
--- a/be/src/olap/memtable_flush_executor.h
+++ b/be/src/olap/memtable_flush_executor.h
@@ -38,7 +38,7 @@ class RowsetWriter;
 // use atomic because it may be updated by multi threads
 struct FlushStatistic {
     std::atomic_uint64_t flush_time_ns = 0;
-    std::atomic_uint64_t flush_running_count = 0;
+    std::atomic_int64_t flush_running_count = 0;
     std::atomic_uint64_t flush_finish_count = 0;
     std::atomic_uint64_t flush_size_bytes = 0;
     std::atomic_uint64_t flush_disk_size_bytes = 0;
@@ -56,8 +56,8 @@ std::ostream& operator<<(std::ostream& os, const FlushStatistic& stat);
 //    because the entire job will definitely fail;
 class FlushToken {
 public:
-    explicit FlushToken(std::unique_ptr<ThreadPoolToken> flush_pool_token)
-            : _flush_token(std::move(flush_pool_token)), _flush_status(Status::OK()) {}
+    explicit FlushToken(ThreadPool* thread_pool)
+            : _flush_status(Status::OK()), _thread_pool(thread_pool) {}
 
     Status submit(std::unique_ptr<MemTable> mem_table);
 
@@ -75,15 +75,19 @@ public:
 
     const MemTableStat& memtable_stat() { return _memtable_stat; }
 
+private:
+    void _shutdown_flush_token() { _shutdown.store(true); }
+    bool _is_shutdown() { return _shutdown.load(); }
+    void _wait_running_task_finish();
+
 private:
     friend class MemtableFlushTask;
 
-    void _flush_memtable(MemTable* mem_table, int32_t segment_id, int64_t submit_task_time);
+    void _flush_memtable(std::unique_ptr<MemTable> memtable_ptr, int32_t segment_id,
+                         int64_t submit_task_time);
 
     Status _do_flush_memtable(MemTable* memtable, int32_t segment_id, int64_t* flush_size);
 
-    std::unique_ptr<ThreadPoolToken> _flush_token;
-
     // Records the current flush status of the tablet.
     // Note: Once its value is set to Failed, it cannot return to SUCCESS.
     std::shared_mutex _flush_status_lock;
@@ -94,6 +98,9 @@ private:
     RowsetWriter* _rowset_writer = nullptr;
 
     MemTableStat _memtable_stat;
+
+    std::atomic<bool> _shutdown = false;
+    ThreadPool* _thread_pool = nullptr;
 };
 
 // MemTableFlushExecutor is responsible for flushing memtables to disk.
@@ -119,7 +126,7 @@ public:
     void init(const std::vector<DataDir*>& data_dirs);
 
     Status create_flush_token(std::unique_ptr<FlushToken>& flush_token, RowsetWriter* rowset_writer,
-                              bool should_serial, bool is_high_priority);
+                              bool is_high_priority);
 
 private:
     void _register_metrics();
diff --git a/be/src/olap/memtable_writer.cpp b/be/src/olap/memtable_writer.cpp
index c967cdbd483..9eb44af903f 100644
--- a/be/src/olap/memtable_writer.cpp
+++ b/be/src/olap/memtable_writer.cpp
@@ -76,9 +76,9 @@ Status MemTableWriter::init(std::shared_ptr<RowsetWriter> rowset_writer,
     // create flush handler
     // by assigning segment_id to memtable before submiting to flush executor,
     // we can make sure same keys sort in the same order in all replicas.
-    bool should_serial = false;
-    RETURN_IF_ERROR(StorageEngine::instance()->memtable_flush_executor()->create_flush_token(
-            _flush_token, _rowset_writer.get(), should_serial, _req.is_high_priority));
+    RETURN_IF_ERROR(
+            ExecEnv::GetInstance()->storage_engine().memtable_flush_executor()->create_flush_token(
+                    _flush_token, _rowset_writer.get(), _req.is_high_priority));
 
     _is_init = true;
     return Status::OK();


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to