This is an automated email from the ASF dual-hosted git repository. gabriellee pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push: new ae7caba54d2 [fix](group commit) remove non pipeline code (#37018) ae7caba54d2 is described below commit ae7caba54d2a9e30a0c6e7e157c04e1d7f0d81f6 Author: meiyi <myime...@gmail.com> AuthorDate: Tue Jul 2 10:29:02 2024 +0800 [fix](group commit) remove non pipeline code (#37018) remove non pipeline code in group_commit_mgr --- be/src/runtime/group_commit_mgr.cpp | 67 +++++++++++-------------------------- be/src/runtime/group_commit_mgr.h | 4 +-- 2 files changed, 21 insertions(+), 50 deletions(-) diff --git a/be/src/runtime/group_commit_mgr.cpp b/be/src/runtime/group_commit_mgr.cpp index 111780c9a42..8a81c942fd3 100644 --- a/be/src/runtime/group_commit_mgr.cpp +++ b/be/src/runtime/group_commit_mgr.cpp @@ -266,14 +266,12 @@ Status GroupCommitTable::get_first_block_load_queue( if (base_schema_version == inner_block_queue->schema_version) { if (inner_block_queue->add_load_id(load_id, put_block_dep).ok()) { load_block_queue = inner_block_queue; - return Status::OK(); } } else { return Status::DataQualityError<false>( "schema version not match, maybe a schema change is in process. 
" - "Please " - "retry this load manually."); + "Please retry this load manually."); } } } @@ -284,13 +282,13 @@ Status GroupCommitTable::get_first_block_load_queue( if (try_to_get_matched_queue().ok()) { return Status::OK(); } + create_plan_dep->block(); + _create_plan_deps.push_back(create_plan_dep); if (!_is_creating_plan_fragment) { _is_creating_plan_fragment = true; - create_plan_dep->block(); RETURN_IF_ERROR(_thread_pool->submit_func([&, be_exe_version, mem_tracker, dep = create_plan_dep] { Defer defer {[&, dep = dep]() { - dep->set_ready(); std::unique_lock l(_lock); for (auto it : _create_plan_deps) { it->set_ready(); @@ -303,9 +301,6 @@ Status GroupCommitTable::get_first_block_load_queue( LOG(WARNING) << "create group commit load error, st=" << st.to_string(); } })); - } else { - create_plan_dep->block(); - _create_plan_deps.push_back(create_plan_dep); } return try_to_get_matched_queue(); } @@ -313,16 +308,14 @@ Status GroupCommitTable::get_first_block_load_queue( Status GroupCommitTable::_create_group_commit_load(int be_exe_version, std::shared_ptr<MemTrackerLimiter> mem_tracker) { Status st = Status::OK(); - TStreamLoadPutRequest request; - UniqueId load_id = UniqueId::gen_uid(); - TUniqueId tload_id; - bool is_pipeline = true; TStreamLoadPutResult result; std::string label; int64_t txn_id; TUniqueId instance_id; { SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER(mem_tracker); + UniqueId load_id = UniqueId::gen_uid(); + TUniqueId tload_id; tload_id.__set_hi(load_id.hi); tload_id.__set_lo(load_id.lo); std::regex reg("-"); @@ -330,6 +323,7 @@ Status GroupCommitTable::_create_group_commit_load(int be_exe_version, std::stringstream ss; ss << "insert into doris_internal_table_id(" << _table_id << ") WITH LABEL " << label << " select * from group_commit(\"table_id\"=\"" << _table_id << "\")"; + TStreamLoadPutRequest request; request.__set_load_sql(ss.str()); request.__set_loadId(tload_id); request.__set_label(label); @@ -355,51 +349,36 @@ Status 
GroupCommitTable::_create_group_commit_load(int be_exe_version, return st; } st = Status::create<false>(result.status); + if (st.ok() && !result.__isset.pipeline_params) { + st = Status::InternalError("Non-pipeline is disabled!"); + } if (!st.ok()) { LOG(WARNING) << "create group commit load error, st=" << st.to_string(); return st; } auto schema_version = result.base_schema_version; - is_pipeline = result.__isset.pipeline_params; - auto& params = result.params; auto& pipeline_params = result.pipeline_params; - if (!is_pipeline) { - DCHECK(params.fragment.output_sink.olap_table_sink.db_id == _db_id); - txn_id = params.txn_conf.txn_id; - instance_id = params.params.fragment_instance_id; - } else { - DCHECK(pipeline_params.fragment.output_sink.olap_table_sink.db_id == _db_id); - txn_id = pipeline_params.txn_conf.txn_id; - DCHECK(pipeline_params.local_params.size() == 1); - instance_id = pipeline_params.local_params[0].fragment_instance_id; - } + DCHECK(pipeline_params.fragment.output_sink.olap_table_sink.db_id == _db_id); + txn_id = pipeline_params.txn_conf.txn_id; + DCHECK(pipeline_params.local_params.size() == 1); + instance_id = pipeline_params.local_params[0].fragment_instance_id; VLOG_DEBUG << "create plan fragment, db_id=" << _db_id << ", table=" << _table_id << ", schema version=" << schema_version << ", label=" << label - << ", txn_id=" << txn_id << ", instance_id=" << print_id(instance_id) - << ", is_pipeline=" << is_pipeline; + << ", txn_id=" << txn_id << ", instance_id=" << print_id(instance_id); { auto load_block_queue = std::make_shared<LoadBlockQueue>( instance_id, label, txn_id, schema_version, _all_block_queues_bytes, result.wait_internal_group_commit_finish, result.group_commit_interval_ms, result.group_commit_data_bytes); std::unique_lock l(_lock); - //create wal - if (!is_pipeline) { - RETURN_IF_ERROR(load_block_queue->create_wal( - _db_id, _table_id, txn_id, label, _exec_env->wal_mgr(), - 
params.fragment.output_sink.olap_table_sink.schema.slot_descs, - be_exe_version)); - } else { - RETURN_IF_ERROR(load_block_queue->create_wal( - _db_id, _table_id, txn_id, label, _exec_env->wal_mgr(), - pipeline_params.fragment.output_sink.olap_table_sink.schema.slot_descs, - be_exe_version)); - } + RETURN_IF_ERROR(load_block_queue->create_wal( + _db_id, _table_id, txn_id, label, _exec_env->wal_mgr(), + pipeline_params.fragment.output_sink.olap_table_sink.schema.slot_descs, + be_exe_version)); _load_block_queues.emplace(instance_id, load_block_queue); } } - st = _exec_plan_fragment(_db_id, _table_id, label, txn_id, is_pipeline, result.params, - result.pipeline_params); + st = _exec_plan_fragment(_db_id, _table_id, label, txn_id, result.pipeline_params); if (!st.ok()) { SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER(mem_tracker); auto finish_st = _finish_group_commit_load(_db_id, _table_id, label, txn_id, instance_id, @@ -533,8 +512,6 @@ Status GroupCommitTable::_finish_group_commit_load(int64_t db_id, int64_t table_ Status GroupCommitTable::_exec_plan_fragment(int64_t db_id, int64_t table_id, const std::string& label, int64_t txn_id, - bool is_pipeline, - const TExecPlanFragmentParams& params, const TPipelineFragmentParams& pipeline_params) { auto finish_cb = [db_id, table_id, label, txn_id, this](RuntimeState* state, Status* status) { DCHECK(state); @@ -545,11 +522,7 @@ Status GroupCommitTable::_exec_plan_fragment(int64_t db_id, int64_t table_id, << ", st=" << finish_st.to_string(); } }; - if (is_pipeline) { - return _exec_env->fragment_mgr()->exec_plan_fragment(pipeline_params, finish_cb); - } else { - return _exec_env->fragment_mgr()->exec_plan_fragment(params, finish_cb); - } + return _exec_env->fragment_mgr()->exec_plan_fragment(pipeline_params, finish_cb); } Status GroupCommitTable::get_load_block_queue(const TUniqueId& instance_id, diff --git a/be/src/runtime/group_commit_mgr.h b/be/src/runtime/group_commit_mgr.h index c668197e8dc..36c51746ef4 100644 --- 
a/be/src/runtime/group_commit_mgr.h +++ b/be/src/runtime/group_commit_mgr.h @@ -168,9 +168,7 @@ private: Status _create_group_commit_load(int be_exe_version, std::shared_ptr<MemTrackerLimiter> mem_tracker); Status _exec_plan_fragment(int64_t db_id, int64_t table_id, const std::string& label, - int64_t txn_id, bool is_pipeline, - const TExecPlanFragmentParams& params, - const TPipelineFragmentParams& pipeline_params); + int64_t txn_id, const TPipelineFragmentParams& pipeline_params); Status _finish_group_commit_load(int64_t db_id, int64_t table_id, const std::string& label, int64_t txn_id, const TUniqueId& instance_id, Status& status, RuntimeState* state); --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org