This is an automated email from the ASF dual-hosted git repository.

dataroaring pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/branch-3.0 by this push:
     new c5e18ef5520 branch-3.0: [fix](multi-catalog) Fix multi-thread issue in hive/iceberg writer commit meta-info to fe. #49842 (#49863)
c5e18ef5520 is described below

commit c5e18ef5520b910402ab3b2b5ee3e38ecc6dbc74
Author: github-actions[bot] <41898282+github-actions[bot]@users.noreply.github.com>
AuthorDate: Fri Apr 11 10:44:40 2025 +0800

    branch-3.0: [fix](multi-catalog) Fix multi-thread issue in hive/iceberg writer commit meta-info to fe. #49842 (#49863)
    
    Cherry-picked from #49842
    
    Co-authored-by: Qi Chen <che...@selectdb.com>
---
 be/src/runtime/fragment_mgr.cpp                    | 28 ++++++++--------------
 be/src/runtime/runtime_state.h                     | 22 +++++++++++++++--
 .../writer/iceberg/viceberg_partition_writer.cpp   |  3 ++-
 be/src/vec/sink/writer/vhive_partition_writer.cpp  |  3 ++-
 4 files changed, 34 insertions(+), 22 deletions(-)

diff --git a/be/src/runtime/fragment_mgr.cpp b/be/src/runtime/fragment_mgr.cpp
index 97fef234331..3f51331346d 100644
--- a/be/src/runtime/fragment_mgr.cpp
+++ b/be/src/runtime/fragment_mgr.cpp
@@ -501,37 +501,29 @@ void FragmentMgr::coordinator_callback(const 
ReportStatusRequest& req) {
             }
         }
     }
-
-    if (!req.runtime_state->hive_partition_updates().empty()) {
+    if (auto hpu = req.runtime_state->hive_partition_updates(); !hpu.empty()) {
         params.__isset.hive_partition_updates = true;
-        
params.hive_partition_updates.reserve(req.runtime_state->hive_partition_updates().size());
-        for (auto& hive_partition_update : 
req.runtime_state->hive_partition_updates()) {
-            params.hive_partition_updates.push_back(hive_partition_update);
-        }
+        
params.hive_partition_updates.insert(params.hive_partition_updates.end(), 
hpu.begin(),
+                                             hpu.end());
     } else if (!req.runtime_states.empty()) {
         for (auto* rs : req.runtime_states) {
-            if (!rs->hive_partition_updates().empty()) {
+            if (auto rs_hpu = rs->hive_partition_updates(); !rs_hpu.empty()) {
                 params.__isset.hive_partition_updates = true;
                 
params.hive_partition_updates.insert(params.hive_partition_updates.end(),
-                                                     
rs->hive_partition_updates().begin(),
-                                                     
rs->hive_partition_updates().end());
+                                                     rs_hpu.begin(), 
rs_hpu.end());
             }
         }
     }
-
-    if (!req.runtime_state->iceberg_commit_datas().empty()) {
+    if (auto icd = req.runtime_state->iceberg_commit_datas(); !icd.empty()) {
         params.__isset.iceberg_commit_datas = true;
-        
params.iceberg_commit_datas.reserve(req.runtime_state->iceberg_commit_datas().size());
-        for (auto& iceberg_commit_data : 
req.runtime_state->iceberg_commit_datas()) {
-            params.iceberg_commit_datas.push_back(iceberg_commit_data);
-        }
+        params.iceberg_commit_datas.insert(params.iceberg_commit_datas.end(), 
icd.begin(),
+                                           icd.end());
     } else if (!req.runtime_states.empty()) {
         for (auto* rs : req.runtime_states) {
-            if (!rs->iceberg_commit_datas().empty()) {
+            if (auto rs_icd = rs->iceberg_commit_datas(); !rs_icd.empty()) {
                 params.__isset.iceberg_commit_datas = true;
                 
params.iceberg_commit_datas.insert(params.iceberg_commit_datas.end(),
-                                                   
rs->iceberg_commit_datas().begin(),
-                                                   
rs->iceberg_commit_datas().end());
+                                                   rs_icd.begin(), 
rs_icd.end());
             }
         }
     }
diff --git a/be/src/runtime/runtime_state.h b/be/src/runtime/runtime_state.h
index 3d239e64911..14320be48e8 100644
--- a/be/src/runtime/runtime_state.h
+++ b/be/src/runtime/runtime_state.h
@@ -457,9 +457,25 @@ public:
                                    
std::make_move_iterator(tablet_infos.end()));
     }
 
