This is an automated email from the ASF dual-hosted git repository.

mrhhsg pushed a commit to branch spill_and_reserve
in repository https://gitbox.apache.org/repos/asf/doris.git
commit d89033d7846058e94d0fccea734e3c9a7668063d
Author: Jerry Hu <mrh...@gmail.com>
AuthorDate: Mon Sep 9 17:28:19 2024 +0800

    [opt](spill) avoid query blocked when reserving failed (#40550)

    1. Trigger revoking proactively when revocable size is big and reserving failed.
    2. Set the brpc timeout as the query timeout.

    ## Proposed changes

    Issue Number: close #xxx

    <!--Describe your changes.-->
---
 be/src/pipeline/common/agg_utils.h                 | 10 +++++----
 be/src/pipeline/exec/exchange_sink_buffer.cpp      |  5 ++++-
 be/src/pipeline/exec/exchange_sink_buffer.h        |  5 +++--
 be/src/pipeline/exec/exchange_sink_operator.cpp    |  4 ++--
 be/src/pipeline/exec/hashjoin_build_sink.cpp       | 13 ++++++-----
 .../exec/partitioned_aggregation_sink_operator.cpp | 25 +++++++++++-----------
 .../exec/partitioned_aggregation_sink_operator.h   | 21 +++++++++++++++---
 be/src/pipeline/pipeline_task.cpp                  | 16 ++++++++++++--
 be/src/vec/sink/vdata_stream_sender.cpp            |  7 +++++-
 9 files changed, 74 insertions(+), 32 deletions(-)

diff --git a/be/src/pipeline/common/agg_utils.h b/be/src/pipeline/common/agg_utils.h
index e0435954b8b..d67ebb9fdf7 100644
--- a/be/src/pipeline/common/agg_utils.h
+++ b/be/src/pipeline/common/agg_utils.h
@@ -275,15 +275,17 @@ public:
         using IteratorBase<ConstIterator, true>::IteratorBase;
     };
 
-    ConstIterator begin() const { return ConstIterator(this, 0); }
+    ConstIterator begin() const { return {this, 0}; }
 
     ConstIterator cbegin() const { return begin(); }
 
-    Iterator begin() { return Iterator(this, 0); }
+    Iterator begin() { return {this, 0}; }
 
-    ConstIterator end() const { return ConstIterator(this, _total_count); }
+    ConstIterator end() const { return {this, _total_count}; }
 
     ConstIterator cend() const { return end(); }
 
-    Iterator end() { return Iterator(this, _total_count); }
+    Iterator end() { return {this, _total_count}; }
+
+    [[nodiscard]] uint32_t total_count() const { return _total_count; }
 
     void init_once() {
         if (_inited) {

diff --git a/be/src/pipeline/exec/exchange_sink_buffer.cpp b/be/src/pipeline/exec/exchange_sink_buffer.cpp
index 9a4764a24b9..5027d7c10de 100644
--- a/be/src/pipeline/exec/exchange_sink_buffer.cpp
+++ b/be/src/pipeline/exec/exchange_sink_buffer.cpp
@@ -87,7 +87,7 @@ void BroadcastPBlockHolderMemLimiter::release(const BroadcastPBlockHolder& holde
 namespace pipeline {
 
 ExchangeSinkBuffer::ExchangeSinkBuffer(PUniqueId query_id, PlanNodeId dest_node_id, int send_id,
-                                       int be_number, RuntimeState* state,
+                                       PlanNodeId node_id, int be_number, RuntimeState* state,
                                        ExchangeSinkLocalState* parent)
         : HasTaskExecutionCtx(state),
          _queue_capacity(0),
@@ -95,6 +95,7 @@ ExchangeSinkBuffer::ExchangeSinkBuffer(PUniqueId query_id, PlanNodeId dest_node_
          _query_id(query_id),
          _dest_node_id(dest_node_id),
          _sender_id(send_id),
+         _node_id(node_id),
          _be_number(be_number),
          _state(state),
          _context(state->get_query_ctx()),
@@ -408,6 +409,8 @@ void ExchangeSinkBuffer::_ended(InstanceLoId id) {
 }
 
 void ExchangeSinkBuffer::_failed(InstanceLoId id, const std::string& err) {
+    LOG(INFO) << "send rpc failed, instance id: " << id << ", _dest_node_id: " << _dest_node_id
+              << ",_sender_id: " << _sender_id << ", node id: " << _node_id << ", err: " << err;
     _is_finishing = true;
     _context->cancel(Status::Cancelled(err));
     std::unique_lock<std::mutex> lock(*_instance_to_package_queue_mutex[id]);

diff --git a/be/src/pipeline/exec/exchange_sink_buffer.h b/be/src/pipeline/exec/exchange_sink_buffer.h
index 2d30a492a0d..d43d275cba6 100644
--- a/be/src/pipeline/exec/exchange_sink_buffer.h
+++ b/be/src/pipeline/exec/exchange_sink_buffer.h
@@ -177,8 +177,8 @@ struct ExchangeRpcContext {
 
 // Each ExchangeSinkOperator have one ExchangeSinkBuffer
 class ExchangeSinkBuffer final : public HasTaskExecutionCtx {
 public:
-    ExchangeSinkBuffer(PUniqueId query_id, PlanNodeId dest_node_id, int send_id, int be_number,
-                       RuntimeState* state, ExchangeSinkLocalState* parent);
+    ExchangeSinkBuffer(PUniqueId query_id, PlanNodeId dest_node_id, int send_id, PlanNodeId node_id,
+                       int be_number, RuntimeState* state, ExchangeSinkLocalState* parent);
     ~ExchangeSinkBuffer() override = default;
     void register_sink(TUniqueId);
@@ -235,6 +235,7 @@ private:
     PlanNodeId _dest_node_id;
     // Sender instance id, unique within a fragment. StreamSender save the variable
     int _sender_id;
+    PlanNodeId _node_id;
     int _be_number;
     std::atomic<int64_t> _rpc_count = 0;
     RuntimeState* _state = nullptr;

diff --git a/be/src/pipeline/exec/exchange_sink_operator.cpp b/be/src/pipeline/exec/exchange_sink_operator.cpp
index 59d2d5f0551..9e17a76d272 100644
--- a/be/src/pipeline/exec/exchange_sink_operator.cpp
+++ b/be/src/pipeline/exec/exchange_sink_operator.cpp
@@ -132,8 +132,8 @@ Status ExchangeSinkLocalState::open(RuntimeState* state) {
     PUniqueId id;
     id.set_hi(_state->query_id().hi);
     id.set_lo(_state->query_id().lo);
-    _sink_buffer = std::make_unique<ExchangeSinkBuffer>(id, p._dest_node_id, _sender_id,
-                                                        _state->be_number(), state, this);
+    _sink_buffer = std::make_unique<ExchangeSinkBuffer>(
+            id, p._dest_node_id, _sender_id, _parent->node_id(), _state->be_number(), state, this);
 
     register_channels(_sink_buffer.get());
     _queue_dependency = Dependency::create_shared(_parent->operator_id(), _parent->node_id(),

diff --git a/be/src/pipeline/exec/hashjoin_build_sink.cpp b/be/src/pipeline/exec/hashjoin_build_sink.cpp
index 9bf6c422af4..ae724549f71 100644
--- a/be/src/pipeline/exec/hashjoin_build_sink.cpp
+++ b/be/src/pipeline/exec/hashjoin_build_sink.cpp
@@ -123,11 +123,6 @@ size_t HashJoinBuildSinkLocalState::get_reserve_mem_size(RuntimeState* state) {
 
     if (!_build_side_mutable_block.empty()) {
         size_to_reserve += _build_side_mutable_block.allocated_bytes();
-
-        // estimating for serialized key
-        for (auto id : _build_col_ids) {
-            size_to_reserve += _build_side_mutable_block.get_column_by_position(id)->byte_size();
-        }
     }
 
     const size_t rows = _build_side_mutable_block.rows() + state->batch_size();
@@ -143,6 +138,14 @@ size_t HashJoinBuildSinkLocalState::get_reserve_mem_size(RuntimeState* state) {
     }
 
     size_to_reserve += _evaluate_mem_usage;
+    if (size_to_reserve > 2L * 1024 * 1024 * 1024) [[unlikely]] {
+        LOG(INFO) << "**** too big reserve size: " << size_to_reserve << ", rows: " << rows
+                  << ", bucket_size: " << bucket_size
+                  << ", mutable block size: " << _build_side_mutable_block.allocated_bytes()
+                  << ", mutable block cols: " << _build_side_mutable_block.columns()
+                  << ", _build_col_ids.size: " << _build_col_ids.size();
+    }
+
     return size_to_reserve;
 }

diff --git a/be/src/pipeline/exec/partitioned_aggregation_sink_operator.cpp b/be/src/pipeline/exec/partitioned_aggregation_sink_operator.cpp
index 5c26cfb6b97..17eeb8039df 100644
--- a/be/src/pipeline/exec/partitioned_aggregation_sink_operator.cpp
+++ b/be/src/pipeline/exec/partitioned_aggregation_sink_operator.cpp
@@ -241,6 +241,7 @@ size_t PartitionedAggSinkOperatorX::get_reserve_mem_size(RuntimeState* state) {
 }
 
 Status PartitionedAggSinkLocalState::revoke_memory(RuntimeState* state) {
+    const auto size_to_revoke = _parent->revocable_mem_size(state);
     VLOG_DEBUG << "query " << print_id(state->query_id()) << " agg node " << Base::_parent->node_id()
                << " revoke_memory, size: " << _parent->revocable_mem_size(state)
@@ -278,7 +279,7 @@ Status PartitionedAggSinkLocalState::revoke_memory(RuntimeState* state) {
     state->get_query_ctx()->increase_revoking_tasks_count();
     auto spill_runnable = std::make_shared<SpillRunnable>(
             state, _shared_state->shared_from_this(),
-            [this, &parent, state, query_id, submit_timer] {
+            [this, &parent, state, query_id, size_to_revoke, submit_timer] {
                 DBUG_EXECUTE_IF("fault_inject::partitioned_agg_sink::revoke_memory_cancel", {
                     auto st = Status::InternalError(
                             "fault_inject partitioned_agg_sink "
@@ -312,17 +313,17 @@ Status PartitionedAggSinkLocalState::revoke_memory(RuntimeState* state) {
                 }};
                 auto* runtime_state = _runtime_state.get();
                 auto* agg_data = parent._agg_sink_operator->get_agg_data(runtime_state);
-                Base::_shared_state->sink_status =
-                        std::visit(vectorized::Overload {
-                                           [&](std::monostate& arg) -> Status {
-                                               return Status::InternalError("Unit hash table");
-                                           },
-                                           [&](auto& agg_method) -> Status {
-                                               auto& hash_table = *agg_method.hash_table;
-                                               RETURN_IF_CATCH_EXCEPTION(return _spill_hash_table(
-                                                       state, agg_method, hash_table, _eos));
-                                           }},
-                                   agg_data->method_variant);
+                Base::_shared_state->sink_status = std::visit(
+                        vectorized::Overload {
+                                [&](std::monostate& arg) -> Status {
+                                    return Status::InternalError("Unit hash table");
+                                },
+                                [&](auto& agg_method) -> Status {
+                                    auto& hash_table = *agg_method.hash_table;
+                                    RETURN_IF_CATCH_EXCEPTION(return _spill_hash_table(
+                                            state, agg_method, hash_table, size_to_revoke, _eos));
+                                }},
+                        agg_data->method_variant);
                 RETURN_IF_ERROR(Base::_shared_state->sink_status);
                 Base::_shared_state->sink_status =
                         parent._agg_sink_operator->reset_hash_table(runtime_state);

diff --git a/be/src/pipeline/exec/partitioned_aggregation_sink_operator.h b/be/src/pipeline/exec/partitioned_aggregation_sink_operator.h
index 4a6abb32ed4..65c1cefa63b 100644
--- a/be/src/pipeline/exec/partitioned_aggregation_sink_operator.h
+++ b/be/src/pipeline/exec/partitioned_aggregation_sink_operator.h
@@ -23,6 +23,7 @@
 #include "pipeline/exec/operator.h"
 #include "vec/exprs/vectorized_agg_fn.h"
 #include "vec/exprs/vexpr.h"
+#include "vec/spill/spill_stream.h"
 #include "vec/spill/spill_stream_manager.h"
 
 namespace doris::pipeline {
@@ -57,7 +58,7 @@ public:
     };
     template <typename HashTableCtxType, typename HashTableType>
     Status _spill_hash_table(RuntimeState* state, HashTableCtxType& context,
-                             HashTableType& hash_table, bool eos) {
+                             HashTableType& hash_table, const size_t size_to_revoke, bool eos) {
         Status status;
         Defer defer {[&]() {
             if (!status.ok()) {
@@ -69,8 +70,22 @@ public:
 
         Base::_shared_state->in_mem_shared_state->aggregate_data_container->init_once();
 
-        static int spill_batch_rows = 4096;
-        int row_count = 0;
+        const auto total_rows =
+                Base::_shared_state->in_mem_shared_state->aggregate_data_container->total_count();
+
+        const size_t size_to_revoke_ = std::max<size_t>(size_to_revoke, 1);
+
+        // `spill_batch_rows` will be between 4k and 1M
+        // and each block to spill will not be larger than 32MB(`MAX_SPILL_WRITE_BATCH_MEM`)
+        const auto spill_batch_rows = std::min<size_t>(
+                1024 * 1024,
+                std::max<size_t>(4096, vectorized::SpillStream::MAX_SPILL_WRITE_BATCH_MEM *
+                                               total_rows / size_to_revoke_));
+
+        VLOG_DEBUG << "query: " << print_id(state->query_id()) << ", node: " << _parent->node_id()
+                   << ", spill_batch_rows: " << spill_batch_rows << ", total rows: " << total_rows
+                   << ", size_to_revoke: " << size_to_revoke;
+        size_t row_count = 0;
         std::vector<TmpSpillInfo<typename HashTableType::key_type>> spill_infos(
                 Base::_shared_state->partition_count);

diff --git a/be/src/pipeline/pipeline_task.cpp b/be/src/pipeline/pipeline_task.cpp
index e27d73be62e..1bd74bcab45 100644
--- a/be/src/pipeline/pipeline_task.cpp
+++ b/be/src/pipeline/pipeline_task.cpp
@@ -375,7 +375,8 @@ Status PipelineTask::execute(bool* eos) {
             _root->reset_reserve_mem_size(_state);
             DCHECK_EQ(_root->get_reserve_mem_size(_state), 0);
 
-            if (reserve_size > 0) {
+            auto workload_group = _state->get_query_ctx()->workload_group();
+            if (workload_group && reserve_size > 0) {
                 auto st = thread_context()->try_reserve_memory(reserve_size);
                 if (!st.ok()) {
                     VLOG_DEBUG << "query: " << print_id(query_id)
@@ -384,7 +385,18 @@ Status PipelineTask::execute(bool* eos) {
                                << ", sink name: " << _sink->get_name()
                                << ", node id: " << _sink->node_id() << " failed: " << st.to_string()
                                << ", debug info: " << GlobalMemoryArbitrator::process_mem_log_str();
-                    {
+                    bool is_high_wartermark = false;
+                    bool is_low_wartermark = false;
+                    workload_group->check_mem_used(&is_low_wartermark, &is_high_wartermark);
+                    if (is_low_wartermark || is_high_wartermark) {
+                        /// The larger reserved memory size is likely due to a larger available revocable size.
+                        /// If the available memory for revoking is large enough, here trigger revoking proactively.
+                        if (_sink->revocable_mem_size(_state) > 512L * 1024 * 1024) {
+                            LOG(INFO) << "query: " << print_id(query_id)
+                                      << " has big memory to revoke.";
+                            RETURN_IF_ERROR(_sink->revoke_memory(_state));
+                        }
+
                     _memory_sufficient_dependency->block();
                     _state->get_query_ctx()->get_pipe_exec_scheduler()->add_paused_task(this);
                     continue;

diff --git a/be/src/vec/sink/vdata_stream_sender.cpp b/be/src/vec/sink/vdata_stream_sender.cpp
index f18467fbad9..b6bfc7862dc 100644
--- a/be/src/vec/sink/vdata_stream_sender.cpp
+++ b/be/src/vec/sink/vdata_stream_sender.cpp
@@ -115,7 +115,12 @@ Status Channel<Parent>::open(RuntimeState* state) {
     _brpc_request->set_sender_id(_parent->sender_id());
     _brpc_request->set_be_number(_be_number);
 
-    _brpc_timeout_ms = std::min(3600, state->execution_timeout()) * 1000;
+    const auto& query_options = state->query_options();
+    if (query_options.__isset.query_timeout) {
+        _brpc_timeout_ms = query_options.query_timeout * 1000;
+    } else {
+        _brpc_timeout_ms = std::min(3600, state->execution_timeout()) * 1000;
+    }
 
     _serializer.set_is_local(_is_local);

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org
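
For readers following the spill-batching change in partitioned_aggregation_sink_operator.h above: the sketch below is not part of the patch; it only illustrates how the new `spill_batch_rows` value is derived, assuming `MAX_SPILL_WRITE_BATCH_MEM` is the 32 MB limit the in-code comment mentions. The helper name and the sample numbers are illustrative only.

#include <algorithm>
#include <cstddef>
#include <iostream>

// Illustrative stand-in for vectorized::SpillStream::MAX_SPILL_WRITE_BATCH_MEM
// (32 MB, per the comment in the patch).
constexpr size_t MAX_SPILL_WRITE_BATCH_MEM = 32 * 1024 * 1024;

// Mirrors the clamping in _spill_hash_table: a spill batch holds at least 4096 rows,
// at most 1M rows, and otherwise targets roughly MAX_SPILL_WRITE_BATCH_MEM bytes,
// estimating the average row size as size_to_revoke / total_rows.
size_t compute_spill_batch_rows(size_t total_rows, size_t size_to_revoke) {
    const size_t size_to_revoke_ = std::max<size_t>(size_to_revoke, 1);
    return std::min<size_t>(
            1024 * 1024,
            std::max<size_t>(4096, MAX_SPILL_WRITE_BATCH_MEM * total_rows / size_to_revoke_));
}

int main() {
    // Example: 10,000,000 rows occupying ~1 GiB of revocable memory
    // => 32 MiB * 10,000,000 / 1 GiB = 312,500 rows per spill batch (within [4096, 1M]).
    std::cout << compute_spill_batch_rows(10000000, 1024UL * 1024 * 1024) << std::endl;
    return 0;
}

With a very small revocable size the max() clamp keeps batches at 4096 rows, and with a very large one the min() caps them at 1M rows, matching the "between 4k and 1M" range stated in the patch comment.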