This is an automated email from the ASF dual-hosted git repository.

dataroaring pushed a commit to branch branch-3.0 in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-3.0 by this push: new c5e18ef5520 branch-3.0: [fix](multi-catalog) Fix multi-thread issue in hive/iceberg writer commit meta-info to fe. #49842 (#49863) c5e18ef5520 is described below commit c5e18ef5520b910402ab3b2b5ee3e38ecc6dbc74 Author: github-actions[bot] <41898282+github-actions[bot]@users.noreply.github.com> AuthorDate: Fri Apr 11 10:44:40 2025 +0800 branch-3.0: [fix](multi-catalog) Fix multi-thread issue in hive/iceberg writer commit meta-info to fe. #49842 (#49863) Cherry-picked from #49842 Co-authored-by: Qi Chen <che...@selectdb.com> --- be/src/runtime/fragment_mgr.cpp | 28 ++++++++-------------- be/src/runtime/runtime_state.h | 22 +++++++++++++++-- .../writer/iceberg/viceberg_partition_writer.cpp | 3 ++- be/src/vec/sink/writer/vhive_partition_writer.cpp | 3 ++- 4 files changed, 34 insertions(+), 22 deletions(-) diff --git a/be/src/runtime/fragment_mgr.cpp b/be/src/runtime/fragment_mgr.cpp index 97fef234331..3f51331346d 100644 --- a/be/src/runtime/fragment_mgr.cpp +++ b/be/src/runtime/fragment_mgr.cpp @@ -501,37 +501,29 @@ void FragmentMgr::coordinator_callback(const ReportStatusRequest& req) { } } } - - if (!req.runtime_state->hive_partition_updates().empty()) { + if (auto hpu = req.runtime_state->hive_partition_updates(); !hpu.empty()) { params.__isset.hive_partition_updates = true; - params.hive_partition_updates.reserve(req.runtime_state->hive_partition_updates().size()); - for (auto& hive_partition_update : req.runtime_state->hive_partition_updates()) { - params.hive_partition_updates.push_back(hive_partition_update); - } + params.hive_partition_updates.insert(params.hive_partition_updates.end(), hpu.begin(), + hpu.end()); } else if (!req.runtime_states.empty()) { for (auto* rs : req.runtime_states) { - if (!rs->hive_partition_updates().empty()) { + if (auto rs_hpu = rs->hive_partition_updates(); !rs_hpu.empty()) { params.__isset.hive_partition_updates = true; 
params.hive_partition_updates.insert(params.hive_partition_updates.end(), - rs->hive_partition_updates().begin(), - rs->hive_partition_updates().end()); + rs_hpu.begin(), rs_hpu.end()); } } } - - if (!req.runtime_state->iceberg_commit_datas().empty()) { + if (auto icd = req.runtime_state->iceberg_commit_datas(); !icd.empty()) { params.__isset.iceberg_commit_datas = true; - params.iceberg_commit_datas.reserve(req.runtime_state->iceberg_commit_datas().size()); - for (auto& iceberg_commit_data : req.runtime_state->iceberg_commit_datas()) { - params.iceberg_commit_datas.push_back(iceberg_commit_data); - } + params.iceberg_commit_datas.insert(params.iceberg_commit_datas.end(), icd.begin(), + icd.end()); } else if (!req.runtime_states.empty()) { for (auto* rs : req.runtime_states) { - if (!rs->iceberg_commit_datas().empty()) { + if (auto rs_icd = rs->iceberg_commit_datas(); !rs_icd.empty()) { params.__isset.iceberg_commit_datas = true; params.iceberg_commit_datas.insert(params.iceberg_commit_datas.end(), - rs->iceberg_commit_datas().begin(), - rs->iceberg_commit_datas().end()); + rs_icd.begin(), rs_icd.end()); } } } diff --git a/be/src/runtime/runtime_state.h b/be/src/runtime/runtime_state.h index 3d239e64911..14320be48e8 100644 --- a/be/src/runtime/runtime_state.h +++ b/be/src/runtime/runtime_state.h @@ -457,9 +457,25 @@ public: std::make_move_iterator(tablet_infos.end())); } - std::vector<THivePartitionUpdate>& hive_partition_updates() { return _hive_partition_updates; } + std::vector<THivePartitionUpdate> hive_partition_updates() const { + std::lock_guard<std::mutex> lock(_hive_partition_updates_mutex); + return _hive_partition_updates; + } + + void add_hive_partition_updates(const THivePartitionUpdate& hive_partition_update) { + std::lock_guard<std::mutex> lock(_hive_partition_updates_mutex); + _hive_partition_updates.emplace_back(hive_partition_update); + } - std::vector<TIcebergCommitData>& iceberg_commit_datas() { return _iceberg_commit_datas; } + 
std::vector<TIcebergCommitData> iceberg_commit_datas() const { + std::lock_guard<std::mutex> lock(_iceberg_commit_datas_mutex); + return _iceberg_commit_datas; + } + + void add_iceberg_commit_datas(const TIcebergCommitData& iceberg_commit_data) { + std::lock_guard<std::mutex> lock(_iceberg_commit_datas_mutex); + _iceberg_commit_datas.emplace_back(iceberg_commit_data); + } // local runtime filter mgr, the runtime filter do not have remote target or // not need local merge should regist here. the instance exec finish, the local @@ -734,8 +750,10 @@ private: int _task_id = -1; int _task_num = 0; + mutable std::mutex _hive_partition_updates_mutex; std::vector<THivePartitionUpdate> _hive_partition_updates; + mutable std::mutex _iceberg_commit_datas_mutex; std::vector<TIcebergCommitData> _iceberg_commit_datas; std::vector<std::unique_ptr<doris::pipeline::PipelineXLocalStateBase>> _op_id_to_local_state; diff --git a/be/src/vec/sink/writer/iceberg/viceberg_partition_writer.cpp b/be/src/vec/sink/writer/iceberg/viceberg_partition_writer.cpp index 23ee389a8b7..8963a129eee 100644 --- a/be/src/vec/sink/writer/iceberg/viceberg_partition_writer.cpp +++ b/be/src/vec/sink/writer/iceberg/viceberg_partition_writer.cpp @@ -118,7 +118,8 @@ Status VIcebergPartitionWriter::close(const Status& status) { } } if (status_ok) { - _state->iceberg_commit_datas().emplace_back(_build_iceberg_commit_data()); + auto commit_data = _build_iceberg_commit_data(); + _state->add_iceberg_commit_datas(commit_data); } return result_status; } diff --git a/be/src/vec/sink/writer/vhive_partition_writer.cpp b/be/src/vec/sink/writer/vhive_partition_writer.cpp index 386ca0ea90c..9f8f1041a01 100644 --- a/be/src/vec/sink/writer/vhive_partition_writer.cpp +++ b/be/src/vec/sink/writer/vhive_partition_writer.cpp @@ -145,7 +145,8 @@ Status VHivePartitionWriter::close(const Status& status) { } } if (status_ok) { - _state->hive_partition_updates().emplace_back(_build_partition_update()); + auto partition_update = 
_build_partition_update(); + _state->add_hive_partition_updates(partition_update); } return result_status; } --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org