This is an automated email from the ASF dual-hosted git repository.

gabriellee pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new ae7caba54d2 [fix](group commit) remove non pipeline code (#37018)
ae7caba54d2 is described below

commit ae7caba54d2a9e30a0c6e7e157c04e1d7f0d81f6
Author: meiyi <myime...@gmail.com>
AuthorDate: Tue Jul 2 10:29:02 2024 +0800

    [fix](group commit) remove non pipeline code (#37018)
    
    remove non pipilene code in group_commit_mgr
---
 be/src/runtime/group_commit_mgr.cpp | 67 +++++++++++--------------------------
 be/src/runtime/group_commit_mgr.h   |  4 +--
 2 files changed, 21 insertions(+), 50 deletions(-)

diff --git a/be/src/runtime/group_commit_mgr.cpp 
b/be/src/runtime/group_commit_mgr.cpp
index 111780c9a42..8a81c942fd3 100644
--- a/be/src/runtime/group_commit_mgr.cpp
+++ b/be/src/runtime/group_commit_mgr.cpp
@@ -266,14 +266,12 @@ Status GroupCommitTable::get_first_block_load_queue(
                 if (base_schema_version == inner_block_queue->schema_version) {
                     if (inner_block_queue->add_load_id(load_id, 
put_block_dep).ok()) {
                         load_block_queue = inner_block_queue;
-
                         return Status::OK();
                     }
                 } else {
                     return Status::DataQualityError<false>(
                             "schema version not match, maybe a schema change 
is in process. "
-                            "Please "
-                            "retry this load manually.");
+                            "Please retry this load manually.");
                 }
             }
         }
