This is an automated email from the ASF dual-hosted git repository.

yiguolei pushed a commit to branch branch-2.1
in repository https://gitbox.apache.org/repos/asf/doris.git

commit 846246e90b54a1132f967a4d44c3a6722e68de81
Author: Gabriel <gabrielleeb...@gmail.com>
AuthorDate: Fri Mar 1 14:26:37 2024 +0800

    [pipelineX](bug) cancel pipeline tasks if timeout (#31635)
---
 be/src/pipeline/pipeline_fragment_context.cpp | 14 ++++++++++++++
 be/src/pipeline/pipeline_fragment_context.h   |  5 +++++
 be/src/runtime/fragment_mgr.cpp               | 10 ++++++++++
 be/src/runtime/plan_fragment_executor.cpp     |  2 +-
 4 files changed, 30 insertions(+), 1 deletion(-)

diff --git a/be/src/pipeline/pipeline_fragment_context.cpp b/be/src/pipeline/pipeline_fragment_context.cpp
index 239173fa781..44533e3a596 100644
--- a/be/src/pipeline/pipeline_fragment_context.cpp
+++ b/be/src/pipeline/pipeline_fragment_context.cpp
@@ -132,6 +132,7 @@ PipelineFragmentContext::PipelineFragmentContext(
           _report_status_cb(std::move(report_status_cb)),
           _create_time(MonotonicNanos()) {
     _fragment_watcher.start();
+    _start_time = VecDateTimeValue::local_time();
 }
 
 PipelineFragmentContext::~PipelineFragmentContext() {
@@ -146,6 +147,16 @@ PipelineFragmentContext::~PipelineFragmentContext() {
     }
 }
 
+bool PipelineFragmentContext::is_timeout(const VecDateTimeValue& now) const {
+    // A non-positive _timeout is treated as "no limit": never report a timeout.
+    if (_timeout <= 0) {
+        return false;
+    }
+    // Timed out only when now.second_diff(_start_time) strictly exceeds _timeout.
+    const auto elapsed = now.second_diff(_start_time);
+    return elapsed > _timeout;
+}
+
 void PipelineFragmentContext::cancel(const PPlanFragmentCancelReason& reason,
                                      const std::string& msg) {
     LOG_INFO("PipelineFragmentContext::cancel")
@@ -214,6 +225,9 @@ Status PipelineFragmentContext::prepare(const doris::TPipelineFragmentParams& re
     if (_prepared) {
         return Status::InternalError("Already prepared");
     }
+    if (request.__isset.query_options && request.query_options.__isset.execution_timeout) {
+        _timeout = request.query_options.execution_timeout;
+    }
     const auto& local_params = request.local_params[idx];
     _runtime_profile = std::make_unique<RuntimeProfile>("PipelineContext");
     _start_timer = ADD_TIMER(_runtime_profile, "StartTime");
diff --git a/be/src/pipeline/pipeline_fragment_context.h b/be/src/pipeline/pipeline_fragment_context.h
index 9925689cb2c..065fadd27e3 100644
--- a/be/src/pipeline/pipeline_fragment_context.h
+++ b/be/src/pipeline/pipeline_fragment_context.h
@@ -67,6 +67,8 @@ public:
 
     ~PipelineFragmentContext() override;
 
+    bool is_timeout(const VecDateTimeValue& now) const;
+
     PipelinePtr add_pipeline();
 
     PipelinePtr add_pipeline(PipelinePtr parent, int idx = -1);
@@ -207,6 +209,9 @@ protected:
     DescriptorTbl* _desc_tbl = nullptr;
     int _num_instances = 1;
 
+    VecDateTimeValue _start_time;
+    int _timeout = -1;
+
 private:
     std::vector<std::unique_ptr<PipelineTask>> _tasks;
 
diff --git a/be/src/runtime/fragment_mgr.cpp b/be/src/runtime/fragment_mgr.cpp
index c722fb24bb7..29f4065ab3d 100644
--- a/be/src/runtime/fragment_mgr.cpp
+++ b/be/src/runtime/fragment_mgr.cpp
@@ -1085,6 +1085,16 @@ void FragmentMgr::cancel_worker() {
                     to_cancel.push_back(fragment_instance_itr.second->fragment_instance_id());
                 }
             }
+            for (auto& pipeline_itr : _pipeline_map) {
+                if (pipeline_itr.second->is_timeout(now)) {
+                    std::vector<TUniqueId> ins_ids;
+                    reinterpret_cast<pipeline::PipelineXFragmentContext*>(pipeline_itr.second.get())
+                            ->instance_ids(ins_ids);
+                    for (auto& ins_id : ins_ids) {
+                        to_cancel.push_back(ins_id);
+                    }
+                }
+            }
             for (auto it = _query_ctx_map.begin(); it != _query_ctx_map.end();) {
                 if (it->second->is_timeout(now)) {
                     LOG_WARNING("Query {} is timeout", print_id(it->first));
diff --git a/be/src/runtime/plan_fragment_executor.cpp b/be/src/runtime/plan_fragment_executor.cpp
index 9040fa12040..0617a89bf01 100644
--- a/be/src/runtime/plan_fragment_executor.cpp
+++ b/be/src/runtime/plan_fragment_executor.cpp
@@ -113,7 +113,7 @@ PlanFragmentExecutor::~PlanFragmentExecutor() {
 }
 
 Status PlanFragmentExecutor::prepare(const TExecPlanFragmentParams& request) {
-    if (request.__isset.query_options) {
+    if (request.__isset.query_options && request.query_options.__isset.execution_timeout) {
         _timeout_second = request.query_options.execution_timeout;
     }
 


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to