This is an automated email from the ASF dual-hosted git repository. gabriellee pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push: new 7d34512501 [Bug](pipeline) Fix DCHECK failure (#15928) 7d34512501 is described below commit 7d34512501a2867cfcc012a6054e3422d88ffe92 Author: Gabriel <gabrielleeb...@gmail.com> AuthorDate: Tue Jan 17 12:01:20 2023 +0800 [Bug](pipeline) Fix DCHECK failure (#15928) --- be/src/pipeline/exec/operator.cpp | 4 ++-- be/src/pipeline/exec/operator.h | 4 +++- be/src/pipeline/exec/scan_operator.cpp | 16 ++++++++++++++++ be/src/pipeline/exec/scan_operator.h | 4 ++++ be/src/pipeline/pipeline_task.cpp | 4 ++++ be/src/pipeline/pipeline_task.h | 3 +++ be/src/pipeline/task_scheduler.cpp | 1 + be/src/vec/exec/scan/scanner_context.h | 4 ++++ be/src/vec/exec/scan/vscan_node.cpp | 9 +++++++++ be/src/vec/exec/scan/vscan_node.h | 2 ++ 10 files changed, 48 insertions(+), 3 deletions(-) diff --git a/be/src/pipeline/exec/operator.cpp b/be/src/pipeline/exec/operator.cpp index 3b78b59e7c..39cb8b8814 100644 --- a/be/src/pipeline/exec/operator.cpp +++ b/be/src/pipeline/exec/operator.cpp @@ -44,8 +44,8 @@ const RowDescriptor& OperatorBase::row_desc() { std::string OperatorBase::debug_string() const { std::stringstream ss; - ss << _operator_builder->get_name() << ", source: " << is_source(); - ss << ", sink: " << is_sink() << ", is closed: " << _is_closed; + ss << _operator_builder->get_name() << ", is source: " << is_source(); + ss << ", is sink: " << is_sink() << ", is closed: " << _is_closed; ss << ", is pending finish: " << is_pending_finish(); return ss.str(); } diff --git a/be/src/pipeline/exec/operator.h b/be/src/pipeline/exec/operator.h index e2fde2d9e5..32ede3618a 100644 --- a/be/src/pipeline/exec/operator.h +++ b/be/src/pipeline/exec/operator.h @@ -216,6 +216,8 @@ public: */ virtual bool is_pending_finish() const { return false; } + virtual Status try_close() { return Status::OK(); } + bool is_closed() const { return _is_closed; } MemTracker* mem_tracker() const { return _mem_tracker.get(); } @@ -225,7 +227,7 @@ public: const RowDescriptor& row_desc(); RuntimeProfile* runtime_profile() { return _runtime_profile.get(); } - std::string debug_string() const; + virtual std::string debug_string() const; int32_t id() const { return _operator_builder->id(); } protected: diff --git a/be/src/pipeline/exec/scan_operator.cpp b/be/src/pipeline/exec/scan_operator.cpp index 16b210ddf7..f673ca1afa 100644 --- a/be/src/pipeline/exec/scan_operator.cpp +++ b/be/src/pipeline/exec/scan_operator.cpp @@ -39,8 +39,24 @@ bool ScanOperator::is_pending_finish() const { return _node->_scanner_ctx && !_node->_scanner_ctx->no_schedule(); } +Status ScanOperator::try_close() { + return _node->try_close(); +} + bool ScanOperator::runtime_filters_are_ready_or_timeout() { return _node->runtime_filters_are_ready_or_timeout(); } +std::string ScanOperator::debug_string() const { + fmt::memory_buffer debug_string_buffer; + fmt::format_to(debug_string_buffer, "{}, scanner_ctx is null: {} ", + SourceOperator::debug_string(), _node->_scanner_ctx == nullptr); + if (_node->_scanner_ctx) { + fmt::format_to(debug_string_buffer, ", num_running_scanners = {}, num_scheduling_ctx = {} ", + _node->_scanner_ctx->get_num_running_scanners(), + _node->_scanner_ctx->get_num_scheduling_ctx()); + } + return fmt::to_string(debug_string_buffer); +} + } // namespace doris::pipeline diff --git a/be/src/pipeline/exec/scan_operator.h b/be/src/pipeline/exec/scan_operator.h index f71a04af8a..a4810f9730 100644 --- a/be/src/pipeline/exec/scan_operator.h +++ b/be/src/pipeline/exec/scan_operator.h @@ -43,6 +43,10 @@ public: bool is_pending_finish() const override; bool runtime_filters_are_ready_or_timeout() override; + + std::string debug_string() const override; + + Status try_close() override; }; } // namespace doris::pipeline \ No newline at end of file diff --git a/be/src/pipeline/pipeline_task.cpp b/be/src/pipeline/pipeline_task.cpp index 8aa033a759..1f7d12f15d 100644 --- a/be/src/pipeline/pipeline_task.cpp +++ b/be/src/pipeline/pipeline_task.cpp @@ -194,6 +194,10 @@ Status PipelineTask::finalize() { return _sink->finalize(_state); } +Status PipelineTask::try_close() { + return _source->try_close(); +} + Status PipelineTask::close() { int64_t close_ns = 0; Status s; diff --git a/be/src/pipeline/pipeline_task.h b/be/src/pipeline/pipeline_task.h index 2e57cf6453..3a37778a48 100644 --- a/be/src/pipeline/pipeline_task.h +++ b/be/src/pipeline/pipeline_task.h @@ -110,6 +110,9 @@ public: Status execute(bool* eos); + // Try to close this pipeline task. If there are still some resources need to be released after `try_close`, + // this task will enter the `PENDING_FINISH` state. + Status try_close(); // if the pipeline create a bunch of pipeline task // must be call after all pipeline task is finish to release resource Status close(); diff --git a/be/src/pipeline/task_scheduler.cpp b/be/src/pipeline/task_scheduler.cpp index 3ee4cb82d1..bb5323eccd 100644 --- a/be/src/pipeline/task_scheduler.cpp +++ b/be/src/pipeline/task_scheduler.cpp @@ -290,6 +290,7 @@ void TaskScheduler::_do_work(size_t index) { void TaskScheduler::_try_close_task(PipelineTask* task, PipelineTaskState state) { // state only should be CANCELED or FINISHED + task->try_close(); if (task->is_pending_finish()) { task->set_state(PENDING_FINISH); _blocked_task_scheduler->add_blocked_task(task); diff --git a/be/src/vec/exec/scan/scanner_context.h b/be/src/vec/exec/scan/scanner_context.h index c6c6afe1e3..df5639efa4 100644 --- a/be/src/vec/exec/scan/scanner_context.h +++ b/be/src/vec/exec/scan/scanner_context.h @@ -114,6 +114,10 @@ public: _ctx_finish_cv.notify_one(); } + const int get_num_running_scanners() const { return _num_running_scanners; } + + const int get_num_scheduling_ctx() const { return _num_scheduling_ctx; } + void get_next_batch_of_scanners(std::list<VScanner*>* current_run); void clear_and_join(); diff --git a/be/src/vec/exec/scan/vscan_node.cpp b/be/src/vec/exec/scan/vscan_node.cpp index 281a491827..7b00afb748 100644 --- a/be/src/vec/exec/scan/vscan_node.cpp +++ b/be/src/vec/exec/scan/vscan_node.cpp @@ -376,6 +376,15 @@ void VScanNode::release_resource(RuntimeState* state) { ExecNode::release_resource(state); } +Status VScanNode::try_close() { + if (_scanner_ctx.get()) { + // mark this scanner ctx as should_stop to make sure scanners will not be scheduled anymore + // TODO: there is a lock in `set_should_stop` may cause some slight impact + _scanner_ctx->set_should_stop(); + } + return Status::OK(); +} + Status VScanNode::_normalize_conjuncts() { // The conjuncts is always on output tuple, so use _output_tuple_desc; std::vector<SlotDescriptor*> slots = _output_tuple_desc->slots(); diff --git a/be/src/vec/exec/scan/vscan_node.h b/be/src/vec/exec/scan/vscan_node.h index 1eba5c88e5..b8c237e66e 100644 --- a/be/src/vec/exec/scan/vscan_node.h +++ b/be/src/vec/exec/scan/vscan_node.h @@ -92,6 +92,8 @@ public: void release_resource(RuntimeState* state) override; bool runtime_filters_are_ready_or_timeout(); + Status try_close(); + enum class PushDownType { // The predicate can not be pushed down to data source UNACCEPTABLE, --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org