This is an automated email from the ASF dual-hosted git repository.

morningman pushed a commit to branch branch-1.2-lts
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/branch-1.2-lts by this push:
     new 95cdeaeb24 [fix-branch-1.2-lts](TaskWorkerPool) Fix task worker pool and remove unnecessary copy (#23538)
95cdeaeb24 is described below

commit 95cdeaeb240dfb4766016d9b9e8c7ee954af24cd
Author: bobhan1 <bh2444151...@outlook.com>
AuthorDate: Mon Aug 28 11:26:11 2023 +0800

    [fix-branch-1.2-lts](TaskWorkerPool) Fix task worker pool and remove unnecessary copy (#23538)
    
    cherry-pick #19822
---
 be/src/agent/agent_server.cpp     |  17 +++-
 be/src/agent/agent_server.h       |   4 +-
 be/src/agent/task_worker_pool.cpp | 202 ++++++++++++++++----------------------
 be/src/agent/task_worker_pool.h   |  22 +++--
 4 files changed, 115 insertions(+), 130 deletions(-)

diff --git a/be/src/agent/agent_server.cpp b/be/src/agent/agent_server.cpp
index d57777b6e6..f925baa416 100644
--- a/be/src/agent/agent_server.cpp
+++ b/be/src/agent/agent_server.cpp
@@ -67,13 +67,20 @@ AgentServer::AgentServer(ExecEnv* exec_env, const TMasterInfo& master_info)
 #define CREATE_AND_START_THREAD(type, pool_name)
 #endif // BE_TEST
 
+#ifndef BE_TEST
+    // Both PUSH and REALTIME_PUSH type use _push_load_workers
+    _push_load_workers.reset(new PushTaskPool(exec_env, 
TaskWorkerPool::ThreadModel::MULTI_THREADS,
+                                              
PushTaskPool::PushWokerType::LOAD_V2));
+    _push_load_workers->start();
+    _push_delete_workers.reset(new PushTaskPool(exec_env,
+                                                
TaskWorkerPool::ThreadModel::MULTI_THREADS,
+                                                
PushTaskPool::PushWokerType::DELETE));
+    _push_delete_workers->start();
+#endif
     CREATE_AND_START_POOL(CREATE_TABLE, _create_tablet_workers);
     CREATE_AND_START_POOL(DROP_TABLE, _drop_tablet_workers);
-    // Both PUSH and REALTIME_PUSH type use _push_workers
-    CREATE_AND_START_POOL(PUSH, _push_workers);
     CREATE_AND_START_POOL(PUBLISH_VERSION, _publish_version_workers);
     CREATE_AND_START_POOL(CLEAR_TRANSACTION_TASK, 
_clear_transaction_task_workers);
-    CREATE_AND_START_POOL(DELETE, _delete_workers);
     CREATE_AND_START_POOL(ALTER_TABLE, _alter_tablet_workers);
     CREATE_AND_START_POOL(CLONE, _clone_workers);
     CREATE_AND_START_POOL(STORAGE_MEDIUM_MIGRATE, 
_storage_medium_migrate_workers);
@@ -165,9 +172,9 @@ void AgentServer::submit_tasks(TAgentResult& agent_result,
             }
             if (task.push_req.push_type == TPushType::LOAD ||
                 task.push_req.push_type == TPushType::LOAD_V2) {
-                _push_workers->submit_task(task);
+                _push_load_workers->submit_task(task);
             } else if (task.push_req.push_type == TPushType::DELETE) {
-                _delete_workers->submit_task(task);
+                _push_delete_workers->submit_task(task);
             } else {
                 ret_st = Status::InvalidArgument(
                         "task(signature={}, type={}, push_type={}) has wrong 
push_type", signature,
diff --git a/be/src/agent/agent_server.h b/be/src/agent/agent_server.h
index bfb819dd97..32ae5ad8ee 100644
--- a/be/src/agent/agent_server.h
+++ b/be/src/agent/agent_server.h
@@ -55,10 +55,10 @@ private:
 
     std::unique_ptr<TaskWorkerPool> _create_tablet_workers;
     std::unique_ptr<TaskWorkerPool> _drop_tablet_workers;
-    std::unique_ptr<TaskWorkerPool> _push_workers;
+    std::unique_ptr<TaskWorkerPool> _push_load_workers;
     std::unique_ptr<TaskWorkerPool> _publish_version_workers;
     std::unique_ptr<TaskWorkerPool> _clear_transaction_task_workers;
-    std::unique_ptr<TaskWorkerPool> _delete_workers;
+    std::unique_ptr<TaskWorkerPool> _push_delete_workers;
     std::unique_ptr<TaskWorkerPool> _alter_tablet_workers;
     std::unique_ptr<TaskWorkerPool> _clone_workers;
     std::unique_ptr<TaskWorkerPool> _storage_medium_migrate_workers;
diff --git a/be/src/agent/task_worker_pool.cpp b/be/src/agent/task_worker_pool.cpp
index bdccdaf7e5..c95b782d3b 100644
--- a/be/src/agent/task_worker_pool.cpp
+++ b/be/src/agent/task_worker_pool.cpp
@@ -113,96 +113,92 @@ void TaskWorkerPool::start() {
     if (_thread_model == ThreadModel::SINGLE_THREAD) {
         _worker_count = 1;
     }
-    std::function<void()> cb;
     switch (_task_worker_type) {
     case TaskWorkerType::CREATE_TABLE:
         _worker_count = config::create_tablet_worker_count;
-        cb = 
std::bind<void>(&TaskWorkerPool::_create_tablet_worker_thread_callback, this);
+        _cb = 
std::bind<void>(&TaskWorkerPool::_create_tablet_worker_thread_callback, this);
         break;
     case TaskWorkerType::DROP_TABLE:
         _worker_count = config::drop_tablet_worker_count;
-        cb = 
std::bind<void>(&TaskWorkerPool::_drop_tablet_worker_thread_callback, this);
+        _cb = 
std::bind<void>(&TaskWorkerPool::_drop_tablet_worker_thread_callback, this);
         break;
     case TaskWorkerType::PUSH:
     case TaskWorkerType::REALTIME_PUSH:
-        _worker_count =
-                config::push_worker_count_normal_priority + 
config::push_worker_count_high_priority;
-        cb = std::bind<void>(&TaskWorkerPool::_push_worker_thread_callback, 
this);
+        break;
+    case TaskWorkerType::DELETE:
         break;
     case TaskWorkerType::PUBLISH_VERSION:
         _worker_count = config::publish_version_worker_count;
-        cb = 
std::bind<void>(&TaskWorkerPool::_publish_version_worker_thread_callback, this);
+        _cb = 
std::bind<void>(&TaskWorkerPool::_publish_version_worker_thread_callback, this);
         break;
     case TaskWorkerType::CLEAR_TRANSACTION_TASK:
         _worker_count = config::clear_transaction_task_worker_count;
-        cb = 
std::bind<void>(&TaskWorkerPool::_clear_transaction_task_worker_thread_callback,
 this);
-        break;
-    case TaskWorkerType::DELETE:
-        _worker_count = config::delete_worker_count;
-        cb = std::bind<void>(&TaskWorkerPool::_push_worker_thread_callback, 
this);
+        _cb = 
std::bind<void>(&TaskWorkerPool::_clear_transaction_task_worker_thread_callback,
+                              this);
         break;
     case TaskWorkerType::ALTER_TABLE:
         _worker_count = config::alter_tablet_worker_count;
-        cb = 
std::bind<void>(&TaskWorkerPool::_alter_tablet_worker_thread_callback, this);
+        _cb = 
std::bind<void>(&TaskWorkerPool::_alter_tablet_worker_thread_callback, this);
         break;
     case TaskWorkerType::CLONE:
         _worker_count = config::clone_worker_count;
-        cb = std::bind<void>(&TaskWorkerPool::_clone_worker_thread_callback, 
this);
+        _cb = std::bind<void>(&TaskWorkerPool::_clone_worker_thread_callback, 
this);
         break;
     case TaskWorkerType::STORAGE_MEDIUM_MIGRATE:
         _worker_count = config::storage_medium_migrate_count;
-        cb = 
std::bind<void>(&TaskWorkerPool::_storage_medium_migrate_worker_thread_callback,
 this);
+        _cb = 
std::bind<void>(&TaskWorkerPool::_storage_medium_migrate_worker_thread_callback,
+                              this);
         break;
     case TaskWorkerType::CHECK_CONSISTENCY:
         _worker_count = config::check_consistency_worker_count;
-        cb = 
std::bind<void>(&TaskWorkerPool::_check_consistency_worker_thread_callback, 
this);
+        _cb = 
std::bind<void>(&TaskWorkerPool::_check_consistency_worker_thread_callback, 
this);
         break;
     case TaskWorkerType::REPORT_TASK:
-        cb = 
std::bind<void>(&TaskWorkerPool::_report_task_worker_thread_callback, this);
+        _cb = 
std::bind<void>(&TaskWorkerPool::_report_task_worker_thread_callback, this);
         break;
     case TaskWorkerType::REPORT_DISK_STATE:
-        cb = 
std::bind<void>(&TaskWorkerPool::_report_disk_state_worker_thread_callback, 
this);
+        _cb = 
std::bind<void>(&TaskWorkerPool::_report_disk_state_worker_thread_callback, 
this);
         break;
     case TaskWorkerType::REPORT_OLAP_TABLE:
-        cb = 
std::bind<void>(&TaskWorkerPool::_report_tablet_worker_thread_callback, this);
+        _cb = 
std::bind<void>(&TaskWorkerPool::_report_tablet_worker_thread_callback, this);
         break;
     case TaskWorkerType::UPLOAD:
         _worker_count = config::upload_worker_count;
-        cb = std::bind<void>(&TaskWorkerPool::_upload_worker_thread_callback, 
this);
+        _cb = std::bind<void>(&TaskWorkerPool::_upload_worker_thread_callback, 
this);
         break;
     case TaskWorkerType::DOWNLOAD:
         _worker_count = config::download_worker_count;
-        cb = 
std::bind<void>(&TaskWorkerPool::_download_worker_thread_callback, this);
+        _cb = 
std::bind<void>(&TaskWorkerPool::_download_worker_thread_callback, this);
         break;
     case TaskWorkerType::MAKE_SNAPSHOT:
         _worker_count = config::make_snapshot_worker_count;
-        cb = std::bind<void>(&TaskWorkerPool::_make_snapshot_thread_callback, 
this);
+        _cb = std::bind<void>(&TaskWorkerPool::_make_snapshot_thread_callback, 
this);
         break;
     case TaskWorkerType::RELEASE_SNAPSHOT:
         _worker_count = config::release_snapshot_worker_count;
-        cb = 
std::bind<void>(&TaskWorkerPool::_release_snapshot_thread_callback, this);
+        _cb = 
std::bind<void>(&TaskWorkerPool::_release_snapshot_thread_callback, this);
         break;
     case TaskWorkerType::MOVE:
         _worker_count = 1;
-        cb = std::bind<void>(&TaskWorkerPool::_move_dir_thread_callback, this);
+        _cb = std::bind<void>(&TaskWorkerPool::_move_dir_thread_callback, 
this);
         break;
     case TaskWorkerType::UPDATE_TABLET_META_INFO:
         _worker_count = 1;
-        cb = 
std::bind<void>(&TaskWorkerPool::_update_tablet_meta_worker_thread_callback, 
this);
+        _cb = 
std::bind<void>(&TaskWorkerPool::_update_tablet_meta_worker_thread_callback, 
this);
         break;
     case TaskWorkerType::SUBMIT_TABLE_COMPACTION:
         _worker_count = 1;
-        cb = 
std::bind<void>(&TaskWorkerPool::_submit_table_compaction_worker_thread_callback,
-                             this);
+        _cb = 
std::bind<void>(&TaskWorkerPool::_submit_table_compaction_worker_thread_callback,
+                              this);
         break;
     case TaskWorkerType::REFRESH_STORAGE_POLICY:
-        cb = std::bind<void>(
+        _cb = std::bind<void>(
                 
&TaskWorkerPool::_storage_refresh_storage_policy_worker_thread_callback, this);
         break;
     case TaskWorkerType::UPDATE_STORAGE_POLICY:
         _worker_count = 1;
-        cb = 
std::bind<void>(&TaskWorkerPool::_storage_update_storage_policy_worker_thread_callback,
-                             this);
+        _cb = std::bind<void>(
+                
&TaskWorkerPool::_storage_update_storage_policy_worker_thread_callback, this);
         break;
     default:
         // pass
@@ -217,7 +213,7 @@ void TaskWorkerPool::start() {
             .build(&_thread_pool);
 
     for (int i = 0; i < _worker_count; i++) {
-        auto st = _thread_pool->submit_func(cb);
+        auto st = _thread_pool->submit_func(_cb);
         CHECK(st.ok()) << st;
     }
 #endif
@@ -310,36 +306,9 @@ void TaskWorkerPool::_finish_task(const TFinishTaskRequest& finish_task_request)
     TRACE("finish task");
 }
 
-uint32_t TaskWorkerPool::_get_next_task_index(int32_t thread_count,
-                                              std::deque<TAgentTaskRequest>& 
tasks,
-                                              TPriority::type priority) {
-    int32_t index = -1;
-    std::deque<TAgentTaskRequest>::size_type task_count = tasks.size();
-    for (uint32_t i = 0; i < task_count; ++i) {
-        TAgentTaskRequest task = tasks[i];
-        if (priority == TPriority::HIGH) {
-            if (task.__isset.priority && task.priority == TPriority::HIGH) {
-                index = i;
-                break;
-            }
-        }
-    }
-
-    if (index == -1) {
-        if (priority == TPriority::HIGH) {
-            return index;
-        }
-
-        index = 0;
-    }
-
-    return index;
-}
-
 void TaskWorkerPool::_create_tablet_worker_thread_callback() {
     while (_is_work) {
         TAgentTaskRequest agent_task_req;
-        TCreateTabletReq create_tablet_req;
         {
             std::unique_lock<std::mutex> 
worker_thread_lock(_worker_thread_lock);
             _worker_thread_condition_variable.wait(
@@ -349,10 +318,9 @@ void TaskWorkerPool::_create_tablet_worker_thread_callback() {
             }
 
             agent_task_req = _tasks.front();
-            create_tablet_req = agent_task_req.create_tablet_req;
             _tasks.pop_front();
         }
-
+        const TCreateTabletReq& create_tablet_req = 
agent_task_req.create_tablet_req;
         scoped_refptr<Trace> trace(new Trace);
         MonotonicStopWatch watch;
         watch.start();
@@ -413,7 +381,6 @@ void TaskWorkerPool::_create_tablet_worker_thread_callback() {
 void TaskWorkerPool::_drop_tablet_worker_thread_callback() {
     while (_is_work) {
         TAgentTaskRequest agent_task_req;
-        TDropTabletReq drop_tablet_req;
         {
             std::unique_lock<std::mutex> 
worker_thread_lock(_worker_thread_lock);
             _worker_thread_condition_variable.wait(
@@ -423,10 +390,9 @@ void TaskWorkerPool::_drop_tablet_worker_thread_callback() {
             }
 
             agent_task_req = _tasks.front();
-            drop_tablet_req = agent_task_req.drop_tablet_req;
             _tasks.pop_front();
         }
-
+        const TDropTabletReq& drop_tablet_req = agent_task_req.drop_tablet_req;
         Status status;
         TabletSharedPtr dropped_tablet = 
StorageEngine::instance()->tablet_manager()->get_tablet(
                 drop_tablet_req.tablet_id, false);
@@ -571,12 +537,27 @@ void TaskWorkerPool::_alter_tablet(const TAgentTaskRequest& agent_task_req, int6
     finish_task_request->__set_task_status(status.to_thrift());
 }
 
-void TaskWorkerPool::_push_worker_thread_callback() {
+PushTaskPool::PushTaskPool(ExecEnv* env, ThreadModel thread_model, 
PushWokerType type)
+        : TaskWorkerPool(
+                  type == PushWokerType::LOAD_V2 ? TaskWorkerType::PUSH : 
TaskWorkerType::DELETE,
+                  env, *env->master_info(), thread_model),
+          _push_worker_type(type) {
+    if (_push_worker_type == PushWokerType::LOAD_V2) {
+        _worker_count =
+                config::push_worker_count_normal_priority + 
config::push_worker_count_high_priority;
+
+    } else {
+        _worker_count = config::delete_worker_count;
+    }
+    _cb = [this]() { _push_worker_thread_callback(); };
+}
+
+void PushTaskPool::_push_worker_thread_callback() {
     // gen high priority worker thread
     TPriority::type priority = TPriority::NORMAL;
     int32_t push_worker_count_high_priority = 
config::push_worker_count_high_priority;
-    static uint32_t s_worker_count = 0;
-    {
+    if (_push_worker_type == PushWokerType::LOAD_V2) {
+        static uint32_t s_worker_count = 0;
         std::lock_guard<std::mutex> worker_thread_lock(_worker_thread_lock);
         if (s_worker_count < push_worker_count_high_priority) {
             ++s_worker_count;
@@ -586,9 +567,7 @@ void TaskWorkerPool::_push_worker_thread_callback() {
 
     while (_is_work) {
         TAgentTaskRequest agent_task_req;
-        TPushReq push_req;
-        int32_t index = 0;
-        do {
+        {
             std::unique_lock<std::mutex> 
worker_thread_lock(_worker_thread_lock);
             _worker_thread_condition_variable.wait(
                     worker_thread_lock, [this]() { return !_is_work || 
!_tasks.empty(); });
@@ -596,26 +575,26 @@ void TaskWorkerPool::_push_worker_thread_callback() {
                 return;
             }
 
-            index = 
_get_next_task_index(config::push_worker_count_normal_priority +
-                                                 
config::push_worker_count_high_priority,
-                                         _tasks, priority);
-
-            if (index < 0) {
-                // there is no high priority task. notify other thread to 
handle normal task
-                _worker_thread_condition_variable.notify_all();
-                break;
+            if (priority == TPriority::HIGH) {
+                const auto it = std::find_if(
+                        _tasks.cbegin(), _tasks.cend(), [](const 
TAgentTaskRequest& req) {
+                            return req.__isset.priority && req.priority == 
TPriority::HIGH;
+                        });
+
+                if (it == _tasks.cend()) {
+                    // there is no high priority task. notify other thread to 
handle normal task
+                    _worker_thread_condition_variable.notify_all();
+                    sleep(1);
+                    continue;
+                }
+                agent_task_req = std::move(*it);
+                _tasks.erase(it);
+            } else {
+                agent_task_req = std::move(_tasks.front());
+                _tasks.pop_front();
             }
-
-            agent_task_req = _tasks[index];
-            push_req = agent_task_req.push_req;
-            _tasks.erase(_tasks.begin() + index);
-        } while (false);
-
-        if (index < 0) {
-            // there is no high priority task in queue
-            sleep(1);
-            continue;
         }
+        TPushReq& push_req = agent_task_req.push_req;
 
         LOG(INFO) << "get push task. signature=" << agent_task_req.signature
                   << ", priority=" << priority << " push_type=" << 
push_req.push_type;
@@ -658,7 +637,6 @@ void TaskWorkerPool::_push_worker_thread_callback() {
 void TaskWorkerPool::_publish_version_worker_thread_callback() {
     while (_is_work) {
         TAgentTaskRequest agent_task_req;
-        TPublishVersionRequest publish_version_req;
         {
             std::unique_lock<std::mutex> 
worker_thread_lock(_worker_thread_lock);
             _worker_thread_condition_variable.wait(
@@ -668,10 +646,9 @@ void TaskWorkerPool::_publish_version_worker_thread_callback() {
             }
 
             agent_task_req = _tasks.front();
-            publish_version_req = agent_task_req.publish_version_req;
             _tasks.pop_front();
         }
-
+        TPublishVersionRequest& publish_version_req = 
agent_task_req.publish_version_req;
         DorisMetrics::instance()->publish_task_request_total->increment(1);
         VLOG_NOTICE << "get publish version task. signature=" << 
agent_task_req.signature;
 
@@ -769,7 +746,6 @@ void TaskWorkerPool::_publish_version_worker_thread_callback() {
 void TaskWorkerPool::_clear_transaction_task_worker_thread_callback() {
     while (_is_work) {
         TAgentTaskRequest agent_task_req;
-        TClearTransactionTaskRequest clear_transaction_task_req;
         {
             std::unique_lock<std::mutex> 
worker_thread_lock(_worker_thread_lock);
             _worker_thread_condition_variable.wait(
@@ -779,9 +755,10 @@ void TaskWorkerPool::_clear_transaction_task_worker_thread_callback() {
             }
 
             agent_task_req = _tasks.front();
-            clear_transaction_task_req = 
agent_task_req.clear_transaction_task_req;
             _tasks.pop_front();
         }
+        const TClearTransactionTaskRequest& clear_transaction_task_req =
+                agent_task_req.clear_transaction_task_req;
         LOG(INFO) << "get clear transaction task. signature=" << 
agent_task_req.signature
                   << ", transaction_id=" << 
clear_transaction_task_req.transaction_id
                   << ", partition_id_size=" << 
clear_transaction_task_req.partition_id.size();
@@ -821,7 +798,6 @@ void TaskWorkerPool::_clear_transaction_task_worker_thread_callback() {
 void TaskWorkerPool::_update_tablet_meta_worker_thread_callback() {
     while (_is_work) {
         TAgentTaskRequest agent_task_req;
-        TUpdateTabletMetaInfoReq update_tablet_meta_req;
         {
             std::unique_lock<std::mutex> 
worker_thread_lock(_worker_thread_lock);
             _worker_thread_condition_variable.wait(
@@ -831,9 +807,10 @@ void TaskWorkerPool::_update_tablet_meta_worker_thread_callback() {
             }
 
             agent_task_req = _tasks.front();
-            update_tablet_meta_req = 
agent_task_req.update_tablet_meta_info_req;
             _tasks.pop_front();
         }
+        const TUpdateTabletMetaInfoReq& update_tablet_meta_req =
+                agent_task_req.update_tablet_meta_info_req;
         LOG(INFO) << "get update tablet meta task. signature=" << 
agent_task_req.signature;
 
         Status status;
@@ -895,8 +872,6 @@ void TaskWorkerPool::_update_tablet_meta_worker_thread_callback() {
 void TaskWorkerPool::_clone_worker_thread_callback() {
     while (_is_work) {
         TAgentTaskRequest agent_task_req;
-        TCloneReq clone_req;
-
         {
             std::unique_lock<std::mutex> 
worker_thread_lock(_worker_thread_lock);
             _worker_thread_condition_variable.wait(
@@ -906,9 +881,9 @@ void TaskWorkerPool::_clone_worker_thread_callback() {
             }
 
             agent_task_req = _tasks.front();
-            clone_req = agent_task_req.clone_req;
             _tasks.pop_front();
         }
+        const TCloneReq& clone_req = agent_task_req.clone_req;
 
         DorisMetrics::instance()->clone_requests_total->increment(1);
         LOG(INFO) << "get clone task. signature=" << agent_task_req.signature;
@@ -945,7 +920,6 @@ void TaskWorkerPool::_clone_worker_thread_callback() {
 void TaskWorkerPool::_storage_medium_migrate_worker_thread_callback() {
     while (_is_work) {
         TAgentTaskRequest agent_task_req;
-        TStorageMediumMigrateReq storage_medium_migrate_req;
         {
             std::unique_lock<std::mutex> 
worker_thread_lock(_worker_thread_lock);
             _worker_thread_condition_variable.wait(
@@ -955,9 +929,10 @@ void TaskWorkerPool::_storage_medium_migrate_worker_thread_callback() {
             }
 
             agent_task_req = _tasks.front();
-            storage_medium_migrate_req = 
agent_task_req.storage_medium_migrate_req;
             _tasks.pop_front();
         }
+        const TStorageMediumMigrateReq& storage_medium_migrate_req =
+                agent_task_req.storage_medium_migrate_req;
 
         // check request and get info
         TabletSharedPtr tablet;
@@ -1046,7 +1021,6 @@ Status TaskWorkerPool::_check_migrate_request(const TStorageMediumMigrateReq& re
 void TaskWorkerPool::_check_consistency_worker_thread_callback() {
     while (_is_work) {
         TAgentTaskRequest agent_task_req;
-        TCheckConsistencyReq check_consistency_req;
         {
             std::unique_lock<std::mutex> 
worker_thread_lock(_worker_thread_lock);
             _worker_thread_condition_variable.wait(
@@ -1056,9 +1030,9 @@ void TaskWorkerPool::_check_consistency_worker_thread_callback() {
             }
 
             agent_task_req = _tasks.front();
-            check_consistency_req = agent_task_req.check_consistency_req;
             _tasks.pop_front();
         }
+        const TCheckConsistencyReq& check_consistency_req = 
agent_task_req.check_consistency_req;
 
         uint32_t checksum = 0;
         EngineChecksumTask engine_task(check_consistency_req.tablet_id,
@@ -1239,7 +1213,6 @@ void TaskWorkerPool::_report_tablet_worker_thread_callback() {
 void TaskWorkerPool::_upload_worker_thread_callback() {
     while (_is_work) {
         TAgentTaskRequest agent_task_req;
-        TUploadReq upload_request;
         {
             std::unique_lock<std::mutex> 
worker_thread_lock(_worker_thread_lock);
             _worker_thread_condition_variable.wait(
@@ -1249,9 +1222,9 @@ void TaskWorkerPool::_upload_worker_thread_callback() {
             }
 
             agent_task_req = _tasks.front();
-            upload_request = agent_task_req.upload_req;
             _tasks.pop_front();
         }
+        const TUploadReq& upload_request = agent_task_req.upload_req;
 
         LOG(INFO) << "get upload task. signature=" << agent_task_req.signature
                   << ", job_id=" << upload_request.job_id;
@@ -1290,7 +1263,6 @@ void TaskWorkerPool::_upload_worker_thread_callback() {
 void TaskWorkerPool::_download_worker_thread_callback() {
     while (_is_work) {
         TAgentTaskRequest agent_task_req;
-        TDownloadReq download_request;
         {
             std::unique_lock<std::mutex> 
worker_thread_lock(_worker_thread_lock);
             _worker_thread_condition_variable.wait(
@@ -1300,9 +1272,9 @@ void TaskWorkerPool::_download_worker_thread_callback() {
             }
 
             agent_task_req = _tasks.front();
-            download_request = agent_task_req.download_req;
             _tasks.pop_front();
         }
+        const TDownloadReq& download_request = agent_task_req.download_req;
         LOG(INFO) << "get download task. signature=" << 
agent_task_req.signature
                   << ", job_id=" << download_request.job_id;
 
@@ -1342,7 +1314,6 @@ void TaskWorkerPool::_download_worker_thread_callback() {
 void TaskWorkerPool::_make_snapshot_thread_callback() {
     while (_is_work) {
         TAgentTaskRequest agent_task_req;
-        TSnapshotRequest snapshot_request;
         {
             std::unique_lock<std::mutex> 
worker_thread_lock(_worker_thread_lock);
             _worker_thread_condition_variable.wait(
@@ -1352,9 +1323,9 @@ void TaskWorkerPool::_make_snapshot_thread_callback() {
             }
 
             agent_task_req = _tasks.front();
-            snapshot_request = agent_task_req.snapshot_req;
             _tasks.pop_front();
         }
+        const TSnapshotRequest& snapshot_request = agent_task_req.snapshot_req;
         LOG(INFO) << "get snapshot task. signature=" << 
agent_task_req.signature;
 
         string snapshot_path;
@@ -1401,7 +1372,6 @@ void TaskWorkerPool::_make_snapshot_thread_callback() {
 void TaskWorkerPool::_release_snapshot_thread_callback() {
     while (_is_work) {
         TAgentTaskRequest agent_task_req;
-        TReleaseSnapshotRequest release_snapshot_request;
         {
             std::unique_lock<std::mutex> 
worker_thread_lock(_worker_thread_lock);
             _worker_thread_condition_variable.wait(
@@ -1411,12 +1381,13 @@ void TaskWorkerPool::_release_snapshot_thread_callback() {
             }
 
             agent_task_req = _tasks.front();
-            release_snapshot_request = agent_task_req.release_snapshot_req;
             _tasks.pop_front();
         }
+        const TReleaseSnapshotRequest& release_snapshot_request =
+                agent_task_req.release_snapshot_req;
         LOG(INFO) << "get release snapshot task. signature=" << 
agent_task_req.signature;
 
-        string& snapshot_path = release_snapshot_request.snapshot_path;
+        const string& snapshot_path = release_snapshot_request.snapshot_path;
         Status status = 
SnapshotManager::instance()->release_snapshot(snapshot_path);
         if (!status.ok()) {
             LOG_WARNING("failed to release snapshot")
@@ -1450,7 +1421,6 @@ Status TaskWorkerPool::_get_tablet_info(const TTabletId tablet_id, const TSchema
 void TaskWorkerPool::_move_dir_thread_callback() {
     while (_is_work) {
         TAgentTaskRequest agent_task_req;
-        TMoveDirReq move_dir_req;
         {
             std::unique_lock<std::mutex> 
worker_thread_lock(_worker_thread_lock);
             _worker_thread_condition_variable.wait(
@@ -1460,9 +1430,9 @@ void TaskWorkerPool::_move_dir_thread_callback() {
             }
 
             agent_task_req = _tasks.front();
-            move_dir_req = agent_task_req.move_dir_req;
             _tasks.pop_front();
         }
+        const TMoveDirReq& move_dir_req = agent_task_req.move_dir_req;
         LOG(INFO) << "get move dir task. signature=" << 
agent_task_req.signature
                   << ", job_id=" << move_dir_req.job_id;
         Status status = _move_dir(move_dir_req.tablet_id, move_dir_req.src, 
move_dir_req.job_id,
@@ -1556,8 +1526,6 @@ void TaskWorkerPool::_random_sleep(int second) {
 void TaskWorkerPool::_submit_table_compaction_worker_thread_callback() {
     while (_is_work) {
         TAgentTaskRequest agent_task_req;
-        TCompactionReq compaction_req;
-
         {
             std::unique_lock<std::mutex> 
worker_thread_lock(_worker_thread_lock);
             _worker_thread_condition_variable.wait(
@@ -1567,9 +1535,9 @@ void TaskWorkerPool::_submit_table_compaction_worker_thread_callback() {
             }
 
             agent_task_req = _tasks.front();
-            compaction_req = agent_task_req.compaction_req;
             _tasks.pop_front();
         }
+        const TCompactionReq& compaction_req = agent_task_req.compaction_req;
 
         LOG(INFO) << "get compaction task. signature=" << 
agent_task_req.signature
                   << ", compaction_type=" << compaction_req.type;
@@ -1658,7 +1626,6 @@ void TaskWorkerPool::_storage_refresh_storage_policy_worker_thread_callback() {
 void TaskWorkerPool::_storage_update_storage_policy_worker_thread_callback() {
     while (_is_work) {
         TAgentTaskRequest agent_task_req;
-        TGetStoragePolicy get_storage_policy_req;
         {
             std::unique_lock<std::mutex> 
worker_thread_lock(_worker_thread_lock);
             _worker_thread_condition_variable.wait(
@@ -1668,9 +1635,10 @@ void TaskWorkerPool::_storage_update_storage_policy_worker_thread_callback() {
             }
 
             agent_task_req = _tasks.front();
-            get_storage_policy_req = agent_task_req.update_policy;
+
             _tasks.pop_front();
         }
+        const TGetStoragePolicy& get_storage_policy_req = 
agent_task_req.update_policy;
 
         StoragePolicyMgr* spm = ExecEnv::GetInstance()->storage_policy_mgr();
         auto policy_ptr = std::make_shared<StoragePolicy>();
diff --git a/be/src/agent/task_worker_pool.h b/be/src/agent/task_worker_pool.h
index ae8b9e8149..f81b5c0b31 100644
--- a/be/src/agent/task_worker_pool.h
+++ b/be/src/agent/task_worker_pool.h
@@ -164,16 +164,12 @@ public:
     // notify the worker. currently for task/disk/tablet report thread
     void notify_thread();
 
-private:
+protected:
     bool _register_task_info(const TTaskType::type task_type, int64_t 
signature);
     void _remove_task_info(const TTaskType::type task_type, int64_t signature);
     void _finish_task(const TFinishTaskRequest& finish_task_request);
-    uint32_t _get_next_task_index(int32_t thread_count, 
std::deque<TAgentTaskRequest>& tasks,
-                                  TPriority::type priority);
-
     void _create_tablet_worker_thread_callback();
     void _drop_tablet_worker_thread_callback();
-    void _push_worker_thread_callback();
     void _publish_version_worker_thread_callback();
     void _clear_transaction_task_worker_thread_callback();
     void _alter_tablet_worker_thread_callback();
@@ -209,7 +205,7 @@ private:
     // random sleep 1~second seconds
     void _random_sleep(int second);
 
-private:
+protected:
     std::string _name;
 
     // Reference to the ExecEnv::_master_info
@@ -237,6 +233,7 @@ private:
     // Always 1 when _thread_model is SINGLE_THREAD
     uint32_t _worker_count;
     TaskWorkerType _task_worker_type;
+    std::function<void()> _cb;
 
     static FrontendServiceClientCache _master_service_client_cache;
     static std::atomic_ulong _s_report_version;
@@ -246,4 +243,17 @@ private:
 
     DISALLOW_COPY_AND_ASSIGN(TaskWorkerPool);
 }; // class TaskWorkerPool
+
+class PushTaskPool : public TaskWorkerPool {
+public:
+    enum class PushWokerType { LOAD_V2, DELETE };
+    PushTaskPool(ExecEnv* env, ThreadModel thread_model, PushWokerType type);
+    void _push_worker_thread_callback();
+
+    DISALLOW_COPY_AND_ASSIGN(PushTaskPool);
+
+private:
+    PushWokerType _push_worker_type;
+};
+
 } // namespace doris


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to