lingbin commented on a change in pull request #2831: Refactor `AgentServer` to make it less error-prone and more readable
URL: https://github.com/apache/incubator-doris/pull/2831#discussion_r375334190
########## File path: be/src/agent/agent_server.cpp ########## @@ -67,438 +47,236 @@ AgentServer::AgentServer(ExecEnv* exec_env, boost::filesystem::remove_all(dpp_download_path); } } catch (...) { - LOG(WARNING) << "boost exception when remove dpp download path. path=" - << path.path; + LOG(WARNING) << "boost exception when remove dpp download path. path=" << path.path; } } - // init task worker pool - _create_tablet_workers = new TaskWorkerPool( - TaskWorkerPool::TaskWorkerType::CREATE_TABLE, - _exec_env, - master_info); - _drop_tablet_workers = new TaskWorkerPool( - TaskWorkerPool::TaskWorkerType::DROP_TABLE, - _exec_env, - master_info); - _push_workers = new TaskWorkerPool( - TaskWorkerPool::TaskWorkerType::PUSH, - _exec_env, - master_info); - _publish_version_workers = new TaskWorkerPool( - TaskWorkerPool::TaskWorkerType::PUBLISH_VERSION, - _exec_env, - master_info); - _clear_transaction_task_workers = new TaskWorkerPool( - TaskWorkerPool::TaskWorkerType::CLEAR_TRANSACTION_TASK, - exec_env, - master_info); - _delete_workers = new TaskWorkerPool( - TaskWorkerPool::TaskWorkerType::DELETE, - _exec_env, - master_info); - _alter_tablet_workers = new TaskWorkerPool( - TaskWorkerPool::TaskWorkerType::ALTER_TABLE, - _exec_env, - master_info); - _clone_workers = new TaskWorkerPool( - TaskWorkerPool::TaskWorkerType::CLONE, - _exec_env, - master_info); - _storage_medium_migrate_workers = new TaskWorkerPool( - TaskWorkerPool::TaskWorkerType::STORAGE_MEDIUM_MIGRATE, - _exec_env, - master_info); - _check_consistency_workers = new TaskWorkerPool( - TaskWorkerPool::TaskWorkerType::CHECK_CONSISTENCY, - _exec_env, - master_info); - _report_task_workers = new TaskWorkerPool( - TaskWorkerPool::TaskWorkerType::REPORT_TASK, - _exec_env, - master_info); - _report_disk_state_workers = new TaskWorkerPool( - TaskWorkerPool::TaskWorkerType::REPORT_DISK_STATE, - _exec_env, - master_info); - _report_tablet_workers = new TaskWorkerPool( - TaskWorkerPool::TaskWorkerType::REPORT_OLAP_TABLE, - 
_exec_env, - master_info); - _upload_workers = new TaskWorkerPool( - TaskWorkerPool::TaskWorkerType::UPLOAD, - _exec_env, - master_info); - _download_workers = new TaskWorkerPool( - TaskWorkerPool::TaskWorkerType::DOWNLOAD, - _exec_env, - master_info); - _make_snapshot_workers = new TaskWorkerPool( - TaskWorkerPool::TaskWorkerType::MAKE_SNAPSHOT, - _exec_env, - master_info); - _release_snapshot_workers = new TaskWorkerPool( - TaskWorkerPool::TaskWorkerType::RELEASE_SNAPSHOT, - _exec_env, - master_info); - _move_dir_workers= new TaskWorkerPool( - TaskWorkerPool::TaskWorkerType::MOVE, - _exec_env, - master_info); - _recover_tablet_workers = new TaskWorkerPool( - TaskWorkerPool::TaskWorkerType::RECOVER_TABLET, - _exec_env, - master_info); - _update_tablet_meta_info_workers = new TaskWorkerPool( - TaskWorkerPool::TaskWorkerType::UPDATE_TABLET_META_INFO, - _exec_env, - master_info); + // It is the same code to create workers of each type, so we use a macro + // to make code to be more readable. 
+ +#ifndef BE_TEST +#define CREATE_AND_START_POOL(type, pool_name) \ + pool_name.reset(new TaskWorkerPool( \ + TaskWorkerPool::TaskWorkerType::type, \ + _exec_env, \ + master_info)); \ + pool_name->start(); +#else +#define CREATE_AND_START_POOL(type, pool_name) +#endif // BE_TEST + + CREATE_AND_START_POOL(CREATE_TABLE, _create_tablet_workers); + CREATE_AND_START_POOL(DROP_TABLE, _drop_tablet_workers); + // Both PUSH and REALTIME_PUSH type use _push_workers + CREATE_AND_START_POOL(PUSH, _push_workers); + CREATE_AND_START_POOL(PUBLISH_VERSION, _publish_version_workers); + CREATE_AND_START_POOL(CLEAR_TRANSACTION_TASK, _clear_transaction_task_workers); + CREATE_AND_START_POOL(DELETE, _delete_workers); + CREATE_AND_START_POOL(ALTER_TABLE, _alter_tablet_workers); + CREATE_AND_START_POOL(CLONE, _clone_workers); + CREATE_AND_START_POOL(STORAGE_MEDIUM_MIGRATE, _storage_medium_migrate_workers); + CREATE_AND_START_POOL(CHECK_CONSISTENCY, _check_consistency_workers); + CREATE_AND_START_POOL(REPORT_TASK, _report_task_workers); + CREATE_AND_START_POOL(REPORT_DISK_STATE, _report_disk_state_workers); + CREATE_AND_START_POOL(REPORT_OLAP_TABLE, _report_tablet_workers); + CREATE_AND_START_POOL(UPLOAD, _upload_workers); + CREATE_AND_START_POOL(DOWNLOAD, _download_workers); + CREATE_AND_START_POOL(MAKE_SNAPSHOT, _make_snapshot_workers); + CREATE_AND_START_POOL(RELEASE_SNAPSHOT, _release_snapshot_workers); + CREATE_AND_START_POOL(MOVE, _move_dir_workers); + CREATE_AND_START_POOL(RECOVER_TABLET, _recover_tablet_workers); + CREATE_AND_START_POOL(UPDATE_TABLET_META_INFO, _update_tablet_meta_info_workers); +#undef CREATE_AND_START_POOL + #ifndef BE_TEST - _create_tablet_workers->start(); - _drop_tablet_workers->start(); - _push_workers->start(); - _publish_version_workers->start(); - _clear_transaction_task_workers->start(); - _delete_workers->start(); - _alter_tablet_workers->start(); - _clone_workers->start(); - _storage_medium_migrate_workers->start(); - 
_check_consistency_workers->start(); - _report_task_workers->start(); - _report_disk_state_workers->start(); - _report_tablet_workers->start(); - _upload_workers->start(); - _download_workers->start(); - _make_snapshot_workers->start(); - _release_snapshot_workers->start(); - _move_dir_workers->start(); - _recover_tablet_workers->start(); - _update_tablet_meta_info_workers->start(); // Add subscriber here and register listeners TopicListener* user_resource_listener = new UserResourceListener(exec_env, master_info); LOG(INFO) << "Register user resource listener"; _topic_subscriber->register_listener(doris::TTopicType::type::RESOURCE, user_resource_listener); #endif } -AgentServer::~AgentServer() { - if (_create_tablet_workers != NULL) { - delete _create_tablet_workers; - } - if (_drop_tablet_workers != NULL) { - delete _drop_tablet_workers; - } - if (_push_workers != NULL) { - delete _push_workers; - } - if (_publish_version_workers != NULL) { - delete _publish_version_workers; - } - if (_clear_transaction_task_workers != NULL) { - delete _clear_transaction_task_workers; - } - if (_delete_workers != NULL) { - delete _delete_workers; - } - if (_alter_tablet_workers != NULL) { - delete _alter_tablet_workers; - } - if (_clone_workers != NULL) { - delete _clone_workers; - } - if (_storage_medium_migrate_workers != NULL) { - delete _storage_medium_migrate_workers; - } - if (_check_consistency_workers != NULL) { - delete _check_consistency_workers; - } - if (_report_task_workers != NULL) { - delete _report_task_workers; - } - if (_report_disk_state_workers != NULL) { - delete _report_disk_state_workers; - } - if (_report_tablet_workers != NULL) { - delete _report_tablet_workers; - } - if (_upload_workers != NULL) { - delete _upload_workers; - } - if (_download_workers != NULL) { - delete _download_workers; - } - if (_make_snapshot_workers != NULL) { - delete _make_snapshot_workers; - } - if (_move_dir_workers!= NULL) { - delete _move_dir_workers; - } - if 
(_recover_tablet_workers != NULL) { - delete _recover_tablet_workers; - } - - if (_update_tablet_meta_info_workers != NULL) { - delete _update_tablet_meta_info_workers; - } - if (_release_snapshot_workers != NULL) { - delete _release_snapshot_workers; - } - if (_topic_subscriber !=NULL) { - delete _topic_subscriber; - } -} - -void AgentServer::submit_tasks( - TAgentResult& return_value, - const vector<TAgentTaskRequest>& tasks) { +AgentServer::~AgentServer() { } - // Set result to dm - vector<string> error_msgs; - TStatusCode::type status_code = TStatusCode::OK; +void AgentServer::submit_tasks(TAgentResult& agent_result, const vector<TAgentTaskRequest>& tasks) { + Status ret_st; - // TODO check require master same to heartbeat master - if (_master_info.network_address.hostname == "" - || _master_info.network_address.port == 0) { - error_msgs.push_back("Not get master heartbeat yet."); - return_value.status.__set_error_msgs(error_msgs); - return_value.status.__set_status_code(TStatusCode::CANCELLED); + // TODO check master_info here if it is the same with that of heartbeat rpc + if (_master_info.network_address.hostname == "" || _master_info.network_address.port == 0) { + Status ret_st = Status::Cancelled("Have not get FE Master heartbeat yet"); + ret_st.to_thrift(&agent_result.status); return; } for (auto task : tasks) { + VLOG_RPC << "submit one task: " << apache::thrift::ThriftDebugString(task).c_str(); TTaskType::type task_type = task.task_type; int64_t signature = task.signature; +#define HANDLE_TYPE(t_task_type, work_pool, req_member) \ + case t_task_type: \ + if (task.__isset.req_member) { \ + work_pool->submit_task(task); \ + } \ + ret_st = Status::InvalidArgument(strings::Substitute( \ + "task(signature=$0) has wrong request member", signature)); \ + break; + + // TODO(lingbin): divided these task types into several categories switch (task_type) { - case TTaskType::CREATE: - if (task.__isset.create_tablet_req) { - _create_tablet_workers->submit_task(task); 
- } else { - status_code = TStatusCode::ANALYSIS_ERROR; - } - break; - case TTaskType::DROP: - if (task.__isset.drop_tablet_req) { - _drop_tablet_workers->submit_task(task); - } else { - status_code = TStatusCode::ANALYSIS_ERROR; - } - break; + HANDLE_TYPE(TTaskType::CREATE, _create_tablet_workers, create_tablet_req); + HANDLE_TYPE(TTaskType::DROP, _drop_tablet_workers, drop_tablet_req); + HANDLE_TYPE(TTaskType::PUBLISH_VERSION, _publish_version_workers, publish_version_req); + HANDLE_TYPE(TTaskType::CLEAR_TRANSACTION_TASK, + _clear_transaction_task_workers, + clear_transaction_task_req); + HANDLE_TYPE(TTaskType::CLONE, _clone_workers, clone_req); + HANDLE_TYPE(TTaskType::STORAGE_MEDIUM_MIGRATE, + _storage_medium_migrate_workers, + storage_medium_migrate_req); + HANDLE_TYPE(TTaskType::CHECK_CONSISTENCY, + _check_consistency_workers, + check_consistency_req); + HANDLE_TYPE(TTaskType::UPLOAD, _upload_workers, upload_req); + HANDLE_TYPE(TTaskType::DOWNLOAD, _download_workers, download_req); + HANDLE_TYPE(TTaskType::MAKE_SNAPSHOT, _make_snapshot_workers, snapshot_req); + HANDLE_TYPE(TTaskType::RELEASE_SNAPSHOT, _release_snapshot_workers, release_snapshot_req); + HANDLE_TYPE(TTaskType::MOVE, _move_dir_workers, move_dir_req); + HANDLE_TYPE(TTaskType::RECOVER_TABLET, _recover_tablet_workers, recover_tablet_req); + HANDLE_TYPE(TTaskType::UPDATE_TABLET_META_INFO, + _update_tablet_meta_info_workers, + update_tablet_meta_info_req); + case TTaskType::REALTIME_PUSH: case TTaskType::PUSH: - if (task.__isset.push_req) { - if (task.push_req.push_type == TPushType::LOAD - || task.push_req.push_type == TPushType::LOAD_DELETE) { - _push_workers->submit_task(task); - } else if (task.push_req.push_type == TPushType::DELETE) { - _delete_workers->submit_task(task); - } else { - status_code = TStatusCode::ANALYSIS_ERROR; - } - } else { - status_code = TStatusCode::ANALYSIS_ERROR; + if (!task.__isset.push_req) { + ret_st = Status::InvalidArgument(strings::Substitute( + "task(signature=$0) 
has wrong request member", signature)); + break; } - break; - case TTaskType::PUBLISH_VERSION: - if (task.__isset.publish_version_req) { - _publish_version_workers->submit_task(task); + if (task.push_req.push_type == TPushType::LOAD + || task.push_req.push_type == TPushType::LOAD_DELETE) { + _push_workers->submit_task(task); + } else if (task.push_req.push_type == TPushType::DELETE) { + _delete_workers->submit_task(task); } else { - status_code = TStatusCode::ANALYSIS_ERROR; - } - break; - case TTaskType::CLEAR_TRANSACTION_TASK: - if (task.__isset.clear_transaction_task_req) { - _clear_transaction_task_workers->submit_task(task); - } else { - status_code = TStatusCode::ANALYSIS_ERROR; + ret_st = Status::InvalidArgument(strings::Substitute( + "task(signature=$0, type=$1, push_type=$2) has wrong push_type", + signature, task_type, task.push_req.push_type)); } break; case TTaskType::ALTER: if (task.__isset.alter_tablet_req || task.__isset.alter_tablet_req_v2) { _alter_tablet_workers->submit_task(task); } else { - status_code = TStatusCode::ANALYSIS_ERROR; - } - break; - case TTaskType::CLONE: - if (task.__isset.clone_req) { - _clone_workers->submit_task(task); - } else { - status_code = TStatusCode::ANALYSIS_ERROR; - } - break; - case TTaskType::STORAGE_MEDIUM_MIGRATE: - if (task.__isset.storage_medium_migrate_req) { - _storage_medium_migrate_workers->submit_task(task); - } else { - status_code = TStatusCode::ANALYSIS_ERROR; - } - break; - case TTaskType::CHECK_CONSISTENCY: - if (task.__isset.check_consistency_req) { - _check_consistency_workers->submit_task(task); - } else { - status_code = TStatusCode::ANALYSIS_ERROR; - } - break; - case TTaskType::UPLOAD: - if (task.__isset.upload_req) { - _upload_workers->submit_task(task); - } else { - status_code = TStatusCode::ANALYSIS_ERROR; - } - break; - case TTaskType::DOWNLOAD: - if (task.__isset.download_req) { - _download_workers->submit_task(task); - } else { - status_code = TStatusCode::ANALYSIS_ERROR; - } - break; - 
case TTaskType::MAKE_SNAPSHOT: - if (task.__isset.snapshot_req) { - _make_snapshot_workers->submit_task(task); - } else { - status_code = TStatusCode::ANALYSIS_ERROR; - } - break; - case TTaskType::RELEASE_SNAPSHOT: - if (task.__isset.release_snapshot_req) { - _release_snapshot_workers->submit_task(task); - } else { - status_code = TStatusCode::ANALYSIS_ERROR; - } - break; - case TTaskType::MOVE: - if (task.__isset.move_dir_req) { - _move_dir_workers->submit_task(task); - } else { - status_code = TStatusCode::ANALYSIS_ERROR; - } - break; - case TTaskType::RECOVER_TABLET: - if (task.__isset.recover_tablet_req) { - _recover_tablet_workers->submit_task(task); - } else { - status_code = TStatusCode::ANALYSIS_ERROR; - } - break; - case TTaskType::UPDATE_TABLET_META_INFO: - if (task.__isset.update_tablet_meta_info_req) { - _update_tablet_meta_info_workers->submit_task(task); - } else { - status_code = TStatusCode::ANALYSIS_ERROR; + ret_st = Status::InvalidArgument(strings::Substitute( + "task(signature=$0) has wrong request member", signature)); } break; default: - status_code = TStatusCode::ANALYSIS_ERROR; + ret_st = Status::InvalidArgument(strings::Substitute( + "task(signature=$0, type=$1) has wrong task type", signature, task_type)); break; } +#undef HANDLE_TYPE - if (status_code == TStatusCode::ANALYSIS_ERROR) { - OLAP_LOG_WARNING("task anaysis_error, signature: %ld", signature); - error_msgs.push_back("the task signature is:" + to_string(signature) + " has wrong request."); + if (!ret_st.ok()) { + LOG(WARNING) << "fail to submit task. 
reason: " << ret_st.get_error_msg() + << ", task: " << task; + break; } } - return_value.status.__set_error_msgs(error_msgs); - return_value.status.__set_status_code(status_code); + ret_st.to_thrift(&agent_result.status); } -void AgentServer::make_snapshot(TAgentResult& return_value, - const TSnapshotRequest& snapshot_request) { - TStatus status; - vector<string> error_msgs; - TStatusCode::type status_code = TStatusCode::OK; - return_value.__set_snapshot_version(snapshot_request.preferred_snapshot_version); +void AgentServer::make_snapshot(TAgentResult& t_agent_result, + const TSnapshotRequest& snapshot_request) { + Status ret_st; string snapshot_path; - OLAPStatus make_snapshot_status = + OLAPStatus err_code = SnapshotManager::instance()->make_snapshot(snapshot_request, &snapshot_path); - if (make_snapshot_status != OLAP_SUCCESS) { - status_code = TStatusCode::RUNTIME_ERROR; - OLAP_LOG_WARNING("make_snapshot failed. tablet_id: %ld, schema_hash: %ld, status: %d", - snapshot_request.tablet_id, snapshot_request.schema_hash, - make_snapshot_status); - error_msgs.push_back("make_snapshot failed. status: " + - boost::lexical_cast<string>(make_snapshot_status)); + if (err_code != OLAP_SUCCESS) { + LOG(WARNING) << "fail to make_snapshot. tablet_id=" << snapshot_request.tablet_id + << ", schema_hash=" << snapshot_request.schema_hash + << ", error_code=" << err_code; + ret_st = Status::RuntimeError(strings::Substitute( + "fail to make_snapshot. err_code=$0", err_code)); } else { - LOG(INFO) << "make_snapshot success. tablet_id: " << snapshot_request.tablet_id - << " schema_hash: " << snapshot_request.schema_hash << " snapshot_path: " << snapshot_path; - return_value.__set_snapshot_path(snapshot_path); + LOG(INFO) << "success to make_snapshot. 
tablet_id=" << snapshot_request.tablet_id + << ", schema_hash=" << snapshot_request.schema_hash + << ", snapshot_path: " << snapshot_path; + t_agent_result.__set_snapshot_path(snapshot_path); } - status.__set_error_msgs(error_msgs); - status.__set_status_code(status_code); - return_value.__set_status(status); + ret_st.to_thrift(&t_agent_result.status); + t_agent_result.__set_snapshot_version(snapshot_request.preferred_snapshot_version); if (snapshot_request.__isset.allow_incremental_clone) { - return_value.__set_allow_incremental_clone(snapshot_request.allow_incremental_clone); + t_agent_result.__set_allow_incremental_clone(snapshot_request.allow_incremental_clone); } } -void AgentServer::release_snapshot(TAgentResult& return_value, const std::string& snapshot_path) { - vector<string> error_msgs; - TStatusCode::type status_code = TStatusCode::OK; - - OLAPStatus release_snapshot_status = - SnapshotManager::instance()->release_snapshot(snapshot_path); - if (release_snapshot_status != OLAP_SUCCESS) { - status_code = TStatusCode::RUNTIME_ERROR; - LOG(WARNING) << "release_snapshot failed. snapshot_path: " << snapshot_path << ", status: " << release_snapshot_status; - error_msgs.push_back("release_snapshot failed. status: " + - boost::lexical_cast<string>(release_snapshot_status)); +void AgentServer::release_snapshot(TAgentResult& t_agent_result, const std::string& snapshot_path) { + Status ret_st; + OLAPStatus err_code = SnapshotManager::instance()->release_snapshot(snapshot_path); + if (err_code != OLAP_SUCCESS) { + LOG(WARNING) << "failt to release_snapshot. snapshot_path: " << snapshot_path + << ", err_code: " << err_code; + ret_st = Status::RuntimeError(strings::Substitute( + "fail to release_snapshot. err_code=$0", err_code)); } else { - LOG(INFO) << "release_snapshot success. snapshot_path: " << snapshot_path << ", status: " << release_snapshot_status; + LOG(INFO) << "success to release_snapshot. 
snapshot_path=" << snapshot_path + << ", err_code=" << err_code; } - - return_value.status.__set_error_msgs(error_msgs); - return_value.status.__set_status_code(status_code); + ret_st.to_thrift(&t_agent_result.status); } -void AgentServer::publish_cluster_state(TAgentResult& _return, const TAgentPublishRequest& request) { - vector<string> error_msgs; +// TODO(lingbin): always return OK? +void AgentServer::publish_cluster_state(TAgentResult& t_agent_result,

Review comment:
Unfortunately, `Multi Load` will use these `xxx_etl_xxx()` methods, so they will be kept.

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

With regards,
Apache Git Services

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org