This is an automated email from the ASF dual-hosted git repository. morningman pushed a commit to branch branch-1.2-lts in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-1.2-lts by this push: new 95cdeaeb24 [fix-branch-1.2-lts](TaskWorkerPool) Fix task worker pool and remove unnecessary copy (#23538) 95cdeaeb24 is described below commit 95cdeaeb240dfb4766016d9b9e8c7ee954af24cd Author: bobhan1 <bh2444151...@outlook.com> AuthorDate: Mon Aug 28 11:26:11 2023 +0800 [fix-branch-1.2-lts](TaskWorkerPool) Fix task worker pool and remove unnecessary copy (#23538) cherry-pick #19822 --- be/src/agent/agent_server.cpp | 17 +++- be/src/agent/agent_server.h | 4 +- be/src/agent/task_worker_pool.cpp | 202 ++++++++++++++++---------------------- be/src/agent/task_worker_pool.h | 22 +++-- 4 files changed, 115 insertions(+), 130 deletions(-) diff --git a/be/src/agent/agent_server.cpp b/be/src/agent/agent_server.cpp index d57777b6e6..f925baa416 100644 --- a/be/src/agent/agent_server.cpp +++ b/be/src/agent/agent_server.cpp @@ -67,13 +67,20 @@ AgentServer::AgentServer(ExecEnv* exec_env, const TMasterInfo& master_info) #define CREATE_AND_START_THREAD(type, pool_name) #endif // BE_TEST +#ifndef BE_TEST + // Both PUSH and REALTIME_PUSH type use _push_load_workers + _push_load_workers.reset(new PushTaskPool(exec_env, TaskWorkerPool::ThreadModel::MULTI_THREADS, + PushTaskPool::PushWokerType::LOAD_V2)); + _push_load_workers->start(); + _push_delete_workers.reset(new PushTaskPool(exec_env, + TaskWorkerPool::ThreadModel::MULTI_THREADS, + PushTaskPool::PushWokerType::DELETE)); + _push_delete_workers->start(); +#endif CREATE_AND_START_POOL(CREATE_TABLE, _create_tablet_workers); CREATE_AND_START_POOL(DROP_TABLE, _drop_tablet_workers); - // Both PUSH and REALTIME_PUSH type use _push_workers - CREATE_AND_START_POOL(PUSH, _push_workers); CREATE_AND_START_POOL(PUBLISH_VERSION, _publish_version_workers); CREATE_AND_START_POOL(CLEAR_TRANSACTION_TASK, _clear_transaction_task_workers); - CREATE_AND_START_POOL(DELETE, _delete_workers); CREATE_AND_START_POOL(ALTER_TABLE, _alter_tablet_workers); CREATE_AND_START_POOL(CLONE, _clone_workers); CREATE_AND_START_POOL(STORAGE_MEDIUM_MIGRATE, _storage_medium_migrate_workers); @@ -165,9 +172,9 @@ void AgentServer::submit_tasks(TAgentResult& agent_result, } if (task.push_req.push_type == TPushType::LOAD || task.push_req.push_type == TPushType::LOAD_V2) { - _push_workers->submit_task(task); + _push_load_workers->submit_task(task); } else if (task.push_req.push_type == TPushType::DELETE) { - _delete_workers->submit_task(task); + _push_delete_workers->submit_task(task); } else { ret_st = Status::InvalidArgument( "task(signature={}, type={}, push_type={}) has wrong push_type", signature, diff --git a/be/src/agent/agent_server.h b/be/src/agent/agent_server.h index bfb819dd97..32ae5ad8ee 100644 --- a/be/src/agent/agent_server.h +++ b/be/src/agent/agent_server.h @@ -55,10 +55,10 @@ private: std::unique_ptr<TaskWorkerPool> _create_tablet_workers; std::unique_ptr<TaskWorkerPool> _drop_tablet_workers; - std::unique_ptr<TaskWorkerPool> _push_workers; + std::unique_ptr<TaskWorkerPool> _push_load_workers; std::unique_ptr<TaskWorkerPool> _publish_version_workers; std::unique_ptr<TaskWorkerPool> _clear_transaction_task_workers; - std::unique_ptr<TaskWorkerPool> _delete_workers; + std::unique_ptr<TaskWorkerPool> _push_delete_workers; std::unique_ptr<TaskWorkerPool> _alter_tablet_workers; std::unique_ptr<TaskWorkerPool> _clone_workers; std::unique_ptr<TaskWorkerPool> _storage_medium_migrate_workers; diff --git a/be/src/agent/task_worker_pool.cpp b/be/src/agent/task_worker_pool.cpp index bdccdaf7e5..c95b782d3b 100644 --- a/be/src/agent/task_worker_pool.cpp +++ b/be/src/agent/task_worker_pool.cpp @@ -113,96 +113,92 @@ void TaskWorkerPool::start() { if (_thread_model == ThreadModel::SINGLE_THREAD) { _worker_count = 1; } - std::function<void()> cb; switch (_task_worker_type) { case TaskWorkerType::CREATE_TABLE: _worker_count = config::create_tablet_worker_count; - cb = std::bind<void>(&TaskWorkerPool::_create_tablet_worker_thread_callback, this); + _cb = std::bind<void>(&TaskWorkerPool::_create_tablet_worker_thread_callback, this); break; case TaskWorkerType::DROP_TABLE: _worker_count = config::drop_tablet_worker_count; - cb = std::bind<void>(&TaskWorkerPool::_drop_tablet_worker_thread_callback, this); + _cb = std::bind<void>(&TaskWorkerPool::_drop_tablet_worker_thread_callback, this); break; case TaskWorkerType::PUSH: case TaskWorkerType::REALTIME_PUSH: - _worker_count = - config::push_worker_count_normal_priority + config::push_worker_count_high_priority; - cb = std::bind<void>(&TaskWorkerPool::_push_worker_thread_callback, this); + break; + case TaskWorkerType::DELETE: break; case TaskWorkerType::PUBLISH_VERSION: _worker_count = config::publish_version_worker_count; - cb = std::bind<void>(&TaskWorkerPool::_publish_version_worker_thread_callback, this); + _cb = std::bind<void>(&TaskWorkerPool::_publish_version_worker_thread_callback, this); break; case TaskWorkerType::CLEAR_TRANSACTION_TASK: _worker_count = config::clear_transaction_task_worker_count; - cb = std::bind<void>(&TaskWorkerPool::_clear_transaction_task_worker_thread_callback, this); - break; - case TaskWorkerType::DELETE: - _worker_count = config::delete_worker_count; - cb = std::bind<void>(&TaskWorkerPool::_push_worker_thread_callback, this); + _cb = std::bind<void>(&TaskWorkerPool::_clear_transaction_task_worker_thread_callback, + this); break; case TaskWorkerType::ALTER_TABLE: _worker_count = config::alter_tablet_worker_count; - cb = std::bind<void>(&TaskWorkerPool::_alter_tablet_worker_thread_callback, this); + _cb = std::bind<void>(&TaskWorkerPool::_alter_tablet_worker_thread_callback, this); break; case TaskWorkerType::CLONE: _worker_count = config::clone_worker_count; - cb = std::bind<void>(&TaskWorkerPool::_clone_worker_thread_callback, this); + _cb = std::bind<void>(&TaskWorkerPool::_clone_worker_thread_callback, this); break; case TaskWorkerType::STORAGE_MEDIUM_MIGRATE: _worker_count = config::storage_medium_migrate_count; - cb = std::bind<void>(&TaskWorkerPool::_storage_medium_migrate_worker_thread_callback, this); + _cb = std::bind<void>(&TaskWorkerPool::_storage_medium_migrate_worker_thread_callback, + this); break; case TaskWorkerType::CHECK_CONSISTENCY: _worker_count = config::check_consistency_worker_count; - cb = std::bind<void>(&TaskWorkerPool::_check_consistency_worker_thread_callback, this); + _cb = std::bind<void>(&TaskWorkerPool::_check_consistency_worker_thread_callback, this); break; case TaskWorkerType::REPORT_TASK: - cb = std::bind<void>(&TaskWorkerPool::_report_task_worker_thread_callback, this); + _cb = std::bind<void>(&TaskWorkerPool::_report_task_worker_thread_callback, this); break; case TaskWorkerType::REPORT_DISK_STATE: - cb = std::bind<void>(&TaskWorkerPool::_report_disk_state_worker_thread_callback, this); + _cb = std::bind<void>(&TaskWorkerPool::_report_disk_state_worker_thread_callback, this); break; case TaskWorkerType::REPORT_OLAP_TABLE: - cb = std::bind<void>(&TaskWorkerPool::_report_tablet_worker_thread_callback, this); + _cb = std::bind<void>(&TaskWorkerPool::_report_tablet_worker_thread_callback, this); break; case TaskWorkerType::UPLOAD: _worker_count = config::upload_worker_count; - cb = std::bind<void>(&TaskWorkerPool::_upload_worker_thread_callback, this); + _cb = std::bind<void>(&TaskWorkerPool::_upload_worker_thread_callback, this); break; case TaskWorkerType::DOWNLOAD: _worker_count = config::download_worker_count; - cb = std::bind<void>(&TaskWorkerPool::_download_worker_thread_callback, this); + _cb = std::bind<void>(&TaskWorkerPool::_download_worker_thread_callback, this); break; case TaskWorkerType::MAKE_SNAPSHOT: _worker_count = config::make_snapshot_worker_count; - cb = std::bind<void>(&TaskWorkerPool::_make_snapshot_thread_callback, this); + _cb = std::bind<void>(&TaskWorkerPool::_make_snapshot_thread_callback, this); break; case TaskWorkerType::RELEASE_SNAPSHOT: _worker_count = config::release_snapshot_worker_count; - cb = std::bind<void>(&TaskWorkerPool::_release_snapshot_thread_callback, this); + _cb = std::bind<void>(&TaskWorkerPool::_release_snapshot_thread_callback, this); break; case TaskWorkerType::MOVE: _worker_count = 1; - cb = std::bind<void>(&TaskWorkerPool::_move_dir_thread_callback, this); + _cb = std::bind<void>(&TaskWorkerPool::_move_dir_thread_callback, this); break; case TaskWorkerType::UPDATE_TABLET_META_INFO: _worker_count = 1; - cb = std::bind<void>(&TaskWorkerPool::_update_tablet_meta_worker_thread_callback, this); + _cb = std::bind<void>(&TaskWorkerPool::_update_tablet_meta_worker_thread_callback, this); break; case TaskWorkerType::SUBMIT_TABLE_COMPACTION: _worker_count = 1; - cb = std::bind<void>(&TaskWorkerPool::_submit_table_compaction_worker_thread_callback, - this); + _cb = std::bind<void>(&TaskWorkerPool::_submit_table_compaction_worker_thread_callback, + this); break; case TaskWorkerType::REFRESH_STORAGE_POLICY: - cb = std::bind<void>( + _cb = std::bind<void>( &TaskWorkerPool::_storage_refresh_storage_policy_worker_thread_callback, this); break; case TaskWorkerType::UPDATE_STORAGE_POLICY: _worker_count = 1; - cb = std::bind<void>(&TaskWorkerPool::_storage_update_storage_policy_worker_thread_callback, - this); + _cb = std::bind<void>( + &TaskWorkerPool::_storage_update_storage_policy_worker_thread_callback, this); break; default: // pass @@ -217,7 +213,7 @@ void TaskWorkerPool::start() { .build(&_thread_pool); for (int i = 0; i < _worker_count; i++) { - auto st = _thread_pool->submit_func(cb); + auto st = _thread_pool->submit_func(_cb); CHECK(st.ok()) << st; } #endif @@ -310,36 +306,9 @@ void TaskWorkerPool::_finish_task(const TFinishTaskRequest& finish_task_request) TRACE("finish task"); } -uint32_t TaskWorkerPool::_get_next_task_index(int32_t thread_count, - std::deque<TAgentTaskRequest>& tasks, - TPriority::type priority) { - int32_t index = -1; - std::deque<TAgentTaskRequest>::size_type task_count = tasks.size(); - for (uint32_t i = 0; i < task_count; ++i) { - TAgentTaskRequest task = tasks[i]; - if (priority == TPriority::HIGH) { - if (task.__isset.priority && task.priority == TPriority::HIGH) { - index = i; - break; - } - } - } - - if (index == -1) { - if (priority == TPriority::HIGH) { - return index; - } - - index = 0; - } - - return index; -} - void TaskWorkerPool::_create_tablet_worker_thread_callback() { while (_is_work) { TAgentTaskRequest agent_task_req; - TCreateTabletReq create_tablet_req; { std::unique_lock<std::mutex> worker_thread_lock(_worker_thread_lock); _worker_thread_condition_variable.wait( @@ -349,10 +318,9 @@ void TaskWorkerPool::_create_tablet_worker_thread_callback() { } agent_task_req = _tasks.front(); - create_tablet_req = agent_task_req.create_tablet_req; _tasks.pop_front(); } - + const TCreateTabletReq& create_tablet_req = agent_task_req.create_tablet_req; scoped_refptr<Trace> trace(new Trace); MonotonicStopWatch watch; watch.start(); @@ -413,7 +381,6 @@ void TaskWorkerPool::_create_tablet_worker_thread_callback() { void TaskWorkerPool::_drop_tablet_worker_thread_callback() { while (_is_work) { TAgentTaskRequest agent_task_req; - TDropTabletReq drop_tablet_req; { std::unique_lock<std::mutex> worker_thread_lock(_worker_thread_lock); _worker_thread_condition_variable.wait( @@ -423,10 +390,9 @@ void TaskWorkerPool::_drop_tablet_worker_thread_callback() { } agent_task_req = _tasks.front(); - drop_tablet_req = agent_task_req.drop_tablet_req; _tasks.pop_front(); } - + const TDropTabletReq& drop_tablet_req = agent_task_req.drop_tablet_req; Status status; TabletSharedPtr dropped_tablet = StorageEngine::instance()->tablet_manager()->get_tablet( drop_tablet_req.tablet_id, false); @@ -571,12 +537,27 @@ void TaskWorkerPool::_alter_tablet(const TAgentTaskRequest& agent_task_req, int6 finish_task_request->__set_task_status(status.to_thrift()); } -void TaskWorkerPool::_push_worker_thread_callback() { +PushTaskPool::PushTaskPool(ExecEnv* env, ThreadModel thread_model, PushWokerType type) + : TaskWorkerPool( + type == PushWokerType::LOAD_V2 ? TaskWorkerType::PUSH : TaskWorkerType::DELETE, + env, *env->master_info(), thread_model), + _push_worker_type(type) { + if (_push_worker_type == PushWokerType::LOAD_V2) { + _worker_count = + config::push_worker_count_normal_priority + config::push_worker_count_high_priority; + + } else { + _worker_count = config::delete_worker_count; + } + _cb = [this]() { _push_worker_thread_callback(); }; +} + +void PushTaskPool::_push_worker_thread_callback() { // gen high priority worker thread TPriority::type priority = TPriority::NORMAL; int32_t push_worker_count_high_priority = config::push_worker_count_high_priority; - static uint32_t s_worker_count = 0; - { + if (_push_worker_type == PushWokerType::LOAD_V2) { + static uint32_t s_worker_count = 0; std::lock_guard<std::mutex> worker_thread_lock(_worker_thread_lock); if (s_worker_count < push_worker_count_high_priority) { ++s_worker_count; @@ -586,9 +567,7 @@ void TaskWorkerPool::_push_worker_thread_callback() { while (_is_work) { TAgentTaskRequest agent_task_req; - TPushReq push_req; - int32_t index = 0; - do { + { std::unique_lock<std::mutex> worker_thread_lock(_worker_thread_lock); _worker_thread_condition_variable.wait( worker_thread_lock, [this]() { return !_is_work || !_tasks.empty(); }); @@ -596,26 +575,26 @@ void TaskWorkerPool::_push_worker_thread_callback() { return; } - index = _get_next_task_index(config::push_worker_count_normal_priority + - config::push_worker_count_high_priority, - _tasks, priority); - - if (index < 0) { - // there is no high priority task. notify other thread to handle normal task - _worker_thread_condition_variable.notify_all(); - break; + if (priority == TPriority::HIGH) { + const auto it = std::find_if( + _tasks.cbegin(), _tasks.cend(), [](const TAgentTaskRequest& req) { + return req.__isset.priority && req.priority == TPriority::HIGH; + }); + + if (it == _tasks.cend()) { + // there is no high priority task. notify other thread to handle normal task + _worker_thread_condition_variable.notify_all(); + sleep(1); + continue; + } + agent_task_req = std::move(*it); + _tasks.erase(it); + } else { + agent_task_req = std::move(_tasks.front()); + _tasks.pop_front(); } - - agent_task_req = _tasks[index]; - push_req = agent_task_req.push_req; - _tasks.erase(_tasks.begin() + index); - } while (false); - - if (index < 0) { - // there is no high priority task in queue - sleep(1); - continue; } + TPushReq& push_req = agent_task_req.push_req; LOG(INFO) << "get push task. signature=" << agent_task_req.signature << ", priority=" << priority << " push_type=" << push_req.push_type; @@ -658,7 +637,6 @@ void TaskWorkerPool::_push_worker_thread_callback() { void TaskWorkerPool::_publish_version_worker_thread_callback() { while (_is_work) { TAgentTaskRequest agent_task_req; - TPublishVersionRequest publish_version_req; { std::unique_lock<std::mutex> worker_thread_lock(_worker_thread_lock); _worker_thread_condition_variable.wait( @@ -668,10 +646,9 @@ void TaskWorkerPool::_publish_version_worker_thread_callback() { } agent_task_req = _tasks.front(); - publish_version_req = agent_task_req.publish_version_req; _tasks.pop_front(); } - + TPublishVersionRequest& publish_version_req = agent_task_req.publish_version_req; DorisMetrics::instance()->publish_task_request_total->increment(1); VLOG_NOTICE << "get publish version task. signature=" << agent_task_req.signature; @@ -769,7 +746,6 @@ void TaskWorkerPool::_publish_version_worker_thread_callback() { void TaskWorkerPool::_clear_transaction_task_worker_thread_callback() { while (_is_work) { TAgentTaskRequest agent_task_req; - TClearTransactionTaskRequest clear_transaction_task_req; { std::unique_lock<std::mutex> worker_thread_lock(_worker_thread_lock); _worker_thread_condition_variable.wait( @@ -779,9 +755,10 @@ void TaskWorkerPool::_clear_transaction_task_worker_thread_callback() { } agent_task_req = _tasks.front(); - clear_transaction_task_req = agent_task_req.clear_transaction_task_req; _tasks.pop_front(); } + const TClearTransactionTaskRequest& clear_transaction_task_req = + agent_task_req.clear_transaction_task_req; LOG(INFO) << "get clear transaction task. signature=" << agent_task_req.signature << ", transaction_id=" << clear_transaction_task_req.transaction_id << ", partition_id_size=" << clear_transaction_task_req.partition_id.size(); @@ -821,7 +798,6 @@ void TaskWorkerPool::_clear_transaction_task_worker_thread_callback() { void TaskWorkerPool::_update_tablet_meta_worker_thread_callback() { while (_is_work) { TAgentTaskRequest agent_task_req; - TUpdateTabletMetaInfoReq update_tablet_meta_req; { std::unique_lock<std::mutex> worker_thread_lock(_worker_thread_lock); _worker_thread_condition_variable.wait( @@ -831,9 +807,10 @@ void TaskWorkerPool::_update_tablet_meta_worker_thread_callback() { } agent_task_req = _tasks.front(); - update_tablet_meta_req = agent_task_req.update_tablet_meta_info_req; _tasks.pop_front(); } + const TUpdateTabletMetaInfoReq& update_tablet_meta_req = + agent_task_req.update_tablet_meta_info_req; LOG(INFO) << "get update tablet meta task. signature=" << agent_task_req.signature; Status status; @@ -895,8 +872,6 @@ void TaskWorkerPool::_update_tablet_meta_worker_thread_callback() { void TaskWorkerPool::_clone_worker_thread_callback() { while (_is_work) { TAgentTaskRequest agent_task_req; - TCloneReq clone_req; - { std::unique_lock<std::mutex> worker_thread_lock(_worker_thread_lock); _worker_thread_condition_variable.wait( @@ -906,9 +881,9 @@ void TaskWorkerPool::_clone_worker_thread_callback() { } agent_task_req = _tasks.front(); - clone_req = agent_task_req.clone_req; _tasks.pop_front(); } + const TCloneReq& clone_req = agent_task_req.clone_req; DorisMetrics::instance()->clone_requests_total->increment(1); LOG(INFO) << "get clone task. signature=" << agent_task_req.signature; @@ -945,7 +920,6 @@ void TaskWorkerPool::_clone_worker_thread_callback() { void TaskWorkerPool::_storage_medium_migrate_worker_thread_callback() { while (_is_work) { TAgentTaskRequest agent_task_req; - TStorageMediumMigrateReq storage_medium_migrate_req; { std::unique_lock<std::mutex> worker_thread_lock(_worker_thread_lock); _worker_thread_condition_variable.wait( @@ -955,9 +929,10 @@ void TaskWorkerPool::_storage_medium_migrate_worker_thread_callback() { } agent_task_req = _tasks.front(); - storage_medium_migrate_req = agent_task_req.storage_medium_migrate_req; _tasks.pop_front(); } + const TStorageMediumMigrateReq& storage_medium_migrate_req = + agent_task_req.storage_medium_migrate_req; // check request and get info TabletSharedPtr tablet; @@ -1046,7 +1021,6 @@ Status TaskWorkerPool::_check_migrate_request(const TStorageMediumMigrateReq& re void TaskWorkerPool::_check_consistency_worker_thread_callback() { while (_is_work) { TAgentTaskRequest agent_task_req; - TCheckConsistencyReq check_consistency_req; { std::unique_lock<std::mutex> worker_thread_lock(_worker_thread_lock); _worker_thread_condition_variable.wait( @@ -1056,9 +1030,9 @@ void TaskWorkerPool::_check_consistency_worker_thread_callback() { } agent_task_req = _tasks.front(); - check_consistency_req = agent_task_req.check_consistency_req; _tasks.pop_front(); } + const TCheckConsistencyReq& check_consistency_req = agent_task_req.check_consistency_req; uint32_t checksum = 0; EngineChecksumTask engine_task(check_consistency_req.tablet_id, @@ -1239,7 +1213,6 @@ void TaskWorkerPool::_report_tablet_worker_thread_callback() { void TaskWorkerPool::_upload_worker_thread_callback() { while (_is_work) { TAgentTaskRequest agent_task_req; - TUploadReq upload_request; { std::unique_lock<std::mutex> worker_thread_lock(_worker_thread_lock); _worker_thread_condition_variable.wait( @@ -1249,9 +1222,9 @@ void TaskWorkerPool::_upload_worker_thread_callback() { } agent_task_req = _tasks.front(); - upload_request = agent_task_req.upload_req; _tasks.pop_front(); } + const TUploadReq& upload_request = agent_task_req.upload_req; LOG(INFO) << "get upload task. signature=" << agent_task_req.signature << ", job_id=" << upload_request.job_id; @@ -1290,7 +1263,6 @@ void TaskWorkerPool::_upload_worker_thread_callback() { void TaskWorkerPool::_download_worker_thread_callback() { while (_is_work) { TAgentTaskRequest agent_task_req; - TDownloadReq download_request; { std::unique_lock<std::mutex> worker_thread_lock(_worker_thread_lock); _worker_thread_condition_variable.wait( @@ -1300,9 +1272,9 @@ void TaskWorkerPool::_download_worker_thread_callback() { } agent_task_req = _tasks.front(); - download_request = agent_task_req.download_req; _tasks.pop_front(); } + const TDownloadReq& download_request = agent_task_req.download_req; LOG(INFO) << "get download task. signature=" << agent_task_req.signature << ", job_id=" << download_request.job_id; @@ -1342,7 +1314,6 @@ void TaskWorkerPool::_download_worker_thread_callback() { void TaskWorkerPool::_make_snapshot_thread_callback() { while (_is_work) { TAgentTaskRequest agent_task_req; - TSnapshotRequest snapshot_request; { std::unique_lock<std::mutex> worker_thread_lock(_worker_thread_lock); _worker_thread_condition_variable.wait( @@ -1352,9 +1323,9 @@ void TaskWorkerPool::_make_snapshot_thread_callback() { } agent_task_req = _tasks.front(); - snapshot_request = agent_task_req.snapshot_req; _tasks.pop_front(); } + const TSnapshotRequest& snapshot_request = agent_task_req.snapshot_req; LOG(INFO) << "get snapshot task. signature=" << agent_task_req.signature; string snapshot_path; @@ -1401,7 +1372,6 @@ void TaskWorkerPool::_make_snapshot_thread_callback() { void TaskWorkerPool::_release_snapshot_thread_callback() { while (_is_work) { TAgentTaskRequest agent_task_req; - TReleaseSnapshotRequest release_snapshot_request; { std::unique_lock<std::mutex> worker_thread_lock(_worker_thread_lock); _worker_thread_condition_variable.wait( @@ -1411,12 +1381,13 @@ void TaskWorkerPool::_release_snapshot_thread_callback() { } agent_task_req = _tasks.front(); - release_snapshot_request = agent_task_req.release_snapshot_req; _tasks.pop_front(); } + const TReleaseSnapshotRequest& release_snapshot_request = + agent_task_req.release_snapshot_req; LOG(INFO) << "get release snapshot task. signature=" << agent_task_req.signature; - string& snapshot_path = release_snapshot_request.snapshot_path; + const string& snapshot_path = release_snapshot_request.snapshot_path; Status status = SnapshotManager::instance()->release_snapshot(snapshot_path); if (!status.ok()) { LOG_WARNING("failed to release snapshot") @@ -1450,7 +1421,6 @@ Status TaskWorkerPool::_get_tablet_info(const TTabletId tablet_id, const TSchema void TaskWorkerPool::_move_dir_thread_callback() { while (_is_work) { TAgentTaskRequest agent_task_req; - TMoveDirReq move_dir_req; { std::unique_lock<std::mutex> worker_thread_lock(_worker_thread_lock); _worker_thread_condition_variable.wait( @@ -1460,9 +1430,9 @@ void TaskWorkerPool::_move_dir_thread_callback() { } agent_task_req = _tasks.front(); - move_dir_req = agent_task_req.move_dir_req; _tasks.pop_front(); } + const TMoveDirReq& move_dir_req = agent_task_req.move_dir_req; LOG(INFO) << "get move dir task. signature=" << agent_task_req.signature << ", job_id=" << move_dir_req.job_id; Status status = _move_dir(move_dir_req.tablet_id, move_dir_req.src, move_dir_req.job_id, @@ -1556,8 +1526,6 @@ void TaskWorkerPool::_random_sleep(int second) { void TaskWorkerPool::_submit_table_compaction_worker_thread_callback() { while (_is_work) { TAgentTaskRequest agent_task_req; - TCompactionReq compaction_req; - { std::unique_lock<std::mutex> worker_thread_lock(_worker_thread_lock); _worker_thread_condition_variable.wait( @@ -1567,9 +1535,9 @@ void TaskWorkerPool::_submit_table_compaction_worker_thread_callback() { } agent_task_req = _tasks.front(); - compaction_req = agent_task_req.compaction_req; _tasks.pop_front(); } + const TCompactionReq& compaction_req = agent_task_req.compaction_req; LOG(INFO) << "get compaction task. signature=" << agent_task_req.signature << ", compaction_type=" << compaction_req.type; @@ -1658,7 +1626,6 @@ void TaskWorkerPool::_storage_refresh_storage_policy_worker_thread_callback() { void TaskWorkerPool::_storage_update_storage_policy_worker_thread_callback() { while (_is_work) { TAgentTaskRequest agent_task_req; - TGetStoragePolicy get_storage_policy_req; { std::unique_lock<std::mutex> worker_thread_lock(_worker_thread_lock); _worker_thread_condition_variable.wait( @@ -1668,9 +1635,10 @@ void TaskWorkerPool::_storage_update_storage_policy_worker_thread_callback() { } agent_task_req = _tasks.front(); - get_storage_policy_req = agent_task_req.update_policy; + _tasks.pop_front(); } + const TGetStoragePolicy& get_storage_policy_req = agent_task_req.update_policy; StoragePolicyMgr* spm = ExecEnv::GetInstance()->storage_policy_mgr(); auto policy_ptr = std::make_shared<StoragePolicy>(); diff --git a/be/src/agent/task_worker_pool.h b/be/src/agent/task_worker_pool.h index ae8b9e8149..f81b5c0b31 100644 --- a/be/src/agent/task_worker_pool.h +++ b/be/src/agent/task_worker_pool.h @@ -164,16 +164,12 @@ public: // notify the worker. currently for task/disk/tablet report thread void notify_thread(); -private: +protected: bool _register_task_info(const TTaskType::type task_type, int64_t signature); void _remove_task_info(const TTaskType::type task_type, int64_t signature); void _finish_task(const TFinishTaskRequest& finish_task_request); - uint32_t _get_next_task_index(int32_t thread_count, std::deque<TAgentTaskRequest>& tasks, - TPriority::type priority); - void _create_tablet_worker_thread_callback(); void _drop_tablet_worker_thread_callback(); - void _push_worker_thread_callback(); void _publish_version_worker_thread_callback(); void _clear_transaction_task_worker_thread_callback(); void _alter_tablet_worker_thread_callback(); @@ -209,7 +205,7 @@ private: // random sleep 1~second seconds void _random_sleep(int second); -private: +protected: std::string _name; // Reference to the ExecEnv::_master_info @@ -237,6 +233,7 @@ private: // Always 1 when _thread_model is SINGLE_THREAD uint32_t _worker_count; TaskWorkerType _task_worker_type; + std::function<void()> _cb; static FrontendServiceClientCache _master_service_client_cache; static std::atomic_ulong _s_report_version; @@ -246,4 +243,17 @@ private: DISALLOW_COPY_AND_ASSIGN(TaskWorkerPool); }; // class TaskWorkerPool + +class PushTaskPool : public TaskWorkerPool { +public: + enum class PushWokerType { LOAD_V2, DELETE }; + PushTaskPool(ExecEnv* env, ThreadModel thread_model, PushWokerType type); + void _push_worker_thread_callback(); + + DISALLOW_COPY_AND_ASSIGN(PushTaskPool); + +private: + PushWokerType _push_worker_type; +}; + } // namespace doris --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org