This is an automated email from the ASF dual-hosted git repository. dataroaring pushed a commit to branch branch-3.0 in repository https://gitbox.apache.org/repos/asf/doris.git
commit 2a37230a654336539417ee2ff1db05409252bf9d Author: Xin Liao <liaoxin...@126.com> AuthorDate: Fri Jul 26 10:16:20 2024 +0800 [fix](cloud) Ensure that the error log has been uploaded to S3 before returning an error URL to the user (#38359) --- be/src/runtime/runtime_state.cpp | 48 ++++++++++++++++++---------------------- be/src/runtime/runtime_state.h | 4 ++-- 2 files changed, 23 insertions(+), 29 deletions(-) diff --git a/be/src/runtime/runtime_state.cpp b/be/src/runtime/runtime_state.cpp index 48d71a64eb2..5471a01c246 100644 --- a/be/src/runtime/runtime_state.cpp +++ b/be/src/runtime/runtime_state.cpp @@ -25,6 +25,7 @@ #include <gen_cpp/Types_types.h> #include <glog/logging.h> +#include <fstream> #include <memory> #include <string> @@ -70,7 +71,6 @@ RuntimeState::RuntimeState(const TPlanFragmentExecParams& fragment_exec_params, _num_finished_scan_range(0), _normal_row_number(0), _error_row_number(0), - _error_log_file(nullptr), _query_ctx(ctx) { Status status = init(fragment_exec_params.fragment_instance_id, query_options, query_globals, exec_env); @@ -117,7 +117,6 @@ RuntimeState::RuntimeState(const TUniqueId& instance_id, const TUniqueId& query_ _num_finished_scan_range(0), _normal_row_number(0), _error_row_number(0), - _error_log_file(nullptr), _query_ctx(ctx) { [[maybe_unused]] auto status = init(instance_id, query_options, query_globals, exec_env); DCHECK(status.ok()); @@ -153,7 +152,6 @@ RuntimeState::RuntimeState(pipeline::PipelineFragmentContext*, const TUniqueId& _num_finished_scan_range(0), _normal_row_number(0), _error_row_number(0), - _error_log_file(nullptr), _query_ctx(ctx) { [[maybe_unused]] auto status = init(instance_id, query_options, query_globals, exec_env); _query_mem_tracker = ctx->query_mem_tracker; @@ -185,7 +183,6 @@ RuntimeState::RuntimeState(const TUniqueId& query_id, int32_t fragment_id, _num_finished_scan_range(0), _normal_row_number(0), _error_row_number(0), - _error_log_file(nullptr), _query_ctx(ctx) { // TODO: do we really need 
instance id? Status status = init(TUniqueId(), query_options, query_globals, exec_env); @@ -255,22 +252,6 @@ RuntimeState::~RuntimeState() { // close error log file if (_error_log_file != nullptr && _error_log_file->is_open()) { _error_log_file->close(); - delete _error_log_file; - _error_log_file = nullptr; - if (_s3_error_fs) { - std::string error_log_absolute_path = - _exec_env->load_path_mgr()->get_load_error_absolute_path(_error_log_file_path); - // upload error log file to s3 - Status st = _s3_error_fs->upload(error_log_absolute_path, _s3_error_log_file_path); - if (st.ok()) { - // remove local error log file - std::filesystem::remove(error_log_absolute_path); - } else { - // remove local error log file later by clean_expired_temp_path thread - LOG(WARNING) << "Fail to upload error file to s3, error_log_file_path=" - << _error_log_file_path << ", error=" << st; - } - } } _obj_pool->clear(); @@ -394,7 +375,7 @@ Status RuntimeState::create_error_log_file() { _db_name, _import_label, _fragment_instance_id, &_error_log_file_path)); std::string error_log_absolute_path = _exec_env->load_path_mgr()->get_load_error_absolute_path(_error_log_file_path); - _error_log_file = new std::ofstream(error_log_absolute_path, std::ifstream::out); + _error_log_file = std::make_unique<std::ofstream>(error_log_absolute_path, std::ifstream::out); if (!_error_log_file->is_open()) { std::stringstream error_msg; error_msg << "Fail to open error file: [" << _error_log_file_path << "]."; @@ -420,8 +401,6 @@ Status RuntimeState::append_error_msg_to_file(std::function<std::string()> line, LOG(WARNING) << "Create error file log failed. 
because: " << status; if (_error_log_file != nullptr) { _error_log_file->close(); - delete _error_log_file; - _error_log_file = nullptr; } return status; } @@ -464,13 +443,28 @@ Status RuntimeState::append_error_msg_to_file(std::function<std::string()> line, return Status::OK(); } -std::string RuntimeState::get_error_log_file_path() const { - if (_s3_error_fs) { +std::string RuntimeState::get_error_log_file_path() { + if (_s3_error_fs && _error_log_file && _error_log_file->is_open()) { + // close error log file + _error_log_file->close(); + std::string error_log_absolute_path = + _exec_env->load_path_mgr()->get_load_error_absolute_path(_error_log_file_path); + // upload error log file to s3 + Status st = _s3_error_fs->upload(error_log_absolute_path, _s3_error_log_file_path); + if (st.ok()) { + // remove local error log file + std::filesystem::remove(error_log_absolute_path); + } else { + // upload failed and return local error log file path + LOG(WARNING) << "Fail to upload error file to s3, error_log_file_path=" + << _error_log_file_path << ", error=" << st; + return _error_log_file_path; + } // expiration must be less than a week (in seconds) for presigned url static const unsigned EXPIRATION_SECONDS = 7 * 24 * 60 * 60 - 1; // We should return a public endpoint to user. - return _s3_error_fs->generate_presigned_url(_s3_error_log_file_path, EXPIRATION_SECONDS, - true); + _error_log_file_path = _s3_error_fs->generate_presigned_url(_s3_error_log_file_path, + EXPIRATION_SECONDS, true); } return _error_log_file_path; } diff --git a/be/src/runtime/runtime_state.h b/be/src/runtime/runtime_state.h index 1bfd4348d18..ec812fffed8 100644 --- a/be/src/runtime/runtime_state.h +++ b/be/src/runtime/runtime_state.h @@ -263,7 +263,7 @@ public: int64_t load_job_id() const { return _load_job_id; } - std::string get_error_log_file_path() const; + std::string get_error_log_file_path(); // append error msg and error line to file when loading data. 
// is_summary is true, means we are going to write the summary line @@ -710,7 +710,7 @@ private: int64_t _normal_row_number; int64_t _error_row_number; std::string _error_log_file_path; - std::ofstream* _error_log_file = nullptr; // error file path, absolute path + std::unique_ptr<std::ofstream> _error_log_file; // error file path, absolute path std::vector<TTabletCommitInfo> _tablet_commit_infos; std::vector<TErrorTabletInfo> _error_tablet_infos; int _max_operator_id = 0; --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org