-    std::vector<THivePartitionUpdate>& hive_partition_updates() { return 
_hive_partition_updates; }
+    std::vector<THivePartitionUpdate> hive_partition_updates() const {
+        std::lock_guard<std::mutex> lock(_hive_partition_updates_mutex);
+        return _hive_partition_updates;
+    }
+
+    void add_hive_partition_updates(const THivePartitionUpdate& 
hive_partition_update) {
+        std::lock_guard<std::mutex> lock(_hive_partition_updates_mutex);
+        _hive_partition_updates.emplace_back(hive_partition_update);
+    }
 
-    std::vector<TIcebergCommitData>& iceberg_commit_datas() { return 
_iceberg_commit_datas; }
+    std::vector<TIcebergCommitData> iceberg_commit_datas() const {
+        std::lock_guard<std::mutex> lock(_iceberg_commit_datas_mutex);
+        return _iceberg_commit_datas;
+    }
+
+    void add_iceberg_commit_datas(const TIcebergCommitData& 
iceberg_commit_data) {
+        std::lock_guard<std::mutex> lock(_iceberg_commit_datas_mutex);
+        _iceberg_commit_datas.emplace_back(iceberg_commit_data);
+    }
 
     // local runtime filter mgr, the runtime filter do not have remote target 
or
     // not need local merge should regist here. the instance exec finish, the 
local
@@ -734,8 +750,10 @@ private:
     int _task_id = -1;
     int _task_num = 0;
 
+    mutable std::mutex _hive_partition_updates_mutex;
     std::vector<THivePartitionUpdate> _hive_partition_updates;
 
+    mutable std::mutex _iceberg_commit_datas_mutex;
     std::vector<TIcebergCommitData> _iceberg_commit_datas;
 
     std::vector<std::unique_ptr<doris::pipeline::PipelineXLocalStateBase>> 
_op_id_to_local_state;
diff --git a/be/src/vec/sink/writer/iceberg/viceberg_partition_writer.cpp 
b/be/src/vec/sink/writer/iceberg/viceberg_partition_writer.cpp
index 23ee389a8b7..8963a129eee 100644
--- a/be/src/vec/sink/writer/iceberg/viceberg_partition_writer.cpp
+++ b/be/src/vec/sink/writer/iceberg/viceberg_partition_writer.cpp
@@ -118,7 +118,8 @@ Status VIcebergPartitionWriter::close(const Status& status) 
{
         }
     }
     if (status_ok) {
-        
_state->iceberg_commit_datas().emplace_back(_build_iceberg_commit_data());
+        auto commit_data = _build_iceberg_commit_data();
+        _state->add_iceberg_commit_datas(commit_data);
     }
     return result_status;
 }
diff --git a/be/src/vec/sink/writer/vhive_partition_writer.cpp 
b/be/src/vec/sink/writer/vhive_partition_writer.cpp
index 386ca0ea90c..9f8f1041a01 100644
--- a/be/src/vec/sink/writer/vhive_partition_writer.cpp
+++ b/be/src/vec/sink/writer/vhive_partition_writer.cpp
@@ -145,7 +145,8 @@ Status VHivePartitionWriter::close(const Status& status) {
         }
     }
     if (status_ok) {
-        
_state->hive_partition_updates().emplace_back(_build_partition_update());
+        auto partition_update = _build_partition_update();
+        _state->add_hive_partition_updates(partition_update);
     }
     return result_status;
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to