This is an automated email from the ASF dual-hosted git repository.

panxiaolei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git

The following commit(s) were added to refs/heads/master by this push:
     new b3d3ffa2de [Bug](pipeline) adjust scanner scheduler.submit and _num_scheduling_ctx maintain (#21843)
b3d3ffa2de is described below

commit b3d3ffa2de005e4aa4317e30f39b7ab7df57f05b
Author: Pxl <pxl...@qq.com>
AuthorDate: Tue Jul 18 11:55:21 2023 +0800

    [Bug](pipeline) adjust scanner scheduler.submit and _num_scheduling_ctx maintain (#21843)

    adjust scanner scheduler.submit and _num_scheduling_ctx maintain
---
 be/src/olap/tablet_schema_cache.cpp        |  1 -
 be/src/vec/exec/scan/pip_scanner_context.h |  2 +-
 be/src/vec/exec/scan/scanner_context.cpp   | 38 ++++++++++++++++++++----------
 be/src/vec/exec/scan/scanner_context.h     | 11 ++++-----
 be/src/vec/exec/scan/scanner_scheduler.cpp |  6 +++--
 be/src/vec/exec/scan/scanner_scheduler.h   |  3 +--
 6 files changed, 36 insertions(+), 25 deletions(-)

diff --git a/be/src/olap/tablet_schema_cache.cpp b/be/src/olap/tablet_schema_cache.cpp
index 24aa72a995..ee14358495 100644
--- a/be/src/olap/tablet_schema_cache.cpp
+++ b/be/src/olap/tablet_schema_cache.cpp
@@ -42,7 +42,6 @@ void TabletSchemaCache::stop() {
     while (!_is_stopped) {
         std::this_thread::sleep_for(std::chrono::seconds(1));
     }
-    LOG(INFO) << "xxx stopped";
 }
 
 /**
diff --git a/be/src/vec/exec/scan/pip_scanner_context.h b/be/src/vec/exec/scan/pip_scanner_context.h
index 731c3bb926..c720c22d04 100644
--- a/be/src/vec/exec/scan/pip_scanner_context.h
+++ b/be/src/vec/exec/scan/pip_scanner_context.h
@@ -45,7 +45,7 @@ public:
         {
             std::unique_lock l(_transfer_lock);
             if (state->is_cancelled()) {
-                _process_status = Status::Cancelled("cancelled");
+                set_status_on_error(Status::Cancelled("cancelled"), false);
             }
 
             if (!_process_status.ok()) {
diff --git a/be/src/vec/exec/scan/scanner_context.cpp b/be/src/vec/exec/scan/scanner_context.cpp
index b8e5847a15..049283647c 100644
--- a/be/src/vec/exec/scan/scanner_context.cpp
+++ b/be/src/vec/exec/scan/scanner_context.cpp
@@ -28,6 +28,7 @@
 #include <utility>
 
 #include "common/config.h"
+#include "common/status.h"
 #include "runtime/descriptors.h"
 #include "runtime/exec_env.h"
 #include "runtime/query_context.h"
@@ -188,8 +189,12 @@ Status ScannerContext::get_block_from_queue(RuntimeState* state, vectorized::Blo
     // At this point, consumers are required to trigger new scheduling to ensure that
     // data can be continuously fetched.
     if (has_enough_space_in_blocks_queue() && _num_running_scanners == 0) {
-        _num_scheduling_ctx++;
-        _scanner_scheduler->submit(this);
+        auto state = _scanner_scheduler->submit(this);
+        if (state.ok()) {
+            _num_scheduling_ctx++;
+        } else {
+            set_status_on_error(state, false);
+        }
     }
     // Wait for block from queue
     if (wait) {
@@ -201,11 +206,11 @@ Status ScannerContext::get_block_from_queue(RuntimeState* state, vectorized::Blo
     }
 
     if (state->is_cancelled()) {
-        _process_status = Status::Cancelled("cancelled");
+        set_status_on_error(Status::Cancelled("cancelled"), false);
     }
 
-    if (!_process_status.ok()) {
-        return _process_status;
+    if (!status().ok()) {
+        return status();
     }
 
     if (!_blocks_queue.empty()) {
@@ -221,12 +226,16 @@ Status ScannerContext::get_block_from_queue(RuntimeState* state, vectorized::Blo
     return Status::OK();
 }
 
-bool ScannerContext::set_status_on_error(const Status& status) {
-    std::lock_guard l(_transfer_lock);
+bool ScannerContext::set_status_on_error(const Status& status, bool need_lock) {
+    std::unique_lock l(_transfer_lock, std::defer_lock);
+    if (need_lock) {
+        l.lock();
+    }
     if (_process_status.ok()) {
         _process_status = status;
         _status_error = true;
         _blocks_queue_added_cv.notify_one();
+        _should_stop = true;
         return true;
     }
     return false;
@@ -326,10 +335,12 @@ std::string ScannerContext::debug_string() {
 
 void ScannerContext::reschedule_scanner_ctx() {
     std::lock_guard l(_transfer_lock);
-    auto submit_st = _scanner_scheduler->submit(this);
+    auto state = _scanner_scheduler->submit(this);
     //todo(wb) rethinking is it better to mark current scan_context failed when submit failed many times?
-    if (submit_st.ok()) {
+    if (state.ok()) {
         _num_scheduling_ctx++;
+    } else {
+        set_status_on_error(state, false);
     }
 }
 
@@ -340,10 +351,11 @@ void ScannerContext::push_back_scanner_and_reschedule(VScannerSPtr scanner) {
     }
     std::lock_guard l(_transfer_lock);
     if (has_enough_space_in_blocks_queue()) {
-        _num_scheduling_ctx++;
-        auto submit_st = _scanner_scheduler->submit(this);
-        if (!submit_st.ok()) {
-            _num_scheduling_ctx--;
+        auto state = _scanner_scheduler->submit(this);
+        if (state.ok()) {
+            _num_scheduling_ctx++;
+        } else {
+            set_status_on_error(state, false);
         }
     }
 
diff --git a/be/src/vec/exec/scan/scanner_context.h b/be/src/vec/exec/scan/scanner_context.h
index fa264c756a..db36dfe22f 100644
--- a/be/src/vec/exec/scan/scanner_context.h
+++ b/be/src/vec/exec/scan/scanner_context.h
@@ -86,10 +86,12 @@ public:
     // to return the scanner to the list for next scheduling.
     void push_back_scanner_and_reschedule(VScannerSPtr scanner);
 
-    bool set_status_on_error(const Status& status);
+    bool set_status_on_error(const Status& status, bool need_lock = true);
 
     Status status() {
-        std::lock_guard l(_transfer_lock);
+        if (_process_status.is<ErrorCode::END_OF_FILE>()) {
+            return Status::OK();
+        }
         return _process_status;
     }
 
@@ -102,10 +104,7 @@ public:
     }
 
     // Return true if this ScannerContext need no more process
-    virtual bool done() {
-        std::unique_lock l(_transfer_lock);
-        return _is_finished || _should_stop || !_process_status.ok();
-    }
+    virtual bool done() { return _is_finished || _should_stop; }
 
     // Update the running num of scanners and contexts
     void update_num_running(int32_t scanner_inc, int32_t sched_inc) {
diff --git a/be/src/vec/exec/scan/scanner_scheduler.cpp b/be/src/vec/exec/scan/scanner_scheduler.cpp
index c37760167f..e6bec4f9d9 100644
--- a/be/src/vec/exec/scan/scanner_scheduler.cpp
+++ b/be/src/vec/exec/scan/scanner_scheduler.cpp
@@ -53,7 +53,7 @@
 
 namespace doris::vectorized {
 
-ScannerScheduler::ScannerScheduler() {}
+ScannerScheduler::ScannerScheduler() = default;
 
 ScannerScheduler::~ScannerScheduler() {
     if (!_is_init) {
@@ -135,6 +135,9 @@ Status ScannerScheduler::init(ExecEnv* env) {
 }
 
 Status ScannerScheduler::submit(ScannerContext* ctx) {
+    if (ctx->done()) {
+        return Status::EndOfFile("ScannerContext is done");
+    }
     if (ctx->queue_idx == -1) {
         ctx->queue_idx = (_queue_idx++ % QUEUE_NUM);
     }
@@ -163,7 +166,6 @@ void ScannerScheduler::_schedule_thread(int queue_id) {
         // If ctx is done, no need to schedule it again.
         // But should notice that there may still scanners running in scanner pool.
     }
-    return;
 }
 
 [[maybe_unused]] static void* run_scanner_bthread(void* arg) {
diff --git a/be/src/vec/exec/scan/scanner_scheduler.h b/be/src/vec/exec/scan/scanner_scheduler.h
index d88c844a3e..ccc809becd 100644
--- a/be/src/vec/exec/scan/scanner_scheduler.h
+++ b/be/src/vec/exec/scan/scanner_scheduler.h
@@ -67,7 +67,7 @@ public:
 
     Status init(ExecEnv* env);
 
-    Status submit(ScannerContext* ctx);
+    [[nodiscard]] Status submit(ScannerContext* ctx);
 
     std::unique_ptr<ThreadPoolToken> new_limited_scan_pool_token(ThreadPool::ExecutionMode mode,
                                                                  int max_concurrency);
@@ -86,7 +86,6 @@ private:
     void _task_group_scanner_scan(ScannerScheduler* scheduler,
                                   taskgroup::ScanTaskTaskGroupQueue* scan_queue);
 
-private:
     // Scheduling queue number.
     // TODO: make it configurable.
     static const int QUEUE_NUM = 4;


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org