This is an automated email from the ASF dual-hosted git repository.

yiguolei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new 6a5b590873 [refactor-WIP](TaskWorkerPool) add CreateTableTaskPool 
class for CREATE_TABLE task (#19734)
6a5b590873 is described below

commit 6a5b590873312b743acabd1e0f0df4fab66e4240
Author: bobhan1 <68336428+bobh...@users.noreply.github.com>
AuthorDate: Thu May 18 11:43:09 2023 +0800

    [refactor-WIP](TaskWorkerPool) add CreateTableTaskPool class for 
CREATE_TABLE task (#19734)
---
 be/src/agent/agent_server.cpp     |   4 +-
 be/src/agent/task_worker_pool.cpp | 201 +++++++++++++++++++-------------------
 be/src/agent/task_worker_pool.h   |  14 ++-
 3 files changed, 115 insertions(+), 104 deletions(-)

diff --git a/be/src/agent/agent_server.cpp b/be/src/agent/agent_server.cpp
index cb84e8b2e0..174baa0323 100644
--- a/be/src/agent/agent_server.cpp
+++ b/be/src/agent/agent_server.cpp
@@ -82,7 +82,9 @@ AgentServer::AgentServer(ExecEnv* exec_env, const 
TMasterInfo& master_info)
 #define CREATE_AND_START_THREAD(type, pool_name)
 #endif // BE_TEST
 
-    CREATE_AND_START_POOL(CREATE_TABLE, _create_tablet_workers);
+    _create_tablet_workers.reset(
+            new CreateTableTaskPool(exec_env, 
TaskWorkerPool::ThreadModel::MULTI_THREADS));
+    _create_tablet_workers->start();
     CREATE_AND_START_POOL(DROP_TABLE, _drop_tablet_workers);
 
     // Both PUSH and REALTIME_PUSH type use _push_workers
diff --git a/be/src/agent/task_worker_pool.cpp 
b/be/src/agent/task_worker_pool.cpp
index aaa4222faa..1544071ae7 100644
--- a/be/src/agent/task_worker_pool.cpp
+++ b/be/src/agent/task_worker_pool.cpp
@@ -135,99 +135,98 @@ void TaskWorkerPool::start() {
     if (_thread_model == ThreadModel::SINGLE_THREAD) {
         _worker_count = 1;
     }
-    std::function<void()> cb;
     switch (_task_worker_type) {
     case TaskWorkerType::CREATE_TABLE:
-        _worker_count = config::create_tablet_worker_count;
-        cb = 
std::bind<void>(&TaskWorkerPool::_create_tablet_worker_thread_callback, this);
         break;
     case TaskWorkerType::DROP_TABLE:
         _worker_count = config::drop_tablet_worker_count;
-        cb = 
std::bind<void>(&TaskWorkerPool::_drop_tablet_worker_thread_callback, this);
+        _cb = 
std::bind<void>(&TaskWorkerPool::_drop_tablet_worker_thread_callback, this);
         break;
     case TaskWorkerType::PUSH:
     case TaskWorkerType::REALTIME_PUSH:
         _worker_count =
                 config::push_worker_count_normal_priority + 
config::push_worker_count_high_priority;
-        cb = std::bind<void>(&TaskWorkerPool::_push_worker_thread_callback, 
this);
+        _cb = std::bind<void>(&TaskWorkerPool::_push_worker_thread_callback, 
this);
         break;
     case TaskWorkerType::PUBLISH_VERSION:
         _worker_count = config::publish_version_worker_count;
-        cb = 
std::bind<void>(&TaskWorkerPool::_publish_version_worker_thread_callback, this);
+        _cb = 
std::bind<void>(&TaskWorkerPool::_publish_version_worker_thread_callback, this);
         break;
     case TaskWorkerType::CLEAR_TRANSACTION_TASK:
         _worker_count = config::clear_transaction_task_worker_count;
-        cb = 
std::bind<void>(&TaskWorkerPool::_clear_transaction_task_worker_thread_callback,
 this);
+        _cb = 
std::bind<void>(&TaskWorkerPool::_clear_transaction_task_worker_thread_callback,
+                              this);
         break;
     case TaskWorkerType::DELETE:
         _worker_count = config::delete_worker_count;
-        cb = std::bind<void>(&TaskWorkerPool::_push_worker_thread_callback, 
this);
+        _cb = std::bind<void>(&TaskWorkerPool::_push_worker_thread_callback, 
this);
         break;
     case TaskWorkerType::ALTER_TABLE:
         _worker_count = config::alter_tablet_worker_count;
-        cb = 
std::bind<void>(&TaskWorkerPool::_alter_tablet_worker_thread_callback, this);
+        _cb = 
std::bind<void>(&TaskWorkerPool::_alter_tablet_worker_thread_callback, this);
         break;
     case TaskWorkerType::ALTER_INVERTED_INDEX:
         _worker_count = config::alter_inverted_index_worker_count;
-        cb = 
std::bind<void>(&TaskWorkerPool::_alter_inverted_index_worker_thread_callback, 
this);
+        _cb = 
std::bind<void>(&TaskWorkerPool::_alter_inverted_index_worker_thread_callback, 
this);
         break;
     case TaskWorkerType::CLONE:
         _worker_count = config::clone_worker_count;
-        cb = std::bind<void>(&TaskWorkerPool::_clone_worker_thread_callback, 
this);
+        _cb = std::bind<void>(&TaskWorkerPool::_clone_worker_thread_callback, 
this);
         break;
     case TaskWorkerType::STORAGE_MEDIUM_MIGRATE:
         _worker_count = config::storage_medium_migrate_count;
-        cb = 
std::bind<void>(&TaskWorkerPool::_storage_medium_migrate_worker_thread_callback,
 this);
+        _cb = 
std::bind<void>(&TaskWorkerPool::_storage_medium_migrate_worker_thread_callback,
+                              this);
         break;
     case TaskWorkerType::CHECK_CONSISTENCY:
         _worker_count = config::check_consistency_worker_count;
-        cb = 
std::bind<void>(&TaskWorkerPool::_check_consistency_worker_thread_callback, 
this);
+        _cb = 
std::bind<void>(&TaskWorkerPool::_check_consistency_worker_thread_callback, 
this);
         break;
     case TaskWorkerType::REPORT_TASK:
-        cb = 
std::bind<void>(&TaskWorkerPool::_report_task_worker_thread_callback, this);
+        _cb = 
std::bind<void>(&TaskWorkerPool::_report_task_worker_thread_callback, this);
         break;
     case TaskWorkerType::REPORT_DISK_STATE:
-        cb = 
std::bind<void>(&TaskWorkerPool::_report_disk_state_worker_thread_callback, 
this);
+        _cb = 
std::bind<void>(&TaskWorkerPool::_report_disk_state_worker_thread_callback, 
this);
         break;
     case TaskWorkerType::REPORT_OLAP_TABLE:
-        cb = 
std::bind<void>(&TaskWorkerPool::_report_tablet_worker_thread_callback, this);
+        _cb = 
std::bind<void>(&TaskWorkerPool::_report_tablet_worker_thread_callback, this);
         break;
     case TaskWorkerType::UPLOAD:
         _worker_count = config::upload_worker_count;
-        cb = std::bind<void>(&TaskWorkerPool::_upload_worker_thread_callback, 
this);
+        _cb = std::bind<void>(&TaskWorkerPool::_upload_worker_thread_callback, 
this);
         break;
     case TaskWorkerType::DOWNLOAD:
         _worker_count = config::download_worker_count;
-        cb = 
std::bind<void>(&TaskWorkerPool::_download_worker_thread_callback, this);
+        _cb = 
std::bind<void>(&TaskWorkerPool::_download_worker_thread_callback, this);
         break;
     case TaskWorkerType::MAKE_SNAPSHOT:
         _worker_count = config::make_snapshot_worker_count;
-        cb = std::bind<void>(&TaskWorkerPool::_make_snapshot_thread_callback, 
this);
+        _cb = std::bind<void>(&TaskWorkerPool::_make_snapshot_thread_callback, 
this);
         break;
     case TaskWorkerType::RELEASE_SNAPSHOT:
         _worker_count = config::release_snapshot_worker_count;
-        cb = 
std::bind<void>(&TaskWorkerPool::_release_snapshot_thread_callback, this);
+        _cb = 
std::bind<void>(&TaskWorkerPool::_release_snapshot_thread_callback, this);
         break;
     case TaskWorkerType::MOVE:
         _worker_count = 1;
-        cb = std::bind<void>(&TaskWorkerPool::_move_dir_thread_callback, this);
+        _cb = std::bind<void>(&TaskWorkerPool::_move_dir_thread_callback, 
this);
         break;
     case TaskWorkerType::UPDATE_TABLET_META_INFO:
         _worker_count = 1;
-        cb = 
std::bind<void>(&TaskWorkerPool::_update_tablet_meta_worker_thread_callback, 
this);
+        _cb = 
std::bind<void>(&TaskWorkerPool::_update_tablet_meta_worker_thread_callback, 
this);
         break;
     case TaskWorkerType::SUBMIT_TABLE_COMPACTION:
         _worker_count = 1;
-        cb = 
std::bind<void>(&TaskWorkerPool::_submit_table_compaction_worker_thread_callback,
-                             this);
+        _cb = 
std::bind<void>(&TaskWorkerPool::_submit_table_compaction_worker_thread_callback,
+                              this);
         break;
     case TaskWorkerType::PUSH_STORAGE_POLICY:
         _worker_count = 1;
-        cb = 
std::bind<void>(&TaskWorkerPool::_push_storage_policy_worker_thread_callback, 
this);
+        _cb = 
std::bind<void>(&TaskWorkerPool::_push_storage_policy_worker_thread_callback, 
this);
         break;
     case TaskWorkerType::PUSH_COOLDOWN_CONF:
         _worker_count = 1;
-        cb = 
std::bind<void>(&TaskWorkerPool::_push_cooldown_conf_worker_thread_callback, 
this);
+        _cb = 
std::bind<void>(&TaskWorkerPool::_push_cooldown_conf_worker_thread_callback, 
this);
         break;
     default:
         // pass
@@ -242,7 +241,7 @@ void TaskWorkerPool::start() {
             .build(&_thread_pool);
 
     for (int i = 0; i < _worker_count; i++) {
-        auto st = _thread_pool->submit_func(cb);
+        auto st = _thread_pool->submit_func(_cb);
         CHECK(st.ok()) << st;
     }
 #endif
@@ -368,80 +367,6 @@ uint32_t TaskWorkerPool::_get_next_task_index(int32_t 
thread_count,
     return index;
 }
 
-void TaskWorkerPool::_create_tablet_worker_thread_callback() {
-    while (_is_work) {
-        TAgentTaskRequest agent_task_req;
-        TCreateTabletReq create_tablet_req;
-        {
-            std::unique_lock<std::mutex> 
worker_thread_lock(_worker_thread_lock);
-            _worker_thread_condition_variable.wait(
-                    worker_thread_lock, [this]() { return !_is_work || 
!_tasks.empty(); });
-            if (!_is_work) {
-                return;
-            }
-
-            agent_task_req = _tasks.front();
-            create_tablet_req = agent_task_req.create_tablet_req;
-            _tasks.pop_front();
-        }
-
-        scoped_refptr<Trace> trace(new Trace);
-        MonotonicStopWatch watch;
-        watch.start();
-        SCOPED_CLEANUP({
-            if (watch.elapsed_time() / 1e9 > 
config::agent_task_trace_threshold_sec) {
-                LOG(WARNING) << "Trace:" << std::endl << 
trace->DumpToString(Trace::INCLUDE_ALL);
-            }
-        });
-        ADOPT_TRACE(trace.get());
-
-        DorisMetrics::instance()->create_tablet_requests_total->increment(1);
-        TRACE("start to create tablet $0", create_tablet_req.tablet_id);
-
-        std::vector<TTabletInfo> finish_tablet_infos;
-        VLOG_NOTICE << "create tablet: " << create_tablet_req;
-        Status status = 
_env->storage_engine()->create_tablet(create_tablet_req);
-        if (!status.ok()) {
-            
DorisMetrics::instance()->create_tablet_requests_failed->increment(1);
-            LOG_WARNING("failed to create tablet")
-                    .tag("signature", agent_task_req.signature)
-                    .tag("tablet_id", create_tablet_req.tablet_id)
-                    .error(status);
-        } else {
-            ++_s_report_version;
-            // get path hash of the created tablet
-            TabletSharedPtr tablet = 
StorageEngine::instance()->tablet_manager()->get_tablet(
-                    create_tablet_req.tablet_id);
-            DCHECK(tablet != nullptr);
-            TTabletInfo tablet_info;
-            tablet_info.tablet_id = tablet->table_id();
-            tablet_info.schema_hash = tablet->schema_hash();
-            tablet_info.version = create_tablet_req.version;
-            // Useless but it is a required field in TTabletInfo
-            tablet_info.version_hash = 0;
-            tablet_info.row_count = 0;
-            tablet_info.data_size = 0;
-            tablet_info.__set_path_hash(tablet->data_dir()->path_hash());
-            tablet_info.__set_replica_id(tablet->replica_id());
-            finish_tablet_infos.push_back(tablet_info);
-            LOG_INFO("successfully create tablet")
-                    .tag("signature", agent_task_req.signature)
-                    .tag("tablet_id", create_tablet_req.tablet_id);
-        }
-
-        TFinishTaskRequest finish_task_request;
-        finish_task_request.__set_finish_tablet_infos(finish_tablet_infos);
-        finish_task_request.__set_backend(BackendOptions::get_local_backend());
-        finish_task_request.__set_report_version(_s_report_version);
-        finish_task_request.__set_task_type(agent_task_req.task_type);
-        finish_task_request.__set_signature(agent_task_req.signature);
-        finish_task_request.__set_task_status(status.to_thrift());
-
-        _finish_task(finish_task_request);
-        _remove_task_info(agent_task_req.task_type, agent_task_req.signature);
-    }
-}
-
 void TaskWorkerPool::_drop_tablet_worker_thread_callback() {
     while (_is_work) {
         TAgentTaskRequest agent_task_req;
@@ -1880,4 +1805,78 @@ void 
TaskWorkerPool::_push_cooldown_conf_worker_thread_callback() {
     }
 }
 
+CreateTableTaskPool::CreateTableTaskPool(ExecEnv* env, ThreadModel 
thread_model)
+        : TaskWorkerPool(TaskWorkerType::CREATE_TABLE, env, 
*env->master_info(), thread_model) {
+    _worker_count = config::create_tablet_worker_count;
+    _cb = [this]() { _create_tablet_worker_thread_callback(); };
+}
+
+void CreateTableTaskPool::_create_tablet_worker_thread_callback() {
+    while (_is_work) {
+        TAgentTaskRequest agent_task_req;
+        TCreateTabletReq create_tablet_req;
+        {
+            std::unique_lock<std::mutex> 
worker_thread_lock(_worker_thread_lock);
+            _worker_thread_condition_variable.wait(
+                    worker_thread_lock, [this]() { return !_is_work || 
!_tasks.empty(); });
+            if (!_is_work) {
+                return;
+            }
+            agent_task_req = _tasks.front();
+            create_tablet_req = agent_task_req.create_tablet_req;
+            _tasks.pop_front();
+        }
+        scoped_refptr<Trace> trace(new Trace);
+        MonotonicStopWatch watch;
+        watch.start();
+        SCOPED_CLEANUP({
+            if (watch.elapsed_time() / 1e9 > 
config::agent_task_trace_threshold_sec) {
+                LOG(WARNING) << "Trace:" << std::endl << 
trace->DumpToString(Trace::INCLUDE_ALL);
+            }
+        });
+        ADOPT_TRACE(trace.get());
+        DorisMetrics::instance()->create_tablet_requests_total->increment(1);
+        TRACE("start to create tablet $0", create_tablet_req.tablet_id);
+        std::vector<TTabletInfo> finish_tablet_infos;
+        VLOG_NOTICE << "create tablet: " << create_tablet_req;
+        Status status = 
_env->storage_engine()->create_tablet(create_tablet_req);
+        if (!status.ok()) {
+            
DorisMetrics::instance()->create_tablet_requests_failed->increment(1);
+            LOG_WARNING("failed to create tablet")
+                    .tag("signature", agent_task_req.signature)
+                    .tag("tablet_id", create_tablet_req.tablet_id)
+                    .error(status);
+        } else {
+            ++_s_report_version;
+            // get path hash of the created tablet
+            TabletSharedPtr tablet = 
StorageEngine::instance()->tablet_manager()->get_tablet(
+                    create_tablet_req.tablet_id);
+            DCHECK(tablet != nullptr);
+            TTabletInfo tablet_info;
+            tablet_info.tablet_id = tablet->table_id();
+            tablet_info.schema_hash = tablet->schema_hash();
+            tablet_info.version = create_tablet_req.version;
+            // Useless but it is a required field in TTabletInfo
+            tablet_info.version_hash = 0;
+            tablet_info.row_count = 0;
+            tablet_info.data_size = 0;
+            tablet_info.__set_path_hash(tablet->data_dir()->path_hash());
+            tablet_info.__set_replica_id(tablet->replica_id());
+            finish_tablet_infos.push_back(tablet_info);
+            LOG_INFO("successfully create tablet")
+                    .tag("signature", agent_task_req.signature)
+                    .tag("tablet_id", create_tablet_req.tablet_id);
+        }
+        TFinishTaskRequest finish_task_request;
+        finish_task_request.__set_finish_tablet_infos(finish_tablet_infos);
+        finish_task_request.__set_backend(BackendOptions::get_local_backend());
+        finish_task_request.__set_report_version(_s_report_version);
+        finish_task_request.__set_task_type(agent_task_req.task_type);
+        finish_task_request.__set_signature(agent_task_req.signature);
+        finish_task_request.__set_task_status(status.to_thrift());
+        _finish_task(finish_task_request);
+        _remove_task_info(agent_task_req.task_type, agent_task_req.signature);
+    }
+}
+
 } // namespace doris
diff --git a/be/src/agent/task_worker_pool.h b/be/src/agent/task_worker_pool.h
index 2f90f7b685..651a0a6873 100644
--- a/be/src/agent/task_worker_pool.h
+++ b/be/src/agent/task_worker_pool.h
@@ -178,7 +178,7 @@ public:
     // notify the worker. currently for task/disk/tablet report thread
     void notify_thread();
 
-private:
+protected:
     bool _register_task_info(const TTaskType::type task_type, int64_t 
signature);
     void _remove_task_info(const TTaskType::type task_type, int64_t signature);
     void _finish_task(const TFinishTaskRequest& finish_task_request);
@@ -228,7 +228,7 @@ private:
     // random sleep 1~second seconds
     void _random_sleep(int second);
 
-private:
+protected:
     std::string _name;
 
     // Reference to the ExecEnv::_master_info
@@ -255,6 +255,7 @@ private:
     // Always 1 when _thread_model is SINGLE_THREAD
     uint32_t _worker_count;
     TaskWorkerType _task_worker_type;
+    std::function<void()> _cb;
 
     static std::atomic_ulong _s_report_version;
 
@@ -263,4 +264,13 @@ private:
 
     DISALLOW_COPY_AND_ASSIGN(TaskWorkerPool);
 }; // class TaskWorkerPool
+
+class CreateTableTaskPool : public TaskWorkerPool {
+public:
+    CreateTableTaskPool(ExecEnv* env, ThreadModel thread_model);
+    void _create_tablet_worker_thread_callback();
+
+    DISALLOW_COPY_AND_ASSIGN(CreateTableTaskPool);
+};
+
 } // namespace doris


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to