@@ -284,13 +282,13 @@ Status GroupCommitTable::get_first_block_load_queue(
     if (try_to_get_matched_queue().ok()) {
         return Status::OK();
     }
+    create_plan_dep->block();
+    _create_plan_deps.push_back(create_plan_dep);
     if (!_is_creating_plan_fragment) {
         _is_creating_plan_fragment = true;
-        create_plan_dep->block();
         RETURN_IF_ERROR(_thread_pool->submit_func([&, be_exe_version, 
mem_tracker,
                                                    dep = create_plan_dep] {
             Defer defer {[&, dep = dep]() {
-                dep->set_ready();
                 std::unique_lock l(_lock);
                 for (auto it : _create_plan_deps) {
                     it->set_ready();
@@ -303,9 +301,6 @@ Status GroupCommitTable::get_first_block_load_queue(
                 LOG(WARNING) << "create group commit load error, st=" << 
st.to_string();
             }
         }));
-    } else {
-        create_plan_dep->block();
-        _create_plan_deps.push_back(create_plan_dep);
     }
     return try_to_get_matched_queue();
 }
@@ -313,16 +308,14 @@ Status GroupCommitTable::get_first_block_load_queue(
 Status GroupCommitTable::_create_group_commit_load(int be_exe_version,
                                                    
std::shared_ptr<MemTrackerLimiter> mem_tracker) {
     Status st = Status::OK();
-    TStreamLoadPutRequest request;
-    UniqueId load_id = UniqueId::gen_uid();
-    TUniqueId tload_id;
-    bool is_pipeline = true;
     TStreamLoadPutResult result;
     std::string label;
     int64_t txn_id;
     TUniqueId instance_id;
     {
         SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER(mem_tracker);
+        UniqueId load_id = UniqueId::gen_uid();
+        TUniqueId tload_id;
         tload_id.__set_hi(load_id.hi);
         tload_id.__set_lo(load_id.lo);
         std::regex reg("-");
@@ -330,6 +323,7 @@ Status GroupCommitTable::_create_group_commit_load(int 
be_exe_version,
         std::stringstream ss;
         ss << "insert into doris_internal_table_id(" << _table_id << ") WITH 
LABEL " << label
            << " select * from group_commit(\"table_id\"=\"" << _table_id << 
"\")";
+        TStreamLoadPutRequest request;
         request.__set_load_sql(ss.str());
         request.__set_loadId(tload_id);
         request.__set_label(label);
@@ -355,51 +349,36 @@ Status GroupCommitTable::_create_group_commit_load(int 
be_exe_version,
             return st;
         }
         st = Status::create<false>(result.status);
+        if (st.ok() && !result.__isset.pipeline_params) {
+            st = Status::InternalError("Non-pipeline is disabled!");
+        }
         if (!st.ok()) {
             LOG(WARNING) << "create group commit load error, st=" << 
st.to_string();
             return st;
         }
         auto schema_version = result.base_schema_version;
-        is_pipeline = result.__isset.pipeline_params;
-        auto& params = result.params;
         auto& pipeline_params = result.pipeline_params;
-        if (!is_pipeline) {
-            DCHECK(params.fragment.output_sink.olap_table_sink.db_id == 
_db_id);
-            txn_id = params.txn_conf.txn_id;
-            instance_id = params.params.fragment_instance_id;
-        } else {
-            DCHECK(pipeline_params.fragment.output_sink.olap_table_sink.db_id 
== _db_id);
-            txn_id = pipeline_params.txn_conf.txn_id;
-            DCHECK(pipeline_params.local_params.size() == 1);
-            instance_id = pipeline_params.local_params[0].fragment_instance_id;
-        }
+        DCHECK(pipeline_params.fragment.output_sink.olap_table_sink.db_id == 
_db_id);
+        txn_id = pipeline_params.txn_conf.txn_id;
+        DCHECK(pipeline_params.local_params.size() == 1);
+        instance_id = pipeline_params.local_params[0].fragment_instance_id;
         VLOG_DEBUG << "create plan fragment, db_id=" << _db_id << ", table=" 
<< _table_id
                    << ", schema version=" << schema_version << ", label=" << 
label
-                   << ", txn_id=" << txn_id << ", instance_id=" << 
print_id(instance_id)
-                   << ", is_pipeline=" << is_pipeline;
+                   << ", txn_id=" << txn_id << ", instance_id=" << 
print_id(instance_id);
         {
             auto load_block_queue = std::make_shared<LoadBlockQueue>(
                     instance_id, label, txn_id, schema_version, 
_all_block_queues_bytes,
                     result.wait_internal_group_commit_finish, 
result.group_commit_interval_ms,
                     result.group_commit_data_bytes);
             std::unique_lock l(_lock);
-            //create wal
-            if (!is_pipeline) {
-                RETURN_IF_ERROR(load_block_queue->create_wal(
-                        _db_id, _table_id, txn_id, label, _exec_env->wal_mgr(),
-                        
params.fragment.output_sink.olap_table_sink.schema.slot_descs,
-                        be_exe_version));
-            } else {
-                RETURN_IF_ERROR(load_block_queue->create_wal(
-                        _db_id, _table_id, txn_id, label, _exec_env->wal_mgr(),
-                        
pipeline_params.fragment.output_sink.olap_table_sink.schema.slot_descs,
-                        be_exe_version));
-            }
+            RETURN_IF_ERROR(load_block_queue->create_wal(
+                    _db_id, _table_id, txn_id, label, _exec_env->wal_mgr(),
+                    
pipeline_params.fragment.output_sink.olap_table_sink.schema.slot_descs,
+                    be_exe_version));
             _load_block_queues.emplace(instance_id, load_block_queue);
         }
     }
-    st = _exec_plan_fragment(_db_id, _table_id, label, txn_id, is_pipeline, 
result.params,
-                             result.pipeline_params);
+    st = _exec_plan_fragment(_db_id, _table_id, label, txn_id, 
result.pipeline_params);
     if (!st.ok()) {
         SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER(mem_tracker);
         auto finish_st = _finish_group_commit_load(_db_id, _table_id, label, 
txn_id, instance_id,
@@ -533,8 +512,6 @@ Status GroupCommitTable::_finish_group_commit_load(int64_t 
db_id, int64_t table_
 
 Status GroupCommitTable::_exec_plan_fragment(int64_t db_id, int64_t table_id,
                                              const std::string& label, int64_t 
txn_id,
-                                             bool is_pipeline,
-                                             const TExecPlanFragmentParams& 
params,
                                              const TPipelineFragmentParams& 
pipeline_params) {
     auto finish_cb = [db_id, table_id, label, txn_id, this](RuntimeState* 
state, Status* status) {
         DCHECK(state);
@@ -545,11 +522,7 @@ Status GroupCommitTable::_exec_plan_fragment(int64_t 
db_id, int64_t table_id,
                          << ", st=" << finish_st.to_string();
         }
     };
-    if (is_pipeline) {
-        return _exec_env->fragment_mgr()->exec_plan_fragment(pipeline_params, 
finish_cb);
-    } else {
-        return _exec_env->fragment_mgr()->exec_plan_fragment(params, 
finish_cb);
-    }
+    return _exec_env->fragment_mgr()->exec_plan_fragment(pipeline_params, 
finish_cb);
 }
 
 Status GroupCommitTable::get_load_block_queue(const TUniqueId& instance_id,
diff --git a/be/src/runtime/group_commit_mgr.h 
b/be/src/runtime/group_commit_mgr.h
index c668197e8dc..36c51746ef4 100644
--- a/be/src/runtime/group_commit_mgr.h
+++ b/be/src/runtime/group_commit_mgr.h
@@ -168,9 +168,7 @@ private:
     Status _create_group_commit_load(int be_exe_version,
                                      std::shared_ptr<MemTrackerLimiter> 
mem_tracker);
     Status _exec_plan_fragment(int64_t db_id, int64_t table_id, const 
std::string& label,
-                               int64_t txn_id, bool is_pipeline,
-                               const TExecPlanFragmentParams& params,
-                               const TPipelineFragmentParams& pipeline_params);
+                               int64_t txn_id, const TPipelineFragmentParams& 
pipeline_params);
     Status _finish_group_commit_load(int64_t db_id, int64_t table_id, const 
std::string& label,
                                      int64_t txn_id, const TUniqueId& 
instance_id, Status& status,
                                      RuntimeState* state);


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to