This is an automated email from the ASF dual-hosted git repository.

gabriellee pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new 80e863cbcf6 [refactor](pipeline) Delete unnecessary code (#35415)
80e863cbcf6 is described below

commit 80e863cbcf6ed898157bb186c442c5a1e22498ac
Author: Gabriel <gabrielleeb...@gmail.com>
AuthorDate: Mon May 27 15:03:48 2024 +0800

    [refactor](pipeline) Delete unnecessary code (#35415)
---
 be/src/pipeline/pipeline_fragment_context.cpp | 22 ----------------------
 be/src/pipeline/pipeline_fragment_context.h   | 10 ----------
 be/src/pipeline/pipeline_task.cpp             |  4 ----
 be/src/pipeline/pipeline_task.h               |  2 --
 be/src/runtime/fragment_mgr.cpp               |  5 +----
 5 files changed, 1 insertion(+), 42 deletions(-)

diff --git a/be/src/pipeline/pipeline_fragment_context.cpp b/be/src/pipeline/pipeline_fragment_context.cpp
index fd4c903d5aa..89588e39471 100644
--- a/be/src/pipeline/pipeline_fragment_context.cpp
+++ b/be/src/pipeline/pipeline_fragment_context.cpp
@@ -1500,28 +1500,6 @@ Status PipelineFragmentContext::submit() {
     }
 }
 
-void PipelineFragmentContext::close_sink() {
-    for (auto& tasks : _tasks) {
-        auto& root_task = *tasks.begin();
-        auto st = root_task->close_sink(_prepared ? Status::RuntimeError("prepare failed")
-                                                  : Status::OK());
-        if (!st.ok()) {
-            LOG_WARNING("PipelineFragmentContext::close_sink() error").tag("msg", st.msg());
-        }
-    }
-}
-
-void PipelineFragmentContext::close_if_prepare_failed(Status st) {
-    for (auto& task : _tasks) {
-        for (auto& t : task) {
-            DCHECK(!t->is_pending_finish());
-            WARN_IF_ERROR(t->close(st), "close_if_prepare_failed failed: ");
-            close_a_pipeline();
-        }
-    }
-    _query_ctx->cancel(st, _fragment_id);
-}
-
 // If all pipeline tasks binded to the fragment instance are finished, then we could
 // close the fragment instance.
 void PipelineFragmentContext::_close_fragment_instance() {
diff --git a/be/src/pipeline/pipeline_fragment_context.h b/be/src/pipeline/pipeline_fragment_context.h
index 03f405e0401..8bc1eb29139 100644
--- a/be/src/pipeline/pipeline_fragment_context.h
+++ b/be/src/pipeline/pipeline_fragment_context.h
@@ -92,9 +92,6 @@ public:
 
     Status submit();
 
-    void close_if_prepare_failed(Status st);
-    void close_sink();
-
     void set_is_report_success(bool is_report_success) { _is_report_success = is_report_success; }
 
     void cancel(const Status reason);
@@ -120,8 +117,6 @@ public:
 
     [[nodiscard]] int next_sink_operator_id() { return _sink_operator_id--; }
 
-    [[nodiscard]] int max_sink_operator_id() const { return _sink_operator_id; }
-
     void instance_ids(std::vector<TUniqueId>& ins_ids) const {
         ins_ids.resize(_fragment_instance_ids.size());
         for (size_t i = 0; i < _fragment_instance_ids.size(); i++) {
@@ -136,11 +131,6 @@ public:
         }
     }
 
-    void add_merge_controller_handler(
-            std::shared_ptr<RuntimeFilterMergeControllerEntity>& handler) {
-        _merge_controller_handlers.emplace_back(handler);
-    }
-
 private:
     Status _build_pipelines(ObjectPool* pool, const doris::TPipelineFragmentParams& request,
                             const DescriptorTbl& descs, OperatorXPtr* root, PipelinePtr cur_pipe);
diff --git a/be/src/pipeline/pipeline_task.cpp b/be/src/pipeline/pipeline_task.cpp
index 781566157f0..867dc49dc33 100644
--- a/be/src/pipeline/pipeline_task.cpp
+++ b/be/src/pipeline/pipeline_task.cpp
@@ -449,10 +449,6 @@ Status PipelineTask::close(Status exec_status) {
     return s;
 }
 
-Status PipelineTask::close_sink(Status exec_status) {
-    return _sink->close(_state, exec_status);
-}
-
 std::string PipelineTask::debug_string() {
     std::unique_lock<std::mutex> lc(_release_lock);
     fmt::memory_buffer debug_string_buffer;
diff --git a/be/src/pipeline/pipeline_task.h b/be/src/pipeline/pipeline_task.h
index 4bf58a708a5..0965ec1c18f 100644
--- a/be/src/pipeline/pipeline_task.h
+++ b/be/src/pipeline/pipeline_task.h
@@ -64,8 +64,6 @@ public:
     // must be call after all pipeline task is finish to release resource
     Status close(Status exec_status);
 
-    Status close_sink(Status exec_status);
-
     PipelineFragmentContext* fragment_context() { return _fragment_context; }
 
     QueryContext* query_context();
diff --git a/be/src/runtime/fragment_mgr.cpp b/be/src/runtime/fragment_mgr.cpp
index 8534638f681..888d4069731 100644
--- a/be/src/runtime/fragment_mgr.cpp
+++ b/be/src/runtime/fragment_mgr.cpp
@@ -179,9 +179,6 @@ void FragmentMgr::stop() {
         std::lock_guard<std::mutex> lock(_lock);
         _fragment_instance_map.clear();
         _query_ctx_map.clear();
-        for (auto& pipeline : _pipeline_map) {
-            pipeline.second->close_sink();
-        }
         _pipeline_map.clear();
     }
     _async_report_thread_pool->shutdown();
@@ -860,7 +857,7 @@ Status FragmentMgr::exec_plan_fragment(const TPipelineFragmentParams& params,
         SCOPED_RAW_TIMER(&duration_ns);
         auto prepare_st = context->prepare(params);
         if (!prepare_st.ok()) {
-            context->close_if_prepare_failed(prepare_st);
+            query_ctx->cancel(prepare_st, params.fragment_id);
             query_ctx->set_execution_dependency_ready();
             return prepare_st;
         }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to