This is an automated email from the ASF dual-hosted git repository.

dataroaring pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/doris.git

commit 2a37230a654336539417ee2ff1db05409252bf9d
Author: Xin Liao <liaoxin...@126.com>
AuthorDate: Fri Jul 26 10:16:20 2024 +0800

    [fix](cloud) Ensure that the error log has been uploaded to S3 before 
returning an error URL to the user (#38359)
---
 be/src/runtime/runtime_state.cpp | 48 ++++++++++++++++++----------------------
 be/src/runtime/runtime_state.h   |  4 ++--
 2 files changed, 23 insertions(+), 29 deletions(-)

diff --git a/be/src/runtime/runtime_state.cpp b/be/src/runtime/runtime_state.cpp
index 48d71a64eb2..5471a01c246 100644
--- a/be/src/runtime/runtime_state.cpp
+++ b/be/src/runtime/runtime_state.cpp
@@ -25,6 +25,7 @@
 #include <gen_cpp/Types_types.h>
 #include <glog/logging.h>
 
+#include <fstream>
 #include <memory>
 #include <string>
 
@@ -70,7 +71,6 @@ RuntimeState::RuntimeState(const TPlanFragmentExecParams& 
fragment_exec_params,
           _num_finished_scan_range(0),
           _normal_row_number(0),
           _error_row_number(0),
-          _error_log_file(nullptr),
           _query_ctx(ctx) {
     Status status =
             init(fragment_exec_params.fragment_instance_id, query_options, 
query_globals, exec_env);
@@ -117,7 +117,6 @@ RuntimeState::RuntimeState(const TUniqueId& instance_id, 
const TUniqueId& query_
           _num_finished_scan_range(0),
           _normal_row_number(0),
           _error_row_number(0),
-          _error_log_file(nullptr),
           _query_ctx(ctx) {
     [[maybe_unused]] auto status = init(instance_id, query_options, 
query_globals, exec_env);
     DCHECK(status.ok());
@@ -153,7 +152,6 @@ 
RuntimeState::RuntimeState(pipeline::PipelineFragmentContext*, const TUniqueId&
           _num_finished_scan_range(0),
           _normal_row_number(0),
           _error_row_number(0),
-          _error_log_file(nullptr),
           _query_ctx(ctx) {
     [[maybe_unused]] auto status = init(instance_id, query_options, 
query_globals, exec_env);
     _query_mem_tracker = ctx->query_mem_tracker;
@@ -185,7 +183,6 @@ RuntimeState::RuntimeState(const TUniqueId& query_id, 
int32_t fragment_id,
           _num_finished_scan_range(0),
           _normal_row_number(0),
           _error_row_number(0),
-          _error_log_file(nullptr),
           _query_ctx(ctx) {
     // TODO: do we really need instance id?
     Status status = init(TUniqueId(), query_options, query_globals, exec_env);
@@ -255,22 +252,6 @@ RuntimeState::~RuntimeState() {
     // close error log file
     if (_error_log_file != nullptr && _error_log_file->is_open()) {
         _error_log_file->close();
-        delete _error_log_file;
-        _error_log_file = nullptr;
-        if (_s3_error_fs) {
-            std::string error_log_absolute_path =
-                    
_exec_env->load_path_mgr()->get_load_error_absolute_path(_error_log_file_path);
-            // upload error log file to s3
-            Status st = _s3_error_fs->upload(error_log_absolute_path, 
_s3_error_log_file_path);
-            if (st.ok()) {
-                // remove local error log file
-                std::filesystem::remove(error_log_absolute_path);
-            } else {
-                // remove local error log file later by 
clean_expired_temp_path thread
-                LOG(WARNING) << "Fail to upload error file to s3, 
error_log_file_path="
-                             << _error_log_file_path << ", error=" << st;
-            }
-        }
     }
 
     _obj_pool->clear();
@@ -394,7 +375,7 @@ Status RuntimeState::create_error_log_file() {
             _db_name, _import_label, _fragment_instance_id, 
&_error_log_file_path));
     std::string error_log_absolute_path =
             
_exec_env->load_path_mgr()->get_load_error_absolute_path(_error_log_file_path);
-    _error_log_file = new std::ofstream(error_log_absolute_path, 
std::ifstream::out);
+    _error_log_file = std::make_unique<std::ofstream>(error_log_absolute_path, 
std::ifstream::out);
     if (!_error_log_file->is_open()) {
         std::stringstream error_msg;
         error_msg << "Fail to open error file: [" << _error_log_file_path << 
"].";
@@ -420,8 +401,6 @@ Status 
RuntimeState::append_error_msg_to_file(std::function<std::string()> line,
             LOG(WARNING) << "Create error file log failed. because: " << 
status;
             if (_error_log_file != nullptr) {
                 _error_log_file->close();
-                delete _error_log_file;
-                _error_log_file = nullptr;
             }
             return status;
         }
@@ -464,13 +443,28 @@ Status 
RuntimeState::append_error_msg_to_file(std::function<std::string()> line,
     return Status::OK();
 }
 
-std::string RuntimeState::get_error_log_file_path() const {
-    if (_s3_error_fs) {
+std::string RuntimeState::get_error_log_file_path() {
+    if (_s3_error_fs && _error_log_file && _error_log_file->is_open()) {
+        // close error log file
+        _error_log_file->close();
+        std::string error_log_absolute_path =
+                
_exec_env->load_path_mgr()->get_load_error_absolute_path(_error_log_file_path);
+        // upload error log file to s3
+        Status st = _s3_error_fs->upload(error_log_absolute_path, 
_s3_error_log_file_path);
+        if (st.ok()) {
+            // remove local error log file
+            std::filesystem::remove(error_log_absolute_path);
+        } else {
+            // upload failed and return local error log file path
+            LOG(WARNING) << "Fail to upload error file to s3, 
error_log_file_path="
+                         << _error_log_file_path << ", error=" << st;
+            return _error_log_file_path;
+        }
         // expiration must be less than a week (in seconds) for presigned url
         static const unsigned EXPIRATION_SECONDS = 7 * 24 * 60 * 60 - 1;
         // We should return a public endpoint to user.
-        return _s3_error_fs->generate_presigned_url(_s3_error_log_file_path, 
EXPIRATION_SECONDS,
-                                                    true);
+        _error_log_file_path = 
_s3_error_fs->generate_presigned_url(_s3_error_log_file_path,
+                                                                    
EXPIRATION_SECONDS, true);
     }
     return _error_log_file_path;
 }
diff --git a/be/src/runtime/runtime_state.h b/be/src/runtime/runtime_state.h
index 1bfd4348d18..ec812fffed8 100644
--- a/be/src/runtime/runtime_state.h
+++ b/be/src/runtime/runtime_state.h
@@ -263,7 +263,7 @@ public:
 
     int64_t load_job_id() const { return _load_job_id; }
 
-    std::string get_error_log_file_path() const;
+    std::string get_error_log_file_path();
 
     // append error msg and error line to file when loading data.
     // is_summary is true, means we are going to write the summary line
@@ -710,7 +710,7 @@ private:
     int64_t _normal_row_number;
     int64_t _error_row_number;
     std::string _error_log_file_path;
-    std::ofstream* _error_log_file = nullptr; // error file path, absolute path
+    std::unique_ptr<std::ofstream> _error_log_file; // error file path, 
absolute path
     std::vector<TTabletCommitInfo> _tablet_commit_infos;
     std::vector<TErrorTabletInfo> _error_tablet_infos;
     int _max_operator_id = 0;


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to