This is an automated email from the ASF dual-hosted git repository. yiguolei pushed a commit to branch branch-2.1 in repository https://gitbox.apache.org/repos/asf/doris.git
commit 846246e90b54a1132f967a4d44c3a6722e68de81 Author: Gabriel <gabrielleeb...@gmail.com> AuthorDate: Fri Mar 1 14:26:37 2024 +0800 [pipelineX](bug) cancel pipeline tasks if timeout (#31635) --- be/src/pipeline/pipeline_fragment_context.cpp | 14 ++++++++++++++ be/src/pipeline/pipeline_fragment_context.h | 5 +++++ be/src/runtime/fragment_mgr.cpp | 10 ++++++++++ be/src/runtime/plan_fragment_executor.cpp | 2 +- 4 files changed, 30 insertions(+), 1 deletion(-) diff --git a/be/src/pipeline/pipeline_fragment_context.cpp b/be/src/pipeline/pipeline_fragment_context.cpp index 239173fa781..44533e3a596 100644 --- a/be/src/pipeline/pipeline_fragment_context.cpp +++ b/be/src/pipeline/pipeline_fragment_context.cpp @@ -132,6 +132,7 @@ PipelineFragmentContext::PipelineFragmentContext( _report_status_cb(std::move(report_status_cb)), _create_time(MonotonicNanos()) { _fragment_watcher.start(); + _start_time = VecDateTimeValue::local_time(); } PipelineFragmentContext::~PipelineFragmentContext() { @@ -146,6 +147,16 @@ PipelineFragmentContext::~PipelineFragmentContext() { } } +bool PipelineFragmentContext::is_timeout(const VecDateTimeValue& now) const { + if (_timeout <= 0) { + return false; + } + if (now.second_diff(_start_time) > _timeout) { + return true; + } + return false; +} + void PipelineFragmentContext::cancel(const PPlanFragmentCancelReason& reason, const std::string& msg) { LOG_INFO("PipelineFragmentContext::cancel") @@ -214,6 +225,9 @@ Status PipelineFragmentContext::prepare(const doris::TPipelineFragmentParams& re if (_prepared) { return Status::InternalError("Already prepared"); } + if (request.__isset.query_options && request.query_options.__isset.execution_timeout) { + _timeout = request.query_options.execution_timeout; + } const auto& local_params = request.local_params[idx]; _runtime_profile = std::make_unique<RuntimeProfile>("PipelineContext"); _start_timer = ADD_TIMER(_runtime_profile, "StartTime"); diff --git a/be/src/pipeline/pipeline_fragment_context.h b/be/src/pipeline/pipeline_fragment_context.h index 9925689cb2c..065fadd27e3 100644 --- a/be/src/pipeline/pipeline_fragment_context.h +++ b/be/src/pipeline/pipeline_fragment_context.h @@ -67,6 +67,8 @@ public: ~PipelineFragmentContext() override; + bool is_timeout(const VecDateTimeValue& now) const; + PipelinePtr add_pipeline(); PipelinePtr add_pipeline(PipelinePtr parent, int idx = -1); @@ -207,6 +209,9 @@ protected: DescriptorTbl* _desc_tbl = nullptr; int _num_instances = 1; + VecDateTimeValue _start_time; + int _timeout = -1; + private: std::vector<std::unique_ptr<PipelineTask>> _tasks; diff --git a/be/src/runtime/fragment_mgr.cpp b/be/src/runtime/fragment_mgr.cpp index c722fb24bb7..29f4065ab3d 100644 --- a/be/src/runtime/fragment_mgr.cpp +++ b/be/src/runtime/fragment_mgr.cpp @@ -1085,6 +1085,16 @@ void FragmentMgr::cancel_worker() { to_cancel.push_back(fragment_instance_itr.second->fragment_instance_id()); } } + for (auto& pipeline_itr : _pipeline_map) { + if (pipeline_itr.second->is_timeout(now)) { + std::vector<TUniqueId> ins_ids; + reinterpret_cast<pipeline::PipelineXFragmentContext*>(pipeline_itr.second.get()) + ->instance_ids(ins_ids); + for (auto& ins_id : ins_ids) { + to_cancel.push_back(ins_id); + } + } + } for (auto it = _query_ctx_map.begin(); it != _query_ctx_map.end();) { if (it->second->is_timeout(now)) { LOG_WARNING("Query {} is timeout", print_id(it->first)); diff --git a/be/src/runtime/plan_fragment_executor.cpp b/be/src/runtime/plan_fragment_executor.cpp index 9040fa12040..0617a89bf01 100644 --- a/be/src/runtime/plan_fragment_executor.cpp +++ b/be/src/runtime/plan_fragment_executor.cpp @@ -113,7 +113,7 @@ PlanFragmentExecutor::~PlanFragmentExecutor() { } Status PlanFragmentExecutor::prepare(const TExecPlanFragmentParams& request) { - if (request.__isset.query_options) { + if (request.__isset.query_options && request.query_options.__isset.execution_timeout) { _timeout_second = request.query_options.execution_timeout; } --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org