This is an automated email from the ASF dual-hosted git repository.

morningman pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-doris.git


The following commit(s) were added to refs/heads/master by this push:
     new e991b13  [Code Refactor] Refactor AgentServer to make it less error-prone and more readable (#2831)
e991b13 is described below

commit e991b1300f80211367476cd7346da6540f38ad1d
Author: LingBin <lingbi...@gmail.com>
AuthorDate: Thu Feb 6 09:56:00 2020 +0800

    [Code Refactor] Refactor AgentServer to make it less error-prone and more readable (#2831)
    
    In `AgentServer`, each task type has to be handled separately, which
    leads to very long code that is hard to read and in which errors are
    hard to spot (for example, the handling of some task type may be
    missed, or a task type may be mapped to the wrong worker pool).
    
    Fortunately, the code for each task type is very similar, so this
    is a good case for a macro, which greatly reduces the repeated code
    and solves the problems above.
    
    This patch also fixes two small bugs:
    1. The `_topic_subscriber` member was not released in the dtor.
    2. In `submit_tasks()`, `status_code` was not reset before each task
       was processed, so after one task failed, every later task in the
       batch was also reported as failed.
    
    Apart from these two fixes, there are no functional changes in this patch.
---
 be/src/agent/agent_server.cpp     | 562 ++++++++++++--------------------------
 be/src/agent/agent_server.h       | 141 ++++------
 be/src/agent/heartbeat_server.h   |   4 +-
 be/src/agent/task_worker_pool.h   |   2 +
 be/src/service/doris_main.cpp     |   1 +
 gensrc/thrift/AgentService.thrift |   3 +-
 6 files changed, 239 insertions(+), 474 deletions(-)

diff --git a/be/src/agent/agent_server.cpp b/be/src/agent/agent_server.cpp
index dcbdf4d..b30ed70 100644
--- a/be/src/agent/agent_server.cpp
+++ b/be/src/agent/agent_server.cpp
@@ -16,49 +16,29 @@
 // under the License.
 
 #include "agent/agent_server.h"
+
 #include <string>
-#include "boost/filesystem.hpp"
-#include "boost/lexical_cast.hpp"
-#include "thrift/concurrency/ThreadManager.h"
-#include "thrift/concurrency/PosixThreadFactory.h"
-#include "thrift/server/TThreadPoolServer.h"
-#include "thrift/server/TThreadedServer.h"
-#include "thrift/transport/TSocket.h"
-#include "thrift/transport/TTransportUtils.h"
-#include "util/thrift_server.h"
-#include "agent/status.h"
+
+#include <boost/filesystem.hpp>
+
 #include "agent/task_worker_pool.h"
+#include "agent/topic_subscriber.h"
 #include "agent/user_resource_listener.h"
 #include "common/status.h"
 #include "common/logging.h"
-#include "gen_cpp/AgentService_types.h"
-#include "gen_cpp/MasterService_types.h"
-#include "gen_cpp/Status_types.h"
-#include "gen_cpp/Types_constants.h"
-#include "olap/utils.h"
+#include "gutil/strings/substitute.h"
 #include "olap/snapshot_manager.h"
-#include "runtime/exec_env.h"
 #include "runtime/etl_job_mgr.h"
-#include "util/debug_util.h"
 
-using apache::thrift::transport::TProcessor;
-using std::deque;
-using std::list;
-using std::map;
-using std::nothrow;
-using std::set;
 using std::string;
-using std::to_string;
 using std::vector;
 
 namespace doris {
 
-AgentServer::AgentServer(ExecEnv* exec_env,
-                         const TMasterInfo& master_info) :
+AgentServer::AgentServer(ExecEnv* exec_env, const TMasterInfo& master_info) :
         _exec_env(exec_env),
         _master_info(master_info),
         _topic_subscriber(new TopicSubscriber()) {
-    
     for (auto& path : exec_env->store_paths()) {
         try {
             string dpp_download_path_str = path.path + DPP_PREFIX;
@@ -67,113 +47,48 @@ AgentServer::AgentServer(ExecEnv* exec_env,
                 boost::filesystem::remove_all(dpp_download_path);
             }
         } catch (...) {
-            LOG(WARNING) << "boost exception when remove dpp download path. 
path="
-                         << path.path;
+            LOG(WARNING) << "boost exception when remove dpp download path. 
path=" << path.path;
         }
     }
 
-    // init task worker pool
-    _create_tablet_workers = new TaskWorkerPool(
-            TaskWorkerPool::TaskWorkerType::CREATE_TABLE,
-            _exec_env,
-            master_info);
-    _drop_tablet_workers = new TaskWorkerPool(
-            TaskWorkerPool::TaskWorkerType::DROP_TABLE,
-            _exec_env,
-            master_info);
-    _push_workers = new TaskWorkerPool(
-            TaskWorkerPool::TaskWorkerType::PUSH,
-            _exec_env,
-            master_info);
-    _publish_version_workers = new TaskWorkerPool(
-            TaskWorkerPool::TaskWorkerType::PUBLISH_VERSION,
-            _exec_env,
-            master_info);
-    _clear_transaction_task_workers = new TaskWorkerPool(
-            TaskWorkerPool::TaskWorkerType::CLEAR_TRANSACTION_TASK,
-            exec_env,
-            master_info);
-    _delete_workers = new TaskWorkerPool(
-            TaskWorkerPool::TaskWorkerType::DELETE,
-            _exec_env,
-            master_info);
-    _alter_tablet_workers = new TaskWorkerPool(
-            TaskWorkerPool::TaskWorkerType::ALTER_TABLE,
-            _exec_env,
-            master_info);
-    _clone_workers = new TaskWorkerPool(
-            TaskWorkerPool::TaskWorkerType::CLONE,
-            _exec_env,
-            master_info);
-    _storage_medium_migrate_workers = new TaskWorkerPool(
-            TaskWorkerPool::TaskWorkerType::STORAGE_MEDIUM_MIGRATE,
-            _exec_env,
-            master_info);
-    _check_consistency_workers = new TaskWorkerPool(
-            TaskWorkerPool::TaskWorkerType::CHECK_CONSISTENCY,
-            _exec_env,
-            master_info);
-    _report_task_workers = new TaskWorkerPool(
-            TaskWorkerPool::TaskWorkerType::REPORT_TASK,
-            _exec_env,
-            master_info);
-    _report_disk_state_workers = new TaskWorkerPool(
-            TaskWorkerPool::TaskWorkerType::REPORT_DISK_STATE,
-            _exec_env,
-            master_info);
-    _report_tablet_workers = new TaskWorkerPool(
-            TaskWorkerPool::TaskWorkerType::REPORT_OLAP_TABLE,
-            _exec_env,
-            master_info);
-    _upload_workers = new TaskWorkerPool(
-            TaskWorkerPool::TaskWorkerType::UPLOAD,
-            _exec_env,
-            master_info);
-    _download_workers = new TaskWorkerPool(
-            TaskWorkerPool::TaskWorkerType::DOWNLOAD,
-            _exec_env,
-            master_info);
-    _make_snapshot_workers = new TaskWorkerPool(
-            TaskWorkerPool::TaskWorkerType::MAKE_SNAPSHOT,
-            _exec_env,
-            master_info);
-    _release_snapshot_workers = new TaskWorkerPool(
-            TaskWorkerPool::TaskWorkerType::RELEASE_SNAPSHOT,
-            _exec_env,
-            master_info);
-    _move_dir_workers= new TaskWorkerPool(
-            TaskWorkerPool::TaskWorkerType::MOVE,
-            _exec_env,
-            master_info);
-    _recover_tablet_workers = new TaskWorkerPool(
-            TaskWorkerPool::TaskWorkerType::RECOVER_TABLET,
-            _exec_env,
-            master_info);
-    _update_tablet_meta_info_workers = new TaskWorkerPool(
-            TaskWorkerPool::TaskWorkerType::UPDATE_TABLET_META_INFO,
-            _exec_env,
-            master_info);
+    // It is the same code to create workers of each type, so we use a macro
+    // to make code to be more readable.
+
+#ifndef BE_TEST
+#define CREATE_AND_START_POOL(type, pool_name)         \
+    pool_name.reset(new TaskWorkerPool(                \
+                TaskWorkerPool::TaskWorkerType::type,  \
+                _exec_env,                             \
+                master_info));                         \
+    pool_name->start();
+#else
+#define CREATE_AND_START_POOL(type, pool_name)
+#endif // BE_TEST
+
+    CREATE_AND_START_POOL(CREATE_TABLE, _create_tablet_workers);
+    CREATE_AND_START_POOL(DROP_TABLE, _drop_tablet_workers);
+    // Both PUSH and REALTIME_PUSH type use _push_workers
+    CREATE_AND_START_POOL(PUSH, _push_workers);
+    CREATE_AND_START_POOL(PUBLISH_VERSION, _publish_version_workers);
+    CREATE_AND_START_POOL(CLEAR_TRANSACTION_TASK, 
_clear_transaction_task_workers);
+    CREATE_AND_START_POOL(DELETE, _delete_workers);
+    CREATE_AND_START_POOL(ALTER_TABLE, _alter_tablet_workers);
+    CREATE_AND_START_POOL(CLONE, _clone_workers);
+    CREATE_AND_START_POOL(STORAGE_MEDIUM_MIGRATE, 
_storage_medium_migrate_workers);
+    CREATE_AND_START_POOL(CHECK_CONSISTENCY, _check_consistency_workers);
+    CREATE_AND_START_POOL(REPORT_TASK, _report_task_workers);
+    CREATE_AND_START_POOL(REPORT_DISK_STATE, _report_disk_state_workers);
+    CREATE_AND_START_POOL(REPORT_OLAP_TABLE, _report_tablet_workers);
+    CREATE_AND_START_POOL(UPLOAD, _upload_workers);
+    CREATE_AND_START_POOL(DOWNLOAD, _download_workers);
+    CREATE_AND_START_POOL(MAKE_SNAPSHOT, _make_snapshot_workers);
+    CREATE_AND_START_POOL(RELEASE_SNAPSHOT, _release_snapshot_workers);
+    CREATE_AND_START_POOL(MOVE, _move_dir_workers);
+    CREATE_AND_START_POOL(RECOVER_TABLET, _recover_tablet_workers);
+    CREATE_AND_START_POOL(UPDATE_TABLET_META_INFO, 
_update_tablet_meta_info_workers);
+#undef CREATE_AND_START_POOL
+
 #ifndef BE_TEST
-    _create_tablet_workers->start();
-    _drop_tablet_workers->start();
-    _push_workers->start();
-    _publish_version_workers->start();
-    _clear_transaction_task_workers->start();
-    _delete_workers->start();
-    _alter_tablet_workers->start();
-    _clone_workers->start();
-    _storage_medium_migrate_workers->start();
-    _check_consistency_workers->start();
-    _report_task_workers->start();
-    _report_disk_state_workers->start();
-    _report_tablet_workers->start();
-    _upload_workers->start();
-    _download_workers->start();
-    _make_snapshot_workers->start();
-    _release_snapshot_workers->start();
-    _move_dir_workers->start();
-    _recover_tablet_workers->start();
-    _update_tablet_meta_info_workers->start();
     // Add subscriber here and register listeners
     TopicListener* user_resource_listener = new UserResourceListener(exec_env, 
master_info);
     LOG(INFO) << "Register user resource listener";
@@ -181,325 +96,198 @@ AgentServer::AgentServer(ExecEnv* exec_env,
 #endif
 }
 
-AgentServer::~AgentServer() {
-    if (_create_tablet_workers != NULL) {
-        delete _create_tablet_workers;
-    }
-    if (_drop_tablet_workers != NULL) {
-        delete _drop_tablet_workers;
-    }
-    if (_push_workers != NULL) {
-        delete _push_workers;
-    }
-    if (_publish_version_workers != NULL) {
-        delete _publish_version_workers;
-    }
-    if (_clear_transaction_task_workers != NULL) {
-        delete _clear_transaction_task_workers;
-    }
-    if (_delete_workers != NULL) {
-        delete _delete_workers;
-    }
-    if (_alter_tablet_workers != NULL) {
-        delete _alter_tablet_workers;
-    }
-    if (_clone_workers != NULL) {
-        delete _clone_workers;
-    }
-    if (_storage_medium_migrate_workers != NULL) {
-        delete _storage_medium_migrate_workers;
-    }
-    if (_check_consistency_workers != NULL) {
-        delete _check_consistency_workers;
-    }
-    if (_report_task_workers != NULL) {
-        delete _report_task_workers;
-    }
-    if (_report_disk_state_workers != NULL) {
-        delete _report_disk_state_workers;
-    }
-    if (_report_tablet_workers != NULL) {
-        delete _report_tablet_workers;
-    }
-    if (_upload_workers != NULL) {
-        delete _upload_workers;
-    }
-    if (_download_workers != NULL) {
-        delete _download_workers;
-    }
-    if (_make_snapshot_workers != NULL) {
-        delete _make_snapshot_workers;
-    }
-    if (_move_dir_workers!= NULL) {
-        delete _move_dir_workers;
-    }
-    if (_recover_tablet_workers != NULL) {
-        delete _recover_tablet_workers;
-    }
-
-    if (_update_tablet_meta_info_workers != NULL) {
-        delete _update_tablet_meta_info_workers;
-    }
-    if (_release_snapshot_workers != NULL) {
-        delete _release_snapshot_workers;
-    }
-    if (_topic_subscriber !=NULL) {
-        delete _topic_subscriber;
-    }
-}
-
-void AgentServer::submit_tasks(
-        TAgentResult& return_value,
-        const vector<TAgentTaskRequest>& tasks) {
+AgentServer::~AgentServer() { }
 
-    // Set result to dm
-    vector<string> error_msgs;
-    TStatusCode::type status_code = TStatusCode::OK;
+// TODO(lingbin): each task in the batch may have it own status or FE must 
check and
+// resend request when something is wrong(BE may need some logic to guarantee 
idempotence.
+void AgentServer::submit_tasks(TAgentResult& agent_result, const 
vector<TAgentTaskRequest>& tasks) {
+    Status ret_st;
 
-    // TODO check require master same to heartbeat master
-    if (_master_info.network_address.hostname == ""
-            || _master_info.network_address.port == 0) {
-        error_msgs.push_back("Not get master heartbeat yet.");
-        return_value.status.__set_error_msgs(error_msgs);
-        return_value.status.__set_status_code(TStatusCode::CANCELLED);
+    // TODO check master_info here if it is the same with that of heartbeat rpc
+    if (_master_info.network_address.hostname == "" || 
_master_info.network_address.port == 0) {
+        Status ret_st = Status::Cancelled("Have not get FE Master heartbeat 
yet");
+        ret_st.to_thrift(&agent_result.status);
         return;
     }
 
     for (auto task : tasks) {
+        VLOG_RPC << "submit one task: " << 
apache::thrift::ThriftDebugString(task).c_str();
         TTaskType::type task_type = task.task_type;
         int64_t signature = task.signature;
 
+#define HANDLE_TYPE(t_task_type, work_pool, req_member)                        
 \
+    case t_task_type:                                                          
 \
+        if (task.__isset.req_member) {                                         
 \
+            work_pool->submit_task(task);                                      
 \
+        } else {                                                               
 \
+            ret_st = Status::InvalidArgument(strings::Substitute(              
 \
+                    "task(signature=$0) has wrong request member", 
signature)); \
+        }                                                                      
 \
+        break;
+
+        // TODO(lingbin): It still too long, divided these task types into 
several categories
         switch (task_type) {
-        case TTaskType::CREATE:
-            if (task.__isset.create_tablet_req) {
-               _create_tablet_workers->submit_task(task);
-            } else {
-                status_code = TStatusCode::ANALYSIS_ERROR;
-            }
-            break;
-        case TTaskType::DROP:
-            if (task.__isset.drop_tablet_req) {
-                _drop_tablet_workers->submit_task(task);
-            } else {
-                status_code = TStatusCode::ANALYSIS_ERROR;
-            }
-            break;
+        HANDLE_TYPE(TTaskType::CREATE, _create_tablet_workers, 
create_tablet_req);
+        HANDLE_TYPE(TTaskType::DROP, _drop_tablet_workers, drop_tablet_req);
+        HANDLE_TYPE(TTaskType::PUBLISH_VERSION, _publish_version_workers, 
publish_version_req);
+        HANDLE_TYPE(TTaskType::CLEAR_TRANSACTION_TASK,
+                    _clear_transaction_task_workers,
+                    clear_transaction_task_req);
+        HANDLE_TYPE(TTaskType::CLONE, _clone_workers, clone_req);
+        HANDLE_TYPE(TTaskType::STORAGE_MEDIUM_MIGRATE,
+                    _storage_medium_migrate_workers,
+                    storage_medium_migrate_req);
+        HANDLE_TYPE(TTaskType::CHECK_CONSISTENCY,
+                    _check_consistency_workers,
+                    check_consistency_req);
+        HANDLE_TYPE(TTaskType::UPLOAD, _upload_workers, upload_req);
+        HANDLE_TYPE(TTaskType::DOWNLOAD, _download_workers, download_req);
+        HANDLE_TYPE(TTaskType::MAKE_SNAPSHOT, _make_snapshot_workers, 
snapshot_req);
+        HANDLE_TYPE(TTaskType::RELEASE_SNAPSHOT, _release_snapshot_workers, 
release_snapshot_req);
+        HANDLE_TYPE(TTaskType::MOVE, _move_dir_workers, move_dir_req);
+        HANDLE_TYPE(TTaskType::RECOVER_TABLET, _recover_tablet_workers, 
recover_tablet_req);
+        HANDLE_TYPE(TTaskType::UPDATE_TABLET_META_INFO,
+                    _update_tablet_meta_info_workers,
+                    update_tablet_meta_info_req);
+
         case TTaskType::REALTIME_PUSH:
         case TTaskType::PUSH:
-            if (task.__isset.push_req) {
-                if (task.push_req.push_type == TPushType::LOAD
-                        || task.push_req.push_type == TPushType::LOAD_DELETE) {
-                    _push_workers->submit_task(task);
-                } else if (task.push_req.push_type == TPushType::DELETE) {
-                    _delete_workers->submit_task(task);
-                } else {
-                    status_code = TStatusCode::ANALYSIS_ERROR;
-                }
-            } else {
-                status_code = TStatusCode::ANALYSIS_ERROR;
-            }
-            break;
-        case TTaskType::PUBLISH_VERSION:
-            if (task.__isset.publish_version_req) {
-                _publish_version_workers->submit_task(task);
-            } else {
-                status_code = TStatusCode::ANALYSIS_ERROR;
+            if (!task.__isset.push_req) {
+                ret_st = Status::InvalidArgument(strings::Substitute(
+                        "task(signature=$0) has wrong request member", 
signature));
+                break;
             }
-            break;
-        case TTaskType::CLEAR_TRANSACTION_TASK:
-            if (task.__isset.clear_transaction_task_req) {
-                _clear_transaction_task_workers->submit_task(task);
+            if (task.push_req.push_type == TPushType::LOAD
+                    || task.push_req.push_type == TPushType::LOAD_DELETE) {
+                _push_workers->submit_task(task);
+            } else if (task.push_req.push_type == TPushType::DELETE) {
+                _delete_workers->submit_task(task);
             } else {
-                status_code = TStatusCode::ANALYSIS_ERROR;
+                ret_st = Status::InvalidArgument(strings::Substitute(
+                        "task(signature=$0, type=$1, push_type=$2) has wrong 
push_type",
+                        signature, task_type, task.push_req.push_type));
             }
             break;
         case TTaskType::ALTER:
             if (task.__isset.alter_tablet_req || 
task.__isset.alter_tablet_req_v2) {
                 _alter_tablet_workers->submit_task(task);
             } else {
-                status_code = TStatusCode::ANALYSIS_ERROR;
-            }
-            break;
-        case TTaskType::CLONE:
-            if (task.__isset.clone_req) {
-                _clone_workers->submit_task(task);
-            } else {
-                status_code = TStatusCode::ANALYSIS_ERROR;
-            }
-            break;
-        case TTaskType::STORAGE_MEDIUM_MIGRATE:
-            if (task.__isset.storage_medium_migrate_req) {
-                _storage_medium_migrate_workers->submit_task(task);
-            } else {
-                status_code = TStatusCode::ANALYSIS_ERROR;
-            }
-            break;
-        case TTaskType::CHECK_CONSISTENCY:
-            if (task.__isset.check_consistency_req) {
-                _check_consistency_workers->submit_task(task);
-            } else {
-                status_code = TStatusCode::ANALYSIS_ERROR;
-            }
-            break;
-        case TTaskType::UPLOAD:
-            if (task.__isset.upload_req) {
-                _upload_workers->submit_task(task);
-            } else {
-                status_code = TStatusCode::ANALYSIS_ERROR;
-            }
-            break;
-        case TTaskType::DOWNLOAD:
-            if (task.__isset.download_req) {
-                _download_workers->submit_task(task);
-            } else {
-                status_code = TStatusCode::ANALYSIS_ERROR;
-            }
-            break;
-        case TTaskType::MAKE_SNAPSHOT:
-            if (task.__isset.snapshot_req) {
-                _make_snapshot_workers->submit_task(task);
-            } else {
-                status_code = TStatusCode::ANALYSIS_ERROR;
-            }
-            break;
-        case TTaskType::RELEASE_SNAPSHOT:
-            if (task.__isset.release_snapshot_req) {
-                _release_snapshot_workers->submit_task(task);
-            } else {
-                status_code = TStatusCode::ANALYSIS_ERROR;
-            }
-            break;
-        case TTaskType::MOVE:
-            if (task.__isset.move_dir_req) {
-                _move_dir_workers->submit_task(task);
-            } else {
-                status_code = TStatusCode::ANALYSIS_ERROR;
-            }
-            break;
-        case TTaskType::RECOVER_TABLET:
-            if (task.__isset.recover_tablet_req) {
-                _recover_tablet_workers->submit_task(task);
-            } else {
-                status_code = TStatusCode::ANALYSIS_ERROR;
-            }
-            break;
-        case TTaskType::UPDATE_TABLET_META_INFO:
-            if (task.__isset.update_tablet_meta_info_req) {
-                _update_tablet_meta_info_workers->submit_task(task);
-            } else {
-                status_code = TStatusCode::ANALYSIS_ERROR;
+                ret_st = Status::InvalidArgument(strings::Substitute(
+                        "task(signature=$0) has wrong request member", 
signature));
             }
             break;
         default:
-            status_code = TStatusCode::ANALYSIS_ERROR;
+            ret_st = Status::InvalidArgument(strings::Substitute(
+                    "task(signature=$0, type=$1) has wrong task type", 
signature, task_type));
             break;
         }
+#undef HANDLE_TYPE
 
-        if (status_code == TStatusCode::ANALYSIS_ERROR) {
-            OLAP_LOG_WARNING("task anaysis_error, signature: %ld", signature);
-            error_msgs.push_back("the task signature is:" + 
to_string(signature) + " has wrong request.");
+        if (!ret_st.ok()) {
+            LOG(WARNING) << "fail to submit task. reason: " << 
ret_st.get_error_msg()
+                    << ", task: " << task;
+            // For now, all tasks in the batch share one status, so if any task
+            // was failed to submit, we can only return error to FE(even when 
some
+            // tasks have already been successfully submitted).
+            // However, Fe does not check the return status of submit_tasks() 
currently,
+            // and it is not sure that FE will retry when something is wrong, 
so here we
+            // only print an warning log and go on(i.e. do not break current 
loop),
+            // to ensure every task can be submitted once. It is OK for now, 
because the
+            // ret_st can be error only when it encounters an wrong task_type 
and
+            // req-member in TAgentTaskRequest, which is basically impossible.
+            // TODO(lingbin): check the logic in FE again later.
         }
     }
 
-    return_value.status.__set_error_msgs(error_msgs);
-    return_value.status.__set_status_code(status_code);
+    ret_st.to_thrift(&agent_result.status);
 }
 
-void AgentServer::make_snapshot(TAgentResult& return_value,
-        const TSnapshotRequest& snapshot_request) {
-    TStatus status;
-    vector<string> error_msgs;
-    TStatusCode::type status_code = TStatusCode::OK;
-    
return_value.__set_snapshot_version(snapshot_request.preferred_snapshot_version);
+void AgentServer::make_snapshot(TAgentResult& t_agent_result,
+                                const TSnapshotRequest& snapshot_request) {
+    Status ret_st;
     string snapshot_path;
-    OLAPStatus make_snapshot_status =
+    OLAPStatus err_code =
             SnapshotManager::instance()->make_snapshot(snapshot_request, 
&snapshot_path);
-    if (make_snapshot_status != OLAP_SUCCESS) {
-        status_code = TStatusCode::RUNTIME_ERROR;
-        OLAP_LOG_WARNING("make_snapshot failed. tablet_id: %ld, schema_hash: 
%ld, status: %d",
-                         snapshot_request.tablet_id, 
snapshot_request.schema_hash,
-                         make_snapshot_status);
-        error_msgs.push_back("make_snapshot failed. status: " +
-                             
boost::lexical_cast<string>(make_snapshot_status));
+    if (err_code != OLAP_SUCCESS) {
+        LOG(WARNING) << "fail to make_snapshot. tablet_id=" << 
snapshot_request.tablet_id
+                     << ", schema_hash=" << snapshot_request.schema_hash
+                     << ", error_code=" << err_code;
+        ret_st = Status::RuntimeError(strings::Substitute(
+                    "fail to make_snapshot. err_code=$0", err_code));
     } else {
-        LOG(INFO) << "make_snapshot success. tablet_id: " << 
snapshot_request.tablet_id
-                  << " schema_hash: " << snapshot_request.schema_hash << " 
snapshot_path: " << snapshot_path;
-        return_value.__set_snapshot_path(snapshot_path);
+        LOG(INFO) << "success to make_snapshot. tablet_id=" << 
snapshot_request.tablet_id
+                  << ", schema_hash=" << snapshot_request.schema_hash
+                  << ", snapshot_path: " << snapshot_path;
+        t_agent_result.__set_snapshot_path(snapshot_path);
     }
 
-    status.__set_error_msgs(error_msgs);
-    status.__set_status_code(status_code);
-    return_value.__set_status(status);
+    ret_st.to_thrift(&t_agent_result.status);
+    
t_agent_result.__set_snapshot_version(snapshot_request.preferred_snapshot_version);
     if (snapshot_request.__isset.allow_incremental_clone) {
-        
return_value.__set_allow_incremental_clone(snapshot_request.allow_incremental_clone);
+        
t_agent_result.__set_allow_incremental_clone(snapshot_request.allow_incremental_clone);
     }
 }
 
-void AgentServer::release_snapshot(TAgentResult& return_value, const 
std::string& snapshot_path) {
-    vector<string> error_msgs;
-    TStatusCode::type status_code = TStatusCode::OK;
-
-    OLAPStatus release_snapshot_status =
-            SnapshotManager::instance()->release_snapshot(snapshot_path);
-    if (release_snapshot_status != OLAP_SUCCESS) {
-        status_code = TStatusCode::RUNTIME_ERROR;
-        LOG(WARNING) << "release_snapshot failed. snapshot_path: " << 
snapshot_path << ", status: " << release_snapshot_status;
-        error_msgs.push_back("release_snapshot failed. status: " +
-                             
boost::lexical_cast<string>(release_snapshot_status));
+void AgentServer::release_snapshot(TAgentResult& t_agent_result, const 
std::string& snapshot_path) {
+    Status ret_st;
+    OLAPStatus err_code = 
SnapshotManager::instance()->release_snapshot(snapshot_path);
+    if (err_code != OLAP_SUCCESS) {
+        LOG(WARNING) << "failt to release_snapshot. snapshot_path: " << 
snapshot_path
+                     << ", err_code: " << err_code;
+        ret_st = Status::RuntimeError(strings::Substitute(
+                    "fail to release_snapshot. err_code=$0", err_code));
     } else {
-        LOG(INFO) << "release_snapshot success. snapshot_path: " << 
snapshot_path << ", status: " << release_snapshot_status;
+        LOG(INFO) << "success to release_snapshot. snapshot_path=" << 
snapshot_path
+                  << ", err_code=" << err_code;
     }
-
-    return_value.status.__set_error_msgs(error_msgs);
-    return_value.status.__set_status_code(status_code);
+    ret_st.to_thrift(&t_agent_result.status);
 }
 
-void AgentServer::publish_cluster_state(TAgentResult& _return, const 
TAgentPublishRequest& request) {
-    vector<string> error_msgs;
-    _topic_subscriber->handle_updates(request);
-    _return.status.__set_status_code(TStatusCode::OK);
+void AgentServer::publish_cluster_state(TAgentResult& t_agent_result,
+                                        const TAgentPublishRequest& request) {
+    Status status = Status::NotSupported("deprecated 
method(publish_cluster_state) was invoked");
+    status.to_thrift(&t_agent_result.status);
 }
 
-void AgentServer::submit_etl_task(TAgentResult& return_value,
-                                 const TMiniLoadEtlTaskRequest& request) {
+void AgentServer::submit_etl_task(TAgentResult& t_agent_result,
+                                  const TMiniLoadEtlTaskRequest& request) {
     Status status = _exec_env->etl_job_mgr()->start_job(request);
+    auto fragment_instance_id = request.params.params.fragment_instance_id;
     if (status.ok()) {
-        VLOG_RPC << "start etl task successfull id="
-            << request.params.params.fragment_instance_id;
+        VLOG_RPC << "success to submit etl task. id=" << fragment_instance_id;
     } else {
-        VLOG_RPC << "start etl task failed id="
-            << request.params.params.fragment_instance_id
-            << " and err_msg=" << status.get_error_msg();
+        VLOG_RPC << "fail to submit etl task. id=" << fragment_instance_id
+                 << ", err_msg=" << status.get_error_msg();
     }
-    status.to_thrift(&return_value.status);
+    status.to_thrift(&t_agent_result.status);
 }
 
-void AgentServer::get_etl_status(TMiniLoadEtlStatusResult& return_value,
+void AgentServer::get_etl_status(TMiniLoadEtlStatusResult& t_agent_result,
                                  const TMiniLoadEtlStatusRequest& request) {
-    Status status = 
_exec_env->etl_job_mgr()->get_job_state(request.mini_load_id, &return_value);
+    Status status = 
_exec_env->etl_job_mgr()->get_job_state(request.mini_load_id, &t_agent_result);
     if (!status.ok()) {
-        LOG(WARNING) << "get job state failed. [id=" << request.mini_load_id 
<< "]";
-    } else {
-        VLOG_RPC << "get job state successful. [id=" << request.mini_load_id 
<< ",status="
-            << return_value.status.status_code << ",etl_state=" << 
return_value.etl_state
-            << ",files=";
-        for (auto& item : return_value.file_map) {
-            VLOG_RPC << item.first << ":" << item.second << ";";
-        }
-        VLOG_RPC << "]";
+        LOG(WARNING) << "fail to get job state. [id=" << request.mini_load_id 
<< "]";
+    }
+
+    VLOG_RPC << "success to get job state. [id=" << request.mini_load_id << ", 
status="
+        << t_agent_result.status.status_code << ", etl_state=" << 
t_agent_result.etl_state
+        << ", files=";
+    for (auto& item : t_agent_result.file_map) {
+        VLOG_RPC << item.first << ":" << item.second << ";";
     }
+    VLOG_RPC << "]";
 }
 
-void AgentServer::delete_etl_files(TAgentResult& result,
+void AgentServer::delete_etl_files(TAgentResult& t_agent_result,
                                    const TDeleteEtlFilesRequest& request) {
     Status status = _exec_env->etl_job_mgr()->erase_job(request);
     if (!status.ok()) {
-        LOG(WARNING) << "delete etl files failed. because " << 
status.get_error_msg()
+        LOG(WARNING) << "fail to delete etl files. because " << 
status.get_error_msg()
             << " with request " << request;
-    } else {
-        VLOG_RPC << "delete etl files successful with param " << request;
     }
-    status.to_thrift(&result.status);
+
+    VLOG_RPC << "success to delete etl files. request=" << request;
+    status.to_thrift(&t_agent_result.status);
 }
 
 }  // namesapce doris
diff --git a/be/src/agent/agent_server.h b/be/src/agent/agent_server.h
index 5e84458..833de4d 100644
--- a/be/src/agent/agent_server.h
+++ b/be/src/agent/agent_server.h
@@ -18,105 +18,78 @@
 #ifndef DORIS_BE_SRC_AGENT_AGENT_SERVER_H
 #define DORIS_BE_SRC_AGENT_AGENT_SERVER_H
 
-#include "thrift/transport/TTransportUtils.h"
-#include "agent/status.h"
-#include "agent/task_worker_pool.h"
-#include "agent/topic_subscriber.h"
-#include "agent/utils.h"
+#include <memory>
+#include <string>
+#include <vector>
+
 #include "gen_cpp/AgentService_types.h"
-#include "gen_cpp/Types_types.h"
-#include "olap/olap_define.h"
-#include "olap/utils.h"
 #include "runtime/exec_env.h"
 
 namespace doris {
 
+class TaskWorkerPool;
+class TopicSubscriber;
+
+// Each method corresponds to one RPC from FE Master, see BackendService.
 class AgentServer {
 public:
     explicit AgentServer(ExecEnv* exec_env, const TMasterInfo& master_info);
     ~AgentServer();
 
-    // Receive agent task from dm
-    // 
-    // Input parameters:
-    // * tasks: The list of agent tasks
-    //
-    // Output parameters:
-    // * return_value: The result of receive agent task,
-    //                 contains return code and error messages.
-    void submit_tasks(
-            TAgentResult& return_value,
-            const std::vector<TAgentTaskRequest>& tasks);
-    
-    // Make a snapshot for a local tablet
-    //
-    // Input parameters:
-    // * tablet_id: The tablet id of local tablet.
-    // * schema_hash: The schema hash of local tablet
-    //
-    // Output parameters:
-    // * return_value: The result of make snapshot,
-    //                 contains return code and error messages.
-    void make_snapshot(
-            TAgentResult& return_value,
-            const TSnapshotRequest& snapshot_request);
-
-    // Release useless snapshot
-    //
-    // Input parameters:
-    // * snapshot_path: local useless snapshot path
-    //
-    // Output parameters:
-    // * return_value: The result of release snapshot,
-    //                 contains return code and error messages.
-    void release_snapshot(TAgentResult& return_value, const std::string& 
snapshot_path);
-
-    // Publish state to agent
-    //
-    // Input parameters:
-    //   request:  
-    void publish_cluster_state(TAgentResult& return_value, 
-                               const TAgentPublishRequest& request);
-
-    // Master call this rpc to submit a etl task
-    void submit_etl_task(TAgentResult& return_value, 
-                         const TMiniLoadEtlTaskRequest& request);
-
-    // Master call this rpc to fetch status of elt task
-    void get_etl_status(TMiniLoadEtlStatusResult& return_value,
-                        const TMiniLoadEtlStatusRequest& request);
+    // Receive agent task from FE master
+    void submit_tasks(TAgentResult& agent_result, const 
std::vector<TAgentTaskRequest>& tasks);
+
+    // TODO(lingbin): make agent_result a pointer, because it will be modified.
+    void make_snapshot(TAgentResult& agent_result, const TSnapshotRequest& 
snapshot_request);
+    void release_snapshot(TAgentResult& agent_result, const std::string& 
snapshot_path);
 
-    void delete_etl_files(TAgentResult& result, 
-                          const TDeleteEtlFilesRequest& request);
+    // Deprecated
+    // TODO(lingbin): This method is deprecated, should be removed later.
+    void publish_cluster_state(TAgentResult& agent_result, const 
TAgentPublishRequest& request);
+
+    // Multi-Load will still use the following 3 methods for now.
+    void submit_etl_task(TAgentResult& agent_result, const 
TMiniLoadEtlTaskRequest& request);
+    void get_etl_status(TMiniLoadEtlStatusResult& agent_result,
+                        const TMiniLoadEtlStatusRequest& request);
+    void delete_etl_files(TAgentResult& result, const TDeleteEtlFilesRequest& 
request);
 
 private:
+    DISALLOW_COPY_AND_ASSIGN(AgentServer);
+
+    // Not Owned
     ExecEnv* _exec_env;
+    // Reference to the ExecEnv::_master_info
     const TMasterInfo& _master_info;
 
-    TaskWorkerPool* _create_tablet_workers;
-    TaskWorkerPool* _drop_tablet_workers;
-    TaskWorkerPool* _push_workers;
-    TaskWorkerPool* _publish_version_workers;
-    TaskWorkerPool* _clear_transaction_task_workers;
-    TaskWorkerPool* _delete_workers;
-    TaskWorkerPool* _alter_tablet_workers;
-    TaskWorkerPool* _clone_workers;
-    TaskWorkerPool* _storage_medium_migrate_workers;
-    TaskWorkerPool* _check_consistency_workers;
-    TaskWorkerPool* _report_task_workers;
-    TaskWorkerPool* _report_disk_state_workers;
-    TaskWorkerPool* _report_tablet_workers;
-    TaskWorkerPool* _upload_workers;
-    TaskWorkerPool* _download_workers;
-    TaskWorkerPool* _make_snapshot_workers;
-    TaskWorkerPool* _release_snapshot_workers;
-    TaskWorkerPool* _move_dir_workers;
-    TaskWorkerPool* _recover_tablet_workers;
-    TaskWorkerPool* _update_tablet_meta_info_workers;
+    std::unique_ptr<TaskWorkerPool> _create_tablet_workers;
+    std::unique_ptr<TaskWorkerPool> _drop_tablet_workers;
+    std::unique_ptr<TaskWorkerPool> _push_workers;
+    std::unique_ptr<TaskWorkerPool> _publish_version_workers;
+    std::unique_ptr<TaskWorkerPool> _clear_transaction_task_workers;
+    std::unique_ptr<TaskWorkerPool> _delete_workers;
+    std::unique_ptr<TaskWorkerPool> _alter_tablet_workers;
+    std::unique_ptr<TaskWorkerPool> _clone_workers;
+    std::unique_ptr<TaskWorkerPool> _storage_medium_migrate_workers;
+    std::unique_ptr<TaskWorkerPool> _check_consistency_workers;
+
+    // These 3 worker pools do not accept tasks from FE.
+    // They are triggered periodically by themselves and report to FE master.
+    std::unique_ptr<TaskWorkerPool> _report_task_workers;
+    std::unique_ptr<TaskWorkerPool> _report_disk_state_workers;
+    std::unique_ptr<TaskWorkerPool> _report_tablet_workers;
+
+    std::unique_ptr<TaskWorkerPool> _upload_workers;
+    std::unique_ptr<TaskWorkerPool> _download_workers;
+    std::unique_ptr<TaskWorkerPool> _make_snapshot_workers;
+    std::unique_ptr<TaskWorkerPool> _release_snapshot_workers;
+    std::unique_ptr<TaskWorkerPool> _move_dir_workers;
+    std::unique_ptr<TaskWorkerPool> _recover_tablet_workers;
+    std::unique_ptr<TaskWorkerPool> _update_tablet_meta_info_workers;
+
+    std::unique_ptr<TopicSubscriber> _topic_subscriber;
+};
+
+}  // end namespace doris
 
-    DISALLOW_COPY_AND_ASSIGN(AgentServer);
-    
-    TopicSubscriber* _topic_subscriber;   
-};  // class AgentServer
-}  // namespace doris
 #endif  // DORIS_BE_SRC_AGENT_AGENT_SERVER_H
+
diff --git a/be/src/agent/heartbeat_server.h b/be/src/agent/heartbeat_server.h
index 3df17f0..c43dea0 100644
--- a/be/src/agent/heartbeat_server.h
+++ b/be/src/agent/heartbeat_server.h
@@ -52,13 +52,13 @@ public:
     virtual void heartbeat(THeartbeatResult& heartbeat_result, const 
TMasterInfo& master_info);
 
 private:
-    Status _heartbeat(
-        const TMasterInfo& master_info);
+    Status _heartbeat(const TMasterInfo& master_info);
 
     StorageEngine* _olap_engine;
 
     // mutex to protect master_info and _epoch
     std::mutex _hb_mtx;
+    // Not owned. Points to ExecEnv::_master_info.
     TMasterInfo* _master_info;
     int64_t _epoch;
 
diff --git a/be/src/agent/task_worker_pool.h b/be/src/agent/task_worker_pool.h
index 349bf7d..041d8af 100644
--- a/be/src/agent/task_worker_pool.h
+++ b/be/src/agent/task_worker_pool.h
@@ -43,10 +43,12 @@ public:
         PUSH,
         REALTIME_PUSH,
         PUBLISH_VERSION,
+        // Deprecated
         CLEAR_ALTER_TASK,
         CLEAR_TRANSACTION_TASK,
         DELETE,
         ALTER_TABLE,
+        // Deprecated
         QUERY_SPLIT_KEY,
         CLONE,
         STORAGE_MEDIUM_MIGRATE,
diff --git a/be/src/service/doris_main.cpp b/be/src/service/doris_main.cpp
index 0488818..051bb75 100644
--- a/be/src/service/doris_main.cpp
+++ b/be/src/service/doris_main.cpp
@@ -50,6 +50,7 @@
 #include "agent/topic_subscriber.h"
 #include "util/doris_metrics.h"
 #include "olap/options.h"
+#include "olap/storage_engine.h"
 #include "service/backend_options.h"
 #include "service/backend_service.h"
 #include "service/brpc_service.h"
diff --git a/gensrc/thrift/AgentService.thrift b/gensrc/thrift/AgentService.thrift
index ed30cba..f1808e0 100644
--- a/gensrc/thrift/AgentService.thrift
+++ b/gensrc/thrift/AgentService.thrift
@@ -56,7 +56,8 @@ struct TCreateTabletReq {
     1: required Types.TTabletId tablet_id
     2: required TTabletSchema tablet_schema
     3: optional Types.TVersion version
-    4: optional Types.TVersionHash version_hash // Deprecated
+    // Deprecated
+    4: optional Types.TVersionHash version_hash 
     5: optional Types.TStorageMedium storage_medium
     6: optional bool in_restore_mode
     // this new tablet should be colocate with base tablet


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to