This is an automated email from the ASF dual-hosted git repository.

gabriellee pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
     new 80e863cbcf6 [refactor](pipeline) Delete unnecessary code (#35415)
80e863cbcf6 is described below

commit 80e863cbcf6ed898157bb186c442c5a1e22498ac
Author: Gabriel <gabrielleeb...@gmail.com>
AuthorDate: Mon May 27 15:03:48 2024 +0800

    [refactor](pipeline) Delete unnecessary code (#35415)
---
 be/src/pipeline/pipeline_fragment_context.cpp | 22 ----------------------
 be/src/pipeline/pipeline_fragment_context.h   | 10 ----------
 be/src/pipeline/pipeline_task.cpp             |  4 ----
 be/src/pipeline/pipeline_task.h               |  2 --
 be/src/runtime/fragment_mgr.cpp               |  5 +----
 5 files changed, 1 insertion(+), 42 deletions(-)

diff --git a/be/src/pipeline/pipeline_fragment_context.cpp b/be/src/pipeline/pipeline_fragment_context.cpp
index fd4c903d5aa..89588e39471 100644
--- a/be/src/pipeline/pipeline_fragment_context.cpp
+++ b/be/src/pipeline/pipeline_fragment_context.cpp
@@ -1500,28 +1500,6 @@ Status PipelineFragmentContext::submit() {
     }
 }
 
-void PipelineFragmentContext::close_sink() {
-    for (auto& tasks : _tasks) {
-        auto& root_task = *tasks.begin();
-        auto st = root_task->close_sink(_prepared ? Status::RuntimeError("prepare failed")
-                                                  : Status::OK());
-        if (!st.ok()) {
-            LOG_WARNING("PipelineFragmentContext::close_sink() error").tag("msg", st.msg());
-        }
-    }
-}
-
-void PipelineFragmentContext::close_if_prepare_failed(Status st) {
-    for (auto& task : _tasks) {
-        for (auto& t : task) {
-            DCHECK(!t->is_pending_finish());
-            WARN_IF_ERROR(t->close(st), "close_if_prepare_failed failed: ");
-            close_a_pipeline();
-        }
-    }
-    _query_ctx->cancel(st, _fragment_id);
-}
-
 // If all pipeline tasks binded to the fragment instance are finished, then we could
 // close the fragment instance.
 void PipelineFragmentContext::_close_fragment_instance() {
diff --git a/be/src/pipeline/pipeline_fragment_context.h b/be/src/pipeline/pipeline_fragment_context.h
index 03f405e0401..8bc1eb29139 100644
--- a/be/src/pipeline/pipeline_fragment_context.h
+++ b/be/src/pipeline/pipeline_fragment_context.h
@@ -92,9 +92,6 @@ public:
 
     Status submit();
 
-    void close_if_prepare_failed(Status st);
-    void close_sink();
-
     void set_is_report_success(bool is_report_success) { _is_report_success = is_report_success; }
 
     void cancel(const Status reason);
@@ -120,8 +117,6 @@ public:
 
     [[nodiscard]] int next_sink_operator_id() { return _sink_operator_id--; }
 
-    [[nodiscard]] int max_sink_operator_id() const { return _sink_operator_id; }
-
     void instance_ids(std::vector<TUniqueId>& ins_ids) const {
         ins_ids.resize(_fragment_instance_ids.size());
         for (size_t i = 0; i < _fragment_instance_ids.size(); i++) {
@@ -136,11 +131,6 @@ public:
         }
     }
 
-    void add_merge_controller_handler(
-            std::shared_ptr<RuntimeFilterMergeControllerEntity>& handler) {
-        _merge_controller_handlers.emplace_back(handler);
-    }
-
 private:
     Status _build_pipelines(ObjectPool* pool, const doris::TPipelineFragmentParams& request,
                             const DescriptorTbl& descs, OperatorXPtr* root, PipelinePtr cur_pipe);
diff --git a/be/src/pipeline/pipeline_task.cpp b/be/src/pipeline/pipeline_task.cpp
index 781566157f0..867dc49dc33 100644
--- a/be/src/pipeline/pipeline_task.cpp
+++ b/be/src/pipeline/pipeline_task.cpp
@@ -449,10 +449,6 @@ Status PipelineTask::close(Status exec_status) {
     return s;
 }
 
-Status PipelineTask::close_sink(Status exec_status) {
-    return _sink->close(_state, exec_status);
-}
-
 std::string PipelineTask::debug_string() {
     std::unique_lock<std::mutex> lc(_release_lock);
     fmt::memory_buffer debug_string_buffer;
diff --git a/be/src/pipeline/pipeline_task.h b/be/src/pipeline/pipeline_task.h
index 4bf58a708a5..0965ec1c18f 100644
--- a/be/src/pipeline/pipeline_task.h
+++ b/be/src/pipeline/pipeline_task.h
@@ -64,8 +64,6 @@ public:
     // must be call after all pipeline task is finish to release resource
     Status close(Status exec_status);
 
-    Status close_sink(Status exec_status);
-
     PipelineFragmentContext* fragment_context() { return _fragment_context; }
 
     QueryContext* query_context();
diff --git a/be/src/runtime/fragment_mgr.cpp b/be/src/runtime/fragment_mgr.cpp
index 8534638f681..888d4069731 100644
--- a/be/src/runtime/fragment_mgr.cpp
+++ b/be/src/runtime/fragment_mgr.cpp
@@ -179,9 +179,6 @@ void FragmentMgr::stop() {
         std::lock_guard<std::mutex> lock(_lock);
         _fragment_instance_map.clear();
         _query_ctx_map.clear();
-        for (auto& pipeline : _pipeline_map) {
-            pipeline.second->close_sink();
-        }
         _pipeline_map.clear();
     }
     _async_report_thread_pool->shutdown();
@@ -860,7 +857,7 @@ Status FragmentMgr::exec_plan_fragment(const TPipelineFragmentParams& params,
             SCOPED_RAW_TIMER(&duration_ns);
             auto prepare_st = context->prepare(params);
             if (!prepare_st.ok()) {
-                context->close_if_prepare_failed(prepare_st);
+                query_ctx->cancel(prepare_st, params.fragment_id);
                 query_ctx->set_execution_dependency_ready();
                 return prepare_st;
             }

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org