This is an automated email from the ASF dual-hosted git repository. yiguolei pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push: new 6a5b590873 [refactor-WIP](TaskWorkerPool) add CreateTableTaskPool class for CREATE_TABLE task (#19734) 6a5b590873 is described below commit 6a5b590873312b743acabd1e0f0df4fab66e4240 Author: bobhan1 <68336428+bobh...@users.noreply.github.com> AuthorDate: Thu May 18 11:43:09 2023 +0800 [refactor-WIP](TaskWorkerPool) add CreateTableTaskPool class for CREATE_TABLE task (#19734) --- be/src/agent/agent_server.cpp | 4 +- be/src/agent/task_worker_pool.cpp | 201 +++++++++++++++++++------------------- be/src/agent/task_worker_pool.h | 14 ++- 3 files changed, 115 insertions(+), 104 deletions(-) diff --git a/be/src/agent/agent_server.cpp b/be/src/agent/agent_server.cpp index cb84e8b2e0..174baa0323 100644 --- a/be/src/agent/agent_server.cpp +++ b/be/src/agent/agent_server.cpp @@ -82,7 +82,9 @@ AgentServer::AgentServer(ExecEnv* exec_env, const TMasterInfo& master_info) #define CREATE_AND_START_THREAD(type, pool_name) #endif // BE_TEST - CREATE_AND_START_POOL(CREATE_TABLE, _create_tablet_workers); + _create_tablet_workers.reset( + new CreateTableTaskPool(exec_env, TaskWorkerPool::ThreadModel::MULTI_THREADS)); + _create_tablet_workers->start(); CREATE_AND_START_POOL(DROP_TABLE, _drop_tablet_workers); // Both PUSH and REALTIME_PUSH type use _push_workers diff --git a/be/src/agent/task_worker_pool.cpp b/be/src/agent/task_worker_pool.cpp index aaa4222faa..1544071ae7 100644 --- a/be/src/agent/task_worker_pool.cpp +++ b/be/src/agent/task_worker_pool.cpp @@ -135,99 +135,98 @@ void TaskWorkerPool::start() { if (_thread_model == ThreadModel::SINGLE_THREAD) { _worker_count = 1; } - std::function<void()> cb; switch (_task_worker_type) { case TaskWorkerType::CREATE_TABLE: - _worker_count = config::create_tablet_worker_count; - cb = std::bind<void>(&TaskWorkerPool::_create_tablet_worker_thread_callback, this); break; case TaskWorkerType::DROP_TABLE: _worker_count = config::drop_tablet_worker_count; - cb = 
std::bind<void>(&TaskWorkerPool::_drop_tablet_worker_thread_callback, this); + _cb = std::bind<void>(&TaskWorkerPool::_drop_tablet_worker_thread_callback, this); break; case TaskWorkerType::PUSH: case TaskWorkerType::REALTIME_PUSH: _worker_count = config::push_worker_count_normal_priority + config::push_worker_count_high_priority; - cb = std::bind<void>(&TaskWorkerPool::_push_worker_thread_callback, this); + _cb = std::bind<void>(&TaskWorkerPool::_push_worker_thread_callback, this); break; case TaskWorkerType::PUBLISH_VERSION: _worker_count = config::publish_version_worker_count; - cb = std::bind<void>(&TaskWorkerPool::_publish_version_worker_thread_callback, this); + _cb = std::bind<void>(&TaskWorkerPool::_publish_version_worker_thread_callback, this); break; case TaskWorkerType::CLEAR_TRANSACTION_TASK: _worker_count = config::clear_transaction_task_worker_count; - cb = std::bind<void>(&TaskWorkerPool::_clear_transaction_task_worker_thread_callback, this); + _cb = std::bind<void>(&TaskWorkerPool::_clear_transaction_task_worker_thread_callback, + this); break; case TaskWorkerType::DELETE: _worker_count = config::delete_worker_count; - cb = std::bind<void>(&TaskWorkerPool::_push_worker_thread_callback, this); + _cb = std::bind<void>(&TaskWorkerPool::_push_worker_thread_callback, this); break; case TaskWorkerType::ALTER_TABLE: _worker_count = config::alter_tablet_worker_count; - cb = std::bind<void>(&TaskWorkerPool::_alter_tablet_worker_thread_callback, this); + _cb = std::bind<void>(&TaskWorkerPool::_alter_tablet_worker_thread_callback, this); break; case TaskWorkerType::ALTER_INVERTED_INDEX: _worker_count = config::alter_inverted_index_worker_count; - cb = std::bind<void>(&TaskWorkerPool::_alter_inverted_index_worker_thread_callback, this); + _cb = std::bind<void>(&TaskWorkerPool::_alter_inverted_index_worker_thread_callback, this); break; case TaskWorkerType::CLONE: _worker_count = config::clone_worker_count; - cb = 
std::bind<void>(&TaskWorkerPool::_clone_worker_thread_callback, this); + _cb = std::bind<void>(&TaskWorkerPool::_clone_worker_thread_callback, this); break; case TaskWorkerType::STORAGE_MEDIUM_MIGRATE: _worker_count = config::storage_medium_migrate_count; - cb = std::bind<void>(&TaskWorkerPool::_storage_medium_migrate_worker_thread_callback, this); + _cb = std::bind<void>(&TaskWorkerPool::_storage_medium_migrate_worker_thread_callback, + this); break; case TaskWorkerType::CHECK_CONSISTENCY: _worker_count = config::check_consistency_worker_count; - cb = std::bind<void>(&TaskWorkerPool::_check_consistency_worker_thread_callback, this); + _cb = std::bind<void>(&TaskWorkerPool::_check_consistency_worker_thread_callback, this); break; case TaskWorkerType::REPORT_TASK: - cb = std::bind<void>(&TaskWorkerPool::_report_task_worker_thread_callback, this); + _cb = std::bind<void>(&TaskWorkerPool::_report_task_worker_thread_callback, this); break; case TaskWorkerType::REPORT_DISK_STATE: - cb = std::bind<void>(&TaskWorkerPool::_report_disk_state_worker_thread_callback, this); + _cb = std::bind<void>(&TaskWorkerPool::_report_disk_state_worker_thread_callback, this); break; case TaskWorkerType::REPORT_OLAP_TABLE: - cb = std::bind<void>(&TaskWorkerPool::_report_tablet_worker_thread_callback, this); + _cb = std::bind<void>(&TaskWorkerPool::_report_tablet_worker_thread_callback, this); break; case TaskWorkerType::UPLOAD: _worker_count = config::upload_worker_count; - cb = std::bind<void>(&TaskWorkerPool::_upload_worker_thread_callback, this); + _cb = std::bind<void>(&TaskWorkerPool::_upload_worker_thread_callback, this); break; case TaskWorkerType::DOWNLOAD: _worker_count = config::download_worker_count; - cb = std::bind<void>(&TaskWorkerPool::_download_worker_thread_callback, this); + _cb = std::bind<void>(&TaskWorkerPool::_download_worker_thread_callback, this); break; case TaskWorkerType::MAKE_SNAPSHOT: _worker_count = config::make_snapshot_worker_count; - cb = 
std::bind<void>(&TaskWorkerPool::_make_snapshot_thread_callback, this); + _cb = std::bind<void>(&TaskWorkerPool::_make_snapshot_thread_callback, this); break; case TaskWorkerType::RELEASE_SNAPSHOT: _worker_count = config::release_snapshot_worker_count; - cb = std::bind<void>(&TaskWorkerPool::_release_snapshot_thread_callback, this); + _cb = std::bind<void>(&TaskWorkerPool::_release_snapshot_thread_callback, this); break; case TaskWorkerType::MOVE: _worker_count = 1; - cb = std::bind<void>(&TaskWorkerPool::_move_dir_thread_callback, this); + _cb = std::bind<void>(&TaskWorkerPool::_move_dir_thread_callback, this); break; case TaskWorkerType::UPDATE_TABLET_META_INFO: _worker_count = 1; - cb = std::bind<void>(&TaskWorkerPool::_update_tablet_meta_worker_thread_callback, this); + _cb = std::bind<void>(&TaskWorkerPool::_update_tablet_meta_worker_thread_callback, this); break; case TaskWorkerType::SUBMIT_TABLE_COMPACTION: _worker_count = 1; - cb = std::bind<void>(&TaskWorkerPool::_submit_table_compaction_worker_thread_callback, - this); + _cb = std::bind<void>(&TaskWorkerPool::_submit_table_compaction_worker_thread_callback, + this); break; case TaskWorkerType::PUSH_STORAGE_POLICY: _worker_count = 1; - cb = std::bind<void>(&TaskWorkerPool::_push_storage_policy_worker_thread_callback, this); + _cb = std::bind<void>(&TaskWorkerPool::_push_storage_policy_worker_thread_callback, this); break; case TaskWorkerType::PUSH_COOLDOWN_CONF: _worker_count = 1; - cb = std::bind<void>(&TaskWorkerPool::_push_cooldown_conf_worker_thread_callback, this); + _cb = std::bind<void>(&TaskWorkerPool::_push_cooldown_conf_worker_thread_callback, this); break; default: // pass @@ -242,7 +241,7 @@ void TaskWorkerPool::start() { .build(&_thread_pool); for (int i = 0; i < _worker_count; i++) { - auto st = _thread_pool->submit_func(cb); + auto st = _thread_pool->submit_func(_cb); CHECK(st.ok()) << st; } #endif @@ -368,80 +367,6 @@ uint32_t TaskWorkerPool::_get_next_task_index(int32_t thread_count, 
return index; } -void TaskWorkerPool::_create_tablet_worker_thread_callback() { - while (_is_work) { - TAgentTaskRequest agent_task_req; - TCreateTabletReq create_tablet_req; - { - std::unique_lock<std::mutex> worker_thread_lock(_worker_thread_lock); - _worker_thread_condition_variable.wait( - worker_thread_lock, [this]() { return !_is_work || !_tasks.empty(); }); - if (!_is_work) { - return; - } - - agent_task_req = _tasks.front(); - create_tablet_req = agent_task_req.create_tablet_req; - _tasks.pop_front(); - } - - scoped_refptr<Trace> trace(new Trace); - MonotonicStopWatch watch; - watch.start(); - SCOPED_CLEANUP({ - if (watch.elapsed_time() / 1e9 > config::agent_task_trace_threshold_sec) { - LOG(WARNING) << "Trace:" << std::endl << trace->DumpToString(Trace::INCLUDE_ALL); - } - }); - ADOPT_TRACE(trace.get()); - - DorisMetrics::instance()->create_tablet_requests_total->increment(1); - TRACE("start to create tablet $0", create_tablet_req.tablet_id); - - std::vector<TTabletInfo> finish_tablet_infos; - VLOG_NOTICE << "create tablet: " << create_tablet_req; - Status status = _env->storage_engine()->create_tablet(create_tablet_req); - if (!status.ok()) { - DorisMetrics::instance()->create_tablet_requests_failed->increment(1); - LOG_WARNING("failed to create tablet") - .tag("signature", agent_task_req.signature) - .tag("tablet_id", create_tablet_req.tablet_id) - .error(status); - } else { - ++_s_report_version; - // get path hash of the created tablet - TabletSharedPtr tablet = StorageEngine::instance()->tablet_manager()->get_tablet( - create_tablet_req.tablet_id); - DCHECK(tablet != nullptr); - TTabletInfo tablet_info; - tablet_info.tablet_id = tablet->table_id(); - tablet_info.schema_hash = tablet->schema_hash(); - tablet_info.version = create_tablet_req.version; - // Useless but it is a required field in TTabletInfo - tablet_info.version_hash = 0; - tablet_info.row_count = 0; - tablet_info.data_size = 0; - 
tablet_info.__set_path_hash(tablet->data_dir()->path_hash()); - tablet_info.__set_replica_id(tablet->replica_id()); - finish_tablet_infos.push_back(tablet_info); - LOG_INFO("successfully create tablet") - .tag("signature", agent_task_req.signature) - .tag("tablet_id", create_tablet_req.tablet_id); - } - - TFinishTaskRequest finish_task_request; - finish_task_request.__set_finish_tablet_infos(finish_tablet_infos); - finish_task_request.__set_backend(BackendOptions::get_local_backend()); - finish_task_request.__set_report_version(_s_report_version); - finish_task_request.__set_task_type(agent_task_req.task_type); - finish_task_request.__set_signature(agent_task_req.signature); - finish_task_request.__set_task_status(status.to_thrift()); - - _finish_task(finish_task_request); - _remove_task_info(agent_task_req.task_type, agent_task_req.signature); - } -} - void TaskWorkerPool::_drop_tablet_worker_thread_callback() { while (_is_work) { TAgentTaskRequest agent_task_req; @@ -1880,4 +1805,78 @@ void TaskWorkerPool::_push_cooldown_conf_worker_thread_callback() { } } +CreateTableTaskPool::CreateTableTaskPool(ExecEnv* env, ThreadModel thread_model) + : TaskWorkerPool(TaskWorkerType::CREATE_TABLE, env, *env->master_info(), thread_model) { + _worker_count = config::create_tablet_worker_count; + _cb = [this]() { _create_tablet_worker_thread_callback(); }; +} + +void CreateTableTaskPool::_create_tablet_worker_thread_callback() { + while (_is_work) { + TAgentTaskRequest agent_task_req; + TCreateTabletReq create_tablet_req; + { + std::unique_lock<std::mutex> worker_thread_lock(_worker_thread_lock); + _worker_thread_condition_variable.wait( + worker_thread_lock, [this]() { return !_is_work || !_tasks.empty(); }); + if (!_is_work) { + return; + } + agent_task_req = _tasks.front(); + create_tablet_req = agent_task_req.create_tablet_req; + _tasks.pop_front(); + } + scoped_refptr<Trace> trace(new Trace); + MonotonicStopWatch watch; + watch.start(); + SCOPED_CLEANUP({ + if 
(watch.elapsed_time() / 1e9 > config::agent_task_trace_threshold_sec) { + LOG(WARNING) << "Trace:" << std::endl << trace->DumpToString(Trace::INCLUDE_ALL); + } + }); + ADOPT_TRACE(trace.get()); + DorisMetrics::instance()->create_tablet_requests_total->increment(1); + TRACE("start to create tablet $0", create_tablet_req.tablet_id); + std::vector<TTabletInfo> finish_tablet_infos; + VLOG_NOTICE << "create tablet: " << create_tablet_req; + Status status = _env->storage_engine()->create_tablet(create_tablet_req); + if (!status.ok()) { + DorisMetrics::instance()->create_tablet_requests_failed->increment(1); + LOG_WARNING("failed to create tablet") + .tag("signature", agent_task_req.signature) + .tag("tablet_id", create_tablet_req.tablet_id) + .error(status); + } else { + ++_s_report_version; + // get path hash of the created tablet + TabletSharedPtr tablet = StorageEngine::instance()->tablet_manager()->get_tablet( + create_tablet_req.tablet_id); + DCHECK(tablet != nullptr); + TTabletInfo tablet_info; + tablet_info.tablet_id = tablet->table_id(); + tablet_info.schema_hash = tablet->schema_hash(); + tablet_info.version = create_tablet_req.version; + // Useless but it is a required field in TTabletInfo + tablet_info.version_hash = 0; + tablet_info.row_count = 0; + tablet_info.data_size = 0; + tablet_info.__set_path_hash(tablet->data_dir()->path_hash()); + tablet_info.__set_replica_id(tablet->replica_id()); + finish_tablet_infos.push_back(tablet_info); + LOG_INFO("successfully create tablet") + .tag("signature", agent_task_req.signature) + .tag("tablet_id", create_tablet_req.tablet_id); + } + TFinishTaskRequest finish_task_request; + finish_task_request.__set_finish_tablet_infos(finish_tablet_infos); + finish_task_request.__set_backend(BackendOptions::get_local_backend()); + finish_task_request.__set_report_version(_s_report_version); + finish_task_request.__set_task_type(agent_task_req.task_type); + finish_task_request.__set_signature(agent_task_req.signature); + 
finish_task_request.__set_task_status(status.to_thrift()); + _finish_task(finish_task_request); + _remove_task_info(agent_task_req.task_type, agent_task_req.signature); + } +} + } // namespace doris diff --git a/be/src/agent/task_worker_pool.h b/be/src/agent/task_worker_pool.h index 2f90f7b685..651a0a6873 100644 --- a/be/src/agent/task_worker_pool.h +++ b/be/src/agent/task_worker_pool.h @@ -178,7 +178,7 @@ public: // notify the worker. currently for task/disk/tablet report thread void notify_thread(); -private: +protected: bool _register_task_info(const TTaskType::type task_type, int64_t signature); void _remove_task_info(const TTaskType::type task_type, int64_t signature); void _finish_task(const TFinishTaskRequest& finish_task_request); @@ -228,7 +228,7 @@ private: // random sleep 1~second seconds void _random_sleep(int second); -private: +protected: std::string _name; // Reference to the ExecEnv::_master_info @@ -255,6 +255,7 @@ private: // Always 1 when _thread_model is SINGLE_THREAD uint32_t _worker_count; TaskWorkerType _task_worker_type; + std::function<void()> _cb; static std::atomic_ulong _s_report_version; @@ -263,4 +264,13 @@ private: DISALLOW_COPY_AND_ASSIGN(TaskWorkerPool); }; // class TaskWorkerPool + +class CreateTableTaskPool : public TaskWorkerPool { +public: + CreateTableTaskPool(ExecEnv* env, ThreadModel thread_model); + void _create_tablet_worker_thread_callback(); + + DISALLOW_COPY_AND_ASSIGN(CreateTableTaskPool); +}; + } // namespace doris --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org For additional commands, e-mail: commits-help@doris.apache.org