This is an automated email from the ASF dual-hosted git repository.

dataroaring pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new 40f159fb81c [fix](group commit) group commit may heap-use-after-free if execute plan failed (#31839)
40f159fb81c is described below

commit 40f159fb81ccbb316b080e5b2409c3e6564c0761
Author: meiyi <myime...@gmail.com>
AuthorDate: Wed Mar 6 14:14:35 2024 +0800

    [fix](group commit) group commit may heap-use-after-free if execute plan failed (#31839)
---
 be/src/runtime/group_commit_mgr.cpp | 20 +++-----------------
 be/src/runtime/group_commit_mgr.h   |  3 +--
 2 files changed, 4 insertions(+), 19 deletions(-)

diff --git a/be/src/runtime/group_commit_mgr.cpp 
b/be/src/runtime/group_commit_mgr.cpp
index 887dd856bb4..cadff231761 100644
--- a/be/src/runtime/group_commit_mgr.cpp
+++ b/be/src/runtime/group_commit_mgr.cpp
@@ -212,10 +212,9 @@ Status GroupCommitTable::get_first_block_load_queue(
             if (!_is_creating_plan_fragment) {
                 _is_creating_plan_fragment = true;
                 RETURN_IF_ERROR(_thread_pool->submit_func([&] {
-                    auto st = _create_group_commit_load(load_block_queue, 
be_exe_version);
+                    auto st = _create_group_commit_load(be_exe_version);
                     if (!st.ok()) {
                         LOG(WARNING) << "create group commit load error, st=" 
<< st.to_string();
-                        load_block_queue.reset();
                         std::unique_lock l(_lock);
                         _is_creating_plan_fragment = false;
                         _cv.notify_all();
@@ -223,26 +222,13 @@ Status GroupCommitTable::get_first_block_load_queue(
                 }));
             }
             _cv.wait_for(l, std::chrono::seconds(4));
-            if (load_block_queue != nullptr) {
-                if (load_block_queue->schema_version == base_schema_version) {
-                    if (load_block_queue->add_load_id(load_id).ok()) {
-                        return Status::OK();
-                    }
-                } else if (base_schema_version < 
load_block_queue->schema_version) {
-                    return Status::DataQualityError<false>(
-                            "schema version not match, maybe a schema change 
is in process. Please "
-                            "retry this load manually.");
-                }
-                load_block_queue.reset();
-            }
         }
     }
     return Status::InternalError<false>("can not get a block queue for 
table_id: " +
                                         std::to_string(_table_id));
 }
 
-Status GroupCommitTable::_create_group_commit_load(
-        std::shared_ptr<LoadBlockQueue>& load_block_queue, int be_exe_version) 
{
+Status GroupCommitTable::_create_group_commit_load(int be_exe_version) {
     Status st = Status::OK();
     TStreamLoadPutRequest request;
     UniqueId load_id = UniqueId::gen_uid();
@@ -305,7 +291,7 @@ Status GroupCommitTable::_create_group_commit_load(
                << ", txn_id=" << txn_id << ", instance_id=" << 
print_id(instance_id)
                << ", is_pipeline=" << is_pipeline;
     {
-        load_block_queue = std::make_shared<LoadBlockQueue>(
+        auto load_block_queue = std::make_shared<LoadBlockQueue>(
                 instance_id, label, txn_id, schema_version, 
_all_block_queues_bytes,
                 result.wait_internal_group_commit_finish, 
result.group_commit_interval_ms,
                 result.group_commit_data_bytes);
diff --git a/be/src/runtime/group_commit_mgr.h 
b/be/src/runtime/group_commit_mgr.h
index dc135ea1063..f77012bc2d1 100644
--- a/be/src/runtime/group_commit_mgr.h
+++ b/be/src/runtime/group_commit_mgr.h
@@ -134,8 +134,7 @@ public:
                                 std::shared_ptr<LoadBlockQueue>& 
load_block_queue);
 
 private:
-    Status _create_group_commit_load(std::shared_ptr<LoadBlockQueue>& 
load_block_queue,
-                                     int be_exe_version);
+    Status _create_group_commit_load(int be_exe_version);
     Status _exec_plan_fragment(int64_t db_id, int64_t table_id, const 
std::string& label,
                                int64_t txn_id, bool is_pipeline,
                                const TExecPlanFragmentParams& params,


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to