This is an automated email from the ASF dual-hosted git repository.

gabriellee pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
     new 864a0f9bcb [opt](pipeline) Make pipeline fragment context send_report asynchronized (#23142)
864a0f9bcb is described below

commit 864a0f9bcb17495f1a9134bb92e64bc538ac3383
Author: Lijia Liu <liutang...@yeah.net>
AuthorDate: Thu Sep 28 17:55:53 2023 +0800

    [opt](pipeline) Make pipeline fragment context send_report asynchronized (#23142)
---
 be/src/common/config.cpp                           |   2 +
 be/src/common/config.h                             |   1 +
 be/src/exec/exec_node.cpp                          |   3 +-
 be/src/pipeline/exec/exchange_sink_buffer.cpp      |   3 +
 be/src/pipeline/exec/exchange_sink_buffer.h        |   1 +
 be/src/pipeline/exec/exchange_sink_operator.cpp    |  14 +-
 be/src/pipeline/exec/result_file_sink_operator.cpp |   7 +-
 be/src/pipeline/exec/result_sink_operator.cpp      |   2 +-
 be/src/pipeline/pipeline_fragment_context.cpp      | 144 ++++++++++++---------
 be/src/pipeline/pipeline_fragment_context.h        |  34 ++---
 .../pipeline_x/pipeline_x_fragment_context.cpp     |  88 +++----------
 .../pipeline_x/pipeline_x_fragment_context.h       |  12 +-
 be/src/pipeline/task_scheduler.cpp                 |  15 ++-
 be/src/runtime/fragment_mgr.cpp                    |  30 ++++-
 be/src/runtime/fragment_mgr.h                      |   3 +
 be/src/runtime/plan_fragment_executor.cpp          |   5 +-
 be/src/runtime/query_context.h                     |   8 +-
 be/src/runtime/runtime_state.cpp                   |   7 +
 be/src/runtime/runtime_state.h                     |   5 +-
 be/src/service/internal_service.cpp                |   7 +-
 be/src/vec/runtime/vdata_stream_mgr.cpp            |  10 +-
 be/src/vec/runtime/vdata_stream_mgr.h              |   2 +-
 be/src/vec/runtime/vdata_stream_recvr.cpp          |  24 +++-
 be/src/vec/runtime/vdata_stream_recvr.h            |   7 +-
 be/src/vec/sink/vdata_stream_sender.cpp            |  51 ++++----
 be/src/vec/sink/vdata_stream_sender.h              |  33 ++---
 gensrc/proto/internal_service.proto                |   1 +
 27 files changed, 283 insertions(+), 236 deletions(-)

diff --git a/be/src/common/config.cpp b/be/src/common/config.cpp
index 643f6eb23c..e8632d47d9 100644
--- a/be/src/common/config.cpp
+++ b/be/src/common/config.cpp
@@ -225,6 +225,8 @@ DEFINE_mBool(compress_rowbatches, "true");
 DEFINE_mBool(rowbatch_align_tuple_offset, "false");
 // interval between profile reports; in seconds
 DEFINE_mInt32(status_report_interval, "5");
+// Pipeline tasks have high concurrency, so reduce their report frequency
+DEFINE_mInt32(pipeline_status_report_interval, "10");
 // if true, each disk will have a separate thread pool for scanner
 DEFINE_Bool(doris_enable_scanner_thread_pool_per_disk, "true");
 // the timeout of a work thread to wait the blocking priority queue to get a task
diff --git a/be/src/common/config.h b/be/src/common/config.h
index 349a8a93a9..1189169187 100644
--- a/be/src/common/config.h
+++ b/be/src/common/config.h
@@ -271,6 +271,7 @@ DECLARE_mBool(compress_rowbatches);
 DECLARE_mBool(rowbatch_align_tuple_offset);
 // interval between profile reports; in seconds
 DECLARE_mInt32(status_report_interval);
+DECLARE_mInt32(pipeline_status_report_interval);
 // if true, each disk will have a separate thread pool for scanner
 DECLARE_Bool(doris_enable_scanner_thread_pool_per_disk);
 // the timeout of a work thread to wait the blocking priority queue to get a task
diff --git a/be/src/exec/exec_node.cpp b/be/src/exec/exec_node.cpp
index c21ae2e5f5..f3870c6346 100644
--- a/be/src/exec/exec_node.cpp
+++ b/be/src/exec/exec_node.cpp
@@ -198,7 +198,8 @@ Status ExecNode::close(RuntimeState* state) {
                   << " already closed";
         return Status::OK();
     }
-    LOG(INFO) << "fragment_instance_id=" << print_id(state->fragment_instance_id()) << " closed";
+    LOG(INFO) << "fragment_instance_id=" << print_id(state->fragment_instance_id()) << ", "
+              << " id=" << _id << " type=" << print_plan_node_type(_type) << " closed";
     _is_closed = true;
     Status result;
diff --git a/be/src/pipeline/exec/exchange_sink_buffer.cpp b/be/src/pipeline/exec/exchange_sink_buffer.cpp
index f49fea2c03..85e37ee4a9 100644
--- a/be/src/pipeline/exec/exchange_sink_buffer.cpp
+++ b/be/src/pipeline/exec/exchange_sink_buffer.cpp
@@ -225,6 +225,9 @@ Status ExchangeSinkBuffer<Parent>::_send_rpc(InstanceLoId id) {
         if (request.block) {
             brpc_request->set_allocated_block(request.block.get());
         }
+        if (!request.exec_status.ok()) {
+            request.exec_status.to_protobuf(brpc_request->mutable_exec_status());
+        }
         auto* closure = request.channel->get_closure(id, request.eos, nullptr);
         _instance_to_rpc_ctx[id]._closure = closure;
diff --git a/be/src/pipeline/exec/exchange_sink_buffer.h b/be/src/pipeline/exec/exchange_sink_buffer.h
index 6086b36c22..d5e530af5c 100644
--- a/be/src/pipeline/exec/exchange_sink_buffer.h
+++ b/be/src/pipeline/exec/exchange_sink_buffer.h
@@ -94,6 +94,7 @@ struct TransmitInfo {
     vectorized::PipChannel<Parent>* channel;
     std::unique_ptr<PBlock> block;
     bool eos;
+    Status exec_status;
 };
 
 template <typename Parent>
diff --git a/be/src/pipeline/exec/exchange_sink_operator.cpp b/be/src/pipeline/exec/exchange_sink_operator.cpp
index 1b00293258..5d339a7318 100644
--- a/be/src/pipeline/exec/exchange_sink_operator.cpp
+++ b/be/src/pipeline/exec/exchange_sink_operator.cpp
@@ -278,7 +278,8 @@ template <typename ChannelPtrType>
 void ExchangeSinkOperatorX::_handle_eof_channel(RuntimeState* state, ChannelPtrType channel,
                                                 Status st) {
     channel->set_receiver_eof(st);
-    channel->close(state);
+    // Channel will not send RPC to the downstream on eof, so close the channel with an OK status.
+    channel->close(state, Status::OK());
 }
 
 Status ExchangeSinkOperatorX::sink(RuntimeState* state, vectorized::Block* block,
@@ -337,8 +338,8 @@ Status ExchangeSinkOperatorX::sink(RuntimeState* state, vectorized::Block* block
                     status = channel->send_local_block(&cur_block);
                 } else {
                     SCOPED_CONSUME_MEM_TRACKER(_mem_tracker.get());
-                    status = channel->send_block(block_holder, &sent,
-                                                 source_state == SourceState::FINISHED);
+                    status = channel->send_broadcast_block(
+                            block_holder, &sent, source_state == SourceState::FINISHED);
                 }
                 HANDLE_CHANNEL_STATUS(state, channel, status);
             }
@@ -365,8 +366,8 @@ Status ExchangeSinkOperatorX::sink(RuntimeState* state, vectorized::Block* block
             SCOPED_CONSUME_MEM_TRACKER(_mem_tracker.get());
             RETURN_IF_ERROR(local_state._serializer.serialize_block(
                     block, current_channel->ch_cur_pb_block()));
-            auto status = current_channel->send_block(current_channel->ch_cur_pb_block(),
-                                                      source_state == SourceState::FINISHED);
+            auto status = current_channel->send_remote_block(
+                    current_channel->ch_cur_pb_block(), source_state == SourceState::FINISHED);
             HANDLE_CHANNEL_STATUS(state, current_channel, status);
             current_channel->ch_roll_pb_block();
         }
@@ -520,8 +521,9 @@ Status ExchangeSinkOperatorX::try_close(RuntimeState* state, Status exec_status)
     CREATE_SINK_LOCAL_STATE_RETURN_IF_ERROR(local_state);
     local_state._serializer.reset_block();
     Status final_st = Status::OK();
+    Status final_status = exec_status;
     for (int i = 0; i < local_state.channels.size(); ++i) {
-        Status st = local_state.channels[i]->close(state);
+        Status st = local_state.channels[i]->close(state, exec_status);
        if (!st.ok() && final_st.ok()) {
            final_st = st;
        }
diff --git a/be/src/pipeline/exec/result_file_sink_operator.cpp b/be/src/pipeline/exec/result_file_sink_operator.cpp
index 26f8fed731..2e375d6908 100644
--- a/be/src/pipeline/exec/result_file_sink_operator.cpp
+++ b/be/src/pipeline/exec/result_file_sink_operator.cpp
@@ -234,8 +234,8 @@ Status ResultFileSinkLocalState::close(RuntimeState* state, Status exec_status)
                         status = channel->send_local_block(&cur_block);
                     } else {
                         SCOPED_CONSUME_MEM_TRACKER(_mem_tracker.get());
-                        status =
-                                channel->send_block(_block_holder.get(), nullptr, true);
+                        status = channel->send_broadcast_block(_block_holder.get(),
+                                                               nullptr, true);
                     }
                     HANDLE_CHANNEL_STATUS(state, channel, status);
                 }
@@ -256,7 +256,8 @@ template <typename ChannelPtrType>
 void ResultFileSinkLocalState::_handle_eof_channel(RuntimeState* state, ChannelPtrType channel,
                                                    Status st) {
     channel->set_receiver_eof(st);
-    channel->close(state);
+    // Channel will not send RPC to the downstream on eof, so close the channel with an OK status.
+    channel->close(state, Status::OK());
 }
 
 Status ResultFileSinkOperatorX::sink(RuntimeState* state, vectorized::Block* in_block,
diff --git a/be/src/pipeline/exec/result_sink_operator.cpp b/be/src/pipeline/exec/result_sink_operator.cpp
index 424d75b1d9..bba54559fe 100644
--- a/be/src/pipeline/exec/result_sink_operator.cpp
+++ b/be/src/pipeline/exec/result_sink_operator.cpp
@@ -181,7 +181,7 @@ Status ResultSinkLocalState::close(RuntimeState* state, Status exec_status) {
         COUNTER_UPDATE(profile()->total_time_counter(),
                        _cancel_dependency->write_watcher_elapse_time());
     SCOPED_TIMER(profile()->total_time_counter());
-    Status final_status = Status::OK();
+    Status final_status = exec_status;
     if (_writer) {
         // close the writer
         Status st = _writer->close();
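Note how ResultSinkLocalState::close above now seeds final_status with the caller's exec_status instead of Status::OK(), a "first error wins" convention: cleanup steps may contribute the first error but must never mask one that already happened. A minimal sketch of that convention, with invented names rather than Doris code:

    #include <functional>
    #include <iostream>
    #include <string>
    #include <vector>

    struct Status {
        std::string msg;  // empty means OK
        bool ok() const { return msg.empty(); }
    };

    // Seed with the caller's exec_status; later cleanup steps can only add
    // the first error, never clear an earlier one.
    Status close_all(Status exec_status, const std::vector<std::function<Status()>>& steps) {
        Status final_status = exec_status;
        for (const auto& step : steps) {
            Status st = step();  // e.g. writer->close(), channel->close(state, ...)
            if (final_status.ok() && !st.ok()) {
                final_status = st;
            }
        }
        return final_status;
    }

    int main() {
        auto fine = [] { return Status{}; };
        auto failing = [] { return Status{"writer close failed"}; };
        std::cout << close_all(Status{"query cancelled"}, {fine, failing}).msg << "\n";
        // prints "query cancelled": the caller's error is not masked by later steps
    }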
diff --git a/be/src/pipeline/pipeline_fragment_context.cpp b/be/src/pipeline/pipeline_fragment_context.cpp
index 65cd718949..7d4b091cda 100644
--- a/be/src/pipeline/pipeline_fragment_context.cpp
+++ b/be/src/pipeline/pipeline_fragment_context.cpp
@@ -28,6 +28,9 @@
 #include <pthread.h>
 #include <stdlib.h>
 // IWYU pragma: no_include <bits/chrono.h>
+#include <fmt/format.h>
+#include <fmt/ranges.h>
+
 #include <chrono> // IWYU pragma: keep
 #include <map>
 #include <ostream>
@@ -125,14 +128,12 @@ PipelineFragmentContext::PipelineFragmentContext(
           _exec_env(exec_env),
           _query_ctx(std::move(query_ctx)),
           _call_back(call_back),
-          _report_thread_active(false),
-          _report_status_cb(report_status_cb),
           _is_report_on_cancel(true),
+          _report_status_cb(report_status_cb),
           _group_commit(group_commit) {
     if (_query_ctx->get_task_group()) {
         _task_group_entity = _query_ctx->get_task_group()->task_entity();
     }
-    _report_thread_future = _report_thread_promise.get_future();
     _fragment_watcher.start();
 }
 
@@ -146,16 +147,29 @@ PipelineFragmentContext::~PipelineFragmentContext() {
     } else {
         _call_back(_runtime_state.get(), &st);
     }
-    DCHECK(!_report_thread_active);
 }
 
 void PipelineFragmentContext::cancel(const PPlanFragmentCancelReason& reason,
                                      const std::string& msg) {
+    LOG_INFO("PipelineFragmentContext::cancel")
+            .tag("query_id", print_id(_query_ctx->query_id()))
+            .tag("fragment_id", _fragment_id)
+            .tag("instance_id", print_id(_runtime_state->fragment_instance_id()))
+            .tag("reason", PPlanFragmentCancelReason_Name(reason))
+            .tag("message", msg);
+
     if (_query_ctx->cancel(true, msg, Status::Cancelled(msg))) {
-        LOG(WARNING) << "PipelineFragmentContext "
-                     << PrintInstanceStandardInfo(_query_id, _fragment_id, _fragment_instance_id)
-                     << " is canceled, cancel message: " << msg;
+        if (reason != PPlanFragmentCancelReason::LIMIT_REACH) {
+            LOG(WARNING) << "PipelineFragmentContext "
+                         << PrintInstanceStandardInfo(_query_id, _fragment_id,
+                                                      _fragment_instance_id)
+                         << " is canceled, cancel message: " << msg;
+
+        } else {
+            _set_is_report_on_cancel(false); // TODO bug llj fix this: not protected by lock
+        }
+        _runtime_state->set_process_status(_query_ctx->exec_status());
         // Get pipe from new load stream manager and send cancel to it or the fragment may hang to wait read from pipe
         // For stream load the fragment's query_id == load id, it is set in FE.
         auto stream_load_ctx = _exec_env->new_load_stream_mgr()->get(_query_id);
@@ -164,7 +178,8 @@ void PipelineFragmentContext::cancel(const PPlanFragmentCancelReason& reason,
         }
 
         // must close stream_mgr to avoid dead lock in Exchange Node
-        _exec_env->vstream_mgr()->cancel(_fragment_instance_id);
+        // TODO bug llj fix this: other instances will not cancel
+        _exec_env->vstream_mgr()->cancel(_fragment_instance_id, Status::Cancelled(msg));
         // Cancel the result queue manager used by spark doris connector
         // TODO pipeline incomp
         // _exec_env->result_queue_mgr()->update_queue_status(id, Status::Aborted(msg));
@@ -199,6 +214,7 @@ Status PipelineFragmentContext::prepare(const doris::TPipelineFragmentParams& re
 
     LOG_INFO("PipelineFragmentContext::prepare")
             .tag("query_id", print_id(_query_id))
+            .tag("fragment_id", _fragment_id)
             .tag("instance_id", print_id(local_params.fragment_instance_id))
             .tag("backend_num", local_params.backend_num)
             .tag("pthread_id", (uintptr_t)pthread_self());
@@ -311,6 +327,8 @@ Status PipelineFragmentContext::prepare(const doris::TPipelineFragmentParams& re
     _runtime_state->runtime_profile()->add_child(_root_plan->runtime_profile(), true, nullptr);
     _runtime_state->runtime_profile()->add_child(_runtime_profile.get(), true, nullptr);
 
+    _init_next_report_time();
+
     _prepared = true;
     return Status::OK();
 }
@@ -344,54 +362,56 @@ Status PipelineFragmentContext::_build_pipeline_tasks(
     return Status::OK();
 }
 
-void PipelineFragmentContext::_stop_report_thread() {
-    if (!_report_thread_active) {
-        return;
+void PipelineFragmentContext::_init_next_report_time() {
+    auto interval_s = config::pipeline_status_report_interval;
+    if (_is_report_success && interval_s > 0 && _query_ctx->timeout_second > interval_s) {
+        std::vector<string> ins_ids;
+        instance_ids(ins_ids);
+        VLOG_FILE << "enable period report: instance_id="
+                  << fmt::format("{}", fmt::join(ins_ids, ", "));
+        uint64_t report_fragment_offset = (uint64_t)(rand() % interval_s) * NANOS_PER_SEC;
+        // We don't want to wait longer than it takes to run the entire fragment.
+        _previous_report_time =
+                MonotonicNanos() + report_fragment_offset - (uint64_t)(interval_s)*NANOS_PER_SEC;
+        _disable_period_report = false;
     }
+}
 
-    _report_thread_active = false;
-
-    _stop_report_thread_cv.notify_one();
-    // Wait infinitly to ensure that the report task is finished and the this variable
-    // is not used in report thread.
-    _report_thread_future.wait();
+void PipelineFragmentContext::refresh_next_report_time() {
+    auto disable = _disable_period_report.load(std::memory_order_acquire);
+    DCHECK(disable == true);
+    _previous_report_time.store(MonotonicNanos(), std::memory_order_release);
+    _disable_period_report.compare_exchange_strong(disable, false);
 }
 
-void PipelineFragmentContext::report_profile() {
-    SCOPED_ATTACH_TASK(_runtime_state.get());
-    VLOG_FILE << "report_profile(): instance_id=" << _runtime_state->fragment_instance_id();
-
-    _report_thread_active = true;
-
-    std::unique_lock<std::mutex> l(_report_thread_lock);
-    // tell Open() that we started
-    _report_thread_started_cv.notify_one();
-
-    // Jitter the reporting time of remote fragments by a random amount between
-    // 0 and the report_interval. This way, the coordinator doesn't get all the
-    // updates at once so its better for contention as well as smoother progress
-    // reporting.
-    int report_fragment_offset = rand() % config::status_report_interval;
-    // We don't want to wait longer than it takes to run the entire fragment.
-    _stop_report_thread_cv.wait_for(l, std::chrono::seconds(report_fragment_offset));
-    while (_report_thread_active) {
-        if (config::status_report_interval > 0) {
-            // wait_for can return because the timeout occurred or the condition variable
-            // was signaled. We can't rely on its return value to distinguish between the
-            // two cases (e.g. there is a race here where the wait timed out but before grabbing
-            // the lock, the condition variable was signaled). Instead, we will use an external
-            // flag, _report_thread_active, to coordinate this.
-            _stop_report_thread_cv.wait_for(l,
-                                            std::chrono::seconds(config::status_report_interval));
-        } else {
-            LOG(WARNING) << "config::status_report_interval is equal to or less than zero, exiting "
-                            "reporting thread.";
-            break;
+void PipelineFragmentContext::trigger_report_if_necessary() {
+    if (!_is_report_success) {
+        return;
+    }
+    auto disable = _disable_period_report.load(std::memory_order_acquire);
+    if (disable) {
+        return;
+    }
+    int32_t interval_s = config::pipeline_status_report_interval;
+    if (interval_s <= 0) {
+        LOG(WARNING)
+                << "config::pipeline_status_report_interval is equal to or less than zero, do not "
+                   "trigger report.";
+    }
+    uint64_t next_report_time = _previous_report_time.load(std::memory_order_acquire) +
+                                (uint64_t)(interval_s)*NANOS_PER_SEC;
+    if (MonotonicNanos() > next_report_time) {
+        if (!_disable_period_report.compare_exchange_strong(disable, true,
+                                                            std::memory_order_acq_rel)) {
+            return;
         }
-        if (VLOG_FILE_IS_ON) {
-            VLOG_FILE << "Reporting " << (!_report_thread_active ? "final " : " ")
-                      << "profile for instance " << _runtime_state->fragment_instance_id();
+        std::vector<string> ins_ids;
+        instance_ids(ins_ids);
+        VLOG_FILE << "Reporting "
+                  << "profile for query_id " << print_id(_query_id)
+                  << ", instance ids: " << fmt::format("{}", fmt::join(ins_ids, ", "));
+
         std::stringstream ss;
         _runtime_state->runtime_profile()->compute_time_in_profile();
         _runtime_state->runtime_profile()->pretty_print(&ss);
@@ -401,15 +421,13 @@ void PipelineFragmentContext::report_profile() {
             }
             VLOG_FILE << ss.str();
         }
-
-        if (!_report_thread_active) {
-            break;
+        auto st = send_report(false);
+        if (!st.ok()) {
+            disable = true;
+            _disable_period_report.compare_exchange_strong(disable, false,
+                                                           std::memory_order_acq_rel);
         }
-
-        send_report(false);
     }
-
-    VLOG_FILE << "exiting reporting thread: instance_id=" << _runtime_state->fragment_instance_id();
 }
 
 // TODO: use virtual function to do abstruct
@@ -815,7 +833,6 @@ Status PipelineFragmentContext::_create_sink(int sender_id, const TDataSink& thr
 void PipelineFragmentContext::_close_action() {
     _runtime_profile->total_time_counter()->update(_fragment_watcher.elapsed_time());
     send_report(true);
-    _stop_report_thread();
     // all submitted tasks done
     _exec_env->fragment_mgr()->remove_pipeline_context(shared_from_this());
 }
@@ -828,7 +845,7 @@ void PipelineFragmentContext::close_a_pipeline() {
     }
 }
 
-void PipelineFragmentContext::send_report(bool done) {
+Status PipelineFragmentContext::send_report(bool done) {
     Status exec_status = Status::OK();
     {
         std::lock_guard<std::mutex> l(_status_lock);
@@ -838,7 +855,7 @@ Status PipelineFragmentContext::send_report(bool done) {
     // If plan is done successfully, but _is_report_success is false,
     // no need to send report.
     if (!_is_report_success && done && exec_status.ok()) {
-        return;
+        return Status::NeedSendAgain("");
     }
 
     // If both _is_report_success and _is_report_on_cancel are false,
@@ -846,10 +863,10 @@ Status PipelineFragmentContext::send_report(bool done) {
     // This may happen when the query limit reached and
    // a internal cancellation being processed
     if (!_is_report_success && !_is_report_on_cancel) {
-        return;
+        return Status::NeedSendAgain("");
     }
 
-    _report_status_cb(
+    return _report_status_cb(
             {false,
              exec_status,
             {},
@@ -864,7 +881,8 @@ Status PipelineFragmentContext::send_report(bool done) {
              _runtime_state.get(),
              std::bind(&PipelineFragmentContext::update_status, this, std::placeholders::_1),
             std::bind(&PipelineFragmentContext::cancel, this, std::placeholders::_1,
-                       std::placeholders::_2)});
+                       std::placeholders::_2)},
+            shared_from_this());
 }
 
 } // namespace doris::pipeline
diff --git a/be/src/pipeline/pipeline_fragment_context.h b/be/src/pipeline/pipeline_fragment_context.h
index 8e80a73143..80c38880bf 100644
--- a/be/src/pipeline/pipeline_fragment_context.h
+++ b/be/src/pipeline/pipeline_fragment_context.h
@@ -58,7 +58,8 @@ public:
     // Note: this does not take a const RuntimeProfile&, because it might need to call
     // functions like PrettyPrint() or to_thrift(), neither of which is const
     // because they take locks.
-    using report_status_callback = std::function<void(const ReportStatusRequest)>;
+    using report_status_callback = std::function<Status(
+            const ReportStatusRequest, std::shared_ptr<pipeline::PipelineFragmentContext>&&)>;
 
     PipelineFragmentContext(const TUniqueId& query_id, const TUniqueId& instance_id,
                             const int fragment_id, int backend_num,
                             std::shared_ptr<QueryContext> query_ctx, ExecEnv* exec_env,
@@ -118,9 +119,7 @@ public:
     virtual void add_merge_controller_handler(
             std::shared_ptr<RuntimeFilterMergeControllerEntity>& handler) {}
 
-    virtual void send_report(bool);
-
-    virtual void report_profile();
+    virtual Status send_report(bool);
 
     Status update_status(Status status) {
         std::lock_guard<std::mutex> l(_status_lock);
@@ -133,12 +132,18 @@ public:
     [[nodiscard]] taskgroup::TaskGroupPipelineTaskEntity* get_task_group_entity() const {
         return _task_group_entity;
     }
+    void trigger_report_if_necessary();
 
     bool is_group_commit() { return _group_commit; }
     virtual void instance_ids(std::vector<TUniqueId>& ins_ids) const {
         ins_ids.resize(1);
         ins_ids[0] = _fragment_instance_id;
     }
+    virtual void instance_ids(std::vector<string>& ins_ids) const {
+        ins_ids.resize(1);
+        ins_ids[0] = print_id(_fragment_instance_id);
+    }
+    void refresh_next_report_time();
 
 protected:
     Status _create_sink(int sender_id, const TDataSink& t_data_sink, RuntimeState* state);
@@ -147,7 +152,7 @@ protected:
     template <bool is_intersect>
     Status _build_operators_for_set_operation_node(ExecNode*, PipelinePtr);
     virtual void _close_action();
-    void _stop_report_thread();
+    void _init_next_report_time();
     void _set_is_report_on_cancel(bool val) { _is_report_on_cancel = val; }
 
     // Id of this query
@@ -200,22 +205,17 @@ protected:
     std::function<void(RuntimeState*, Status*)> _call_back;
     std::once_flag _close_once_flag;
 
-    std::condition_variable _report_thread_started_cv;
-    // true if we started the thread
-    bool _report_thread_active;
-    // profile reporting-related
-    report_status_callback _report_status_cb;
-    std::promise<bool> _report_thread_promise;
-    std::future<bool> _report_thread_future;
-    std::mutex _report_thread_lock;
-
-    // Indicates that profile reporting thread should stop.
-    // Tied to _report_thread_lock.
-    std::condition_variable _stop_report_thread_cv;
     // If this is set to false, and '_is_report_success' is false as well,
     // This executor will not report status to FE on being cancelled.
     bool _is_report_on_cancel;
 
+    // 0 indicates reporting is in progress or not required
+    std::atomic_bool _disable_period_report = true;
+    std::atomic_uint64_t _previous_report_time = 0;
+
+    // profile reporting-related
+    report_status_callback _report_status_cb;
+
 private:
     std::vector<std::unique_ptr<PipelineTask>> _tasks;
     bool _group_commit;
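Note the new report_status_callback signature above: it returns Status and receives a shared_ptr to the fragment context by rvalue. Since the report now runs on another thread after send_report returns, the context hands out shared_from_this() to keep itself alive until the asynchronous report finishes. A simplified sketch of that lifetime pattern, with toy types and a void callback instead of Status, not the Doris API:

    #include <functional>
    #include <iostream>
    #include <memory>
    #include <thread>

    struct Report {};

    class FragmentContext : public std::enable_shared_from_this<FragmentContext> {
    public:
        using ReportCallback =
                std::function<void(Report, std::shared_ptr<FragmentContext>&&)>;

        explicit FragmentContext(ReportCallback cb) : _cb(std::move(cb)) {}

        void send_report() {
            // Hand a strong reference to the reporter; even if every other
            // owner drops this context, it stays alive until the callback
            // (and anything it scheduled) completes.
            _cb(Report{}, shared_from_this());
        }

    private:
        ReportCallback _cb;
    };

    int main() {
        std::thread worker;
        {
            auto ctx = std::make_shared<FragmentContext>(
                    [&](Report r, std::shared_ptr<FragmentContext>&& keep_alive) {
                        worker = std::thread([r, ctx = std::move(keep_alive)] {
                            // ... serialize and send the report; ctx is still valid here.
                            std::cout << "reported\n";
                        });
                    });
            ctx->send_report();
        } // the last external reference dies here; the thread still holds one
        worker.join();
    }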
diff --git a/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.cpp b/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.cpp
index 64ea086e9a..9369b26ba7 100644
--- a/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.cpp
+++ b/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.cpp
@@ -119,13 +119,22 @@ PipelineXFragmentContext::~PipelineXFragmentContext() {
         _call_back(nullptr, &st);
     }
     _runtime_state.reset();
-    DCHECK(!_report_thread_active);
 }
 
 void PipelineXFragmentContext::cancel(const PPlanFragmentCancelReason& reason,
                                       const std::string& msg) {
+    LOG_INFO("PipelineXFragmentContext::cancel")
+            .tag("query_id", print_id(_query_id))
+            .tag("fragment_id", _fragment_id)
+            .tag("reason", reason)
+            .tag("error message", msg);
     if (_query_ctx->cancel(true, msg, Status::Cancelled(msg))) {
-        LOG(WARNING) << "PipelineFragmentContext Canceled. reason=" << msg;
+        if (reason != PPlanFragmentCancelReason::LIMIT_REACH) {
+            FOR_EACH_RUNTIME_STATE(LOG(WARNING) << "PipelineXFragmentContext cancel instance: "
+                                                << print_id(runtime_state->fragment_instance_id());)
+        } else {
+            _set_is_report_on_cancel(false); // TODO bug llj
+        }
         // Get pipe from new load stream manager and send cancel to it or the fragment may hang to wait read from pipe
         // For stream load the fragment's query_id == load id, it is set in FE.
         auto stream_load_ctx = _exec_env->new_load_stream_mgr()->get(_query_id);
@@ -156,7 +165,7 @@ Status PipelineXFragmentContext::prepare(const doris::TPipelineFragmentParams& r
     }
 
     LOG_INFO("PipelineXFragmentContext::prepare")
-            .tag("query_id", _query_id)
+            .tag("query_id", print_id(_query_id))
             .tag("fragment_id", _fragment_id)
             .tag("pthread_id", (uintptr_t)pthread_self());
 
@@ -215,6 +224,8 @@ Status PipelineXFragmentContext::prepare(const doris::TPipelineFragmentParams& r
     // 5. Build pipeline tasks and initialize local state.
     RETURN_IF_ERROR(_build_pipeline_tasks(request));
 
+    _init_next_report_time();
+
     _prepared = true;
     return Status::OK();
 }
@@ -428,64 +439,6 @@ Status PipelineXFragmentContext::_build_pipeline_tasks(
     return Status::OK();
 }
 
-void PipelineXFragmentContext::report_profile() {
-    FOR_EACH_RUNTIME_STATE(
-            SCOPED_ATTACH_TASK(runtime_state.get());
-            VLOG_FILE << "report_profile(): instance_id=" << runtime_state->fragment_instance_id();
-
-            _report_thread_active = true;
-
-            std::unique_lock<std::mutex> l(_report_thread_lock);
-            // tell Open() that we started
-            _report_thread_started_cv.notify_one();
-
-            // Jitter the reporting time of remote fragments by a random amount between
-            // 0 and the report_interval. This way, the coordinator doesn't get all the
-            // updates at once so its better for contention as well as smoother progress
-            // reporting.
-            int report_fragment_offset = rand() % config::status_report_interval;
-            // We don't want to wait longer than it takes to run the entire fragment.
-            _stop_report_thread_cv.wait_for(l, std::chrono::seconds(report_fragment_offset));
-            while (_report_thread_active) {
-                if (config::status_report_interval > 0) {
-                    // wait_for can return because the timeout occurred or the condition variable
-                    // was signaled. We can't rely on its return value to distinguish between the
-                    // two cases (e.g. there is a race here where the wait timed out but before grabbing
-                    // the lock, the condition variable was signaled). Instead, we will use an external
-                    // flag, _report_thread_active, to coordinate this.
-                    _stop_report_thread_cv.wait_for(
-                            l, std::chrono::seconds(config::status_report_interval));
-                } else {
-                    LOG(WARNING) << "config::status_report_interval is equal to or less than zero, "
-                                    "exiting "
-                                    "reporting thread.";
-                    break;
-                }
-
-                if (VLOG_FILE_IS_ON) {
-                    VLOG_FILE << "Reporting " << (!_report_thread_active ? "final " : " ")
-                              << "profile for instance " << runtime_state->fragment_instance_id();
-                    std::stringstream ss;
-                    runtime_state->runtime_profile()->compute_time_in_profile();
-                    runtime_state->runtime_profile()->pretty_print(&ss);
-                    if (runtime_state->load_channel_profile()) {
-                        // runtime_state->load_channel_profile()->compute_time_in_profile(); // TODO load channel profile add timer
-                        runtime_state->load_channel_profile()->pretty_print(&ss);
-                    }
-                    VLOG_FILE << ss.str();
-                }
-
-                if (!_report_thread_active) {
-                    break;
-                }
-
-                send_report(false);
-            }
-
-            VLOG_FILE
-            << "exiting reporting thread: instance_id=" << runtime_state->fragment_instance_id();)
-}
-
 Status PipelineXFragmentContext::_build_pipelines(ObjectPool* pool,
                                                   const doris::TPipelineFragmentParams& request,
                                                   const DescriptorTbl& descs, OperatorXPtr* root,
@@ -864,12 +817,11 @@ void PipelineXFragmentContext::close_if_prepare_failed() {
 void PipelineXFragmentContext::_close_action() {
     _runtime_profile->total_time_counter()->update(_fragment_watcher.elapsed_time());
     send_report(true);
-    _stop_report_thread();
     // all submitted tasks done
     _exec_env->fragment_mgr()->remove_pipeline_context(shared_from_this());
 }
 
-void PipelineXFragmentContext::send_report(bool done) {
+Status PipelineXFragmentContext::send_report(bool done) {
     Status exec_status = Status::OK();
     {
         std::lock_guard<std::mutex> l(_status_lock);
@@ -879,7 +831,7 @@ Status PipelineXFragmentContext::send_report(bool done) {
     // If plan is done successfully, but _is_report_success is false,
     // no need to send report.
     if (!_is_report_success && done && exec_status.ok()) {
-        return;
+        return Status::NeedSendAgain("");
     }
 
     // If both _is_report_success and _is_report_on_cancel are false,
@@ -887,7 +839,7 @@ Status PipelineXFragmentContext::send_report(bool done) {
     // This may happen when the query limit reached and
     // a internal cancellation being processed
     if (!_is_report_success && !_is_report_on_cancel) {
-        return;
+        return Status::NeedSendAgain("");
     }
 
     std::vector<RuntimeState*> runtime_states(_runtime_states.size());
     for (size_t i = 0; i < _runtime_states.size(); i++) {
         runtime_states[i] = _runtime_states[i].get();
     }
 
-    _report_status_cb(
+    return _report_status_cb(
             {true, exec_status, runtime_states, nullptr, nullptr, done || !exec_status.ok(),
              _query_ctx->coord_addr, _query_id, _fragment_id, TUniqueId(), _backend_num,
              _runtime_state.get(),
             std::bind(&PipelineFragmentContext::update_status, this, std::placeholders::_1),
             std::bind(&PipelineFragmentContext::cancel, this, std::placeholders::_1,
-                       std::placeholders::_2)});
+                       std::placeholders::_2)},
+            shared_from_this());
 }
-
 } // namespace doris::pipeline
diff --git a/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.h b/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.h
index 7c54a411c6..c548b8cfa3 100644
--- a/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.h
+++ b/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.h
@@ -60,7 +60,6 @@ public:
     // Note: this does not take a const RuntimeProfile&, because it might need to call
     // functions like PrettyPrint() or to_thrift(), neither of which is const
     // because they take locks.
-    using report_status_callback = std::function<void(const ReportStatusRequest)>;
     PipelineXFragmentContext(const TUniqueId& query_id, const int fragment_id,
                              std::shared_ptr<QueryContext> query_ctx, ExecEnv* exec_env,
                              const std::function<void(RuntimeState*, Status*)>& call_back,
@@ -76,6 +75,13 @@ public:
         }
     }
 
+    void instance_ids(std::vector<string>& ins_ids) const override {
+        ins_ids.resize(_runtime_states.size());
+        for (size_t i = 0; i < _runtime_states.size(); i++) {
+            ins_ids[i] = print_id(_runtime_states[i]->fragment_instance_id());
+        }
+    }
+
     void add_merge_controller_handler(
             std::shared_ptr<RuntimeFilterMergeControllerEntity>& handler) override {
         _merge_controller_handlers.emplace_back(handler);
@@ -94,9 +100,7 @@ public:
     void cancel(const PPlanFragmentCancelReason& reason = PPlanFragmentCancelReason::INTERNAL_ERROR,
                 const std::string& msg = "") override;
 
-    void send_report(bool) override;
-
-    void report_profile() override;
+    Status send_report(bool) override;
 
     RuntimeState* get_runtime_state(UniqueId fragment_instance_id) override {
         std::lock_guard<std::mutex> l(_state_map_lock);
diff --git a/be/src/pipeline/task_scheduler.cpp b/be/src/pipeline/task_scheduler.cpp
index e71d8da0cd..30079126c2 100644
--- a/be/src/pipeline/task_scheduler.cpp
+++ b/be/src/pipeline/task_scheduler.cpp
@@ -231,8 +231,10 @@ void TaskScheduler::_do_work(size_t index) {
         auto check_state = task->get_state();
         if (check_state == PipelineTaskState::PENDING_FINISH) {
             DCHECK(!task->is_pending_finish()) << "must not pending close " << task->debug_string();
+            Status exec_status = fragment_ctx->get_query_context()->exec_status();
             _try_close_task(task,
-                            canceled ? PipelineTaskState::CANCELED : PipelineTaskState::FINISHED);
+                            canceled ? PipelineTaskState::CANCELED : PipelineTaskState::FINISHED,
+                            exec_status);
             continue;
         }
         DCHECK(check_state != PipelineTaskState::FINISHED &&
@@ -243,10 +245,11 @@ void TaskScheduler::_do_work(size_t index) {
             // may change from pending FINISH,should called cancel
             // also may change form BLOCK, other task called cancel
 
-            // If pipeline is canceled caused by memory limit, we should send report to FE in order
-            // to cancel all pipeline tasks in this query
-            fragment_ctx->send_report(true);
-            _try_close_task(task, PipelineTaskState::CANCELED);
+            // If pipeline is canceled, it will report after pipeline closed, and will propagate
+            // errors to downstream through exchange. So, here we needn't send_report.
+            // fragment_ctx->send_report(true);
+            Status cancel_status = fragment_ctx->get_query_context()->exec_status();
+            _try_close_task(task, PipelineTaskState::CANCELED, cancel_status);
             continue;
         }
 
@@ -276,10 +279,10 @@ void TaskScheduler::_do_work(size_t index) {
             // exec failed,cancel all fragment instance
             fragment_ctx->cancel(PPlanFragmentCancelReason::INTERNAL_ERROR, status.to_string());
-            fragment_ctx->send_report(true);
             _try_close_task(task, PipelineTaskState::CANCELED, status);
             continue;
         }
+        fragment_ctx->trigger_report_if_necessary();
 
         if (eos) {
             task->set_eos_time();
diff --git a/be/src/runtime/fragment_mgr.cpp b/be/src/runtime/fragment_mgr.cpp
index 79e67503e1..9a916daf4e 100644
--- a/be/src/runtime/fragment_mgr.cpp
+++ b/be/src/runtime/fragment_mgr.cpp
@@ -137,6 +137,13 @@ FragmentMgr::FragmentMgr(ExecEnv* exec_env)
     REGISTER_HOOK_METRIC(fragment_thread_pool_queue_size,
                          [this]() { return _thread_pool->get_queue_size(); });
     CHECK(s.ok()) << s.to_string();
+
+    s = ThreadPoolBuilder("FragmentInstanceReportThreadPool")
+                .set_min_threads(48)
+                .set_max_threads(512)
+                .set_max_queue_size(102400)
+                .build(&_async_report_thread_pool);
+    CHECK(s.ok()) << s.to_string();
 }
 
 FragmentMgr::~FragmentMgr() = default;
@@ -162,6 +169,7 @@ void FragmentMgr::stop() {
         }
         _pipeline_map.clear();
     }
+    _async_report_thread_pool->shutdown();
 }
 
 std::string FragmentMgr::to_http_path(const std::string& file_name) {
@@ -172,6 +180,16 @@ std::string FragmentMgr::to_http_path(const std::string& file_name) {
     return url.str();
 }
 
+Status FragmentMgr::trigger_pipeline_context_report(
+        const ReportStatusRequest req, std::shared_ptr<pipeline::PipelineFragmentContext>&& ctx) {
+    return _async_report_thread_pool->submit_func([this, req, ctx]() {
+        coordinator_callback(req);
+        if (!req.done) {
+            ctx->refresh_next_report_time();
+        }
+    });
+}
+
 // There can only be one of these callbacks in-flight at any moment, because
 // it is only invoked from the executor's reporting thread.
 // Also, the reported status will always reflect the most recent execution status,
@@ -539,9 +557,11 @@ void FragmentMgr::remove_pipeline_context(
     f_context->instance_ids(ins_ids);
     bool all_done = q_context->countdown(ins_ids.size());
     for (const auto& ins_id : ins_ids) {
+        VLOG_DEBUG << "remove pipeline context " << print_id(ins_id) << ", all_done:" << all_done;
         _pipeline_map.erase(ins_id);
     }
     if (all_done) {
+        LOG(INFO) << "remove query context " << print_id(query_id);
         _query_ctx_map.erase(query_id);
     }
 }
@@ -773,8 +793,9 @@ Status FragmentMgr::exec_plan_fragment(const TPipelineFragmentParams& params,
         std::shared_ptr<pipeline::PipelineFragmentContext> context =
                 std::make_shared<pipeline::PipelineXFragmentContext>(
                         query_ctx->query_id(), params.fragment_id, query_ctx, _exec_env, cb,
-                        std::bind<void>(std::mem_fn(&FragmentMgr::coordinator_callback), this,
-                                        std::placeholders::_1),
+                        std::bind<Status>(
+                                std::mem_fn(&FragmentMgr::trigger_pipeline_context_report), this,
+                                std::placeholders::_1, std::placeholders::_2),
                         params.group_commit);
         {
             SCOPED_RAW_TIMER(&duration_ns);
@@ -851,8 +872,9 @@ Status FragmentMgr::exec_plan_fragment(const TPipelineFragmentParams& params,
                     std::make_shared<pipeline::PipelineFragmentContext>(
                             query_ctx->query_id(), fragment_instance_id, params.fragment_id,
                             local_params.backend_num, query_ctx, _exec_env, cb,
-                            std::bind<void>(std::mem_fn(&FragmentMgr::coordinator_callback), this,
-                                            std::placeholders::_1));
+                            std::bind<Status>(
+                                    std::mem_fn(&FragmentMgr::trigger_pipeline_context_report),
+                                    this, std::placeholders::_1, std::placeholders::_2));
             {
                 SCOPED_RAW_TIMER(&duration_ns);
                 auto prepare_st = context->prepare(params, i);
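trigger_pipeline_context_report above is the glue for the whole patch: instead of calling coordinator_callback inline on the execution path, it queues the report on the new FragmentInstanceReportThreadPool and re-arms the periodic report when the fragment is not done. A compilable stand-in, using std::async in place of the Doris ThreadPool and toy types for the request and context:

    #include <functional>
    #include <future>
    #include <iostream>
    #include <memory>

    struct ReportRequest { bool done = false; };

    struct Context {
        void refresh_next_report_time() { std::cout << "re-armed\n"; }
    };

    void coordinator_callback(const ReportRequest&) { std::cout << "sent report\n"; }

    // std::async as a stand-in for ThreadPool::submit_func; a real pool bounds
    // its queue and returns an error status when saturated, which the caller
    // uses to re-enable the periodic-report flag instead of dropping reports.
    std::future<void> trigger_report(ReportRequest req, std::shared_ptr<Context> ctx) {
        return std::async(std::launch::async, [req, ctx]() {
            coordinator_callback(req);           // the RPC to the coordinator (FE)
            if (!req.done) {
                ctx->refresh_next_report_time(); // periodic report: schedule the next one
            }
        });
    }

    int main() {
        auto ctx = std::make_shared<Context>();
        trigger_report(ReportRequest{false}, ctx).wait();
    }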
diff --git a/be/src/runtime/fragment_mgr.h b/be/src/runtime/fragment_mgr.h
index 14c63c559b..395b9b546c 100644
--- a/be/src/runtime/fragment_mgr.h
+++ b/be/src/runtime/fragment_mgr.h
@@ -99,6 +99,8 @@ public:
                                 const PPlanFragmentCancelReason& reason,
                                 const std::unique_lock<std::mutex>& state_lock,
                                 const std::string& msg = "");
+    Status trigger_pipeline_context_report(const ReportStatusRequest,
+                                           std::shared_ptr<pipeline::PipelineFragmentContext>&&);
 
     // Pipeline version, cancel a fragment instance.
     void cancel_instance(const TUniqueId& instance_id, const PPlanFragmentCancelReason& reason,
@@ -203,6 +205,7 @@ private:
     UIntGauge* timeout_canceled_fragment_count = nullptr;
 
     RuntimeFilterMergeController _runtimefilter_controller;
+    std::unique_ptr<ThreadPool> _async_report_thread_pool; // used for pipeline context report
 };
 
 } // namespace doris
diff --git a/be/src/runtime/plan_fragment_executor.cpp b/be/src/runtime/plan_fragment_executor.cpp
index 59a89b44af..4cec558fc5 100644
--- a/be/src/runtime/plan_fragment_executor.cpp
+++ b/be/src/runtime/plan_fragment_executor.cpp
@@ -594,7 +594,8 @@ void PlanFragmentExecutor::cancel(const PPlanFragmentCancelReason& reason, const
             .tag("reason", reason)
             .tag("error message", msg);
     if (_runtime_state->is_cancelled()) {
-        LOG(INFO) << "instance is already cancelled, skip cancel again";
+        LOG(INFO) << "instance " << print_id(_runtime_state->fragment_instance_id())
+                  << " is already cancelled, skip cancel again";
         return;
     }
     DCHECK(_prepared);
@@ -608,7 +609,7 @@ void PlanFragmentExecutor::cancel(const PPlanFragmentCancelReason& reason, const
     _query_ctx->set_ready_to_execute(true);
 
     // must close stream_mgr to avoid dead lock in Exchange Node
-    _exec_env->vstream_mgr()->cancel(_fragment_instance_id);
+    _exec_env->vstream_mgr()->cancel(_fragment_instance_id, Status::Cancelled(msg));
     // Cancel the result queue manager used by spark doris connector
     _exec_env->result_queue_mgr()->update_queue_status(_fragment_instance_id, Status::Aborted(msg));
 #ifndef BE_TEST
diff --git a/be/src/runtime/query_context.h b/be/src/runtime/query_context.h
index 88a8367ff9..2791291bf4 100644
--- a/be/src/runtime/query_context.h
+++ b/be/src/runtime/query_context.h
@@ -42,7 +42,7 @@ namespace doris {
 
 struct ReportStatusRequest {
     bool is_pipeline_x;
-    const Status& status;
+    const Status status;
     std::vector<RuntimeState*> runtime_states;
     RuntimeProfile* profile;
     RuntimeProfile* load_channel_profile;
@@ -97,7 +97,9 @@ public:
     // Notice. For load fragments, the fragment_num sent by FE has a small probability of 0.
     // this may be a bug, bug <= 1 in theory it shouldn't cause any problems at this stage.
-    bool countdown(int instance_num) { return fragment_num.fetch_sub(instance_num) <= 1; }
+    bool countdown(int instance_num) {
+        return fragment_num.fetch_sub(instance_num) <= instance_num;
+    }
 
     ExecEnv* exec_env() { return _exec_env; }
 
@@ -137,10 +139,10 @@ public:
         if (_is_cancelled) {
             return false;
         }
+        set_exec_status(new_status);
 
         _is_cancelled.store(v);
 
         set_ready_to_execute(true);
-        set_exec_status(new_status);
         return true;
     }
diff --git a/be/src/runtime/runtime_state.cpp b/be/src/runtime/runtime_state.cpp
index 1d5fa990bb..ec18e83eab 100644
--- a/be/src/runtime/runtime_state.cpp
+++ b/be/src/runtime/runtime_state.cpp
@@ -305,6 +305,13 @@ void RuntimeState::get_unreported_errors(std::vector<std::string>* new_errors) {
     }
 }
 
+Status RuntimeState::query_status() {
+    auto st = _query_ctx->exec_status();
+    RETURN_IF_ERROR(st);
+    std::lock_guard<std::mutex> l(_process_status_lock);
+    return _process_status;
+}
+
 bool RuntimeState::is_cancelled() const {
     return _is_cancelled.load() || (_query_ctx && _query_ctx->is_cancelled());
 }
diff --git a/be/src/runtime/runtime_state.h b/be/src/runtime/runtime_state.h
index 87f0f45b6a..02c048052a 100644
--- a/be/src/runtime/runtime_state.h
+++ b/be/src/runtime/runtime_state.h
@@ -151,10 +151,7 @@ public:
                _query_options.enable_common_expr_pushdown;
     }
 
-    Status query_status() {
-        std::lock_guard<std::mutex> l(_process_status_lock);
-        return _process_status;
-    }
+    Status query_status();
 
     // Appends error to the _error_log if there is space
     bool log_error(const std::string& error);
diff --git a/be/src/service/internal_service.cpp b/be/src/service/internal_service.cpp
index 5367135c13..6ef5066d3b 100644
--- a/be/src/service/internal_service.cpp
+++ b/be/src/service/internal_service.cpp
@@ -556,7 +556,7 @@ void PInternalServiceImpl::cancel_plan_fragment(google::protobuf::RpcController*
         TUniqueId tid;
         tid.__set_hi(request->finst_id().hi());
         tid.__set_lo(request->finst_id().lo());
-
+        signal::set_signal_task_id(tid);
         Status st = Status::OK();
         if (request->has_cancel_reason()) {
             LOG(INFO) << "cancel fragment, fragment_instance_id=" << print_id(tid)
@@ -1184,7 +1184,10 @@ void PInternalServiceImpl::_transmit_block(google::protobuf::RpcController* cont
         if (!st.ok()) {
             LOG(WARNING) << "transmit_block failed, message=" << st
                          << ", fragment_instance_id=" << print_id(request->finst_id())
-                         << ", node=" << request->node_id();
+                         << ", node=" << request->node_id()
+                         << ", from sender_id: " << request->sender_id()
+                         << ", be_number: " << request->be_number()
+                         << ", packet_seq: " << request->packet_seq();
         }
     } else {
         st = extract_st;
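The countdown change in query_context.h above is easy to misread. fetch_sub returns the value before the subtraction, and remove_pipeline_context now retires several instances per call, so "this call finished the last ones" is previous <= instance_num, not previous <= 1. A worked example in plain C++ mirroring that method:

    #include <atomic>
    #include <cassert>

    struct QueryRefs {
        std::atomic<int> fragment_num;

        // Returns true for exactly the call that retires the final instance(s).
        bool countdown(int instance_num) {
            return fragment_num.fetch_sub(instance_num) <= instance_num;
        }
    };

    int main() {
        QueryRefs q{5};
        bool last1 = q.countdown(3); // before: 5 -> 5 <= 3 is false
        bool last2 = q.countdown(2); // before: 2 -> 2 <= 2 is true
        assert(!last1 && last2);
        // With the old `<= 1` test the second call would see 2 and return
        // false, so no caller would ever observe "all done" and the query
        // context map entry would leak.
        (void)last1;
        (void)last2;
    }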
diff --git a/be/src/vec/runtime/vdata_stream_mgr.cpp b/be/src/vec/runtime/vdata_stream_mgr.cpp
index ad161828f9..70bdb1baff 100644
--- a/be/src/vec/runtime/vdata_stream_mgr.cpp
+++ b/be/src/vec/runtime/vdata_stream_mgr.cpp
@@ -126,7 +126,9 @@ Status VDataStreamMgr::transmit_block(const PTransmitDataParams* request,
     }
 
     if (eos) {
-        recvr->remove_sender(request->sender_id(), request->be_number());
+        Status exec_status =
+                request->has_exec_status() ? Status::create(request->exec_status()) : Status::OK();
+        recvr->remove_sender(request->sender_id(), request->be_number(), exec_status);
     }
     return Status::OK();
 }
@@ -156,7 +158,7 @@ Status VDataStreamMgr::deregister_recvr(const TUniqueId& fragment_instance_id, P
     // Notify concurrent add_data() requests that the stream has been terminated.
     // cancel_stream maybe take a long time, so we handle it out of lock.
     if (targert_recvr) {
-        targert_recvr->cancel_stream();
+        targert_recvr->cancel_stream(Status::OK());
         return Status::OK();
     } else {
         std::stringstream err;
@@ -167,7 +169,7 @@ Status VDataStreamMgr::deregister_recvr(const TUniqueId& fragment_instance_id, P
     }
 }
 
-void VDataStreamMgr::cancel(const TUniqueId& fragment_instance_id) {
+void VDataStreamMgr::cancel(const TUniqueId& fragment_instance_id, Status exec_status) {
     VLOG_QUERY << "cancelling all streams for fragment=" << fragment_instance_id;
     std::vector<std::shared_ptr<VDataStreamRecvr>> recvrs;
     {
@@ -191,7 +193,7 @@ void VDataStreamMgr::cancel(const TUniqueId& fragment_instance_id) {
 
     // cancel_stream maybe take a long time, so we handle it out of lock.
     for (auto& it : recvrs) {
-        it->cancel_stream();
+        it->cancel_stream(exec_status);
     }
 }
 
diff --git a/be/src/vec/runtime/vdata_stream_mgr.h b/be/src/vec/runtime/vdata_stream_mgr.h
index ca0e7ab4b7..d809ff96fb 100644
--- a/be/src/vec/runtime/vdata_stream_mgr.h
+++ b/be/src/vec/runtime/vdata_stream_mgr.h
@@ -63,7 +63,7 @@ public:
     Status transmit_block(const PTransmitDataParams* request,
                           ::google::protobuf::Closure** done);
 
-    void cancel(const TUniqueId& fragment_instance_id);
+    void cancel(const TUniqueId& fragment_instance_id, Status exec_status);
 
 private:
     std::mutex _lock;
diff --git a/be/src/vec/runtime/vdata_stream_recvr.cpp b/be/src/vec/runtime/vdata_stream_recvr.cpp
index dd3e1cee29..d5abb50e0d 100644
--- a/be/src/vec/runtime/vdata_stream_recvr.cpp
+++ b/be/src/vec/runtime/vdata_stream_recvr.cpp
@@ -46,7 +46,9 @@ VDataStreamRecvr::SenderQueue::SenderQueue(VDataStreamRecvr* parent_recvr, int n
         : _recvr(parent_recvr),
           _is_cancelled(false),
           _num_remaining_senders(num_senders),
-          _received_first_batch(false) {}
+          _received_first_batch(false) {
+    _cancel_status = Status::OK();
+}
 
 VDataStreamRecvr::SenderQueue::~SenderQueue() {
     // Check pending closures, if it is not empty, should clear it here. but it should not happen.
@@ -81,6 +83,7 @@ Status VDataStreamRecvr::SenderQueue::get_batch(Block* block, bool* eos) {
 
 Status VDataStreamRecvr::SenderQueue::_inner_get_batch_without_lock(Block* block, bool* eos) {
     if (_is_cancelled) {
+        RETURN_IF_ERROR(_cancel_status);
         return Status::Cancelled("Cancelled");
     }
 
@@ -270,17 +273,19 @@ void VDataStreamRecvr::SenderQueue::decrement_senders(int be_number) {
     }
 }
 
-void VDataStreamRecvr::SenderQueue::cancel() {
+void VDataStreamRecvr::SenderQueue::cancel(Status cancel_status) {
     {
         std::lock_guard<std::mutex> l(_lock);
         if (_is_cancelled) {
             return;
         }
         _is_cancelled = true;
+        _cancel_status = cancel_status;
         if (_dependency) {
             _dependency->set_always_done();
         }
-        VLOG_QUERY << "cancelled stream: _fragment_instance_id=" << _recvr->fragment_instance_id()
+        VLOG_QUERY << "cancelled stream: _fragment_instance_id="
+                   << print_id(_recvr->fragment_instance_id())
                    << " node_id=" << _recvr->dest_node_id();
     }
     // Wake up all threads waiting to produce/consume batches. They will all
@@ -444,14 +449,21 @@ Status VDataStreamRecvr::get_next(Block* block, bool* eos) {
     }
 }
 
-void VDataStreamRecvr::remove_sender(int sender_id, int be_number) {
+void VDataStreamRecvr::remove_sender(int sender_id, int be_number, Status exec_status) {
+    if (!exec_status.ok()) {
+        cancel_stream(exec_status);
+        return;
+    }
     int use_sender_id = _is_merging ? sender_id : 0;
     _sender_queues[use_sender_id]->decrement_senders(be_number);
 }
 
-void VDataStreamRecvr::cancel_stream() {
+void VDataStreamRecvr::cancel_stream(Status exec_status) {
+    VLOG_QUERY << "cancel_stream: fragment_instance_id=" << print_id(_fragment_instance_id)
+               << exec_status;
+
     for (int i = 0; i < _sender_queues.size(); ++i) {
-        _sender_queues[i]->cancel();
+        _sender_queues[i]->cancel(exec_status);
     }
 }
 
diff --git a/be/src/vec/runtime/vdata_stream_recvr.h b/be/src/vec/runtime/vdata_stream_recvr.h
index d0b009b22f..d8ab873f87 100644
--- a/be/src/vec/runtime/vdata_stream_recvr.h
+++ b/be/src/vec/runtime/vdata_stream_recvr.h
@@ -106,9 +106,9 @@ public:
     // Indicate that a particular sender is done. Delegated to the appropriate
     // sender queue. Called from DataStreamMgr.
-    void remove_sender(int sender_id, int be_number);
+    void remove_sender(int sender_id, int be_number, Status exec_status);
 
-    void cancel_stream();
+    void cancel_stream(Status exec_status);
 
     void close();
 
@@ -209,7 +209,7 @@ public:
 
     void decrement_senders(int sender_id);
 
-    void cancel();
+    void cancel(Status cancel_status);
 
     void close();
 
@@ -233,6 +233,7 @@ protected:
     VDataStreamRecvr* _recvr;
     std::mutex _lock;
     bool _is_cancelled;
+    Status _cancel_status;
     int _num_remaining_senders;
     std::condition_variable _data_arrival_cv;
     std::condition_variable _data_removal_cv;
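The receiver changes above make a cancelled SenderQueue remember the Status it was cancelled with, so a blocked reader surfaces the sender's real error rather than a generic "Cancelled". A minimal sketch of that idea, with simplified classes rather than the Doris ones:

    #include <iostream>
    #include <mutex>
    #include <string>

    struct Status {
        static Status OK() { return {}; }
        static Status Cancelled(std::string m) { return {std::move(m)}; }
        bool ok() const { return msg.empty(); }
        std::string msg;
    };

    class SenderQueue {
    public:
        void cancel(Status cancel_status) {
            std::lock_guard<std::mutex> l(_lock);
            if (_is_cancelled) {
                return; // first cancellation wins; keep its status
            }
            _is_cancelled = true;
            _cancel_status = std::move(cancel_status);
        }

        // What a reader would see on its next get_batch call.
        Status get_batch_status() {
            std::lock_guard<std::mutex> l(_lock);
            if (_is_cancelled) {
                if (!_cancel_status.ok()) {
                    return _cancel_status; // propagate the stored cause
                }
                return Status::Cancelled("Cancelled");
            }
            return Status::OK();
        }

    private:
        std::mutex _lock;
        bool _is_cancelled = false;
        Status _cancel_status = Status::OK();
    };

    int main() {
        SenderQueue q;
        q.cancel(Status::Cancelled("upstream failed: disk error"));
        std::cout << q.get_batch_status().msg << "\n"; // the real cause, not "Cancelled"
    }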
diff --git a/be/src/vec/sink/vdata_stream_sender.cpp b/be/src/vec/sink/vdata_stream_sender.cpp
index 263a063590..61d95538c9 100644
--- a/be/src/vec/sink/vdata_stream_sender.cpp
+++ b/be/src/vec/sink/vdata_stream_sender.cpp
@@ -109,23 +109,23 @@ Status Channel<Parent>::init(RuntimeState* state) {
 }
 
 template <typename Parent>
-Status Channel<Parent>::send_current_block(bool eos) {
+Status Channel<Parent>::send_current_block(bool eos, Status exec_status) {
     // FIXME: Now, local exchange will cause the performance problem is in a multi-threaded scenario
     // so this feature is turned off here by default. We need to re-examine this logic
     if (is_local()) {
-        return send_local_block(eos);
+        return send_local_block(exec_status, eos);
     }
     SCOPED_CONSUME_MEM_TRACKER(_parent->mem_tracker());
     if (eos) {
         RETURN_IF_ERROR(_serializer.serialize_block(_ch_cur_pb_block, 1));
     }
-    RETURN_IF_ERROR(send_block(_ch_cur_pb_block, eos));
+    RETURN_IF_ERROR(send_remote_block(_ch_cur_pb_block, eos, exec_status));
     ch_roll_pb_block();
     return Status::OK();
 }
 
 template <typename Parent>
-Status Channel<Parent>::send_local_block(bool eos) {
+Status Channel<Parent>::send_local_block(Status exec_status, bool eos) {
     if constexpr (!std::is_same_v<pipeline::ResultFileSinkLocalState, Parent>) {
         SCOPED_TIMER(_parent->local_send_timer());
     }
@@ -140,7 +140,7 @@ Status Channel<Parent>::send_local_block(bool eos) {
         _local_recvr->add_block(&block, _parent->sender_id(), true);
         if (eos) {
-            _local_recvr->remove_sender(_parent->sender_id(), _be_number);
+            _local_recvr->remove_sender(_parent->sender_id(), _be_number, exec_status);
         }
         return Status::OK();
     } else {
@@ -168,7 +168,7 @@ Status Channel<Parent>::send_local_block(Block* block) {
 }
 
 template <typename Parent>
-Status Channel<Parent>::send_block(PBlock* block, bool eos) {
+Status Channel<Parent>::send_remote_block(PBlock* block, bool eos, Status exec_status) {
     if constexpr (!std::is_same_v<pipeline::ResultFileSinkLocalState, Parent>) {
         SCOPED_TIMER(_parent->brpc_send_timer());
         COUNTER_UPDATE(_parent->blocks_sent_counter(), 1);
@@ -182,7 +182,7 @@ Status Channel<Parent>::send_remote_block(PBlock* block, bool eos, Status exec_s
         SCOPED_TRACK_MEMORY_TO_UNKNOWN();
         _closure->cntl.Reset();
     }
-    VLOG_ROW << "Channel<Parent>::send_batch() instance_id=" << _fragment_instance_id
+    VLOG_ROW << "Channel<Parent>::send_batch() instance_id=" << print_id(_fragment_instance_id)
              << " dest_node=" << _dest_node_id << " to_host=" << _brpc_dest_addr.hostname
              << " _packet_seq=" << _packet_seq << " row_desc=" << _row_desc.debug_string();
     if (_is_transfer_chain && (_send_query_statistics_with_every_batch || eos)) {
@@ -191,6 +191,9 @@ Status Channel<Parent>::send_remote_block(PBlock* block, bool eos, Status exec_s
     }
 
     _brpc_request.set_eos(eos);
+    if (!exec_status.ok()) {
+        exec_status.to_protobuf(_brpc_request.mutable_exec_status());
+    }
     if (block != nullptr) {
         _brpc_request.set_allocated_block(block);
     }
@@ -228,7 +231,7 @@ Status Channel<Parent>::add_rows(Block* block, const std::vector<int>& rows, boo
     RETURN_IF_ERROR(
             _serializer.next_serialized_block(block, _ch_cur_pb_block, 1, &serialized, eos, &rows));
     if (serialized) {
-        RETURN_IF_ERROR(send_current_block(false));
+        RETURN_IF_ERROR(send_current_block(false, Status::OK()));
     }
 
     return Status::OK();
@@ -251,29 +254,29 @@ Status Channel<Parent>::close_wait(RuntimeState* state) {
 }
 
 template <typename Parent>
-Status Channel<Parent>::close_internal() {
+Status Channel<Parent>::close_internal(Status exec_status) {
     if (!_need_close) {
         return Status::OK();
     }
-    VLOG_RPC << "Channel::close() instance_id=" << _fragment_instance_id
+    VLOG_RPC << "Channel::close_internal() instance_id=" << print_id(_fragment_instance_id)
              << " dest_node=" << _dest_node_id << " #rows= "
             << ((_serializer.get_block() == nullptr) ? 0 : _serializer.get_block()->rows())
-             << " receiver status: " << _receiver_status;
+             << " receiver status: " << _receiver_status << ", exec_status: " << exec_status;
     if (is_receiver_eof()) {
         _serializer.reset_block();
         return Status::OK();
     }
     Status status;
     if (_serializer.get_block() != nullptr && _serializer.get_block()->rows() > 0) {
-        status = send_current_block(true);
+        status = send_current_block(true, exec_status);
     } else {
         SCOPED_CONSUME_MEM_TRACKER(_parent->mem_tracker());
         if (is_local()) {
             if (_recvr_is_valid()) {
-                _local_recvr->remove_sender(_parent->sender_id(), _be_number);
+                _local_recvr->remove_sender(_parent->sender_id(), _be_number, exec_status);
             }
         } else {
-            status = send_block((PBlock*)nullptr, true);
+            status = send_remote_block((PBlock*)nullptr, true, exec_status);
         }
     }
     // Don't wait for the last packet to finish, left it to close_wait.
@@ -285,13 +288,13 @@
 }
 
 template <typename Parent>
-Status Channel<Parent>::close(RuntimeState* state) {
+Status Channel<Parent>::close(RuntimeState* state, Status exec_status) {
     if (_closed) {
         return Status::OK();
     }
     _closed = true;
 
-    Status st = close_internal();
+    Status st = close_internal(exec_status);
     if (!st.ok()) {
         state->log_error(st.to_string());
     }
@@ -497,7 +500,8 @@ template <typename ChannelPtrType>
 void VDataStreamSender::_handle_eof_channel(RuntimeState* state, ChannelPtrType channel,
                                             Status st) {
     channel->set_receiver_eof(st);
-    channel->close(state);
+    // Channel will not send RPC to the downstream on eof, so close the channel with an OK status.
+    channel->close(state, Status::OK());
 }
 
 Status VDataStreamSender::send(RuntimeState* state, Block* block, bool eos) {
@@ -551,7 +555,7 @@ Status VDataStreamSender::send(RuntimeState* state, Block* block, bool eos) {
                 status = channel->send_local_block(&cur_block);
             } else {
                 SCOPED_CONSUME_MEM_TRACKER(_mem_tracker.get());
-                status = channel->send_block(block_holder, nullptr, eos);
+                status = channel->send_broadcast_block(block_holder, nullptr, eos);
             }
             HANDLE_CHANNEL_STATUS(state, channel, status);
         }
@@ -578,7 +582,7 @@ Status VDataStreamSender::send(RuntimeState* state, Block* block, bool eos) {
                 status = channel->send_local_block(&cur_block);
             } else {
                 SCOPED_CONSUME_MEM_TRACKER(_mem_tracker.get());
-                status = channel->send_block(_cur_pb_block, false);
+                status = channel->send_remote_block(_cur_pb_block, false);
             }
             HANDLE_CHANNEL_STATUS(state, channel, status);
         }
@@ -600,7 +604,8 @@ Status VDataStreamSender::send(RuntimeState* state, Block* block, bool eos) {
             SCOPED_CONSUME_MEM_TRACKER(_mem_tracker.get());
             RETURN_IF_ERROR(
                     _serializer.serialize_block(block, current_channel->ch_cur_pb_block()));
-            auto status = current_channel->send_block(current_channel->ch_cur_pb_block(), eos);
+            auto status =
+                    current_channel->send_remote_block(current_channel->ch_cur_pb_block(), eos);
             HANDLE_CHANNEL_STATUS(state, current_channel, status);
             current_channel->ch_roll_pb_block();
         }
@@ -682,7 +687,7 @@ Status VDataStreamSender::try_close(RuntimeState* state, Status exec_status) {
     _serializer.reset_block();
     Status final_st = Status::OK();
     for (int i = 0; i < _channels.size(); ++i) {
-        Status st = _channels[i]->close(state);
+        Status st = _channels[i]->close(state, exec_status);
         if (!st.ok() && final_st.ok()) {
             final_st = st;
         }
@@ -711,7 +716,7 @@ Status VDataStreamSender::close(RuntimeState* state, Status exec_status) {
                     status = channel->send_local_block(&block);
                 } else {
                     SCOPED_CONSUME_MEM_TRACKER(_mem_tracker.get());
-                    status = channel->send_block(_cur_pb_block, false);
+                    status = channel->send_remote_block(_cur_pb_block, false);
                 }
                 HANDLE_CHANNEL_STATUS(state, channel, status);
             }
@@ -719,7 +724,7 @@ Status VDataStreamSender::close(RuntimeState* state, Status exec_status) {
         }
     }
     for (int i = 0; i < _channels.size(); ++i) {
-        Status st = _channels[i]->close(state);
+        Status st = _channels[i]->close(state, exec_status);
         if (!st.ok() && final_st.ok()) {
             final_st = st;
         }
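The header below also renames the two send_block overloads to send_remote_block and send_broadcast_block. The old pair differed only in pointer type, which forced casts like send_block((PBlock*)nullptr, true) at call sites (visible in the removed lines above); with distinct names, each variant can grow its own defaulted exec_status parameter without overload-resolution surprises. A toy illustration of the resulting call-site clarity, with invented stub types:

    struct PBlock {};
    struct BroadcastPBlockHolder {};

    struct Channel {
        // Distinct names: every call site says which transport path it means,
        // and each variant can take its own defaulted trailing parameters.
        int send_remote_block(PBlock* block, bool eos = false) { return 1; }
        int send_broadcast_block(BroadcastPBlockHolder* holder, bool* sent,
                                 bool eos = false) {
            return 2;
        }
    };

    int main() {
        Channel ch;
        PBlock b;
        BroadcastPBlockHolder h;
        bool sent = false;
        (void)ch.send_remote_block(&b, /*eos=*/true);
        (void)ch.send_remote_block(nullptr, /*eos=*/true); // no cast needed anymore
        (void)ch.send_broadcast_block(&h, &sent, /*eos=*/true);
    }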
diff --git a/be/src/vec/sink/vdata_stream_sender.h b/be/src/vec/sink/vdata_stream_sender.h
index 1698e5e564..dfade831ea 100644
--- a/be/src/vec/sink/vdata_stream_sender.h
+++ b/be/src/vec/sink/vdata_stream_sender.h
@@ -273,25 +273,26 @@ public:
     // Returns the status of the most recently finished transmit_data
     // rpc (or OK if there wasn't one that hasn't been reported yet).
     // if batch is nullptr, send the eof packet
-    virtual Status send_block(PBlock* block, bool eos = false);
+    virtual Status send_remote_block(PBlock* block, bool eos = false,
+                                     Status exec_status = Status::OK());
 
-    virtual Status send_block(BroadcastPBlockHolder* block, [[maybe_unused]] bool* sent,
-                              bool eos = false) {
+    virtual Status send_broadcast_block(BroadcastPBlockHolder* block, [[maybe_unused]] bool* sent,
+                                        bool eos = false) {
         return Status::InternalError("Send BroadcastPBlockHolder is not allowed!");
     }
 
     virtual Status add_rows(Block* block, const std::vector<int>& row, bool eos);
 
-    virtual Status send_current_block(bool eos);
+    virtual Status send_current_block(bool eos, Status exec_status);
 
-    Status send_local_block(bool eos = false);
+    Status send_local_block(Status exec_status, bool eos = false);
 
     Status send_local_block(Block* block);
     // Flush buffered rows and close channel. This function don't wait the response
     // of close operation, client should call close_wait() to finish channel's close.
     // We split one close operation into two phases in order to make multiple channels
     // can run parallel.
-    Status close(RuntimeState* state);
+    Status close(RuntimeState* state, Status exec_status);
 
     // Get close wait's response, to finish channel close operation.
     Status close_wait(RuntimeState* state);
@@ -362,7 +363,7 @@ protected:
     // Serialize _batch into _thrift_batch and send via send_batch().
     // Returns send_batch() status.
     Status send_current_batch(bool eos = false);
-    Status close_internal();
+    Status close_internal(Status exec_status);
 
     Parent* _parent;
@@ -476,7 +477,8 @@ public:
     // Returns the status of the most recently finished transmit_data
     // rpc (or OK if there wasn't one that hasn't been reported yet).
     // if batch is nullptr, send the eof packet
-    Status send_block(PBlock* block, bool eos = false) override {
+    Status send_remote_block(PBlock* block, bool eos = false,
+                             Status exec_status = Status::OK()) override {
         COUNTER_UPDATE(Channel<Parent>::_parent->blocks_sent_counter(), 1);
         std::unique_ptr<PBlock> pblock_ptr;
         pblock_ptr.reset(block);
@@ -489,13 +491,13 @@ public:
             }
         }
         if (eos || block->column_metas_size()) {
-            RETURN_IF_ERROR(_buffer->add_block({this, std::move(pblock_ptr), eos}));
+            RETURN_IF_ERROR(_buffer->add_block({this, std::move(pblock_ptr), eos, exec_status}));
         }
         return Status::OK();
     }
 
-    Status send_block(BroadcastPBlockHolder* block, [[maybe_unused]] bool* sent,
-                      bool eos = false) override {
+    Status send_broadcast_block(BroadcastPBlockHolder* block, [[maybe_unused]] bool* sent,
+                                bool eos = false) override {
         COUNTER_UPDATE(Channel<Parent>::_parent->blocks_sent_counter(), 1);
         if (eos) {
             if (_eos_send) {
@@ -520,19 +522,20 @@ public:
         RETURN_IF_ERROR(Channel<Parent>::_serializer.next_serialized_block(
                 block, _pblock.get(), 1, &serialized, eos, &rows));
         if (serialized) {
-            RETURN_IF_ERROR(send_current_block(eos));
+            Status exec_status = Status::OK();
+            RETURN_IF_ERROR(send_current_block(eos, exec_status));
         }
 
         return Status::OK();
     }
 
     // send _mutable_block
-    Status send_current_block(bool eos) override {
+    Status send_current_block(bool eos, Status exec_status) override {
         if (Channel<Parent>::is_local()) {
-            return Channel<Parent>::send_local_block(eos);
+            return Channel<Parent>::send_local_block(exec_status, eos);
         }
         SCOPED_CONSUME_MEM_TRACKER(Channel<Parent>::_parent->mem_tracker());
-        RETURN_IF_ERROR(send_block(_pblock.release(), eos));
+        RETURN_IF_ERROR(send_remote_block(_pblock.release(), eos, exec_status));
         return Status::OK();
     }
 
diff --git a/gensrc/proto/internal_service.proto b/gensrc/proto/internal_service.proto
index 878544e74b..c757315f0d 100644
--- a/gensrc/proto/internal_service.proto
+++ b/gensrc/proto/internal_service.proto
@@ -49,6 +49,7 @@ message PTransmitDataParams {
     // transfer the RowBatch to the Controller Attachment
     optional bool transfer_by_attachment = 10 [default = false];
     optional PUniqueId query_id = 11;
+    optional PStatus exec_status = 12;
 };
 
 message PTransmitDataResult {

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org