This is an automated email from the ASF dual-hosted git repository.

panxiaolei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
     new 5fd9f58bd3 [Chore](pipeline-engine) adjust query canceled log on pipeline engine (#20702)
5fd9f58bd3 is described below

commit 5fd9f58bd3eca510b825d5dfc92927ffa3f31dc0
Author: Pxl <pxl...@qq.com>
AuthorDate: Mon Jun 12 18:23:19 2023 +0800

    [Chore](pipeline-engine) adjust query canceled log on pipeline engine (#20702)

    adjust query canceled log on pipeline engine
---
 be/src/pipeline/exec/operator.cpp              |  6 +++---
 be/src/pipeline/exec/union_source_operator.cpp |  5 ++---
 be/src/pipeline/pipeline_fragment_context.cpp  | 13 ++++++++++---
 be/src/pipeline/pipeline_task.cpp              |  4 ++++
 be/src/pipeline/task_scheduler.cpp             | 15 ++-------------
 5 files changed, 21 insertions(+), 22 deletions(-)

diff --git a/be/src/pipeline/exec/operator.cpp b/be/src/pipeline/exec/operator.cpp
index 3fd4e34eba..40da74ffb0 100644
--- a/be/src/pipeline/exec/operator.cpp
+++ b/be/src/pipeline/exec/operator.cpp
@@ -49,9 +49,9 @@ const RowDescriptor& OperatorBase::row_desc() {
 
 std::string OperatorBase::debug_string() const {
     std::stringstream ss;
-    ss << _operator_builder->get_name() << ", is source: " << is_source();
-    ss << ", is sink: " << is_sink() << ", is closed: " << _is_closed;
-    ss << ", is pending finish: " << is_pending_finish();
+    ss << _operator_builder->get_name() << ", is_source: " << is_source();
+    ss << ", is_sink: " << is_sink() << ", is_closed: " << _is_closed;
+    ss << ", is_pending_finish: " << is_pending_finish();
     return ss.str();
 }
 
diff --git a/be/src/pipeline/exec/union_source_operator.cpp b/be/src/pipeline/exec/union_source_operator.cpp
index 7311d849c8..6efd0bfc78 100644
--- a/be/src/pipeline/exec/union_source_operator.cpp
+++ b/be/src/pipeline/exec/union_source_operator.cpp
@@ -83,10 +83,9 @@ Status UnionSourceOperator::get_block(RuntimeState* state, vectorized::Block* bl
             std::bind(&UnionSourceOperator::pull_data, this, std::placeholders::_1,
                       std::placeholders::_2, std::placeholders::_3)));
    //have exectue const expr, queue have no data any more, and child could be colsed
-    if (eos || (!_need_read_for_const_expr && !_data_queue->remaining_has_data() &&
-                _data_queue->is_all_finish())) {
+    if (eos || (!can_read() && _data_queue->is_all_finish())) {
         source_state = SourceState::FINISHED;
-    } else if (_need_read_for_const_expr || _data_queue->remaining_has_data()) {
+    } else if (can_read()) {
         source_state = SourceState::MORE_DATA;
     } else {
         source_state = SourceState::DEPEND_ON_SOURCE;
diff --git a/be/src/pipeline/pipeline_fragment_context.cpp b/be/src/pipeline/pipeline_fragment_context.cpp
index 1766e9aa2e..9ecea24228 100644
--- a/be/src/pipeline/pipeline_fragment_context.cpp
+++ b/be/src/pipeline/pipeline_fragment_context.cpp
@@ -152,6 +152,12 @@ void PipelineFragmentContext::cancel(const PPlanFragmentCancelReason& reason,
         _exec_status = Status::Cancelled(msg);
     }
     _runtime_state->set_is_cancelled(true);
+
+    LOG(WARNING) << "PipelineFragmentContext Canceled. reason=" << msg;
+    for (auto& task : _tasks) {
+        LOG(WARNING) << task->debug_string();
+    }
+
     _runtime_state->set_process_status(_exec_status);
     // Get pipe from new load stream manager and send cancel to it or the fragment may hang to wait read from pipe
     // For stream load the fragment's query_id == load id, it is set in FE.
@@ -321,6 +327,7 @@ Status PipelineFragmentContext::prepare(const doris::TPipelineFragmentParams& re
 
 Status PipelineFragmentContext::_build_pipeline_tasks(
         const doris::TPipelineFragmentParams& request) {
+    _total_tasks = 0;
     for (PipelinePtr& pipeline : _pipelines) {
         // if sink
         auto sink = pipeline->sink()->build_operator();
@@ -329,8 +336,9 @@ Status PipelineFragmentContext::_build_pipeline_tasks(
 
         Operators operators;
         RETURN_IF_ERROR(pipeline->build_operators(operators));
-        auto task = std::make_unique<PipelineTask>(pipeline, 0, _runtime_state.get(), operators,
-                                                   sink, this, pipeline->pipeline_profile());
+        auto task =
+                std::make_unique<PipelineTask>(pipeline, _total_tasks++, _runtime_state.get(),
+                                               operators, sink, this, pipeline->pipeline_profile());
         sink->set_child(task->get_root());
         _tasks.emplace_back(std::move(task));
         _runtime_profile->add_child(pipeline->pipeline_profile(), true, nullptr);
@@ -339,7 +347,6 @@ Status PipelineFragmentContext::_build_pipeline_tasks(
     for (auto& task : _tasks) {
         RETURN_IF_ERROR(task->prepare(_runtime_state.get()));
     }
-    _total_tasks = _tasks.size();
 
     // register the profile of child data stream sender
     for (auto& sender : _multi_cast_stream_sink_senders) {
diff --git a/be/src/pipeline/pipeline_task.cpp b/be/src/pipeline/pipeline_task.cpp
index 7c2379796a..b8ab119c34 100644
--- a/be/src/pipeline/pipeline_task.cpp
+++ b/be/src/pipeline/pipeline_task.cpp
@@ -310,6 +310,10 @@ std::string PipelineTask::debug_string() {
     _fresh_profile_counter();
     _task_profile->pretty_print(&profile_ss, "");
 
+    fmt::format_to(debug_string_buffer, "QueryId: {}\n", print_id(query_context()->query_id));
+    fmt::format_to(debug_string_buffer, "InstanceId: {}\n",
+                   print_id(fragment_context()->get_fragment_instance_id()));
+    fmt::format_to(debug_string_buffer, "Profile: {}\n", profile_ss.str());
     fmt::format_to(debug_string_buffer, "PipelineTask[id = {}, state = {}]\noperators: ", _index,
                    get_state_name(_cur_state));
diff --git a/be/src/pipeline/task_scheduler.cpp b/be/src/pipeline/task_scheduler.cpp
index 831c5e2546..f5c55ccbdd 100644
--- a/be/src/pipeline/task_scheduler.cpp
+++ b/be/src/pipeline/task_scheduler.cpp
@@ -118,15 +118,6 @@ void BlockedTaskScheduler::_schedule() {
                                              PipelineTaskState::PENDING_FINISH);
                 }
             } else if (task->fragment_context()->is_canceled()) {
-                std::string task_ds;
-#ifndef NDEBUG
-                task_ds = task->debug_string();
-#endif
-                LOG(WARNING) << "Canceled, query_id=" << print_id(task->query_context()->query_id)
-                             << ", instance_id="
-                             << print_id(task->fragment_context()->get_fragment_instance_id())
-                             << (task_ds.empty() ? "" : task_ds);
-
                 if (task->is_pending_finish()) {
                     task->set_state(PipelineTaskState::PENDING_FINISH);
                     iter++;
@@ -213,8 +204,6 @@ void BlockedTaskScheduler::_make_task_run(std::list<PipelineTask*>& local_tasks,
         ready_tasks.emplace_back(task);
     }
 
-///////////////////////// TaskScheduler ///////////////////////////////////////////////////////////////////////////
-
 TaskScheduler::~TaskScheduler() {
     shutdown();
 }
@@ -289,8 +278,8 @@ void TaskScheduler::_do_work(size_t index) {
         task->set_previous_core_id(index);
 
         if (!status.ok()) {
-            LOG(WARNING) << fmt::format("Pipeline task [{}] failed: {}", task->debug_string(),
-                                        status.to_string());
+            LOG(WARNING) << fmt::format("Pipeline task failed. reason: {}, task: \n{}",
+                                        status.to_string(), task->debug_string());
             // exec failed,cancel all fragment instance
             fragment_ctx->cancel(PPlanFragmentCancelReason::INTERNAL_ERROR, status.to_string());
             fragment_ctx->send_report(true);

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org