This is an automated email from the ASF dual-hosted git repository. dataroaring pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push: new 40f159fb81c [fix](group commit) group commit may heap-use-after-free if execute plan failed (#31839) 40f159fb81c is described below commit 40f159fb81ccbb316b080e5b2409c3e6564c0761 Author: meiyi <myime...@gmail.com> AuthorDate: Wed Mar 6 14:14:35 2024 +0800 [fix](group commit) group commit may heap-use-after-free if execute plan failed (#31839) --- be/src/runtime/group_commit_mgr.cpp | 20 +++----------------- be/src/runtime/group_commit_mgr.h | 3 +-- 2 files changed, 4 insertions(+), 19 deletions(-) diff --git a/be/src/runtime/group_commit_mgr.cpp b/be/src/runtime/group_commit_mgr.cpp index 887dd856bb4..cadff231761 100644 --- a/be/src/runtime/group_commit_mgr.cpp +++ b/be/src/runtime/group_commit_mgr.cpp @@ -212,10 +212,9 @@ Status GroupCommitTable::get_first_block_load_queue( if (!_is_creating_plan_fragment) { _is_creating_plan_fragment = true; RETURN_IF_ERROR(_thread_pool->submit_func([&] { - auto st = _create_group_commit_load(load_block_queue, be_exe_version); + auto st = _create_group_commit_load(be_exe_version); if (!st.ok()) { LOG(WARNING) << "create group commit load error, st=" << st.to_string(); - load_block_queue.reset(); std::unique_lock l(_lock); _is_creating_plan_fragment = false; _cv.notify_all(); @@ -223,26 +222,13 @@ Status GroupCommitTable::get_first_block_load_queue( })); } _cv.wait_for(l, std::chrono::seconds(4)); - if (load_block_queue != nullptr) { - if (load_block_queue->schema_version == base_schema_version) { - if (load_block_queue->add_load_id(load_id).ok()) { - return Status::OK(); - } - } else if (base_schema_version < load_block_queue->schema_version) { - return Status::DataQualityError<false>( - "schema version not match, maybe a schema change is in process. Please " - "retry this load manually."); - } - load_block_queue.reset(); - } } } return Status::InternalError<false>("can not get a block queue for table_id: " + std::to_string(_table_id)); } -Status GroupCommitTable::_create_group_commit_load( - std::shared_ptr<LoadBlockQueue>& load_block_queue, int be_exe_version) { +Status GroupCommitTable::_create_group_commit_load(int be_exe_version) { Status st = Status::OK(); TStreamLoadPutRequest request; UniqueId load_id = UniqueId::gen_uid(); @@ -305,7 +291,7 @@ Status GroupCommitTable::_create_group_commit_load( << ", txn_id=" << txn_id << ", instance_id=" << print_id(instance_id) << ", is_pipeline=" << is_pipeline; { - load_block_queue = std::make_shared<LoadBlockQueue>( + auto load_block_queue = std::make_shared<LoadBlockQueue>( instance_id, label, txn_id, schema_version, _all_block_queues_bytes, result.wait_internal_group_commit_finish, result.group_commit_interval_ms, result.group_commit_data_bytes); diff --git a/be/src/runtime/group_commit_mgr.h b/be/src/runtime/group_commit_mgr.h index dc135ea1063..f77012bc2d1 100644 --- a/be/src/runtime/group_commit_mgr.h +++ b/be/src/runtime/group_commit_mgr.h @@ -134,8 +134,7 @@ public: std::shared_ptr<LoadBlockQueue>& load_block_queue); private: - Status _create_group_commit_load(std::shared_ptr<LoadBlockQueue>& load_block_queue, - int be_exe_version); + Status _create_group_commit_load(int be_exe_version); Status _exec_plan_fragment(int64_t db_id, int64_t table_id, const std::string& label, int64_t txn_id, bool is_pipeline, const TExecPlanFragmentParams& params, --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org