This is an automated email from the ASF dual-hosted git repository. yiguolei pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push: new 9a7f05caf86 [refactor](status) refactor querycontext and runtime state status (#35035) 9a7f05caf86 is described below commit 9a7f05caf865bfac301ef2c62ba01aa4a3b03801 Author: yiguolei <676222...@qq.com> AuthorDate: Mon May 20 13:12:53 2024 +0800 [refactor](status) refactor querycontext and runtime state status (#35035) --------- Co-authored-by: yiguolei <yiguo...@gmail.com> --- be/src/common/status.h | 6 ++-- be/src/olap/delta_writer_v2.cpp | 2 +- .../local_exchange_sink_operator.cpp | 2 +- be/src/pipeline/pipeline_fragment_context.cpp | 10 ++---- be/src/pipeline/pipeline_fragment_context.h | 10 ------ be/src/runtime/fragment_mgr.cpp | 8 ++--- be/src/runtime/group_commit_mgr.cpp | 4 +-- be/src/runtime/plan_fragment_executor.cpp | 5 ++- be/src/runtime/query_context.cpp | 19 ++++------- be/src/runtime/query_context.h | 29 ++++------------- be/src/runtime/runtime_state.cpp | 19 ++--------- be/src/runtime/runtime_state.h | 37 ++++------------------ be/src/udf/udf.cpp | 2 +- be/src/vec/sink/writer/vtablet_writer.cpp | 2 +- 14 files changed, 40 insertions(+), 115 deletions(-) diff --git a/be/src/common/status.h b/be/src/common/status.h index 6f587d5a28f..f0a02157c22 100644 --- a/be/src/common/status.h +++ b/be/src/common/status.h @@ -575,7 +575,7 @@ public: } // will copy a new status object to avoid concurrency - Status status() { + Status status() const { std::lock_guard l(mutex_); return error_st_; } @@ -583,7 +583,9 @@ public: private: std::atomic_int16_t error_code_ = 0; Status error_st_; - std::mutex mutex_; + // mutex's lock is not a const method, but we will use this mutex in + // some const method, so that it should be mutable. + mutable std::mutex mutex_; AtomicStatus(const AtomicStatus&) = delete; void operator=(const AtomicStatus&) = delete; diff --git a/be/src/olap/delta_writer_v2.cpp b/be/src/olap/delta_writer_v2.cpp index 34c03fee95e..80978280b92 100644 --- a/be/src/olap/delta_writer_v2.cpp +++ b/be/src/olap/delta_writer_v2.cpp @@ -156,7 +156,7 @@ Status DeltaWriterV2::write(const vectorized::Block* block, const std::vector<ui { memtable_flush_running_count_limit = 0; }); while (_memtable_writer->flush_running_count() >= memtable_flush_running_count_limit) { if (_state->is_cancelled()) { - return Status::Cancelled(_state->cancel_reason()); + return _state->cancel_reason(); } std::this_thread::sleep_for(std::chrono::milliseconds(10)); } diff --git a/be/src/pipeline/local_exchange/local_exchange_sink_operator.cpp b/be/src/pipeline/local_exchange/local_exchange_sink_operator.cpp index 9f6cb670b7d..043b66b2e08 100644 --- a/be/src/pipeline/local_exchange/local_exchange_sink_operator.cpp +++ b/be/src/pipeline/local_exchange/local_exchange_sink_operator.cpp @@ -54,7 +54,7 @@ Status LocalExchangeSinkLocalState::close(RuntimeState* state, Status exec_statu if (exec_status.ok()) { DCHECK(_release_count) << "Do not finish correctly! " << debug_string(0) << " state: { cancel = " << state->is_cancelled() << ", " - << state->query_status().to_string() << "} query ctx: { cancel = " + << state->cancel_reason().to_string() << "} query ctx: { cancel = " << state->get_query_ctx()->is_cancelled() << ", " << state->get_query_ctx()->exec_status().to_string() << "}"; } diff --git a/be/src/pipeline/pipeline_fragment_context.cpp b/be/src/pipeline/pipeline_fragment_context.cpp index dfc8ab434a7..8f90e2389ab 100644 --- a/be/src/pipeline/pipeline_fragment_context.cpp +++ b/be/src/pipeline/pipeline_fragment_context.cpp @@ -1441,8 +1441,8 @@ Status PipelineFragmentContext::submit() { for (auto& t : task) { st = scheduler->schedule_task(t.get()); if (!st) { - std::lock_guard<std::mutex> l(_status_lock); cancel(Status::InternalError("submit context to executor fail")); + std::lock_guard<std::mutex> l(_task_mutex); _total_tasks = submit_tasks; break; } @@ -1539,12 +1539,7 @@ void PipelineFragmentContext::close_a_pipeline() { } Status PipelineFragmentContext::send_report(bool done) { - Status exec_status = Status::OK(); - { - std::lock_guard<std::mutex> l(_status_lock); - exec_status = _query_ctx->exec_status(); - } - + Status exec_status = _query_ctx->exec_status(); // If plan is done successfully, but _is_report_success is false, // no need to send report. if (!_is_report_success && done && exec_status.ok()) { @@ -1577,7 +1572,6 @@ Status PipelineFragmentContext::send_report(bool done) { TUniqueId(), -1, _runtime_state.get(), - [this](Status st) { return update_status(st); }, [this](const Status& reason) { cancel(reason); }}; return _report_status_cb( diff --git a/be/src/pipeline/pipeline_fragment_context.h b/be/src/pipeline/pipeline_fragment_context.h index 383706ee9e5..f4e324b6f53 100644 --- a/be/src/pipeline/pipeline_fragment_context.h +++ b/be/src/pipeline/pipeline_fragment_context.h @@ -107,14 +107,6 @@ public: Status send_report(bool); - Status update_status(Status status) { - std::lock_guard<std::mutex> l(_status_lock); - if (!status.ok() && _query_ctx->exec_status().ok()) { - _query_ctx->set_exec_status(status); - } - return _query_ctx->exec_status(); - } - void trigger_report_if_necessary(); void refresh_next_report_time(); @@ -207,8 +199,6 @@ private: std::atomic_bool _prepared = false; bool _submitted = false; - std::mutex _status_lock; - Pipelines _pipelines; PipelineId _next_pipeline_id = 0; std::mutex _task_mutex; diff --git a/be/src/runtime/fragment_mgr.cpp b/be/src/runtime/fragment_mgr.cpp index faefadfc49a..d910dfe97d5 100644 --- a/be/src/runtime/fragment_mgr.cpp +++ b/be/src/runtime/fragment_mgr.cpp @@ -211,14 +211,14 @@ Status FragmentMgr::trigger_pipeline_context_report( // including the final status when execution finishes. void FragmentMgr::coordinator_callback(const ReportStatusRequest& req) { DCHECK(req.status.ok() || req.done); // if !status.ok() => done - Status exec_status = req.update_fn(req.status); + Status exec_status = req.status; Status coord_status; FrontendServiceConnection coord(_exec_env->frontend_client_cache(), req.coord_addr, &coord_status); if (!coord_status.ok()) { std::stringstream ss; UniqueId uid(req.query_id.hi, req.query_id.lo); - static_cast<void>(req.update_fn(Status::InternalError( + static_cast<void>(req.cancel_fn(Status::InternalError( "query_id: {}, couldn't get a client for {}, reason is {}", uid.to_string(), PrintThriftNetworkAddress(req.coord_addr), coord_status.to_string()))); return; @@ -438,7 +438,6 @@ void FragmentMgr::coordinator_callback(const ReportStatusRequest& req) { if (!rpc_status.ok()) { // we need to cancel the execution of this fragment - static_cast<void>(req.update_fn(rpc_status)); req.cancel_fn(rpc_status); return; } @@ -455,7 +454,6 @@ void FragmentMgr::coordinator_callback(const ReportStatusRequest& req) { LOG_INFO("Going to cancel instance {} since report exec status got rpc failed: {}", print_id(req.fragment_instance_id), rpc_status.to_string()); // we need to cancel the execution of this fragment - static_cast<void>(req.update_fn(rpc_status)); req.cancel_fn(rpc_status); } } @@ -589,7 +587,7 @@ Status FragmentMgr::start_query_execution(const PExecPlanFragmentStartRequest* r "timeout or be cancelled. host: {}", BackendOptions::get_localhost()); } - search->second->set_ready_to_execute(false); + search->second->set_ready_to_execute(Status::OK()); return Status::OK(); } diff --git a/be/src/runtime/group_commit_mgr.cpp b/be/src/runtime/group_commit_mgr.cpp index 62fbbd37979..7c182814e12 100644 --- a/be/src/runtime/group_commit_mgr.cpp +++ b/be/src/runtime/group_commit_mgr.cpp @@ -57,7 +57,7 @@ Status LoadBlockQueue::add_block(RuntimeState* runtime_state, } } if (UNLIKELY(runtime_state->is_cancelled())) { - return Status::Cancelled<false>(runtime_state->cancel_reason()); + return runtime_state->cancel_reason(); } RETURN_IF_ERROR(status); if (block->rows() > 0) { @@ -134,7 +134,7 @@ Status LoadBlockQueue::get_block(RuntimeState* runtime_state, vectorized::Block* _get_cond.wait_for(l, std::chrono::milliseconds(left_milliseconds)); } if (runtime_state->is_cancelled()) { - auto st = Status::Cancelled<false>(runtime_state->cancel_reason()); + auto st = runtime_state->cancel_reason(); _cancel_without_lock(st); return st; } diff --git a/be/src/runtime/plan_fragment_executor.cpp b/be/src/runtime/plan_fragment_executor.cpp index 64bc0cde977..304a633773a 100644 --- a/be/src/runtime/plan_fragment_executor.cpp +++ b/be/src/runtime/plan_fragment_executor.cpp @@ -502,7 +502,6 @@ void PlanFragmentExecutor::send_report(bool done) { _fragment_instance_id, _backend_num, _runtime_state.get(), - std::bind(&PlanFragmentExecutor::update_status, this, std::placeholders::_1), std::bind(&PlanFragmentExecutor::cancel, this, std::placeholders::_1)}; // This will send a report even if we are cancelled. If the query completed correctly // but fragments still need to be cancelled (e.g. limit reached), the coordinator will @@ -550,9 +549,9 @@ void PlanFragmentExecutor::cancel(const Status& reason) { if (reason.is<ErrorCode::LIMIT_REACH>()) { _is_report_on_cancel = false; } - _runtime_state->set_is_cancelled(reason.to_string()); + _runtime_state->cancel(reason); // To notify wait_for_start() - _query_ctx->set_ready_to_execute(true); + _query_ctx->set_ready_to_execute(reason); // must close stream_mgr to avoid dead lock in Exchange Node _exec_env->vstream_mgr()->cancel(_fragment_instance_id, reason); diff --git a/be/src/runtime/query_context.cpp b/be/src/runtime/query_context.cpp index c141fc3b223..5360fbe4e4b 100644 --- a/be/src/runtime/query_context.cpp +++ b/be/src/runtime/query_context.cpp @@ -182,17 +182,15 @@ QueryContext::~QueryContext() { LOG_INFO("Query {} deconstructed, {}", print_id(this->_query_id), mem_tracker_msg); } -void QueryContext::set_ready_to_execute(bool is_cancelled) { +void QueryContext::set_ready_to_execute(Status reason) { set_execution_dependency_ready(); { std::lock_guard<std::mutex> l(_start_lock); - if (!_is_cancelled) { - _is_cancelled = is_cancelled; - } + _exec_status.update(reason); _ready_to_execute = true; } - if (query_mem_tracker && is_cancelled) { - query_mem_tracker->set_is_query_cancelled(is_cancelled); + if (query_mem_tracker && !reason.ok()) { + query_mem_tracker->set_is_query_cancelled(!reason.ok()); } _start_cond.notify_all(); } @@ -211,16 +209,11 @@ void QueryContext::set_execution_dependency_ready() { } void QueryContext::cancel(Status new_status, int fragment_id) { - // we must get this wrong status once query ctx's `_is_cancelled` = true. - set_exec_status(new_status); - // Just for CAS need a left value - bool false_cancel = false; - if (!_is_cancelled.compare_exchange_strong(false_cancel, true)) { + if (!_exec_status.update(new_status)) { return; } - DCHECK(!false_cancel && _is_cancelled); - set_ready_to_execute(true); + set_ready_to_execute(new_status); std::vector<std::weak_ptr<pipeline::PipelineFragmentContext>> ctx_to_cancel; { std::lock_guard<std::mutex> lock(_pipeline_map_write_lock); diff --git a/be/src/runtime/query_context.h b/be/src/runtime/query_context.h index bc2d8fbee7c..37f17b21c87 100644 --- a/be/src/runtime/query_context.h +++ b/be/src/runtime/query_context.h @@ -61,7 +61,6 @@ struct ReportStatusRequest { TUniqueId fragment_instance_id; int backend_num; RuntimeState* runtime_state; - std::function<Status(Status)> update_fn; std::function<void(const Status&)> cancel_fn; }; @@ -103,9 +102,9 @@ public: ThreadPoolToken* get_token() { return _thread_token.get(); } - void set_ready_to_execute(bool is_cancelled); + void set_ready_to_execute(Status reason); - [[nodiscard]] bool is_cancelled() const { return _is_cancelled.load(); } + [[nodiscard]] bool is_cancelled() const { return !_exec_status.ok(); } void cancel_all_pipeline_context(const Status& reason); Status cancel_pipeline_context(const int fragment_id, const Status& reason); @@ -113,21 +112,9 @@ public: std::shared_ptr<pipeline::PipelineFragmentContext> pip_ctx); void cancel(Status new_status, int fragment_id = -1); - void set_exec_status(Status new_status) { - if (new_status.ok()) { - return; - } - std::lock_guard<std::mutex> l(_exec_status_lock); - if (!_exec_status.ok()) { - return; - } - _exec_status = new_status; - } + void set_exec_status(Status new_status) { _exec_status.update(new_status); } - [[nodiscard]] Status exec_status() { - std::lock_guard<std::mutex> l(_exec_status_lock); - return _exec_status; - } + [[nodiscard]] Status exec_status() { return _exec_status.status(); } void set_execution_dependency_ready(); @@ -141,10 +128,10 @@ public: bool wait_for_start() { int wait_time = config::max_fragment_start_wait_time_seconds; std::unique_lock<std::mutex> l(_start_lock); - while (!_ready_to_execute.load() && !_is_cancelled.load() && --wait_time > 0) { + while (!_ready_to_execute.load() && _exec_status.ok() && --wait_time > 0) { _start_cond.wait_for(l, std::chrono::seconds(1)); } - return _ready_to_execute.load() && !_is_cancelled.load(); + return _ready_to_execute.load() && _exec_status.ok(); } std::shared_ptr<vectorized::SharedHashTableController> get_shared_hash_table_controller() { @@ -313,7 +300,6 @@ private: // Only valid when _need_wait_execution_trigger is set to true in PlanFragmentExecutor. // And all fragments of this query will start execution when this is set to true. std::atomic<bool> _ready_to_execute {false}; - std::atomic<bool> _is_cancelled {false}; void _init_query_mem_tracker(); @@ -325,10 +311,9 @@ private: std::unique_ptr<RuntimeFilterMgr> _runtime_filter_mgr; const TQueryOptions _query_options; - std::mutex _exec_status_lock; // All pipeline tasks use the same query context to report status. So we need a `_exec_status` // to report the real message if failed. - Status _exec_status = Status::OK(); + AtomicStatus _exec_status; doris::pipeline::TaskScheduler* _task_scheduler = nullptr; vectorized::SimplifiedScanScheduler* _scan_task_scheduler = nullptr; diff --git a/be/src/runtime/runtime_state.cpp b/be/src/runtime/runtime_state.cpp index b5974ebd1da..ac560c2c7e1 100644 --- a/be/src/runtime/runtime_state.cpp +++ b/be/src/runtime/runtime_state.cpp @@ -58,7 +58,6 @@ RuntimeState::RuntimeState(const TPlanFragmentExecParams& fragment_exec_params, _data_stream_recvrs_pool(new ObjectPool()), _unreported_error_idx(0), _query_id(fragment_exec_params.query_id), - _is_cancelled(false), _per_fragment_instance_idx(0), _num_rows_load_total(0), _num_rows_load_filtered(0), @@ -114,7 +113,6 @@ RuntimeState::RuntimeState(const TUniqueId& instance_id, const TUniqueId& query_ _unreported_error_idx(0), _query_id(query_id), _fragment_id(fragment_id), - _is_cancelled(false), _per_fragment_instance_idx(0), _num_rows_load_total(0), _num_rows_load_filtered(0), @@ -152,7 +150,6 @@ RuntimeState::RuntimeState(pipeline::PipelineFragmentContext*, const TUniqueId& _unreported_error_idx(0), _query_id(query_id), _fragment_id(fragment_id), - _is_cancelled(false), _per_fragment_instance_idx(0), _num_rows_load_total(0), _num_rows_load_filtered(0), @@ -186,7 +183,6 @@ RuntimeState::RuntimeState(const TUniqueId& query_id, int32_t fragment_id, _unreported_error_idx(0), _query_id(query_id), _fragment_id(fragment_id), - _is_cancelled(false), _per_fragment_instance_idx(0), _num_rows_load_total(0), _num_rows_load_filtered(0), @@ -219,7 +215,6 @@ RuntimeState::RuntimeState(const TQueryGlobals& query_globals) _obj_pool(new ObjectPool()), _data_stream_recvrs_pool(new ObjectPool()), _unreported_error_idx(0), - _is_cancelled(false), _per_fragment_instance_idx(0) { _query_options.batch_size = DEFAULT_BATCH_SIZE; if (query_globals.__isset.time_zone && query_globals.__isset.nano_seconds) { @@ -254,7 +249,6 @@ RuntimeState::RuntimeState() _obj_pool(new ObjectPool()), _data_stream_recvrs_pool(new ObjectPool()), _unreported_error_idx(0), - _is_cancelled(false), _per_fragment_instance_idx(0) { _query_options.batch_size = DEFAULT_BATCH_SIZE; _timezone = TimezoneUtils::default_time_zone; @@ -358,20 +352,13 @@ void RuntimeState::get_unreported_errors(std::vector<std::string>* new_errors) { } } -Status RuntimeState::query_status() { - auto st = _query_ctx->exec_status(); - RETURN_IF_ERROR(st); - std::lock_guard<std::mutex> l(_process_status_lock); - return _process_status; -} - bool RuntimeState::is_cancelled() const { // Maybe we should just return _is_cancelled.load() - return _is_cancelled.load() || (_query_ctx && _query_ctx->is_cancelled()); + return !_exec_status.ok() || (_query_ctx && _query_ctx->is_cancelled()); } -std::string RuntimeState::cancel_reason() const { - return _cancel_reason; +Status RuntimeState::cancel_reason() const { + return _exec_status.status(); } const int64_t MAX_ERROR_NUM = 50; diff --git a/be/src/runtime/runtime_state.h b/be/src/runtime/runtime_state.h index f2e2c887571..d5ebac0f3fc 100644 --- a/be/src/runtime/runtime_state.h +++ b/be/src/runtime/runtime_state.h @@ -190,7 +190,6 @@ public: return _query_options.__isset.mysql_row_binary_format && _query_options.mysql_row_binary_format; } - Status query_status(); // Appends error to the _error_log if there is space bool log_error(const std::string& error); @@ -206,21 +205,19 @@ public: void get_unreported_errors(std::vector<std::string>* new_errors); [[nodiscard]] bool is_cancelled() const; - std::string cancel_reason() const; - int codegen_level() const { return _query_options.codegen_level; } - void set_is_cancelled(std::string msg) { - if (!_is_cancelled.exchange(true)) { - _cancel_reason = msg; + Status cancel_reason() const; + void cancel(const Status& reason) { + if (_exec_status.update(reason)) { // Create a error status, so that we could print error stack, and // we could know which path call cancel. LOG(WARNING) << "Task is cancelled, instance: " << PrintInstanceStandardInfo(_query_id, _fragment_instance_id) - << ", st = " << Status::Error<ErrorCode::CANCELLED>(msg); + << ", st = " << reason; } else { LOG(WARNING) << "Task is already cancelled, instance: " << PrintInstanceStandardInfo(_query_id, _fragment_instance_id) - << ", original cancel msg: " << _cancel_reason - << ", new cancel msg: " << Status::Error<ErrorCode::CANCELLED>(msg); + << ", original cancel msg: " << _exec_status.status() + << ", new cancel msg: " << reason; } } @@ -230,18 +227,6 @@ public: void set_be_number(int be_number) { _be_number = be_number; } int be_number(void) const { return _be_number; } - // Sets _process_status with err_msg if no error has been set yet. - void set_process_status(const Status& status) { - if (status.ok()) { - return; - } - std::lock_guard<std::mutex> l(_process_status_lock); - if (!_process_status.ok()) { - return; - } - _process_status = status; - } - std::vector<std::string>& output_files() { return _output_files; } void set_import_label(const std::string& import_label) { _import_label = import_label; } @@ -693,9 +678,7 @@ private: TQueryOptions _query_options; ExecEnv* _exec_env = nullptr; - // if true, execution should stop with a CANCELLED status - std::atomic<bool> _is_cancelled; - std::string _cancel_reason; + AtomicStatus _exec_status; int _per_fragment_instance_idx; int _num_per_fragment_instances = 0; @@ -709,12 +692,6 @@ private: // used as send id int _be_number; - // Non-OK if an error has occurred and query execution should abort. Used only for - // asynchronously reporting such errors (e.g., when a UDF reports an error), so this - // will not necessarily be set in all error cases. - std::mutex _process_status_lock; - Status _process_status; - // put here to collect files?? std::vector<std::string> _output_files; std::atomic<int64_t> _num_rows_load_total; // total rows read from source diff --git a/be/src/udf/udf.cpp b/be/src/udf/udf.cpp index 6dd1b5112d0..3e04208e59a 100644 --- a/be/src/udf/udf.cpp +++ b/be/src/udf/udf.cpp @@ -84,7 +84,7 @@ void FunctionContext::set_error(const char* error_msg) { ss << "UDF ERROR: " << error_msg; if (_state != nullptr) { - _state->set_process_status(Status::InternalError(ss.str())); + _state->cancel(Status::InternalError(ss.str())); } } } diff --git a/be/src/vec/sink/writer/vtablet_writer.cpp b/be/src/vec/sink/writer/vtablet_writer.cpp index e4065a66618..621a9ad1131 100644 --- a/be/src/vec/sink/writer/vtablet_writer.cpp +++ b/be/src/vec/sink/writer/vtablet_writer.cpp @@ -905,7 +905,7 @@ Status VNodeChannel::close_wait(RuntimeState* state) { _close_time_ms = UnixMillis() - _close_time_ms; if (_cancelled || state->is_cancelled()) { - cancel(state->cancel_reason()); + cancel(state->cancel_reason().to_string()); } if (_add_batches_finished) { --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org